From b8ba3cf81da6367285c5d5a23a70f8c340a45fdd Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 26 Mar 2021 15:30:22 +0100 Subject: compute: Integrate OpenTelemetry Metrics in OpenDC Workflow This change integrates the OpenTelemetry Metrics API in the OpenDC Workflow Service implementation. This replaces the old infrastructure for gathering metrics. --- .../org/opendc/workflow/service/WorkflowService.kt | 12 ++- .../service/internal/WorkflowServiceImpl.kt | 109 ++++++++++++++++----- 2 files changed, 95 insertions(+), 26 deletions(-) (limited to 'simulator/opendc-workflow/opendc-workflow-service/src/main') diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt index 2f83e376..94302790 100644 --- a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt @@ -22,6 +22,7 @@ package org.opendc.workflow.service +import io.opentelemetry.api.metrics.Meter import kotlinx.coroutines.flow.Flow import org.opendc.compute.api.ComputeClient import org.opendc.trace.core.EventTracer @@ -42,14 +43,14 @@ import kotlin.coroutines.CoroutineContext */ public interface WorkflowService : AutoCloseable { /** - * The events emitted by the workflow scheduler. + * Submit the specified [Job] to the workflow service for scheduling. */ - public val events: Flow + public suspend fun submit(job: Job) /** - * Submit the specified [Job] to the workflow service for scheduling. + * Run the specified [Job] and suspend execution until the job is finished. */ - public suspend fun submit(job: Job) + public suspend fun run(job: Job) /** * Terminate the lifecycle of the workflow service, stopping all running workflows. @@ -63,6 +64,7 @@ public interface WorkflowService : AutoCloseable { * @param context The [CoroutineContext] to use in the service. * @param clock The clock instance to use. * @param tracer The event tracer to use. + * @param meter The meter to use. * @param compute The compute client to use. * @param mode The scheduling mode to use. * @param jobAdmissionPolicy The job admission policy to use. @@ -74,6 +76,7 @@ public interface WorkflowService : AutoCloseable { context: CoroutineContext, clock: Clock, tracer: EventTracer, + meter: Meter, compute: ComputeClient, mode: WorkflowSchedulerMode, jobAdmissionPolicy: JobAdmissionPolicy, @@ -85,6 +88,7 @@ public interface WorkflowService : AutoCloseable { context, clock, tracer, + meter, compute, mode, jobAdmissionPolicy, diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt index 85a88acd..1aef6f8e 100644 --- a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt @@ -22,12 +22,10 @@ package org.opendc.workflow.service.internal -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.cancel +import io.opentelemetry.api.metrics.Meter +import kotlinx.coroutines.* import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.map -import kotlinx.coroutines.launch import mu.KotlinLogging import org.opendc.compute.api.* import org.opendc.trace.core.EventTracer @@ -43,7 +41,9 @@ import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy import java.time.Clock import java.util.* +import kotlin.coroutines.Continuation import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.resume /** * A [WorkflowService] that distributes work through a multi-stage process based on the Reference Architecture for @@ -53,6 +53,7 @@ public class WorkflowServiceImpl( context: CoroutineContext, internal val clock: Clock, internal val tracer: EventTracer, + private val meter: Meter, private val computeClient: ComputeClient, mode: WorkflowSchedulerMode, jobAdmissionPolicy: JobAdmissionPolicy, @@ -63,7 +64,7 @@ public class WorkflowServiceImpl( /** * The [CoroutineScope] of the service bounded by the lifecycle of the service. */ - internal val scope = CoroutineScope(context) + internal val scope = CoroutineScope(context + Job()) /** * The logger instance to use. @@ -105,6 +106,11 @@ public class WorkflowServiceImpl( */ internal val taskByServer = mutableMapOf() + /** + * The continuation of the jobs. + */ + private val conts = mutableMapOf>() + /** * The root listener of this scheduler. */ @@ -151,6 +157,54 @@ public class WorkflowServiceImpl( } } + /** + * The number of jobs that have been submitted to the service. + */ + private val submittedJobs = meter.longCounterBuilder("jobs.submitted") + .setDescription("Number of submitted jobs") + .setUnit("1") + .build() + + /** + * The number of jobs that are running. + */ + private val runningJobs = meter.longUpDownCounterBuilder("jobs.active") + .setDescription("Number of jobs running") + .setUnit("1") + .build() + + /** + * The number of jobs that have finished running. + */ + private val finishedJobs = meter.longCounterBuilder("jobs.finished") + .setDescription("Number of jobs that finished running") + .setUnit("1") + .build() + + /** + * The number of tasks that have been submitted to the service. + */ + private val submittedTasks = meter.longCounterBuilder("tasks.submitted") + .setDescription("Number of submitted tasks") + .setUnit("1") + .build() + + /** + * The number of jobs that are running. + */ + private val runningTasks = meter.longUpDownCounterBuilder("tasks.active") + .setDescription("Number of tasks running") + .setUnit("1") + .build() + + /** + * The number of jobs that have finished running. + */ + private val finishedTasks = meter.longCounterBuilder("tasks.finished") + .setDescription("Number of tasks that finished running") + .setUnit("1") + .build() + private val mode: WorkflowSchedulerMode.Logic private val jobAdmissionPolicy: JobAdmissionPolicy.Logic private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic @@ -167,16 +221,7 @@ public class WorkflowServiceImpl( } } - override val events: Flow = tracer.openRecording().let { - it.enable() - it.enable() - it.enable() - it.enable() - it.enable() - it.consumeAsFlow().map { event -> event as WorkflowEvent } - } - - override suspend fun submit(job: Job) { + override suspend fun run(job: Job) { // J1 Incoming Jobs val jobInstance = JobState(job, clock.millis()) val instances = job.tasks.associateWith { @@ -193,14 +238,25 @@ public class WorkflowServiceImpl( if (instance.isRoot) { instance.state = TaskStatus.READY } + + submittedTasks.add(1) } - instances.values.toCollection(jobInstance.tasks) - incomingJobs += jobInstance - rootListener.jobSubmitted(jobInstance) - tracer.commit(WorkflowEvent.JobSubmitted(this, jobInstance.job)) + return suspendCancellableCoroutine { cont -> + instances.values.toCollection(jobInstance.tasks) + incomingJobs += jobInstance + rootListener.jobSubmitted(jobInstance) + conts[job] = cont + + submittedJobs.add(1) + tracer.commit(WorkflowEvent.JobSubmitted(this, jobInstance.job)) - requestCycle() + requestCycle() + } + } + + override suspend fun submit(job: Job) { + scope.launch { run(job) } } override fun close() { @@ -231,6 +287,8 @@ public class WorkflowServiceImpl( iterator.remove() jobQueue.add(jobInstance) activeJobs += jobInstance + + runningJobs.add(1) tracer.commit( WorkflowEvent.JobStarted( this, @@ -311,11 +369,11 @@ public class WorkflowServiceImpl( public override fun onStateChanged(server: Server, newState: ServerState) { when (newState) { - ServerState.PROVISIONING -> { - } + ServerState.PROVISIONING -> {} ServerState.RUNNING -> { val task = taskByServer.getValue(server) task.startedAt = clock.millis() + runningTasks.add(1) tracer.commit( WorkflowEvent.TaskStarted( this@WorkflowServiceImpl, @@ -338,6 +396,9 @@ public class WorkflowServiceImpl( task.finishedAt = clock.millis() job.tasks.remove(task) activeTasks -= task + + runningTasks.add(-1) + finishedTasks.add(1) tracer.commit( WorkflowEvent.TaskFinished( this@WorkflowServiceImpl, @@ -371,8 +432,12 @@ public class WorkflowServiceImpl( private fun finishJob(job: JobState) { activeJobs -= job + runningJobs.add(-1) + finishedJobs.add(1) tracer.commit(WorkflowEvent.JobFinished(this, job.job)) rootListener.jobFinished(job) + + conts.remove(job.job)?.resume(Unit) } public fun addListener(listener: WorkflowSchedulerListener) { -- cgit v1.2.3 From be95e0b10b648e256c4eb8c2190ad6ed61f8afdd Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 26 Mar 2021 15:36:44 +0100 Subject: workflow: Remove event tracer from workflow service This change removes the event tracer from the OpenDC Workflow service as we start migrating to the industry standard OpenTelemetry. --- .../org/opendc/workflow/service/WorkflowEvent.kt | 79 ---------------------- .../org/opendc/workflow/service/WorkflowService.kt | 4 -- .../service/internal/WorkflowServiceImpl.kt | 27 -------- 3 files changed, 110 deletions(-) delete mode 100644 simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowEvent.kt (limited to 'simulator/opendc-workflow/opendc-workflow-service/src/main') diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowEvent.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowEvent.kt deleted file mode 100644 index bb2ad6c6..00000000 --- a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowEvent.kt +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.workflow.service - -import org.opendc.trace.core.Event -import org.opendc.workflow.api.Job -import org.opendc.workflow.api.Task - -/** - * An event emitted by the [WorkflowService]. - */ -public sealed class WorkflowEvent : Event() { - /** - * The [WorkflowService] that emitted the event. - */ - public abstract val service: WorkflowService - - /** - * This event is emitted when a job was submitted to the scheduler. - */ - public data class JobSubmitted( - override val service: WorkflowService, - public val job: Job - ) : WorkflowEvent() - - /** - * This event is emitted when a job has become active. - */ - public data class JobStarted( - override val service: WorkflowService, - public val job: Job - ) : WorkflowEvent() - - /** - * This event is emitted when a job has finished processing. - */ - public data class JobFinished( - override val service: WorkflowService, - public val job: Job - ) : WorkflowEvent() - - /** - * This event is emitted when a task of a job has started processing. - */ - public data class TaskStarted( - override val service: WorkflowService, - public val job: Job, - public val task: Task - ) : WorkflowEvent() - - /** - * This event is emitted when a task of a job has started processing. - */ - public data class TaskFinished( - override val service: WorkflowService, - public val job: Job, - public val task: Task - ) : WorkflowEvent() -} diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt index 94302790..d3358ef1 100644 --- a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt @@ -23,9 +23,7 @@ package org.opendc.workflow.service import io.opentelemetry.api.metrics.Meter -import kotlinx.coroutines.flow.Flow import org.opendc.compute.api.ComputeClient -import org.opendc.trace.core.EventTracer import org.opendc.workflow.api.Job import org.opendc.workflow.service.internal.WorkflowServiceImpl import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode @@ -75,7 +73,6 @@ public interface WorkflowService : AutoCloseable { public operator fun invoke( context: CoroutineContext, clock: Clock, - tracer: EventTracer, meter: Meter, compute: ComputeClient, mode: WorkflowSchedulerMode, @@ -87,7 +84,6 @@ public interface WorkflowService : AutoCloseable { return WorkflowServiceImpl( context, clock, - tracer, meter, compute, mode, diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt index 1aef6f8e..32191b8f 100644 --- a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt @@ -24,13 +24,9 @@ package org.opendc.workflow.service.internal import io.opentelemetry.api.metrics.Meter import kotlinx.coroutines.* -import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.map import mu.KotlinLogging import org.opendc.compute.api.* -import org.opendc.trace.core.EventTracer -import org.opendc.trace.core.consumeAsFlow -import org.opendc.trace.core.enable import org.opendc.workflow.api.Job import org.opendc.workflow.api.WORKFLOW_TASK_CORES import org.opendc.workflow.service.* @@ -52,7 +48,6 @@ import kotlin.coroutines.resume public class WorkflowServiceImpl( context: CoroutineContext, internal val clock: Clock, - internal val tracer: EventTracer, private val meter: Meter, private val computeClient: ComputeClient, mode: WorkflowSchedulerMode, @@ -249,7 +244,6 @@ public class WorkflowServiceImpl( conts[job] = cont submittedJobs.add(1) - tracer.commit(WorkflowEvent.JobSubmitted(this, jobInstance.job)) requestCycle() } @@ -289,12 +283,6 @@ public class WorkflowServiceImpl( activeJobs += jobInstance runningJobs.add(1) - tracer.commit( - WorkflowEvent.JobStarted( - this, - jobInstance.job - ) - ) rootListener.jobStarted(jobInstance) } @@ -374,13 +362,6 @@ public class WorkflowServiceImpl( val task = taskByServer.getValue(server) task.startedAt = clock.millis() runningTasks.add(1) - tracer.commit( - WorkflowEvent.TaskStarted( - this@WorkflowServiceImpl, - task.job.job, - task.task - ) - ) rootListener.taskStarted(task) } ServerState.TERMINATED, ServerState.ERROR -> { @@ -399,13 +380,6 @@ public class WorkflowServiceImpl( runningTasks.add(-1) finishedTasks.add(1) - tracer.commit( - WorkflowEvent.TaskFinished( - this@WorkflowServiceImpl, - task.job.job, - task.task - ) - ) rootListener.taskFinished(task) // Add job roots to the scheduling queue @@ -434,7 +408,6 @@ public class WorkflowServiceImpl( activeJobs -= job runningJobs.add(-1) finishedJobs.add(1) - tracer.commit(WorkflowEvent.JobFinished(this, job.job)) rootListener.jobFinished(job) conts.remove(job.job)?.resume(Unit) -- cgit v1.2.3