summaryrefslogtreecommitdiff
path: root/opendc-workflow/opendc-workflow-workload
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-05-06 19:04:03 +0200
committerGitHub <noreply@github.com>2022-05-06 19:04:03 +0200
commitc3d8d967f82f39f1ef461d5687eb68fb867336c5 (patch)
tree2e9938f63c42e5d02fe203e049377d1d17b5d782 /opendc-workflow/opendc-workflow-workload
parenta9657e4fa3b15e2c1c11884b5a250b0861bcc21d (diff)
parent260e2228afea08868e8f7f07233b1861b2d7f0c7 (diff)
merge: Move OpenTelemetry integration outside core modules (#81)
This change removes the OpenTelemetry integration from the OpenDC modules. Previously, we chose to integrate OpenTelemetry to provide a unified way to report metrics to the users. Although this worked as expected, the overhead of the OpenTelemetry when collecting metrics during simulation was considerable and lacked more optimization opportunities (other than providing a separate API implementation). Furthermore, since we were tied to OpenTelemetry's SDK implementation, we experienced issues with throttling and registering multiple instruments. We will instead use another approach, where we expose the core metrics in OpenDC via specialized interfaces (see #80) such that access is fast and can be done without having to interface with OpenTelemetry. In addition, we will provide an adapter to that is able to forward these metrics to OpenTelemetry implementations, so we can still integrate with the wider ecosystem. ## Implementation Notes :hammer_and_pick: * Remove OpenTelemetry from "compute" modules * Remove OpenTelemetry from "workflow" modules * Remove OpenTelemetry from "FaaS" modules * Remove OpenTelemetry from TF20 experiment * Remove dependency on OpenTelemetry SDK ## External Dependencies :four_leaf_clover: * N/A ## Breaking API Changes :warning: * Metrics are not anymore directly exposed via OpenTelemetry. Instead, an adapter needs to be used to access the data via OpenTelemetry.
Diffstat (limited to 'opendc-workflow/opendc-workflow-workload')
-rw-r--r--opendc-workflow/opendc-workflow-workload/build.gradle.kts2
-rw-r--r--opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt86
2 files changed, 10 insertions, 78 deletions
diff --git a/opendc-workflow/opendc-workflow-workload/build.gradle.kts b/opendc-workflow/opendc-workflow-workload/build.gradle.kts
index b725a69c..17eadf29 100644
--- a/opendc-workflow/opendc-workflow-workload/build.gradle.kts
+++ b/opendc-workflow/opendc-workflow-workload/build.gradle.kts
@@ -32,6 +32,4 @@ dependencies {
implementation(projects.opendcSimulator.opendcSimulatorCompute)
implementation(projects.opendcTrace.opendcTraceApi)
- implementation(projects.opendcTelemetry.opendcTelemetrySdk)
- implementation(libs.opentelemetry.semconv)
}
diff --git a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt
index a7d0ed6c..435d0190 100644
--- a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt
+++ b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt
@@ -22,24 +22,13 @@
package org.opendc.workflow.workload
-import io.opentelemetry.api.metrics.MeterProvider
-import io.opentelemetry.sdk.common.CompletableResultCode
-import io.opentelemetry.sdk.metrics.SdkMeterProvider
-import io.opentelemetry.sdk.metrics.data.AggregationTemporality
-import io.opentelemetry.sdk.metrics.export.MetricProducer
-import io.opentelemetry.sdk.metrics.export.MetricReader
-import io.opentelemetry.sdk.metrics.export.MetricReaderFactory
-import io.opentelemetry.sdk.resources.Resource
-import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import org.opendc.compute.api.ComputeClient
-import org.opendc.telemetry.sdk.toOtelClock
import org.opendc.workflow.api.Job
import org.opendc.workflow.service.WorkflowService
import java.time.Clock
-import java.util.*
import kotlin.coroutines.CoroutineContext
/**
@@ -59,60 +48,16 @@ public class WorkflowServiceHelper(
/**
* The [WorkflowService] that is constructed by this runner.
*/
- public val service: WorkflowService
-
- /**
- * The [MetricProducer] exposed by the [WorkflowService].
- */
- public lateinit var metricProducer: MetricProducer
- private set
-
- /**
- * The [MeterProvider] used for the service.
- */
- private val _meterProvider: SdkMeterProvider
-
- /**
- * The list of [MetricReader]s that have been registered with the runner.
- */
- private val _metricReaders = mutableListOf<MetricReader>()
-
- init {
- val resource = Resource.builder()
- .put(ResourceAttributes.SERVICE_NAME, "opendc-workflow")
- .build()
-
- _meterProvider = SdkMeterProvider.builder()
- .setClock(clock.toOtelClock())
- .setResource(resource)
- .registerMetricReader { producer ->
- metricProducer = producer
-
- val metricReaders = _metricReaders
- object : MetricReader {
- override fun getPreferredTemporality(): AggregationTemporality = AggregationTemporality.CUMULATIVE
- override fun flush(): CompletableResultCode {
- return CompletableResultCode.ofAll(metricReaders.map { it.flush() })
- }
- override fun shutdown(): CompletableResultCode {
- return CompletableResultCode.ofAll(metricReaders.map { it.shutdown() })
- }
- }
- }
- .build()
-
- service = WorkflowService(
- context,
- clock,
- _meterProvider,
- computeClient,
- schedulerSpec.schedulingQuantum,
- jobAdmissionPolicy = schedulerSpec.jobAdmissionPolicy,
- jobOrderPolicy = schedulerSpec.jobOrderPolicy,
- taskEligibilityPolicy = schedulerSpec.taskEligibilityPolicy,
- taskOrderPolicy = schedulerSpec.taskOrderPolicy,
- )
- }
+ public val service: WorkflowService = WorkflowService(
+ context,
+ clock,
+ computeClient,
+ schedulerSpec.schedulingQuantum,
+ jobAdmissionPolicy = schedulerSpec.jobAdmissionPolicy,
+ jobOrderPolicy = schedulerSpec.jobOrderPolicy,
+ taskEligibilityPolicy = schedulerSpec.taskEligibilityPolicy,
+ taskOrderPolicy = schedulerSpec.taskOrderPolicy,
+ )
/**
* Run the specified list of [jobs] using the workflow service and suspend execution until all jobs have
@@ -146,19 +91,8 @@ public class WorkflowServiceHelper(
}
}
- /**
- * Register a [MetricReader] for this helper.
- *
- * @param factory The factory for the reader to register.
- */
- public fun registerMetricReader(factory: MetricReaderFactory) {
- val reader = factory.apply(metricProducer)
- _metricReaders.add(reader)
- }
-
override fun close() {
computeClient.close()
service.close()
- _meterProvider.close()
}
}