diff options
Diffstat (limited to 'opendc-telemetry')
12 files changed, 935 insertions, 34 deletions
diff --git a/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts b/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts new file mode 100644 index 00000000..cd8cb57a --- /dev/null +++ b/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +description = "Telemetry for OpenDC Compute" + +/* Build configuration */ +plugins { + `kotlin-library-conventions` +} + +dependencies { + api(platform(projects.opendcPlatform)) + api(projects.opendcTelemetry.opendcTelemetrySdk) + + implementation(libs.opentelemetry.semconv) + implementation(libs.kotlin.logging) +} diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt new file mode 100644 index 00000000..738ec38b --- /dev/null +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt @@ -0,0 +1,486 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.telemetry.compute + +import io.opentelemetry.api.common.AttributeKey +import io.opentelemetry.api.common.Attributes +import io.opentelemetry.sdk.metrics.data.MetricData +import io.opentelemetry.sdk.metrics.data.PointData +import io.opentelemetry.sdk.resources.Resource +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes +import org.opendc.telemetry.compute.table.* +import java.time.Instant +import kotlin.math.roundToLong + +/** + * Helper class responsible for aggregating [MetricData] into [ServiceData], [HostData] and [ServerData]. + */ +public class ComputeMetricAggregator { + private val _service = ServiceAggregator() + private val _hosts = mutableMapOf<String, HostAggregator>() + private val _servers = mutableMapOf<String, ServerAggregator>() + + /** + * Process the specified [metrics] for this cycle. + */ + public fun process(metrics: Collection<MetricData>) { + val service = _service + val hosts = _hosts + val servers = _servers + + for (metric in metrics) { + val resource = metric.resource + + when (metric.name) { + // ComputeService + "scheduler.hosts" -> { + for (point in metric.longSumData.points) { + // Record the timestamp for the service + service.recordTimestamp(point) + + when (point.attributes[STATE_KEY]) { + "up" -> service.hostsUp = point.value.toInt() + "down" -> service.hostsDown = point.value.toInt() + } + } + } + "scheduler.servers" -> { + for (point in metric.longSumData.points) { + when (point.attributes[STATE_KEY]) { + "pending" -> service.serversPending = point.value.toInt() + "active" -> service.serversActive = point.value.toInt() + } + } + } + "scheduler.attempts" -> { + for (point in metric.longSumData.points) { + when (point.attributes[RESULT_KEY]) { + "success" -> service.attemptsSuccess = point.value.toInt() + "failure" -> service.attemptsFailure = point.value.toInt() + "error" -> service.attemptsError = point.value.toInt() + } + } + } + "scheduler.latency" -> { + for (point in metric.doubleHistogramData.points) { + val server = getServer(servers, point) ?: continue + server.schedulingLatency = (point.sum / point.count).roundToLong() + } + } + + // SimHost + "system.guests" -> { + val agg = getHost(hosts, resource) ?: continue + + for (point in metric.longSumData.points) { + when (point.attributes[STATE_KEY]) { + "terminated" -> agg.guestsTerminated = point.value.toInt() + "running" -> agg.guestsRunning = point.value.toInt() + "error" -> agg.guestsRunning = point.value.toInt() + "invalid" -> agg.guestsInvalid = point.value.toInt() + } + } + } + "system.cpu.limit" -> { + val agg = getHost(hosts, resource) ?: continue + + for (point in metric.doubleGaugeData.points) { + val server = getServer(servers, point) + + if (server != null) { + server.cpuLimit = point.value + server.host = agg.host + } else { + agg.cpuLimit = point.value + } + } + } + "system.cpu.usage" -> { + val agg = getHost(hosts, resource) ?: continue + agg.cpuUsage = metric.doubleGaugeData.points.first().value + } + "system.cpu.demand" -> { + val agg = getHost(hosts, resource) ?: continue + agg.cpuDemand = metric.doubleGaugeData.points.first().value + } + "system.cpu.utilization" -> { + val agg = getHost(hosts, resource) ?: continue + agg.cpuUtilization = metric.doubleGaugeData.points.first().value + } + "system.cpu.time" -> { + val agg = getHost(hosts, resource) ?: continue + + for (point in metric.longSumData.points) { + val server = getServer(servers, point) + val state = point.attributes[STATE_KEY] + if (server != null) { + when (state) { + "active" -> server.cpuActiveTime = point.value + "idle" -> server.cpuIdleTime = point.value + "steal" -> server.cpuStealTime = point.value + "lost" -> server.cpuLostTime = point.value + } + server.host = agg.host + } else { + when (state) { + "active" -> agg.cpuActiveTime = point.value + "idle" -> agg.cpuIdleTime = point.value + "steal" -> agg.cpuStealTime = point.value + "lost" -> agg.cpuLostTime = point.value + } + } + } + } + "system.power.usage" -> { + val agg = getHost(hosts, resource) ?: continue + agg.powerUsage = metric.doubleGaugeData.points.first().value + } + "system.power.total" -> { + val agg = getHost(hosts, resource) ?: continue + agg.powerTotal = metric.doubleSumData.points.first().value + } + "system.time" -> { + val agg = getHost(hosts, resource) ?: continue + + for (point in metric.longSumData.points) { + val server = getServer(servers, point) + + if (server != null) { + server.recordTimestamp(point) + + when (point.attributes[STATE_KEY]) { + "up" -> server.uptime = point.value + "down" -> server.downtime = point.value + } + server.host = agg.host + } else { + agg.recordTimestamp(point) + + when (point.attributes[STATE_KEY]) { + "up" -> agg.uptime = point.value + "down" -> agg.downtime = point.value + } + } + } + } + "system.time.boot" -> { + val agg = getHost(hosts, resource) ?: continue + + for (point in metric.longGaugeData.points) { + val server = getServer(servers, point) + + if (server != null) { + server.bootTime = point.value + server.host = agg.host + } else { + agg.bootTime = point.value + } + } + } + } + } + } + + /** + * Collect the data via the [monitor]. + */ + public fun collect(monitor: ComputeMonitor) { + monitor.record(_service.collect()) + + for (host in _hosts.values) { + monitor.record(host.collect()) + } + + for (server in _servers.values) { + monitor.record(server.collect()) + } + } + + /** + * Obtain the [HostAggregator] for the specified [resource]. + */ + private fun getHost(hosts: MutableMap<String, HostAggregator>, resource: Resource): HostAggregator? { + val id = resource.attributes[HOST_ID] + return if (id != null) { + hosts.computeIfAbsent(id) { HostAggregator(resource) } + } else { + null + } + } + + /** + * Obtain the [ServerAggregator] for the specified [point]. + */ + private fun getServer(servers: MutableMap<String, ServerAggregator>, point: PointData): ServerAggregator? { + val id = point.attributes[ResourceAttributes.HOST_ID] + return if (id != null) { + servers.computeIfAbsent(id) { ServerAggregator(point.attributes) } + } else { + null + } + } + + /** + * An aggregator for service metrics before they are reported. + */ + internal class ServiceAggregator { + private var timestamp = Long.MIN_VALUE + + @JvmField var hostsUp = 0 + @JvmField var hostsDown = 0 + + @JvmField var serversPending = 0 + @JvmField var serversActive = 0 + + @JvmField var attemptsSuccess = 0 + @JvmField var attemptsFailure = 0 + @JvmField var attemptsError = 0 + + /** + * Finish the aggregation for this cycle. + */ + fun collect(): ServiceData { + val now = Instant.ofEpochMilli(timestamp) + return toServiceData(now) + } + + /** + * Convert the aggregator state to an immutable [ServiceData]. + */ + private fun toServiceData(now: Instant): ServiceData { + return ServiceData(now, hostsUp, hostsDown, serversPending, serversActive, attemptsSuccess, attemptsFailure, attemptsError) + } + + /** + * Record the timestamp of a [point] for this aggregator. + */ + fun recordTimestamp(point: PointData) { + timestamp = point.epochNanos / 1_000_000L // ns to ms + } + } + + /** + * An aggregator for host metrics before they are reported. + */ + internal class HostAggregator(resource: Resource) { + /** + * The static information about this host. + */ + val host = HostInfo( + resource.attributes[HOST_ID]!!, + resource.attributes[HOST_NAME] ?: "", + resource.attributes[HOST_ARCH] ?: "", + resource.attributes[HOST_NCPUS]?.toInt() ?: 0, + resource.attributes[HOST_MEM_CAPACITY] ?: 0, + ) + + private var timestamp = Long.MIN_VALUE + + @JvmField var guestsTerminated = 0 + @JvmField var guestsRunning = 0 + @JvmField var guestsError = 0 + @JvmField var guestsInvalid = 0 + + @JvmField var cpuLimit = 0.0 + @JvmField var cpuUsage = 0.0 + @JvmField var cpuDemand = 0.0 + @JvmField var cpuUtilization = 0.0 + + @JvmField var cpuActiveTime = 0L + @JvmField var cpuIdleTime = 0L + @JvmField var cpuStealTime = 0L + @JvmField var cpuLostTime = 0L + private var previousCpuActiveTime = 0L + private var previousCpuIdleTime = 0L + private var previousCpuStealTime = 0L + private var previousCpuLostTime = 0L + + @JvmField var powerUsage = 0.0 + @JvmField var powerTotal = 0.0 + private var previousPowerTotal = 0.0 + + @JvmField var uptime = 0L + private var previousUptime = 0L + @JvmField var downtime = 0L + private var previousDowntime = 0L + @JvmField var bootTime = Long.MIN_VALUE + + /** + * Finish the aggregation for this cycle. + */ + fun collect(): HostData { + val now = Instant.ofEpochMilli(timestamp) + val data = toHostData(now) + + // Reset intermediate state for next aggregation + previousCpuActiveTime = cpuActiveTime + previousCpuIdleTime = cpuIdleTime + previousCpuStealTime = cpuStealTime + previousCpuLostTime = cpuLostTime + previousPowerTotal = powerTotal + previousUptime = uptime + previousDowntime = downtime + + guestsTerminated = 0 + guestsRunning = 0 + guestsError = 0 + guestsInvalid = 0 + + cpuLimit = 0.0 + cpuUsage = 0.0 + cpuDemand = 0.0 + cpuUtilization = 0.0 + + powerUsage = 0.0 + + return data + } + + /** + * Convert the aggregator state to an immutable [HostData] instance. + */ + private fun toHostData(now: Instant): HostData { + return HostData( + now, + host, + guestsTerminated, + guestsRunning, + guestsError, + guestsInvalid, + cpuLimit, + cpuUsage, + cpuDemand, + cpuUtilization, + cpuActiveTime - previousCpuActiveTime, + cpuIdleTime - previousCpuIdleTime, + cpuStealTime - previousCpuStealTime, + cpuLostTime - previousCpuLostTime, + powerUsage, + powerTotal - previousPowerTotal, + uptime - previousUptime, + downtime - previousDowntime, + if (bootTime != Long.MIN_VALUE) Instant.ofEpochMilli(bootTime) else null + ) + } + + /** + * Record the timestamp of a [point] for this aggregator. + */ + fun recordTimestamp(point: PointData) { + timestamp = point.epochNanos / 1_000_000L // ns to ms + } + } + + /** + * An aggregator for server metrics before they are reported. + */ + internal class ServerAggregator(attributes: Attributes) { + /** + * The static information about this server. + */ + val server = ServerInfo( + attributes[ResourceAttributes.HOST_ID]!!, + attributes[ResourceAttributes.HOST_NAME]!!, + attributes[ResourceAttributes.HOST_TYPE]!!, + attributes[ResourceAttributes.HOST_ARCH]!!, + attributes[ResourceAttributes.HOST_IMAGE_ID]!!, + attributes[ResourceAttributes.HOST_IMAGE_NAME]!!, + attributes[AttributeKey.longKey("host.num_cpus")]!!.toInt(), + attributes[AttributeKey.longKey("host.mem_capacity")]!!, + ) + + /** + * The [HostInfo] of the host on which the server is hosted. + */ + @JvmField var host: HostInfo? = null + + private var timestamp = Long.MIN_VALUE + @JvmField var uptime: Long = 0 + private var previousUptime = 0L + @JvmField var downtime: Long = 0 + private var previousDowntime = 0L + @JvmField var bootTime: Long = 0 + @JvmField var schedulingLatency = 0L + @JvmField var cpuLimit = 0.0 + @JvmField var cpuActiveTime = 0L + @JvmField var cpuIdleTime = 0L + @JvmField var cpuStealTime = 0L + @JvmField var cpuLostTime = 0L + private var previousCpuActiveTime = 0L + private var previousCpuIdleTime = 0L + private var previousCpuStealTime = 0L + private var previousCpuLostTime = 0L + + /** + * Finish the aggregation for this cycle. + */ + fun collect(): ServerData { + val now = Instant.ofEpochMilli(timestamp) + val data = toServerData(now) + + previousUptime = uptime + previousDowntime = downtime + previousCpuActiveTime = cpuActiveTime + previousCpuIdleTime = cpuIdleTime + previousCpuStealTime = cpuStealTime + previousCpuLostTime = cpuLostTime + + host = null + cpuLimit = 0.0 + + return data + } + + /** + * Convert the aggregator state into an immutable [ServerData]. + */ + private fun toServerData(now: Instant): ServerData { + return ServerData( + now, + server, + host, + uptime - previousUptime, + downtime - previousDowntime, + if (bootTime != Long.MIN_VALUE) Instant.ofEpochMilli(bootTime) else null, + schedulingLatency, + cpuLimit, + cpuActiveTime - previousCpuActiveTime, + cpuIdleTime - previousCpuIdleTime, + cpuStealTime - previousCpuStealTime, + cpuLostTime - previousCpuLostTime + ) + } + + /** + * Record the timestamp of a [point] for this aggregator. + */ + fun recordTimestamp(point: PointData) { + timestamp = point.epochNanos / 1_000_000L // ns to ms + } + } + + private companion object { + private val STATE_KEY = AttributeKey.stringKey("state") + private val RESULT_KEY = AttributeKey.stringKey("result") + } +} diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt new file mode 100644 index 00000000..3ab6c7b2 --- /dev/null +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.telemetry.compute + +import io.opentelemetry.sdk.common.CompletableResultCode +import io.opentelemetry.sdk.metrics.data.* +import io.opentelemetry.sdk.metrics.export.MetricExporter +import mu.KotlinLogging + +/** + * A [MetricExporter] that redirects data to a [ComputeMonitor] implementation. + */ +public abstract class ComputeMetricExporter : MetricExporter, ComputeMonitor { + /** + * The logging instance for this exporter. + */ + private val logger = KotlinLogging.logger {} + + /** + * A [ComputeMetricAggregator] that actually performs the aggregation. + */ + private val agg = ComputeMetricAggregator() + + override fun export(metrics: Collection<MetricData>): CompletableResultCode { + return try { + agg.process(metrics) + agg.collect(this) + + CompletableResultCode.ofSuccess() + } catch (e: Throwable) { + logger.warn(e) { "Failed to export results" } + CompletableResultCode.ofFailure() + } + } + + override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess() + + override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess() +} diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt new file mode 100644 index 00000000..d51bcab4 --- /dev/null +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.telemetry.compute + +import org.opendc.telemetry.compute.table.HostData +import org.opendc.telemetry.compute.table.ServerData +import org.opendc.telemetry.compute.table.ServiceData + +/** + * A monitor that tracks the metrics and events of the OpenDC Compute service. + */ +public interface ComputeMonitor { + /** + * Record the specified [data]. + */ + public fun record(data: ServerData) {} + + /** + * Record the specified [data]. + */ + public fun record(data: HostData) {} + + /** + * Record the specified [data]. + */ + public fun record(data: ServiceData) {} +} diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt new file mode 100644 index 00000000..ce89061b --- /dev/null +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.telemetry.compute + +import io.opentelemetry.sdk.metrics.export.MetricProducer +import org.opendc.telemetry.compute.table.ServiceData + +/** + * Collect the metrics of the compute service. + */ +public fun collectServiceMetrics(metricProducer: MetricProducer): ServiceData { + lateinit var serviceData: ServiceData + val agg = ComputeMetricAggregator() + val monitor = object : ComputeMonitor { + override fun record(data: ServiceData) { + serviceData = data + } + } + + agg.process(metricProducer.collectAllMetrics()) + agg.collect(monitor) + return serviceData +} diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/HostAttributes.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/HostAttributes.kt new file mode 100644 index 00000000..7dca6186 --- /dev/null +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/HostAttributes.kt @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("HostAttributes") +package org.opendc.telemetry.compute + +import io.opentelemetry.api.common.AttributeKey + +/** + * The identifier of the node hosting virtual machines. + */ +public val HOST_ID: AttributeKey<String> = AttributeKey.stringKey("node.id") + +/** + * The name of the node hosting virtual machines. + */ +public val HOST_NAME: AttributeKey<String> = AttributeKey.stringKey("node.name") + +/** + * The CPU architecture of the host node. + */ +public val HOST_ARCH: AttributeKey<String> = AttributeKey.stringKey("node.arch") + +/** + * The number of CPUs in the host node. + */ +public val HOST_NCPUS: AttributeKey<Long> = AttributeKey.longKey("node.num_cpus") + +/** + * The amount of memory installed in the host node in MiB. + */ +public val HOST_MEM_CAPACITY: AttributeKey<Long> = AttributeKey.longKey("node.mem_capacity") diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt new file mode 100644 index 00000000..8e787b97 --- /dev/null +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.telemetry.compute.table + +import java.time.Instant + +/** + * A trace entry for a particular host. + */ +public data class HostData( + val timestamp: Instant, + val host: HostInfo, + val guestsTerminated: Int, + val guestsRunning: Int, + val guestsError: Int, + val guestsInvalid: Int, + val cpuLimit: Double, + val cpuUsage: Double, + val cpuDemand: Double, + val cpuUtilization: Double, + val cpuActiveTime: Long, + val cpuIdleTime: Long, + val cpuStealTime: Long, + val cpuLostTime: Long, + val powerUsage: Double, + val powerTotal: Double, + val uptime: Long, + val downtime: Long, + val bootTime: Instant? +) diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostInfo.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostInfo.kt new file mode 100644 index 00000000..d9a5906b --- /dev/null +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostInfo.kt @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.telemetry.compute.table + +/** + * Information about a host exposed to the telemetry service. + */ +public data class HostInfo(val id: String, val name: String, val arch: String, val cpuCount: Int, val memCapacity: Long) diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt new file mode 100644 index 00000000..c48bff3a --- /dev/null +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.telemetry.compute.table + +import java.time.Instant + +/** + * A trace entry for a particular server. + */ +public data class ServerData( + val timestamp: Instant, + val server: ServerInfo, + val host: HostInfo?, + val uptime: Long, + val downtime: Long, + val bootTime: Instant?, + val schedulingLatency: Long, + val cpuLimit: Double, + val cpuActiveTime: Long, + val cpuIdleTime: Long, + val cpuStealTime: Long, + val cpuLostTime: Long, +) diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerInfo.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerInfo.kt new file mode 100644 index 00000000..b16e5f3d --- /dev/null +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerInfo.kt @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.telemetry.compute.table + +/** + * Static information about a server exposed to the telemetry service. + */ +public data class ServerInfo( + val id: String, + val name: String, + val type: String, + val arch: String, + val imageId: String, + val imageName: String, + val cpuCount: Int, + val memCapacity: Long +) diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt new file mode 100644 index 00000000..6db1399d --- /dev/null +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.telemetry.compute.table + +import java.time.Instant + +/** + * A trace entry for the compute service. + */ +public data class ServiceData( + val timestamp: Instant, + val hostsUp: Int, + val hostsDown: Int, + val serversPending: Int, + val serversActive: Int, + val attemptsSuccess: Int, + val attemptsFailure: Int, + val attemptsError: Int +) diff --git a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt b/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt index 9ee16fac..1de235e7 100644 --- a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt +++ b/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt @@ -22,18 +22,11 @@ package org.opendc.telemetry.sdk.metrics.export -import io.opentelemetry.sdk.metrics.data.MetricData import io.opentelemetry.sdk.metrics.export.MetricExporter import io.opentelemetry.sdk.metrics.export.MetricProducer import kotlinx.coroutines.* -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.flow.consumeAsFlow -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach import mu.KotlinLogging -import java.util.* -import kotlin.coroutines.resume -import kotlin.coroutines.suspendCoroutine +import java.time.Duration /** * A helper class to read the metrics from a list of [MetricProducer]s and automatically export the metrics every @@ -44,56 +37,45 @@ import kotlin.coroutines.suspendCoroutine * @param scope The [CoroutineScope] to run the reader in. * @param producers The metric producers to gather metrics from. * @param exporter The export to export the metrics to. - * @param exportInterval The export interval in milliseconds. + * @param exportInterval The export interval. */ public class CoroutineMetricReader( scope: CoroutineScope, private val producers: List<MetricProducer>, private val exporter: MetricExporter, - private val exportInterval: Long = 60_000 + private val exportInterval: Duration = Duration.ofMinutes(5) ) : AutoCloseable { private val logger = KotlinLogging.logger {} - private val chan = Channel<List<MetricData>>(Channel.RENDEZVOUS) /** - * The metric reader job. + * The background job that is responsible for collecting the metrics every cycle. */ - private val readerJob = scope.launch { - while (isActive) { - delay(exportInterval) + private val job = scope.launch { + val intervalMs = exportInterval.toMillis() - val metrics = mutableListOf<MetricData>() - for (producer in producers) { - metrics.addAll(producer.collectAllMetrics()) - } - chan.send(Collections.unmodifiableList(metrics)) - } - } + try { + while (isActive) { + delay(intervalMs) + + val metrics = producers.flatMap(MetricProducer::collectAllMetrics) - /** - * The exporter job runs in the background to actually export the metrics. - */ - private val exporterJob = chan.consumeAsFlow() - .onEach { metrics -> - suspendCoroutine<Unit> { cont -> try { val result = exporter.export(metrics) result.whenComplete { if (!result.isSuccess) { - logger.trace { "Exporter failed" } + logger.warn { "Exporter failed" } } - cont.resume(Unit) } } catch (cause: Throwable) { logger.warn(cause) { "Exporter threw an Exception" } - cont.resume(Unit) } } + } finally { + exporter.shutdown() } - .launchIn(scope) + } override fun close() { - readerJob.cancel() - exporterJob.cancel() + job.cancel() } } |
