From 02c215ad57e1e4d56c54d22be58e1845bdeebf25 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sat, 9 Oct 2021 16:10:00 +0200 Subject: refactor: Update OpenTelemetry to version 1.11 This change updates the OpenDC codebase to use OpenTelemetry v1.11, which stabilizes the metrics API. This stabilization brings quite a few breaking changes, so significant changes are necessary inside the OpenDC codebase. --- .../workflow/workload/WorkflowServiceHelper.kt | 49 ++++++++++++++++++++-- 1 file changed, 45 insertions(+), 4 deletions(-) (limited to 'opendc-workflow/opendc-workflow-workload/src/main') diff --git a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt index 0198900f..a7d0ed6c 100644 --- a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt +++ b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt @@ -22,8 +22,13 @@ package org.opendc.workflow.workload +import io.opentelemetry.api.metrics.MeterProvider +import io.opentelemetry.sdk.common.CompletableResultCode import io.opentelemetry.sdk.metrics.SdkMeterProvider +import io.opentelemetry.sdk.metrics.data.AggregationTemporality import io.opentelemetry.sdk.metrics.export.MetricProducer +import io.opentelemetry.sdk.metrics.export.MetricReader +import io.opentelemetry.sdk.metrics.export.MetricReaderFactory import io.opentelemetry.sdk.resources.Resource import io.opentelemetry.semconv.resource.attributes.ResourceAttributes import kotlinx.coroutines.coroutineScope @@ -34,6 +39,7 @@ import org.opendc.telemetry.sdk.toOtelClock import org.opendc.workflow.api.Job import org.opendc.workflow.service.WorkflowService import java.time.Clock +import java.util.* import kotlin.coroutines.CoroutineContext /** @@ -58,23 +64,47 @@ public class WorkflowServiceHelper( /** * The [MetricProducer] exposed by the [WorkflowService]. */ - public val metricProducer: MetricProducer + public lateinit var metricProducer: MetricProducer + private set + + /** + * The [MeterProvider] used for the service. + */ + private val _meterProvider: SdkMeterProvider + + /** + * The list of [MetricReader]s that have been registered with the runner. + */ + private val _metricReaders = mutableListOf() init { val resource = Resource.builder() .put(ResourceAttributes.SERVICE_NAME, "opendc-workflow") .build() - val meterProvider = SdkMeterProvider.builder() + _meterProvider = SdkMeterProvider.builder() .setClock(clock.toOtelClock()) .setResource(resource) + .registerMetricReader { producer -> + metricProducer = producer + + val metricReaders = _metricReaders + object : MetricReader { + override fun getPreferredTemporality(): AggregationTemporality = AggregationTemporality.CUMULATIVE + override fun flush(): CompletableResultCode { + return CompletableResultCode.ofAll(metricReaders.map { it.flush() }) + } + override fun shutdown(): CompletableResultCode { + return CompletableResultCode.ofAll(metricReaders.map { it.shutdown() }) + } + } + } .build() - metricProducer = meterProvider service = WorkflowService( context, clock, - meterProvider, + _meterProvider, computeClient, schedulerSpec.schedulingQuantum, jobAdmissionPolicy = schedulerSpec.jobAdmissionPolicy, @@ -116,8 +146,19 @@ public class WorkflowServiceHelper( } } + /** + * Register a [MetricReader] for this helper. + * + * @param factory The factory for the reader to register. + */ + public fun registerMetricReader(factory: MetricReaderFactory) { + val reader = factory.apply(metricProducer) + _metricReaders.add(reader) + } + override fun close() { computeClient.close() service.close() + _meterProvider.close() } } -- cgit v1.2.3