diff options
Diffstat (limited to 'opendc-workflow')
4 files changed, 74 insertions, 32 deletions
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt index ebace07d..b8bc0e33 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt @@ -30,6 +30,7 @@ import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy import org.opendc.workflow.service.scheduler.job.JobOrderPolicy import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy +import org.opendc.workflow.service.telemetry.SchedulerStats import java.time.Clock import java.time.Duration import kotlin.coroutines.CoroutineContext @@ -46,6 +47,11 @@ public interface WorkflowService : AutoCloseable { public suspend fun invoke(job: Job) /** + * Collect statistics about the workflow scheduler. + */ + public fun getSchedulerStats(): SchedulerStats + + /** * Terminate the lifecycle of the workflow service, stopping all running workflows. */ public override fun close() diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt index cdaec021..9c7f18a2 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt @@ -34,6 +34,7 @@ import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy import org.opendc.workflow.service.scheduler.job.JobOrderPolicy import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy +import org.opendc.workflow.service.telemetry.SchedulerStats import java.time.Clock import java.time.Duration import java.util.* @@ -145,6 +146,7 @@ public class WorkflowServiceImpl( .setDescription("Number of submitted jobs") .setUnit("1") .build() + private var _workflowsSubmitted: Int = 0 /** * The number of jobs that are running. @@ -153,6 +155,7 @@ public class WorkflowServiceImpl( .setDescription("Number of jobs running") .setUnit("1") .build() + private var _workflowsRunning: Int = 0 /** * The number of jobs that have finished running. @@ -161,6 +164,7 @@ public class WorkflowServiceImpl( .setDescription("Number of jobs that finished running") .setUnit("1") .build() + private var _workflowsFinished: Int = 0 /** * The number of tasks that have been submitted to the service. @@ -169,6 +173,7 @@ public class WorkflowServiceImpl( .setDescription("Number of submitted tasks") .setUnit("1") .build() + private var _tasksSubmitted: Int = 0 /** * The number of jobs that are running. @@ -177,6 +182,7 @@ public class WorkflowServiceImpl( .setDescription("Number of tasks running") .setUnit("1") .build() + private var _tasksRunning: Int = 0 /** * The number of jobs that have finished running. @@ -185,6 +191,7 @@ public class WorkflowServiceImpl( .setDescription("Number of tasks that finished running") .setUnit("1") .build() + private var _tasksFinished: Int = 0 /** * The [Pacer] to use for scheduling the scheduler cycles. @@ -223,16 +230,22 @@ public class WorkflowServiceImpl( } submittedTasks.add(1) + _tasksSubmitted++ } instances.values.toCollection(jobInstance.tasks) incomingJobs += jobInstance rootListener.jobSubmitted(jobInstance) submittedJobs.add(1) + _workflowsSubmitted++ pacer.enqueue() } + override fun getSchedulerStats(): SchedulerStats { + return SchedulerStats(_workflowsSubmitted, _workflowsRunning, _workflowsFinished, _tasksSubmitted, _tasksRunning, _tasksFinished) + } + override fun close() { scope.cancel() } @@ -271,6 +284,7 @@ public class WorkflowServiceImpl( activeJobs += jobInstance runningJobs.add(1) + _workflowsRunning++ rootListener.jobStarted(jobInstance) } @@ -350,6 +364,7 @@ public class WorkflowServiceImpl( val task = taskByServer.getValue(server) task.startedAt = clock.millis() runningTasks.add(1) + _tasksRunning++ rootListener.taskStarted(task) } ServerState.TERMINATED, ServerState.ERROR -> { @@ -368,6 +383,8 @@ public class WorkflowServiceImpl( runningTasks.add(-1) finishedTasks.add(1) + _tasksRunning-- + _tasksFinished++ rootListener.taskFinished(task) // Add job roots to the scheduling queue @@ -395,6 +412,8 @@ public class WorkflowServiceImpl( activeJobs -= job runningJobs.add(-1) finishedJobs.add(1) + _workflowsRunning-- + _workflowsFinished++ rootListener.jobFinished(job) job.cont.resume(Unit) diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/telemetry/SchedulerStats.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/telemetry/SchedulerStats.kt new file mode 100644 index 00000000..7c7d7c4d --- /dev/null +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/telemetry/SchedulerStats.kt @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.service.telemetry + +/** + * Statistics about the workflow scheduler. + * + * @property workflowsSubmitted The number of workflows submitted to the scheduler. + * @property workflowsRunning The number of workflows that are currently running. + * @property workflowsFinished The number of workflows that have completed since the scheduler started. + * @property tasksSubmitted The number of tasks submitted to the scheduler. + * @property tasksRunning The number of tasks that are currently running. + * @property tasksFinished The number of tasks that have completed. + */ +public data class SchedulerStats( + val workflowsSubmitted: Int, + val workflowsRunning: Int, + val workflowsFinished: Int, + val tasksSubmitted: Int, + val tasksRunning: Int, + val tasksFinished: Int +) diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt index 1fd332b9..d5f06587 100644 --- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt @@ -22,7 +22,6 @@ package org.opendc.workflow.service -import io.opentelemetry.sdk.metrics.export.MetricProducer import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.Test @@ -66,7 +65,6 @@ internal class WorkflowServiceTest { @Test fun testTrace() = runBlockingSimulation { // Configure the ComputeService that is responsible for mapping virtual machines onto physical hosts - val HOST_COUNT = 4 val computeScheduler = FilterScheduler( filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)), weighers = listOf(VCpuWeigher(1.0, multiplier = 1.0)) @@ -74,7 +72,8 @@ internal class WorkflowServiceTest { val computeHelper = ComputeServiceHelper(coroutineContext, clock, NoopTelemetryManager(), computeScheduler, schedulingQuantum = Duration.ofSeconds(1)) - repeat(HOST_COUNT) { computeHelper.registerHost(createHostSpec(it)) } + val hostCount = 4 + repeat(hostCount) { computeHelper.registerHost(createHostSpec(it)) } // Configure the WorkflowService that is responsible for scheduling the workflow tasks onto machines val workflowScheduler = WorkflowSchedulerSpec( @@ -98,13 +97,13 @@ internal class WorkflowServiceTest { computeHelper.close() } - val metrics = collectMetrics(workflowHelper.metricProducer) + val metrics = workflowHelper.service.getSchedulerStats() assertAll( - { assertEquals(758, metrics.jobsSubmitted, "No jobs submitted") }, - { assertEquals(0, metrics.jobsActive, "Not all submitted jobs started") }, - { assertEquals(metrics.jobsSubmitted, metrics.jobsFinished, "Not all started jobs finished") }, - { assertEquals(0, metrics.tasksActive, "Not all started tasks finished") }, + { assertEquals(758, metrics.workflowsSubmitted, "No jobs submitted") }, + { assertEquals(0, metrics.workflowsRunning, "Not all submitted jobs started") }, + { assertEquals(metrics.workflowsSubmitted, metrics.workflowsFinished, "Not all started jobs finished") }, + { assertEquals(0, metrics.tasksRunning, "Not all started tasks finished") }, { assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") }, { assertEquals(32649883L, clock.millis()) { "Total duration incorrect" } } ) @@ -130,28 +129,4 @@ internal class WorkflowServiceTest { SimSpaceSharedHypervisorProvider() ) } - - class WorkflowMetrics { - var jobsSubmitted = 0L - var jobsActive = 0L - var jobsFinished = 0L - var tasksSubmitted = 0L - var tasksActive = 0L - var tasksFinished = 0L - } - - /** - * Collect the metrics of the workflow service. - */ - private fun collectMetrics(metricProducer: MetricProducer): WorkflowMetrics { - val metrics = metricProducer.collectAllMetrics().associateBy { it.name } - val res = WorkflowMetrics() - res.jobsSubmitted = metrics["jobs.submitted"]?.longSumData?.points?.last()?.value ?: 0 - res.jobsActive = metrics["jobs.active"]?.longSumData?.points?.last()?.value ?: 0 - res.jobsFinished = metrics["jobs.finished"]?.longSumData?.points?.last()?.value ?: 0 - res.tasksSubmitted = metrics["tasks.submitted"]?.longSumData?.points?.last()?.value ?: 0 - res.tasksActive = metrics["tasks.active"]?.longSumData?.points?.last()?.value ?: 0 - res.tasksFinished = metrics["tasks.finished"]?.longSumData?.points?.last()?.value ?: 0 - return res - } } |
