diff options
Diffstat (limited to 'opendc-telemetry/opendc-telemetry-sdk/src')
| -rw-r--r-- | opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt | 50 |
1 files changed, 16 insertions, 34 deletions
diff --git a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt b/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt index 9ee16fac..1de235e7 100644 --- a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt +++ b/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt @@ -22,18 +22,11 @@ package org.opendc.telemetry.sdk.metrics.export -import io.opentelemetry.sdk.metrics.data.MetricData import io.opentelemetry.sdk.metrics.export.MetricExporter import io.opentelemetry.sdk.metrics.export.MetricProducer import kotlinx.coroutines.* -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.flow.consumeAsFlow -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach import mu.KotlinLogging -import java.util.* -import kotlin.coroutines.resume -import kotlin.coroutines.suspendCoroutine +import java.time.Duration /** * A helper class to read the metrics from a list of [MetricProducer]s and automatically export the metrics every @@ -44,56 +37,45 @@ import kotlin.coroutines.suspendCoroutine * @param scope The [CoroutineScope] to run the reader in. * @param producers The metric producers to gather metrics from. * @param exporter The export to export the metrics to. - * @param exportInterval The export interval in milliseconds. + * @param exportInterval The export interval. */ public class CoroutineMetricReader( scope: CoroutineScope, private val producers: List<MetricProducer>, private val exporter: MetricExporter, - private val exportInterval: Long = 60_000 + private val exportInterval: Duration = Duration.ofMinutes(5) ) : AutoCloseable { private val logger = KotlinLogging.logger {} - private val chan = Channel<List<MetricData>>(Channel.RENDEZVOUS) /** - * The metric reader job. + * The background job that is responsible for collecting the metrics every cycle. */ - private val readerJob = scope.launch { - while (isActive) { - delay(exportInterval) + private val job = scope.launch { + val intervalMs = exportInterval.toMillis() - val metrics = mutableListOf<MetricData>() - for (producer in producers) { - metrics.addAll(producer.collectAllMetrics()) - } - chan.send(Collections.unmodifiableList(metrics)) - } - } + try { + while (isActive) { + delay(intervalMs) + + val metrics = producers.flatMap(MetricProducer::collectAllMetrics) - /** - * The exporter job runs in the background to actually export the metrics. - */ - private val exporterJob = chan.consumeAsFlow() - .onEach { metrics -> - suspendCoroutine<Unit> { cont -> try { val result = exporter.export(metrics) result.whenComplete { if (!result.isSuccess) { - logger.trace { "Exporter failed" } + logger.warn { "Exporter failed" } } - cont.resume(Unit) } } catch (cause: Throwable) { logger.warn(cause) { "Exporter threw an Exception" } - cont.resume(Unit) } } + } finally { + exporter.shutdown() } - .launchIn(scope) + } override fun close() { - readerJob.cancel() - exporterJob.cancel() + job.cancel() } } |
