diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-03-26 15:30:22 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-03-26 15:41:06 +0100 |
| commit | b8ba3cf81da6367285c5d5a23a70f8c340a45fdd (patch) | |
| tree | e88125272bb192614894352240045520dc439c57 /simulator/opendc-workflow/opendc-workflow-service/src/main | |
| parent | 608ff59b2d7e8ce696fe6f7271d80b5efc9c4b87 (diff) | |
compute: Integrate OpenTelemetry Metrics in OpenDC Workflow
This change integrates the OpenTelemetry Metrics API in the OpenDC
Workflow Service implementation. This replaces the old infrastructure for
gathering metrics.
Diffstat (limited to 'simulator/opendc-workflow/opendc-workflow-service/src/main')
2 files changed, 95 insertions, 26 deletions
diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt index 2f83e376..94302790 100644 --- a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt @@ -22,6 +22,7 @@ package org.opendc.workflow.service +import io.opentelemetry.api.metrics.Meter import kotlinx.coroutines.flow.Flow import org.opendc.compute.api.ComputeClient import org.opendc.trace.core.EventTracer @@ -42,14 +43,14 @@ import kotlin.coroutines.CoroutineContext */ public interface WorkflowService : AutoCloseable { /** - * The events emitted by the workflow scheduler. + * Submit the specified [Job] to the workflow service for scheduling. */ - public val events: Flow<WorkflowEvent> + public suspend fun submit(job: Job) /** - * Submit the specified [Job] to the workflow service for scheduling. + * Run the specified [Job] and suspend execution until the job is finished. */ - public suspend fun submit(job: Job) + public suspend fun run(job: Job) /** * Terminate the lifecycle of the workflow service, stopping all running workflows. @@ -63,6 +64,7 @@ public interface WorkflowService : AutoCloseable { * @param context The [CoroutineContext] to use in the service. * @param clock The clock instance to use. * @param tracer The event tracer to use. + * @param meter The meter to use. * @param compute The compute client to use. * @param mode The scheduling mode to use. * @param jobAdmissionPolicy The job admission policy to use. @@ -74,6 +76,7 @@ public interface WorkflowService : AutoCloseable { context: CoroutineContext, clock: Clock, tracer: EventTracer, + meter: Meter, compute: ComputeClient, mode: WorkflowSchedulerMode, jobAdmissionPolicy: JobAdmissionPolicy, @@ -85,6 +88,7 @@ public interface WorkflowService : AutoCloseable { context, clock, tracer, + meter, compute, mode, jobAdmissionPolicy, diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt index 85a88acd..1aef6f8e 100644 --- a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt @@ -22,12 +22,10 @@ package org.opendc.workflow.service.internal -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.cancel +import io.opentelemetry.api.metrics.Meter +import kotlinx.coroutines.* import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.map -import kotlinx.coroutines.launch import mu.KotlinLogging import org.opendc.compute.api.* import org.opendc.trace.core.EventTracer @@ -43,7 +41,9 @@ import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy import java.time.Clock import java.util.* +import kotlin.coroutines.Continuation import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.resume /** * A [WorkflowService] that distributes work through a multi-stage process based on the Reference Architecture for @@ -53,6 +53,7 @@ public class WorkflowServiceImpl( context: CoroutineContext, internal val clock: Clock, internal val tracer: EventTracer, + private val meter: Meter, private val computeClient: ComputeClient, mode: WorkflowSchedulerMode, jobAdmissionPolicy: JobAdmissionPolicy, @@ -63,7 +64,7 @@ public class WorkflowServiceImpl( /** * The [CoroutineScope] of the service bounded by the lifecycle of the service. */ - internal val scope = CoroutineScope(context) + internal val scope = CoroutineScope(context + Job()) /** * The logger instance to use. @@ -106,6 +107,11 @@ public class WorkflowServiceImpl( internal val taskByServer = mutableMapOf<Server, TaskState>() /** + * The continuation of the jobs. + */ + private val conts = mutableMapOf<Job, Continuation<Unit>>() + + /** * The root listener of this scheduler. */ private val rootListener = object : WorkflowSchedulerListener { @@ -151,6 +157,54 @@ public class WorkflowServiceImpl( } } + /** + * The number of jobs that have been submitted to the service. + */ + private val submittedJobs = meter.longCounterBuilder("jobs.submitted") + .setDescription("Number of submitted jobs") + .setUnit("1") + .build() + + /** + * The number of jobs that are running. + */ + private val runningJobs = meter.longUpDownCounterBuilder("jobs.active") + .setDescription("Number of jobs running") + .setUnit("1") + .build() + + /** + * The number of jobs that have finished running. + */ + private val finishedJobs = meter.longCounterBuilder("jobs.finished") + .setDescription("Number of jobs that finished running") + .setUnit("1") + .build() + + /** + * The number of tasks that have been submitted to the service. + */ + private val submittedTasks = meter.longCounterBuilder("tasks.submitted") + .setDescription("Number of submitted tasks") + .setUnit("1") + .build() + + /** + * The number of jobs that are running. + */ + private val runningTasks = meter.longUpDownCounterBuilder("tasks.active") + .setDescription("Number of tasks running") + .setUnit("1") + .build() + + /** + * The number of jobs that have finished running. + */ + private val finishedTasks = meter.longCounterBuilder("tasks.finished") + .setDescription("Number of tasks that finished running") + .setUnit("1") + .build() + private val mode: WorkflowSchedulerMode.Logic private val jobAdmissionPolicy: JobAdmissionPolicy.Logic private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic @@ -167,16 +221,7 @@ public class WorkflowServiceImpl( } } - override val events: Flow<WorkflowEvent> = tracer.openRecording().let { - it.enable<WorkflowEvent.JobSubmitted>() - it.enable<WorkflowEvent.JobStarted>() - it.enable<WorkflowEvent.JobFinished>() - it.enable<WorkflowEvent.TaskStarted>() - it.enable<WorkflowEvent.TaskFinished>() - it.consumeAsFlow().map { event -> event as WorkflowEvent } - } - - override suspend fun submit(job: Job) { + override suspend fun run(job: Job) { // J1 Incoming Jobs val jobInstance = JobState(job, clock.millis()) val instances = job.tasks.associateWith { @@ -193,14 +238,25 @@ public class WorkflowServiceImpl( if (instance.isRoot) { instance.state = TaskStatus.READY } + + submittedTasks.add(1) } - instances.values.toCollection(jobInstance.tasks) - incomingJobs += jobInstance - rootListener.jobSubmitted(jobInstance) - tracer.commit(WorkflowEvent.JobSubmitted(this, jobInstance.job)) + return suspendCancellableCoroutine { cont -> + instances.values.toCollection(jobInstance.tasks) + incomingJobs += jobInstance + rootListener.jobSubmitted(jobInstance) + conts[job] = cont + + submittedJobs.add(1) + tracer.commit(WorkflowEvent.JobSubmitted(this, jobInstance.job)) - requestCycle() + requestCycle() + } + } + + override suspend fun submit(job: Job) { + scope.launch { run(job) } } override fun close() { @@ -231,6 +287,8 @@ public class WorkflowServiceImpl( iterator.remove() jobQueue.add(jobInstance) activeJobs += jobInstance + + runningJobs.add(1) tracer.commit( WorkflowEvent.JobStarted( this, @@ -311,11 +369,11 @@ public class WorkflowServiceImpl( public override fun onStateChanged(server: Server, newState: ServerState) { when (newState) { - ServerState.PROVISIONING -> { - } + ServerState.PROVISIONING -> {} ServerState.RUNNING -> { val task = taskByServer.getValue(server) task.startedAt = clock.millis() + runningTasks.add(1) tracer.commit( WorkflowEvent.TaskStarted( this@WorkflowServiceImpl, @@ -338,6 +396,9 @@ public class WorkflowServiceImpl( task.finishedAt = clock.millis() job.tasks.remove(task) activeTasks -= task + + runningTasks.add(-1) + finishedTasks.add(1) tracer.commit( WorkflowEvent.TaskFinished( this@WorkflowServiceImpl, @@ -371,8 +432,12 @@ public class WorkflowServiceImpl( private fun finishJob(job: JobState) { activeJobs -= job + runningJobs.add(-1) + finishedJobs.add(1) tracer.commit(WorkflowEvent.JobFinished(this, job.job)) rootListener.jobFinished(job) + + conts.remove(job.job)?.resume(Unit) } public fun addListener(listener: WorkflowSchedulerListener) { |
