summaryrefslogtreecommitdiff
path: root/opendc-workflow/opendc-workflow-service/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-workflow/opendc-workflow-service/src/main')
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt97
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/JobState.kt39
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/TaskState.kt83
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/TaskStatus.kt33
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowSchedulerListener.kt37
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt423
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/StagePolicy.kt36
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/WorkflowSchedulerMode.kt105
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/DurationJobOrderPolicy.kt102
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/JobAdmissionPolicy.kt70
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/JobOrderPolicy.kt31
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/LimitJobAdmissionPolicy.kt45
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/NullJobAdmissionPolicy.kt37
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/RandomJobOrderPolicy.kt63
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/SizeJobOrderPolicy.kt38
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/SubmissionTimeJobOrderPolicy.kt38
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/ActiveTaskOrderPolicy.kt68
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/BalancingTaskEligibilityPolicy.kt76
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/CompletionTaskOrderPolicy.kt64
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DependenciesTaskOrderPolicy.kt39
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DependentsTaskOrderPolicy.kt39
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationHistoryTaskOrderPolicy.kt69
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationTaskOrderPolicy.kt69
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitPerJobTaskEligibilityPolicy.kt68
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitTaskEligibilityPolicy.kt43
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/NullTaskEligibilityPolicy.kt41
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskEligibilityPolicy.kt45
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskOrderPolicy.kt60
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/SubmissionTimeTaskOrderPolicy.kt39
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/TaskEligibilityPolicy.kt70
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/TaskOrderPolicy.kt32
31 files changed, 2099 insertions, 0 deletions
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
new file mode 100644
index 00000000..d3358ef1
--- /dev/null
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
@@ -0,0 +1,97 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.workflow.service
+
+import io.opentelemetry.api.metrics.Meter
+import org.opendc.compute.api.ComputeClient
+import org.opendc.workflow.api.Job
+import org.opendc.workflow.service.internal.WorkflowServiceImpl
+import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode
+import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy
+import org.opendc.workflow.service.scheduler.job.JobOrderPolicy
+import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy
+import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy
+import java.time.Clock
+import kotlin.coroutines.CoroutineContext
+
+/**
+ * A service for cloud workflow management.
+ *
+ * The workflow scheduler is modelled after the Reference Architecture for Topology Scheduling by Andreadis et al.
+ */
+public interface WorkflowService : AutoCloseable {
+ /**
+ * Submit the specified [Job] to the workflow service for scheduling.
+ */
+ public suspend fun submit(job: Job)
+
+ /**
+ * Run the specified [Job] and suspend execution until the job is finished.
+ */
+ public suspend fun run(job: Job)
+
+ /**
+ * Terminate the lifecycle of the workflow service, stopping all running workflows.
+ */
+ public override fun close()
+
+ public companion object {
+ /**
+ * Construct a new [WorkflowService] implementation.
+ *
+ * @param context The [CoroutineContext] to use in the service.
+ * @param clock The clock instance to use.
+ * @param tracer The event tracer to use.
+ * @param meter The meter to use.
+ * @param compute The compute client to use.
+ * @param mode The scheduling mode to use.
+ * @param jobAdmissionPolicy The job admission policy to use.
+ * @param jobOrderPolicy The job order policy to use.
+ * @param taskEligibilityPolicy The task eligibility policy to use.
+ * @param taskOrderPolicy The task order policy to use.
+ */
+ public operator fun invoke(
+ context: CoroutineContext,
+ clock: Clock,
+ meter: Meter,
+ compute: ComputeClient,
+ mode: WorkflowSchedulerMode,
+ jobAdmissionPolicy: JobAdmissionPolicy,
+ jobOrderPolicy: JobOrderPolicy,
+ taskEligibilityPolicy: TaskEligibilityPolicy,
+ taskOrderPolicy: TaskOrderPolicy
+ ): WorkflowService {
+ return WorkflowServiceImpl(
+ context,
+ clock,
+ meter,
+ compute,
+ mode,
+ jobAdmissionPolicy,
+ jobOrderPolicy,
+ taskEligibilityPolicy,
+ taskOrderPolicy
+ )
+ }
+ }
+}
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/JobState.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/JobState.kt
new file mode 100644
index 00000000..1bb67169
--- /dev/null
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/JobState.kt
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.workflow.service.internal
+
+import org.opendc.workflow.api.Job
+
+public class JobState(public val job: Job, public val submittedAt: Long) {
+ /**
+ * A flag to indicate whether this job is finished.
+ */
+ public val isFinished: Boolean
+ get() = tasks.isEmpty()
+
+ internal val tasks: MutableSet<TaskState> = mutableSetOf()
+
+ override fun equals(other: Any?): Boolean = other is JobState && other.job == job
+
+ override fun hashCode(): Int = job.hashCode()
+}
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/TaskState.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/TaskState.kt
new file mode 100644
index 00000000..c3ce1492
--- /dev/null
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/TaskState.kt
@@ -0,0 +1,83 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.workflow.service.internal
+
+import org.opendc.compute.api.Server
+import org.opendc.workflow.api.Task
+
+public class TaskState(public val job: JobState, public val task: Task) {
+ /**
+ * The moment in time the task was started.
+ */
+ public var startedAt: Long = Long.MIN_VALUE
+
+ /**
+ * The moment in time the task was finished.
+ */
+ public var finishedAt: Long = Long.MIN_VALUE
+
+ /**
+ * The dependencies of this task.
+ */
+ public val dependencies: HashSet<TaskState> = HashSet()
+
+ /**
+ * The dependents of this task.
+ */
+ public val dependents: HashSet<TaskState> = HashSet()
+
+ /**
+ * A flag to indicate whether this workflow task instance is a workflow root.
+ */
+ public val isRoot: Boolean
+ get() = dependencies.isEmpty()
+
+ public var state: TaskStatus = TaskStatus.CREATED
+ set(value) {
+ field = value
+
+ // Mark the process as terminated in the graph
+ if (value == TaskStatus.FINISHED) {
+ markTerminated()
+ }
+ }
+
+ public var server: Server? = null
+
+ /**
+ * Mark the specified [TaskView] as terminated.
+ */
+ private fun markTerminated() {
+ for (dependent in dependents) {
+ dependent.dependencies.remove(this)
+
+ if (dependent.isRoot) {
+ dependent.state = TaskStatus.READY
+ }
+ }
+ }
+
+ override fun equals(other: Any?): Boolean = other is TaskState && other.job == job && other.task == task
+
+ override fun hashCode(): Int = task.hashCode()
+}
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/TaskStatus.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/TaskStatus.kt
new file mode 100644
index 00000000..fe941d09
--- /dev/null
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/TaskStatus.kt
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.workflow.service.internal
+
+/**
+ * The state of a workflow task.
+ */
+public enum class TaskStatus {
+ CREATED,
+ READY,
+ ACTIVE,
+ FINISHED
+}
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowSchedulerListener.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowSchedulerListener.kt
new file mode 100644
index 00000000..29c6aeea
--- /dev/null
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowSchedulerListener.kt
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.workflow.service.internal
+
+public interface WorkflowSchedulerListener {
+ public fun cycleStarted(scheduler: WorkflowServiceImpl) {}
+ public fun cycleFinished(scheduler: WorkflowServiceImpl) {}
+
+ public fun jobSubmitted(job: JobState) {}
+ public fun jobStarted(job: JobState) {}
+ public fun jobFinished(job: JobState) {}
+
+ public fun taskReady(task: TaskState) {}
+ public fun taskAssigned(task: TaskState) {}
+ public fun taskStarted(task: TaskState) {}
+ public fun taskFinished(task: TaskState) {}
+}
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
new file mode 100644
index 00000000..32191b8f
--- /dev/null
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
@@ -0,0 +1,423 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.workflow.service.internal
+
+import io.opentelemetry.api.metrics.Meter
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.map
+import mu.KotlinLogging
+import org.opendc.compute.api.*
+import org.opendc.workflow.api.Job
+import org.opendc.workflow.api.WORKFLOW_TASK_CORES
+import org.opendc.workflow.service.*
+import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode
+import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy
+import org.opendc.workflow.service.scheduler.job.JobOrderPolicy
+import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy
+import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy
+import java.time.Clock
+import java.util.*
+import kotlin.coroutines.Continuation
+import kotlin.coroutines.CoroutineContext
+import kotlin.coroutines.resume
+
+/**
+ * A [WorkflowService] that distributes work through a multi-stage process based on the Reference Architecture for
+ * Datacenter Scheduling.
+ */
+public class WorkflowServiceImpl(
+ context: CoroutineContext,
+ internal val clock: Clock,
+ private val meter: Meter,
+ private val computeClient: ComputeClient,
+ mode: WorkflowSchedulerMode,
+ jobAdmissionPolicy: JobAdmissionPolicy,
+ jobOrderPolicy: JobOrderPolicy,
+ taskEligibilityPolicy: TaskEligibilityPolicy,
+ taskOrderPolicy: TaskOrderPolicy
+) : WorkflowService, ServerWatcher {
+ /**
+ * The [CoroutineScope] of the service bounded by the lifecycle of the service.
+ */
+ internal val scope = CoroutineScope(context + Job())
+
+ /**
+ * The logger instance to use.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
+ * The incoming jobs ready to be processed by the scheduler.
+ */
+ internal val incomingJobs: MutableSet<JobState> = linkedSetOf()
+
+ /**
+ * The incoming tasks ready to be processed by the scheduler.
+ */
+ internal val incomingTasks: MutableSet<TaskState> = linkedSetOf()
+
+ /**
+ * The job queue.
+ */
+ internal val jobQueue: Queue<JobState>
+
+ /**
+ * The task queue.
+ */
+ internal val taskQueue: Queue<TaskState>
+
+ /**
+ * The active jobs in the system.
+ */
+ internal val activeJobs: MutableSet<JobState> = mutableSetOf()
+
+ /**
+ * The active tasks in the system.
+ */
+ internal val activeTasks: MutableSet<TaskState> = mutableSetOf()
+
+ /**
+ * The running tasks by [Server].
+ */
+ internal val taskByServer = mutableMapOf<Server, TaskState>()
+
+ /**
+ * The continuation of the jobs.
+ */
+ private val conts = mutableMapOf<Job, Continuation<Unit>>()
+
+ /**
+ * The root listener of this scheduler.
+ */
+ private val rootListener = object : WorkflowSchedulerListener {
+ /**
+ * The listeners to delegate to.
+ */
+ val listeners = mutableSetOf<WorkflowSchedulerListener>()
+
+ override fun cycleStarted(scheduler: WorkflowServiceImpl) {
+ listeners.forEach { it.cycleStarted(scheduler) }
+ }
+
+ override fun cycleFinished(scheduler: WorkflowServiceImpl) {
+ listeners.forEach { it.cycleFinished(scheduler) }
+ }
+
+ override fun jobSubmitted(job: JobState) {
+ listeners.forEach { it.jobSubmitted(job) }
+ }
+
+ override fun jobStarted(job: JobState) {
+ listeners.forEach { it.jobStarted(job) }
+ }
+
+ override fun jobFinished(job: JobState) {
+ listeners.forEach { it.jobFinished(job) }
+ }
+
+ override fun taskReady(task: TaskState) {
+ listeners.forEach { it.taskReady(task) }
+ }
+
+ override fun taskAssigned(task: TaskState) {
+ listeners.forEach { it.taskAssigned(task) }
+ }
+
+ override fun taskStarted(task: TaskState) {
+ listeners.forEach { it.taskStarted(task) }
+ }
+
+ override fun taskFinished(task: TaskState) {
+ listeners.forEach { it.taskFinished(task) }
+ }
+ }
+
+ /**
+ * The number of jobs that have been submitted to the service.
+ */
+ private val submittedJobs = meter.longCounterBuilder("jobs.submitted")
+ .setDescription("Number of submitted jobs")
+ .setUnit("1")
+ .build()
+
+ /**
+ * The number of jobs that are running.
+ */
+ private val runningJobs = meter.longUpDownCounterBuilder("jobs.active")
+ .setDescription("Number of jobs running")
+ .setUnit("1")
+ .build()
+
+ /**
+ * The number of jobs that have finished running.
+ */
+ private val finishedJobs = meter.longCounterBuilder("jobs.finished")
+ .setDescription("Number of jobs that finished running")
+ .setUnit("1")
+ .build()
+
+ /**
+ * The number of tasks that have been submitted to the service.
+ */
+ private val submittedTasks = meter.longCounterBuilder("tasks.submitted")
+ .setDescription("Number of submitted tasks")
+ .setUnit("1")
+ .build()
+
+ /**
+ * The number of jobs that are running.
+ */
+ private val runningTasks = meter.longUpDownCounterBuilder("tasks.active")
+ .setDescription("Number of tasks running")
+ .setUnit("1")
+ .build()
+
+ /**
+ * The number of jobs that have finished running.
+ */
+ private val finishedTasks = meter.longCounterBuilder("tasks.finished")
+ .setDescription("Number of tasks that finished running")
+ .setUnit("1")
+ .build()
+
+ private val mode: WorkflowSchedulerMode.Logic
+ private val jobAdmissionPolicy: JobAdmissionPolicy.Logic
+ private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic
+ private lateinit var image: Image
+
+ init {
+ this.mode = mode(this)
+ this.jobAdmissionPolicy = jobAdmissionPolicy(this)
+ this.jobQueue = PriorityQueue(100, jobOrderPolicy(this).thenBy { it.job.uid })
+ this.taskEligibilityPolicy = taskEligibilityPolicy(this)
+ this.taskQueue = PriorityQueue(1000, taskOrderPolicy(this).thenBy { it.task.uid })
+ scope.launch {
+ image = computeClient.newImage("workflow-runner")
+ }
+ }
+
+ override suspend fun run(job: Job) {
+ // J1 Incoming Jobs
+ val jobInstance = JobState(job, clock.millis())
+ val instances = job.tasks.associateWith {
+ TaskState(jobInstance, it)
+ }
+
+ for ((task, instance) in instances) {
+ instance.dependencies.addAll(task.dependencies.map { instances[it]!! })
+ task.dependencies.forEach {
+ instances[it]!!.dependents.add(instance)
+ }
+
+ // If the task has no dependency, it is a root task and can immediately be evaluated
+ if (instance.isRoot) {
+ instance.state = TaskStatus.READY
+ }
+
+ submittedTasks.add(1)
+ }
+
+ return suspendCancellableCoroutine { cont ->
+ instances.values.toCollection(jobInstance.tasks)
+ incomingJobs += jobInstance
+ rootListener.jobSubmitted(jobInstance)
+ conts[job] = cont
+
+ submittedJobs.add(1)
+
+ requestCycle()
+ }
+ }
+
+ override suspend fun submit(job: Job) {
+ scope.launch { run(job) }
+ }
+
+ override fun close() {
+ scope.cancel()
+ }
+
+ /**
+ * Indicate to the scheduler that a scheduling cycle is needed.
+ */
+ private fun requestCycle() = mode.requestCycle()
+
+ /**
+ * Perform a scheduling cycle immediately.
+ */
+ @OptIn(ExperimentalCoroutinesApi::class)
+ internal suspend fun schedule() {
+ // J2 Create list of eligible jobs
+ val iterator = incomingJobs.iterator()
+ while (iterator.hasNext()) {
+ val jobInstance = iterator.next()
+ val advice = jobAdmissionPolicy(jobInstance)
+ if (advice.stop) {
+ break
+ } else if (!advice.admit) {
+ continue
+ }
+
+ iterator.remove()
+ jobQueue.add(jobInstance)
+ activeJobs += jobInstance
+
+ runningJobs.add(1)
+ rootListener.jobStarted(jobInstance)
+ }
+
+ // J4 Per job
+ while (jobQueue.isNotEmpty()) {
+ val jobInstance = jobQueue.poll()
+
+ // Edge-case: job has no tasks
+ if (jobInstance.isFinished) {
+ finishJob(jobInstance)
+ }
+
+ // Add job roots to the scheduling queue
+ for (task in jobInstance.tasks) {
+ if (task.state != TaskStatus.READY) {
+ continue
+ }
+
+ incomingTasks += task
+ rootListener.taskReady(task)
+ }
+ }
+
+ // T1 Create list of eligible tasks
+ val taskIterator = incomingTasks.iterator()
+ while (taskIterator.hasNext()) {
+ val taskInstance = taskIterator.next()
+ val advice = taskEligibilityPolicy(taskInstance)
+ if (advice.stop) {
+ break
+ } else if (!advice.admit) {
+ continue
+ }
+
+ taskIterator.remove()
+ taskQueue.add(taskInstance)
+ }
+
+ // T3 Per task
+ while (taskQueue.isNotEmpty()) {
+ val instance = taskQueue.peek()
+
+ val cores = instance.task.metadata[WORKFLOW_TASK_CORES] as? Int ?: 1
+ val image = image
+ scope.launch {
+ val flavor = computeClient.newFlavor(
+ instance.task.name,
+ cores,
+ 1000
+ ) // TODO How to determine memory usage for workflow task
+ val server = computeClient.newServer(
+ instance.task.name,
+ image,
+ flavor,
+ start = false,
+ meta = instance.task.metadata
+ )
+
+ instance.state = TaskStatus.ACTIVE
+ instance.server = server
+ taskByServer[server] = instance
+
+ server.watch(this@WorkflowServiceImpl)
+ server.start()
+ }
+
+ activeTasks += instance
+ taskQueue.poll()
+ rootListener.taskAssigned(instance)
+ }
+ }
+
+ public override fun onStateChanged(server: Server, newState: ServerState) {
+ when (newState) {
+ ServerState.PROVISIONING -> {}
+ ServerState.RUNNING -> {
+ val task = taskByServer.getValue(server)
+ task.startedAt = clock.millis()
+ runningTasks.add(1)
+ rootListener.taskStarted(task)
+ }
+ ServerState.TERMINATED, ServerState.ERROR -> {
+ val task = taskByServer.remove(server) ?: throw IllegalStateException()
+
+ scope.launch {
+ server.delete()
+ server.flavor.delete()
+ }
+
+ val job = task.job
+ task.state = TaskStatus.FINISHED
+ task.finishedAt = clock.millis()
+ job.tasks.remove(task)
+ activeTasks -= task
+
+ runningTasks.add(-1)
+ finishedTasks.add(1)
+ rootListener.taskFinished(task)
+
+ // Add job roots to the scheduling queue
+ for (dependent in task.dependents) {
+ if (dependent.state != TaskStatus.READY) {
+ continue
+ }
+
+ incomingTasks += dependent
+ rootListener.taskReady(dependent)
+ }
+
+ if (job.isFinished) {
+ finishJob(job)
+ }
+
+ requestCycle()
+ }
+ ServerState.DELETED -> {
+ }
+ else -> throw IllegalStateException()
+ }
+ }
+
+ private fun finishJob(job: JobState) {
+ activeJobs -= job
+ runningJobs.add(-1)
+ finishedJobs.add(1)
+ rootListener.jobFinished(job)
+
+ conts.remove(job.job)?.resume(Unit)
+ }
+
+ public fun addListener(listener: WorkflowSchedulerListener) {
+ rootListener.listeners += listener
+ }
+
+ public fun removeListener(listener: WorkflowSchedulerListener) {
+ rootListener.listeners -= listener
+ }
+}
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/StagePolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/StagePolicy.kt
new file mode 100644
index 00000000..359fc223
--- /dev/null
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/StagePolicy.kt
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.workflow.service.scheduler
+
+import org.opendc.workflow.service.internal.WorkflowServiceImpl
+import java.io.Serializable
+
+/**
+ * A scheduling stage policy.
+ */
+public interface StagePolicy<T : Any> : Serializable {
+ /**
+ * Build the logic of the stage policy.
+ */
+ public operator fun invoke(scheduler: WorkflowServiceImpl): T
+}
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/WorkflowSchedulerMode.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/WorkflowSchedulerMode.kt
new file mode 100644
index 00000000..58e7893f
--- /dev/null
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/WorkflowSchedulerMode.kt
@@ -0,0 +1,105 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.workflow.service.scheduler
+
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import org.opendc.workflow.service.internal.WorkflowServiceImpl
+
+/**
+ * The operating mode of a workflow scheduler.
+ */
+public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Logic> {
+ /**
+ * The logic for operating the cycles of a workflow scheduler.
+ */
+ public interface Logic {
+ /**
+ * Request a new scheduling cycle to be performed.
+ */
+ public fun requestCycle()
+ }
+
+ /**
+ * An interactive scheduler immediately triggers a new scheduling cycle when a workflow is received.
+ */
+ public object Interactive : WorkflowSchedulerMode() {
+ override fun invoke(scheduler: WorkflowServiceImpl): Logic = object : Logic {
+ override fun requestCycle() {
+ scheduler.scope.launch { scheduler.schedule() }
+ }
+ }
+
+ override fun toString(): String = "Interactive"
+ }
+
+ /**
+ * A batch scheduler triggers a scheduling cycle every time quantum if needed.
+ */
+ public data class Batch(val quantum: Long) : WorkflowSchedulerMode() {
+ private var next: kotlinx.coroutines.Job? = null
+
+ override fun invoke(scheduler: WorkflowServiceImpl): Logic = object : Logic {
+ override fun requestCycle() {
+ if (next == null) {
+ // In batch mode, we assume that the scheduler runs at a fixed slot every time
+ // quantum (e.g t=0, t=60, t=120). We calculate here the delay until the next scheduling slot.
+ val delay = quantum - (scheduler.clock.millis() % quantum)
+
+ val job = scheduler.scope.launch {
+ delay(delay)
+ next = null
+ scheduler.schedule()
+ }
+ next = job
+ }
+ }
+ }
+
+ override fun toString(): String = "Batch($quantum)"
+ }
+
+ /**
+ * A scheduling cycle is triggered at a random point in time.
+ */
+ public data class Random(private val random: java.util.Random = java.util.Random(123)) : WorkflowSchedulerMode() {
+ private var next: kotlinx.coroutines.Job? = null
+
+ override fun invoke(scheduler: WorkflowServiceImpl): Logic = object : Logic {
+ override fun requestCycle() {
+ if (next == null) {
+ val delay = random.nextInt(200).toLong()
+
+ val job = scheduler.scope.launch {
+ delay(delay)
+ next = null
+ scheduler.schedule()
+ }
+ next = job
+ }
+ }
+ }
+
+ override fun toString(): String = "Random"
+ }
+}
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/DurationJobOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/DurationJobOrderPolicy.kt
new file mode 100644
index 00000000..1b5b91b9
--- /dev/null
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/DurationJobOrderPolicy.kt
@@ -0,0 +1,102 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.workflow.service.scheduler.job
+
+import org.opendc.workflow.api.Job
+import org.opendc.workflow.api.Task
+import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE
+import org.opendc.workflow.service.internal.JobState
+import org.opendc.workflow.service.internal.WorkflowSchedulerListener
+import org.opendc.workflow.service.internal.WorkflowServiceImpl
+
+/**
+ * A [JobOrderPolicy] that orders jobs based on its critical path length.
+ */
+public data class DurationJobOrderPolicy(val ascending: Boolean = true) : JobOrderPolicy {
+ override fun invoke(scheduler: WorkflowServiceImpl): Comparator<JobState> =
+ object :
+ Comparator<JobState>,
+ WorkflowSchedulerListener {
+ private val results = HashMap<Job, Long>()
+
+ init {
+ scheduler.addListener(this)
+ }
+
+ private val Job.duration: Long
+ get() = results[this]!!
+
+ override fun jobSubmitted(job: JobState) {
+ results[job.job] = job.job.toposort().map { task ->
+ val estimable = task.metadata[WORKFLOW_TASK_DEADLINE] as? Long?
+ estimable ?: Long.MAX_VALUE
+ }.sum()
+ }
+
+ override fun jobFinished(job: JobState) {
+ results.remove(job.job)
+ }
+
+ override fun compare(o1: JobState, o2: JobState): Int {
+ return compareValuesBy(o1, o2) { it.job.duration }.let { if (ascending) it else -it }
+ }
+ }
+
+ override fun toString(): String {
+ return "Job-Duration(${if (ascending) "asc" else "desc"})"
+ }
+}
+
+/**
+ * Create a topological sorting of the tasks in a job.
+ *
+ * @return The list of tasks within the job topologically sorted.
+ */
+public fun Job.toposort(): List<Task> {
+ val res = mutableListOf<Task>()
+ val visited = mutableSetOf<Task>()
+ val adjacent = mutableMapOf<Task, MutableList<Task>>()
+
+ for (task in tasks) {
+ for (dependency in task.dependencies) {
+ adjacent.getOrPut(dependency) { mutableListOf() }.add(task)
+ }
+ }
+
+ fun visit(task: Task) {
+ visited.add(task)
+
+ adjacent[task] ?: emptyList<Task>()
+ .asSequence()
+ .filter { it !in visited }
+ .forEach { visit(it) }
+
+ res.add(task)
+ }
+
+ tasks
+ .asSequence()
+ .filter { it !in visited }
+ .forEach { visit(it) }
+ return res
+}
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/JobAdmissionPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/JobAdmissionPolicy.kt
new file mode 100644
index 00000000..ed3acff7
--- /dev/null
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/JobAdmissionPolicy.kt
@@ -0,0 +1,70 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.workflow.service.scheduler.job
+
+import org.opendc.workflow.service.internal.JobState
+import org.opendc.workflow.service.scheduler.StagePolicy
+
+/**
+ * A policy interface for admitting [JobState]s to a scheduling cycle.
+ */
+public interface JobAdmissionPolicy : StagePolicy<JobAdmissionPolicy.Logic> {
+ public interface Logic {
+ /**
+ * Determine whether the specified [JobState] should be admitted to the scheduling cycle.
+ *
+ * @param job The workflow that has been submitted.
+ * @return The advice for admitting the job.
+ */
+ public operator fun invoke(job: JobState): Advice
+ }
+
+ /**
+ * The advice given to the scheduler by an admission policy.
+ *
+ * @property admit A flag to indicate to the scheduler that the job should be admitted.
+ * @property stop A flag to indicate the scheduler should immediately stop admitting jobs to the scheduling queue and wait
+ * for the next scheduling cycle.
+ */
+ public enum class Advice(public val admit: Boolean, public val stop: Boolean) {
+ /**
+ * Admit the current job to the scheduling queue and continue admitting jobs.
+ */
+ ADMIT(true, false),
+
+ /**
+ * Admit the current job to the scheduling queue and stop admitting jobs.
+ */
+ ADMIT_LAST(true, true),
+
+ /**
+ * Deny the current job, but continue admitting jobs.
+ */
+ DENY(false, false),
+
+ /**
+ * Deny the current job and also stop admitting jobs.
+ */
+ STOP(false, true)
+ }
+}
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/JobOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/JobOrderPolicy.kt
new file mode 100644
index 00000000..adaa6671
--- /dev/null
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/JobOrderPolicy.kt
@@ -0,0 +1,31 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.workflow.service.scheduler.job
+
+import org.opendc.workflow.service.internal.JobState
+import org.opendc.workflow.service.scheduler.StagePolicy
+
+/**
+ * A policy interface for ordering admitted workflows in the scheduling queue.
+ */
+public interface JobOrderPolicy : StagePolicy<Comparator<JobState>>
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/LimitJobAdmissionPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/LimitJobAdmissionPolicy.kt
new file mode 100644
index 00000000..6a0bfeb9
--- /dev/null
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/LimitJobAdmissionPolicy.kt
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.workflow.service.scheduler.job
+
+import org.opendc.workflow.service.internal.JobState
+import org.opendc.workflow.service.internal.WorkflowServiceImpl
+
+/**
+ * A [JobAdmissionPolicy] that limits the amount of active jobs in the system.
+ *
+ * @property limit The maximum number of concurrent jobs in the system.
+ */
+public data class LimitJobAdmissionPolicy(public val limit: Int) : JobAdmissionPolicy {
+ override fun invoke(scheduler: WorkflowServiceImpl): JobAdmissionPolicy.Logic = object : JobAdmissionPolicy.Logic {
+ override fun invoke(
+ job: JobState
+ ): JobAdmissionPolicy.Advice =
+ if (scheduler.activeJobs.size < limit)
+ JobAdmissionPolicy.Advice.ADMIT
+ else
+ JobAdmissionPolicy.Advice.STOP
+ }
+
+ override fun toString(): String = "Limit-Active($limit)"
+}
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/NullJobAdmissionPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/NullJobAdmissionPolicy.kt
new file mode 100644
index 00000000..31f8f8db
--- /dev/null
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/NullJobAdmissionPolicy.kt
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.workflow.service.scheduler.job
+
+import org.opendc.workflow.service.internal.JobState
+import org.opendc.workflow.service.internal.WorkflowServiceImpl
+
+/**
+ * A [JobAdmissionPolicy] that admits all jobs.
+ */
+public object NullJobAdmissionPolicy : JobAdmissionPolicy {
+ override fun invoke(scheduler: WorkflowServiceImpl): JobAdmissionPolicy.Logic = object : JobAdmissionPolicy.Logic {
+ override fun invoke(job: JobState): JobAdmissionPolicy.Advice = JobAdmissionPolicy.Advice.ADMIT
+ }
+
+ override fun toString(): String = "Always"
+}
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/RandomJobOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/RandomJobOrderPolicy.kt
new file mode 100644
index 00000000..1b359125
--- /dev/null
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/RandomJobOrderPolicy.kt
@@ -0,0 +1,63 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.workflow.service.scheduler.job
+
+import org.opendc.workflow.api.Job
+import org.opendc.workflow.service.internal.JobState
+import org.opendc.workflow.service.internal.WorkflowSchedulerListener
+import org.opendc.workflow.service.internal.WorkflowServiceImpl
+import java.util.*
+import kotlin.collections.HashMap
+
+/**
+ * A [JobOrderPolicy] that randomly orders jobs.
+ */
+public object RandomJobOrderPolicy : JobOrderPolicy {
+ override fun invoke(scheduler: WorkflowServiceImpl): Comparator<JobState> =
+ object :
+ Comparator<JobState>,
+ WorkflowSchedulerListener {
+ private val random = Random(123)
+ private val ids = HashMap<Job, Int>()
+
+ init {
+ scheduler.addListener(this)
+ }
+
+ override fun jobSubmitted(job: JobState) {
+ ids[job.job] = random.nextInt()
+ }
+
+ override fun jobFinished(job: JobState) {
+ ids.remove(job.job)
+ }
+
+ override fun compare(o1: JobState, o2: JobState): Int {
+ return compareValuesBy(o1, o2) { ids.getValue(it.job) }
+ }
+ }
+
+ override fun toString(): String {
+ return "Random"
+ }
+}
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/SizeJobOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/SizeJobOrderPolicy.kt
new file mode 100644
index 00000000..6998606d
--- /dev/null
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/SizeJobOrderPolicy.kt
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.workflow.service.scheduler.job
+
+import org.opendc.workflow.service.internal.JobState
+import org.opendc.workflow.service.internal.WorkflowServiceImpl
+
+/**
+ * A [SizeJobOrderPolicy] that orders jobs based on the number of tasks it has.
+ */
+public data class SizeJobOrderPolicy(public val ascending: Boolean = true) : JobOrderPolicy {
+ override fun invoke(scheduler: WorkflowServiceImpl): Comparator<JobState> =
+ compareBy { it.tasks.size.let { if (ascending) it else -it } }
+
+ override fun toString(): String {
+ return "Job-Size(${if (ascending) "asc" else "desc"})"
+ }
+}
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/SubmissionTimeJobOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/SubmissionTimeJobOrderPolicy.kt
new file mode 100644
index 00000000..53d06023
--- /dev/null
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/SubmissionTimeJobOrderPolicy.kt
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.workflow.service.scheduler.job
+
+import org.opendc.workflow.service.internal.JobState
+import org.opendc.workflow.service.internal.WorkflowServiceImpl
+
+/**
+ * A [JobOrderPolicy] orders jobs in FIFO order.
+ */
+public data class SubmissionTimeJobOrderPolicy(public val ascending: Boolean = true) : JobOrderPolicy {
+ override fun invoke(scheduler: WorkflowServiceImpl): Comparator<JobState> =
+ compareBy { it.submittedAt.let { if (ascending) it else -it } }
+
+ override fun toString(): String {
+ return "Submission-Time(${if (ascending) "asc" else "desc"})"
+ }
+}
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/ActiveTaskOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/ActiveTaskOrderPolicy.kt
new file mode 100644
index 00000000..821d4964
--- /dev/null
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/ActiveTaskOrderPolicy.kt
@@ -0,0 +1,68 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.workflow.service.scheduler.task
+
+import org.opendc.workflow.service.internal.JobState
+import org.opendc.workflow.service.internal.TaskState
+import org.opendc.workflow.service.internal.WorkflowSchedulerListener
+import org.opendc.workflow.service.internal.WorkflowServiceImpl
+
+/**
+ * A [TaskOrderPolicy] that orders tasks based on the number of active relative tasks (w.r.t. its job) in the system.
+ */
+public data class ActiveTaskOrderPolicy(public val ascending: Boolean = true) : TaskOrderPolicy {
+ override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> =
+ object : Comparator<TaskState>, WorkflowSchedulerListener {
+ private val active = mutableMapOf<JobState, Int>()
+
+ init {
+ scheduler.addListener(this)
+ }
+
+ override fun jobStarted(job: JobState) {
+ active[job] = 0
+ }
+
+ override fun jobFinished(job: JobState) {
+ active.remove(job)
+ }
+
+ override fun taskAssigned(task: TaskState) {
+ active.merge(task.job, 1, Int::plus)
+ }
+
+ override fun taskFinished(task: TaskState) {
+ active.merge(task.job, -1, Int::plus)
+ }
+
+ override fun compare(o1: TaskState, o2: TaskState): Int {
+ return compareValuesBy(o1, o2) { active.getValue(it.job) }.let {
+ if (ascending) it else -it
+ }
+ }
+ }
+
+ override fun toString(): String {
+ return "Active-Per-Job(${if (ascending) "asc" else "desc"})"
+ }
+}
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/BalancingTaskEligibilityPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/BalancingTaskEligibilityPolicy.kt
new file mode 100644
index 00000000..42804f5a
--- /dev/null
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/BalancingTaskEligibilityPolicy.kt
@@ -0,0 +1,76 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.workflow.service.scheduler.task
+
+import org.opendc.workflow.service.internal.JobState
+import org.opendc.workflow.service.internal.TaskState
+import org.opendc.workflow.service.internal.WorkflowSchedulerListener
+import org.opendc.workflow.service.internal.WorkflowServiceImpl
+import kotlin.math.max
+
+/**
+ * A [TaskEligibilityPolicy] that balances the tasks based on their job, e.g. do not allow a single job to claim all
+ * resources of the system.
+ *
+ * @property tolerance The maximum difference from the average number of tasks per job in the system as a fraction of
+ * the average.
+ */
+public data class BalancingTaskEligibilityPolicy(public val tolerance: Double = 1.5) : TaskEligibilityPolicy {
+ override fun invoke(scheduler: WorkflowServiceImpl): TaskEligibilityPolicy.Logic =
+ object : TaskEligibilityPolicy.Logic, WorkflowSchedulerListener {
+ private val active = mutableMapOf<JobState, Int>()
+
+ init {
+ scheduler.addListener(this)
+ }
+
+ override fun jobStarted(job: JobState) {
+ active[job] = 0
+ }
+
+ override fun jobFinished(job: JobState) {
+ active.remove(job)
+ }
+
+ override fun taskAssigned(task: TaskState) {
+ active.merge(task.job, 1, Int::plus)
+ }
+
+ override fun taskFinished(task: TaskState) {
+ active.merge(task.job, -1, Int::plus)
+ }
+
+ override fun invoke(task: TaskState): TaskEligibilityPolicy.Advice {
+ val activeJobs = scheduler.activeJobs.size
+ val activeTasks = scheduler.activeTasks.size
+ val baseline = max(activeTasks / activeJobs.toDouble(), 1.0)
+ val activeForJob = active[task.job]!!
+ return if ((activeForJob + 1) / baseline < tolerance)
+ TaskEligibilityPolicy.Advice.ADMIT
+ else
+ TaskEligibilityPolicy.Advice.DENY
+ }
+ }
+
+ override fun toString(): String = "Job-Balance($tolerance)"
+}
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/CompletionTaskOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/CompletionTaskOrderPolicy.kt
new file mode 100644
index 00000000..dae7ad99
--- /dev/null
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/CompletionTaskOrderPolicy.kt
@@ -0,0 +1,64 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.workflow.service.scheduler.task
+
+import org.opendc.workflow.service.internal.JobState
+import org.opendc.workflow.service.internal.TaskState
+import org.opendc.workflow.service.internal.WorkflowSchedulerListener
+import org.opendc.workflow.service.internal.WorkflowServiceImpl
+
+/**
+ * A [TaskOrderPolicy] that orders tasks based on the number of completed relative tasks.
+ */
+public data class CompletionTaskOrderPolicy(public val ascending: Boolean = true) : TaskOrderPolicy {
+ override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> =
+ object : Comparator<TaskState>, WorkflowSchedulerListener {
+ private val finished = mutableMapOf<JobState, Int>()
+
+ init {
+ scheduler.addListener(this)
+ }
+
+ override fun jobStarted(job: JobState) {
+ finished[job] = 0
+ }
+
+ override fun jobFinished(job: JobState) {
+ finished.remove(job)
+ }
+
+ override fun taskFinished(task: TaskState) {
+ finished.merge(task.job, 1, Int::plus)
+ }
+
+ override fun compare(o1: TaskState, o2: TaskState): Int {
+ return compareValuesBy(o1, o2) { finished.getValue(it.job) / it.job.tasks.size.toDouble() }.let {
+ if (ascending) it else -it
+ }
+ }
+ }
+
+ override fun toString(): String {
+ return "Job-Completion(${if (ascending) "asc" else "desc"})"
+ }
+}
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DependenciesTaskOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DependenciesTaskOrderPolicy.kt
new file mode 100644
index 00000000..7786f6ec
--- /dev/null
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DependenciesTaskOrderPolicy.kt
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.workflow.service.scheduler.task
+
+import org.opendc.workflow.service.internal.TaskState
+import org.opendc.workflow.service.internal.WorkflowServiceImpl
+
+/**
+ * A [TaskOrderPolicy] that orders tasks based on the number of dependency tasks it has.
+ */
+public data class DependenciesTaskOrderPolicy(public val ascending: Boolean = true) : TaskOrderPolicy {
+ override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> = compareBy {
+ it.task.dependencies.size.let { if (ascending) it else -it }
+ }
+
+ override fun toString(): String {
+ return "Task-Dependencies(${if (ascending) "asc" else "desc"})"
+ }
+}
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DependentsTaskOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DependentsTaskOrderPolicy.kt
new file mode 100644
index 00000000..4fb835d7
--- /dev/null
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DependentsTaskOrderPolicy.kt
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.workflow.service.scheduler.task
+
+import org.opendc.workflow.service.internal.TaskState
+import org.opendc.workflow.service.internal.WorkflowServiceImpl
+
+/**
+ * A [TaskOrderPolicy] that orders tasks based on the number of dependent tasks it has.
+ */
+public data class DependentsTaskOrderPolicy(public val ascending: Boolean = true) : TaskOrderPolicy {
+ override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> = compareBy {
+ it.dependents.size.let { if (ascending) it else -it }
+ }
+
+ override fun toString(): String {
+ return "Task-Dependents(${if (ascending) "asc" else "desc"})"
+ }
+}
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationHistoryTaskOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationHistoryTaskOrderPolicy.kt
new file mode 100644
index 00000000..3a634de7
--- /dev/null
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationHistoryTaskOrderPolicy.kt
@@ -0,0 +1,69 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.workflow.service.scheduler.task
+
+import org.opendc.workflow.service.internal.JobState
+import org.opendc.workflow.service.internal.TaskState
+import org.opendc.workflow.service.internal.WorkflowSchedulerListener
+import org.opendc.workflow.service.internal.WorkflowServiceImpl
+
+/**
+ * A [TaskOrderPolicy] that orders tasks based on the average duration of the preceding tasks in the job.
+ */
+public data class DurationHistoryTaskOrderPolicy(public val ascending: Boolean = true) : TaskOrderPolicy {
+ override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> =
+ object : Comparator<TaskState>, WorkflowSchedulerListener {
+ private val results = HashMap<JobState, MutableList<Long>>()
+
+ init {
+ scheduler.addListener(this)
+ }
+
+ override fun jobStarted(job: JobState) {
+ results[job] = mutableListOf()
+ }
+
+ override fun jobFinished(job: JobState) {
+ results.remove(job)
+ }
+
+ override fun taskFinished(task: TaskState) {
+ results.getValue(task.job) += task.finishedAt - task.startedAt
+ }
+
+ override fun compare(o1: TaskState, o2: TaskState): Int {
+ return compareValuesBy(o1, o2) { key ->
+ val history = results.getValue(key.job)
+ if (history.isEmpty()) {
+ Long.MAX_VALUE
+ } else {
+ history.average()
+ }
+ }.let { if (ascending) it else -it }
+ }
+ }
+
+ override fun toString(): String {
+ return "Task-Duration-History(${if (ascending) "asc" else "desc"})"
+ }
+}
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationTaskOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationTaskOrderPolicy.kt
new file mode 100644
index 00000000..d9fde53a
--- /dev/null
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationTaskOrderPolicy.kt
@@ -0,0 +1,69 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.workflow.service.scheduler.task
+
+import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE
+import org.opendc.workflow.service.internal.TaskState
+import org.opendc.workflow.service.internal.WorkflowSchedulerListener
+import org.opendc.workflow.service.internal.WorkflowServiceImpl
+import java.util.*
+import kotlin.collections.HashMap
+import kotlin.collections.getValue
+import kotlin.collections.set
+
+/**
+ * A [TaskOrderPolicy] orders tasks based on the pre-specified (approximate) duration of the task.
+ */
+public data class DurationTaskOrderPolicy(public val ascending: Boolean = true) : TaskOrderPolicy {
+
+ override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> =
+ object : Comparator<TaskState>, WorkflowSchedulerListener {
+ private val results = HashMap<UUID, Long>()
+
+ init {
+ scheduler.addListener(this)
+ }
+
+ override fun taskReady(task: TaskState) {
+ val deadline = task.task.metadata[WORKFLOW_TASK_DEADLINE] as? Long?
+ results[task.task.uid] = deadline ?: Long.MAX_VALUE
+ }
+
+ override fun taskFinished(task: TaskState) {
+ results -= task.task.uid
+ }
+
+ private val TaskState.duration: Long
+ get() = results.getValue(task.uid)
+
+ override fun compare(o1: TaskState, o2: TaskState): Int {
+ return compareValuesBy(o1, o2) { state -> state.duration }.let {
+ if (ascending) it else -it
+ }
+ }
+ }
+
+ override fun toString(): String {
+ return "Task-Duration(${if (ascending) "asc" else "desc"})"
+ }
+}
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitPerJobTaskEligibilityPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitPerJobTaskEligibilityPolicy.kt
new file mode 100644
index 00000000..229460df
--- /dev/null
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitPerJobTaskEligibilityPolicy.kt
@@ -0,0 +1,68 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.workflow.service.scheduler.task
+
+import org.opendc.workflow.service.internal.JobState
+import org.opendc.workflow.service.internal.TaskState
+import org.opendc.workflow.service.internal.WorkflowSchedulerListener
+import org.opendc.workflow.service.internal.WorkflowServiceImpl
+
+/**
+ * A [TaskEligibilityPolicy] that limits the number of active tasks of a job in the system.
+ */
+public data class LimitPerJobTaskEligibilityPolicy(public val limit: Int) : TaskEligibilityPolicy {
+ override fun invoke(scheduler: WorkflowServiceImpl): TaskEligibilityPolicy.Logic =
+ object : TaskEligibilityPolicy.Logic, WorkflowSchedulerListener {
+ private val active = mutableMapOf<JobState, Int>()
+
+ init {
+ scheduler.addListener(this)
+ }
+
+ override fun jobStarted(job: JobState) {
+ active[job] = 0
+ }
+
+ override fun jobFinished(job: JobState) {
+ active.remove(job)
+ }
+
+ override fun taskAssigned(task: TaskState) {
+ active.merge(task.job, 1, Int::plus)
+ }
+
+ override fun taskFinished(task: TaskState) {
+ active.merge(task.job, -1, Int::plus)
+ }
+
+ override fun invoke(task: TaskState): TaskEligibilityPolicy.Advice {
+ val activeForJob = active[task.job]!!
+ return if (activeForJob <= limit)
+ TaskEligibilityPolicy.Advice.ADMIT
+ else
+ TaskEligibilityPolicy.Advice.DENY
+ }
+ }
+
+ override fun toString(): String = "Limit-Active-Job($limit)"
+}
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitTaskEligibilityPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitTaskEligibilityPolicy.kt
new file mode 100644
index 00000000..57aa0d58
--- /dev/null
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitTaskEligibilityPolicy.kt
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.workflow.service.scheduler.task
+
+import org.opendc.workflow.service.internal.TaskState
+import org.opendc.workflow.service.internal.WorkflowServiceImpl
+
+/**
+ * A [TaskEligibilityPolicy] that limits the total number of active tasks in the system.
+ */
+public data class LimitTaskEligibilityPolicy(val limit: Int) : TaskEligibilityPolicy {
+ override fun invoke(scheduler: WorkflowServiceImpl): TaskEligibilityPolicy.Logic = object : TaskEligibilityPolicy.Logic {
+ override fun invoke(
+ task: TaskState
+ ): TaskEligibilityPolicy.Advice =
+ if (scheduler.activeTasks.size < limit)
+ TaskEligibilityPolicy.Advice.ADMIT
+ else
+ TaskEligibilityPolicy.Advice.STOP
+ }
+
+ override fun toString(): String = "Limit-Active($limit)"
+}
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/NullTaskEligibilityPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/NullTaskEligibilityPolicy.kt
new file mode 100644
index 00000000..cfe2aeed
--- /dev/null
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/NullTaskEligibilityPolicy.kt
@@ -0,0 +1,41 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.workflow.service.scheduler.task
+
+import org.opendc.workflow.service.internal.TaskState
+import org.opendc.workflow.service.internal.WorkflowServiceImpl
+
+/**
+ * A [TaskEligibilityPolicy] that always allows new tasks to enter.
+ */
+public object NullTaskEligibilityPolicy : TaskEligibilityPolicy {
+ override fun invoke(scheduler: WorkflowServiceImpl): TaskEligibilityPolicy.Logic = Logic
+
+ private object Logic : TaskEligibilityPolicy.Logic {
+ override fun invoke(
+ task: TaskState
+ ): TaskEligibilityPolicy.Advice = TaskEligibilityPolicy.Advice.ADMIT
+ }
+
+ override fun toString(): String = "Always"
+}
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskEligibilityPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskEligibilityPolicy.kt
new file mode 100644
index 00000000..a01439c2
--- /dev/null
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskEligibilityPolicy.kt
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.workflow.service.scheduler.task
+
+import org.opendc.workflow.service.internal.TaskState
+import org.opendc.workflow.service.internal.WorkflowServiceImpl
+import java.util.*
+
+/**
+ * A [TaskEligibilityPolicy] that randomly accepts tasks in the system with some [probability].
+ */
+public data class RandomTaskEligibilityPolicy(val probability: Double = 0.5) : TaskEligibilityPolicy {
+ override fun invoke(scheduler: WorkflowServiceImpl): TaskEligibilityPolicy.Logic = object : TaskEligibilityPolicy.Logic {
+ val random = Random(123)
+
+ override fun invoke(task: TaskState): TaskEligibilityPolicy.Advice =
+ if (random.nextDouble() <= probability || scheduler.activeTasks.isEmpty())
+ TaskEligibilityPolicy.Advice.ADMIT
+ else {
+ TaskEligibilityPolicy.Advice.DENY
+ }
+ }
+
+ override fun toString(): String = "Random($probability)"
+}
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskOrderPolicy.kt
new file mode 100644
index 00000000..c12d6a66
--- /dev/null
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskOrderPolicy.kt
@@ -0,0 +1,60 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.workflow.service.scheduler.task
+
+import org.opendc.workflow.api.Task
+import org.opendc.workflow.service.internal.TaskState
+import org.opendc.workflow.service.internal.WorkflowSchedulerListener
+import org.opendc.workflow.service.internal.WorkflowServiceImpl
+import kotlin.random.Random
+
+/**
+ * A [TaskOrderPolicy] that orders the tasks randomly.
+ */
+public object RandomTaskOrderPolicy : TaskOrderPolicy {
+ override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> =
+ object : Comparator<TaskState>, WorkflowSchedulerListener {
+ private val random = Random(123)
+ private val ids = HashMap<Task, Int>()
+
+ init {
+ scheduler.addListener(this)
+ }
+
+ override fun taskReady(task: TaskState) {
+ ids[task.task] = random.nextInt()
+ }
+
+ override fun taskFinished(task: TaskState) {
+ ids.remove(task.task)
+ }
+
+ override fun compare(o1: TaskState, o2: TaskState): Int {
+ return compareValuesBy(o1, o2) { ids.getValue(it.task) }
+ }
+ }
+
+ override fun toString(): String {
+ return "Random"
+ }
+}
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/SubmissionTimeTaskOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/SubmissionTimeTaskOrderPolicy.kt
new file mode 100644
index 00000000..e9bbf815
--- /dev/null
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/SubmissionTimeTaskOrderPolicy.kt
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.workflow.service.scheduler.task
+
+import org.opendc.workflow.service.internal.TaskState
+import org.opendc.workflow.service.internal.WorkflowServiceImpl
+
+/**
+ * A [TaskOrderPolicy] that orders tasks based on the order of arrival in the queue.
+ */
+public data class SubmissionTimeTaskOrderPolicy(public val ascending: Boolean = true) : TaskOrderPolicy {
+ override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> = compareBy {
+ it.job.submittedAt.let { if (ascending) it else -it }
+ }
+
+ override fun toString(): String {
+ return "Submission-Time(${if (ascending) "asc" else "desc"})"
+ }
+}
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/TaskEligibilityPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/TaskEligibilityPolicy.kt
new file mode 100644
index 00000000..ee31aee2
--- /dev/null
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/TaskEligibilityPolicy.kt
@@ -0,0 +1,70 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.workflow.service.scheduler.task
+
+import org.opendc.workflow.service.internal.TaskState
+import org.opendc.workflow.service.scheduler.StagePolicy
+
+/**
+ * A policy interface for determining the eligibility of tasks in a scheduling cycle.
+ */
+public interface TaskEligibilityPolicy : StagePolicy<TaskEligibilityPolicy.Logic> {
+ public interface Logic {
+ /**
+ * Determine whether the specified [TaskState] is eligible to be scheduled.
+ *
+ * @param task The task instance to schedule.
+ * @return The advice for marking the task.
+ */
+ public operator fun invoke(task: TaskState): Advice
+ }
+
+ /**
+ * The advice given to the scheduler by an admission policy.
+ *
+ * @property admit A flag to indicate to the scheduler that the job should be admitted.
+ * @property stop A flag to indicate the scheduler should immediately stop admitting jobs to the scheduling queue and wait
+ * for the next scheduling cycle.
+ */
+ public enum class Advice(public val admit: Boolean, public val stop: Boolean) {
+ /**
+ * Admit the current job to the scheduling queue and continue admitting jobs.
+ */
+ ADMIT(true, false),
+
+ /**
+ * Admit the current job to the scheduling queue and stop admitting jobs.
+ */
+ ADMIT_LAST(true, true),
+
+ /**
+ * Deny the current job, but continue admitting jobs.
+ */
+ DENY(false, false),
+
+ /**
+ * Deny the current job and also stop admitting jobs.
+ */
+ STOP(false, true)
+ }
+}
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/TaskOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/TaskOrderPolicy.kt
new file mode 100644
index 00000000..fffcb765
--- /dev/null
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/TaskOrderPolicy.kt
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.workflow.service.scheduler.task
+
+import org.opendc.workflow.service.internal.TaskState
+import org.opendc.workflow.service.scheduler.StagePolicy
+
+/**
+ * This interface represents the **T2** stage of the Reference Architecture for Topology Schedulers and provides the
+ * scheduler with a sorted list of tasks to schedule.
+ */
+public interface TaskOrderPolicy : StagePolicy<Comparator<TaskState>>