diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-10-09 16:10:00 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-02-15 18:08:37 +0100 |
| commit | 02c215ad57e1e4d56c54d22be58e1845bdeebf25 (patch) | |
| tree | 7794b53ca3bb6fa197a118cee92114135be15def /opendc-workflow | |
| parent | 48c04fb74ee170f58f292b077c62b4da237f507e (diff) | |
refactor: Update OpenTelemetry to version 1.11
This change updates the OpenDC codebase to use OpenTelemetry v1.11,
which stabilizes the metrics API. This stabilization brings quite a few
breaking changes, so significant changes are necessary inside the OpenDC
codebase.
Diffstat (limited to 'opendc-workflow')
2 files changed, 48 insertions, 5 deletions
diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt index 214d5135..1fd332b9 100644 --- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt @@ -33,6 +33,7 @@ import org.opendc.compute.service.scheduler.filters.RamFilter import org.opendc.compute.service.scheduler.filters.VCpuFilter import org.opendc.compute.service.scheduler.weights.VCpuWeigher import org.opendc.compute.workload.ComputeServiceHelper +import org.opendc.compute.workload.telemetry.NoopTelemetryManager import org.opendc.compute.workload.topology.HostSpec import org.opendc.simulator.compute.kernel.SimSpaceSharedHypervisorProvider import org.opendc.simulator.compute.model.MachineModel @@ -70,7 +71,8 @@ internal class WorkflowServiceTest { filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)), weighers = listOf(VCpuWeigher(1.0, multiplier = 1.0)) ) - val computeHelper = ComputeServiceHelper(coroutineContext, clock, computeScheduler, schedulingQuantum = Duration.ofSeconds(1)) + + val computeHelper = ComputeServiceHelper(coroutineContext, clock, NoopTelemetryManager(), computeScheduler, schedulingQuantum = Duration.ofSeconds(1)) repeat(HOST_COUNT) { computeHelper.registerHost(createHostSpec(it)) } diff --git a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt index 0198900f..a7d0ed6c 100644 --- a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt +++ b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt @@ -22,8 +22,13 @@ package org.opendc.workflow.workload +import io.opentelemetry.api.metrics.MeterProvider +import io.opentelemetry.sdk.common.CompletableResultCode import io.opentelemetry.sdk.metrics.SdkMeterProvider +import io.opentelemetry.sdk.metrics.data.AggregationTemporality import io.opentelemetry.sdk.metrics.export.MetricProducer +import io.opentelemetry.sdk.metrics.export.MetricReader +import io.opentelemetry.sdk.metrics.export.MetricReaderFactory import io.opentelemetry.sdk.resources.Resource import io.opentelemetry.semconv.resource.attributes.ResourceAttributes import kotlinx.coroutines.coroutineScope @@ -34,6 +39,7 @@ import org.opendc.telemetry.sdk.toOtelClock import org.opendc.workflow.api.Job import org.opendc.workflow.service.WorkflowService import java.time.Clock +import java.util.* import kotlin.coroutines.CoroutineContext /** @@ -58,23 +64,47 @@ public class WorkflowServiceHelper( /** * The [MetricProducer] exposed by the [WorkflowService]. */ - public val metricProducer: MetricProducer + public lateinit var metricProducer: MetricProducer + private set + + /** + * The [MeterProvider] used for the service. + */ + private val _meterProvider: SdkMeterProvider + + /** + * The list of [MetricReader]s that have been registered with the runner. + */ + private val _metricReaders = mutableListOf<MetricReader>() init { val resource = Resource.builder() .put(ResourceAttributes.SERVICE_NAME, "opendc-workflow") .build() - val meterProvider = SdkMeterProvider.builder() + _meterProvider = SdkMeterProvider.builder() .setClock(clock.toOtelClock()) .setResource(resource) + .registerMetricReader { producer -> + metricProducer = producer + + val metricReaders = _metricReaders + object : MetricReader { + override fun getPreferredTemporality(): AggregationTemporality = AggregationTemporality.CUMULATIVE + override fun flush(): CompletableResultCode { + return CompletableResultCode.ofAll(metricReaders.map { it.flush() }) + } + override fun shutdown(): CompletableResultCode { + return CompletableResultCode.ofAll(metricReaders.map { it.shutdown() }) + } + } + } .build() - metricProducer = meterProvider service = WorkflowService( context, clock, - meterProvider, + _meterProvider, computeClient, schedulerSpec.schedulingQuantum, jobAdmissionPolicy = schedulerSpec.jobAdmissionPolicy, @@ -116,8 +146,19 @@ public class WorkflowServiceHelper( } } + /** + * Register a [MetricReader] for this helper. + * + * @param factory The factory for the reader to register. + */ + public fun registerMetricReader(factory: MetricReaderFactory) { + val reader = factory.apply(metricProducer) + _metricReaders.add(reader) + } + override fun close() { computeClient.close() service.close() + _meterProvider.close() } } |
