diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-05-05 11:33:47 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-05-06 18:37:36 +0200 |
| commit | b82ae73d064590094f79e26de355060135ed13fd (patch) | |
| tree | 9140865f68e8f5864b360bb405ce1e0d0e98a5a4 | |
| parent | c7eec7904e08029b3ab31d3e7b21afa1ea9ab7e6 (diff) | |
refactor(workflow/service): Remove OpenTelemetry from "workflow" modules
This change removes the OpenTelemetry integration from the OpenDC
Workflow modules. Previously, we chose to integrate OpenTelemetry to
provide a unified way to report metrics to the users.
See the previous commit removing it from the "Compute" modules for the
reasoning behind this change.
| -rw-r--r-- | opendc-workflow/opendc-workflow-service/build.gradle.kts | 2 | ||||
| -rw-r--r-- | opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt | 7 | ||||
| -rw-r--r-- | opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt | 67 | ||||
| -rw-r--r-- | opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/telemetry/SchedulerStats.kt (renamed from opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/telemetry/SchedulerStats.kt) | 2 | ||||
| -rw-r--r-- | opendc-workflow/opendc-workflow-workload/build.gradle.kts | 2 | ||||
| -rw-r--r-- | opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt | 86 |
6 files changed, 15 insertions, 151 deletions
diff --git a/opendc-workflow/opendc-workflow-service/build.gradle.kts b/opendc-workflow/opendc-workflow-service/build.gradle.kts index 60b5eb13..b6365885 100644 --- a/opendc-workflow/opendc-workflow-service/build.gradle.kts +++ b/opendc-workflow/opendc-workflow-service/build.gradle.kts @@ -30,7 +30,6 @@ plugins { dependencies { api(projects.opendcWorkflow.opendcWorkflowApi) api(projects.opendcCompute.opendcComputeApi) - api(projects.opendcTelemetry.opendcTelemetryApi) implementation(projects.opendcCommon) implementation(libs.kotlin.logging) @@ -38,7 +37,6 @@ dependencies { testImplementation(projects.opendcCompute.opendcComputeWorkload) testImplementation(projects.opendcSimulator.opendcSimulatorCore) testImplementation(projects.opendcTrace.opendcTraceApi) - testImplementation(projects.opendcTelemetry.opendcTelemetrySdk) testRuntimeOnly(projects.opendcTrace.opendcTraceGwf) testRuntimeOnly(libs.log4j.slf4j) } diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt index b8bc0e33..2436c387 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt @@ -22,7 +22,6 @@ package org.opendc.workflow.service -import io.opentelemetry.api.metrics.MeterProvider import org.opendc.compute.api.ComputeClient import org.opendc.workflow.api.Job import org.opendc.workflow.service.internal.WorkflowServiceImpl @@ -30,7 +29,7 @@ import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy import org.opendc.workflow.service.scheduler.job.JobOrderPolicy import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy -import org.opendc.workflow.service.telemetry.SchedulerStats +import org.opendc.workflow.service.scheduler.telemetry.SchedulerStats import java.time.Clock import java.time.Duration import kotlin.coroutines.CoroutineContext @@ -63,7 +62,7 @@ public interface WorkflowService : AutoCloseable { * @param context The [CoroutineContext] to use in the service. * @param clock The clock instance to use. * @param meterProvider The meter provider to use. - * @param compute The compute client to use. + * @param compute The "Compute" client to use. * @param schedulingQuantum The scheduling quantum to use (minimum duration between scheduling cycles). * @param jobAdmissionPolicy The job admission policy to use. * @param jobOrderPolicy The job order policy to use. @@ -73,7 +72,6 @@ public interface WorkflowService : AutoCloseable { public operator fun invoke( context: CoroutineContext, clock: Clock, - meterProvider: MeterProvider, compute: ComputeClient, schedulingQuantum: Duration, jobAdmissionPolicy: JobAdmissionPolicy, @@ -84,7 +82,6 @@ public interface WorkflowService : AutoCloseable { return WorkflowServiceImpl( context, clock, - meterProvider, compute, schedulingQuantum, jobAdmissionPolicy, diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt index 9c7f18a2..899810a2 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt @@ -22,8 +22,6 @@ package org.opendc.workflow.service.internal -import io.opentelemetry.api.metrics.Meter -import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.* import org.opendc.common.util.Pacer import org.opendc.compute.api.* @@ -34,7 +32,7 @@ import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy import org.opendc.workflow.service.scheduler.job.JobOrderPolicy import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy -import org.opendc.workflow.service.telemetry.SchedulerStats +import org.opendc.workflow.service.scheduler.telemetry.SchedulerStats import java.time.Clock import java.time.Duration import java.util.* @@ -48,9 +46,8 @@ import kotlin.coroutines.resume public class WorkflowServiceImpl( context: CoroutineContext, private val clock: Clock, - meterProvider: MeterProvider, private val computeClient: ComputeClient, - private val schedulingQuantum: Duration, + schedulingQuantum: Duration, jobAdmissionPolicy: JobAdmissionPolicy, jobOrderPolicy: JobOrderPolicy, taskEligibilityPolicy: TaskEligibilityPolicy, @@ -62,11 +59,6 @@ public class WorkflowServiceImpl( private val scope = CoroutineScope(context + Job()) /** - * The [Meter] to collect metrics of this service. - */ - private val meter = meterProvider.get("org.opendc.workflow.service") - - /** * The incoming jobs ready to be processed by the scheduler. */ private val incomingJobs: MutableSet<JobState> = linkedSetOf() @@ -139,58 +131,11 @@ public class WorkflowServiceImpl( } } - /** - * The number of jobs that have been submitted to the service. - */ - private val submittedJobs = meter.counterBuilder("jobs.submitted") - .setDescription("Number of submitted jobs") - .setUnit("1") - .build() private var _workflowsSubmitted: Int = 0 - - /** - * The number of jobs that are running. - */ - private val runningJobs = meter.upDownCounterBuilder("jobs.active") - .setDescription("Number of jobs running") - .setUnit("1") - .build() private var _workflowsRunning: Int = 0 - - /** - * The number of jobs that have finished running. - */ - private val finishedJobs = meter.counterBuilder("jobs.finished") - .setDescription("Number of jobs that finished running") - .setUnit("1") - .build() private var _workflowsFinished: Int = 0 - - /** - * The number of tasks that have been submitted to the service. - */ - private val submittedTasks = meter.counterBuilder("tasks.submitted") - .setDescription("Number of submitted tasks") - .setUnit("1") - .build() private var _tasksSubmitted: Int = 0 - - /** - * The number of jobs that are running. - */ - private val runningTasks = meter.upDownCounterBuilder("tasks.active") - .setDescription("Number of tasks running") - .setUnit("1") - .build() private var _tasksRunning: Int = 0 - - /** - * The number of jobs that have finished running. - */ - private val finishedTasks = meter.counterBuilder("tasks.finished") - .setDescription("Number of tasks that finished running") - .setUnit("1") - .build() private var _tasksFinished: Int = 0 /** @@ -229,14 +174,12 @@ public class WorkflowServiceImpl( instance.state = TaskStatus.READY } - submittedTasks.add(1) _tasksSubmitted++ } instances.values.toCollection(jobInstance.tasks) incomingJobs += jobInstance rootListener.jobSubmitted(jobInstance) - submittedJobs.add(1) _workflowsSubmitted++ pacer.enqueue() @@ -283,7 +226,6 @@ public class WorkflowServiceImpl( jobQueue.add(jobInstance) activeJobs += jobInstance - runningJobs.add(1) _workflowsRunning++ rootListener.jobStarted(jobInstance) } @@ -363,7 +305,6 @@ public class WorkflowServiceImpl( ServerState.RUNNING -> { val task = taskByServer.getValue(server) task.startedAt = clock.millis() - runningTasks.add(1) _tasksRunning++ rootListener.taskStarted(task) } @@ -381,8 +322,6 @@ public class WorkflowServiceImpl( job.tasks.remove(task) activeTasks -= task - runningTasks.add(-1) - finishedTasks.add(1) _tasksRunning-- _tasksFinished++ rootListener.taskFinished(task) @@ -410,8 +349,6 @@ public class WorkflowServiceImpl( private fun finishJob(job: JobState) { activeJobs -= job - runningJobs.add(-1) - finishedJobs.add(1) _workflowsRunning-- _workflowsFinished++ rootListener.jobFinished(job) diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/telemetry/SchedulerStats.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/telemetry/SchedulerStats.kt index 7c7d7c4d..608e82df 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/telemetry/SchedulerStats.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/telemetry/SchedulerStats.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.workflow.service.telemetry +package org.opendc.workflow.service.scheduler.telemetry /** * Statistics about the workflow scheduler. diff --git a/opendc-workflow/opendc-workflow-workload/build.gradle.kts b/opendc-workflow/opendc-workflow-workload/build.gradle.kts index b725a69c..17eadf29 100644 --- a/opendc-workflow/opendc-workflow-workload/build.gradle.kts +++ b/opendc-workflow/opendc-workflow-workload/build.gradle.kts @@ -32,6 +32,4 @@ dependencies { implementation(projects.opendcSimulator.opendcSimulatorCompute) implementation(projects.opendcTrace.opendcTraceApi) - implementation(projects.opendcTelemetry.opendcTelemetrySdk) - implementation(libs.opentelemetry.semconv) } diff --git a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt index a7d0ed6c..435d0190 100644 --- a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt +++ b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt @@ -22,24 +22,13 @@ package org.opendc.workflow.workload -import io.opentelemetry.api.metrics.MeterProvider -import io.opentelemetry.sdk.common.CompletableResultCode -import io.opentelemetry.sdk.metrics.SdkMeterProvider -import io.opentelemetry.sdk.metrics.data.AggregationTemporality -import io.opentelemetry.sdk.metrics.export.MetricProducer -import io.opentelemetry.sdk.metrics.export.MetricReader -import io.opentelemetry.sdk.metrics.export.MetricReaderFactory -import io.opentelemetry.sdk.resources.Resource -import io.opentelemetry.semconv.resource.attributes.ResourceAttributes import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.launch import org.opendc.compute.api.ComputeClient -import org.opendc.telemetry.sdk.toOtelClock import org.opendc.workflow.api.Job import org.opendc.workflow.service.WorkflowService import java.time.Clock -import java.util.* import kotlin.coroutines.CoroutineContext /** @@ -59,60 +48,16 @@ public class WorkflowServiceHelper( /** * The [WorkflowService] that is constructed by this runner. */ - public val service: WorkflowService - - /** - * The [MetricProducer] exposed by the [WorkflowService]. - */ - public lateinit var metricProducer: MetricProducer - private set - - /** - * The [MeterProvider] used for the service. - */ - private val _meterProvider: SdkMeterProvider - - /** - * The list of [MetricReader]s that have been registered with the runner. - */ - private val _metricReaders = mutableListOf<MetricReader>() - - init { - val resource = Resource.builder() - .put(ResourceAttributes.SERVICE_NAME, "opendc-workflow") - .build() - - _meterProvider = SdkMeterProvider.builder() - .setClock(clock.toOtelClock()) - .setResource(resource) - .registerMetricReader { producer -> - metricProducer = producer - - val metricReaders = _metricReaders - object : MetricReader { - override fun getPreferredTemporality(): AggregationTemporality = AggregationTemporality.CUMULATIVE - override fun flush(): CompletableResultCode { - return CompletableResultCode.ofAll(metricReaders.map { it.flush() }) - } - override fun shutdown(): CompletableResultCode { - return CompletableResultCode.ofAll(metricReaders.map { it.shutdown() }) - } - } - } - .build() - - service = WorkflowService( - context, - clock, - _meterProvider, - computeClient, - schedulerSpec.schedulingQuantum, - jobAdmissionPolicy = schedulerSpec.jobAdmissionPolicy, - jobOrderPolicy = schedulerSpec.jobOrderPolicy, - taskEligibilityPolicy = schedulerSpec.taskEligibilityPolicy, - taskOrderPolicy = schedulerSpec.taskOrderPolicy, - ) - } + public val service: WorkflowService = WorkflowService( + context, + clock, + computeClient, + schedulerSpec.schedulingQuantum, + jobAdmissionPolicy = schedulerSpec.jobAdmissionPolicy, + jobOrderPolicy = schedulerSpec.jobOrderPolicy, + taskEligibilityPolicy = schedulerSpec.taskEligibilityPolicy, + taskOrderPolicy = schedulerSpec.taskOrderPolicy, + ) /** * Run the specified list of [jobs] using the workflow service and suspend execution until all jobs have @@ -146,19 +91,8 @@ public class WorkflowServiceHelper( } } - /** - * Register a [MetricReader] for this helper. - * - * @param factory The factory for the reader to register. - */ - public fun registerMetricReader(factory: MetricReaderFactory) { - val reader = factory.apply(metricProducer) - _metricReaders.add(reader) - } - override fun close() { computeClient.close() service.close() - _meterProvider.close() } } |
