summaryrefslogtreecommitdiff
path: root/simulator/opendc-workflow/opendc-workflow-service/src/main
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-03-26 15:30:22 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-03-26 15:41:06 +0100
commitb8ba3cf81da6367285c5d5a23a70f8c340a45fdd (patch)
treee88125272bb192614894352240045520dc439c57 /simulator/opendc-workflow/opendc-workflow-service/src/main
parent608ff59b2d7e8ce696fe6f7271d80b5efc9c4b87 (diff)
compute: Integrate OpenTelemetry Metrics in OpenDC Workflow
This change integrates the OpenTelemetry Metrics API in the OpenDC Workflow Service implementation. This replaces the old infrastructure for gathering metrics.
Diffstat (limited to 'simulator/opendc-workflow/opendc-workflow-service/src/main')
-rw-r--r--simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt12
-rw-r--r--simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt109
2 files changed, 95 insertions, 26 deletions
diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
index 2f83e376..94302790 100644
--- a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
+++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
@@ -22,6 +22,7 @@
package org.opendc.workflow.service
+import io.opentelemetry.api.metrics.Meter
import kotlinx.coroutines.flow.Flow
import org.opendc.compute.api.ComputeClient
import org.opendc.trace.core.EventTracer
@@ -42,14 +43,14 @@ import kotlin.coroutines.CoroutineContext
*/
public interface WorkflowService : AutoCloseable {
/**
- * The events emitted by the workflow scheduler.
+ * Submit the specified [Job] to the workflow service for scheduling.
*/
- public val events: Flow<WorkflowEvent>
+ public suspend fun submit(job: Job)
/**
- * Submit the specified [Job] to the workflow service for scheduling.
+ * Run the specified [Job] and suspend execution until the job is finished.
*/
- public suspend fun submit(job: Job)
+ public suspend fun run(job: Job)
/**
* Terminate the lifecycle of the workflow service, stopping all running workflows.
@@ -63,6 +64,7 @@ public interface WorkflowService : AutoCloseable {
* @param context The [CoroutineContext] to use in the service.
* @param clock The clock instance to use.
* @param tracer The event tracer to use.
+ * @param meter The meter to use.
* @param compute The compute client to use.
* @param mode The scheduling mode to use.
* @param jobAdmissionPolicy The job admission policy to use.
@@ -74,6 +76,7 @@ public interface WorkflowService : AutoCloseable {
context: CoroutineContext,
clock: Clock,
tracer: EventTracer,
+ meter: Meter,
compute: ComputeClient,
mode: WorkflowSchedulerMode,
jobAdmissionPolicy: JobAdmissionPolicy,
@@ -85,6 +88,7 @@ public interface WorkflowService : AutoCloseable {
context,
clock,
tracer,
+ meter,
compute,
mode,
jobAdmissionPolicy,
diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
index 85a88acd..1aef6f8e 100644
--- a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
+++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
@@ -22,12 +22,10 @@
package org.opendc.workflow.service.internal
-import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.ExperimentalCoroutinesApi
-import kotlinx.coroutines.cancel
+import io.opentelemetry.api.metrics.Meter
+import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
-import kotlinx.coroutines.launch
import mu.KotlinLogging
import org.opendc.compute.api.*
import org.opendc.trace.core.EventTracer
@@ -43,7 +41,9 @@ import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy
import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy
import java.time.Clock
import java.util.*
+import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
+import kotlin.coroutines.resume
/**
* A [WorkflowService] that distributes work through a multi-stage process based on the Reference Architecture for
@@ -53,6 +53,7 @@ public class WorkflowServiceImpl(
context: CoroutineContext,
internal val clock: Clock,
internal val tracer: EventTracer,
+ private val meter: Meter,
private val computeClient: ComputeClient,
mode: WorkflowSchedulerMode,
jobAdmissionPolicy: JobAdmissionPolicy,
@@ -63,7 +64,7 @@ public class WorkflowServiceImpl(
/**
* The [CoroutineScope] of the service bounded by the lifecycle of the service.
*/
- internal val scope = CoroutineScope(context)
+ internal val scope = CoroutineScope(context + Job())
/**
* The logger instance to use.
@@ -106,6 +107,11 @@ public class WorkflowServiceImpl(
internal val taskByServer = mutableMapOf<Server, TaskState>()
/**
+ * The continuation of the jobs.
+ */
+ private val conts = mutableMapOf<Job, Continuation<Unit>>()
+
+ /**
* The root listener of this scheduler.
*/
private val rootListener = object : WorkflowSchedulerListener {
@@ -151,6 +157,54 @@ public class WorkflowServiceImpl(
}
}
+ /**
+ * The number of jobs that have been submitted to the service.
+ */
+ private val submittedJobs = meter.longCounterBuilder("jobs.submitted")
+ .setDescription("Number of submitted jobs")
+ .setUnit("1")
+ .build()
+
+ /**
+ * The number of jobs that are running.
+ */
+ private val runningJobs = meter.longUpDownCounterBuilder("jobs.active")
+ .setDescription("Number of jobs running")
+ .setUnit("1")
+ .build()
+
+ /**
+ * The number of jobs that have finished running.
+ */
+ private val finishedJobs = meter.longCounterBuilder("jobs.finished")
+ .setDescription("Number of jobs that finished running")
+ .setUnit("1")
+ .build()
+
+ /**
+ * The number of tasks that have been submitted to the service.
+ */
+ private val submittedTasks = meter.longCounterBuilder("tasks.submitted")
+ .setDescription("Number of submitted tasks")
+ .setUnit("1")
+ .build()
+
+ /**
+ * The number of tasks that are running.
+ */
+ private val runningTasks = meter.longUpDownCounterBuilder("tasks.active")
+ .setDescription("Number of tasks running")
+ .setUnit("1")
+ .build()
+
+ /**
+ * The number of tasks that have finished running.
+ */
+ private val finishedTasks = meter.longCounterBuilder("tasks.finished")
+ .setDescription("Number of tasks that finished running")
+ .setUnit("1")
+ .build()
+
private val mode: WorkflowSchedulerMode.Logic
private val jobAdmissionPolicy: JobAdmissionPolicy.Logic
private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic
@@ -167,16 +221,7 @@ public class WorkflowServiceImpl(
}
}
- override val events: Flow<WorkflowEvent> = tracer.openRecording().let {
- it.enable<WorkflowEvent.JobSubmitted>()
- it.enable<WorkflowEvent.JobStarted>()
- it.enable<WorkflowEvent.JobFinished>()
- it.enable<WorkflowEvent.TaskStarted>()
- it.enable<WorkflowEvent.TaskFinished>()
- it.consumeAsFlow().map { event -> event as WorkflowEvent }
- }
-
- override suspend fun submit(job: Job) {
+ override suspend fun run(job: Job) {
// J1 Incoming Jobs
val jobInstance = JobState(job, clock.millis())
val instances = job.tasks.associateWith {
@@ -193,14 +238,25 @@ public class WorkflowServiceImpl(
if (instance.isRoot) {
instance.state = TaskStatus.READY
}
+
+ submittedTasks.add(1)
}
- instances.values.toCollection(jobInstance.tasks)
- incomingJobs += jobInstance
- rootListener.jobSubmitted(jobInstance)
- tracer.commit(WorkflowEvent.JobSubmitted(this, jobInstance.job))
+ return suspendCancellableCoroutine { cont ->
+ instances.values.toCollection(jobInstance.tasks)
+ incomingJobs += jobInstance
+ rootListener.jobSubmitted(jobInstance)
+ conts[job] = cont
+
+ submittedJobs.add(1)
+ tracer.commit(WorkflowEvent.JobSubmitted(this, jobInstance.job))
- requestCycle()
+ requestCycle()
+ }
+ }
+
+ override suspend fun submit(job: Job) {
+ scope.launch { run(job) }
}
override fun close() {
@@ -231,6 +287,8 @@ public class WorkflowServiceImpl(
iterator.remove()
jobQueue.add(jobInstance)
activeJobs += jobInstance
+
+ runningJobs.add(1)
tracer.commit(
WorkflowEvent.JobStarted(
this,
@@ -311,11 +369,11 @@ public class WorkflowServiceImpl(
public override fun onStateChanged(server: Server, newState: ServerState) {
when (newState) {
- ServerState.PROVISIONING -> {
- }
+ ServerState.PROVISIONING -> {}
ServerState.RUNNING -> {
val task = taskByServer.getValue(server)
task.startedAt = clock.millis()
+ runningTasks.add(1)
tracer.commit(
WorkflowEvent.TaskStarted(
this@WorkflowServiceImpl,
@@ -338,6 +396,9 @@ public class WorkflowServiceImpl(
task.finishedAt = clock.millis()
job.tasks.remove(task)
activeTasks -= task
+
+ runningTasks.add(-1)
+ finishedTasks.add(1)
tracer.commit(
WorkflowEvent.TaskFinished(
this@WorkflowServiceImpl,
@@ -371,8 +432,12 @@ public class WorkflowServiceImpl(
private fun finishJob(job: JobState) {
activeJobs -= job
+ runningJobs.add(-1)
+ finishedJobs.add(1)
tracer.commit(WorkflowEvent.JobFinished(this, job.job))
rootListener.jobFinished(job)
+
+ conts.remove(job.job)?.resume(Unit)
}
public fun addListener(listener: WorkflowSchedulerListener) {