summaryrefslogtreecommitdiff
path: root/opendc-telemetry/opendc-telemetry-sdk/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-10-25 14:53:54 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-10-25 14:53:54 +0200
commitaa9b32f8cd1467e9718959f400f6777e5d71737d (patch)
treeb88bbede15108c6855d7f94ded4c7054df186a72 /opendc-telemetry/opendc-telemetry-sdk/src
parenteb0e0a3bc557c05a70eead388797ab850ea87366 (diff)
parentb7a71e5b4aa77b41ef41deec2ace42b67a5a13a7 (diff)
merge: Integrate v2.1 progress into public repository
This pull request integrates the changes planned for the v2.1 release of OpenDC into the public Github repository in order to sync the progress of both repositories.
Diffstat (limited to 'opendc-telemetry/opendc-telemetry-sdk/src')
-rw-r--r--opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt50
1 files changed, 16 insertions, 34 deletions
diff --git a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt b/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt
index 9ee16fac..1de235e7 100644
--- a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt
+++ b/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt
@@ -22,18 +22,11 @@
package org.opendc.telemetry.sdk.metrics.export
-import io.opentelemetry.sdk.metrics.data.MetricData
import io.opentelemetry.sdk.metrics.export.MetricExporter
import io.opentelemetry.sdk.metrics.export.MetricProducer
import kotlinx.coroutines.*
-import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.flow.consumeAsFlow
-import kotlinx.coroutines.flow.launchIn
-import kotlinx.coroutines.flow.onEach
import mu.KotlinLogging
-import java.util.*
-import kotlin.coroutines.resume
-import kotlin.coroutines.suspendCoroutine
+import java.time.Duration
/**
* A helper class to read the metrics from a list of [MetricProducer]s and automatically export the metrics every
@@ -44,56 +37,45 @@ import kotlin.coroutines.suspendCoroutine
* @param scope The [CoroutineScope] to run the reader in.
* @param producers The metric producers to gather metrics from.
* @param exporter The export to export the metrics to.
- * @param exportInterval The export interval in milliseconds.
+ * @param exportInterval The export interval.
*/
public class CoroutineMetricReader(
scope: CoroutineScope,
private val producers: List<MetricProducer>,
private val exporter: MetricExporter,
- private val exportInterval: Long = 60_000
+ private val exportInterval: Duration = Duration.ofMinutes(5)
) : AutoCloseable {
private val logger = KotlinLogging.logger {}
- private val chan = Channel<List<MetricData>>(Channel.RENDEZVOUS)
/**
- * The metric reader job.
+ * The background job that is responsible for collecting the metrics every cycle.
*/
- private val readerJob = scope.launch {
- while (isActive) {
- delay(exportInterval)
+ private val job = scope.launch {
+ val intervalMs = exportInterval.toMillis()
- val metrics = mutableListOf<MetricData>()
- for (producer in producers) {
- metrics.addAll(producer.collectAllMetrics())
- }
- chan.send(Collections.unmodifiableList(metrics))
- }
- }
+ try {
+ while (isActive) {
+ delay(intervalMs)
+
+ val metrics = producers.flatMap(MetricProducer::collectAllMetrics)
- /**
- * The exporter job runs in the background to actually export the metrics.
- */
- private val exporterJob = chan.consumeAsFlow()
- .onEach { metrics ->
- suspendCoroutine<Unit> { cont ->
try {
val result = exporter.export(metrics)
result.whenComplete {
if (!result.isSuccess) {
- logger.trace { "Exporter failed" }
+ logger.warn { "Exporter failed" }
}
- cont.resume(Unit)
}
} catch (cause: Throwable) {
logger.warn(cause) { "Exporter threw an Exception" }
- cont.resume(Unit)
}
}
+ } finally {
+ exporter.shutdown()
}
- .launchIn(scope)
+ }
override fun close() {
- readerJob.cancel()
- exporterJob.cancel()
+ job.cancel()
}
}