summaryrefslogtreecommitdiff
path: root/opendc-telemetry
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-telemetry')
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt3
-rw-r--r--opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt52
2 files changed, 19 insertions, 36 deletions
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt
index d3d983b9..01df0e69 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt
@@ -33,6 +33,7 @@ import org.opendc.compute.service.driver.HostState
import org.opendc.telemetry.compute.table.ServiceData
import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import java.time.Clock
+import java.time.Duration
/**
* Attach the specified monitor to the OpenDC Compute service.
@@ -42,7 +43,7 @@ public suspend fun withMonitor(
clock: Clock,
metricProducer: MetricProducer,
monitor: ComputeMonitor,
- exportInterval: Long = 5L * 60 * 1000, /* Every 5 min (which is the granularity of the workload trace) */
+ exportInterval: Duration = Duration.ofMinutes(5), /* Every 5 min (which is the granularity of the workload trace) */
block: suspend CoroutineScope.() -> Unit
): Unit = coroutineScope {
// Monitor host events
diff --git a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt b/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt
index 9ee16fac..8f19ab81 100644
--- a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt
+++ b/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt
@@ -26,14 +26,8 @@ import io.opentelemetry.sdk.metrics.data.MetricData
import io.opentelemetry.sdk.metrics.export.MetricExporter
import io.opentelemetry.sdk.metrics.export.MetricProducer
import kotlinx.coroutines.*
-import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.flow.consumeAsFlow
-import kotlinx.coroutines.flow.launchIn
-import kotlinx.coroutines.flow.onEach
import mu.KotlinLogging
-import java.util.*
-import kotlin.coroutines.resume
-import kotlin.coroutines.suspendCoroutine
+import java.time.Duration
/**
* A helper class to read the metrics from a list of [MetricProducer]s and automatically export the metrics every
@@ -44,56 +38,44 @@ import kotlin.coroutines.suspendCoroutine
* @param scope The [CoroutineScope] to run the reader in.
* @param producers The metric producers to gather metrics from.
* @param exporter The export to export the metrics to.
- * @param exportInterval The export interval in milliseconds.
+ * @param exportInterval The export interval.
*/
public class CoroutineMetricReader(
scope: CoroutineScope,
private val producers: List<MetricProducer>,
private val exporter: MetricExporter,
- private val exportInterval: Long = 60_000
+ private val exportInterval: Duration = Duration.ofMinutes(1)
) : AutoCloseable {
private val logger = KotlinLogging.logger {}
- private val chan = Channel<List<MetricData>>(Channel.RENDEZVOUS)
/**
- * The metric reader job.
+ * The background job that is responsible for collecting the metrics every cycle.
*/
- private val readerJob = scope.launch {
+ private val job = scope.launch {
+ val intervalMs = exportInterval.toMillis()
+
while (isActive) {
- delay(exportInterval)
+ delay(intervalMs)
val metrics = mutableListOf<MetricData>()
for (producer in producers) {
metrics.addAll(producer.collectAllMetrics())
}
- chan.send(Collections.unmodifiableList(metrics))
- }
- }
- /**
- * The exporter job runs in the background to actually export the metrics.
- */
- private val exporterJob = chan.consumeAsFlow()
- .onEach { metrics ->
- suspendCoroutine<Unit> { cont ->
- try {
- val result = exporter.export(metrics)
- result.whenComplete {
- if (!result.isSuccess) {
- logger.trace { "Exporter failed" }
- }
- cont.resume(Unit)
+ try {
+ val result = exporter.export(metrics)
+ result.whenComplete {
+ if (!result.isSuccess) {
+ logger.trace { "Exporter failed" }
}
- } catch (cause: Throwable) {
- logger.warn(cause) { "Exporter threw an Exception" }
- cont.resume(Unit)
}
+ } catch (cause: Throwable) {
+ logger.warn(cause) { "Exporter threw an Exception" }
}
}
- .launchIn(scope)
+ }
override fun close() {
- readerJob.cancel()
- exporterJob.cancel()
+ job.cancel()
}
}