summaryrefslogtreecommitdiff
path: root/opendc-workflow/opendc-workflow-service/src/main
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-05-04 23:00:46 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2022-05-06 17:45:40 +0200
commit470c96072fa4f112d0511383ea99cdf7d5cc0864 (patch)
treef3025c4a917c075c567bf504ade20ce69cbab76d /opendc-workflow/opendc-workflow-service/src/main
parent7981e9aa3e6854ad593a5af85f8eb56874299d7e (diff)
refactor(workflow/service): Directly expose scheduler stats to user
This change updates the `WorkflowService` interface to directly expose statistics about the scheduler to the user, such that they do not necessarily have to interact with OpenTelemetry to obtain these values
Diffstat (limited to 'opendc-workflow/opendc-workflow-service/src/main')
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt6
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt19
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/telemetry/SchedulerStats.kt42
3 files changed, 67 insertions, 0 deletions
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
index ebace07d..b8bc0e33 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
@@ -30,6 +30,7 @@ import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy
import org.opendc.workflow.service.scheduler.job.JobOrderPolicy
import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy
import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy
+import org.opendc.workflow.service.telemetry.SchedulerStats
import java.time.Clock
import java.time.Duration
import kotlin.coroutines.CoroutineContext
@@ -46,6 +47,11 @@ public interface WorkflowService : AutoCloseable {
public suspend fun invoke(job: Job)
/**
+ * Collect statistics about the workflow scheduler.
+ */
+ public fun getSchedulerStats(): SchedulerStats
+
+ /**
* Terminate the lifecycle of the workflow service, stopping all running workflows.
*/
public override fun close()
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
index cdaec021..9c7f18a2 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
@@ -34,6 +34,7 @@ import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy
import org.opendc.workflow.service.scheduler.job.JobOrderPolicy
import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy
import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy
+import org.opendc.workflow.service.telemetry.SchedulerStats
import java.time.Clock
import java.time.Duration
import java.util.*
@@ -145,6 +146,7 @@ public class WorkflowServiceImpl(
.setDescription("Number of submitted jobs")
.setUnit("1")
.build()
+ private var _workflowsSubmitted: Int = 0
/**
* The number of jobs that are running.
@@ -153,6 +155,7 @@ public class WorkflowServiceImpl(
.setDescription("Number of jobs running")
.setUnit("1")
.build()
+ private var _workflowsRunning: Int = 0
/**
* The number of jobs that have finished running.
@@ -161,6 +164,7 @@ public class WorkflowServiceImpl(
.setDescription("Number of jobs that finished running")
.setUnit("1")
.build()
+ private var _workflowsFinished: Int = 0
/**
* The number of tasks that have been submitted to the service.
@@ -169,6 +173,7 @@ public class WorkflowServiceImpl(
.setDescription("Number of submitted tasks")
.setUnit("1")
.build()
+ private var _tasksSubmitted: Int = 0
/**
* The number of jobs that are running.
@@ -177,6 +182,7 @@ public class WorkflowServiceImpl(
.setDescription("Number of tasks running")
.setUnit("1")
.build()
+ private var _tasksRunning: Int = 0
/**
* The number of jobs that have finished running.
@@ -185,6 +191,7 @@ public class WorkflowServiceImpl(
.setDescription("Number of tasks that finished running")
.setUnit("1")
.build()
+ private var _tasksFinished: Int = 0
/**
* The [Pacer] to use for scheduling the scheduler cycles.
@@ -223,16 +230,22 @@ public class WorkflowServiceImpl(
}
submittedTasks.add(1)
+ _tasksSubmitted++
}
instances.values.toCollection(jobInstance.tasks)
incomingJobs += jobInstance
rootListener.jobSubmitted(jobInstance)
submittedJobs.add(1)
+ _workflowsSubmitted++
pacer.enqueue()
}
+ override fun getSchedulerStats(): SchedulerStats {
+ return SchedulerStats(_workflowsSubmitted, _workflowsRunning, _workflowsFinished, _tasksSubmitted, _tasksRunning, _tasksFinished)
+ }
+
override fun close() {
scope.cancel()
}
@@ -271,6 +284,7 @@ public class WorkflowServiceImpl(
activeJobs += jobInstance
runningJobs.add(1)
+ _workflowsRunning++
rootListener.jobStarted(jobInstance)
}
@@ -350,6 +364,7 @@ public class WorkflowServiceImpl(
val task = taskByServer.getValue(server)
task.startedAt = clock.millis()
runningTasks.add(1)
+ _tasksRunning++
rootListener.taskStarted(task)
}
ServerState.TERMINATED, ServerState.ERROR -> {
@@ -368,6 +383,8 @@ public class WorkflowServiceImpl(
runningTasks.add(-1)
finishedTasks.add(1)
+ _tasksRunning--
+ _tasksFinished++
rootListener.taskFinished(task)
// Add job roots to the scheduling queue
@@ -395,6 +412,8 @@ public class WorkflowServiceImpl(
activeJobs -= job
runningJobs.add(-1)
finishedJobs.add(1)
+ _workflowsRunning--
+ _workflowsFinished++
rootListener.jobFinished(job)
job.cont.resume(Unit)
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/telemetry/SchedulerStats.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/telemetry/SchedulerStats.kt
new file mode 100644
index 00000000..7c7d7c4d
--- /dev/null
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/telemetry/SchedulerStats.kt
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.workflow.service.telemetry
+
+/**
+ * Statistics about the workflow scheduler.
+ *
+ * @property workflowsSubmitted The number of workflows submitted to the scheduler.
+ * @property workflowsRunning The number of workflows that are currently running.
+ * @property workflowsFinished The number of workflows that have completed since the scheduler started.
+ * @property tasksSubmitted The number of tasks submitted to the scheduler.
+ * @property tasksRunning The number of tasks that are currently running.
+ * @property tasksFinished The number of tasks that have completed.
+ */
+public data class SchedulerStats(
+ val workflowsSubmitted: Int,
+ val workflowsRunning: Int,
+ val workflowsFinished: Int,
+ val tasksSubmitted: Int,
+ val tasksRunning: Int,
+ val tasksFinished: Int
+)