summaryrefslogtreecommitdiff
path: root/opendc-workflow/opendc-workflow-workload
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-10-09 16:10:00 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2022-02-15 18:08:37 +0100
commit02c215ad57e1e4d56c54d22be58e1845bdeebf25 (patch)
tree7794b53ca3bb6fa197a118cee92114135be15def /opendc-workflow/opendc-workflow-workload
parent48c04fb74ee170f58f292b077c62b4da237f507e (diff)
refactor: Update OpenTelemetry to version 1.11
This change updates the OpenDC codebase to use OpenTelemetry v1.11, which stabilizes the metrics API. This stabilization brings quite a few breaking changes, so significant changes are necessary inside the OpenDC codebase.
Diffstat (limited to 'opendc-workflow/opendc-workflow-workload')
-rw-r--r--opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt49
1 files changed, 45 insertions, 4 deletions
diff --git a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt
index 0198900f..a7d0ed6c 100644
--- a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt
+++ b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt
@@ -22,8 +22,13 @@
package org.opendc.workflow.workload
+import io.opentelemetry.api.metrics.MeterProvider
+import io.opentelemetry.sdk.common.CompletableResultCode
import io.opentelemetry.sdk.metrics.SdkMeterProvider
+import io.opentelemetry.sdk.metrics.data.AggregationTemporality
import io.opentelemetry.sdk.metrics.export.MetricProducer
+import io.opentelemetry.sdk.metrics.export.MetricReader
+import io.opentelemetry.sdk.metrics.export.MetricReaderFactory
import io.opentelemetry.sdk.resources.Resource
import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
import kotlinx.coroutines.coroutineScope
@@ -34,6 +39,7 @@ import org.opendc.telemetry.sdk.toOtelClock
import org.opendc.workflow.api.Job
import org.opendc.workflow.service.WorkflowService
import java.time.Clock
+import java.util.*
import kotlin.coroutines.CoroutineContext
/**
@@ -58,23 +64,47 @@ public class WorkflowServiceHelper(
/**
* The [MetricProducer] exposed by the [WorkflowService].
*/
- public val metricProducer: MetricProducer
+ public lateinit var metricProducer: MetricProducer
+ private set
+
+ /**
+ * The [MeterProvider] used for the service.
+ */
+ private val _meterProvider: SdkMeterProvider
+
+ /**
+ * The list of [MetricReader]s that have been registered with the runner.
+ */
+ private val _metricReaders = mutableListOf<MetricReader>()
init {
val resource = Resource.builder()
.put(ResourceAttributes.SERVICE_NAME, "opendc-workflow")
.build()
- val meterProvider = SdkMeterProvider.builder()
+ _meterProvider = SdkMeterProvider.builder()
.setClock(clock.toOtelClock())
.setResource(resource)
+ .registerMetricReader { producer ->
+ metricProducer = producer
+
+ val metricReaders = _metricReaders
+ object : MetricReader {
+ override fun getPreferredTemporality(): AggregationTemporality = AggregationTemporality.CUMULATIVE
+ override fun flush(): CompletableResultCode {
+ return CompletableResultCode.ofAll(metricReaders.map { it.flush() })
+ }
+ override fun shutdown(): CompletableResultCode {
+ return CompletableResultCode.ofAll(metricReaders.map { it.shutdown() })
+ }
+ }
+ }
.build()
- metricProducer = meterProvider
service = WorkflowService(
context,
clock,
- meterProvider,
+ _meterProvider,
computeClient,
schedulerSpec.schedulingQuantum,
jobAdmissionPolicy = schedulerSpec.jobAdmissionPolicy,
@@ -116,8 +146,19 @@ public class WorkflowServiceHelper(
}
}
+ /**
+ * Register a [MetricReader] for this helper.
+ *
+ * @param factory The factory for the reader to register.
+ */
+ public fun registerMetricReader(factory: MetricReaderFactory) {
+ val reader = factory.apply(metricProducer)
+ _metricReaders.add(reader)
+ }
+
override fun close() {
computeClient.close()
service.close()
+ _meterProvider.close()
}
}