summaryrefslogtreecommitdiff
path: root/opendc-workflow
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-05-06 19:04:03 +0200
committerGitHub <noreply@github.com>2022-05-06 19:04:03 +0200
commitc3d8d967f82f39f1ef461d5687eb68fb867336c5 (patch)
tree2e9938f63c42e5d02fe203e049377d1d17b5d782 /opendc-workflow
parenta9657e4fa3b15e2c1c11884b5a250b0861bcc21d (diff)
parent260e2228afea08868e8f7f07233b1861b2d7f0c7 (diff)
merge: Move OpenTelemetry integration outside core modules (#81)
This change removes the OpenTelemetry integration from the OpenDC modules. Previously, we chose to integrate OpenTelemetry to provide a unified way to report metrics to the users. Although this worked as expected, the overhead of OpenTelemetry when collecting metrics during simulation was considerable, and it offered few further optimization opportunities (other than providing a separate API implementation). Furthermore, since we were tied to OpenTelemetry's SDK implementation, we experienced issues with throttling and registering multiple instruments. We will instead use another approach, where we expose the core metrics in OpenDC via specialized interfaces (see #80) such that access is fast and can be done without having to interface with OpenTelemetry. In addition, we will provide an adapter that is able to forward these metrics to OpenTelemetry implementations, so we can still integrate with the wider ecosystem. ## Implementation Notes :hammer_and_pick: * Remove OpenTelemetry from "compute" modules * Remove OpenTelemetry from "workflow" modules * Remove OpenTelemetry from "FaaS" modules * Remove OpenTelemetry from TF20 experiment * Remove dependency on OpenTelemetry SDK ## External Dependencies :four_leaf_clover: * N/A ## Breaking API Changes :warning: * Metrics are no longer directly exposed via OpenTelemetry. Instead, an adapter needs to be used to access the data via OpenTelemetry.
Diffstat (limited to 'opendc-workflow')
-rw-r--r--opendc-workflow/opendc-workflow-service/build.gradle.kts2
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt7
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt67
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/telemetry/SchedulerStats.kt (renamed from opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/telemetry/SchedulerStats.kt)2
-rw-r--r--opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt8
-rw-r--r--opendc-workflow/opendc-workflow-workload/build.gradle.kts2
-rw-r--r--opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt86
7 files changed, 21 insertions, 153 deletions
diff --git a/opendc-workflow/opendc-workflow-service/build.gradle.kts b/opendc-workflow/opendc-workflow-service/build.gradle.kts
index 60b5eb13..b6365885 100644
--- a/opendc-workflow/opendc-workflow-service/build.gradle.kts
+++ b/opendc-workflow/opendc-workflow-service/build.gradle.kts
@@ -30,7 +30,6 @@ plugins {
dependencies {
api(projects.opendcWorkflow.opendcWorkflowApi)
api(projects.opendcCompute.opendcComputeApi)
- api(projects.opendcTelemetry.opendcTelemetryApi)
implementation(projects.opendcCommon)
implementation(libs.kotlin.logging)
@@ -38,7 +37,6 @@ dependencies {
testImplementation(projects.opendcCompute.opendcComputeWorkload)
testImplementation(projects.opendcSimulator.opendcSimulatorCore)
testImplementation(projects.opendcTrace.opendcTraceApi)
- testImplementation(projects.opendcTelemetry.opendcTelemetrySdk)
testRuntimeOnly(projects.opendcTrace.opendcTraceGwf)
testRuntimeOnly(libs.log4j.slf4j)
}
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
index b8bc0e33..2436c387 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
@@ -22,7 +22,6 @@
package org.opendc.workflow.service
-import io.opentelemetry.api.metrics.MeterProvider
import org.opendc.compute.api.ComputeClient
import org.opendc.workflow.api.Job
import org.opendc.workflow.service.internal.WorkflowServiceImpl
@@ -30,7 +29,7 @@ import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy
import org.opendc.workflow.service.scheduler.job.JobOrderPolicy
import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy
import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy
-import org.opendc.workflow.service.telemetry.SchedulerStats
+import org.opendc.workflow.service.scheduler.telemetry.SchedulerStats
import java.time.Clock
import java.time.Duration
import kotlin.coroutines.CoroutineContext
@@ -63,7 +62,7 @@ public interface WorkflowService : AutoCloseable {
* @param context The [CoroutineContext] to use in the service.
* @param clock The clock instance to use.
* @param meterProvider The meter provider to use.
- * @param compute The compute client to use.
+ * @param compute The "Compute" client to use.
* @param schedulingQuantum The scheduling quantum to use (minimum duration between scheduling cycles).
* @param jobAdmissionPolicy The job admission policy to use.
* @param jobOrderPolicy The job order policy to use.
@@ -73,7 +72,6 @@ public interface WorkflowService : AutoCloseable {
public operator fun invoke(
context: CoroutineContext,
clock: Clock,
- meterProvider: MeterProvider,
compute: ComputeClient,
schedulingQuantum: Duration,
jobAdmissionPolicy: JobAdmissionPolicy,
@@ -84,7 +82,6 @@ public interface WorkflowService : AutoCloseable {
return WorkflowServiceImpl(
context,
clock,
- meterProvider,
compute,
schedulingQuantum,
jobAdmissionPolicy,
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
index 9c7f18a2..899810a2 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
@@ -22,8 +22,6 @@
package org.opendc.workflow.service.internal
-import io.opentelemetry.api.metrics.Meter
-import io.opentelemetry.api.metrics.MeterProvider
import kotlinx.coroutines.*
import org.opendc.common.util.Pacer
import org.opendc.compute.api.*
@@ -34,7 +32,7 @@ import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy
import org.opendc.workflow.service.scheduler.job.JobOrderPolicy
import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy
import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy
-import org.opendc.workflow.service.telemetry.SchedulerStats
+import org.opendc.workflow.service.scheduler.telemetry.SchedulerStats
import java.time.Clock
import java.time.Duration
import java.util.*
@@ -48,9 +46,8 @@ import kotlin.coroutines.resume
public class WorkflowServiceImpl(
context: CoroutineContext,
private val clock: Clock,
- meterProvider: MeterProvider,
private val computeClient: ComputeClient,
- private val schedulingQuantum: Duration,
+ schedulingQuantum: Duration,
jobAdmissionPolicy: JobAdmissionPolicy,
jobOrderPolicy: JobOrderPolicy,
taskEligibilityPolicy: TaskEligibilityPolicy,
@@ -62,11 +59,6 @@ public class WorkflowServiceImpl(
private val scope = CoroutineScope(context + Job())
/**
- * The [Meter] to collect metrics of this service.
- */
- private val meter = meterProvider.get("org.opendc.workflow.service")
-
- /**
* The incoming jobs ready to be processed by the scheduler.
*/
private val incomingJobs: MutableSet<JobState> = linkedSetOf()
@@ -139,58 +131,11 @@ public class WorkflowServiceImpl(
}
}
- /**
- * The number of jobs that have been submitted to the service.
- */
- private val submittedJobs = meter.counterBuilder("jobs.submitted")
- .setDescription("Number of submitted jobs")
- .setUnit("1")
- .build()
private var _workflowsSubmitted: Int = 0
-
- /**
- * The number of jobs that are running.
- */
- private val runningJobs = meter.upDownCounterBuilder("jobs.active")
- .setDescription("Number of jobs running")
- .setUnit("1")
- .build()
private var _workflowsRunning: Int = 0
-
- /**
- * The number of jobs that have finished running.
- */
- private val finishedJobs = meter.counterBuilder("jobs.finished")
- .setDescription("Number of jobs that finished running")
- .setUnit("1")
- .build()
private var _workflowsFinished: Int = 0
-
- /**
- * The number of tasks that have been submitted to the service.
- */
- private val submittedTasks = meter.counterBuilder("tasks.submitted")
- .setDescription("Number of submitted tasks")
- .setUnit("1")
- .build()
private var _tasksSubmitted: Int = 0
-
- /**
- * The number of jobs that are running.
- */
- private val runningTasks = meter.upDownCounterBuilder("tasks.active")
- .setDescription("Number of tasks running")
- .setUnit("1")
- .build()
private var _tasksRunning: Int = 0
-
- /**
- * The number of jobs that have finished running.
- */
- private val finishedTasks = meter.counterBuilder("tasks.finished")
- .setDescription("Number of tasks that finished running")
- .setUnit("1")
- .build()
private var _tasksFinished: Int = 0
/**
@@ -229,14 +174,12 @@ public class WorkflowServiceImpl(
instance.state = TaskStatus.READY
}
- submittedTasks.add(1)
_tasksSubmitted++
}
instances.values.toCollection(jobInstance.tasks)
incomingJobs += jobInstance
rootListener.jobSubmitted(jobInstance)
- submittedJobs.add(1)
_workflowsSubmitted++
pacer.enqueue()
@@ -283,7 +226,6 @@ public class WorkflowServiceImpl(
jobQueue.add(jobInstance)
activeJobs += jobInstance
- runningJobs.add(1)
_workflowsRunning++
rootListener.jobStarted(jobInstance)
}
@@ -363,7 +305,6 @@ public class WorkflowServiceImpl(
ServerState.RUNNING -> {
val task = taskByServer.getValue(server)
task.startedAt = clock.millis()
- runningTasks.add(1)
_tasksRunning++
rootListener.taskStarted(task)
}
@@ -381,8 +322,6 @@ public class WorkflowServiceImpl(
job.tasks.remove(task)
activeTasks -= task
- runningTasks.add(-1)
- finishedTasks.add(1)
_tasksRunning--
_tasksFinished++
rootListener.taskFinished(task)
@@ -410,8 +349,6 @@ public class WorkflowServiceImpl(
private fun finishJob(job: JobState) {
activeJobs -= job
- runningJobs.add(-1)
- finishedJobs.add(1)
_workflowsRunning--
_workflowsFinished++
rootListener.jobFinished(job)
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/telemetry/SchedulerStats.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/telemetry/SchedulerStats.kt
index 7c7d7c4d..608e82df 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/telemetry/SchedulerStats.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/telemetry/SchedulerStats.kt
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.workflow.service.telemetry
+package org.opendc.workflow.service.scheduler.telemetry
/**
* Statistics about the workflow scheduler.
diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
index d5f06587..73d1b23b 100644
--- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
+++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
@@ -32,7 +32,6 @@ import org.opendc.compute.service.scheduler.filters.RamFilter
import org.opendc.compute.service.scheduler.filters.VCpuFilter
import org.opendc.compute.service.scheduler.weights.VCpuWeigher
import org.opendc.compute.workload.ComputeServiceHelper
-import org.opendc.compute.workload.telemetry.NoopTelemetryManager
import org.opendc.compute.workload.topology.HostSpec
import org.opendc.simulator.compute.kernel.SimSpaceSharedHypervisorProvider
import org.opendc.simulator.compute.model.MachineModel
@@ -70,7 +69,12 @@ internal class WorkflowServiceTest {
weighers = listOf(VCpuWeigher(1.0, multiplier = 1.0))
)
- val computeHelper = ComputeServiceHelper(coroutineContext, clock, NoopTelemetryManager(), computeScheduler, schedulingQuantum = Duration.ofSeconds(1))
+ val computeHelper = ComputeServiceHelper(
+ coroutineContext,
+ clock,
+ computeScheduler,
+ schedulingQuantum = Duration.ofSeconds(1)
+ )
val hostCount = 4
repeat(hostCount) { computeHelper.registerHost(createHostSpec(it)) }
diff --git a/opendc-workflow/opendc-workflow-workload/build.gradle.kts b/opendc-workflow/opendc-workflow-workload/build.gradle.kts
index b725a69c..17eadf29 100644
--- a/opendc-workflow/opendc-workflow-workload/build.gradle.kts
+++ b/opendc-workflow/opendc-workflow-workload/build.gradle.kts
@@ -32,6 +32,4 @@ dependencies {
implementation(projects.opendcSimulator.opendcSimulatorCompute)
implementation(projects.opendcTrace.opendcTraceApi)
- implementation(projects.opendcTelemetry.opendcTelemetrySdk)
- implementation(libs.opentelemetry.semconv)
}
diff --git a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt
index a7d0ed6c..435d0190 100644
--- a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt
+++ b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt
@@ -22,24 +22,13 @@
package org.opendc.workflow.workload
-import io.opentelemetry.api.metrics.MeterProvider
-import io.opentelemetry.sdk.common.CompletableResultCode
-import io.opentelemetry.sdk.metrics.SdkMeterProvider
-import io.opentelemetry.sdk.metrics.data.AggregationTemporality
-import io.opentelemetry.sdk.metrics.export.MetricProducer
-import io.opentelemetry.sdk.metrics.export.MetricReader
-import io.opentelemetry.sdk.metrics.export.MetricReaderFactory
-import io.opentelemetry.sdk.resources.Resource
-import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import org.opendc.compute.api.ComputeClient
-import org.opendc.telemetry.sdk.toOtelClock
import org.opendc.workflow.api.Job
import org.opendc.workflow.service.WorkflowService
import java.time.Clock
-import java.util.*
import kotlin.coroutines.CoroutineContext
/**
@@ -59,60 +48,16 @@ public class WorkflowServiceHelper(
/**
* The [WorkflowService] that is constructed by this runner.
*/
- public val service: WorkflowService
-
- /**
- * The [MetricProducer] exposed by the [WorkflowService].
- */
- public lateinit var metricProducer: MetricProducer
- private set
-
- /**
- * The [MeterProvider] used for the service.
- */
- private val _meterProvider: SdkMeterProvider
-
- /**
- * The list of [MetricReader]s that have been registered with the runner.
- */
- private val _metricReaders = mutableListOf<MetricReader>()
-
- init {
- val resource = Resource.builder()
- .put(ResourceAttributes.SERVICE_NAME, "opendc-workflow")
- .build()
-
- _meterProvider = SdkMeterProvider.builder()
- .setClock(clock.toOtelClock())
- .setResource(resource)
- .registerMetricReader { producer ->
- metricProducer = producer
-
- val metricReaders = _metricReaders
- object : MetricReader {
- override fun getPreferredTemporality(): AggregationTemporality = AggregationTemporality.CUMULATIVE
- override fun flush(): CompletableResultCode {
- return CompletableResultCode.ofAll(metricReaders.map { it.flush() })
- }
- override fun shutdown(): CompletableResultCode {
- return CompletableResultCode.ofAll(metricReaders.map { it.shutdown() })
- }
- }
- }
- .build()
-
- service = WorkflowService(
- context,
- clock,
- _meterProvider,
- computeClient,
- schedulerSpec.schedulingQuantum,
- jobAdmissionPolicy = schedulerSpec.jobAdmissionPolicy,
- jobOrderPolicy = schedulerSpec.jobOrderPolicy,
- taskEligibilityPolicy = schedulerSpec.taskEligibilityPolicy,
- taskOrderPolicy = schedulerSpec.taskOrderPolicy,
- )
- }
+ public val service: WorkflowService = WorkflowService(
+ context,
+ clock,
+ computeClient,
+ schedulerSpec.schedulingQuantum,
+ jobAdmissionPolicy = schedulerSpec.jobAdmissionPolicy,
+ jobOrderPolicy = schedulerSpec.jobOrderPolicy,
+ taskEligibilityPolicy = schedulerSpec.taskEligibilityPolicy,
+ taskOrderPolicy = schedulerSpec.taskOrderPolicy,
+ )
/**
* Run the specified list of [jobs] using the workflow service and suspend execution until all jobs have
@@ -146,19 +91,8 @@ public class WorkflowServiceHelper(
}
}
- /**
- * Register a [MetricReader] for this helper.
- *
- * @param factory The factory for the reader to register.
- */
- public fun registerMetricReader(factory: MetricReaderFactory) {
- val reader = factory.apply(metricProducer)
- _metricReaders.add(reader)
- }
-
override fun close() {
computeClient.close()
service.close()
- _meterProvider.close()
}
}