diff options
Diffstat (limited to 'opendc-workflow/opendc-workflow-service/src/main')
31 files changed, 2099 insertions, 0 deletions
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt new file mode 100644 index 00000000..d3358ef1 --- /dev/null +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt @@ -0,0 +1,97 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.service + +import io.opentelemetry.api.metrics.Meter +import org.opendc.compute.api.ComputeClient +import org.opendc.workflow.api.Job +import org.opendc.workflow.service.internal.WorkflowServiceImpl +import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode +import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy +import org.opendc.workflow.service.scheduler.job.JobOrderPolicy +import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy +import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy +import java.time.Clock +import kotlin.coroutines.CoroutineContext + +/** + * A service for cloud workflow management. + * + * The workflow scheduler is modelled after the Reference Architecture for Topology Scheduling by Andreadis et al. + */ +public interface WorkflowService : AutoCloseable { + /** + * Submit the specified [Job] to the workflow service for scheduling. + */ + public suspend fun submit(job: Job) + + /** + * Run the specified [Job] and suspend execution until the job is finished. + */ + public suspend fun run(job: Job) + + /** + * Terminate the lifecycle of the workflow service, stopping all running workflows. + */ + public override fun close() + + public companion object { + /** + * Construct a new [WorkflowService] implementation. + * + * @param context The [CoroutineContext] to use in the service. + * @param clock The clock instance to use. + * @param tracer The event tracer to use. + * @param meter The meter to use. + * @param compute The compute client to use. + * @param mode The scheduling mode to use. + * @param jobAdmissionPolicy The job admission policy to use. + * @param jobOrderPolicy The job order policy to use. + * @param taskEligibilityPolicy The task eligibility policy to use. + * @param taskOrderPolicy The task order policy to use. + */ + public operator fun invoke( + context: CoroutineContext, + clock: Clock, + meter: Meter, + compute: ComputeClient, + mode: WorkflowSchedulerMode, + jobAdmissionPolicy: JobAdmissionPolicy, + jobOrderPolicy: JobOrderPolicy, + taskEligibilityPolicy: TaskEligibilityPolicy, + taskOrderPolicy: TaskOrderPolicy + ): WorkflowService { + return WorkflowServiceImpl( + context, + clock, + meter, + compute, + mode, + jobAdmissionPolicy, + jobOrderPolicy, + taskEligibilityPolicy, + taskOrderPolicy + ) + } + } +} diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/JobState.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/JobState.kt new file mode 100644 index 00000000..1bb67169 --- /dev/null +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/JobState.kt @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.service.internal + +import org.opendc.workflow.api.Job + +public class JobState(public val job: Job, public val submittedAt: Long) { + /** + * A flag to indicate whether this job is finished. + */ + public val isFinished: Boolean + get() = tasks.isEmpty() + + internal val tasks: MutableSet<TaskState> = mutableSetOf() + + override fun equals(other: Any?): Boolean = other is JobState && other.job == job + + override fun hashCode(): Int = job.hashCode() +} diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/TaskState.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/TaskState.kt new file mode 100644 index 00000000..c3ce1492 --- /dev/null +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/TaskState.kt @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.service.internal + +import org.opendc.compute.api.Server +import org.opendc.workflow.api.Task + +public class TaskState(public val job: JobState, public val task: Task) { + /** + * The moment in time the task was started. + */ + public var startedAt: Long = Long.MIN_VALUE + + /** + * The moment in time the task was finished. + */ + public var finishedAt: Long = Long.MIN_VALUE + + /** + * The dependencies of this task. + */ + public val dependencies: HashSet<TaskState> = HashSet() + + /** + * The dependents of this task. + */ + public val dependents: HashSet<TaskState> = HashSet() + + /** + * A flag to indicate whether this workflow task instance is a workflow root. + */ + public val isRoot: Boolean + get() = dependencies.isEmpty() + + public var state: TaskStatus = TaskStatus.CREATED + set(value) { + field = value + + // Mark the process as terminated in the graph + if (value == TaskStatus.FINISHED) { + markTerminated() + } + } + + public var server: Server? = null + + /** + * Mark the specified [TaskView] as terminated. + */ + private fun markTerminated() { + for (dependent in dependents) { + dependent.dependencies.remove(this) + + if (dependent.isRoot) { + dependent.state = TaskStatus.READY + } + } + } + + override fun equals(other: Any?): Boolean = other is TaskState && other.job == job && other.task == task + + override fun hashCode(): Int = task.hashCode() +} diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/TaskStatus.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/TaskStatus.kt new file mode 100644 index 00000000..fe941d09 --- /dev/null +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/TaskStatus.kt @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.service.internal + +/** + * The state of a workflow task. + */ +public enum class TaskStatus { + CREATED, + READY, + ACTIVE, + FINISHED +} diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowSchedulerListener.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowSchedulerListener.kt new file mode 100644 index 00000000..29c6aeea --- /dev/null +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowSchedulerListener.kt @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.service.internal + +public interface WorkflowSchedulerListener { + public fun cycleStarted(scheduler: WorkflowServiceImpl) {} + public fun cycleFinished(scheduler: WorkflowServiceImpl) {} + + public fun jobSubmitted(job: JobState) {} + public fun jobStarted(job: JobState) {} + public fun jobFinished(job: JobState) {} + + public fun taskReady(task: TaskState) {} + public fun taskAssigned(task: TaskState) {} + public fun taskStarted(task: TaskState) {} + public fun taskFinished(task: TaskState) {} +} diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt new file mode 100644 index 00000000..32191b8f --- /dev/null +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt @@ -0,0 +1,423 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.service.internal + +import io.opentelemetry.api.metrics.Meter +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.map +import mu.KotlinLogging +import org.opendc.compute.api.* +import org.opendc.workflow.api.Job +import org.opendc.workflow.api.WORKFLOW_TASK_CORES +import org.opendc.workflow.service.* +import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode +import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy +import org.opendc.workflow.service.scheduler.job.JobOrderPolicy +import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy +import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy +import java.time.Clock +import java.util.* +import kotlin.coroutines.Continuation +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.resume + +/** + * A [WorkflowService] that distributes work through a multi-stage process based on the Reference Architecture for + * Datacenter Scheduling. + */ +public class WorkflowServiceImpl( + context: CoroutineContext, + internal val clock: Clock, + private val meter: Meter, + private val computeClient: ComputeClient, + mode: WorkflowSchedulerMode, + jobAdmissionPolicy: JobAdmissionPolicy, + jobOrderPolicy: JobOrderPolicy, + taskEligibilityPolicy: TaskEligibilityPolicy, + taskOrderPolicy: TaskOrderPolicy +) : WorkflowService, ServerWatcher { + /** + * The [CoroutineScope] of the service bounded by the lifecycle of the service. + */ + internal val scope = CoroutineScope(context + Job()) + + /** + * The logger instance to use. + */ + private val logger = KotlinLogging.logger {} + + /** + * The incoming jobs ready to be processed by the scheduler. + */ + internal val incomingJobs: MutableSet<JobState> = linkedSetOf() + + /** + * The incoming tasks ready to be processed by the scheduler. + */ + internal val incomingTasks: MutableSet<TaskState> = linkedSetOf() + + /** + * The job queue. + */ + internal val jobQueue: Queue<JobState> + + /** + * The task queue. + */ + internal val taskQueue: Queue<TaskState> + + /** + * The active jobs in the system. + */ + internal val activeJobs: MutableSet<JobState> = mutableSetOf() + + /** + * The active tasks in the system. + */ + internal val activeTasks: MutableSet<TaskState> = mutableSetOf() + + /** + * The running tasks by [Server]. + */ + internal val taskByServer = mutableMapOf<Server, TaskState>() + + /** + * The continuation of the jobs. + */ + private val conts = mutableMapOf<Job, Continuation<Unit>>() + + /** + * The root listener of this scheduler. + */ + private val rootListener = object : WorkflowSchedulerListener { + /** + * The listeners to delegate to. + */ + val listeners = mutableSetOf<WorkflowSchedulerListener>() + + override fun cycleStarted(scheduler: WorkflowServiceImpl) { + listeners.forEach { it.cycleStarted(scheduler) } + } + + override fun cycleFinished(scheduler: WorkflowServiceImpl) { + listeners.forEach { it.cycleFinished(scheduler) } + } + + override fun jobSubmitted(job: JobState) { + listeners.forEach { it.jobSubmitted(job) } + } + + override fun jobStarted(job: JobState) { + listeners.forEach { it.jobStarted(job) } + } + + override fun jobFinished(job: JobState) { + listeners.forEach { it.jobFinished(job) } + } + + override fun taskReady(task: TaskState) { + listeners.forEach { it.taskReady(task) } + } + + override fun taskAssigned(task: TaskState) { + listeners.forEach { it.taskAssigned(task) } + } + + override fun taskStarted(task: TaskState) { + listeners.forEach { it.taskStarted(task) } + } + + override fun taskFinished(task: TaskState) { + listeners.forEach { it.taskFinished(task) } + } + } + + /** + * The number of jobs that have been submitted to the service. + */ + private val submittedJobs = meter.longCounterBuilder("jobs.submitted") + .setDescription("Number of submitted jobs") + .setUnit("1") + .build() + + /** + * The number of jobs that are running. + */ + private val runningJobs = meter.longUpDownCounterBuilder("jobs.active") + .setDescription("Number of jobs running") + .setUnit("1") + .build() + + /** + * The number of jobs that have finished running. + */ + private val finishedJobs = meter.longCounterBuilder("jobs.finished") + .setDescription("Number of jobs that finished running") + .setUnit("1") + .build() + + /** + * The number of tasks that have been submitted to the service. + */ + private val submittedTasks = meter.longCounterBuilder("tasks.submitted") + .setDescription("Number of submitted tasks") + .setUnit("1") + .build() + + /** + * The number of jobs that are running. + */ + private val runningTasks = meter.longUpDownCounterBuilder("tasks.active") + .setDescription("Number of tasks running") + .setUnit("1") + .build() + + /** + * The number of jobs that have finished running. + */ + private val finishedTasks = meter.longCounterBuilder("tasks.finished") + .setDescription("Number of tasks that finished running") + .setUnit("1") + .build() + + private val mode: WorkflowSchedulerMode.Logic + private val jobAdmissionPolicy: JobAdmissionPolicy.Logic + private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic + private lateinit var image: Image + + init { + this.mode = mode(this) + this.jobAdmissionPolicy = jobAdmissionPolicy(this) + this.jobQueue = PriorityQueue(100, jobOrderPolicy(this).thenBy { it.job.uid }) + this.taskEligibilityPolicy = taskEligibilityPolicy(this) + this.taskQueue = PriorityQueue(1000, taskOrderPolicy(this).thenBy { it.task.uid }) + scope.launch { + image = computeClient.newImage("workflow-runner") + } + } + + override suspend fun run(job: Job) { + // J1 Incoming Jobs + val jobInstance = JobState(job, clock.millis()) + val instances = job.tasks.associateWith { + TaskState(jobInstance, it) + } + + for ((task, instance) in instances) { + instance.dependencies.addAll(task.dependencies.map { instances[it]!! }) + task.dependencies.forEach { + instances[it]!!.dependents.add(instance) + } + + // If the task has no dependency, it is a root task and can immediately be evaluated + if (instance.isRoot) { + instance.state = TaskStatus.READY + } + + submittedTasks.add(1) + } + + return suspendCancellableCoroutine { cont -> + instances.values.toCollection(jobInstance.tasks) + incomingJobs += jobInstance + rootListener.jobSubmitted(jobInstance) + conts[job] = cont + + submittedJobs.add(1) + + requestCycle() + } + } + + override suspend fun submit(job: Job) { + scope.launch { run(job) } + } + + override fun close() { + scope.cancel() + } + + /** + * Indicate to the scheduler that a scheduling cycle is needed. + */ + private fun requestCycle() = mode.requestCycle() + + /** + * Perform a scheduling cycle immediately. + */ + @OptIn(ExperimentalCoroutinesApi::class) + internal suspend fun schedule() { + // J2 Create list of eligible jobs + val iterator = incomingJobs.iterator() + while (iterator.hasNext()) { + val jobInstance = iterator.next() + val advice = jobAdmissionPolicy(jobInstance) + if (advice.stop) { + break + } else if (!advice.admit) { + continue + } + + iterator.remove() + jobQueue.add(jobInstance) + activeJobs += jobInstance + + runningJobs.add(1) + rootListener.jobStarted(jobInstance) + } + + // J4 Per job + while (jobQueue.isNotEmpty()) { + val jobInstance = jobQueue.poll() + + // Edge-case: job has no tasks + if (jobInstance.isFinished) { + finishJob(jobInstance) + } + + // Add job roots to the scheduling queue + for (task in jobInstance.tasks) { + if (task.state != TaskStatus.READY) { + continue + } + + incomingTasks += task + rootListener.taskReady(task) + } + } + + // T1 Create list of eligible tasks + val taskIterator = incomingTasks.iterator() + while (taskIterator.hasNext()) { + val taskInstance = taskIterator.next() + val advice = taskEligibilityPolicy(taskInstance) + if (advice.stop) { + break + } else if (!advice.admit) { + continue + } + + taskIterator.remove() + taskQueue.add(taskInstance) + } + + // T3 Per task + while (taskQueue.isNotEmpty()) { + val instance = taskQueue.peek() + + val cores = instance.task.metadata[WORKFLOW_TASK_CORES] as? Int ?: 1 + val image = image + scope.launch { + val flavor = computeClient.newFlavor( + instance.task.name, + cores, + 1000 + ) // TODO How to determine memory usage for workflow task + val server = computeClient.newServer( + instance.task.name, + image, + flavor, + start = false, + meta = instance.task.metadata + ) + + instance.state = TaskStatus.ACTIVE + instance.server = server + taskByServer[server] = instance + + server.watch(this@WorkflowServiceImpl) + server.start() + } + + activeTasks += instance + taskQueue.poll() + rootListener.taskAssigned(instance) + } + } + + public override fun onStateChanged(server: Server, newState: ServerState) { + when (newState) { + ServerState.PROVISIONING -> {} + ServerState.RUNNING -> { + val task = taskByServer.getValue(server) + task.startedAt = clock.millis() + runningTasks.add(1) + rootListener.taskStarted(task) + } + ServerState.TERMINATED, ServerState.ERROR -> { + val task = taskByServer.remove(server) ?: throw IllegalStateException() + + scope.launch { + server.delete() + server.flavor.delete() + } + + val job = task.job + task.state = TaskStatus.FINISHED + task.finishedAt = clock.millis() + job.tasks.remove(task) + activeTasks -= task + + runningTasks.add(-1) + finishedTasks.add(1) + rootListener.taskFinished(task) + + // Add job roots to the scheduling queue + for (dependent in task.dependents) { + if (dependent.state != TaskStatus.READY) { + continue + } + + incomingTasks += dependent + rootListener.taskReady(dependent) + } + + if (job.isFinished) { + finishJob(job) + } + + requestCycle() + } + ServerState.DELETED -> { + } + else -> throw IllegalStateException() + } + } + + private fun finishJob(job: JobState) { + activeJobs -= job + runningJobs.add(-1) + finishedJobs.add(1) + rootListener.jobFinished(job) + + conts.remove(job.job)?.resume(Unit) + } + + public fun addListener(listener: WorkflowSchedulerListener) { + rootListener.listeners += listener + } + + public fun removeListener(listener: WorkflowSchedulerListener) { + rootListener.listeners -= listener + } +} diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/StagePolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/StagePolicy.kt new file mode 100644 index 00000000..359fc223 --- /dev/null +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/StagePolicy.kt @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.service.scheduler + +import org.opendc.workflow.service.internal.WorkflowServiceImpl +import java.io.Serializable + +/** + * A scheduling stage policy. + */ +public interface StagePolicy<T : Any> : Serializable { + /** + * Build the logic of the stage policy. + */ + public operator fun invoke(scheduler: WorkflowServiceImpl): T +} diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/WorkflowSchedulerMode.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/WorkflowSchedulerMode.kt new file mode 100644 index 00000000..58e7893f --- /dev/null +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/WorkflowSchedulerMode.kt @@ -0,0 +1,105 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.service.scheduler + +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import org.opendc.workflow.service.internal.WorkflowServiceImpl + +/** + * The operating mode of a workflow scheduler. + */ +public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Logic> { + /** + * The logic for operating the cycles of a workflow scheduler. + */ + public interface Logic { + /** + * Request a new scheduling cycle to be performed. + */ + public fun requestCycle() + } + + /** + * An interactive scheduler immediately triggers a new scheduling cycle when a workflow is received. + */ + public object Interactive : WorkflowSchedulerMode() { + override fun invoke(scheduler: WorkflowServiceImpl): Logic = object : Logic { + override fun requestCycle() { + scheduler.scope.launch { scheduler.schedule() } + } + } + + override fun toString(): String = "Interactive" + } + + /** + * A batch scheduler triggers a scheduling cycle every time quantum if needed. + */ + public data class Batch(val quantum: Long) : WorkflowSchedulerMode() { + private var next: kotlinx.coroutines.Job? = null + + override fun invoke(scheduler: WorkflowServiceImpl): Logic = object : Logic { + override fun requestCycle() { + if (next == null) { + // In batch mode, we assume that the scheduler runs at a fixed slot every time + // quantum (e.g t=0, t=60, t=120). We calculate here the delay until the next scheduling slot. + val delay = quantum - (scheduler.clock.millis() % quantum) + + val job = scheduler.scope.launch { + delay(delay) + next = null + scheduler.schedule() + } + next = job + } + } + } + + override fun toString(): String = "Batch($quantum)" + } + + /** + * A scheduling cycle is triggered at a random point in time. + */ + public data class Random(private val random: java.util.Random = java.util.Random(123)) : WorkflowSchedulerMode() { + private var next: kotlinx.coroutines.Job? = null + + override fun invoke(scheduler: WorkflowServiceImpl): Logic = object : Logic { + override fun requestCycle() { + if (next == null) { + val delay = random.nextInt(200).toLong() + + val job = scheduler.scope.launch { + delay(delay) + next = null + scheduler.schedule() + } + next = job + } + } + } + + override fun toString(): String = "Random" + } +} diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/DurationJobOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/DurationJobOrderPolicy.kt new file mode 100644 index 00000000..1b5b91b9 --- /dev/null +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/DurationJobOrderPolicy.kt @@ -0,0 +1,102 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.service.scheduler.job + +import org.opendc.workflow.api.Job +import org.opendc.workflow.api.Task +import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.internal.WorkflowSchedulerListener +import org.opendc.workflow.service.internal.WorkflowServiceImpl + +/** + * A [JobOrderPolicy] that orders jobs based on its critical path length. + */ +public data class DurationJobOrderPolicy(val ascending: Boolean = true) : JobOrderPolicy { + override fun invoke(scheduler: WorkflowServiceImpl): Comparator<JobState> = + object : + Comparator<JobState>, + WorkflowSchedulerListener { + private val results = HashMap<Job, Long>() + + init { + scheduler.addListener(this) + } + + private val Job.duration: Long + get() = results[this]!! + + override fun jobSubmitted(job: JobState) { + results[job.job] = job.job.toposort().map { task -> + val estimable = task.metadata[WORKFLOW_TASK_DEADLINE] as? Long? + estimable ?: Long.MAX_VALUE + }.sum() + } + + override fun jobFinished(job: JobState) { + results.remove(job.job) + } + + override fun compare(o1: JobState, o2: JobState): Int { + return compareValuesBy(o1, o2) { it.job.duration }.let { if (ascending) it else -it } + } + } + + override fun toString(): String { + return "Job-Duration(${if (ascending) "asc" else "desc"})" + } +} + +/** + * Create a topological sorting of the tasks in a job. + * + * @return The list of tasks within the job topologically sorted. + */ +public fun Job.toposort(): List<Task> { + val res = mutableListOf<Task>() + val visited = mutableSetOf<Task>() + val adjacent = mutableMapOf<Task, MutableList<Task>>() + + for (task in tasks) { + for (dependency in task.dependencies) { + adjacent.getOrPut(dependency) { mutableListOf() }.add(task) + } + } + + fun visit(task: Task) { + visited.add(task) + + adjacent[task] ?: emptyList<Task>() + .asSequence() + .filter { it !in visited } + .forEach { visit(it) } + + res.add(task) + } + + tasks + .asSequence() + .filter { it !in visited } + .forEach { visit(it) } + return res +} diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/JobAdmissionPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/JobAdmissionPolicy.kt new file mode 100644 index 00000000..ed3acff7 --- /dev/null +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/JobAdmissionPolicy.kt @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.service.scheduler.job + +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.scheduler.StagePolicy + +/** + * A policy interface for admitting [JobState]s to a scheduling cycle. + */ +public interface JobAdmissionPolicy : StagePolicy<JobAdmissionPolicy.Logic> { + public interface Logic { + /** + * Determine whether the specified [JobState] should be admitted to the scheduling cycle. + * + * @param job The workflow that has been submitted. + * @return The advice for admitting the job. + */ + public operator fun invoke(job: JobState): Advice + } + + /** + * The advice given to the scheduler by an admission policy. + * + * @property admit A flag to indicate to the scheduler that the job should be admitted. + * @property stop A flag to indicate the scheduler should immediately stop admitting jobs to the scheduling queue and wait + * for the next scheduling cycle. + */ + public enum class Advice(public val admit: Boolean, public val stop: Boolean) { + /** + * Admit the current job to the scheduling queue and continue admitting jobs. + */ + ADMIT(true, false), + + /** + * Admit the current job to the scheduling queue and stop admitting jobs. + */ + ADMIT_LAST(true, true), + + /** + * Deny the current job, but continue admitting jobs. + */ + DENY(false, false), + + /** + * Deny the current job and also stop admitting jobs. + */ + STOP(false, true) + } +} diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/JobOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/JobOrderPolicy.kt new file mode 100644 index 00000000..adaa6671 --- /dev/null +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/JobOrderPolicy.kt @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.service.scheduler.job + +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.scheduler.StagePolicy + +/** + * A policy interface for ordering admitted workflows in the scheduling queue. + */ +public interface JobOrderPolicy : StagePolicy<Comparator<JobState>> diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/LimitJobAdmissionPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/LimitJobAdmissionPolicy.kt new file mode 100644 index 00000000..6a0bfeb9 --- /dev/null +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/LimitJobAdmissionPolicy.kt @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.service.scheduler.job + +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.internal.WorkflowServiceImpl + +/** + * A [JobAdmissionPolicy] that limits the amount of active jobs in the system. + * + * @property limit The maximum number of concurrent jobs in the system. + */ +public data class LimitJobAdmissionPolicy(public val limit: Int) : JobAdmissionPolicy { + override fun invoke(scheduler: WorkflowServiceImpl): JobAdmissionPolicy.Logic = object : JobAdmissionPolicy.Logic { + override fun invoke( + job: JobState + ): JobAdmissionPolicy.Advice = + if (scheduler.activeJobs.size < limit) + JobAdmissionPolicy.Advice.ADMIT + else + JobAdmissionPolicy.Advice.STOP + } + + override fun toString(): String = "Limit-Active($limit)" +} diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/NullJobAdmissionPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/NullJobAdmissionPolicy.kt new file mode 100644 index 00000000..31f8f8db --- /dev/null +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/NullJobAdmissionPolicy.kt @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.service.scheduler.job + +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.internal.WorkflowServiceImpl + +/** + * A [JobAdmissionPolicy] that admits all jobs. + */ +public object NullJobAdmissionPolicy : JobAdmissionPolicy { + override fun invoke(scheduler: WorkflowServiceImpl): JobAdmissionPolicy.Logic = object : JobAdmissionPolicy.Logic { + override fun invoke(job: JobState): JobAdmissionPolicy.Advice = JobAdmissionPolicy.Advice.ADMIT + } + + override fun toString(): String = "Always" +} diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/RandomJobOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/RandomJobOrderPolicy.kt new file mode 100644 index 00000000..1b359125 --- /dev/null +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/RandomJobOrderPolicy.kt @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.service.scheduler.job + +import org.opendc.workflow.api.Job +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.internal.WorkflowSchedulerListener +import org.opendc.workflow.service.internal.WorkflowServiceImpl +import java.util.* +import kotlin.collections.HashMap + +/** + * A [JobOrderPolicy] that randomly orders jobs. + */ +public object RandomJobOrderPolicy : JobOrderPolicy { + override fun invoke(scheduler: WorkflowServiceImpl): Comparator<JobState> = + object : + Comparator<JobState>, + WorkflowSchedulerListener { + private val random = Random(123) + private val ids = HashMap<Job, Int>() + + init { + scheduler.addListener(this) + } + + override fun jobSubmitted(job: JobState) { + ids[job.job] = random.nextInt() + } + + override fun jobFinished(job: JobState) { + ids.remove(job.job) + } + + override fun compare(o1: JobState, o2: JobState): Int { + return compareValuesBy(o1, o2) { ids.getValue(it.job) } + } + } + + override fun toString(): String { + return "Random" + } +} diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/SizeJobOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/SizeJobOrderPolicy.kt new file mode 100644 index 00000000..6998606d --- /dev/null +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/SizeJobOrderPolicy.kt @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.service.scheduler.job + +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.internal.WorkflowServiceImpl + +/** + * A [SizeJobOrderPolicy] that orders jobs based on the number of tasks it has. + */ +public data class SizeJobOrderPolicy(public val ascending: Boolean = true) : JobOrderPolicy { + override fun invoke(scheduler: WorkflowServiceImpl): Comparator<JobState> = + compareBy { it.tasks.size.let { if (ascending) it else -it } } + + override fun toString(): String { + return "Job-Size(${if (ascending) "asc" else "desc"})" + } +} diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/SubmissionTimeJobOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/SubmissionTimeJobOrderPolicy.kt new file mode 100644 index 00000000..53d06023 --- /dev/null +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/SubmissionTimeJobOrderPolicy.kt @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.service.scheduler.job + +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.internal.WorkflowServiceImpl + +/** + * A [JobOrderPolicy] orders jobs in FIFO order. + */ +public data class SubmissionTimeJobOrderPolicy(public val ascending: Boolean = true) : JobOrderPolicy { + override fun invoke(scheduler: WorkflowServiceImpl): Comparator<JobState> = + compareBy { it.submittedAt.let { if (ascending) it else -it } } + + override fun toString(): String { + return "Submission-Time(${if (ascending) "asc" else "desc"})" + } +} diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/ActiveTaskOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/ActiveTaskOrderPolicy.kt new file mode 100644 index 00000000..821d4964 --- /dev/null +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/ActiveTaskOrderPolicy.kt @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.service.scheduler.task + +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowSchedulerListener +import org.opendc.workflow.service.internal.WorkflowServiceImpl + +/** + * A [TaskOrderPolicy] that orders tasks based on the number of active relative tasks (w.r.t. its job) in the system. + */ +public data class ActiveTaskOrderPolicy(public val ascending: Boolean = true) : TaskOrderPolicy { + override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> = + object : Comparator<TaskState>, WorkflowSchedulerListener { + private val active = mutableMapOf<JobState, Int>() + + init { + scheduler.addListener(this) + } + + override fun jobStarted(job: JobState) { + active[job] = 0 + } + + override fun jobFinished(job: JobState) { + active.remove(job) + } + + override fun taskAssigned(task: TaskState) { + active.merge(task.job, 1, Int::plus) + } + + override fun taskFinished(task: TaskState) { + active.merge(task.job, -1, Int::plus) + } + + override fun compare(o1: TaskState, o2: TaskState): Int { + return compareValuesBy(o1, o2) { active.getValue(it.job) }.let { + if (ascending) it else -it + } + } + } + + override fun toString(): String { + return "Active-Per-Job(${if (ascending) "asc" else "desc"})" + } +} diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/BalancingTaskEligibilityPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/BalancingTaskEligibilityPolicy.kt new file mode 100644 index 00000000..42804f5a --- /dev/null +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/BalancingTaskEligibilityPolicy.kt @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.service.scheduler.task + +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowSchedulerListener +import org.opendc.workflow.service.internal.WorkflowServiceImpl +import kotlin.math.max + +/** + * A [TaskEligibilityPolicy] that balances the tasks based on their job, e.g. do not allow a single job to claim all + * resources of the system. + * + * @property tolerance The maximum difference from the average number of tasks per job in the system as a fraction of + * the average. + */ +public data class BalancingTaskEligibilityPolicy(public val tolerance: Double = 1.5) : TaskEligibilityPolicy { + override fun invoke(scheduler: WorkflowServiceImpl): TaskEligibilityPolicy.Logic = + object : TaskEligibilityPolicy.Logic, WorkflowSchedulerListener { + private val active = mutableMapOf<JobState, Int>() + + init { + scheduler.addListener(this) + } + + override fun jobStarted(job: JobState) { + active[job] = 0 + } + + override fun jobFinished(job: JobState) { + active.remove(job) + } + + override fun taskAssigned(task: TaskState) { + active.merge(task.job, 1, Int::plus) + } + + override fun taskFinished(task: TaskState) { + active.merge(task.job, -1, Int::plus) + } + + override fun invoke(task: TaskState): TaskEligibilityPolicy.Advice { + val activeJobs = scheduler.activeJobs.size + val activeTasks = scheduler.activeTasks.size + val baseline = max(activeTasks / activeJobs.toDouble(), 1.0) + val activeForJob = active[task.job]!! + return if ((activeForJob + 1) / baseline < tolerance) + TaskEligibilityPolicy.Advice.ADMIT + else + TaskEligibilityPolicy.Advice.DENY + } + } + + override fun toString(): String = "Job-Balance($tolerance)" +} diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/CompletionTaskOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/CompletionTaskOrderPolicy.kt new file mode 100644 index 00000000..dae7ad99 --- /dev/null +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/CompletionTaskOrderPolicy.kt @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.service.scheduler.task + +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowSchedulerListener +import org.opendc.workflow.service.internal.WorkflowServiceImpl + +/** + * A [TaskOrderPolicy] that orders tasks based on the number of completed relative tasks. + */ +public data class CompletionTaskOrderPolicy(public val ascending: Boolean = true) : TaskOrderPolicy { + override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> = + object : Comparator<TaskState>, WorkflowSchedulerListener { + private val finished = mutableMapOf<JobState, Int>() + + init { + scheduler.addListener(this) + } + + override fun jobStarted(job: JobState) { + finished[job] = 0 + } + + override fun jobFinished(job: JobState) { + finished.remove(job) + } + + override fun taskFinished(task: TaskState) { + finished.merge(task.job, 1, Int::plus) + } + + override fun compare(o1: TaskState, o2: TaskState): Int { + return compareValuesBy(o1, o2) { finished.getValue(it.job) / it.job.tasks.size.toDouble() }.let { + if (ascending) it else -it + } + } + } + + override fun toString(): String { + return "Job-Completion(${if (ascending) "asc" else "desc"})" + } +} diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DependenciesTaskOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DependenciesTaskOrderPolicy.kt new file mode 100644 index 00000000..7786f6ec --- /dev/null +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DependenciesTaskOrderPolicy.kt @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.service.scheduler.task + +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowServiceImpl + +/** + * A [TaskOrderPolicy] that orders tasks based on the number of dependency tasks it has. + */ +public data class DependenciesTaskOrderPolicy(public val ascending: Boolean = true) : TaskOrderPolicy { + override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> = compareBy { + it.task.dependencies.size.let { if (ascending) it else -it } + } + + override fun toString(): String { + return "Task-Dependencies(${if (ascending) "asc" else "desc"})" + } +} diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DependentsTaskOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DependentsTaskOrderPolicy.kt new file mode 100644 index 00000000..4fb835d7 --- /dev/null +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DependentsTaskOrderPolicy.kt @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.service.scheduler.task + +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowServiceImpl + +/** + * A [TaskOrderPolicy] that orders tasks based on the number of dependent tasks it has. + */ +public data class DependentsTaskOrderPolicy(public val ascending: Boolean = true) : TaskOrderPolicy { + override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> = compareBy { + it.dependents.size.let { if (ascending) it else -it } + } + + override fun toString(): String { + return "Task-Dependents(${if (ascending) "asc" else "desc"})" + } +} diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationHistoryTaskOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationHistoryTaskOrderPolicy.kt new file mode 100644 index 00000000..3a634de7 --- /dev/null +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationHistoryTaskOrderPolicy.kt @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.service.scheduler.task + +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowSchedulerListener +import org.opendc.workflow.service.internal.WorkflowServiceImpl + +/** + * A [TaskOrderPolicy] that orders tasks based on the average duration of the preceding tasks in the job. + */ +public data class DurationHistoryTaskOrderPolicy(public val ascending: Boolean = true) : TaskOrderPolicy { + override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> = + object : Comparator<TaskState>, WorkflowSchedulerListener { + private val results = HashMap<JobState, MutableList<Long>>() + + init { + scheduler.addListener(this) + } + + override fun jobStarted(job: JobState) { + results[job] = mutableListOf() + } + + override fun jobFinished(job: JobState) { + results.remove(job) + } + + override fun taskFinished(task: TaskState) { + results.getValue(task.job) += task.finishedAt - task.startedAt + } + + override fun compare(o1: TaskState, o2: TaskState): Int { + return compareValuesBy(o1, o2) { key -> + val history = results.getValue(key.job) + if (history.isEmpty()) { + Long.MAX_VALUE + } else { + history.average() + } + }.let { if (ascending) it else -it } + } + } + + override fun toString(): String { + return "Task-Duration-History(${if (ascending) "asc" else "desc"})" + } +} diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationTaskOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationTaskOrderPolicy.kt new file mode 100644 index 00000000..d9fde53a --- /dev/null +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationTaskOrderPolicy.kt @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.service.scheduler.task + +import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowSchedulerListener +import org.opendc.workflow.service.internal.WorkflowServiceImpl +import java.util.* +import kotlin.collections.HashMap +import kotlin.collections.getValue +import kotlin.collections.set + +/** + * A [TaskOrderPolicy] orders tasks based on the pre-specified (approximate) duration of the task. + */ +public data class DurationTaskOrderPolicy(public val ascending: Boolean = true) : TaskOrderPolicy { + + override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> = + object : Comparator<TaskState>, WorkflowSchedulerListener { + private val results = HashMap<UUID, Long>() + + init { + scheduler.addListener(this) + } + + override fun taskReady(task: TaskState) { + val deadline = task.task.metadata[WORKFLOW_TASK_DEADLINE] as? Long? + results[task.task.uid] = deadline ?: Long.MAX_VALUE + } + + override fun taskFinished(task: TaskState) { + results -= task.task.uid + } + + private val TaskState.duration: Long + get() = results.getValue(task.uid) + + override fun compare(o1: TaskState, o2: TaskState): Int { + return compareValuesBy(o1, o2) { state -> state.duration }.let { + if (ascending) it else -it + } + } + } + + override fun toString(): String { + return "Task-Duration(${if (ascending) "asc" else "desc"})" + } +} diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitPerJobTaskEligibilityPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitPerJobTaskEligibilityPolicy.kt new file mode 100644 index 00000000..229460df --- /dev/null +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitPerJobTaskEligibilityPolicy.kt @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.service.scheduler.task + +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowSchedulerListener +import org.opendc.workflow.service.internal.WorkflowServiceImpl + +/** + * A [TaskEligibilityPolicy] that limits the number of active tasks of a job in the system. + */ +public data class LimitPerJobTaskEligibilityPolicy(public val limit: Int) : TaskEligibilityPolicy { + override fun invoke(scheduler: WorkflowServiceImpl): TaskEligibilityPolicy.Logic = + object : TaskEligibilityPolicy.Logic, WorkflowSchedulerListener { + private val active = mutableMapOf<JobState, Int>() + + init { + scheduler.addListener(this) + } + + override fun jobStarted(job: JobState) { + active[job] = 0 + } + + override fun jobFinished(job: JobState) { + active.remove(job) + } + + override fun taskAssigned(task: TaskState) { + active.merge(task.job, 1, Int::plus) + } + + override fun taskFinished(task: TaskState) { + active.merge(task.job, -1, Int::plus) + } + + override fun invoke(task: TaskState): TaskEligibilityPolicy.Advice { + val activeForJob = active[task.job]!! + return if (activeForJob <= limit) + TaskEligibilityPolicy.Advice.ADMIT + else + TaskEligibilityPolicy.Advice.DENY + } + } + + override fun toString(): String = "Limit-Active-Job($limit)" +} diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitTaskEligibilityPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitTaskEligibilityPolicy.kt new file mode 100644 index 00000000..57aa0d58 --- /dev/null +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitTaskEligibilityPolicy.kt @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.service.scheduler.task + +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowServiceImpl + +/** + * A [TaskEligibilityPolicy] that limits the total number of active tasks in the system. + */ +public data class LimitTaskEligibilityPolicy(val limit: Int) : TaskEligibilityPolicy { + override fun invoke(scheduler: WorkflowServiceImpl): TaskEligibilityPolicy.Logic = object : TaskEligibilityPolicy.Logic { + override fun invoke( + task: TaskState + ): TaskEligibilityPolicy.Advice = + if (scheduler.activeTasks.size < limit) + TaskEligibilityPolicy.Advice.ADMIT + else + TaskEligibilityPolicy.Advice.STOP + } + + override fun toString(): String = "Limit-Active($limit)" +} diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/NullTaskEligibilityPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/NullTaskEligibilityPolicy.kt new file mode 100644 index 00000000..cfe2aeed --- /dev/null +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/NullTaskEligibilityPolicy.kt @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.service.scheduler.task + +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowServiceImpl + +/** + * A [TaskEligibilityPolicy] that always allows new tasks to enter. + */ +public object NullTaskEligibilityPolicy : TaskEligibilityPolicy { + override fun invoke(scheduler: WorkflowServiceImpl): TaskEligibilityPolicy.Logic = Logic + + private object Logic : TaskEligibilityPolicy.Logic { + override fun invoke( + task: TaskState + ): TaskEligibilityPolicy.Advice = TaskEligibilityPolicy.Advice.ADMIT + } + + override fun toString(): String = "Always" +} diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskEligibilityPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskEligibilityPolicy.kt new file mode 100644 index 00000000..a01439c2 --- /dev/null +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskEligibilityPolicy.kt @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.service.scheduler.task + +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowServiceImpl +import java.util.* + +/** + * A [TaskEligibilityPolicy] that randomly accepts tasks in the system with some [probability]. + */ +public data class RandomTaskEligibilityPolicy(val probability: Double = 0.5) : TaskEligibilityPolicy { + override fun invoke(scheduler: WorkflowServiceImpl): TaskEligibilityPolicy.Logic = object : TaskEligibilityPolicy.Logic { + val random = Random(123) + + override fun invoke(task: TaskState): TaskEligibilityPolicy.Advice = + if (random.nextDouble() <= probability || scheduler.activeTasks.isEmpty()) + TaskEligibilityPolicy.Advice.ADMIT + else { + TaskEligibilityPolicy.Advice.DENY + } + } + + override fun toString(): String = "Random($probability)" +} diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskOrderPolicy.kt new file mode 100644 index 00000000..c12d6a66 --- /dev/null +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskOrderPolicy.kt @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.service.scheduler.task + +import org.opendc.workflow.api.Task +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowSchedulerListener +import org.opendc.workflow.service.internal.WorkflowServiceImpl +import kotlin.random.Random + +/** + * A [TaskOrderPolicy] that orders the tasks randomly. + */ +public object RandomTaskOrderPolicy : TaskOrderPolicy { + override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> = + object : Comparator<TaskState>, WorkflowSchedulerListener { + private val random = Random(123) + private val ids = HashMap<Task, Int>() + + init { + scheduler.addListener(this) + } + + override fun taskReady(task: TaskState) { + ids[task.task] = random.nextInt() + } + + override fun taskFinished(task: TaskState) { + ids.remove(task.task) + } + + override fun compare(o1: TaskState, o2: TaskState): Int { + return compareValuesBy(o1, o2) { ids.getValue(it.task) } + } + } + + override fun toString(): String { + return "Random" + } +} diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/SubmissionTimeTaskOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/SubmissionTimeTaskOrderPolicy.kt new file mode 100644 index 00000000..e9bbf815 --- /dev/null +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/SubmissionTimeTaskOrderPolicy.kt @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.service.scheduler.task + +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowServiceImpl + +/** + * A [TaskOrderPolicy] that orders tasks based on the order of arrival in the queue. + */ +public data class SubmissionTimeTaskOrderPolicy(public val ascending: Boolean = true) : TaskOrderPolicy { + override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> = compareBy { + it.job.submittedAt.let { if (ascending) it else -it } + } + + override fun toString(): String { + return "Submission-Time(${if (ascending) "asc" else "desc"})" + } +} diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/TaskEligibilityPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/TaskEligibilityPolicy.kt new file mode 100644 index 00000000..ee31aee2 --- /dev/null +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/TaskEligibilityPolicy.kt @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.service.scheduler.task + +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.scheduler.StagePolicy + +/** + * A policy interface for determining the eligibility of tasks in a scheduling cycle. + */ +public interface TaskEligibilityPolicy : StagePolicy<TaskEligibilityPolicy.Logic> { + public interface Logic { + /** + * Determine whether the specified [TaskState] is eligible to be scheduled. + * + * @param task The task instance to schedule. + * @return The advice for marking the task. + */ + public operator fun invoke(task: TaskState): Advice + } + + /** + * The advice given to the scheduler by an admission policy. + * + * @property admit A flag to indicate to the scheduler that the job should be admitted. + * @property stop A flag to indicate the scheduler should immediately stop admitting jobs to the scheduling queue and wait + * for the next scheduling cycle. + */ + public enum class Advice(public val admit: Boolean, public val stop: Boolean) { + /** + * Admit the current job to the scheduling queue and continue admitting jobs. + */ + ADMIT(true, false), + + /** + * Admit the current job to the scheduling queue and stop admitting jobs. + */ + ADMIT_LAST(true, true), + + /** + * Deny the current job, but continue admitting jobs. + */ + DENY(false, false), + + /** + * Deny the current job and also stop admitting jobs. + */ + STOP(false, true) + } +} diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/TaskOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/TaskOrderPolicy.kt new file mode 100644 index 00000000..fffcb765 --- /dev/null +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/TaskOrderPolicy.kt @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.service.scheduler.task + +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.scheduler.StagePolicy + +/** + * This interface represents the **T2** stage of the Reference Architecture for Topology Schedulers and provides the + * scheduler with a sorted list of tasks to schedule. + */ +public interface TaskOrderPolicy : StagePolicy<Comparator<TaskState>> |
