diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-09-17 17:12:20 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-09-17 17:12:20 +0200 |
| commit | c1b9719aad10566c9d17f9eb757236c58a602b89 (patch) | |
| tree | 2755ea2d44256116e6dc08a57a64b37a36331249 /opendc-telemetry/opendc-telemetry-sdk/src | |
| parent | 2cd3bd18e548a72d64afe0e7f59487f4747d722f (diff) | |
| parent | e2537c59bef0645b948e92553cc5a42a8c0f7256 (diff) | |
merge: Standardize simulator metrics
This pull request standardizes the metrics emitted by the simulator based on OpenTelemetry conventions.
From now on, all metrics exposed by the simulator are exported through OpenTelemetry
following the recommended practices for naming, collection, etc.
**Implementation Notes**
- Improve ParquetDataWriter implementation
- Simplify CoroutineMetricReader
- Create separate MeterProvider per service/host
- Standardize compute scheduler metrics
- Standardize SimHost metrics
- Use logical types for Parquet output columns
**External Dependencies**
- Update to OpenTelemetry 1.6.0
**Breaking API Changes**
- Instead of supplying a `Meter` instances, key classes are now responsible for constructing
a `Meter` instance from the supplied `MeterProvider`.
- Export format has been changed to suit the outputted metrics
- Energy experiments shell has been removed
Diffstat (limited to 'opendc-telemetry/opendc-telemetry-sdk/src')
| -rw-r--r-- | opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt | 52 |
1 files changed, 17 insertions, 35 deletions
diff --git a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt b/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt index 9ee16fac..07f0ff7f 100644 --- a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt +++ b/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt @@ -26,14 +26,8 @@ import io.opentelemetry.sdk.metrics.data.MetricData import io.opentelemetry.sdk.metrics.export.MetricExporter import io.opentelemetry.sdk.metrics.export.MetricProducer import kotlinx.coroutines.* -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.flow.consumeAsFlow -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach import mu.KotlinLogging -import java.util.* -import kotlin.coroutines.resume -import kotlin.coroutines.suspendCoroutine +import java.time.Duration /** * A helper class to read the metrics from a list of [MetricProducer]s and automatically export the metrics every @@ -44,56 +38,44 @@ import kotlin.coroutines.suspendCoroutine * @param scope The [CoroutineScope] to run the reader in. * @param producers The metric producers to gather metrics from. * @param exporter The export to export the metrics to. - * @param exportInterval The export interval in milliseconds. + * @param exportInterval The export interval. */ public class CoroutineMetricReader( scope: CoroutineScope, private val producers: List<MetricProducer>, private val exporter: MetricExporter, - private val exportInterval: Long = 60_000 + private val exportInterval: Duration = Duration.ofMinutes(5) ) : AutoCloseable { private val logger = KotlinLogging.logger {} - private val chan = Channel<List<MetricData>>(Channel.RENDEZVOUS) /** - * The metric reader job. + * The background job that is responsible for collecting the metrics every cycle. */ - private val readerJob = scope.launch { + private val job = scope.launch { + val intervalMs = exportInterval.toMillis() + while (isActive) { - delay(exportInterval) + delay(intervalMs) val metrics = mutableListOf<MetricData>() for (producer in producers) { metrics.addAll(producer.collectAllMetrics()) } - chan.send(Collections.unmodifiableList(metrics)) - } - } - /** - * The exporter job runs in the background to actually export the metrics. - */ - private val exporterJob = chan.consumeAsFlow() - .onEach { metrics -> - suspendCoroutine<Unit> { cont -> - try { - val result = exporter.export(metrics) - result.whenComplete { - if (!result.isSuccess) { - logger.trace { "Exporter failed" } - } - cont.resume(Unit) + try { + val result = exporter.export(metrics) + result.whenComplete { + if (!result.isSuccess) { + logger.trace { "Exporter failed" } } - } catch (cause: Throwable) { - logger.warn(cause) { "Exporter threw an Exception" } - cont.resume(Unit) } + } catch (cause: Throwable) { + logger.warn(cause) { "Exporter threw an Exception" } } } - .launchIn(scope) + } override fun close() { - readerJob.cancel() - exporterJob.cancel() + job.cancel() } } |
