summaryrefslogtreecommitdiff
path: root/opendc-telemetry/opendc-telemetry-compute/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-22 12:43:01 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-28 10:52:03 +0200
commit30cd010f1f98262aa7f264bb3c3eb6028b8495c5 (patch)
tree51d4a2f4c2339e0cf308735143327574f2e72263 /opendc-telemetry/opendc-telemetry-compute/src
parent5fa0cf915ecf643e94a0de972125e8f862308f80 (diff)
refactor(telemetry): Do not require clock for ComputeMetricExporter
This change drops the requirement for a clock parameter when constructing a ComputeMetricExporter, since it will now derive the timestamp from the recorded metrics.
Diffstat (limited to 'opendc-telemetry/opendc-telemetry-compute/src')
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt52
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt6
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt15
3 files changed, 51 insertions, 22 deletions
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt
index e9449634..679d5944 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt
@@ -55,6 +55,9 @@ public class ComputeMetricAggregator {
// ComputeService
"scheduler.hosts" -> {
for (point in metric.longSumData.points) {
+ // Record the timestamp for the service
+ service.recordTimestamp(point)
+
when (point.attributes[STATE_KEY]) {
"up" -> service.hostsUp = point.value.toInt()
"down" -> service.hostsDown = point.value.toInt()
@@ -163,12 +166,16 @@ public class ComputeMetricAggregator {
val server = getServer(servers, point)
if (server != null) {
+ server.recordTimestamp(point)
+
when (point.attributes[STATE_KEY]) {
"up" -> server.uptime = point.value
"down" -> server.downtime = point.value
}
server.host = agg.host
} else {
+ agg.recordTimestamp(point)
+
when (point.attributes[STATE_KEY]) {
"up" -> agg.uptime = point.value
"down" -> agg.downtime = point.value
@@ -197,15 +204,15 @@ public class ComputeMetricAggregator {
/**
* Collect the data via the [monitor].
*/
- public fun collect(now: Instant, monitor: ComputeMonitor) {
- monitor.record(_service.collect(now))
+ public fun collect(monitor: ComputeMonitor) {
+ monitor.record(_service.collect())
for (host in _hosts.values) {
- monitor.record(host.collect(now))
+ monitor.record(host.collect())
}
for (server in _servers.values) {
- monitor.record(server.collect(now))
+ monitor.record(server.collect())
}
}
@@ -237,6 +244,8 @@ public class ComputeMetricAggregator {
* An aggregator for service metrics before they are reported.
*/
internal class ServiceAggregator {
+ private var timestamp = Long.MIN_VALUE
+
@JvmField var hostsUp = 0
@JvmField var hostsDown = 0
@@ -250,7 +259,10 @@ public class ComputeMetricAggregator {
/**
* Finish the aggregation for this cycle.
*/
- fun collect(now: Instant): ServiceData = toServiceData(now)
+ fun collect(): ServiceData {
+ val now = Instant.ofEpochMilli(timestamp)
+ return toServiceData(now)
+ }
/**
* Convert the aggregator state to an immutable [ServiceData].
@@ -258,6 +270,13 @@ public class ComputeMetricAggregator {
private fun toServiceData(now: Instant): ServiceData {
return ServiceData(now, hostsUp, hostsDown, serversPending, serversActive, attemptsSuccess, attemptsFailure, attemptsError)
}
+
+ /**
+ * Record the timestamp of a [point] for this aggregator.
+ */
+ fun recordTimestamp(point: PointData) {
+ timestamp = point.epochNanos / 1_000_000L // ns to ms
+ }
}
/**
@@ -275,6 +294,8 @@ public class ComputeMetricAggregator {
resource.attributes[HOST_MEM_CAPACITY] ?: 0,
)
+ private var timestamp = Long.MIN_VALUE
+
@JvmField var guestsTerminated = 0
@JvmField var guestsRunning = 0
@JvmField var guestsError = 0
@@ -307,7 +328,8 @@ public class ComputeMetricAggregator {
/**
* Finish the aggregation for this cycle.
*/
- fun collect(now: Instant): HostData {
+ fun collect(): HostData {
+ val now = Instant.ofEpochMilli(timestamp)
val data = toHostData(now)
// Reset intermediate state for next aggregation
@@ -360,6 +382,13 @@ public class ComputeMetricAggregator {
if (bootTime != Long.MIN_VALUE) Instant.ofEpochMilli(bootTime) else null
)
}
+
+ /**
+ * Record the timestamp of a [point] for this aggregator.
+ */
+ fun recordTimestamp(point: PointData) {
+ timestamp = point.epochNanos / 1_000_000L // ns to ms
+ }
}
/**
@@ -385,6 +414,7 @@ public class ComputeMetricAggregator {
*/
var host: HostInfo? = null
+ private var timestamp = Long.MIN_VALUE
@JvmField var uptime: Long = 0
private var previousUptime = 0L
@JvmField var downtime: Long = 0
@@ -404,7 +434,8 @@ public class ComputeMetricAggregator {
/**
* Finish the aggregation for this cycle.
*/
- fun collect(now: Instant): ServerData {
+ fun collect(): ServerData {
+ val now = Instant.ofEpochMilli(timestamp)
val data = toServerData(now)
previousUptime = uptime
@@ -439,6 +470,13 @@ public class ComputeMetricAggregator {
cpuLostTime - previousCpuLostTime
)
}
+
+ /**
+ * Record the timestamp of a [point] for this aggregator.
+ */
+ fun recordTimestamp(point: PointData) {
+ timestamp = point.epochNanos / 1_000_000L // ns to ms
+ }
}
private companion object {
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt
index ea96f721..580cc6fb 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt
@@ -25,12 +25,11 @@ package org.opendc.telemetry.compute
import io.opentelemetry.sdk.common.CompletableResultCode
import io.opentelemetry.sdk.metrics.data.*
import io.opentelemetry.sdk.metrics.export.MetricExporter
-import java.time.Clock
/**
* A [MetricExporter] that redirects data to a [ComputeMonitor] implementation.
*/
-public class ComputeMetricExporter(private val clock: Clock, private val monitor: ComputeMonitor) : MetricExporter {
+public abstract class ComputeMetricExporter : MetricExporter, ComputeMonitor {
/**
* A [ComputeMetricAggregator] that actually performs the aggregation.
*/
@@ -39,7 +38,8 @@ public class ComputeMetricExporter(private val clock: Clock, private val monitor
override fun export(metrics: Collection<MetricData>): CompletableResultCode {
return try {
agg.process(metrics)
- agg.collect(clock.instant(), monitor)
+ agg.collect(this)
+
CompletableResultCode.ofSuccess()
} catch (e: Throwable) {
CompletableResultCode.ofFailure()
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt
index 25d346fb..ce89061b 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt
@@ -22,22 +22,13 @@
package org.opendc.telemetry.compute
-import io.opentelemetry.sdk.metrics.data.MetricData
import io.opentelemetry.sdk.metrics.export.MetricProducer
import org.opendc.telemetry.compute.table.ServiceData
-import java.time.Instant
/**
* Collect the metrics of the compute service.
*/
-public fun collectServiceMetrics(timestamp: Instant, metricProducer: MetricProducer): ServiceData {
- return extractServiceMetrics(timestamp, metricProducer.collectAllMetrics())
-}
-
-/**
- * Extract a [ServiceData] object from the specified list of metric data.
- */
-public fun extractServiceMetrics(timestamp: Instant, metrics: Collection<MetricData>): ServiceData {
+public fun collectServiceMetrics(metricProducer: MetricProducer): ServiceData {
lateinit var serviceData: ServiceData
val agg = ComputeMetricAggregator()
val monitor = object : ComputeMonitor {
@@ -46,7 +37,7 @@ public fun extractServiceMetrics(timestamp: Instant, metrics: Collection<MetricD
}
}
- agg.process(metrics)
- agg.collect(timestamp, monitor)
+ agg.process(metricProducer.collectAllMetrics())
+ agg.collect(monitor)
return serviceData
}