diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-09-28 11:23:13 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-09-28 11:23:13 +0200 |
| commit | 6196895bfd0334052afa4fb91b00adb259a661b6 (patch) | |
| tree | 8a14988b30f6f5758b1f9f982d0086296eb5d416 /opendc-telemetry/opendc-telemetry-compute | |
| parent | 993c65d9c287d8db2db9ff1f95abb414803a502c (diff) | |
| parent | 94d8ee69e52dcd375a662a08c198aa29670362fb (diff) | |
merge: Simplify usage of ComputeMetricExporter
This pull request addresses some issues with the current implementation of
the `ComputeMetricExporter` class.
In particular, the construction of `ComputeMetricExporter` does not require a `Clock` anymore.
- Ensure shutdown of exporter is called
- Do not require clock for ComputeMetricExporter
- Do not recover guests in non-error state
- Write null values explicitly in Parquet exporter
- Report cause of compute exporter failure
**Breaking API Changes**
- `ComputeMetricExporter` is now an abstract class that can be extended to collect metrics
- `ParquetComputeMonitor` has been renamed to `ParquetComputeMetricExporter` and extends `ComputeMetricExporter`
Diffstat (limited to 'opendc-telemetry/opendc-telemetry-compute')
3 files changed, 59 insertions, 23 deletions
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt index e9449634..738ec38b 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt @@ -55,6 +55,9 @@ public class ComputeMetricAggregator { // ComputeService "scheduler.hosts" -> { for (point in metric.longSumData.points) { + // Record the timestamp for the service + service.recordTimestamp(point) + when (point.attributes[STATE_KEY]) { "up" -> service.hostsUp = point.value.toInt() "down" -> service.hostsDown = point.value.toInt() @@ -163,12 +166,16 @@ public class ComputeMetricAggregator { val server = getServer(servers, point) if (server != null) { + server.recordTimestamp(point) + when (point.attributes[STATE_KEY]) { "up" -> server.uptime = point.value "down" -> server.downtime = point.value } server.host = agg.host } else { + agg.recordTimestamp(point) + when (point.attributes[STATE_KEY]) { "up" -> agg.uptime = point.value "down" -> agg.downtime = point.value @@ -197,15 +204,15 @@ public class ComputeMetricAggregator { /** * Collect the data via the [monitor]. */ - public fun collect(now: Instant, monitor: ComputeMonitor) { - monitor.record(_service.collect(now)) + public fun collect(monitor: ComputeMonitor) { + monitor.record(_service.collect()) for (host in _hosts.values) { - monitor.record(host.collect(now)) + monitor.record(host.collect()) } for (server in _servers.values) { - monitor.record(server.collect(now)) + monitor.record(server.collect()) } } @@ -237,6 +244,8 @@ public class ComputeMetricAggregator { * An aggregator for service metrics before they are reported. */ internal class ServiceAggregator { + private var timestamp = Long.MIN_VALUE + @JvmField var hostsUp = 0 @JvmField var hostsDown = 0 @@ -250,7 +259,10 @@ public class ComputeMetricAggregator { /** * Finish the aggregation for this cycle. */ - fun collect(now: Instant): ServiceData = toServiceData(now) + fun collect(): ServiceData { + val now = Instant.ofEpochMilli(timestamp) + return toServiceData(now) + } /** * Convert the aggregator state to an immutable [ServiceData]. @@ -258,6 +270,13 @@ public class ComputeMetricAggregator { private fun toServiceData(now: Instant): ServiceData { return ServiceData(now, hostsUp, hostsDown, serversPending, serversActive, attemptsSuccess, attemptsFailure, attemptsError) } + + /** + * Record the timestamp of a [point] for this aggregator. + */ + fun recordTimestamp(point: PointData) { + timestamp = point.epochNanos / 1_000_000L // ns to ms + } } /** @@ -275,6 +294,8 @@ public class ComputeMetricAggregator { resource.attributes[HOST_MEM_CAPACITY] ?: 0, ) + private var timestamp = Long.MIN_VALUE + @JvmField var guestsTerminated = 0 @JvmField var guestsRunning = 0 @JvmField var guestsError = 0 @@ -307,7 +328,8 @@ public class ComputeMetricAggregator { /** * Finish the aggregation for this cycle. */ - fun collect(now: Instant): HostData { + fun collect(): HostData { + val now = Instant.ofEpochMilli(timestamp) val data = toHostData(now) // Reset intermediate state for next aggregation @@ -360,6 +382,13 @@ public class ComputeMetricAggregator { if (bootTime != Long.MIN_VALUE) Instant.ofEpochMilli(bootTime) else null ) } + + /** + * Record the timestamp of a [point] for this aggregator. + */ + fun recordTimestamp(point: PointData) { + timestamp = point.epochNanos / 1_000_000L // ns to ms + } } /** @@ -383,8 +412,9 @@ public class ComputeMetricAggregator { /** * The [HostInfo] of the host on which the server is hosted. */ - var host: HostInfo? = null + @JvmField var host: HostInfo? = null + private var timestamp = Long.MIN_VALUE @JvmField var uptime: Long = 0 private var previousUptime = 0L @JvmField var downtime: Long = 0 @@ -404,7 +434,8 @@ public class ComputeMetricAggregator { /** * Finish the aggregation for this cycle. */ - fun collect(now: Instant): ServerData { + fun collect(): ServerData { + val now = Instant.ofEpochMilli(timestamp) val data = toServerData(now) previousUptime = uptime @@ -439,6 +470,13 @@ public class ComputeMetricAggregator { cpuLostTime - previousCpuLostTime ) } + + /** + * Record the timestamp of a [point] for this aggregator. + */ + fun recordTimestamp(point: PointData) { + timestamp = point.epochNanos / 1_000_000L // ns to ms + } } private companion object { diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt index ea96f721..3ab6c7b2 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt @@ -25,12 +25,17 @@ package org.opendc.telemetry.compute import io.opentelemetry.sdk.common.CompletableResultCode import io.opentelemetry.sdk.metrics.data.* import io.opentelemetry.sdk.metrics.export.MetricExporter -import java.time.Clock +import mu.KotlinLogging /** * A [MetricExporter] that redirects data to a [ComputeMonitor] implementation. */ -public class ComputeMetricExporter(private val clock: Clock, private val monitor: ComputeMonitor) : MetricExporter { +public abstract class ComputeMetricExporter : MetricExporter, ComputeMonitor { + /** + * The logging instance for this exporter. + */ + private val logger = KotlinLogging.logger {} + /** * A [ComputeMetricAggregator] that actually performs the aggregation. */ @@ -39,9 +44,11 @@ public class ComputeMetricExporter(private val clock: Clock, private val monitor override fun export(metrics: Collection<MetricData>): CompletableResultCode { return try { agg.process(metrics) - agg.collect(clock.instant(), monitor) + agg.collect(this) + CompletableResultCode.ofSuccess() } catch (e: Throwable) { + logger.warn(e) { "Failed to export results" } CompletableResultCode.ofFailure() } } diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt index 25d346fb..ce89061b 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt @@ -22,22 +22,13 @@ package org.opendc.telemetry.compute -import io.opentelemetry.sdk.metrics.data.MetricData import io.opentelemetry.sdk.metrics.export.MetricProducer import org.opendc.telemetry.compute.table.ServiceData -import java.time.Instant /** * Collect the metrics of the compute service. */ -public fun collectServiceMetrics(timestamp: Instant, metricProducer: MetricProducer): ServiceData { - return extractServiceMetrics(timestamp, metricProducer.collectAllMetrics()) -} - -/** - * Extract a [ServiceData] object from the specified list of metric data. - */ -public fun extractServiceMetrics(timestamp: Instant, metrics: Collection<MetricData>): ServiceData { +public fun collectServiceMetrics(metricProducer: MetricProducer): ServiceData { lateinit var serviceData: ServiceData val agg = ComputeMetricAggregator() val monitor = object : ComputeMonitor { @@ -46,7 +37,7 @@ public fun extractServiceMetrics(timestamp: Instant, metrics: Collection<MetricD } } - agg.process(metrics) - agg.collect(timestamp, monitor) + agg.process(metricProducer.collectAllMetrics()) + agg.collect(monitor) return serviceData } |
