From b82ae73d064590094f79e26de355060135ed13fd Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 5 May 2022 11:33:47 +0200 Subject: refactor(workflow/service): Remove OpenTelemetry from "workflow" modules This change removes the OpenTelemetry integration from the OpenDC Workflow modules. Previously, we chose to integrate OpenTelemetry to provide a unified way to report metrics to the users. See the previous commit removing it from the "Compute" modules for the reasoning behind this change. --- .../opendc-workflow-workload/build.gradle.kts | 2 - .../workflow/workload/WorkflowServiceHelper.kt | 86 +++------------------- 2 files changed, 10 insertions(+), 78 deletions(-) (limited to 'opendc-workflow/opendc-workflow-workload') diff --git a/opendc-workflow/opendc-workflow-workload/build.gradle.kts b/opendc-workflow/opendc-workflow-workload/build.gradle.kts index b725a69c..17eadf29 100644 --- a/opendc-workflow/opendc-workflow-workload/build.gradle.kts +++ b/opendc-workflow/opendc-workflow-workload/build.gradle.kts @@ -32,6 +32,4 @@ dependencies { implementation(projects.opendcSimulator.opendcSimulatorCompute) implementation(projects.opendcTrace.opendcTraceApi) - implementation(projects.opendcTelemetry.opendcTelemetrySdk) - implementation(libs.opentelemetry.semconv) } diff --git a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt index a7d0ed6c..435d0190 100644 --- a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt +++ b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt @@ -22,24 +22,13 @@ package org.opendc.workflow.workload -import io.opentelemetry.api.metrics.MeterProvider -import io.opentelemetry.sdk.common.CompletableResultCode -import io.opentelemetry.sdk.metrics.SdkMeterProvider -import io.opentelemetry.sdk.metrics.data.AggregationTemporality -import io.opentelemetry.sdk.metrics.export.MetricProducer -import io.opentelemetry.sdk.metrics.export.MetricReader -import io.opentelemetry.sdk.metrics.export.MetricReaderFactory -import io.opentelemetry.sdk.resources.Resource -import io.opentelemetry.semconv.resource.attributes.ResourceAttributes import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.launch import org.opendc.compute.api.ComputeClient -import org.opendc.telemetry.sdk.toOtelClock import org.opendc.workflow.api.Job import org.opendc.workflow.service.WorkflowService import java.time.Clock -import java.util.* import kotlin.coroutines.CoroutineContext /** @@ -59,60 +48,16 @@ public class WorkflowServiceHelper( /** * The [WorkflowService] that is constructed by this runner. */ - public val service: WorkflowService - - /** - * The [MetricProducer] exposed by the [WorkflowService]. - */ - public lateinit var metricProducer: MetricProducer - private set - - /** - * The [MeterProvider] used for the service. - */ - private val _meterProvider: SdkMeterProvider - - /** - * The list of [MetricReader]s that have been registered with the runner. - */ - private val _metricReaders = mutableListOf() - - init { - val resource = Resource.builder() - .put(ResourceAttributes.SERVICE_NAME, "opendc-workflow") - .build() - - _meterProvider = SdkMeterProvider.builder() - .setClock(clock.toOtelClock()) - .setResource(resource) - .registerMetricReader { producer -> - metricProducer = producer - - val metricReaders = _metricReaders - object : MetricReader { - override fun getPreferredTemporality(): AggregationTemporality = AggregationTemporality.CUMULATIVE - override fun flush(): CompletableResultCode { - return CompletableResultCode.ofAll(metricReaders.map { it.flush() }) - } - override fun shutdown(): CompletableResultCode { - return CompletableResultCode.ofAll(metricReaders.map { it.shutdown() }) - } - } - } - .build() - - service = WorkflowService( - context, - clock, - _meterProvider, - computeClient, - schedulerSpec.schedulingQuantum, - jobAdmissionPolicy = schedulerSpec.jobAdmissionPolicy, - jobOrderPolicy = schedulerSpec.jobOrderPolicy, - taskEligibilityPolicy = schedulerSpec.taskEligibilityPolicy, - taskOrderPolicy = schedulerSpec.taskOrderPolicy, - ) - } + public val service: WorkflowService = WorkflowService( + context, + clock, + computeClient, + schedulerSpec.schedulingQuantum, + jobAdmissionPolicy = schedulerSpec.jobAdmissionPolicy, + jobOrderPolicy = schedulerSpec.jobOrderPolicy, + taskEligibilityPolicy = schedulerSpec.taskEligibilityPolicy, + taskOrderPolicy = schedulerSpec.taskOrderPolicy, + ) /** * Run the specified list of [jobs] using the workflow service and suspend execution until all jobs have @@ -146,19 +91,8 @@ public class WorkflowServiceHelper( } } - /** - * Register a [MetricReader] for this helper. - * - * @param factory The factory for the reader to register. - */ - public fun registerMetricReader(factory: MetricReaderFactory) { - val reader = factory.apply(metricProducer) - _metricReaders.add(reader) - } - override fun close() { computeClient.close() service.close() - _meterProvider.close() } } -- cgit v1.2.3