diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-02-20 23:28:34 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-02-24 17:22:41 +0100 |
| commit | e5345dc2192f88ba1d5949eca60ae505759038e0 (patch) | |
| tree | 8319c193fe960255a21fbda43a12abc6b0378904 /opendc | |
| parent | 8d05ee524981bacfe4af4fe0097c935309115481 (diff) | |
[ci skip] feat: Incorporate extensions to workflow scheduler
This change incorporate a number of extensions and improvements to the
workflow scheduler. These are a result of the Design Space Exploration
for Datacenter Schedulers work.
Diffstat (limited to 'opendc')
38 files changed, 1606 insertions, 295 deletions
diff --git a/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt b/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt index 3dc8be51..415221ca 100644 --- a/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt +++ b/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt @@ -31,7 +31,7 @@ import com.atlarge.opendc.format.trace.gwf.GwfTraceReader import com.atlarge.opendc.workflows.monitor.WorkflowMonitor import com.atlarge.opendc.workflows.service.StageWorkflowService import com.atlarge.opendc.workflows.service.WorkflowSchedulerMode -import com.atlarge.opendc.workflows.service.stage.job.FifoJobSortingPolicy +import com.atlarge.opendc.workflows.service.stage.job.SubmissionTimeJobOrderPolicy import com.atlarge.opendc.workflows.service.stage.job.NullJobAdmissionPolicy import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelectionPolicy import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceDynamicFilterPolicy @@ -92,7 +92,7 @@ fun main(args: Array<String>) { environment.platforms[0].zones[0].services[ProvisioningService.Key], mode = WorkflowSchedulerMode.Batch(100), jobAdmissionPolicy = NullJobAdmissionPolicy, - jobSortingPolicy = FifoJobSortingPolicy(), + jobSortingPolicy = SubmissionTimeJobOrderPolicy(), taskEligibilityPolicy = FunctionalTaskEligibilityPolicy(), taskSortingPolicy = FifoTaskSortingPolicy(), resourceDynamicFilterPolicy = FunctionalResourceDynamicFilterPolicy(), diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt new file mode 100644 index 00000000..b444f91c --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt @@ -0,0 +1,42 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service + +import com.atlarge.opendc.workflows.monitor.WorkflowMonitor +import com.atlarge.opendc.workflows.workload.Job + +class JobState(val job: Job, val monitor: WorkflowMonitor, val submittedAt: Long) { + /** + * A flag to indicate whether this job is finished. + */ + val isFinished: Boolean + get() = tasks.isEmpty() + + val tasks: MutableSet<TaskState> = mutableSetOf() + + override fun equals(other: Any?): Boolean = other is JobState && other.job == job + + override fun hashCode(): Int = job.hashCode() +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerListener.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerListener.kt new file mode 100644 index 00000000..73c3e752 --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerListener.kt @@ -0,0 +1,39 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service + +interface StageWorkflowSchedulerListener { + fun cycleStarted(scheduler: StageWorkflowService) {} + fun cycleFinished(scheduler: StageWorkflowService) {} + + fun jobSubmitted(job: JobState) {} + fun jobStarted(job: JobState) {} + fun jobFinished(job: JobState) {} + + fun taskReady(task: TaskState) {} + fun taskAssigned(task: TaskState) {} + fun taskStarted(task: TaskState) {} + fun taskFinished(task: TaskState) {} +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt index d7b29c32..e19f5446 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt @@ -32,15 +32,15 @@ import com.atlarge.opendc.compute.metal.Node import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.workflows.monitor.WorkflowMonitor import com.atlarge.opendc.workflows.service.stage.job.JobAdmissionPolicy -import com.atlarge.opendc.workflows.service.stage.job.JobSortingPolicy -import com.atlarge.opendc.workflows.service.stage.resource.ResourceDynamicFilterPolicy +import com.atlarge.opendc.workflows.service.stage.job.JobOrderPolicy +import com.atlarge.opendc.workflows.service.stage.resource.ResourceFilterPolicy import com.atlarge.opendc.workflows.service.stage.resource.ResourceSelectionPolicy import com.atlarge.opendc.workflows.service.stage.task.TaskEligibilityPolicy -import com.atlarge.opendc.workflows.service.stage.task.TaskSortingPolicy +import com.atlarge.opendc.workflows.service.stage.task.TaskOrderPolicy import com.atlarge.opendc.workflows.workload.Job -import com.atlarge.opendc.workflows.workload.Task -import kotlinx.coroutines.delay import kotlinx.coroutines.launch +import java.util.PriorityQueue +import java.util.Queue /** * A [WorkflowService] that distributes work through a multi-stage process based on the Reference Architecture for @@ -49,29 +49,50 @@ import kotlinx.coroutines.launch class StageWorkflowService( private val ctx: ProcessContext, private val provisioningService: ProvisioningService, - private val mode: WorkflowSchedulerMode, - private val jobAdmissionPolicy: JobAdmissionPolicy, - private val jobSortingPolicy: JobSortingPolicy, - private val taskEligibilityPolicy: TaskEligibilityPolicy, - private val taskSortingPolicy: TaskSortingPolicy, - private val resourceDynamicFilterPolicy: ResourceDynamicFilterPolicy, - private val resourceSelectionPolicy: ResourceSelectionPolicy + mode: WorkflowSchedulerMode, + jobAdmissionPolicy: JobAdmissionPolicy, + jobOrderPolicy: JobOrderPolicy, + taskEligibilityPolicy: TaskEligibilityPolicy, + taskOrderPolicy: TaskOrderPolicy, + resourceFilterPolicy: ResourceFilterPolicy, + resourceSelectionPolicy: ResourceSelectionPolicy ) : WorkflowService, ServerMonitor { /** * The incoming jobs ready to be processed by the scheduler. */ - internal val incomingJobs: MutableSet<JobView> = mutableSetOf() + internal val incomingJobs: MutableSet<JobState> = linkedSetOf() + + /** + * The incoming tasks ready to be processed by the scheduler. + */ + internal val incomingTasks: MutableSet<TaskState> = linkedSetOf() + + /** + * The job queue. + */ + internal val jobQueue: Queue<JobState> + + /** + * The task queue. + */ + internal val taskQueue: Queue<TaskState> /** * The active jobs in the system. */ - internal val activeJobs: MutableSet<JobView> = mutableSetOf() + internal val activeJobs: MutableSet<JobState> = mutableSetOf() + + /** + * The active tasks in the system. + */ + internal val activeTasks: MutableSet<TaskState> = mutableSetOf() + /** * The running tasks by [Server]. */ - internal val taskByServer = mutableMapOf<Server, TaskView>() + internal val taskByServer = mutableMapOf<Server, TaskState>() /** * The nodes that are controlled by the service. @@ -83,18 +104,90 @@ class StageWorkflowService( */ internal val available: MutableSet<Node> = mutableSetOf() + + /** + * The maximum number of incoming jobs. + */ + private val throttleLimit: Int = 20000 + + /** + * The load of the system. + */ + internal val load: Double + get() = (available.size / nodes.size.toDouble()) + + /** + * The root listener of this scheduler. + */ + private val rootListener = object : StageWorkflowSchedulerListener { + /** + * The listeners to delegate to. + */ + val listeners = mutableSetOf<StageWorkflowSchedulerListener>() + + override fun cycleStarted(scheduler: StageWorkflowService) { + listeners.forEach { it.cycleStarted(scheduler) } + } + + override fun cycleFinished(scheduler: StageWorkflowService) { + listeners.forEach { it.cycleFinished(scheduler) } + } + + override fun jobSubmitted(job: JobState) { + listeners.forEach { it.jobSubmitted(job) } + } + + override fun jobStarted(job: JobState) { + listeners.forEach { it.jobStarted(job) } + } + + override fun jobFinished(job: JobState) { + listeners.forEach { it.jobFinished(job) } + } + + override fun taskReady(task: TaskState) { + listeners.forEach { it.taskReady(task) } + } + + override fun taskAssigned(task: TaskState) { + listeners.forEach { it.taskAssigned(task) } + } + + override fun taskStarted(task: TaskState) { + listeners.forEach { it.taskStarted(task) } + } + + override fun taskFinished(task: TaskState) { + listeners.forEach { it.taskFinished(task) } + } + } + + private val mode: WorkflowSchedulerMode.Logic + private val jobAdmissionPolicy: JobAdmissionPolicy.Logic + private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic + private val resourceFilterPolicy: ResourceFilterPolicy.Logic + private val resourceSelectionPolicy: Comparator<Node> + init { ctx.launch { nodes = provisioningService.nodes().toList() available.addAll(nodes) } + + this.mode = mode(this) + this.jobAdmissionPolicy = jobAdmissionPolicy(this) + this.jobQueue = PriorityQueue(100, jobOrderPolicy(this).thenBy { it.job.uid }) + this.taskEligibilityPolicy = taskEligibilityPolicy(this) + this.taskQueue = PriorityQueue(1000, taskOrderPolicy(this).thenBy { it.task.uid }) + this.resourceFilterPolicy = resourceFilterPolicy(this) + this.resourceSelectionPolicy = resourceSelectionPolicy(this) } override suspend fun submit(job: Job, monitor: WorkflowMonitor) { // J1 Incoming Jobs - val jobInstance = JobView(job, monitor) + val jobInstance = JobState(job, monitor, ctx.clock.millis()) val instances = job.tasks.associateWith { - TaskView(jobInstance, it) + TaskState(jobInstance, it) } for ((task, instance) in instances) { @@ -105,82 +198,84 @@ class StageWorkflowService( // If the task has no dependency, it is a root task and can immediately be evaluated if (instance.isRoot) { - instance.state = TaskState.READY + instance.state = TaskStatus.READY } } - jobInstance.tasks = instances.values.toMutableSet() + instances.values.toCollection(jobInstance.tasks) incomingJobs += jobInstance + rootListener.jobSubmitted(jobInstance) + requestCycle() } - private var next: kotlinx.coroutines.Job? = null /** * Indicate to the scheduler that a scheduling cycle is needed. */ - private fun requestCycle() { - when (mode) { - is WorkflowSchedulerMode.Interactive -> { - ctx.launch { - schedule() - } - } - is WorkflowSchedulerMode.Batch -> { - if (next == null) { - val job = ctx.launch { - delay(mode.quantum) - next = null - schedule() - } - next = job - } - } - } - } + private suspend fun requestCycle() = mode.requestCycle() /** * Perform a scheduling cycle immediately. */ - private suspend fun schedule() { + internal suspend fun schedule() { // J2 Create list of eligible jobs - jobAdmissionPolicy.startCycle(this) - val eligibleJobs = incomingJobs.filter { jobAdmissionPolicy.shouldAdmit(this, it) } + val iterator = incomingJobs.iterator() + while (iterator.hasNext()) { + val jobInstance = iterator.next() + val advice = jobAdmissionPolicy(jobInstance) + if (advice.stop) { + break + } else if (!advice.admit) { + continue + } - for (jobInstance in eligibleJobs) { - incomingJobs -= jobInstance + iterator.remove() + jobQueue.add(jobInstance) activeJobs += jobInstance jobInstance.monitor.onJobStart(jobInstance.job, ctx.clock.millis()) + rootListener.jobStarted(jobInstance) } - // J3 Sort jobs on criterion - val sortedJobs = jobSortingPolicy(this, activeJobs) - // J4 Per job - for (jobInstance in sortedJobs) { - // T1 Create list of eligible tasks - taskEligibilityPolicy.startCycle(this) - val eligibleTasks = jobInstance.tasks.filter { taskEligibilityPolicy.isEligible(this, it) } - - // T2 Sort tasks on criterion - val sortedTasks = taskSortingPolicy(this, eligibleTasks) - - // T3 Per task - for (instance in sortedTasks) { - val hosts = resourceDynamicFilterPolicy(this, nodes, instance) - val host = resourceSelectionPolicy.select(this, hosts, instance) - - if (host != null) { - // T4 Submit task to machine - available -= host - instance.state = TaskState.ACTIVE - - val newHost = provisioningService.deploy(host, instance.task.image, this) - instance.host = newHost - taskByServer[newHost.server!!] = instance - } else { - return + while (jobQueue.isNotEmpty()) { + val jobInstance = jobQueue.poll() + + // Edge-case: job has no tasks + if (jobInstance.isFinished) { + finishJob(jobInstance) + } + + // Add job roots to the scheduling queue + for (task in jobInstance.tasks) { + if (task.state != TaskStatus.READY) { + continue } + + incomingTasks += task + rootListener.taskReady(task) + } + } + + // T3 Per task + while (taskQueue.isNotEmpty()) { + val instance = taskQueue.peek() + val host: Node? = available.firstOrNull() + + if (host != null) { + // T4 Submit task to machine + available -= host + instance.state = TaskStatus.ACTIVE + + val newHost = provisioningService.deploy(host, instance.task.image, this) + instance.host = newHost + taskByServer[newHost.server!!] = instance + + activeTasks += instance + taskQueue.poll() + rootListener.taskAssigned(instance) + } else { + break } } } @@ -189,19 +284,33 @@ class StageWorkflowService( when (server.state) { ServerState.ACTIVE -> { val task = taskByServer.getValue(server) + task.startedAt = ctx.clock.millis() task.job.monitor.onTaskStart(task.job.job, task.task, ctx.clock.millis()) + rootListener.taskStarted(task) } ServerState.SHUTOFF, ServerState.ERROR -> { val task = taskByServer.remove(server) ?: throw IllegalStateException() val job = task.job - task.state = TaskState.FINISHED + task.state = TaskStatus.FINISHED + task.finishedAt = ctx.clock.millis() job.tasks.remove(task) available += task.host!! + activeTasks -= task job.monitor.onTaskFinish(job.job, task.task, 0, ctx.clock.millis()) + rootListener.taskFinished(task) + + // Add job roots to the scheduling queue + for (dependent in task.dependents) { + if (dependent.state != TaskStatus.READY) { + continue + } + + incomingTasks += dependent + rootListener.taskReady(dependent) + } if (job.isFinished) { - activeJobs -= job - job.monitor.onJobFinish(job.job, ctx.clock.millis()) + finishJob(job) } requestCycle() @@ -210,56 +319,18 @@ class StageWorkflowService( } } - class JobView(val job: Job, val monitor: WorkflowMonitor) { - /** - * A flag to indicate whether this job is finished. - */ - val isFinished: Boolean - get() = tasks.isEmpty() - - lateinit var tasks: MutableSet<TaskView> + private suspend fun finishJob(job: JobState) { + activeJobs -= job + job.monitor.onJobFinish(job.job, ctx.clock.millis()) + rootListener.jobFinished(job) } - class TaskView(val job: JobView, val task: Task) { - /** - * The dependencies of this task. - */ - val dependencies = HashSet<TaskView>() - - /** - * The dependents of this task. - */ - val dependents = HashSet<TaskView>() - - /** - * A flag to indicate whether this workflow task instance is a workflow root. - */ - val isRoot: Boolean - get() = dependencies.isEmpty() - - var state: TaskState = TaskState.CREATED - set(value) { - field = value - - // Mark the process as terminated in the graph - if (value == TaskState.FINISHED) { - markTerminated() - } - } - - var host: Node? = null - /** - * Mark the specified [TaskView] as terminated. - */ - private fun markTerminated() { - for (dependent in dependents) { - dependent.dependencies.remove(this) + fun addListener(listener: StageWorkflowSchedulerListener) { + rootListener.listeners += listener + } - if (dependent.isRoot) { - dependent.state = TaskState.READY - } - } - } + fun removeListener(listener: StageWorkflowSchedulerListener) { + rootListener.listeners -= listener } } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskState.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskState.kt index ee0024f5..a69b5235 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskState.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskState.kt @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2020 atlarge-research + * Copyright (c) 2019 atlarge-research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -24,12 +24,63 @@ package com.atlarge.opendc.workflows.service -/** - * The state of a workflow task. - */ -public enum class TaskState { - CREATED, - READY, - ACTIVE, - FINISHED +import com.atlarge.opendc.compute.metal.Node +import com.atlarge.opendc.workflows.service.StageWorkflowService.TaskView +import com.atlarge.opendc.workflows.workload.Task + +class TaskState(val job: JobState, val task: Task) { + /** + * The moment in time the task was started. + */ + var startedAt: Long = Long.MIN_VALUE + + /** + * The moment in time the task was finished. + */ + var finishedAt: Long = Long.MIN_VALUE + + /** + * The dependencies of this task. + */ + val dependencies = HashSet<TaskState>() + + /** + * The dependents of this task. + */ + val dependents = HashSet<TaskState>() + + /** + * A flag to indicate whether this workflow task instance is a workflow root. + */ + val isRoot: Boolean + get() = dependencies.isEmpty() + + var state: TaskStatus = TaskStatus.CREATED + set(value) { + field = value + + // Mark the process as terminated in the graph + if (value == TaskStatus.FINISHED) { + markTerminated() + } + } + + var host: Node? = null + + /** + * Mark the specified [TaskView] as terminated. + */ + private fun markTerminated() { + for (dependent in dependents) { + dependent.dependencies.remove(this) + + if (dependent.isRoot) { + dependent.state = TaskStatus.READY + } + } + } + + override fun equals(other: Any?): Boolean = other is TaskState && other.job == job && other.task == task + + override fun hashCode(): Int = task.hashCode() } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskStatus.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskStatus.kt new file mode 100644 index 00000000..c53c6171 --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskStatus.kt @@ -0,0 +1,35 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service + +/** + * The state of a workflow task. + */ +public enum class TaskStatus { + CREATED, + READY, + ACTIVE, + FINISHED +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt index f5060c5c..f5a0f49b 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt @@ -24,17 +24,90 @@ package com.atlarge.opendc.workflows.service +import com.atlarge.odcsim.processContext +import com.atlarge.opendc.workflows.service.stage.StagePolicy +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.yield + /** * The operating mode of a workflow scheduler. */ -sealed class WorkflowSchedulerMode { +sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Logic> { + /** + * The logic for operating the cycles of a workflow scheduler. + */ + interface Logic { + /** + * Request a new scheduling cycle to be performed. + */ + suspend fun requestCycle() + } + /** * An interactive scheduler immediately triggers a new scheduling cycle when a workflow is received. */ - object Interactive : WorkflowSchedulerMode() + object Interactive : WorkflowSchedulerMode() { + override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic { + override suspend fun requestCycle() { + yield() + scheduler.schedule() + } + } + + + override fun toString(): String = "Interactive" + } /** * A batch scheduler triggers a scheduling cycle every time quantum if needed. */ - data class Batch(val quantum: Long) : WorkflowSchedulerMode() + data class Batch(val quantum: Long) : WorkflowSchedulerMode() { + private var next: kotlinx.coroutines.Job? = null + + override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic { + override suspend fun requestCycle() { + val ctx = processContext + if (next == null) { + // In batch mode, we assume that the scheduler runs at a fixed slot every time + // quantum (e.g t=0, t=60, t=120). We calculate here the delay until the next scheduling slot. + val delay = quantum - (ctx.clock.millis() % quantum) + + val job = ctx.launch { + delay(delay) + next = null + scheduler.schedule() + } + next = job + } + } + } + + override fun toString(): String = "Batch($quantum)" + } + + /** + * A scheduling cycle is triggered at a random point in time. + */ + data class Random(private val random: java.util.Random = java.util.Random(123)) : WorkflowSchedulerMode() { + private var next: kotlinx.coroutines.Job? = null + + override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic { + override suspend fun requestCycle() { + val ctx = processContext + if (next == null) { + val delay = random.nextInt(200).toLong() + + val job = ctx.launch { + delay(delay) + next = null + scheduler.schedule() + } + next = job + } + } + } + + override fun toString(): String = "Random" + } } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/FifoJobSortingPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/StagePolicy.kt index 976fbbf3..c7cc3d84 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/FifoJobSortingPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/StagePolicy.kt @@ -22,16 +22,17 @@ * SOFTWARE. */ -package com.atlarge.opendc.workflows.service.stage.job +package com.atlarge.opendc.workflows.service.stage import com.atlarge.opendc.workflows.service.StageWorkflowService +import java.io.Serializable /** - * The [FifoJobSortingPolicy] sorts tasks based on the order of arrival in the queue. + * A scheduling stage policy. */ -class FifoJobSortingPolicy : JobSortingPolicy { - override fun invoke( - scheduler: StageWorkflowService, - jobs: Collection<StageWorkflowService.JobView> - ): List<StageWorkflowService.JobView> = jobs.toList() +interface StagePolicy<T : Any> : Serializable { + /** + * Build the logic of the stage policy. + */ + operator fun invoke(scheduler: StageWorkflowService): T } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/DurationJobOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/DurationJobOrderPolicy.kt new file mode 100644 index 00000000..0b3f567a --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/DurationJobOrderPolicy.kt @@ -0,0 +1,102 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.job + +import com.atlarge.opendc.workflows.service.JobState +import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener +import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.workload.Job +import com.atlarge.opendc.workflows.workload.Task + +/** + * The [DurationJobOrderPolicy] sorts tasks based on the critical path length of the job. + */ +data class DurationJobOrderPolicy(val ascending: Boolean = true) : JobOrderPolicy { + override fun invoke(scheduler: StageWorkflowService): Comparator<JobState> = + object : Comparator<JobState>, StageWorkflowSchedulerListener { + private val results = HashMap<Job, Double>() + + init { + scheduler.addListener(this) + } + + private val Job.duration: Long + get() = results[this]!! + + override fun jobSubmitted(job: JobState) { + results[job.job] = job.job.toposort().sumByDouble { task -> + val estimable = task.application as? Estimable + estimable?.estimate(resources) ?: Duration.POSITIVE_INFINITY + } + } + + override fun jobFinished(job: JobState) { + results.remove(job.job) + } + + override fun compare(o1: JobState, o2: JobState): Int { + return compareValuesBy(o1, o2) { it.job.duration }.let { if (ascending) it else -it } + } + } + + + override fun toString(): String { + return "Job-Duration(${if (ascending) "asc" else "desc"})" + } +} + +/** + * Create a topological sorting of the tasks in a job. + * + * @return The list of tasks within the job topologically sorted. + */ +fun Job.toposort(): List<Task> { + val res = mutableListOf<Task>() + val visited = mutableSetOf<Task>() + val adjacent = mutableMapOf<Task, MutableList<Task>>() + + for (task in tasks) { + for (dependency in task.dependencies) { + adjacent.getOrPut(dependency) { mutableListOf() }.add(task) + } + } + + fun visit(task: Task) { + visited.add(task) + + adjacent[task] ?: emptyList<Task>() + .asSequence() + .filter { it !in visited } + .forEach { visit(it) } + + res.add(task) + } + + tasks + .asSequence() + .filter { it !in visited } + .forEach { visit(it) } + return res +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt index cdaad512..a3645523 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt @@ -24,25 +24,49 @@ package com.atlarge.opendc.workflows.service.stage.job -import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.JobState +import com.atlarge.opendc.workflows.service.stage.StagePolicy /** - * A policy interface for admitting [StageWorkflowService.JobView]s to a scheduling cycle. + * A policy interface for admitting [JobState]s to a scheduling cycle. */ -interface JobAdmissionPolicy { - /** - * A method that is invoked at the start of each scheduling cycle. - * - * @param scheduler The scheduler that started the cycle. - */ - fun startCycle(scheduler: StageWorkflowService) {} +interface JobAdmissionPolicy : StagePolicy<JobAdmissionPolicy.Logic> { + interface Logic { + /** + * Determine whether the specified [JobState] should be admitted to the scheduling cycle. + * + * @param job The workflow that has been submitted. + * @return The advice for admitting the job. + */ + operator fun invoke(job: JobState): Advice + } /** - * Determine whether the specified [StageWorkflowService.JobView] should be admitted to the scheduling cycle. + * The advice given to the scheduler by an admission policy. * - * @param scheduler The scheduler that should admit or reject the job. - * @param job The workflow that has been submitted. - * @return `true` if the workflow may be admitted to the scheduling cycle, `false` otherwise. + * @property admit A flag to indicate to the scheduler that the job should be admitted. + * @property stop A flag to indicate the scheduler should immediately stop admitting jobs to the scheduling queue and wait + * for the next scheduling cycle. */ - fun shouldAdmit(scheduler: StageWorkflowService, job: StageWorkflowService.JobView): Boolean + enum class Advice(val admit: Boolean, val stop: Boolean) { + /** + * Admit the current job to the scheduling queue and continue admitting jobs. + */ + ADMIT(true, false), + + /** + * Admit the current job to the scheduling queue and stop admitting jobs. + */ + ADMIT_LAST(true, true), + + /** + * Deny the current job, but continue admitting jobs. + */ + DENY(false, false), + + /** + * Deny the current job and also stop admitting jobs. + */ + STOP(false, true) + } } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobSortingPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobOrderPolicy.kt index c3a5dab5..488148af 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobSortingPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobOrderPolicy.kt @@ -24,21 +24,11 @@ package com.atlarge.opendc.workflows.service.stage.job -import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.JobState +import com.atlarge.opendc.workflows.service.stage.StagePolicy /** * A policy interface for ordering admitted workflows in the scheduling queue. */ -interface JobSortingPolicy { - /** - * Sort the given collection of jobs on a given criterion. - * - * @param scheduler The scheduler that started the cycle. - * @param jobs The collection of tasks that should be sorted. - * @return The sorted list of jobs. - */ - operator fun invoke( - scheduler: StageWorkflowService, - jobs: Collection<StageWorkflowService.JobView> - ): List<StageWorkflowService.JobView> -} +interface JobOrderPolicy : StagePolicy<Comparator<JobState>> + diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/LimitJobAdmissionPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/LimitJobAdmissionPolicy.kt new file mode 100644 index 00000000..6b1faf20 --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/LimitJobAdmissionPolicy.kt @@ -0,0 +1,47 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.job + +import com.atlarge.opendc.workflows.service.JobState +import com.atlarge.opendc.workflows.service.StageWorkflowService + +/** + * A [JobAdmissionPolicy] that limits the amount of active jobs in the system. + * + * @property limit The maximum number of concurrent jobs in the system. + */ +data class LimitJobAdmissionPolicy(val limit: Int) : JobAdmissionPolicy { + override fun invoke(scheduler: StageWorkflowService) = object : JobAdmissionPolicy.Logic { + override fun invoke( + job: JobState + ): JobAdmissionPolicy.Advice = + if (scheduler.activeJobs.size < limit) + JobAdmissionPolicy.Advice.ADMIT + else + JobAdmissionPolicy.Advice.STOP + } + + override fun toString(): String = "Limit-Active($limit)" +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/LoadJobAdmissionPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/LoadJobAdmissionPolicy.kt new file mode 100644 index 00000000..a58d965f --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/LoadJobAdmissionPolicy.kt @@ -0,0 +1,47 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.job + +import com.atlarge.opendc.workflows.service.JobState +import com.atlarge.opendc.workflows.service.StageWorkflowService + +/** + * A [JobAdmissionPolicy] that limits the amount of jobs based on the system load. + * + * @property limit The maximum load before stopping admission. + */ +data class LoadJobAdmissionPolicy(val limit: Double) : JobAdmissionPolicy { + override fun invoke(scheduler: StageWorkflowService) = object : JobAdmissionPolicy.Logic { + override fun invoke( + job: JobState + ): JobAdmissionPolicy.Advice = + if (scheduler.load < limit) + JobAdmissionPolicy.Advice.ADMIT + else + JobAdmissionPolicy.Advice.STOP + } + + override fun toString(): String = "Limit-Load($limit)" +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt index ad90839c..46888467 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt @@ -24,17 +24,16 @@ package com.atlarge.opendc.workflows.service.stage.job +import com.atlarge.opendc.workflows.service.JobState import com.atlarge.opendc.workflows.service.StageWorkflowService /** * A [JobAdmissionPolicy] that admits all jobs. */ object NullJobAdmissionPolicy : JobAdmissionPolicy { - /** - * Admit every submitted job. - */ - override fun shouldAdmit( - scheduler: StageWorkflowService, - job: StageWorkflowService.JobView - ): Boolean = true + override fun invoke(scheduler: StageWorkflowService) = object : JobAdmissionPolicy.Logic { + override fun invoke(job: JobState): JobAdmissionPolicy.Advice = JobAdmissionPolicy.Advice.ADMIT + } + + override fun toString(): String = "Always" } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobOrderPolicy.kt new file mode 100644 index 00000000..c1f23376 --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobOrderPolicy.kt @@ -0,0 +1,59 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.job + +import com.atlarge.opendc.workflows.service.JobState +import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener +import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.workload.Job +import java.util.Random + +object RandomJobOrderPolicy : JobOrderPolicy { + override fun invoke(scheduler: StageWorkflowService): Comparator<JobState> = + object : Comparator<JobState>, StageWorkflowSchedulerListener { + private val random = Random(123) + private val ids = HashMap<Job, Int>() + + init { + scheduler.addListener(this) + } + + override fun jobSubmitted(job: JobState) { + ids[job.job] = random.nextInt() + } + + override fun jobFinished(job: JobState) { + ids.remove(job.job) + } + + override fun compare(o1: JobState, o2: JobState): Int { + return compareValuesBy(o1, o2) { ids.getValue(it.job) } + } + } + + override fun toString(): String { + return "Random" + } +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/SizeJobOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/SizeJobOrderPolicy.kt new file mode 100644 index 00000000..d6c3155b --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/SizeJobOrderPolicy.kt @@ -0,0 +1,40 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.job + +import com.atlarge.opendc.workflows.service.JobState +import com.atlarge.opendc.workflows.service.StageWorkflowService + +/** + * The [SizeJobOrderPolicy] sorts tasks based on the order of arrival in the queue. + */ +data class SizeJobOrderPolicy(val ascending: Boolean = true) : JobOrderPolicy { + override fun invoke(scheduler: StageWorkflowService) = + compareBy<JobState> { it.tasks.size.let { if (ascending) it else -it } } + + override fun toString(): String { + return "Job-Size(${if (ascending) "asc" else "desc"})" + } +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobSortingPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/SubmissionTimeJobOrderPolicy.kt index 9ce2811c..bf5bff1d 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobSortingPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/SubmissionTimeJobOrderPolicy.kt @@ -24,17 +24,17 @@ package com.atlarge.opendc.workflows.service.stage.job +import com.atlarge.opendc.workflows.service.JobState import com.atlarge.opendc.workflows.service.StageWorkflowService -import kotlin.random.Random /** - * The [RandomJobSortingPolicy] sorts tasks randomly. - * - * @property random The [Random] instance to use when sorting the list of tasks. + * The [SubmissionTimeJobOrderPolicy] sorts tasks based on the order of arrival in the queue. */ -class RandomJobSortingPolicy(private val random: Random = Random.Default) : JobSortingPolicy { - override fun invoke( - scheduler: StageWorkflowService, - jobs: Collection<StageWorkflowService.JobView> - ): List<StageWorkflowService.JobView> = jobs.shuffled(random) +data class SubmissionTimeJobOrderPolicy(val ascending: Boolean = true) : JobOrderPolicy { + override fun invoke(scheduler: StageWorkflowService) = + compareBy<JobState> { it.submittedAt.let { if (ascending) it else -it } } + + override fun toString(): String { + return "Submission-Time(${if (ascending) "asc" else "desc"})" + } } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt index e2490214..a5671d45 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt @@ -30,11 +30,10 @@ import com.atlarge.opendc.workflows.service.StageWorkflowService /** * A [ResourceSelectionPolicy] that selects the first machine that is available. */ -class FirstFitResourceSelectionPolicy : ResourceSelectionPolicy { - override fun select( - scheduler: StageWorkflowService, - machines: List<Node>, - task: StageWorkflowService.TaskView - ): Node? = - machines.firstOrNull() +object FirstFitResourceSelectionPolicy : ResourceSelectionPolicy { + override fun invoke(scheduler: StageWorkflowService) = object : Comparator<Node> { + override fun compare(o1: Node, o2: Node): Int = 1 + } + + override fun toString(): String = "First-Fit" } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt new file mode 100644 index 00000000..5b0afccf --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt @@ -0,0 +1,44 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.resource + +import com.atlarge.opendc.compute.metal.Node +import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.TaskState + +/** + * A [ResourceFilterPolicy] based on the amount of cores available on the machine and the cores required for + * the task. + */ +object FunctionalResourceFilterPolicy : ResourceFilterPolicy { + override fun invoke(scheduler: StageWorkflowService): ResourceFilterPolicy.Logic = + object : ResourceFilterPolicy.Logic { + override fun invoke(hosts: Sequence<Node>, task: TaskState): Sequence<Node> = + hosts.filter { it in scheduler.available } + } + + + override fun toString(): String = "functional" +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceDynamicFilterPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/RandomResourceSelectionPolicy.kt index a8f2fda9..5f070e27 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceDynamicFilterPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/RandomResourceSelectionPolicy.kt @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2019 atlarge-research + * Copyright (c) 2020 atlarge-research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -26,18 +26,19 @@ package com.atlarge.opendc.workflows.service.stage.resource import com.atlarge.opendc.compute.metal.Node import com.atlarge.opendc.workflows.service.StageWorkflowService +import java.util.Random -/** - * A [ResourceDynamicFilterPolicy] based on the amount of cores available on the machine and the cores required for - * the task. - */ -class FunctionalResourceDynamicFilterPolicy : ResourceDynamicFilterPolicy { - override fun invoke( - scheduler: StageWorkflowService, - machines: List<Node>, - task: StageWorkflowService.TaskView - ): List<Node> { - return machines - .filter { it in scheduler.available } +object RandomResourceSelectionPolicy : ResourceSelectionPolicy { + override fun invoke(scheduler: StageWorkflowService) = object : Comparator<Node> { + private val ids: Map<Node, Long> + + init { + val random = Random(123) + ids = scheduler.nodes.associateWith { random.nextLong() } + } + + override fun compare(o1: Node, o2: Node): Int = compareValuesBy(o1, o2) { ids[it] } } + + override fun toString(): String = "Random" } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceDynamicFilterPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceFilterPolicy.kt index 8d8ceec2..28ef970f 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceDynamicFilterPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceFilterPolicy.kt @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2019 atlarge-research + * Copyright (c) 2020 atlarge-research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -25,25 +25,23 @@ package com.atlarge.opendc.workflows.service.stage.resource import com.atlarge.opendc.compute.metal.Node -import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.TaskState +import com.atlarge.opendc.workflows.service.stage.StagePolicy /** - * This interface represents the **R4** stage of the Reference Architecture for Schedulers and acts as a filter yielding - * a list of resources with sufficient resource-capacities, based on fixed or dynamic requirements, and on predicted or - * monitored information about processing unit availability, memory occupancy, etc. + * This interface represents stages **R2**, **R3** and **R4** stage of the Reference Architecture for Schedulers and + * acts as a filter yielding a list of resources with sufficient resource-capacities, based on fixed or dynamic + * requirements, and on predicted or monitored information about processing unit availability, memory occupancy, etc. */ -interface ResourceDynamicFilterPolicy { - /** - * Filter the list of machines based on dynamic information. - * - * @param scheduler The scheduler to filter the machines. - * @param machines The list of machines in the system. - * @param task The task that is to be scheduled. - * @return The machines on which the task can be scheduled. - */ - operator fun invoke( - scheduler: StageWorkflowService, - machines: List<Node>, - task: StageWorkflowService.TaskView - ): List<Node> +interface ResourceFilterPolicy : StagePolicy<ResourceFilterPolicy.Logic> { + interface Logic { + /** + * Filter the list of machines based on dynamic information. + * + * @param hosts The hosts to filter. + * @param task The task that is to be scheduled. + * @return The machines on which the task can be scheduled. + */ + operator fun invoke(hosts: Sequence<Node>, task: TaskState): Sequence<Node> + } } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt index 38fe5886..43053097 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt @@ -25,24 +25,10 @@ package com.atlarge.opendc.workflows.service.stage.resource import com.atlarge.opendc.compute.metal.Node -import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.stage.StagePolicy /** * This interface represents the **R5** stage of the Reference Architecture for Schedulers and matches the the selected * task with a (set of) resource(s), using policies such as First-Fit, Worst-Fit, and Best-Fit. */ -interface ResourceSelectionPolicy { - /** - * Select a machine on which the task should be scheduled. - * - * @param scheduler The scheduler to select the machine. - * @param machines The list of machines in the system. - * @param task The task that is to be scheduled. - * @return The selected machine or `null` if no machine could be found. - */ - fun select( - scheduler: StageWorkflowService, - machines: List<Node>, - task: StageWorkflowService.TaskView - ): Node? -} +interface ResourceSelectionPolicy : StagePolicy<Comparator<Node>> diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/ActiveTaskOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/ActiveTaskOrderPolicy.kt new file mode 100644 index 00000000..15f0d0dc --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/ActiveTaskOrderPolicy.kt @@ -0,0 +1,67 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.task + +import com.atlarge.opendc.workflows.service.JobState +import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener +import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.TaskState + +data class ActiveTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPolicy { + override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> = + object : Comparator<TaskState>, StageWorkflowSchedulerListener { + private val active = mutableMapOf<JobState, Int>() + + init { + scheduler.addListener(this) + } + + override fun jobStarted(job: JobState) { + active[job] = 0 + } + + override fun jobFinished(job: JobState) { + active.remove(job) + } + + override fun taskAssigned(task: TaskState) { + active.merge(task.job, 1, Int::plus) + } + + override fun taskFinished(task: TaskState) { + active.merge(task.job, -1, Int::plus) + } + + override fun compare(o1: TaskState, o2: TaskState): Int { + return compareValuesBy(o1, o2) { active.getValue(it.job) }.let { + if (ascending) it else -it + } + } + } + + override fun toString(): String { + return "Active-Per-Job(${if (ascending) "asc" else "desc"})" + } +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/BalancingTaskEligibilityPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/BalancingTaskEligibilityPolicy.kt new file mode 100644 index 00000000..f6311644 --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/BalancingTaskEligibilityPolicy.kt @@ -0,0 +1,71 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.task + +import com.atlarge.opendc.workflows.service.JobState +import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener +import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.TaskState +import kotlin.math.max + +data class BalancingTaskEligibilityPolicy(val tolerance: Double = 1.5) : TaskEligibilityPolicy { + override fun invoke(scheduler: StageWorkflowService): TaskEligibilityPolicy.Logic = + object : TaskEligibilityPolicy.Logic, StageWorkflowSchedulerListener { + private val active = mutableMapOf<JobState, Int>() + + init { + scheduler.addListener(this) + } + + override fun jobStarted(job: JobState) { + active[job] = 0 + } + + override fun jobFinished(job: JobState) { + active.remove(job) + } + + override fun taskAssigned(task: TaskState) { + active.merge(task.job, 1, Int::plus) + } + + override fun taskFinished(task: TaskState) { + active.merge(task.job, -1, Int::plus) + } + + override fun invoke(task: TaskState): TaskEligibilityPolicy.Advice { + val activeJobs = scheduler.activeJobs.size + val activeTasks = scheduler.activeTasks.size + val baseline = max(activeTasks / activeJobs.toDouble(), 1.0) + val activeForJob = active[task.job]!! + return if ((activeForJob + 1) / baseline < tolerance) + TaskEligibilityPolicy.Advice.ADMIT + else + TaskEligibilityPolicy.Advice.DENY + } + } + + override fun toString(): String = "Job-Balance($tolerance)" +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/CompletionTaskOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/CompletionTaskOrderPolicy.kt new file mode 100644 index 00000000..29d6f6b5 --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/CompletionTaskOrderPolicy.kt @@ -0,0 +1,63 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.task + +import com.atlarge.opendc.workflows.service.JobState +import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener +import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.TaskState + +data class CompletionTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPolicy { + override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> = + object : Comparator<TaskState>, StageWorkflowSchedulerListener { + private val finished = mutableMapOf<JobState, Int>() + + init { + scheduler.addListener(this) + } + + override fun jobStarted(job: JobState) { + finished[job] = 0 + } + + override fun jobFinished(job: JobState) { + finished.remove(job) + } + + override fun taskFinished(task: TaskState) { + finished.merge(task.job, 1, Int::plus) + } + + override fun compare(o1: TaskState, o2: TaskState): Int { + return compareValuesBy(o1, o2) { finished.getValue(it.job) / it.job.tasks.size.toDouble() }.let { + if (ascending) it else -it + } + } + } + + override fun toString(): String { + return "Job-Completion(${if (ascending) "asc" else "desc"})" + } +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DependenciesTaskOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DependenciesTaskOrderPolicy.kt new file mode 100644 index 00000000..e49c4858 --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DependenciesTaskOrderPolicy.kt @@ -0,0 +1,41 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.task + +import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.TaskState + +/** + * The [DependenciesTaskOrderPolicy] sorts tasks based on the number of dependency tasks it has. + */ +data class DependenciesTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPolicy { + override fun invoke(scheduler: StageWorkflowService) = compareBy<TaskState> { + it.task.dependencies.size.let { if (ascending) it else -it } + } + + override fun toString(): String { + return "Task-Dependencies(${if (ascending) "asc" else "desc"})" + } +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskSortingPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DependentsTaskOrderPolicy.kt index 1b1d5b44..9dcaecb2 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskSortingPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DependentsTaskOrderPolicy.kt @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2019 atlarge-research + * Copyright (c) 2020 atlarge-research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -25,16 +25,17 @@ package com.atlarge.opendc.workflows.service.stage.task import com.atlarge.opendc.workflows.service.StageWorkflowService -import kotlin.random.Random +import com.atlarge.opendc.workflows.service.TaskState /** - * The [RandomTaskSortingPolicy] sorts tasks randomly. - * - * @property random The [Random] instance to use when sorting the list of tasks. + * The [DependentsTaskOrderPolicy] sorts tasks based on the number of dependent tasks it has. */ -class RandomTaskSortingPolicy(private val random: Random = Random.Default) : TaskSortingPolicy { - override fun invoke( - scheduler: StageWorkflowService, - tasks: Collection<StageWorkflowService.TaskView> - ): List<StageWorkflowService.TaskView> = tasks.shuffled(random) +data class DependentsTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPolicy { + override fun invoke(scheduler: StageWorkflowService) = compareBy<TaskState> { + it.dependents.size.let { if (ascending) it else -it } + } + + override fun toString(): String { + return "Task-Dependents(${if (ascending) "asc" else "desc"})" + } } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationHistoryTaskOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationHistoryTaskOrderPolicy.kt new file mode 100644 index 00000000..ea6bf541 --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationHistoryTaskOrderPolicy.kt @@ -0,0 +1,69 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.task + +import com.atlarge.opendc.workflows.service.JobState +import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener +import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.TaskState + +data class DurationHistoryTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPolicy { + + override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> = + object : Comparator<TaskState>, StageWorkflowSchedulerListener { + private val results = HashMap<JobState, MutableList<Long>>() + + init { + scheduler.addListener(this) + } + + override fun jobStarted(job: JobState) { + results[job] = mutableListOf() + } + + override fun jobFinished(job: JobState) { + results.remove(job) + } + + override fun taskFinished(task: TaskState) { + results.getValue(task.job) += task.finishedAt - task.startedAt + } + + override fun compare(o1: TaskState, o2: TaskState): Int { + return compareValuesBy(o1, o2) { key -> + val history = results.getValue(key.job) + if (history.isEmpty()) { + Long.MAX_VALUE + } else { + history.average() + } + }.let { if (ascending) it else -it } + } + } + + override fun toString(): String { + return "Task-Duration-History(${if (ascending) "asc" else "desc"})" + } +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationTaskOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationTaskOrderPolicy.kt new file mode 100644 index 00000000..25214367 --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationTaskOrderPolicy.kt @@ -0,0 +1,67 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.task + +import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener +import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.TaskState +import java.util.UUID + +/** + * The [DurationTaskOrderPolicy] sorts tasks based on the duration of the task. + */ +data class DurationTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPolicy { + + override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> = + object : Comparator<TaskState>, StageWorkflowSchedulerListener { + private val results = HashMap<UUID, Double>() + + init { + scheduler.addListener(this) + } + + override fun taskReady(task: TaskState) { + val estimable = task.task.application as? Estimable + results[task.task.uid] = estimable?.estimate(resources) ?: Duration.POSITIVE_INFINITY + } + + override fun taskFinished(task: TaskState) { + results -= task.task.uid + } + + private val TaskState.duration: Double + get() = results.getValue(task.uid) + + override fun compare(o1: TaskState, o2: TaskState): Int { + return compareValuesBy(o1, o2) { state -> state.duration }.let { + if (ascending) it else -it + } + } + } + + override fun toString(): String { + return "Task-Duration(${if (ascending) "asc" else "desc"})" + } +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitPerJobTaskEligibilityPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitPerJobTaskEligibilityPolicy.kt new file mode 100644 index 00000000..330b191e --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitPerJobTaskEligibilityPolicy.kt @@ -0,0 +1,67 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.task + +import com.atlarge.opendc.workflows.service.JobState +import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener +import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.TaskState + +data class LimitPerJobTaskEligibilityPolicy(val limit: Int): TaskEligibilityPolicy { + override fun invoke(scheduler: StageWorkflowService): TaskEligibilityPolicy.Logic = + object : TaskEligibilityPolicy.Logic, StageWorkflowSchedulerListener { + private val active = mutableMapOf<JobState, Int>() + + init { + scheduler.addListener(this) + } + + override fun jobStarted(job: JobState) { + active[job] = 0 + } + + override fun jobFinished(job: JobState) { + active.remove(job) + } + + override fun taskAssigned(task: TaskState) { + active.merge(task.job, 1, Int::plus) + } + + override fun taskFinished(task: TaskState) { + active.merge(task.job, -1, Int::plus) + } + + override fun invoke(task: TaskState): TaskEligibilityPolicy.Advice { + val activeForJob = active[task.job]!! + return if (activeForJob <= limit) + TaskEligibilityPolicy.Advice.ADMIT + else + TaskEligibilityPolicy.Advice.DENY + } + } + + override fun toString(): String = "Limit-Active-Job($limit)" +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskSortingPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitTaskEligibilityPolicy.kt index aabc44a9..1e8a8158 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskSortingPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitTaskEligibilityPolicy.kt @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2019 atlarge-research + * Copyright (c) 2020 atlarge-research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -25,21 +25,19 @@ package com.atlarge.opendc.workflows.service.stage.task import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.TaskState -/** - * This interface represents the **T2** stage of the Reference Architecture for Datacenter Schedulers and provides the - * scheduler with a sorted list of tasks to schedule. - */ -interface TaskSortingPolicy { - /** - * Sort the given list of tasks on a given criterion. - * - * @param scheduler The scheduler that is sorting the tasks. - * @param tasks The collection of tasks that should be sorted. - * @return The sorted list of tasks. - */ - operator fun invoke( - scheduler: StageWorkflowService, - tasks: Collection<StageWorkflowService.TaskView> - ): List<StageWorkflowService.TaskView> +data class LimitTaskEligibilityPolicy(val limit: Int): TaskEligibilityPolicy { + override fun invoke(scheduler: StageWorkflowService) = object : TaskEligibilityPolicy.Logic { + override fun invoke( + task: TaskState + ): TaskEligibilityPolicy.Advice = + if (scheduler.activeTasks.size < limit) + TaskEligibilityPolicy.Advice.ADMIT + else + TaskEligibilityPolicy.Advice.STOP + } + + + override fun toString(): String = "Limit-Active($limit)" } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt new file mode 100644 index 00000000..a53ab8a5 --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt @@ -0,0 +1,43 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.task + +import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.TaskState + +data class LoadTaskEligibilityPolicy(val limit: Double): TaskEligibilityPolicy { + override fun invoke(scheduler: StageWorkflowService) = object : TaskEligibilityPolicy.Logic { + override fun invoke( + task: TaskState + ): TaskEligibilityPolicy.Advice = + if (scheduler.load < limit) + TaskEligibilityPolicy.Advice.ADMIT + else + TaskEligibilityPolicy.Advice.STOP + } + + + override fun toString(): String = "Limit-Load($limit)" +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FunctionalTaskEligibilityPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/NullTaskEligibilityPolicy.kt index 72ecbee2..3dc86e85 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FunctionalTaskEligibilityPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/NullTaskEligibilityPolicy.kt @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2019 atlarge-research + * Copyright (c) 2020 atlarge-research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -30,9 +30,14 @@ import com.atlarge.opendc.workflows.service.TaskState /** * A [TaskEligibilityPolicy] that marks tasks as eligible if they are tasks roots within the job. */ -class FunctionalTaskEligibilityPolicy : TaskEligibilityPolicy { - override fun isEligible( - scheduler: StageWorkflowService, - task: StageWorkflowService.TaskView - ): Boolean = task.state == TaskState.READY +object NullTaskEligibilityPolicy : TaskEligibilityPolicy { + override fun invoke(scheduler: StageWorkflowService): TaskEligibilityPolicy.Logic = Logic + + private object Logic : TaskEligibilityPolicy.Logic { + override fun invoke( + task: TaskState + ): TaskEligibilityPolicy.Advice = TaskEligibilityPolicy.Advice.ADMIT + } + + override fun toString(): String = "Always" } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskEligibilityPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskEligibilityPolicy.kt new file mode 100644 index 00000000..2aa41a2b --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskEligibilityPolicy.kt @@ -0,0 +1,45 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.task + +import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.TaskState +import java.util.Random + +data class RandomTaskEligibilityPolicy(val probability: Double = 0.5): TaskEligibilityPolicy { + override fun invoke(scheduler: StageWorkflowService) = object : TaskEligibilityPolicy.Logic { + val random = Random(123) + + override fun invoke(task: TaskState): TaskEligibilityPolicy.Advice = + if (random.nextDouble() <= probability || scheduler.activeTasks.isEmpty()) + TaskEligibilityPolicy.Advice.ADMIT + else { + TaskEligibilityPolicy.Advice.DENY + } + } + + + override fun toString(): String = "Random($probability)" +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskOrderPolicy.kt new file mode 100644 index 00000000..60aad9a7 --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskOrderPolicy.kt @@ -0,0 +1,64 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.task + +import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener +import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.TaskState +import com.atlarge.opendc.workflows.workload.Task +import kotlin.random.Random + +/** + * The [RandomTaskOrderPolicy] sorts tasks randomly. + * + * @property random The [Random] instance to use when sorting the list of tasks. + */ +object RandomTaskOrderPolicy : TaskOrderPolicy { + override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> = + object : Comparator<TaskState>, StageWorkflowSchedulerListener { + private val random = Random(123) + private val ids = HashMap<Task, Int>() + + init { + scheduler.addListener(this) + } + + override fun taskReady(task: TaskState) { + ids[task.task] = random.nextInt() + } + + override fun taskFinished(task: TaskState) { + ids.remove(task.task) + } + + override fun compare(o1: TaskState, o2: TaskState): Int { + return compareValuesBy(o1, o2) { ids.getValue(it.task) } + } + } + + override fun toString(): String { + return "Random" + } +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/SubmissionTimeTaskOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/SubmissionTimeTaskOrderPolicy.kt new file mode 100644 index 00000000..998d782b --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/SubmissionTimeTaskOrderPolicy.kt @@ -0,0 +1,41 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.task + +import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.TaskState + +/** + * The [SubmissionTimeTaskOrderPolicy] sorts tasks based on the order of arrival in the queue. + */ +data class SubmissionTimeTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPolicy { + override fun invoke(scheduler: StageWorkflowService) = compareBy<TaskState> { + it.job.submittedAt.let { if (ascending) it else -it } + } + + override fun toString(): String { + return "Submission-Time(${if (ascending) "asc" else "desc"})" + } +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt index 19954d7b..75529005 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt @@ -24,25 +24,49 @@ package com.atlarge.opendc.workflows.service.stage.task -import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.TaskState +import com.atlarge.opendc.workflows.service.stage.StagePolicy /** * A policy interface for determining the eligibility of tasks in a scheduling cycle. */ -interface TaskEligibilityPolicy { - /** - * A method that is invoked at the start of each scheduling cycle. - * - * @param scheduler The scheduler that started the cycle. - */ - fun startCycle(scheduler: StageWorkflowService) {} +interface TaskEligibilityPolicy : StagePolicy<TaskEligibilityPolicy.Logic> { + interface Logic { + /** + * Determine whether the specified [TaskState] is eligible to be scheduled. + * + * @param task The task instance to schedule. + * @return The advice for marking the task. + */ + operator fun invoke(task: TaskState): Advice + } /** - * Determine whether the specified [StageWorkflowService.TaskView] is eligible to be scheduled. + * The advice given to the scheduler by an admission policy. * - * @param scheduler The scheduler that is determining whether the task is eligible. - * @param task The task instance to schedule. - * @return `true` if the task eligible to be scheduled, `false` otherwise. + * @property admit A flag to indicate to the scheduler that the job should be admitted. + * @property stop A flag to indicate the scheduler should immediately stop admitting jobs to the scheduling queue and wait + * for the next scheduling cycle. */ - fun isEligible(scheduler: StageWorkflowService, task: StageWorkflowService.TaskView): Boolean + enum class Advice(val admit: Boolean, val stop: Boolean) { + /** + * Admit the current job to the scheduling queue and continue admitting jobs. + */ + ADMIT(true, false), + + /** + * Admit the current job to the scheduling queue and stop admitting jobs. + */ + ADMIT_LAST(true, true), + + /** + * Deny the current job, but continue admitting jobs. + */ + DENY(false, false), + + /** + * Deny the current job and also stop admitting jobs. + */ + STOP(false, true) + } } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FifoTaskSortingPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskOrderPolicy.kt index bba81d27..bfdc7924 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FifoTaskSortingPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskOrderPolicy.kt @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2019 atlarge-research + * Copyright (c) 2020 atlarge-research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -24,14 +24,11 @@ package com.atlarge.opendc.workflows.service.stage.task -import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.TaskState +import com.atlarge.opendc.workflows.service.stage.StagePolicy /** - * The [FifoTaskSortingPolicy] sorts tasks based on the order of arrival in the queue. + * This interface represents the **T2** stage of the Reference Architecture for Datacenter Schedulers and provides the + * scheduler with a sorted list of tasks to schedule. */ -class FifoTaskSortingPolicy : TaskSortingPolicy { - override fun invoke( - scheduler: StageWorkflowService, - tasks: Collection<StageWorkflowService.TaskView> - ): List<StageWorkflowService.TaskView> = tasks.toList() -} +interface TaskOrderPolicy : StagePolicy<Comparator<TaskState>> |
