diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-09-13 14:48:02 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-09-17 16:48:02 +0200 |
| commit | 5f0b6b372487d79594cf59010822e160f351e0be (patch) | |
| tree | acad4d2e496da334bafc1fd256e73bbd3d5c481c /opendc-telemetry | |
| parent | eef8ea3ab40a4e4a12ba96f839c35c5804884bc1 (diff) | |
refactor(telemetry): Simplify CoroutineMetricReader
This change simplifies the CoroutineMetricReader implementation by
removing the seperation of reader and exporter jobs.
Diffstat (limited to 'opendc-telemetry')
2 files changed, 19 insertions, 36 deletions
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt index d3d983b9..01df0e69 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt @@ -33,6 +33,7 @@ import org.opendc.compute.service.driver.HostState import org.opendc.telemetry.compute.table.ServiceData import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import java.time.Clock +import java.time.Duration /** * Attach the specified monitor to the OpenDC Compute service. @@ -42,7 +43,7 @@ public suspend fun withMonitor( clock: Clock, metricProducer: MetricProducer, monitor: ComputeMonitor, - exportInterval: Long = 5L * 60 * 1000, /* Every 5 min (which is the granularity of the workload trace) */ + exportInterval: Duration = Duration.ofMinutes(5), /* Every 5 min (which is the granularity of the workload trace) */ block: suspend CoroutineScope.() -> Unit ): Unit = coroutineScope { // Monitor host events diff --git a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt b/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt index 9ee16fac..8f19ab81 100644 --- a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt +++ b/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt @@ -26,14 +26,8 @@ import io.opentelemetry.sdk.metrics.data.MetricData import io.opentelemetry.sdk.metrics.export.MetricExporter import io.opentelemetry.sdk.metrics.export.MetricProducer import kotlinx.coroutines.* -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.flow.consumeAsFlow -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach import mu.KotlinLogging -import java.util.* -import kotlin.coroutines.resume -import kotlin.coroutines.suspendCoroutine +import java.time.Duration /** * A helper class to read the metrics from a list of [MetricProducer]s and automatically export the metrics every @@ -44,56 +38,44 @@ import kotlin.coroutines.suspendCoroutine * @param scope The [CoroutineScope] to run the reader in. * @param producers The metric producers to gather metrics from. * @param exporter The export to export the metrics to. - * @param exportInterval The export interval in milliseconds. + * @param exportInterval The export interval. */ public class CoroutineMetricReader( scope: CoroutineScope, private val producers: List<MetricProducer>, private val exporter: MetricExporter, - private val exportInterval: Long = 60_000 + private val exportInterval: Duration = Duration.ofMinutes(1) ) : AutoCloseable { private val logger = KotlinLogging.logger {} - private val chan = Channel<List<MetricData>>(Channel.RENDEZVOUS) /** - * The metric reader job. + * The background job that is responsible for collecting the metrics every cycle. */ - private val readerJob = scope.launch { + private val job = scope.launch { + val intervalMs = exportInterval.toMillis() + while (isActive) { - delay(exportInterval) + delay(intervalMs) val metrics = mutableListOf<MetricData>() for (producer in producers) { metrics.addAll(producer.collectAllMetrics()) } - chan.send(Collections.unmodifiableList(metrics)) - } - } - /** - * The exporter job runs in the background to actually export the metrics. - */ - private val exporterJob = chan.consumeAsFlow() - .onEach { metrics -> - suspendCoroutine<Unit> { cont -> - try { - val result = exporter.export(metrics) - result.whenComplete { - if (!result.isSuccess) { - logger.trace { "Exporter failed" } - } - cont.resume(Unit) + try { + val result = exporter.export(metrics) + result.whenComplete { + if (!result.isSuccess) { + logger.trace { "Exporter failed" } } - } catch (cause: Throwable) { - logger.warn(cause) { "Exporter threw an Exception" } - cont.resume(Unit) } + } catch (cause: Throwable) { + logger.warn(cause) { "Exporter threw an Exception" } } } - .launchIn(scope) + } override fun close() { - readerJob.cancel() - exporterJob.cancel() + job.cancel() } } |
