diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-03-27 12:48:51 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-03-27 12:48:51 +0100 |
| commit | bef2b2fc9ab97941613ec4537ebca1eb3fccdee6 (patch) | |
| tree | 4a4c1e0fb2de6aa0a0b9810f9c255e039a7ce47b /simulator/opendc-workflow/opendc-workflow-service/src/main | |
| parent | 57ebc8a1c6779d7e7276754838e83f1c026cceb9 (diff) | |
| parent | 5b0eaf76ec00192c755b268b7655f6463c5bc62f (diff) | |
Integrate OpenTelemetry into OpenDC
This pull request resolves issue #91 by adopting the industry-standard OpenTelemetry standard
for collecting traces and metrics in OpenDC simulations:
* Metrics will now be exposed through OpenTelemetry's metrics API
* `opendc-telemetry` provides complementary code to support
gathering telemetry in OpenDC.
**Breaking API Changes**
* `opendc-tracer` has been removed.
* `EventFlow` and all usages of it have been removed.
* `opendc-experiments-sc18` has been removed for now, but a
suitable replacement will follow soon.
Diffstat (limited to 'simulator/opendc-workflow/opendc-workflow-service/src/main')
3 files changed, 94 insertions, 135 deletions
diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowEvent.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowEvent.kt deleted file mode 100644 index bb2ad6c6..00000000 --- a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowEvent.kt +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.workflow.service - -import org.opendc.trace.core.Event -import org.opendc.workflow.api.Job -import org.opendc.workflow.api.Task - -/** - * An event emitted by the [WorkflowService]. - */ -public sealed class WorkflowEvent : Event() { - /** - * The [WorkflowService] that emitted the event. - */ - public abstract val service: WorkflowService - - /** - * This event is emitted when a job was submitted to the scheduler. - */ - public data class JobSubmitted( - override val service: WorkflowService, - public val job: Job - ) : WorkflowEvent() - - /** - * This event is emitted when a job has become active. - */ - public data class JobStarted( - override val service: WorkflowService, - public val job: Job - ) : WorkflowEvent() - - /** - * This event is emitted when a job has finished processing. - */ - public data class JobFinished( - override val service: WorkflowService, - public val job: Job - ) : WorkflowEvent() - - /** - * This event is emitted when a task of a job has started processing. - */ - public data class TaskStarted( - override val service: WorkflowService, - public val job: Job, - public val task: Task - ) : WorkflowEvent() - - /** - * This event is emitted when a task of a job has started processing. - */ - public data class TaskFinished( - override val service: WorkflowService, - public val job: Job, - public val task: Task - ) : WorkflowEvent() -} diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt index 2f83e376..d3358ef1 100644 --- a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt @@ -22,9 +22,8 @@ package org.opendc.workflow.service -import kotlinx.coroutines.flow.Flow +import io.opentelemetry.api.metrics.Meter import org.opendc.compute.api.ComputeClient -import org.opendc.trace.core.EventTracer import org.opendc.workflow.api.Job import org.opendc.workflow.service.internal.WorkflowServiceImpl import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode @@ -42,14 +41,14 @@ import kotlin.coroutines.CoroutineContext */ public interface WorkflowService : AutoCloseable { /** - * The events emitted by the workflow scheduler. + * Submit the specified [Job] to the workflow service for scheduling. */ - public val events: Flow<WorkflowEvent> + public suspend fun submit(job: Job) /** - * Submit the specified [Job] to the workflow service for scheduling. + * Run the specified [Job] and suspend execution until the job is finished. */ - public suspend fun submit(job: Job) + public suspend fun run(job: Job) /** * Terminate the lifecycle of the workflow service, stopping all running workflows. @@ -63,6 +62,7 @@ public interface WorkflowService : AutoCloseable { * @param context The [CoroutineContext] to use in the service. * @param clock The clock instance to use. * @param tracer The event tracer to use. + * @param meter The meter to use. * @param compute The compute client to use. * @param mode The scheduling mode to use. * @param jobAdmissionPolicy The job admission policy to use. @@ -73,7 +73,7 @@ public interface WorkflowService : AutoCloseable { public operator fun invoke( context: CoroutineContext, clock: Clock, - tracer: EventTracer, + meter: Meter, compute: ComputeClient, mode: WorkflowSchedulerMode, jobAdmissionPolicy: JobAdmissionPolicy, @@ -84,7 +84,7 @@ public interface WorkflowService : AutoCloseable { return WorkflowServiceImpl( context, clock, - tracer, + meter, compute, mode, jobAdmissionPolicy, diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt index 85a88acd..32191b8f 100644 --- a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt @@ -22,17 +22,11 @@ package org.opendc.workflow.service.internal -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.cancel -import kotlinx.coroutines.flow.Flow +import io.opentelemetry.api.metrics.Meter +import kotlinx.coroutines.* import kotlinx.coroutines.flow.map -import kotlinx.coroutines.launch import mu.KotlinLogging import org.opendc.compute.api.* -import org.opendc.trace.core.EventTracer -import org.opendc.trace.core.consumeAsFlow -import org.opendc.trace.core.enable import org.opendc.workflow.api.Job import org.opendc.workflow.api.WORKFLOW_TASK_CORES import org.opendc.workflow.service.* @@ -43,7 +37,9 @@ import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy import java.time.Clock import java.util.* +import kotlin.coroutines.Continuation import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.resume /** * A [WorkflowService] that distributes work through a multi-stage process based on the Reference Architecture for @@ -52,7 +48,7 @@ import kotlin.coroutines.CoroutineContext public class WorkflowServiceImpl( context: CoroutineContext, internal val clock: Clock, - internal val tracer: EventTracer, + private val meter: Meter, private val computeClient: ComputeClient, mode: WorkflowSchedulerMode, jobAdmissionPolicy: JobAdmissionPolicy, @@ -63,7 +59,7 @@ public class WorkflowServiceImpl( /** * The [CoroutineScope] of the service bounded by the lifecycle of the service. */ - internal val scope = CoroutineScope(context) + internal val scope = CoroutineScope(context + Job()) /** * The logger instance to use. @@ -106,6 +102,11 @@ public class WorkflowServiceImpl( internal val taskByServer = mutableMapOf<Server, TaskState>() /** + * The continuation of the jobs. + */ + private val conts = mutableMapOf<Job, Continuation<Unit>>() + + /** * The root listener of this scheduler. */ private val rootListener = object : WorkflowSchedulerListener { @@ -151,6 +152,54 @@ public class WorkflowServiceImpl( } } + /** + * The number of jobs that have been submitted to the service. + */ + private val submittedJobs = meter.longCounterBuilder("jobs.submitted") + .setDescription("Number of submitted jobs") + .setUnit("1") + .build() + + /** + * The number of jobs that are running. + */ + private val runningJobs = meter.longUpDownCounterBuilder("jobs.active") + .setDescription("Number of jobs running") + .setUnit("1") + .build() + + /** + * The number of jobs that have finished running. + */ + private val finishedJobs = meter.longCounterBuilder("jobs.finished") + .setDescription("Number of jobs that finished running") + .setUnit("1") + .build() + + /** + * The number of tasks that have been submitted to the service. + */ + private val submittedTasks = meter.longCounterBuilder("tasks.submitted") + .setDescription("Number of submitted tasks") + .setUnit("1") + .build() + + /** + * The number of jobs that are running. + */ + private val runningTasks = meter.longUpDownCounterBuilder("tasks.active") + .setDescription("Number of tasks running") + .setUnit("1") + .build() + + /** + * The number of jobs that have finished running. + */ + private val finishedTasks = meter.longCounterBuilder("tasks.finished") + .setDescription("Number of tasks that finished running") + .setUnit("1") + .build() + private val mode: WorkflowSchedulerMode.Logic private val jobAdmissionPolicy: JobAdmissionPolicy.Logic private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic @@ -167,16 +216,7 @@ public class WorkflowServiceImpl( } } - override val events: Flow<WorkflowEvent> = tracer.openRecording().let { - it.enable<WorkflowEvent.JobSubmitted>() - it.enable<WorkflowEvent.JobStarted>() - it.enable<WorkflowEvent.JobFinished>() - it.enable<WorkflowEvent.TaskStarted>() - it.enable<WorkflowEvent.TaskFinished>() - it.consumeAsFlow().map { event -> event as WorkflowEvent } - } - - override suspend fun submit(job: Job) { + override suspend fun run(job: Job) { // J1 Incoming Jobs val jobInstance = JobState(job, clock.millis()) val instances = job.tasks.associateWith { @@ -193,14 +233,24 @@ public class WorkflowServiceImpl( if (instance.isRoot) { instance.state = TaskStatus.READY } + + submittedTasks.add(1) } - instances.values.toCollection(jobInstance.tasks) - incomingJobs += jobInstance - rootListener.jobSubmitted(jobInstance) - tracer.commit(WorkflowEvent.JobSubmitted(this, jobInstance.job)) + return suspendCancellableCoroutine { cont -> + instances.values.toCollection(jobInstance.tasks) + incomingJobs += jobInstance + rootListener.jobSubmitted(jobInstance) + conts[job] = cont + + submittedJobs.add(1) + + requestCycle() + } + } - requestCycle() + override suspend fun submit(job: Job) { + scope.launch { run(job) } } override fun close() { @@ -231,12 +281,8 @@ public class WorkflowServiceImpl( iterator.remove() jobQueue.add(jobInstance) activeJobs += jobInstance - tracer.commit( - WorkflowEvent.JobStarted( - this, - jobInstance.job - ) - ) + + runningJobs.add(1) rootListener.jobStarted(jobInstance) } @@ -311,18 +357,11 @@ public class WorkflowServiceImpl( public override fun onStateChanged(server: Server, newState: ServerState) { when (newState) { - ServerState.PROVISIONING -> { - } + ServerState.PROVISIONING -> {} ServerState.RUNNING -> { val task = taskByServer.getValue(server) task.startedAt = clock.millis() - tracer.commit( - WorkflowEvent.TaskStarted( - this@WorkflowServiceImpl, - task.job.job, - task.task - ) - ) + runningTasks.add(1) rootListener.taskStarted(task) } ServerState.TERMINATED, ServerState.ERROR -> { @@ -338,13 +377,9 @@ public class WorkflowServiceImpl( task.finishedAt = clock.millis() job.tasks.remove(task) activeTasks -= task - tracer.commit( - WorkflowEvent.TaskFinished( - this@WorkflowServiceImpl, - task.job.job, - task.task - ) - ) + + runningTasks.add(-1) + finishedTasks.add(1) rootListener.taskFinished(task) // Add job roots to the scheduling queue @@ -371,8 +406,11 @@ public class WorkflowServiceImpl( private fun finishJob(job: JobState) { activeJobs -= job - tracer.commit(WorkflowEvent.JobFinished(this, job.job)) + runningJobs.add(-1) + finishedJobs.add(1) rootListener.jobFinished(job) + + conts.remove(job.job)?.resume(Unit) } public fun addListener(listener: WorkflowSchedulerListener) { |
