diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-09-22 12:43:01 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-09-28 10:52:03 +0200 |
| commit | 30cd010f1f98262aa7f264bb3c3eb6028b8495c5 (patch) | |
| tree | 51d4a2f4c2339e0cf308735143327574f2e72263 /opendc-telemetry | |
| parent | 5fa0cf915ecf643e94a0de972125e8f862308f80 (diff) | |
refactor(telemetry): Do not require clock for ComputeMetricExporter
This change drops the requirement for a clock parameter when
constructing a ComputeMetricExporter, since it will now derive the
timestamp from the recorded metrics.
Diffstat (limited to 'opendc-telemetry')
3 files changed, 51 insertions, 22 deletions
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt index e9449634..679d5944 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt @@ -55,6 +55,9 @@ public class ComputeMetricAggregator { // ComputeService "scheduler.hosts" -> { for (point in metric.longSumData.points) { + // Record the timestamp for the service + service.recordTimestamp(point) + when (point.attributes[STATE_KEY]) { "up" -> service.hostsUp = point.value.toInt() "down" -> service.hostsDown = point.value.toInt() @@ -163,12 +166,16 @@ public class ComputeMetricAggregator { val server = getServer(servers, point) if (server != null) { + server.recordTimestamp(point) + when (point.attributes[STATE_KEY]) { "up" -> server.uptime = point.value "down" -> server.downtime = point.value } server.host = agg.host } else { + agg.recordTimestamp(point) + when (point.attributes[STATE_KEY]) { "up" -> agg.uptime = point.value "down" -> agg.downtime = point.value @@ -197,15 +204,15 @@ public class ComputeMetricAggregator { /** * Collect the data via the [monitor]. */ - public fun collect(now: Instant, monitor: ComputeMonitor) { - monitor.record(_service.collect(now)) + public fun collect(monitor: ComputeMonitor) { + monitor.record(_service.collect()) for (host in _hosts.values) { - monitor.record(host.collect(now)) + monitor.record(host.collect()) } for (server in _servers.values) { - monitor.record(server.collect(now)) + monitor.record(server.collect()) } } @@ -237,6 +244,8 @@ public class ComputeMetricAggregator { * An aggregator for service metrics before they are reported. */ internal class ServiceAggregator { + private var timestamp = Long.MIN_VALUE + @JvmField var hostsUp = 0 @JvmField var hostsDown = 0 @@ -250,7 +259,10 @@ public class ComputeMetricAggregator { /** * Finish the aggregation for this cycle. */ - fun collect(now: Instant): ServiceData = toServiceData(now) + fun collect(): ServiceData { + val now = Instant.ofEpochMilli(timestamp) + return toServiceData(now) + } /** * Convert the aggregator state to an immutable [ServiceData]. @@ -258,6 +270,13 @@ public class ComputeMetricAggregator { private fun toServiceData(now: Instant): ServiceData { return ServiceData(now, hostsUp, hostsDown, serversPending, serversActive, attemptsSuccess, attemptsFailure, attemptsError) } + + /** + * Record the timestamp of a [point] for this aggregator. + */ + fun recordTimestamp(point: PointData) { + timestamp = point.epochNanos / 1_000_000L // ns to ms + } } /** @@ -275,6 +294,8 @@ public class ComputeMetricAggregator { resource.attributes[HOST_MEM_CAPACITY] ?: 0, ) + private var timestamp = Long.MIN_VALUE + @JvmField var guestsTerminated = 0 @JvmField var guestsRunning = 0 @JvmField var guestsError = 0 @@ -307,7 +328,8 @@ public class ComputeMetricAggregator { /** * Finish the aggregation for this cycle. */ - fun collect(now: Instant): HostData { + fun collect(): HostData { + val now = Instant.ofEpochMilli(timestamp) val data = toHostData(now) // Reset intermediate state for next aggregation @@ -360,6 +382,13 @@ public class ComputeMetricAggregator { if (bootTime != Long.MIN_VALUE) Instant.ofEpochMilli(bootTime) else null ) } + + /** + * Record the timestamp of a [point] for this aggregator. + */ + fun recordTimestamp(point: PointData) { + timestamp = point.epochNanos / 1_000_000L // ns to ms + } } /** @@ -385,6 +414,7 @@ public class ComputeMetricAggregator { */ var host: HostInfo? = null + private var timestamp = Long.MIN_VALUE @JvmField var uptime: Long = 0 private var previousUptime = 0L @JvmField var downtime: Long = 0 @@ -404,7 +434,8 @@ public class ComputeMetricAggregator { /** * Finish the aggregation for this cycle. */ - fun collect(now: Instant): ServerData { + fun collect(): ServerData { + val now = Instant.ofEpochMilli(timestamp) val data = toServerData(now) previousUptime = uptime @@ -439,6 +470,13 @@ public class ComputeMetricAggregator { cpuLostTime - previousCpuLostTime ) } + + /** + * Record the timestamp of a [point] for this aggregator. + */ + fun recordTimestamp(point: PointData) { + timestamp = point.epochNanos / 1_000_000L // ns to ms + } } private companion object { diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt index ea96f721..580cc6fb 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt @@ -25,12 +25,11 @@ package org.opendc.telemetry.compute import io.opentelemetry.sdk.common.CompletableResultCode import io.opentelemetry.sdk.metrics.data.* import io.opentelemetry.sdk.metrics.export.MetricExporter -import java.time.Clock /** * A [MetricExporter] that redirects data to a [ComputeMonitor] implementation. */ -public class ComputeMetricExporter(private val clock: Clock, private val monitor: ComputeMonitor) : MetricExporter { +public abstract class ComputeMetricExporter : MetricExporter, ComputeMonitor { /** * A [ComputeMetricAggregator] that actually performs the aggregation. */ @@ -39,7 +38,8 @@ public class ComputeMetricExporter(private val clock: Clock, private val monitor override fun export(metrics: Collection<MetricData>): CompletableResultCode { return try { agg.process(metrics) - agg.collect(clock.instant(), monitor) + agg.collect(this) + CompletableResultCode.ofSuccess() } catch (e: Throwable) { CompletableResultCode.ofFailure() diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt index 25d346fb..ce89061b 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt @@ -22,22 +22,13 @@ package org.opendc.telemetry.compute -import io.opentelemetry.sdk.metrics.data.MetricData import io.opentelemetry.sdk.metrics.export.MetricProducer import org.opendc.telemetry.compute.table.ServiceData -import java.time.Instant /** * Collect the metrics of the compute service. */ -public fun collectServiceMetrics(timestamp: Instant, metricProducer: MetricProducer): ServiceData { - return extractServiceMetrics(timestamp, metricProducer.collectAllMetrics()) -} - -/** - * Extract a [ServiceData] object from the specified list of metric data. - */ -public fun extractServiceMetrics(timestamp: Instant, metrics: Collection<MetricData>): ServiceData { +public fun collectServiceMetrics(metricProducer: MetricProducer): ServiceData { lateinit var serviceData: ServiceData val agg = ComputeMetricAggregator() val monitor = object : ComputeMonitor { @@ -46,7 +37,7 @@ public fun extractServiceMetrics(timestamp: Instant, metrics: Collection<MetricD } } - agg.process(metrics) - agg.collect(timestamp, monitor) + agg.process(metricProducer.collectAllMetrics()) + agg.collect(monitor) return serviceData } |
