summaryrefslogtreecommitdiff
path: root/opendc-workflow/opendc-workflow-service/src/main
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-11-16 16:30:55 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-11-17 11:39:22 +0100
commit1dcdca7e09e34cdde53f6d14db56780688e19eae (patch)
treef0982efeacf4910a73eb3133a54b50352984018f /opendc-workflow/opendc-workflow-service/src/main
parent1cfd967d6d27f339b264449ff2a1adeb705de598 (diff)
refactor(workflow): Remove WorkflowSchedulerMode
This change removes the WorkflowSchedulerMode interface in favour of an integrated timer scheduler approach that batches scheduling cycles over a user-specified quantum. This quantum can be lowered to a small value to get the interactive behavior. There is no replacement for the random behavior, but we believe that such a policy makes no sense in a real-world scenario.
Diffstat (limited to 'opendc-workflow/opendc-workflow-service/src/main')
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt20
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/JobState.kt3
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowSchedulerListener.kt34
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt133
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/WorkflowSchedulerMode.kt105
5 files changed, 106 insertions, 189 deletions
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
index a0248a93..ebace07d 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
@@ -26,29 +26,24 @@ import io.opentelemetry.api.metrics.MeterProvider
import org.opendc.compute.api.ComputeClient
import org.opendc.workflow.api.Job
import org.opendc.workflow.service.internal.WorkflowServiceImpl
-import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode
import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy
import org.opendc.workflow.service.scheduler.job.JobOrderPolicy
import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy
import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy
import java.time.Clock
+import java.time.Duration
import kotlin.coroutines.CoroutineContext
/**
- * A service for cloud workflow management.
+ * A service for cloud workflow execution.
*
* The workflow scheduler is modelled after the Reference Architecture for Topology Scheduling by Andreadis et al.
*/
public interface WorkflowService : AutoCloseable {
/**
- * Submit the specified [Job] to the workflow service for scheduling.
+ * Submit the specified [Job] and suspend execution until the job is finished.
*/
- public suspend fun submit(job: Job)
-
- /**
- * Run the specified [Job] and suspend execution until the job is finished.
- */
- public suspend fun run(job: Job)
+ public suspend fun invoke(job: Job)
/**
* Terminate the lifecycle of the workflow service, stopping all running workflows.
@@ -61,10 +56,9 @@ public interface WorkflowService : AutoCloseable {
*
* @param context The [CoroutineContext] to use in the service.
* @param clock The clock instance to use.
- * @param tracer The event tracer to use.
* @param meterProvider The meter provider to use.
* @param compute The compute client to use.
- * @param mode The scheduling mode to use.
+ * @param schedulingQuantum The scheduling quantum to use (minimum duration between scheduling cycles).
* @param jobAdmissionPolicy The job admission policy to use.
* @param jobOrderPolicy The job order policy to use.
* @param taskEligibilityPolicy The task eligibility policy to use.
@@ -75,7 +69,7 @@ public interface WorkflowService : AutoCloseable {
clock: Clock,
meterProvider: MeterProvider,
compute: ComputeClient,
- mode: WorkflowSchedulerMode,
+ schedulingQuantum: Duration,
jobAdmissionPolicy: JobAdmissionPolicy,
jobOrderPolicy: JobOrderPolicy,
taskEligibilityPolicy: TaskEligibilityPolicy,
@@ -86,7 +80,7 @@ public interface WorkflowService : AutoCloseable {
clock,
meterProvider,
compute,
- mode,
+ schedulingQuantum,
jobAdmissionPolicy,
jobOrderPolicy,
taskEligibilityPolicy,
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/JobState.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/JobState.kt
index 1bb67169..d2fc1ed7 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/JobState.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/JobState.kt
@@ -23,8 +23,9 @@
package org.opendc.workflow.service.internal
import org.opendc.workflow.api.Job
+import kotlin.coroutines.Continuation
-public class JobState(public val job: Job, public val submittedAt: Long) {
+public class JobState(public val job: Job, public val submittedAt: Long, internal val cont: Continuation<Unit>) {
/**
* A flag to indicate whether this job is finished.
*/
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowSchedulerListener.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowSchedulerListener.kt
index 29c6aeea..8e76a36e 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowSchedulerListener.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowSchedulerListener.kt
@@ -22,16 +22,44 @@
package org.opendc.workflow.service.internal
-public interface WorkflowSchedulerListener {
- public fun cycleStarted(scheduler: WorkflowServiceImpl) {}
- public fun cycleFinished(scheduler: WorkflowServiceImpl) {}
+import org.opendc.workflow.service.WorkflowService
+/**
+ * Interface for listening to events emitted by the [WorkflowService].
+ */
+public interface WorkflowSchedulerListener {
+ /**
+ * This method is invoked when [job] is submitted to the service.
+ */
public fun jobSubmitted(job: JobState) {}
+
+ /**
+ * This method is invoked when [job] is started by the service.
+ */
public fun jobStarted(job: JobState) {}
+
+ /**
+ * This method is invoked when [job] finishes.
+ */
public fun jobFinished(job: JobState) {}
+ /**
+ * This method is invoked when [task] becomes ready to be scheduled.
+ */
public fun taskReady(task: TaskState) {}
+
+ /**
+ * This method is invoked when [task] is assigned to a machine.
+ */
public fun taskAssigned(task: TaskState) {}
+
+ /**
+ * This method is invoked when [task] is started.
+ */
public fun taskStarted(task: TaskState) {}
+
+ /**
+ * This method is invoked when [task] finishes.
+ */
public fun taskFinished(task: TaskState) {}
}
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
index a0fd3fad..1cadce44 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
@@ -25,20 +25,18 @@ package org.opendc.workflow.service.internal
import io.opentelemetry.api.metrics.Meter
import io.opentelemetry.api.metrics.MeterProvider
import kotlinx.coroutines.*
-import kotlinx.coroutines.flow.map
-import mu.KotlinLogging
import org.opendc.compute.api.*
+import org.opendc.utils.TimerScheduler
import org.opendc.workflow.api.Job
import org.opendc.workflow.api.WORKFLOW_TASK_CORES
import org.opendc.workflow.service.*
-import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode
import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy
import org.opendc.workflow.service.scheduler.job.JobOrderPolicy
import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy
import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy
import java.time.Clock
+import java.time.Duration
import java.util.*
-import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.resume
@@ -48,10 +46,10 @@ import kotlin.coroutines.resume
*/
public class WorkflowServiceImpl(
context: CoroutineContext,
- internal val clock: Clock,
+ private val clock: Clock,
meterProvider: MeterProvider,
private val computeClient: ComputeClient,
- mode: WorkflowSchedulerMode,
+ private val schedulingQuantum: Duration,
jobAdmissionPolicy: JobAdmissionPolicy,
jobOrderPolicy: JobOrderPolicy,
taskEligibilityPolicy: TaskEligibilityPolicy,
@@ -60,12 +58,7 @@ public class WorkflowServiceImpl(
/**
* The [CoroutineScope] of the service bounded by the lifecycle of the service.
*/
- internal val scope = CoroutineScope(context + Job())
-
- /**
- * The logger instance to use.
- */
- private val logger = KotlinLogging.logger {}
+ private val scope = CoroutineScope(context + Job())
/**
* The [Meter] to collect metrics of this service.
@@ -75,22 +68,22 @@ public class WorkflowServiceImpl(
/**
* The incoming jobs ready to be processed by the scheduler.
*/
- internal val incomingJobs: MutableSet<JobState> = linkedSetOf()
+ private val incomingJobs: MutableSet<JobState> = linkedSetOf()
/**
* The incoming tasks ready to be processed by the scheduler.
*/
- internal val incomingTasks: MutableSet<TaskState> = linkedSetOf()
+ private val incomingTasks: MutableSet<TaskState> = linkedSetOf()
/**
* The job queue.
*/
- internal val jobQueue: Queue<JobState>
+ private val jobQueue: Queue<JobState>
/**
* The task queue.
*/
- internal val taskQueue: Queue<TaskState>
+ private val taskQueue: Queue<TaskState>
/**
* The active jobs in the system.
@@ -105,12 +98,7 @@ public class WorkflowServiceImpl(
/**
* The running tasks by [Server].
*/
- internal val taskByServer = mutableMapOf<Server, TaskState>()
-
- /**
- * The continuation of the jobs.
- */
- private val conts = mutableMapOf<Job, Continuation<Unit>>()
+ private val taskByServer = mutableMapOf<Server, TaskState>()
/**
* The root listener of this scheduler.
@@ -119,15 +107,7 @@ public class WorkflowServiceImpl(
/**
* The listeners to delegate to.
*/
- val listeners = mutableSetOf<WorkflowSchedulerListener>()
-
- override fun cycleStarted(scheduler: WorkflowServiceImpl) {
- listeners.forEach { it.cycleStarted(scheduler) }
- }
-
- override fun cycleFinished(scheduler: WorkflowServiceImpl) {
- listeners.forEach { it.cycleFinished(scheduler) }
- }
+ val listeners = mutableListOf<WorkflowSchedulerListener>()
override fun jobSubmitted(job: JobState) {
listeners.forEach { it.jobSubmitted(job) }
@@ -206,25 +186,27 @@ public class WorkflowServiceImpl(
.setUnit("1")
.build()
- private val mode: WorkflowSchedulerMode.Logic
+ /**
+ * The [TimerScheduler] to use for scheduling the scheduler cycles.
+ */
+ private val timerScheduler: TimerScheduler<Unit> = TimerScheduler(scope.coroutineContext, clock)
+
private val jobAdmissionPolicy: JobAdmissionPolicy.Logic
private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic
private lateinit var image: Image
init {
- this.mode = mode(this)
this.jobAdmissionPolicy = jobAdmissionPolicy(this)
this.jobQueue = PriorityQueue(100, jobOrderPolicy(this).thenBy { it.job.uid })
this.taskEligibilityPolicy = taskEligibilityPolicy(this)
this.taskQueue = PriorityQueue(1000, taskOrderPolicy(this).thenBy { it.task.uid })
- scope.launch {
- image = computeClient.newImage("workflow-runner")
- }
+
+ scope.launch { image = computeClient.newImage("workflow-runner") }
}
- override suspend fun run(job: Job) {
+ override suspend fun invoke(job: Job): Unit = suspendCancellableCoroutine { cont ->
// J1 Incoming Jobs
- val jobInstance = JobState(job, clock.millis())
+ val jobInstance = JobState(job, clock.millis(), cont)
val instances = job.tasks.associateWith {
TaskState(jobInstance, it)
}
@@ -243,36 +225,61 @@ public class WorkflowServiceImpl(
submittedTasks.add(1)
}
- return suspendCancellableCoroutine { cont ->
- instances.values.toCollection(jobInstance.tasks)
- incomingJobs += jobInstance
- rootListener.jobSubmitted(jobInstance)
- conts[job] = cont
+ instances.values.toCollection(jobInstance.tasks)
+ incomingJobs += jobInstance
+ rootListener.jobSubmitted(jobInstance)
+ submittedJobs.add(1)
- submittedJobs.add(1)
+ requestSchedulingCycle()
+ }
- requestCycle()
- }
+ override fun close() {
+ scope.cancel()
}
- override suspend fun submit(job: Job) {
- scope.launch { run(job) }
+ /**
+ * Register a [WorkflowSchedulerListener].
+ */
+ public fun addListener(listener: WorkflowSchedulerListener) {
+ rootListener.listeners += listener
}
- override fun close() {
- scope.cancel()
+ /**
+ * Unregister a [WorkflowSchedulerListener].
+ */
+ public fun removeListener(listener: WorkflowSchedulerListener) {
+ rootListener.listeners -= listener
}
/**
- * Indicate to the scheduler that a scheduling cycle is needed.
+ * Indicate that a new scheduling cycle is needed due to a change to the service's state.
*/
- private fun requestCycle() = mode.requestCycle()
+ private fun requestSchedulingCycle() {
+ // Bail out in case we have already requested a new cycle or the queue is empty.
+ if (timerScheduler.isTimerActive(Unit) || incomingJobs.isEmpty()) {
+ return
+ }
+
+ val quantum = schedulingQuantum.toMillis()
+ if (quantum == 0L) {
+ doSchedule()
+ return
+ }
+
+ // We assume that the provisioner runs at a fixed slot every time quantum (e.g t=0, t=60, t=120).
+ // This is important because the slices of the VMs need to be aligned.
+ // We calculate here the delay until the next scheduling slot.
+ val delay = quantum - (clock.millis() % quantum)
+
+ timerScheduler.startSingleTimer(Unit, delay) {
+ doSchedule()
+ }
+ }
/**
* Perform a scheduling cycle immediately.
*/
- @OptIn(ExperimentalCoroutinesApi::class)
- internal suspend fun schedule() {
+ private fun doSchedule() {
// J2 Create list of eligible jobs
val iterator = incomingJobs.iterator()
while (iterator.hasNext()) {
@@ -293,8 +300,8 @@ public class WorkflowServiceImpl(
}
// J4 Per job
- while (jobQueue.isNotEmpty()) {
- val jobInstance = jobQueue.poll()
+ while (true) {
+ val jobInstance = jobQueue.poll() ?: break
// Edge-case: job has no tasks
if (jobInstance.isFinished) {
@@ -361,7 +368,7 @@ public class WorkflowServiceImpl(
}
}
- public override fun onStateChanged(server: Server, newState: ServerState) {
+ override fun onStateChanged(server: Server, newState: ServerState) {
when (newState) {
ServerState.PROVISIONING -> {}
ServerState.RUNNING -> {
@@ -402,7 +409,7 @@ public class WorkflowServiceImpl(
finishJob(job)
}
- requestCycle()
+ requestSchedulingCycle()
}
ServerState.DELETED -> {
}
@@ -416,14 +423,6 @@ public class WorkflowServiceImpl(
finishedJobs.add(1)
rootListener.jobFinished(job)
- conts.remove(job.job)?.resume(Unit)
- }
-
- public fun addListener(listener: WorkflowSchedulerListener) {
- rootListener.listeners += listener
- }
-
- public fun removeListener(listener: WorkflowSchedulerListener) {
- rootListener.listeners -= listener
+ job.cont.resume(Unit)
}
}
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/WorkflowSchedulerMode.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/WorkflowSchedulerMode.kt
deleted file mode 100644
index 58e7893f..00000000
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/WorkflowSchedulerMode.kt
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.workflow.service.scheduler
-
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.launch
-import org.opendc.workflow.service.internal.WorkflowServiceImpl
-
-/**
- * The operating mode of a workflow scheduler.
- */
-public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Logic> {
- /**
- * The logic for operating the cycles of a workflow scheduler.
- */
- public interface Logic {
- /**
- * Request a new scheduling cycle to be performed.
- */
- public fun requestCycle()
- }
-
- /**
- * An interactive scheduler immediately triggers a new scheduling cycle when a workflow is received.
- */
- public object Interactive : WorkflowSchedulerMode() {
- override fun invoke(scheduler: WorkflowServiceImpl): Logic = object : Logic {
- override fun requestCycle() {
- scheduler.scope.launch { scheduler.schedule() }
- }
- }
-
- override fun toString(): String = "Interactive"
- }
-
- /**
- * A batch scheduler triggers a scheduling cycle every time quantum if needed.
- */
- public data class Batch(val quantum: Long) : WorkflowSchedulerMode() {
- private var next: kotlinx.coroutines.Job? = null
-
- override fun invoke(scheduler: WorkflowServiceImpl): Logic = object : Logic {
- override fun requestCycle() {
- if (next == null) {
- // In batch mode, we assume that the scheduler runs at a fixed slot every time
- // quantum (e.g t=0, t=60, t=120). We calculate here the delay until the next scheduling slot.
- val delay = quantum - (scheduler.clock.millis() % quantum)
-
- val job = scheduler.scope.launch {
- delay(delay)
- next = null
- scheduler.schedule()
- }
- next = job
- }
- }
- }
-
- override fun toString(): String = "Batch($quantum)"
- }
-
- /**
- * A scheduling cycle is triggered at a random point in time.
- */
- public data class Random(private val random: java.util.Random = java.util.Random(123)) : WorkflowSchedulerMode() {
- private var next: kotlinx.coroutines.Job? = null
-
- override fun invoke(scheduler: WorkflowServiceImpl): Logic = object : Logic {
- override fun requestCycle() {
- if (next == null) {
- val delay = random.nextInt(200).toLong()
-
- val job = scheduler.scope.launch {
- delay(delay)
- next = null
- scheduler.schedule()
- }
- next = job
- }
- }
- }
-
- override fun toString(): String = "Random"
- }
-}