summaryrefslogtreecommitdiff
path: root/opendc-telemetry
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-telemetry')
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/build.gradle.kts36
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt486
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt59
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt47
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt43
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/HostAttributes.kt51
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt50
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostInfo.kt28
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt43
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerInfo.kt37
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt39
-rw-r--r--opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt50
12 files changed, 935 insertions, 34 deletions
diff --git a/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts b/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts
new file mode 100644
index 00000000..cd8cb57a
--- /dev/null
+++ b/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+description = "Telemetry for OpenDC Compute"
+
+/* Build configuration */
+plugins {
+ `kotlin-library-conventions`
+}
+
+dependencies {
+ api(platform(projects.opendcPlatform))
+ api(projects.opendcTelemetry.opendcTelemetrySdk)
+
+ implementation(libs.opentelemetry.semconv)
+ implementation(libs.kotlin.logging)
+}
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt
new file mode 100644
index 00000000..738ec38b
--- /dev/null
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt
@@ -0,0 +1,486 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.telemetry.compute
+
+import io.opentelemetry.api.common.AttributeKey
+import io.opentelemetry.api.common.Attributes
+import io.opentelemetry.sdk.metrics.data.MetricData
+import io.opentelemetry.sdk.metrics.data.PointData
+import io.opentelemetry.sdk.resources.Resource
+import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
+import org.opendc.telemetry.compute.table.*
+import java.time.Instant
+import kotlin.math.roundToLong
+
+/**
+ * Helper class responsible for aggregating [MetricData] into [ServiceData], [HostData] and [ServerData].
+ */
+public class ComputeMetricAggregator {
+ private val _service = ServiceAggregator()
+ private val _hosts = mutableMapOf<String, HostAggregator>()
+ private val _servers = mutableMapOf<String, ServerAggregator>()
+
+ /**
+ * Process the specified [metrics] for this cycle.
+ */
+ public fun process(metrics: Collection<MetricData>) {
+ val service = _service
+ val hosts = _hosts
+ val servers = _servers
+
+ for (metric in metrics) {
+ val resource = metric.resource
+
+ when (metric.name) {
+ // ComputeService
+ "scheduler.hosts" -> {
+ for (point in metric.longSumData.points) {
+ // Record the timestamp for the service
+ service.recordTimestamp(point)
+
+ when (point.attributes[STATE_KEY]) {
+ "up" -> service.hostsUp = point.value.toInt()
+ "down" -> service.hostsDown = point.value.toInt()
+ }
+ }
+ }
+ "scheduler.servers" -> {
+ for (point in metric.longSumData.points) {
+ when (point.attributes[STATE_KEY]) {
+ "pending" -> service.serversPending = point.value.toInt()
+ "active" -> service.serversActive = point.value.toInt()
+ }
+ }
+ }
+ "scheduler.attempts" -> {
+ for (point in metric.longSumData.points) {
+ when (point.attributes[RESULT_KEY]) {
+ "success" -> service.attemptsSuccess = point.value.toInt()
+ "failure" -> service.attemptsFailure = point.value.toInt()
+ "error" -> service.attemptsError = point.value.toInt()
+ }
+ }
+ }
+ "scheduler.latency" -> {
+ for (point in metric.doubleHistogramData.points) {
+ val server = getServer(servers, point) ?: continue
+ server.schedulingLatency = (point.sum / point.count).roundToLong()
+ }
+ }
+
+ // SimHost
+ "system.guests" -> {
+ val agg = getHost(hosts, resource) ?: continue
+
+ for (point in metric.longSumData.points) {
+ when (point.attributes[STATE_KEY]) {
+ "terminated" -> agg.guestsTerminated = point.value.toInt()
+ "running" -> agg.guestsRunning = point.value.toInt()
+ "error" -> agg.guestsRunning = point.value.toInt()
+ "invalid" -> agg.guestsInvalid = point.value.toInt()
+ }
+ }
+ }
+ "system.cpu.limit" -> {
+ val agg = getHost(hosts, resource) ?: continue
+
+ for (point in metric.doubleGaugeData.points) {
+ val server = getServer(servers, point)
+
+ if (server != null) {
+ server.cpuLimit = point.value
+ server.host = agg.host
+ } else {
+ agg.cpuLimit = point.value
+ }
+ }
+ }
+ "system.cpu.usage" -> {
+ val agg = getHost(hosts, resource) ?: continue
+ agg.cpuUsage = metric.doubleGaugeData.points.first().value
+ }
+ "system.cpu.demand" -> {
+ val agg = getHost(hosts, resource) ?: continue
+ agg.cpuDemand = metric.doubleGaugeData.points.first().value
+ }
+ "system.cpu.utilization" -> {
+ val agg = getHost(hosts, resource) ?: continue
+ agg.cpuUtilization = metric.doubleGaugeData.points.first().value
+ }
+ "system.cpu.time" -> {
+ val agg = getHost(hosts, resource) ?: continue
+
+ for (point in metric.longSumData.points) {
+ val server = getServer(servers, point)
+ val state = point.attributes[STATE_KEY]
+ if (server != null) {
+ when (state) {
+ "active" -> server.cpuActiveTime = point.value
+ "idle" -> server.cpuIdleTime = point.value
+ "steal" -> server.cpuStealTime = point.value
+ "lost" -> server.cpuLostTime = point.value
+ }
+ server.host = agg.host
+ } else {
+ when (state) {
+ "active" -> agg.cpuActiveTime = point.value
+ "idle" -> agg.cpuIdleTime = point.value
+ "steal" -> agg.cpuStealTime = point.value
+ "lost" -> agg.cpuLostTime = point.value
+ }
+ }
+ }
+ }
+ "system.power.usage" -> {
+ val agg = getHost(hosts, resource) ?: continue
+ agg.powerUsage = metric.doubleGaugeData.points.first().value
+ }
+ "system.power.total" -> {
+ val agg = getHost(hosts, resource) ?: continue
+ agg.powerTotal = metric.doubleSumData.points.first().value
+ }
+ "system.time" -> {
+ val agg = getHost(hosts, resource) ?: continue
+
+ for (point in metric.longSumData.points) {
+ val server = getServer(servers, point)
+
+ if (server != null) {
+ server.recordTimestamp(point)
+
+ when (point.attributes[STATE_KEY]) {
+ "up" -> server.uptime = point.value
+ "down" -> server.downtime = point.value
+ }
+ server.host = agg.host
+ } else {
+ agg.recordTimestamp(point)
+
+ when (point.attributes[STATE_KEY]) {
+ "up" -> agg.uptime = point.value
+ "down" -> agg.downtime = point.value
+ }
+ }
+ }
+ }
+ "system.time.boot" -> {
+ val agg = getHost(hosts, resource) ?: continue
+
+ for (point in metric.longGaugeData.points) {
+ val server = getServer(servers, point)
+
+ if (server != null) {
+ server.bootTime = point.value
+ server.host = agg.host
+ } else {
+ agg.bootTime = point.value
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Collect the data via the [monitor].
+ */
+ public fun collect(monitor: ComputeMonitor) {
+ monitor.record(_service.collect())
+
+ for (host in _hosts.values) {
+ monitor.record(host.collect())
+ }
+
+ for (server in _servers.values) {
+ monitor.record(server.collect())
+ }
+ }
+
+ /**
+ * Obtain the [HostAggregator] for the specified [resource].
+ */
+ private fun getHost(hosts: MutableMap<String, HostAggregator>, resource: Resource): HostAggregator? {
+ val id = resource.attributes[HOST_ID]
+ return if (id != null) {
+ hosts.computeIfAbsent(id) { HostAggregator(resource) }
+ } else {
+ null
+ }
+ }
+
+ /**
+ * Obtain the [ServerAggregator] for the specified [point].
+ */
+ private fun getServer(servers: MutableMap<String, ServerAggregator>, point: PointData): ServerAggregator? {
+ val id = point.attributes[ResourceAttributes.HOST_ID]
+ return if (id != null) {
+ servers.computeIfAbsent(id) { ServerAggregator(point.attributes) }
+ } else {
+ null
+ }
+ }
+
+ /**
+ * An aggregator for service metrics before they are reported.
+ */
+ internal class ServiceAggregator {
+ private var timestamp = Long.MIN_VALUE
+
+ @JvmField var hostsUp = 0
+ @JvmField var hostsDown = 0
+
+ @JvmField var serversPending = 0
+ @JvmField var serversActive = 0
+
+ @JvmField var attemptsSuccess = 0
+ @JvmField var attemptsFailure = 0
+ @JvmField var attemptsError = 0
+
+ /**
+ * Finish the aggregation for this cycle.
+ */
+ fun collect(): ServiceData {
+ val now = Instant.ofEpochMilli(timestamp)
+ return toServiceData(now)
+ }
+
+ /**
+ * Convert the aggregator state to an immutable [ServiceData].
+ */
+ private fun toServiceData(now: Instant): ServiceData {
+ return ServiceData(now, hostsUp, hostsDown, serversPending, serversActive, attemptsSuccess, attemptsFailure, attemptsError)
+ }
+
+ /**
+ * Record the timestamp of a [point] for this aggregator.
+ */
+ fun recordTimestamp(point: PointData) {
+ timestamp = point.epochNanos / 1_000_000L // ns to ms
+ }
+ }
+
+ /**
+ * An aggregator for host metrics before they are reported.
+ */
+ internal class HostAggregator(resource: Resource) {
+ /**
+ * The static information about this host.
+ */
+ val host = HostInfo(
+ resource.attributes[HOST_ID]!!,
+ resource.attributes[HOST_NAME] ?: "",
+ resource.attributes[HOST_ARCH] ?: "",
+ resource.attributes[HOST_NCPUS]?.toInt() ?: 0,
+ resource.attributes[HOST_MEM_CAPACITY] ?: 0,
+ )
+
+ private var timestamp = Long.MIN_VALUE
+
+ @JvmField var guestsTerminated = 0
+ @JvmField var guestsRunning = 0
+ @JvmField var guestsError = 0
+ @JvmField var guestsInvalid = 0
+
+ @JvmField var cpuLimit = 0.0
+ @JvmField var cpuUsage = 0.0
+ @JvmField var cpuDemand = 0.0
+ @JvmField var cpuUtilization = 0.0
+
+ @JvmField var cpuActiveTime = 0L
+ @JvmField var cpuIdleTime = 0L
+ @JvmField var cpuStealTime = 0L
+ @JvmField var cpuLostTime = 0L
+ private var previousCpuActiveTime = 0L
+ private var previousCpuIdleTime = 0L
+ private var previousCpuStealTime = 0L
+ private var previousCpuLostTime = 0L
+
+ @JvmField var powerUsage = 0.0
+ @JvmField var powerTotal = 0.0
+ private var previousPowerTotal = 0.0
+
+ @JvmField var uptime = 0L
+ private var previousUptime = 0L
+ @JvmField var downtime = 0L
+ private var previousDowntime = 0L
+ @JvmField var bootTime = Long.MIN_VALUE
+
+ /**
+ * Finish the aggregation for this cycle.
+ */
+ fun collect(): HostData {
+ val now = Instant.ofEpochMilli(timestamp)
+ val data = toHostData(now)
+
+ // Reset intermediate state for next aggregation
+ previousCpuActiveTime = cpuActiveTime
+ previousCpuIdleTime = cpuIdleTime
+ previousCpuStealTime = cpuStealTime
+ previousCpuLostTime = cpuLostTime
+ previousPowerTotal = powerTotal
+ previousUptime = uptime
+ previousDowntime = downtime
+
+ guestsTerminated = 0
+ guestsRunning = 0
+ guestsError = 0
+ guestsInvalid = 0
+
+ cpuLimit = 0.0
+ cpuUsage = 0.0
+ cpuDemand = 0.0
+ cpuUtilization = 0.0
+
+ powerUsage = 0.0
+
+ return data
+ }
+
+ /**
+ * Convert the aggregator state to an immutable [HostData] instance.
+ */
+ private fun toHostData(now: Instant): HostData {
+ return HostData(
+ now,
+ host,
+ guestsTerminated,
+ guestsRunning,
+ guestsError,
+ guestsInvalid,
+ cpuLimit,
+ cpuUsage,
+ cpuDemand,
+ cpuUtilization,
+ cpuActiveTime - previousCpuActiveTime,
+ cpuIdleTime - previousCpuIdleTime,
+ cpuStealTime - previousCpuStealTime,
+ cpuLostTime - previousCpuLostTime,
+ powerUsage,
+ powerTotal - previousPowerTotal,
+ uptime - previousUptime,
+ downtime - previousDowntime,
+ if (bootTime != Long.MIN_VALUE) Instant.ofEpochMilli(bootTime) else null
+ )
+ }
+
+ /**
+ * Record the timestamp of a [point] for this aggregator.
+ */
+ fun recordTimestamp(point: PointData) {
+ timestamp = point.epochNanos / 1_000_000L // ns to ms
+ }
+ }
+
+ /**
+ * An aggregator for server metrics before they are reported.
+ */
+ internal class ServerAggregator(attributes: Attributes) {
+ /**
+ * The static information about this server.
+ */
+ val server = ServerInfo(
+ attributes[ResourceAttributes.HOST_ID]!!,
+ attributes[ResourceAttributes.HOST_NAME]!!,
+ attributes[ResourceAttributes.HOST_TYPE]!!,
+ attributes[ResourceAttributes.HOST_ARCH]!!,
+ attributes[ResourceAttributes.HOST_IMAGE_ID]!!,
+ attributes[ResourceAttributes.HOST_IMAGE_NAME]!!,
+ attributes[AttributeKey.longKey("host.num_cpus")]!!.toInt(),
+ attributes[AttributeKey.longKey("host.mem_capacity")]!!,
+ )
+
+ /**
+ * The [HostInfo] of the host on which the server is hosted.
+ */
+ @JvmField var host: HostInfo? = null
+
+ private var timestamp = Long.MIN_VALUE
+ @JvmField var uptime: Long = 0
+ private var previousUptime = 0L
+ @JvmField var downtime: Long = 0
+ private var previousDowntime = 0L
+ @JvmField var bootTime: Long = 0
+ @JvmField var schedulingLatency = 0L
+ @JvmField var cpuLimit = 0.0
+ @JvmField var cpuActiveTime = 0L
+ @JvmField var cpuIdleTime = 0L
+ @JvmField var cpuStealTime = 0L
+ @JvmField var cpuLostTime = 0L
+ private var previousCpuActiveTime = 0L
+ private var previousCpuIdleTime = 0L
+ private var previousCpuStealTime = 0L
+ private var previousCpuLostTime = 0L
+
+ /**
+ * Finish the aggregation for this cycle.
+ */
+ fun collect(): ServerData {
+ val now = Instant.ofEpochMilli(timestamp)
+ val data = toServerData(now)
+
+ previousUptime = uptime
+ previousDowntime = downtime
+ previousCpuActiveTime = cpuActiveTime
+ previousCpuIdleTime = cpuIdleTime
+ previousCpuStealTime = cpuStealTime
+ previousCpuLostTime = cpuLostTime
+
+ host = null
+ cpuLimit = 0.0
+
+ return data
+ }
+
+ /**
+ * Convert the aggregator state into an immutable [ServerData].
+ */
+ private fun toServerData(now: Instant): ServerData {
+ return ServerData(
+ now,
+ server,
+ host,
+ uptime - previousUptime,
+ downtime - previousDowntime,
+ if (bootTime != Long.MIN_VALUE) Instant.ofEpochMilli(bootTime) else null,
+ schedulingLatency,
+ cpuLimit,
+ cpuActiveTime - previousCpuActiveTime,
+ cpuIdleTime - previousCpuIdleTime,
+ cpuStealTime - previousCpuStealTime,
+ cpuLostTime - previousCpuLostTime
+ )
+ }
+
+ /**
+ * Record the timestamp of a [point] for this aggregator.
+ */
+ fun recordTimestamp(point: PointData) {
+ timestamp = point.epochNanos / 1_000_000L // ns to ms
+ }
+ }
+
+ private companion object {
+ private val STATE_KEY = AttributeKey.stringKey("state")
+ private val RESULT_KEY = AttributeKey.stringKey("result")
+ }
+}
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt
new file mode 100644
index 00000000..3ab6c7b2
--- /dev/null
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.telemetry.compute
+
+import io.opentelemetry.sdk.common.CompletableResultCode
+import io.opentelemetry.sdk.metrics.data.*
+import io.opentelemetry.sdk.metrics.export.MetricExporter
+import mu.KotlinLogging
+
+/**
+ * A [MetricExporter] that redirects data to a [ComputeMonitor] implementation.
+ */
+public abstract class ComputeMetricExporter : MetricExporter, ComputeMonitor {
+ /**
+ * The logging instance for this exporter.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
+ * A [ComputeMetricAggregator] that actually performs the aggregation.
+ */
+ private val agg = ComputeMetricAggregator()
+
+ override fun export(metrics: Collection<MetricData>): CompletableResultCode {
+ return try {
+ agg.process(metrics)
+ agg.collect(this)
+
+ CompletableResultCode.ofSuccess()
+ } catch (e: Throwable) {
+ logger.warn(e) { "Failed to export results" }
+ CompletableResultCode.ofFailure()
+ }
+ }
+
+ override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess()
+
+ override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess()
+}
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt
new file mode 100644
index 00000000..d51bcab4
--- /dev/null
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.telemetry.compute
+
+import org.opendc.telemetry.compute.table.HostData
+import org.opendc.telemetry.compute.table.ServerData
+import org.opendc.telemetry.compute.table.ServiceData
+
+/**
+ * A monitor that tracks the metrics and events of the OpenDC Compute service.
+ */
+public interface ComputeMonitor {
+ /**
+ * Record the specified [data].
+ */
+ public fun record(data: ServerData) {}
+
+ /**
+ * Record the specified [data].
+ */
+ public fun record(data: HostData) {}
+
+ /**
+ * Record the specified [data].
+ */
+ public fun record(data: ServiceData) {}
+}
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt
new file mode 100644
index 00000000..ce89061b
--- /dev/null
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.telemetry.compute
+
+import io.opentelemetry.sdk.metrics.export.MetricProducer
+import org.opendc.telemetry.compute.table.ServiceData
+
+/**
+ * Collect the metrics of the compute service.
+ */
+public fun collectServiceMetrics(metricProducer: MetricProducer): ServiceData {
+ lateinit var serviceData: ServiceData
+ val agg = ComputeMetricAggregator()
+ val monitor = object : ComputeMonitor {
+ override fun record(data: ServiceData) {
+ serviceData = data
+ }
+ }
+
+ agg.process(metricProducer.collectAllMetrics())
+ agg.collect(monitor)
+ return serviceData
+}
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/HostAttributes.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/HostAttributes.kt
new file mode 100644
index 00000000..7dca6186
--- /dev/null
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/HostAttributes.kt
@@ -0,0 +1,51 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+@file:JvmName("HostAttributes")
+package org.opendc.telemetry.compute
+
+import io.opentelemetry.api.common.AttributeKey
+
+/**
+ * The identifier of the node hosting virtual machines.
+ */
+public val HOST_ID: AttributeKey<String> = AttributeKey.stringKey("node.id")
+
+/**
+ * The name of the node hosting virtual machines.
+ */
+public val HOST_NAME: AttributeKey<String> = AttributeKey.stringKey("node.name")
+
+/**
+ * The CPU architecture of the host node.
+ */
+public val HOST_ARCH: AttributeKey<String> = AttributeKey.stringKey("node.arch")
+
+/**
+ * The number of CPUs in the host node.
+ */
+public val HOST_NCPUS: AttributeKey<Long> = AttributeKey.longKey("node.num_cpus")
+
+/**
+ * The amount of memory installed in the host node in MiB.
+ */
+public val HOST_MEM_CAPACITY: AttributeKey<Long> = AttributeKey.longKey("node.mem_capacity")
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt
new file mode 100644
index 00000000..8e787b97
--- /dev/null
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.telemetry.compute.table
+
+import java.time.Instant
+
+/**
+ * A trace entry for a particular host.
+ */
+public data class HostData(
+ val timestamp: Instant,
+ val host: HostInfo,
+ val guestsTerminated: Int,
+ val guestsRunning: Int,
+ val guestsError: Int,
+ val guestsInvalid: Int,
+ val cpuLimit: Double,
+ val cpuUsage: Double,
+ val cpuDemand: Double,
+ val cpuUtilization: Double,
+ val cpuActiveTime: Long,
+ val cpuIdleTime: Long,
+ val cpuStealTime: Long,
+ val cpuLostTime: Long,
+ val powerUsage: Double,
+ val powerTotal: Double,
+ val uptime: Long,
+ val downtime: Long,
+ val bootTime: Instant?
+)
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostInfo.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostInfo.kt
new file mode 100644
index 00000000..d9a5906b
--- /dev/null
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostInfo.kt
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.telemetry.compute.table
+
+/**
+ * Information about a host exposed to the telemetry service.
+ */
+public data class HostInfo(val id: String, val name: String, val arch: String, val cpuCount: Int, val memCapacity: Long)
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt
new file mode 100644
index 00000000..c48bff3a
--- /dev/null
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.telemetry.compute.table
+
+import java.time.Instant
+
+/**
+ * A trace entry for a particular server.
+ */
+public data class ServerData(
+ val timestamp: Instant,
+ val server: ServerInfo,
+ val host: HostInfo?,
+ val uptime: Long,
+ val downtime: Long,
+ val bootTime: Instant?,
+ val schedulingLatency: Long,
+ val cpuLimit: Double,
+ val cpuActiveTime: Long,
+ val cpuIdleTime: Long,
+ val cpuStealTime: Long,
+ val cpuLostTime: Long,
+)
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerInfo.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerInfo.kt
new file mode 100644
index 00000000..b16e5f3d
--- /dev/null
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerInfo.kt
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.telemetry.compute.table
+
+/**
+ * Static information about a server exposed to the telemetry service.
+ */
+public data class ServerInfo(
+ val id: String,
+ val name: String,
+ val type: String,
+ val arch: String,
+ val imageId: String,
+ val imageName: String,
+ val cpuCount: Int,
+ val memCapacity: Long
+)
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt
new file mode 100644
index 00000000..6db1399d
--- /dev/null
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.telemetry.compute.table
+
+import java.time.Instant
+
+/**
+ * A trace entry for the compute service.
+ */
+public data class ServiceData(
+ val timestamp: Instant,
+ val hostsUp: Int,
+ val hostsDown: Int,
+ val serversPending: Int,
+ val serversActive: Int,
+ val attemptsSuccess: Int,
+ val attemptsFailure: Int,
+ val attemptsError: Int
+)
diff --git a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt b/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt
index 9ee16fac..1de235e7 100644
--- a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt
+++ b/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt
@@ -22,18 +22,11 @@
package org.opendc.telemetry.sdk.metrics.export
-import io.opentelemetry.sdk.metrics.data.MetricData
import io.opentelemetry.sdk.metrics.export.MetricExporter
import io.opentelemetry.sdk.metrics.export.MetricProducer
import kotlinx.coroutines.*
-import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.flow.consumeAsFlow
-import kotlinx.coroutines.flow.launchIn
-import kotlinx.coroutines.flow.onEach
import mu.KotlinLogging
-import java.util.*
-import kotlin.coroutines.resume
-import kotlin.coroutines.suspendCoroutine
+import java.time.Duration
/**
* A helper class to read the metrics from a list of [MetricProducer]s and automatically export the metrics every
@@ -44,56 +37,45 @@ import kotlin.coroutines.suspendCoroutine
* @param scope The [CoroutineScope] to run the reader in.
* @param producers The metric producers to gather metrics from.
* @param exporter The export to export the metrics to.
- * @param exportInterval The export interval in milliseconds.
+ * @param exportInterval The export interval.
*/
public class CoroutineMetricReader(
scope: CoroutineScope,
private val producers: List<MetricProducer>,
private val exporter: MetricExporter,
- private val exportInterval: Long = 60_000
+ private val exportInterval: Duration = Duration.ofMinutes(5)
) : AutoCloseable {
private val logger = KotlinLogging.logger {}
- private val chan = Channel<List<MetricData>>(Channel.RENDEZVOUS)
/**
- * The metric reader job.
+ * The background job that is responsible for collecting the metrics every cycle.
*/
- private val readerJob = scope.launch {
- while (isActive) {
- delay(exportInterval)
+ private val job = scope.launch {
+ val intervalMs = exportInterval.toMillis()
- val metrics = mutableListOf<MetricData>()
- for (producer in producers) {
- metrics.addAll(producer.collectAllMetrics())
- }
- chan.send(Collections.unmodifiableList(metrics))
- }
- }
+ try {
+ while (isActive) {
+ delay(intervalMs)
+
+ val metrics = producers.flatMap(MetricProducer::collectAllMetrics)
- /**
- * The exporter job runs in the background to actually export the metrics.
- */
- private val exporterJob = chan.consumeAsFlow()
- .onEach { metrics ->
- suspendCoroutine<Unit> { cont ->
try {
val result = exporter.export(metrics)
result.whenComplete {
if (!result.isSuccess) {
- logger.trace { "Exporter failed" }
+ logger.warn { "Exporter failed" }
}
- cont.resume(Unit)
}
} catch (cause: Throwable) {
logger.warn(cause) { "Exporter threw an Exception" }
- cont.resume(Unit)
}
}
+ } finally {
+ exporter.shutdown()
}
- .launchIn(scope)
+ }
override fun close() {
- readerJob.cancel()
- exporterJob.cancel()
+ job.cancel()
}
}