summary refs log tree commit diff
path: root/opendc-telemetry/opendc-telemetry-compute
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-telemetry/opendc-telemetry-compute')
-rw-r--r-- opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt 54
-rw-r--r-- opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt 13
-rw-r--r-- opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt 15
3 files changed, 59 insertions, 23 deletions
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt
index e9449634..738ec38b 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt
@@ -55,6 +55,9 @@ public class ComputeMetricAggregator {
// ComputeService
"scheduler.hosts" -> {
for (point in metric.longSumData.points) {
+ // Record the timestamp for the service
+ service.recordTimestamp(point)
+
when (point.attributes[STATE_KEY]) {
"up" -> service.hostsUp = point.value.toInt()
"down" -> service.hostsDown = point.value.toInt()
@@ -163,12 +166,16 @@ public class ComputeMetricAggregator {
val server = getServer(servers, point)
if (server != null) {
+ server.recordTimestamp(point)
+
when (point.attributes[STATE_KEY]) {
"up" -> server.uptime = point.value
"down" -> server.downtime = point.value
}
server.host = agg.host
} else {
+ agg.recordTimestamp(point)
+
when (point.attributes[STATE_KEY]) {
"up" -> agg.uptime = point.value
"down" -> agg.downtime = point.value
@@ -197,15 +204,15 @@ public class ComputeMetricAggregator {
/**
* Collect the data via the [monitor].
*/
- public fun collect(now: Instant, monitor: ComputeMonitor) {
- monitor.record(_service.collect(now))
+ public fun collect(monitor: ComputeMonitor) {
+ monitor.record(_service.collect())
for (host in _hosts.values) {
- monitor.record(host.collect(now))
+ monitor.record(host.collect())
}
for (server in _servers.values) {
- monitor.record(server.collect(now))
+ monitor.record(server.collect())
}
}
@@ -237,6 +244,8 @@ public class ComputeMetricAggregator {
* An aggregator for service metrics before they are reported.
*/
internal class ServiceAggregator {
+ private var timestamp = Long.MIN_VALUE
+
@JvmField var hostsUp = 0
@JvmField var hostsDown = 0
@@ -250,7 +259,10 @@ public class ComputeMetricAggregator {
/**
* Finish the aggregation for this cycle.
*/
- fun collect(now: Instant): ServiceData = toServiceData(now)
+ fun collect(): ServiceData {
+ val now = Instant.ofEpochMilli(timestamp) // epoch ms set by recordTimestamp; still Long.MIN_VALUE if no point was recorded - TODO confirm callers tolerate that
+ return toServiceData(now)
+ }
/**
* Convert the aggregator state to an immutable [ServiceData].
@@ -258,6 +270,13 @@ public class ComputeMetricAggregator {
private fun toServiceData(now: Instant): ServiceData {
return ServiceData(now, hostsUp, hostsDown, serversPending, serversActive, attemptsSuccess, attemptsFailure, attemptsError)
}
+
+ /**
+ * Store the time of [point], converted to epoch milliseconds, as the timestamp of this service aggregator, overwriting any earlier value.
+ */
+ fun recordTimestamp(point: PointData) {
+ timestamp = point.epochNanos / 1_000_000L // ns to ms; the division drops any sub-millisecond remainder
+ }
}
/**
@@ -275,6 +294,8 @@ public class ComputeMetricAggregator {
resource.attributes[HOST_MEM_CAPACITY] ?: 0,
)
+ private var timestamp = Long.MIN_VALUE
+
@JvmField var guestsTerminated = 0
@JvmField var guestsRunning = 0
@JvmField var guestsError = 0
@@ -307,7 +328,8 @@ public class ComputeMetricAggregator {
/**
* Finish the aggregation for this cycle.
*/
- fun collect(now: Instant): HostData {
+ fun collect(): HostData {
+ val now = Instant.ofEpochMilli(timestamp)
val data = toHostData(now)
// Reset intermediate state for next aggregation
@@ -360,6 +382,13 @@ public class ComputeMetricAggregator {
if (bootTime != Long.MIN_VALUE) Instant.ofEpochMilli(bootTime) else null
)
}
+
+ /**
+ * Store the time of [point], converted to epoch milliseconds, as the timestamp of this host aggregator, overwriting any earlier value.
+ */
+ fun recordTimestamp(point: PointData) {
+ timestamp = point.epochNanos / 1_000_000L // ns to ms; the division drops any sub-millisecond remainder
+ }
}
/**
@@ -383,8 +412,9 @@ public class ComputeMetricAggregator {
/**
* The [HostInfo] of the host on which the server is hosted.
*/
- var host: HostInfo? = null
+ @JvmField var host: HostInfo? = null
+ private var timestamp = Long.MIN_VALUE
@JvmField var uptime: Long = 0
private var previousUptime = 0L
@JvmField var downtime: Long = 0
@@ -404,7 +434,8 @@ public class ComputeMetricAggregator {
/**
* Finish the aggregation for this cycle.
*/
- fun collect(now: Instant): ServerData {
+ fun collect(): ServerData {
+ val now = Instant.ofEpochMilli(timestamp)
val data = toServerData(now)
previousUptime = uptime
@@ -439,6 +470,13 @@ public class ComputeMetricAggregator {
cpuLostTime - previousCpuLostTime
)
}
+
+ /**
+ * Store the time of [point], converted to epoch milliseconds, as the timestamp of this server aggregator, overwriting any earlier value.
+ */
+ fun recordTimestamp(point: PointData) {
+ timestamp = point.epochNanos / 1_000_000L // ns to ms; the division drops any sub-millisecond remainder
+ }
}
private companion object {
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt
index ea96f721..3ab6c7b2 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt
@@ -25,12 +25,17 @@ package org.opendc.telemetry.compute
import io.opentelemetry.sdk.common.CompletableResultCode
import io.opentelemetry.sdk.metrics.data.*
import io.opentelemetry.sdk.metrics.export.MetricExporter
-import java.time.Clock
+import mu.KotlinLogging
/**
* A [MetricExporter] that redirects data to a [ComputeMonitor] implementation.
*/
-public class ComputeMetricExporter(private val clock: Clock, private val monitor: ComputeMonitor) : MetricExporter {
+public abstract class ComputeMetricExporter : MetricExporter, ComputeMonitor {
+ /**
+ * The logging instance for this exporter.
+ */
+ private val logger = KotlinLogging.logger {}
+
/**
* A [ComputeMetricAggregator] that actually performs the aggregation.
*/
@@ -39,9 +44,11 @@ public class ComputeMetricExporter(private val clock: Clock, private val monitor
override fun export(metrics: Collection<MetricData>): CompletableResultCode {
return try {
agg.process(metrics)
- agg.collect(clock.instant(), monitor)
+ agg.collect(this)
+
CompletableResultCode.ofSuccess()
} catch (e: Throwable) {
+ logger.warn(e) { "Failed to export results" }
CompletableResultCode.ofFailure()
}
}
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt
index 25d346fb..ce89061b 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt
@@ -22,22 +22,13 @@
package org.opendc.telemetry.compute
-import io.opentelemetry.sdk.metrics.data.MetricData
import io.opentelemetry.sdk.metrics.export.MetricProducer
import org.opendc.telemetry.compute.table.ServiceData
-import java.time.Instant
/**
* Collect the metrics of the compute service.
*/
-public fun collectServiceMetrics(timestamp: Instant, metricProducer: MetricProducer): ServiceData {
- return extractServiceMetrics(timestamp, metricProducer.collectAllMetrics())
-}
-
-/**
- * Extract a [ServiceData] object from the specified list of metric data.
- */
-public fun extractServiceMetrics(timestamp: Instant, metrics: Collection<MetricData>): ServiceData {
+public fun collectServiceMetrics(metricProducer: MetricProducer): ServiceData {
lateinit var serviceData: ServiceData
val agg = ComputeMetricAggregator()
val monitor = object : ComputeMonitor {
@@ -46,7 +37,7 @@ public fun extractServiceMetrics(timestamp: Instant, metrics: Collection<MetricD
}
}
- agg.process(metrics)
- agg.collect(timestamp, monitor)
+ agg.process(metricProducer.collectAllMetrics())
+ agg.collect(monitor)
return serviceData
}