summaryrefslogtreecommitdiff
path: root/opendc-workflow
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-10-09 16:10:00 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2022-02-15 18:08:37 +0100
commit02c215ad57e1e4d56c54d22be58e1845bdeebf25 (patch)
tree7794b53ca3bb6fa197a118cee92114135be15def /opendc-workflow
parent48c04fb74ee170f58f292b077c62b4da237f507e (diff)
refactor: Update OpenTelemetry to version 1.11
This change updates the OpenDC codebase to use OpenTelemetry v1.11, which stabilizes the metrics API. This stabilization brings quite a few breaking changes, so significant changes are necessary inside the OpenDC codebase.
Diffstat (limited to 'opendc-workflow')
-rw-r--r--opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt4
-rw-r--r--opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt49
2 files changed, 48 insertions, 5 deletions
diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
index 214d5135..1fd332b9 100644
--- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
+++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
@@ -33,6 +33,7 @@ import org.opendc.compute.service.scheduler.filters.RamFilter
import org.opendc.compute.service.scheduler.filters.VCpuFilter
import org.opendc.compute.service.scheduler.weights.VCpuWeigher
import org.opendc.compute.workload.ComputeServiceHelper
+import org.opendc.compute.workload.telemetry.NoopTelemetryManager
import org.opendc.compute.workload.topology.HostSpec
import org.opendc.simulator.compute.kernel.SimSpaceSharedHypervisorProvider
import org.opendc.simulator.compute.model.MachineModel
@@ -70,7 +71,8 @@ internal class WorkflowServiceTest {
filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)),
weighers = listOf(VCpuWeigher(1.0, multiplier = 1.0))
)
- val computeHelper = ComputeServiceHelper(coroutineContext, clock, computeScheduler, schedulingQuantum = Duration.ofSeconds(1))
+
+ val computeHelper = ComputeServiceHelper(coroutineContext, clock, NoopTelemetryManager(), computeScheduler, schedulingQuantum = Duration.ofSeconds(1))
repeat(HOST_COUNT) { computeHelper.registerHost(createHostSpec(it)) }
diff --git a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt
index 0198900f..a7d0ed6c 100644
--- a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt
+++ b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt
@@ -22,8 +22,13 @@
package org.opendc.workflow.workload
+import io.opentelemetry.api.metrics.MeterProvider
+import io.opentelemetry.sdk.common.CompletableResultCode
import io.opentelemetry.sdk.metrics.SdkMeterProvider
+import io.opentelemetry.sdk.metrics.data.AggregationTemporality
import io.opentelemetry.sdk.metrics.export.MetricProducer
+import io.opentelemetry.sdk.metrics.export.MetricReader
+import io.opentelemetry.sdk.metrics.export.MetricReaderFactory
import io.opentelemetry.sdk.resources.Resource
import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
import kotlinx.coroutines.coroutineScope
@@ -34,6 +39,7 @@ import org.opendc.telemetry.sdk.toOtelClock
import org.opendc.workflow.api.Job
import org.opendc.workflow.service.WorkflowService
import java.time.Clock
+import java.util.*
import kotlin.coroutines.CoroutineContext
/**
@@ -58,23 +64,47 @@ public class WorkflowServiceHelper(
/**
* The [MetricProducer] exposed by the [WorkflowService].
*/
- public val metricProducer: MetricProducer
+ public lateinit var metricProducer: MetricProducer
+ private set
+
+ /**
+ * The [MeterProvider] used for the service.
+ */
+ private val _meterProvider: SdkMeterProvider
+
+ /**
+ * The list of [MetricReader]s that have been registered with the runner.
+ */
+ private val _metricReaders = mutableListOf<MetricReader>()
init {
val resource = Resource.builder()
.put(ResourceAttributes.SERVICE_NAME, "opendc-workflow")
.build()
- val meterProvider = SdkMeterProvider.builder()
+ _meterProvider = SdkMeterProvider.builder()
.setClock(clock.toOtelClock())
.setResource(resource)
+ .registerMetricReader { producer ->
+ metricProducer = producer
+
+ val metricReaders = _metricReaders
+ object : MetricReader {
+ override fun getPreferredTemporality(): AggregationTemporality = AggregationTemporality.CUMULATIVE
+ override fun flush(): CompletableResultCode {
+ return CompletableResultCode.ofAll(metricReaders.map { it.flush() })
+ }
+ override fun shutdown(): CompletableResultCode {
+ return CompletableResultCode.ofAll(metricReaders.map { it.shutdown() })
+ }
+ }
+ }
.build()
- metricProducer = meterProvider
service = WorkflowService(
context,
clock,
- meterProvider,
+ _meterProvider,
computeClient,
schedulerSpec.schedulingQuantum,
jobAdmissionPolicy = schedulerSpec.jobAdmissionPolicy,
@@ -116,8 +146,19 @@ public class WorkflowServiceHelper(
}
}
+ /**
+ * Register a [MetricReader] for this helper.
+ *
+ * @param factory The factory for the reader to register.
+ */
+ public fun registerMetricReader(factory: MetricReaderFactory) {
+ val reader = factory.apply(metricProducer)
+ _metricReaders.add(reader)
+ }
+
override fun close() {
computeClient.close()
service.close()
+ _meterProvider.close()
}
}