diff options
Diffstat (limited to 'opendc-workflow')
8 files changed, 112 insertions, 196 deletions
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt index a0248a93..ebace07d 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt @@ -26,29 +26,24 @@ import io.opentelemetry.api.metrics.MeterProvider import org.opendc.compute.api.ComputeClient import org.opendc.workflow.api.Job import org.opendc.workflow.service.internal.WorkflowServiceImpl -import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy import org.opendc.workflow.service.scheduler.job.JobOrderPolicy import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy import java.time.Clock +import java.time.Duration import kotlin.coroutines.CoroutineContext /** - * A service for cloud workflow management. + * A service for cloud workflow execution. * * The workflow scheduler is modelled after the Reference Architecture for Topology Scheduling by Andreadis et al. */ public interface WorkflowService : AutoCloseable { /** - * Submit the specified [Job] to the workflow service for scheduling. + * Submit the specified [Job] and suspend execution until the job is finished. */ - public suspend fun submit(job: Job) - - /** - * Run the specified [Job] and suspend execution until the job is finished. - */ - public suspend fun run(job: Job) + public suspend fun invoke(job: Job) /** * Terminate the lifecycle of the workflow service, stopping all running workflows. @@ -61,10 +56,9 @@ public interface WorkflowService : AutoCloseable { * * @param context The [CoroutineContext] to use in the service. * @param clock The clock instance to use. - * @param tracer The event tracer to use. * @param meterProvider The meter provider to use. * @param compute The compute client to use. - * @param mode The scheduling mode to use. + * @param schedulingQuantum The scheduling quantum to use (minimum duration between scheduling cycles). * @param jobAdmissionPolicy The job admission policy to use. * @param jobOrderPolicy The job order policy to use. * @param taskEligibilityPolicy The task eligibility policy to use. @@ -75,7 +69,7 @@ public interface WorkflowService : AutoCloseable { clock: Clock, meterProvider: MeterProvider, compute: ComputeClient, - mode: WorkflowSchedulerMode, + schedulingQuantum: Duration, jobAdmissionPolicy: JobAdmissionPolicy, jobOrderPolicy: JobOrderPolicy, taskEligibilityPolicy: TaskEligibilityPolicy, @@ -86,7 +80,7 @@ public interface WorkflowService : AutoCloseable { clock, meterProvider, compute, - mode, + schedulingQuantum, jobAdmissionPolicy, jobOrderPolicy, taskEligibilityPolicy, diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/JobState.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/JobState.kt index 1bb67169..d2fc1ed7 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/JobState.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/JobState.kt @@ -23,8 +23,9 @@ package org.opendc.workflow.service.internal import org.opendc.workflow.api.Job +import kotlin.coroutines.Continuation -public class JobState(public val job: Job, public val submittedAt: Long) { +public class JobState(public val job: Job, public val submittedAt: Long, internal val cont: Continuation<Unit>) { /** * A flag to indicate whether this job is finished. */ diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowSchedulerListener.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowSchedulerListener.kt index 29c6aeea..8e76a36e 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowSchedulerListener.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowSchedulerListener.kt @@ -22,16 +22,44 @@ package org.opendc.workflow.service.internal -public interface WorkflowSchedulerListener { - public fun cycleStarted(scheduler: WorkflowServiceImpl) {} - public fun cycleFinished(scheduler: WorkflowServiceImpl) {} +import org.opendc.workflow.service.WorkflowService +/** + * Interface for listening to events emitted by the [WorkflowService]. + */ +public interface WorkflowSchedulerListener { + /** + * This method is invoked when [job] is submitted to the service. + */ public fun jobSubmitted(job: JobState) {} + + /** + * This method is invoked when [job] is started by the service. + */ public fun jobStarted(job: JobState) {} + + /** + * This method is invoked when [job] finishes. + */ public fun jobFinished(job: JobState) {} + /** + * This method is invoked when [task] becomes ready to be scheduled. + */ public fun taskReady(task: TaskState) {} + + /** + * This method is invoked when [task] is assigned to a machine. + */ public fun taskAssigned(task: TaskState) {} + + /** + * This method is invoked when [task] is started. + */ public fun taskStarted(task: TaskState) {} + + /** + * This method is invoked when [task] finishes. + */ public fun taskFinished(task: TaskState) {} } diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt index a0fd3fad..1cadce44 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt @@ -25,20 +25,18 @@ package org.opendc.workflow.service.internal import io.opentelemetry.api.metrics.Meter import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.* -import kotlinx.coroutines.flow.map -import mu.KotlinLogging import org.opendc.compute.api.* +import org.opendc.utils.TimerScheduler import org.opendc.workflow.api.Job import org.opendc.workflow.api.WORKFLOW_TASK_CORES import org.opendc.workflow.service.* -import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy import org.opendc.workflow.service.scheduler.job.JobOrderPolicy import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy import java.time.Clock +import java.time.Duration import java.util.* -import kotlin.coroutines.Continuation import kotlin.coroutines.CoroutineContext import kotlin.coroutines.resume @@ -48,10 +46,10 @@ import kotlin.coroutines.resume */ public class WorkflowServiceImpl( context: CoroutineContext, - internal val clock: Clock, + private val clock: Clock, meterProvider: MeterProvider, private val computeClient: ComputeClient, - mode: WorkflowSchedulerMode, + private val schedulingQuantum: Duration, jobAdmissionPolicy: JobAdmissionPolicy, jobOrderPolicy: JobOrderPolicy, taskEligibilityPolicy: TaskEligibilityPolicy, @@ -60,12 +58,7 @@ public class WorkflowServiceImpl( /** * The [CoroutineScope] of the service bounded by the lifecycle of the service. */ - internal val scope = CoroutineScope(context + Job()) - - /** - * The logger instance to use. - */ - private val logger = KotlinLogging.logger {} + private val scope = CoroutineScope(context + Job()) /** * The [Meter] to collect metrics of this service. @@ -75,22 +68,22 @@ public class WorkflowServiceImpl( /** * The incoming jobs ready to be processed by the scheduler. */ - internal val incomingJobs: MutableSet<JobState> = linkedSetOf() + private val incomingJobs: MutableSet<JobState> = linkedSetOf() /** * The incoming tasks ready to be processed by the scheduler. */ - internal val incomingTasks: MutableSet<TaskState> = linkedSetOf() + private val incomingTasks: MutableSet<TaskState> = linkedSetOf() /** * The job queue. */ - internal val jobQueue: Queue<JobState> + private val jobQueue: Queue<JobState> /** * The task queue. */ - internal val taskQueue: Queue<TaskState> + private val taskQueue: Queue<TaskState> /** * The active jobs in the system. @@ -105,12 +98,7 @@ public class WorkflowServiceImpl( /** * The running tasks by [Server]. */ - internal val taskByServer = mutableMapOf<Server, TaskState>() - - /** - * The continuation of the jobs. - */ - private val conts = mutableMapOf<Job, Continuation<Unit>>() + private val taskByServer = mutableMapOf<Server, TaskState>() /** * The root listener of this scheduler. @@ -119,15 +107,7 @@ public class WorkflowServiceImpl( /** * The listeners to delegate to. */ - val listeners = mutableSetOf<WorkflowSchedulerListener>() - - override fun cycleStarted(scheduler: WorkflowServiceImpl) { - listeners.forEach { it.cycleStarted(scheduler) } - } - - override fun cycleFinished(scheduler: WorkflowServiceImpl) { - listeners.forEach { it.cycleFinished(scheduler) } - } + val listeners = mutableListOf<WorkflowSchedulerListener>() override fun jobSubmitted(job: JobState) { listeners.forEach { it.jobSubmitted(job) } @@ -206,25 +186,27 @@ public class WorkflowServiceImpl( .setUnit("1") .build() - private val mode: WorkflowSchedulerMode.Logic + /** + * The [TimerScheduler] to use for scheduling the scheduler cycles. + */ + private val timerScheduler: TimerScheduler<Unit> = TimerScheduler(scope.coroutineContext, clock) + private val jobAdmissionPolicy: JobAdmissionPolicy.Logic private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic private lateinit var image: Image init { - this.mode = mode(this) this.jobAdmissionPolicy = jobAdmissionPolicy(this) this.jobQueue = PriorityQueue(100, jobOrderPolicy(this).thenBy { it.job.uid }) this.taskEligibilityPolicy = taskEligibilityPolicy(this) this.taskQueue = PriorityQueue(1000, taskOrderPolicy(this).thenBy { it.task.uid }) - scope.launch { - image = computeClient.newImage("workflow-runner") - } + + scope.launch { image = computeClient.newImage("workflow-runner") } } - override suspend fun run(job: Job) { + override suspend fun invoke(job: Job): Unit = suspendCancellableCoroutine { cont -> // J1 Incoming Jobs - val jobInstance = JobState(job, clock.millis()) + val jobInstance = JobState(job, clock.millis(), cont) val instances = job.tasks.associateWith { TaskState(jobInstance, it) } @@ -243,36 +225,61 @@ public class WorkflowServiceImpl( submittedTasks.add(1) } - return suspendCancellableCoroutine { cont -> - instances.values.toCollection(jobInstance.tasks) - incomingJobs += jobInstance - rootListener.jobSubmitted(jobInstance) - conts[job] = cont + instances.values.toCollection(jobInstance.tasks) + incomingJobs += jobInstance + rootListener.jobSubmitted(jobInstance) + submittedJobs.add(1) - submittedJobs.add(1) + requestSchedulingCycle() + } - requestCycle() - } + override fun close() { + scope.cancel() } - override suspend fun submit(job: Job) { - scope.launch { run(job) } + /** + * Register a [WorkflowSchedulerListener]. + */ + public fun addListener(listener: WorkflowSchedulerListener) { + rootListener.listeners += listener } - override fun close() { - scope.cancel() + /** + * Unregister a [WorkflowSchedulerListener]. + */ + public fun removeListener(listener: WorkflowSchedulerListener) { + rootListener.listeners -= listener } /** - * Indicate to the scheduler that a scheduling cycle is needed. + * Indicate that a new scheduling cycle is needed due to a change to the service's state. */ - private fun requestCycle() = mode.requestCycle() + private fun requestSchedulingCycle() { + // Bail out in case we have already requested a new cycle or the queue is empty. + if (timerScheduler.isTimerActive(Unit) || incomingJobs.isEmpty()) { + return + } + + val quantum = schedulingQuantum.toMillis() + if (quantum == 0L) { + doSchedule() + return + } + + // We assume that the provisioner runs at a fixed slot every time quantum (e.g t=0, t=60, t=120). + // This is important because the slices of the VMs need to be aligned. + // We calculate here the delay until the next scheduling slot. + val delay = quantum - (clock.millis() % quantum) + + timerScheduler.startSingleTimer(Unit, delay) { + doSchedule() + } + } /** * Perform a scheduling cycle immediately. */ - @OptIn(ExperimentalCoroutinesApi::class) - internal suspend fun schedule() { + private fun doSchedule() { // J2 Create list of eligible jobs val iterator = incomingJobs.iterator() while (iterator.hasNext()) { @@ -293,8 +300,8 @@ public class WorkflowServiceImpl( } // J4 Per job - while (jobQueue.isNotEmpty()) { - val jobInstance = jobQueue.poll() + while (true) { + val jobInstance = jobQueue.poll() ?: break // Edge-case: job has no tasks if (jobInstance.isFinished) { @@ -361,7 +368,7 @@ public class WorkflowServiceImpl( } } - public override fun onStateChanged(server: Server, newState: ServerState) { + override fun onStateChanged(server: Server, newState: ServerState) { when (newState) { ServerState.PROVISIONING -> {} ServerState.RUNNING -> { @@ -402,7 +409,7 @@ public class WorkflowServiceImpl( finishJob(job) } - requestCycle() + requestSchedulingCycle() } ServerState.DELETED -> { } @@ -416,14 +423,6 @@ public class WorkflowServiceImpl( finishedJobs.add(1) rootListener.jobFinished(job) - conts.remove(job.job)?.resume(Unit) - } - - public fun addListener(listener: WorkflowSchedulerListener) { - rootListener.listeners += listener - } - - public fun removeListener(listener: WorkflowSchedulerListener) { - rootListener.listeners -= listener + job.cont.resume(Unit) } } diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/WorkflowSchedulerMode.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/WorkflowSchedulerMode.kt deleted file mode 100644 index 58e7893f..00000000 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/WorkflowSchedulerMode.kt +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.workflow.service.scheduler - -import kotlinx.coroutines.delay -import kotlinx.coroutines.launch -import org.opendc.workflow.service.internal.WorkflowServiceImpl - -/** - * The operating mode of a workflow scheduler. - */ -public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Logic> { - /** - * The logic for operating the cycles of a workflow scheduler. - */ - public interface Logic { - /** - * Request a new scheduling cycle to be performed. - */ - public fun requestCycle() - } - - /** - * An interactive scheduler immediately triggers a new scheduling cycle when a workflow is received. - */ - public object Interactive : WorkflowSchedulerMode() { - override fun invoke(scheduler: WorkflowServiceImpl): Logic = object : Logic { - override fun requestCycle() { - scheduler.scope.launch { scheduler.schedule() } - } - } - - override fun toString(): String = "Interactive" - } - - /** - * A batch scheduler triggers a scheduling cycle every time quantum if needed. - */ - public data class Batch(val quantum: Long) : WorkflowSchedulerMode() { - private var next: kotlinx.coroutines.Job? = null - - override fun invoke(scheduler: WorkflowServiceImpl): Logic = object : Logic { - override fun requestCycle() { - if (next == null) { - // In batch mode, we assume that the scheduler runs at a fixed slot every time - // quantum (e.g t=0, t=60, t=120). We calculate here the delay until the next scheduling slot. - val delay = quantum - (scheduler.clock.millis() % quantum) - - val job = scheduler.scope.launch { - delay(delay) - next = null - scheduler.schedule() - } - next = job - } - } - } - - override fun toString(): String = "Batch($quantum)" - } - - /** - * A scheduling cycle is triggered at a random point in time. - */ - public data class Random(private val random: java.util.Random = java.util.Random(123)) : WorkflowSchedulerMode() { - private var next: kotlinx.coroutines.Job? = null - - override fun invoke(scheduler: WorkflowServiceImpl): Logic = object : Logic { - override fun requestCycle() { - if (next == null) { - val delay = random.nextInt(200).toLong() - - val job = scheduler.scope.launch { - delay(delay) - next = null - scheduler.schedule() - } - next = job - } - } - } - - override fun toString(): String = "Random" - } -} diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt index d80c098b..066e9685 100644 --- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt @@ -43,7 +43,6 @@ import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.core.runBlockingSimulation import org.opendc.trace.Trace -import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy @@ -77,7 +76,7 @@ internal class WorkflowServiceTest { // Configure the WorkflowService that is responsible for scheduling the workflow tasks onto machines val workflowScheduler = WorkflowSchedulerSpec( - batchMode = WorkflowSchedulerMode.Batch(100), + schedulingQuantum = Duration.ofMillis(100), jobAdmissionPolicy = NullJobAdmissionPolicy, jobOrderPolicy = SubmissionTimeJobOrderPolicy(), taskEligibilityPolicy = NullTaskEligibilityPolicy, diff --git a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowSchedulerSpec.kt b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowSchedulerSpec.kt index b22e16d9..d6a375b6 100644 --- a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowSchedulerSpec.kt +++ b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowSchedulerSpec.kt @@ -22,17 +22,17 @@ package org.opendc.workflow.workload -import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy import org.opendc.workflow.service.scheduler.job.JobOrderPolicy import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy +import java.time.Duration /** * Specification of the scheduling policies of the workflow scheduler. */ public data class WorkflowSchedulerSpec( - val batchMode: WorkflowSchedulerMode, + val schedulingQuantum: Duration, val jobAdmissionPolicy: JobAdmissionPolicy, val jobOrderPolicy: JobOrderPolicy, val taskEligibilityPolicy: TaskEligibilityPolicy, diff --git a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt index 236a036b..0198900f 100644 --- a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt +++ b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt @@ -49,7 +49,7 @@ public class WorkflowServiceHelper( private val clock: Clock, private val computeClient: ComputeClient, private val schedulerSpec: WorkflowSchedulerSpec - ) : AutoCloseable { +) : AutoCloseable { /** * The [WorkflowService] that is constructed by this runner. */ @@ -76,7 +76,7 @@ public class WorkflowServiceHelper( clock, meterProvider, computeClient, - mode = schedulerSpec.batchMode, + schedulerSpec.schedulingQuantum, jobAdmissionPolicy = schedulerSpec.jobAdmissionPolicy, jobOrderPolicy = schedulerSpec.jobOrderPolicy, taskEligibilityPolicy = schedulerSpec.taskEligibilityPolicy, @@ -111,7 +111,7 @@ public class WorkflowServiceHelper( delay(((submitTime - offset) - clock.millis()).coerceAtLeast(0)) } - launch { service.run(job) } + launch { service.invoke(job) } } } } |
