summaryrefslogtreecommitdiff
path: root/opendc-workflow/opendc-workflow-workload
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-05-05 11:33:47 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2022-05-06 18:37:36 +0200
commitb82ae73d064590094f79e26de355060135ed13fd (patch)
tree9140865f68e8f5864b360bb405ce1e0d0e98a5a4 /opendc-workflow/opendc-workflow-workload
parentc7eec7904e08029b3ab31d3e7b21afa1ea9ab7e6 (diff)
refactor(workflow/service): Remove OpenTelemetry from "workflow" modules
This change removes the OpenTelemetry integration from the OpenDC Workflow modules. Previously, we chose to integrate OpenTelemetry to provide a unified way to report metrics to the users. See the previous commit removing it from the "Compute" modules for the reasoning behind this change.
Diffstat (limited to 'opendc-workflow/opendc-workflow-workload')
-rw-r--r--opendc-workflow/opendc-workflow-workload/build.gradle.kts2
-rw-r--r--opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt86
2 files changed, 10 insertions, 78 deletions
diff --git a/opendc-workflow/opendc-workflow-workload/build.gradle.kts b/opendc-workflow/opendc-workflow-workload/build.gradle.kts
index b725a69c..17eadf29 100644
--- a/opendc-workflow/opendc-workflow-workload/build.gradle.kts
+++ b/opendc-workflow/opendc-workflow-workload/build.gradle.kts
@@ -32,6 +32,4 @@ dependencies {
implementation(projects.opendcSimulator.opendcSimulatorCompute)
implementation(projects.opendcTrace.opendcTraceApi)
- implementation(projects.opendcTelemetry.opendcTelemetrySdk)
- implementation(libs.opentelemetry.semconv)
}
diff --git a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt
index a7d0ed6c..435d0190 100644
--- a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt
+++ b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt
@@ -22,24 +22,13 @@
package org.opendc.workflow.workload
-import io.opentelemetry.api.metrics.MeterProvider
-import io.opentelemetry.sdk.common.CompletableResultCode
-import io.opentelemetry.sdk.metrics.SdkMeterProvider
-import io.opentelemetry.sdk.metrics.data.AggregationTemporality
-import io.opentelemetry.sdk.metrics.export.MetricProducer
-import io.opentelemetry.sdk.metrics.export.MetricReader
-import io.opentelemetry.sdk.metrics.export.MetricReaderFactory
-import io.opentelemetry.sdk.resources.Resource
-import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import org.opendc.compute.api.ComputeClient
-import org.opendc.telemetry.sdk.toOtelClock
import org.opendc.workflow.api.Job
import org.opendc.workflow.service.WorkflowService
import java.time.Clock
-import java.util.*
import kotlin.coroutines.CoroutineContext
/**
@@ -59,60 +48,16 @@ public class WorkflowServiceHelper(
/**
* The [WorkflowService] that is constructed by this runner.
*/
- public val service: WorkflowService
-
- /**
- * The [MetricProducer] exposed by the [WorkflowService].
- */
- public lateinit var metricProducer: MetricProducer
- private set
-
- /**
- * The [MeterProvider] used for the service.
- */
- private val _meterProvider: SdkMeterProvider
-
- /**
- * The list of [MetricReader]s that have been registered with the runner.
- */
- private val _metricReaders = mutableListOf<MetricReader>()
-
- init {
- val resource = Resource.builder()
- .put(ResourceAttributes.SERVICE_NAME, "opendc-workflow")
- .build()
-
- _meterProvider = SdkMeterProvider.builder()
- .setClock(clock.toOtelClock())
- .setResource(resource)
- .registerMetricReader { producer ->
- metricProducer = producer
-
- val metricReaders = _metricReaders
- object : MetricReader {
- override fun getPreferredTemporality(): AggregationTemporality = AggregationTemporality.CUMULATIVE
- override fun flush(): CompletableResultCode {
- return CompletableResultCode.ofAll(metricReaders.map { it.flush() })
- }
- override fun shutdown(): CompletableResultCode {
- return CompletableResultCode.ofAll(metricReaders.map { it.shutdown() })
- }
- }
- }
- .build()
-
- service = WorkflowService(
- context,
- clock,
- _meterProvider,
- computeClient,
- schedulerSpec.schedulingQuantum,
- jobAdmissionPolicy = schedulerSpec.jobAdmissionPolicy,
- jobOrderPolicy = schedulerSpec.jobOrderPolicy,
- taskEligibilityPolicy = schedulerSpec.taskEligibilityPolicy,
- taskOrderPolicy = schedulerSpec.taskOrderPolicy,
- )
- }
+ public val service: WorkflowService = WorkflowService(
+ context,
+ clock,
+ computeClient,
+ schedulerSpec.schedulingQuantum,
+ jobAdmissionPolicy = schedulerSpec.jobAdmissionPolicy,
+ jobOrderPolicy = schedulerSpec.jobOrderPolicy,
+ taskEligibilityPolicy = schedulerSpec.taskEligibilityPolicy,
+ taskOrderPolicy = schedulerSpec.taskOrderPolicy,
+ )
/**
* Run the specified list of [jobs] using the workflow service and suspend execution until all jobs have
@@ -146,19 +91,8 @@ public class WorkflowServiceHelper(
}
}
- /**
- * Register a [MetricReader] for this helper.
- *
- * @param factory The factory for the reader to register.
- */
- public fun registerMetricReader(factory: MetricReaderFactory) {
- val reader = factory.apply(metricProducer)
- _metricReaders.add(reader)
- }
-
override fun close() {
computeClient.close()
service.close()
- _meterProvider.close()
}
}