From e5345dc2192f88ba1d5949eca60ae505759038e0 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 20 Feb 2020 23:28:34 +0100 Subject: [ci skip] feat: Incorporate extensions to workflow scheduler This change incorporate a number of extensions and improvements to the workflow scheduler. These are a result of the Design Space Exploration for Datacenter Schedulers work. --- .../opendc/experiments/sc18/TestExperiment.kt | 4 +- .../atlarge/opendc/workflows/service/JobState.kt | 42 +++ .../service/StageWorkflowSchedulerListener.kt | 39 +++ .../workflows/service/StageWorkflowService.kt | 311 +++++++++++++-------- .../atlarge/opendc/workflows/service/TaskState.kt | 69 ++++- .../atlarge/opendc/workflows/service/TaskStatus.kt | 35 +++ .../workflows/service/WorkflowSchedulerMode.kt | 79 +++++- .../opendc/workflows/service/stage/StagePolicy.kt | 38 +++ .../service/stage/job/DurationJobOrderPolicy.kt | 102 +++++++ .../service/stage/job/FifoJobSortingPolicy.kt | 37 --- .../service/stage/job/JobAdmissionPolicy.kt | 52 +++- .../workflows/service/stage/job/JobOrderPolicy.kt | 34 +++ .../service/stage/job/JobSortingPolicy.kt | 44 --- .../service/stage/job/LimitJobAdmissionPolicy.kt | 47 ++++ .../service/stage/job/LoadJobAdmissionPolicy.kt | 47 ++++ .../service/stage/job/NullJobAdmissionPolicy.kt | 13 +- .../service/stage/job/RandomJobOrderPolicy.kt | 59 ++++ .../service/stage/job/RandomJobSortingPolicy.kt | 40 --- .../service/stage/job/SizeJobOrderPolicy.kt | 40 +++ .../stage/job/SubmissionTimeJobOrderPolicy.kt | 40 +++ .../resource/FirstFitResourceSelectionPolicy.kt | 13 +- .../FunctionalResourceDynamicFilterPolicy.kt | 43 --- .../resource/FunctionalResourceFilterPolicy.kt | 44 +++ .../resource/RandomResourceSelectionPolicy.kt | 44 +++ .../stage/resource/ResourceDynamicFilterPolicy.kt | 49 ---- .../service/stage/resource/ResourceFilterPolicy.kt | 47 ++++ .../stage/resource/ResourceSelectionPolicy.kt | 18 +- .../service/stage/task/ActiveTaskOrderPolicy.kt | 67 +++++ .../stage/task/BalancingTaskEligibilityPolicy.kt | 71 +++++ .../stage/task/CompletionTaskOrderPolicy.kt | 63 +++++ .../stage/task/DependenciesTaskOrderPolicy.kt | 41 +++ .../stage/task/DependentsTaskOrderPolicy.kt | 41 +++ .../stage/task/DurationHistoryTaskOrderPolicy.kt | 69 +++++ .../service/stage/task/DurationTaskOrderPolicy.kt | 67 +++++ .../service/stage/task/FifoTaskSortingPolicy.kt | 37 --- .../stage/task/FunctionalTaskEligibilityPolicy.kt | 38 --- .../stage/task/LimitPerJobTaskEligibilityPolicy.kt | 67 +++++ .../stage/task/LimitTaskEligibilityPolicy.kt | 43 +++ .../stage/task/LoadTaskEligibilityPolicy.kt | 43 +++ .../stage/task/NullTaskEligibilityPolicy.kt | 43 +++ .../stage/task/RandomTaskEligibilityPolicy.kt | 45 +++ .../service/stage/task/RandomTaskOrderPolicy.kt | 64 +++++ .../service/stage/task/RandomTaskSortingPolicy.kt | 40 --- .../stage/task/SubmissionTimeTaskOrderPolicy.kt | 41 +++ .../service/stage/task/TaskEligibilityPolicy.kt | 50 +++- .../service/stage/task/TaskOrderPolicy.kt | 34 +++ .../service/stage/task/TaskSortingPolicy.kt | 45 --- 47 files changed, 1875 insertions(+), 564 deletions(-) create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerListener.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskStatus.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/StagePolicy.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/DurationJobOrderPolicy.kt delete mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/FifoJobSortingPolicy.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobOrderPolicy.kt delete mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobSortingPolicy.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/LimitJobAdmissionPolicy.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/LoadJobAdmissionPolicy.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobOrderPolicy.kt delete mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobSortingPolicy.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/SizeJobOrderPolicy.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/SubmissionTimeJobOrderPolicy.kt delete mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceDynamicFilterPolicy.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/RandomResourceSelectionPolicy.kt delete mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceDynamicFilterPolicy.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceFilterPolicy.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/ActiveTaskOrderPolicy.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/BalancingTaskEligibilityPolicy.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/CompletionTaskOrderPolicy.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DependenciesTaskOrderPolicy.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DependentsTaskOrderPolicy.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationHistoryTaskOrderPolicy.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationTaskOrderPolicy.kt delete mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FifoTaskSortingPolicy.kt delete mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FunctionalTaskEligibilityPolicy.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitPerJobTaskEligibilityPolicy.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitTaskEligibilityPolicy.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/NullTaskEligibilityPolicy.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskEligibilityPolicy.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskOrderPolicy.kt delete mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskSortingPolicy.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/SubmissionTimeTaskOrderPolicy.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskOrderPolicy.kt delete mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskSortingPolicy.kt diff --git a/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt b/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt index 3dc8be51..415221ca 100644 --- a/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt +++ b/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt @@ -31,7 +31,7 @@ import com.atlarge.opendc.format.trace.gwf.GwfTraceReader import com.atlarge.opendc.workflows.monitor.WorkflowMonitor import com.atlarge.opendc.workflows.service.StageWorkflowService import com.atlarge.opendc.workflows.service.WorkflowSchedulerMode -import com.atlarge.opendc.workflows.service.stage.job.FifoJobSortingPolicy +import com.atlarge.opendc.workflows.service.stage.job.SubmissionTimeJobOrderPolicy import com.atlarge.opendc.workflows.service.stage.job.NullJobAdmissionPolicy import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelectionPolicy import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceDynamicFilterPolicy @@ -92,7 +92,7 @@ fun main(args: Array) { environment.platforms[0].zones[0].services[ProvisioningService.Key], mode = WorkflowSchedulerMode.Batch(100), jobAdmissionPolicy = NullJobAdmissionPolicy, - jobSortingPolicy = FifoJobSortingPolicy(), + jobSortingPolicy = SubmissionTimeJobOrderPolicy(), taskEligibilityPolicy = FunctionalTaskEligibilityPolicy(), taskSortingPolicy = FifoTaskSortingPolicy(), resourceDynamicFilterPolicy = FunctionalResourceDynamicFilterPolicy(), diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt new file mode 100644 index 00000000..b444f91c --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt @@ -0,0 +1,42 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service + +import com.atlarge.opendc.workflows.monitor.WorkflowMonitor +import com.atlarge.opendc.workflows.workload.Job + +class JobState(val job: Job, val monitor: WorkflowMonitor, val submittedAt: Long) { + /** + * A flag to indicate whether this job is finished. + */ + val isFinished: Boolean + get() = tasks.isEmpty() + + val tasks: MutableSet = mutableSetOf() + + override fun equals(other: Any?): Boolean = other is JobState && other.job == job + + override fun hashCode(): Int = job.hashCode() +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerListener.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerListener.kt new file mode 100644 index 00000000..73c3e752 --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerListener.kt @@ -0,0 +1,39 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service + +interface StageWorkflowSchedulerListener { + fun cycleStarted(scheduler: StageWorkflowService) {} + fun cycleFinished(scheduler: StageWorkflowService) {} + + fun jobSubmitted(job: JobState) {} + fun jobStarted(job: JobState) {} + fun jobFinished(job: JobState) {} + + fun taskReady(task: TaskState) {} + fun taskAssigned(task: TaskState) {} + fun taskStarted(task: TaskState) {} + fun taskFinished(task: TaskState) {} +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt index d7b29c32..e19f5446 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt @@ -32,15 +32,15 @@ import com.atlarge.opendc.compute.metal.Node import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.workflows.monitor.WorkflowMonitor import com.atlarge.opendc.workflows.service.stage.job.JobAdmissionPolicy -import com.atlarge.opendc.workflows.service.stage.job.JobSortingPolicy -import com.atlarge.opendc.workflows.service.stage.resource.ResourceDynamicFilterPolicy +import com.atlarge.opendc.workflows.service.stage.job.JobOrderPolicy +import com.atlarge.opendc.workflows.service.stage.resource.ResourceFilterPolicy import com.atlarge.opendc.workflows.service.stage.resource.ResourceSelectionPolicy import com.atlarge.opendc.workflows.service.stage.task.TaskEligibilityPolicy -import com.atlarge.opendc.workflows.service.stage.task.TaskSortingPolicy +import com.atlarge.opendc.workflows.service.stage.task.TaskOrderPolicy import com.atlarge.opendc.workflows.workload.Job -import com.atlarge.opendc.workflows.workload.Task -import kotlinx.coroutines.delay import kotlinx.coroutines.launch +import java.util.PriorityQueue +import java.util.Queue /** * A [WorkflowService] that distributes work through a multi-stage process based on the Reference Architecture for @@ -49,29 +49,50 @@ import kotlinx.coroutines.launch class StageWorkflowService( private val ctx: ProcessContext, private val provisioningService: ProvisioningService, - private val mode: WorkflowSchedulerMode, - private val jobAdmissionPolicy: JobAdmissionPolicy, - private val jobSortingPolicy: JobSortingPolicy, - private val taskEligibilityPolicy: TaskEligibilityPolicy, - private val taskSortingPolicy: TaskSortingPolicy, - private val resourceDynamicFilterPolicy: ResourceDynamicFilterPolicy, - private val resourceSelectionPolicy: ResourceSelectionPolicy + mode: WorkflowSchedulerMode, + jobAdmissionPolicy: JobAdmissionPolicy, + jobOrderPolicy: JobOrderPolicy, + taskEligibilityPolicy: TaskEligibilityPolicy, + taskOrderPolicy: TaskOrderPolicy, + resourceFilterPolicy: ResourceFilterPolicy, + resourceSelectionPolicy: ResourceSelectionPolicy ) : WorkflowService, ServerMonitor { /** * The incoming jobs ready to be processed by the scheduler. */ - internal val incomingJobs: MutableSet = mutableSetOf() + internal val incomingJobs: MutableSet = linkedSetOf() + + /** + * The incoming tasks ready to be processed by the scheduler. + */ + internal val incomingTasks: MutableSet = linkedSetOf() + + /** + * The job queue. + */ + internal val jobQueue: Queue + + /** + * The task queue. + */ + internal val taskQueue: Queue /** * The active jobs in the system. */ - internal val activeJobs: MutableSet = mutableSetOf() + internal val activeJobs: MutableSet = mutableSetOf() + + /** + * The active tasks in the system. + */ + internal val activeTasks: MutableSet = mutableSetOf() + /** * The running tasks by [Server]. */ - internal val taskByServer = mutableMapOf() + internal val taskByServer = mutableMapOf() /** * The nodes that are controlled by the service. @@ -83,18 +104,90 @@ class StageWorkflowService( */ internal val available: MutableSet = mutableSetOf() + + /** + * The maximum number of incoming jobs. + */ + private val throttleLimit: Int = 20000 + + /** + * The load of the system. + */ + internal val load: Double + get() = (available.size / nodes.size.toDouble()) + + /** + * The root listener of this scheduler. + */ + private val rootListener = object : StageWorkflowSchedulerListener { + /** + * The listeners to delegate to. + */ + val listeners = mutableSetOf() + + override fun cycleStarted(scheduler: StageWorkflowService) { + listeners.forEach { it.cycleStarted(scheduler) } + } + + override fun cycleFinished(scheduler: StageWorkflowService) { + listeners.forEach { it.cycleFinished(scheduler) } + } + + override fun jobSubmitted(job: JobState) { + listeners.forEach { it.jobSubmitted(job) } + } + + override fun jobStarted(job: JobState) { + listeners.forEach { it.jobStarted(job) } + } + + override fun jobFinished(job: JobState) { + listeners.forEach { it.jobFinished(job) } + } + + override fun taskReady(task: TaskState) { + listeners.forEach { it.taskReady(task) } + } + + override fun taskAssigned(task: TaskState) { + listeners.forEach { it.taskAssigned(task) } + } + + override fun taskStarted(task: TaskState) { + listeners.forEach { it.taskStarted(task) } + } + + override fun taskFinished(task: TaskState) { + listeners.forEach { it.taskFinished(task) } + } + } + + private val mode: WorkflowSchedulerMode.Logic + private val jobAdmissionPolicy: JobAdmissionPolicy.Logic + private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic + private val resourceFilterPolicy: ResourceFilterPolicy.Logic + private val resourceSelectionPolicy: Comparator + init { ctx.launch { nodes = provisioningService.nodes().toList() available.addAll(nodes) } + + this.mode = mode(this) + this.jobAdmissionPolicy = jobAdmissionPolicy(this) + this.jobQueue = PriorityQueue(100, jobOrderPolicy(this).thenBy { it.job.uid }) + this.taskEligibilityPolicy = taskEligibilityPolicy(this) + this.taskQueue = PriorityQueue(1000, taskOrderPolicy(this).thenBy { it.task.uid }) + this.resourceFilterPolicy = resourceFilterPolicy(this) + this.resourceSelectionPolicy = resourceSelectionPolicy(this) } override suspend fun submit(job: Job, monitor: WorkflowMonitor) { // J1 Incoming Jobs - val jobInstance = JobView(job, monitor) + val jobInstance = JobState(job, monitor, ctx.clock.millis()) val instances = job.tasks.associateWith { - TaskView(jobInstance, it) + TaskState(jobInstance, it) } for ((task, instance) in instances) { @@ -105,82 +198,84 @@ class StageWorkflowService( // If the task has no dependency, it is a root task and can immediately be evaluated if (instance.isRoot) { - instance.state = TaskState.READY + instance.state = TaskStatus.READY } } - jobInstance.tasks = instances.values.toMutableSet() + instances.values.toCollection(jobInstance.tasks) incomingJobs += jobInstance + rootListener.jobSubmitted(jobInstance) + requestCycle() } - private var next: kotlinx.coroutines.Job? = null /** * Indicate to the scheduler that a scheduling cycle is needed. */ - private fun requestCycle() { - when (mode) { - is WorkflowSchedulerMode.Interactive -> { - ctx.launch { - schedule() - } - } - is WorkflowSchedulerMode.Batch -> { - if (next == null) { - val job = ctx.launch { - delay(mode.quantum) - next = null - schedule() - } - next = job - } - } - } - } + private suspend fun requestCycle() = mode.requestCycle() /** * Perform a scheduling cycle immediately. */ - private suspend fun schedule() { + internal suspend fun schedule() { // J2 Create list of eligible jobs - jobAdmissionPolicy.startCycle(this) - val eligibleJobs = incomingJobs.filter { jobAdmissionPolicy.shouldAdmit(this, it) } + val iterator = incomingJobs.iterator() + while (iterator.hasNext()) { + val jobInstance = iterator.next() + val advice = jobAdmissionPolicy(jobInstance) + if (advice.stop) { + break + } else if (!advice.admit) { + continue + } - for (jobInstance in eligibleJobs) { - incomingJobs -= jobInstance + iterator.remove() + jobQueue.add(jobInstance) activeJobs += jobInstance jobInstance.monitor.onJobStart(jobInstance.job, ctx.clock.millis()) + rootListener.jobStarted(jobInstance) } - // J3 Sort jobs on criterion - val sortedJobs = jobSortingPolicy(this, activeJobs) - // J4 Per job - for (jobInstance in sortedJobs) { - // T1 Create list of eligible tasks - taskEligibilityPolicy.startCycle(this) - val eligibleTasks = jobInstance.tasks.filter { taskEligibilityPolicy.isEligible(this, it) } - - // T2 Sort tasks on criterion - val sortedTasks = taskSortingPolicy(this, eligibleTasks) - - // T3 Per task - for (instance in sortedTasks) { - val hosts = resourceDynamicFilterPolicy(this, nodes, instance) - val host = resourceSelectionPolicy.select(this, hosts, instance) - - if (host != null) { - // T4 Submit task to machine - available -= host - instance.state = TaskState.ACTIVE - - val newHost = provisioningService.deploy(host, instance.task.image, this) - instance.host = newHost - taskByServer[newHost.server!!] = instance - } else { - return + while (jobQueue.isNotEmpty()) { + val jobInstance = jobQueue.poll() + + // Edge-case: job has no tasks + if (jobInstance.isFinished) { + finishJob(jobInstance) + } + + // Add job roots to the scheduling queue + for (task in jobInstance.tasks) { + if (task.state != TaskStatus.READY) { + continue } + + incomingTasks += task + rootListener.taskReady(task) + } + } + + // T3 Per task + while (taskQueue.isNotEmpty()) { + val instance = taskQueue.peek() + val host: Node? = available.firstOrNull() + + if (host != null) { + // T4 Submit task to machine + available -= host + instance.state = TaskStatus.ACTIVE + + val newHost = provisioningService.deploy(host, instance.task.image, this) + instance.host = newHost + taskByServer[newHost.server!!] = instance + + activeTasks += instance + taskQueue.poll() + rootListener.taskAssigned(instance) + } else { + break } } } @@ -189,19 +284,33 @@ class StageWorkflowService( when (server.state) { ServerState.ACTIVE -> { val task = taskByServer.getValue(server) + task.startedAt = ctx.clock.millis() task.job.monitor.onTaskStart(task.job.job, task.task, ctx.clock.millis()) + rootListener.taskStarted(task) } ServerState.SHUTOFF, ServerState.ERROR -> { val task = taskByServer.remove(server) ?: throw IllegalStateException() val job = task.job - task.state = TaskState.FINISHED + task.state = TaskStatus.FINISHED + task.finishedAt = ctx.clock.millis() job.tasks.remove(task) available += task.host!! + activeTasks -= task job.monitor.onTaskFinish(job.job, task.task, 0, ctx.clock.millis()) + rootListener.taskFinished(task) + + // Add job roots to the scheduling queue + for (dependent in task.dependents) { + if (dependent.state != TaskStatus.READY) { + continue + } + + incomingTasks += dependent + rootListener.taskReady(dependent) + } if (job.isFinished) { - activeJobs -= job - job.monitor.onJobFinish(job.job, ctx.clock.millis()) + finishJob(job) } requestCycle() @@ -210,56 +319,18 @@ class StageWorkflowService( } } - class JobView(val job: Job, val monitor: WorkflowMonitor) { - /** - * A flag to indicate whether this job is finished. - */ - val isFinished: Boolean - get() = tasks.isEmpty() - - lateinit var tasks: MutableSet + private suspend fun finishJob(job: JobState) { + activeJobs -= job + job.monitor.onJobFinish(job.job, ctx.clock.millis()) + rootListener.jobFinished(job) } - class TaskView(val job: JobView, val task: Task) { - /** - * The dependencies of this task. - */ - val dependencies = HashSet() - - /** - * The dependents of this task. - */ - val dependents = HashSet() - - /** - * A flag to indicate whether this workflow task instance is a workflow root. - */ - val isRoot: Boolean - get() = dependencies.isEmpty() - - var state: TaskState = TaskState.CREATED - set(value) { - field = value - - // Mark the process as terminated in the graph - if (value == TaskState.FINISHED) { - markTerminated() - } - } - - var host: Node? = null - /** - * Mark the specified [TaskView] as terminated. - */ - private fun markTerminated() { - for (dependent in dependents) { - dependent.dependencies.remove(this) + fun addListener(listener: StageWorkflowSchedulerListener) { + rootListener.listeners += listener + } - if (dependent.isRoot) { - dependent.state = TaskState.READY - } - } - } + fun removeListener(listener: StageWorkflowSchedulerListener) { + rootListener.listeners -= listener } } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskState.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskState.kt index ee0024f5..a69b5235 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskState.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskState.kt @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2020 atlarge-research + * Copyright (c) 2019 atlarge-research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -24,12 +24,63 @@ package com.atlarge.opendc.workflows.service -/** - * The state of a workflow task. - */ -public enum class TaskState { - CREATED, - READY, - ACTIVE, - FINISHED +import com.atlarge.opendc.compute.metal.Node +import com.atlarge.opendc.workflows.service.StageWorkflowService.TaskView +import com.atlarge.opendc.workflows.workload.Task + +class TaskState(val job: JobState, val task: Task) { + /** + * The moment in time the task was started. + */ + var startedAt: Long = Long.MIN_VALUE + + /** + * The moment in time the task was finished. + */ + var finishedAt: Long = Long.MIN_VALUE + + /** + * The dependencies of this task. + */ + val dependencies = HashSet() + + /** + * The dependents of this task. + */ + val dependents = HashSet() + + /** + * A flag to indicate whether this workflow task instance is a workflow root. + */ + val isRoot: Boolean + get() = dependencies.isEmpty() + + var state: TaskStatus = TaskStatus.CREATED + set(value) { + field = value + + // Mark the process as terminated in the graph + if (value == TaskStatus.FINISHED) { + markTerminated() + } + } + + var host: Node? = null + + /** + * Mark the specified [TaskView] as terminated. + */ + private fun markTerminated() { + for (dependent in dependents) { + dependent.dependencies.remove(this) + + if (dependent.isRoot) { + dependent.state = TaskStatus.READY + } + } + } + + override fun equals(other: Any?): Boolean = other is TaskState && other.job == job && other.task == task + + override fun hashCode(): Int = task.hashCode() } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskStatus.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskStatus.kt new file mode 100644 index 00000000..c53c6171 --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskStatus.kt @@ -0,0 +1,35 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service + +/** + * The state of a workflow task. + */ +public enum class TaskStatus { + CREATED, + READY, + ACTIVE, + FINISHED +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt index f5060c5c..f5a0f49b 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt @@ -24,17 +24,90 @@ package com.atlarge.opendc.workflows.service +import com.atlarge.odcsim.processContext +import com.atlarge.opendc.workflows.service.stage.StagePolicy +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.yield + /** * The operating mode of a workflow scheduler. */ -sealed class WorkflowSchedulerMode { +sealed class WorkflowSchedulerMode : StagePolicy { + /** + * The logic for operating the cycles of a workflow scheduler. + */ + interface Logic { + /** + * Request a new scheduling cycle to be performed. + */ + suspend fun requestCycle() + } + /** * An interactive scheduler immediately triggers a new scheduling cycle when a workflow is received. */ - object Interactive : WorkflowSchedulerMode() + object Interactive : WorkflowSchedulerMode() { + override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic { + override suspend fun requestCycle() { + yield() + scheduler.schedule() + } + } + + + override fun toString(): String = "Interactive" + } /** * A batch scheduler triggers a scheduling cycle every time quantum if needed. */ - data class Batch(val quantum: Long) : WorkflowSchedulerMode() + data class Batch(val quantum: Long) : WorkflowSchedulerMode() { + private var next: kotlinx.coroutines.Job? = null + + override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic { + override suspend fun requestCycle() { + val ctx = processContext + if (next == null) { + // In batch mode, we assume that the scheduler runs at a fixed slot every time + // quantum (e.g t=0, t=60, t=120). We calculate here the delay until the next scheduling slot. + val delay = quantum - (ctx.clock.millis() % quantum) + + val job = ctx.launch { + delay(delay) + next = null + scheduler.schedule() + } + next = job + } + } + } + + override fun toString(): String = "Batch($quantum)" + } + + /** + * A scheduling cycle is triggered at a random point in time. + */ + data class Random(private val random: java.util.Random = java.util.Random(123)) : WorkflowSchedulerMode() { + private var next: kotlinx.coroutines.Job? = null + + override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic { + override suspend fun requestCycle() { + val ctx = processContext + if (next == null) { + val delay = random.nextInt(200).toLong() + + val job = ctx.launch { + delay(delay) + next = null + scheduler.schedule() + } + next = job + } + } + } + + override fun toString(): String = "Random" + } } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/StagePolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/StagePolicy.kt new file mode 100644 index 00000000..c7cc3d84 --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/StagePolicy.kt @@ -0,0 +1,38 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage + +import com.atlarge.opendc.workflows.service.StageWorkflowService +import java.io.Serializable + +/** + * A scheduling stage policy. + */ +interface StagePolicy : Serializable { + /** + * Build the logic of the stage policy. + */ + operator fun invoke(scheduler: StageWorkflowService): T +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/DurationJobOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/DurationJobOrderPolicy.kt new file mode 100644 index 00000000..0b3f567a --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/DurationJobOrderPolicy.kt @@ -0,0 +1,102 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.job + +import com.atlarge.opendc.workflows.service.JobState +import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener +import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.workload.Job +import com.atlarge.opendc.workflows.workload.Task + +/** + * The [DurationJobOrderPolicy] sorts tasks based on the critical path length of the job. + */ +data class DurationJobOrderPolicy(val ascending: Boolean = true) : JobOrderPolicy { + override fun invoke(scheduler: StageWorkflowService): Comparator = + object : Comparator, StageWorkflowSchedulerListener { + private val results = HashMap() + + init { + scheduler.addListener(this) + } + + private val Job.duration: Long + get() = results[this]!! + + override fun jobSubmitted(job: JobState) { + results[job.job] = job.job.toposort().sumByDouble { task -> + val estimable = task.application as? Estimable + estimable?.estimate(resources) ?: Duration.POSITIVE_INFINITY + } + } + + override fun jobFinished(job: JobState) { + results.remove(job.job) + } + + override fun compare(o1: JobState, o2: JobState): Int { + return compareValuesBy(o1, o2) { it.job.duration }.let { if (ascending) it else -it } + } + } + + + override fun toString(): String { + return "Job-Duration(${if (ascending) "asc" else "desc"})" + } +} + +/** + * Create a topological sorting of the tasks in a job. + * + * @return The list of tasks within the job topologically sorted. + */ +fun Job.toposort(): List { + val res = mutableListOf() + val visited = mutableSetOf() + val adjacent = mutableMapOf>() + + for (task in tasks) { + for (dependency in task.dependencies) { + adjacent.getOrPut(dependency) { mutableListOf() }.add(task) + } + } + + fun visit(task: Task) { + visited.add(task) + + adjacent[task] ?: emptyList() + .asSequence() + .filter { it !in visited } + .forEach { visit(it) } + + res.add(task) + } + + tasks + .asSequence() + .filter { it !in visited } + .forEach { visit(it) } + return res +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/FifoJobSortingPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/FifoJobSortingPolicy.kt deleted file mode 100644 index 976fbbf3..00000000 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/FifoJobSortingPolicy.kt +++ /dev/null @@ -1,37 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.workflows.service.stage.job - -import com.atlarge.opendc.workflows.service.StageWorkflowService - -/** - * The [FifoJobSortingPolicy] sorts tasks based on the order of arrival in the queue. - */ -class FifoJobSortingPolicy : JobSortingPolicy { - override fun invoke( - scheduler: StageWorkflowService, - jobs: Collection - ): List = jobs.toList() -} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt index cdaad512..a3645523 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt @@ -24,25 +24,49 @@ package com.atlarge.opendc.workflows.service.stage.job -import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.JobState +import com.atlarge.opendc.workflows.service.stage.StagePolicy /** - * A policy interface for admitting [StageWorkflowService.JobView]s to a scheduling cycle. + * A policy interface for admitting [JobState]s to a scheduling cycle. */ -interface JobAdmissionPolicy { - /** - * A method that is invoked at the start of each scheduling cycle. - * - * @param scheduler The scheduler that started the cycle. - */ - fun startCycle(scheduler: StageWorkflowService) {} +interface JobAdmissionPolicy : StagePolicy { + interface Logic { + /** + * Determine whether the specified [JobState] should be admitted to the scheduling cycle. + * + * @param job The workflow that has been submitted. + * @return The advice for admitting the job. + */ + operator fun invoke(job: JobState): Advice + } /** - * Determine whether the specified [StageWorkflowService.JobView] should be admitted to the scheduling cycle. + * The advice given to the scheduler by an admission policy. * - * @param scheduler The scheduler that should admit or reject the job. - * @param job The workflow that has been submitted. - * @return `true` if the workflow may be admitted to the scheduling cycle, `false` otherwise. + * @property admit A flag to indicate to the scheduler that the job should be admitted. + * @property stop A flag to indicate the scheduler should immediately stop admitting jobs to the scheduling queue and wait + * for the next scheduling cycle. */ - fun shouldAdmit(scheduler: StageWorkflowService, job: StageWorkflowService.JobView): Boolean + enum class Advice(val admit: Boolean, val stop: Boolean) { + /** + * Admit the current job to the scheduling queue and continue admitting jobs. + */ + ADMIT(true, false), + + /** + * Admit the current job to the scheduling queue and stop admitting jobs. + */ + ADMIT_LAST(true, true), + + /** + * Deny the current job, but continue admitting jobs. + */ + DENY(false, false), + + /** + * Deny the current job and also stop admitting jobs. + */ + STOP(false, true) + } } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobOrderPolicy.kt new file mode 100644 index 00000000..488148af --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobOrderPolicy.kt @@ -0,0 +1,34 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.job + +import com.atlarge.opendc.workflows.service.JobState +import com.atlarge.opendc.workflows.service.stage.StagePolicy + +/** + * A policy interface for ordering admitted workflows in the scheduling queue. + */ +interface JobOrderPolicy : StagePolicy> + diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobSortingPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobSortingPolicy.kt deleted file mode 100644 index c3a5dab5..00000000 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobSortingPolicy.kt +++ /dev/null @@ -1,44 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.workflows.service.stage.job - -import com.atlarge.opendc.workflows.service.StageWorkflowService - -/** - * A policy interface for ordering admitted workflows in the scheduling queue. - */ -interface JobSortingPolicy { - /** - * Sort the given collection of jobs on a given criterion. - * - * @param scheduler The scheduler that started the cycle. - * @param jobs The collection of tasks that should be sorted. - * @return The sorted list of jobs. - */ - operator fun invoke( - scheduler: StageWorkflowService, - jobs: Collection - ): List -} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/LimitJobAdmissionPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/LimitJobAdmissionPolicy.kt new file mode 100644 index 00000000..6b1faf20 --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/LimitJobAdmissionPolicy.kt @@ -0,0 +1,47 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.job + +import com.atlarge.opendc.workflows.service.JobState +import com.atlarge.opendc.workflows.service.StageWorkflowService + +/** + * A [JobAdmissionPolicy] that limits the amount of active jobs in the system. + * + * @property limit The maximum number of concurrent jobs in the system. + */ +data class LimitJobAdmissionPolicy(val limit: Int) : JobAdmissionPolicy { + override fun invoke(scheduler: StageWorkflowService) = object : JobAdmissionPolicy.Logic { + override fun invoke( + job: JobState + ): JobAdmissionPolicy.Advice = + if (scheduler.activeJobs.size < limit) + JobAdmissionPolicy.Advice.ADMIT + else + JobAdmissionPolicy.Advice.STOP + } + + override fun toString(): String = "Limit-Active($limit)" +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/LoadJobAdmissionPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/LoadJobAdmissionPolicy.kt new file mode 100644 index 00000000..a58d965f --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/LoadJobAdmissionPolicy.kt @@ -0,0 +1,47 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.job + +import com.atlarge.opendc.workflows.service.JobState +import com.atlarge.opendc.workflows.service.StageWorkflowService + +/** + * A [JobAdmissionPolicy] that limits the amount of jobs based on the system load. + * + * @property limit The maximum load before stopping admission. + */ +data class LoadJobAdmissionPolicy(val limit: Double) : JobAdmissionPolicy { + override fun invoke(scheduler: StageWorkflowService) = object : JobAdmissionPolicy.Logic { + override fun invoke( + job: JobState + ): JobAdmissionPolicy.Advice = + if (scheduler.load < limit) + JobAdmissionPolicy.Advice.ADMIT + else + JobAdmissionPolicy.Advice.STOP + } + + override fun toString(): String = "Limit-Load($limit)" +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt index ad90839c..46888467 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt @@ -24,17 +24,16 @@ package com.atlarge.opendc.workflows.service.stage.job +import com.atlarge.opendc.workflows.service.JobState import com.atlarge.opendc.workflows.service.StageWorkflowService /** * A [JobAdmissionPolicy] that admits all jobs. */ object NullJobAdmissionPolicy : JobAdmissionPolicy { - /** - * Admit every submitted job. - */ - override fun shouldAdmit( - scheduler: StageWorkflowService, - job: StageWorkflowService.JobView - ): Boolean = true + override fun invoke(scheduler: StageWorkflowService) = object : JobAdmissionPolicy.Logic { + override fun invoke(job: JobState): JobAdmissionPolicy.Advice = JobAdmissionPolicy.Advice.ADMIT + } + + override fun toString(): String = "Always" } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobOrderPolicy.kt new file mode 100644 index 00000000..c1f23376 --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobOrderPolicy.kt @@ -0,0 +1,59 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.job + +import com.atlarge.opendc.workflows.service.JobState +import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener +import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.workload.Job +import java.util.Random + +object RandomJobOrderPolicy : JobOrderPolicy { + override fun invoke(scheduler: StageWorkflowService): Comparator = + object : Comparator, StageWorkflowSchedulerListener { + private val random = Random(123) + private val ids = HashMap() + + init { + scheduler.addListener(this) + } + + override fun jobSubmitted(job: JobState) { + ids[job.job] = random.nextInt() + } + + override fun jobFinished(job: JobState) { + ids.remove(job.job) + } + + override fun compare(o1: JobState, o2: JobState): Int { + return compareValuesBy(o1, o2) { ids.getValue(it.job) } + } + } + + override fun toString(): String { + return "Random" + } +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobSortingPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobSortingPolicy.kt deleted file mode 100644 index 9ce2811c..00000000 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobSortingPolicy.kt +++ /dev/null @@ -1,40 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.workflows.service.stage.job - -import com.atlarge.opendc.workflows.service.StageWorkflowService -import kotlin.random.Random - -/** - * The [RandomJobSortingPolicy] sorts tasks randomly. - * - * @property random The [Random] instance to use when sorting the list of tasks. - */ -class RandomJobSortingPolicy(private val random: Random = Random.Default) : JobSortingPolicy { - override fun invoke( - scheduler: StageWorkflowService, - jobs: Collection - ): List = jobs.shuffled(random) -} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/SizeJobOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/SizeJobOrderPolicy.kt new file mode 100644 index 00000000..d6c3155b --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/SizeJobOrderPolicy.kt @@ -0,0 +1,40 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.job + +import com.atlarge.opendc.workflows.service.JobState +import com.atlarge.opendc.workflows.service.StageWorkflowService + +/** + * The [SizeJobOrderPolicy] sorts tasks based on the order of arrival in the queue. + */ +data class SizeJobOrderPolicy(val ascending: Boolean = true) : JobOrderPolicy { + override fun invoke(scheduler: StageWorkflowService) = + compareBy { it.tasks.size.let { if (ascending) it else -it } } + + override fun toString(): String { + return "Job-Size(${if (ascending) "asc" else "desc"})" + } +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/SubmissionTimeJobOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/SubmissionTimeJobOrderPolicy.kt new file mode 100644 index 00000000..bf5bff1d --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/SubmissionTimeJobOrderPolicy.kt @@ -0,0 +1,40 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.job + +import com.atlarge.opendc.workflows.service.JobState +import com.atlarge.opendc.workflows.service.StageWorkflowService + +/** + * The [SubmissionTimeJobOrderPolicy] sorts tasks based on the order of arrival in the queue. + */ +data class SubmissionTimeJobOrderPolicy(val ascending: Boolean = true) : JobOrderPolicy { + override fun invoke(scheduler: StageWorkflowService) = + compareBy { it.submittedAt.let { if (ascending) it else -it } } + + override fun toString(): String { + return "Submission-Time(${if (ascending) "asc" else "desc"})" + } +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt index e2490214..a5671d45 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt @@ -30,11 +30,10 @@ import com.atlarge.opendc.workflows.service.StageWorkflowService /** * A [ResourceSelectionPolicy] that selects the first machine that is available. */ -class FirstFitResourceSelectionPolicy : ResourceSelectionPolicy { - override fun select( - scheduler: StageWorkflowService, - machines: List, - task: StageWorkflowService.TaskView - ): Node? = - machines.firstOrNull() +object FirstFitResourceSelectionPolicy : ResourceSelectionPolicy { + override fun invoke(scheduler: StageWorkflowService) = object : Comparator { + override fun compare(o1: Node, o2: Node): Int = 1 + } + + override fun toString(): String = "First-Fit" } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceDynamicFilterPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceDynamicFilterPolicy.kt deleted file mode 100644 index a8f2fda9..00000000 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceDynamicFilterPolicy.kt +++ /dev/null @@ -1,43 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.workflows.service.stage.resource - -import com.atlarge.opendc.compute.metal.Node -import com.atlarge.opendc.workflows.service.StageWorkflowService - -/** - * A [ResourceDynamicFilterPolicy] based on the amount of cores available on the machine and the cores required for - * the task. - */ -class FunctionalResourceDynamicFilterPolicy : ResourceDynamicFilterPolicy { - override fun invoke( - scheduler: StageWorkflowService, - machines: List, - task: StageWorkflowService.TaskView - ): List { - return machines - .filter { it in scheduler.available } - } -} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt new file mode 100644 index 00000000..5b0afccf --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt @@ -0,0 +1,44 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.resource + +import com.atlarge.opendc.compute.metal.Node +import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.TaskState + +/** + * A [ResourceFilterPolicy] based on the amount of cores available on the machine and the cores required for + * the task. + */ +object FunctionalResourceFilterPolicy : ResourceFilterPolicy { + override fun invoke(scheduler: StageWorkflowService): ResourceFilterPolicy.Logic = + object : ResourceFilterPolicy.Logic { + override fun invoke(hosts: Sequence, task: TaskState): Sequence = + hosts.filter { it in scheduler.available } + } + + + override fun toString(): String = "functional" +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/RandomResourceSelectionPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/RandomResourceSelectionPolicy.kt new file mode 100644 index 00000000..5f070e27 --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/RandomResourceSelectionPolicy.kt @@ -0,0 +1,44 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.resource + +import com.atlarge.opendc.compute.metal.Node +import com.atlarge.opendc.workflows.service.StageWorkflowService +import java.util.Random + +object RandomResourceSelectionPolicy : ResourceSelectionPolicy { + override fun invoke(scheduler: StageWorkflowService) = object : Comparator { + private val ids: Map + + init { + val random = Random(123) + ids = scheduler.nodes.associateWith { random.nextLong() } + } + + override fun compare(o1: Node, o2: Node): Int = compareValuesBy(o1, o2) { ids[it] } + } + + override fun toString(): String = "Random" +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceDynamicFilterPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceDynamicFilterPolicy.kt deleted file mode 100644 index 8d8ceec2..00000000 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceDynamicFilterPolicy.kt +++ /dev/null @@ -1,49 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.workflows.service.stage.resource - -import com.atlarge.opendc.compute.metal.Node -import com.atlarge.opendc.workflows.service.StageWorkflowService - -/** - * This interface represents the **R4** stage of the Reference Architecture for Schedulers and acts as a filter yielding - * a list of resources with sufficient resource-capacities, based on fixed or dynamic requirements, and on predicted or - * monitored information about processing unit availability, memory occupancy, etc. - */ -interface ResourceDynamicFilterPolicy { - /** - * Filter the list of machines based on dynamic information. - * - * @param scheduler The scheduler to filter the machines. - * @param machines The list of machines in the system. - * @param task The task that is to be scheduled. - * @return The machines on which the task can be scheduled. - */ - operator fun invoke( - scheduler: StageWorkflowService, - machines: List, - task: StageWorkflowService.TaskView - ): List -} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceFilterPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceFilterPolicy.kt new file mode 100644 index 00000000..28ef970f --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceFilterPolicy.kt @@ -0,0 +1,47 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.resource + +import com.atlarge.opendc.compute.metal.Node +import com.atlarge.opendc.workflows.service.TaskState +import com.atlarge.opendc.workflows.service.stage.StagePolicy + +/** + * This interface represents stages **R2**, **R3** and **R4** stage of the Reference Architecture for Schedulers and + * acts as a filter yielding a list of resources with sufficient resource-capacities, based on fixed or dynamic + * requirements, and on predicted or monitored information about processing unit availability, memory occupancy, etc. + */ +interface ResourceFilterPolicy : StagePolicy { + interface Logic { + /** + * Filter the list of machines based on dynamic information. + * + * @param hosts The hosts to filter. + * @param task The task that is to be scheduled. + * @return The machines on which the task can be scheduled. + */ + operator fun invoke(hosts: Sequence, task: TaskState): Sequence + } +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt index 38fe5886..43053097 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt @@ -25,24 +25,10 @@ package com.atlarge.opendc.workflows.service.stage.resource import com.atlarge.opendc.compute.metal.Node -import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.stage.StagePolicy /** * This interface represents the **R5** stage of the Reference Architecture for Schedulers and matches the the selected * task with a (set of) resource(s), using policies such as First-Fit, Worst-Fit, and Best-Fit. */ -interface ResourceSelectionPolicy { - /** - * Select a machine on which the task should be scheduled. - * - * @param scheduler The scheduler to select the machine. - * @param machines The list of machines in the system. - * @param task The task that is to be scheduled. - * @return The selected machine or `null` if no machine could be found. - */ - fun select( - scheduler: StageWorkflowService, - machines: List, - task: StageWorkflowService.TaskView - ): Node? -} +interface ResourceSelectionPolicy : StagePolicy> diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/ActiveTaskOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/ActiveTaskOrderPolicy.kt new file mode 100644 index 00000000..15f0d0dc --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/ActiveTaskOrderPolicy.kt @@ -0,0 +1,67 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.task + +import com.atlarge.opendc.workflows.service.JobState +import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener +import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.TaskState + +data class ActiveTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPolicy { + override fun invoke(scheduler: StageWorkflowService): Comparator = + object : Comparator, StageWorkflowSchedulerListener { + private val active = mutableMapOf() + + init { + scheduler.addListener(this) + } + + override fun jobStarted(job: JobState) { + active[job] = 0 + } + + override fun jobFinished(job: JobState) { + active.remove(job) + } + + override fun taskAssigned(task: TaskState) { + active.merge(task.job, 1, Int::plus) + } + + override fun taskFinished(task: TaskState) { + active.merge(task.job, -1, Int::plus) + } + + override fun compare(o1: TaskState, o2: TaskState): Int { + return compareValuesBy(o1, o2) { active.getValue(it.job) }.let { + if (ascending) it else -it + } + } + } + + override fun toString(): String { + return "Active-Per-Job(${if (ascending) "asc" else "desc"})" + } +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/BalancingTaskEligibilityPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/BalancingTaskEligibilityPolicy.kt new file mode 100644 index 00000000..f6311644 --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/BalancingTaskEligibilityPolicy.kt @@ -0,0 +1,71 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.task + +import com.atlarge.opendc.workflows.service.JobState +import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener +import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.TaskState +import kotlin.math.max + +data class BalancingTaskEligibilityPolicy(val tolerance: Double = 1.5) : TaskEligibilityPolicy { + override fun invoke(scheduler: StageWorkflowService): TaskEligibilityPolicy.Logic = + object : TaskEligibilityPolicy.Logic, StageWorkflowSchedulerListener { + private val active = mutableMapOf() + + init { + scheduler.addListener(this) + } + + override fun jobStarted(job: JobState) { + active[job] = 0 + } + + override fun jobFinished(job: JobState) { + active.remove(job) + } + + override fun taskAssigned(task: TaskState) { + active.merge(task.job, 1, Int::plus) + } + + override fun taskFinished(task: TaskState) { + active.merge(task.job, -1, Int::plus) + } + + override fun invoke(task: TaskState): TaskEligibilityPolicy.Advice { + val activeJobs = scheduler.activeJobs.size + val activeTasks = scheduler.activeTasks.size + val baseline = max(activeTasks / activeJobs.toDouble(), 1.0) + val activeForJob = active[task.job]!! + return if ((activeForJob + 1) / baseline < tolerance) + TaskEligibilityPolicy.Advice.ADMIT + else + TaskEligibilityPolicy.Advice.DENY + } + } + + override fun toString(): String = "Job-Balance($tolerance)" +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/CompletionTaskOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/CompletionTaskOrderPolicy.kt new file mode 100644 index 00000000..29d6f6b5 --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/CompletionTaskOrderPolicy.kt @@ -0,0 +1,63 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.task + +import com.atlarge.opendc.workflows.service.JobState +import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener +import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.TaskState + +data class CompletionTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPolicy { + override fun invoke(scheduler: StageWorkflowService): Comparator = + object : Comparator, StageWorkflowSchedulerListener { + private val finished = mutableMapOf() + + init { + scheduler.addListener(this) + } + + override fun jobStarted(job: JobState) { + finished[job] = 0 + } + + override fun jobFinished(job: JobState) { + finished.remove(job) + } + + override fun taskFinished(task: TaskState) { + finished.merge(task.job, 1, Int::plus) + } + + override fun compare(o1: TaskState, o2: TaskState): Int { + return compareValuesBy(o1, o2) { finished.getValue(it.job) / it.job.tasks.size.toDouble() }.let { + if (ascending) it else -it + } + } + } + + override fun toString(): String { + return "Job-Completion(${if (ascending) "asc" else "desc"})" + } +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DependenciesTaskOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DependenciesTaskOrderPolicy.kt new file mode 100644 index 00000000..e49c4858 --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DependenciesTaskOrderPolicy.kt @@ -0,0 +1,41 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.task + +import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.TaskState + +/** + * The [DependenciesTaskOrderPolicy] sorts tasks based on the number of dependency tasks it has. + */ +data class DependenciesTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPolicy { + override fun invoke(scheduler: StageWorkflowService) = compareBy { + it.task.dependencies.size.let { if (ascending) it else -it } + } + + override fun toString(): String { + return "Task-Dependencies(${if (ascending) "asc" else "desc"})" + } +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DependentsTaskOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DependentsTaskOrderPolicy.kt new file mode 100644 index 00000000..9dcaecb2 --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DependentsTaskOrderPolicy.kt @@ -0,0 +1,41 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.task + +import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.TaskState + +/** + * The [DependentsTaskOrderPolicy] sorts tasks based on the number of dependent tasks it has. + */ +data class DependentsTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPolicy { + override fun invoke(scheduler: StageWorkflowService) = compareBy { + it.dependents.size.let { if (ascending) it else -it } + } + + override fun toString(): String { + return "Task-Dependents(${if (ascending) "asc" else "desc"})" + } +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationHistoryTaskOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationHistoryTaskOrderPolicy.kt new file mode 100644 index 00000000..ea6bf541 --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationHistoryTaskOrderPolicy.kt @@ -0,0 +1,69 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.task + +import com.atlarge.opendc.workflows.service.JobState +import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener +import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.TaskState + +data class DurationHistoryTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPolicy { + + override fun invoke(scheduler: StageWorkflowService): Comparator = + object : Comparator, StageWorkflowSchedulerListener { + private val results = HashMap>() + + init { + scheduler.addListener(this) + } + + override fun jobStarted(job: JobState) { + results[job] = mutableListOf() + } + + override fun jobFinished(job: JobState) { + results.remove(job) + } + + override fun taskFinished(task: TaskState) { + results.getValue(task.job) += task.finishedAt - task.startedAt + } + + override fun compare(o1: TaskState, o2: TaskState): Int { + return compareValuesBy(o1, o2) { key -> + val history = results.getValue(key.job) + if (history.isEmpty()) { + Long.MAX_VALUE + } else { + history.average() + } + }.let { if (ascending) it else -it } + } + } + + override fun toString(): String { + return "Task-Duration-History(${if (ascending) "asc" else "desc"})" + } +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationTaskOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationTaskOrderPolicy.kt new file mode 100644 index 00000000..25214367 --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationTaskOrderPolicy.kt @@ -0,0 +1,67 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.task + +import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener +import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.TaskState +import java.util.UUID + +/** + * The [DurationTaskOrderPolicy] sorts tasks based on the duration of the task. + */ +data class DurationTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPolicy { + + override fun invoke(scheduler: StageWorkflowService): Comparator = + object : Comparator, StageWorkflowSchedulerListener { + private val results = HashMap() + + init { + scheduler.addListener(this) + } + + override fun taskReady(task: TaskState) { + val estimable = task.task.application as? Estimable + results[task.task.uid] = estimable?.estimate(resources) ?: Duration.POSITIVE_INFINITY + } + + override fun taskFinished(task: TaskState) { + results -= task.task.uid + } + + private val TaskState.duration: Double + get() = results.getValue(task.uid) + + override fun compare(o1: TaskState, o2: TaskState): Int { + return compareValuesBy(o1, o2) { state -> state.duration }.let { + if (ascending) it else -it + } + } + } + + override fun toString(): String { + return "Task-Duration(${if (ascending) "asc" else "desc"})" + } +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FifoTaskSortingPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FifoTaskSortingPolicy.kt deleted file mode 100644 index bba81d27..00000000 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FifoTaskSortingPolicy.kt +++ /dev/null @@ -1,37 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.workflows.service.stage.task - -import com.atlarge.opendc.workflows.service.StageWorkflowService - -/** - * The [FifoTaskSortingPolicy] sorts tasks based on the order of arrival in the queue. - */ -class FifoTaskSortingPolicy : TaskSortingPolicy { - override fun invoke( - scheduler: StageWorkflowService, - tasks: Collection - ): List = tasks.toList() -} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FunctionalTaskEligibilityPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FunctionalTaskEligibilityPolicy.kt deleted file mode 100644 index 72ecbee2..00000000 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FunctionalTaskEligibilityPolicy.kt +++ /dev/null @@ -1,38 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.workflows.service.stage.task - -import com.atlarge.opendc.workflows.service.StageWorkflowService -import com.atlarge.opendc.workflows.service.TaskState - -/** - * A [TaskEligibilityPolicy] that marks tasks as eligible if they are tasks roots within the job. - */ -class FunctionalTaskEligibilityPolicy : TaskEligibilityPolicy { - override fun isEligible( - scheduler: StageWorkflowService, - task: StageWorkflowService.TaskView - ): Boolean = task.state == TaskState.READY -} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitPerJobTaskEligibilityPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitPerJobTaskEligibilityPolicy.kt new file mode 100644 index 00000000..330b191e --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitPerJobTaskEligibilityPolicy.kt @@ -0,0 +1,67 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.task + +import com.atlarge.opendc.workflows.service.JobState +import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener +import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.TaskState + +data class LimitPerJobTaskEligibilityPolicy(val limit: Int): TaskEligibilityPolicy { + override fun invoke(scheduler: StageWorkflowService): TaskEligibilityPolicy.Logic = + object : TaskEligibilityPolicy.Logic, StageWorkflowSchedulerListener { + private val active = mutableMapOf() + + init { + scheduler.addListener(this) + } + + override fun jobStarted(job: JobState) { + active[job] = 0 + } + + override fun jobFinished(job: JobState) { + active.remove(job) + } + + override fun taskAssigned(task: TaskState) { + active.merge(task.job, 1, Int::plus) + } + + override fun taskFinished(task: TaskState) { + active.merge(task.job, -1, Int::plus) + } + + override fun invoke(task: TaskState): TaskEligibilityPolicy.Advice { + val activeForJob = active[task.job]!! + return if (activeForJob <= limit) + TaskEligibilityPolicy.Advice.ADMIT + else + TaskEligibilityPolicy.Advice.DENY + } + } + + override fun toString(): String = "Limit-Active-Job($limit)" +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitTaskEligibilityPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitTaskEligibilityPolicy.kt new file mode 100644 index 00000000..1e8a8158 --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitTaskEligibilityPolicy.kt @@ -0,0 +1,43 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.task + +import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.TaskState + +data class LimitTaskEligibilityPolicy(val limit: Int): TaskEligibilityPolicy { + override fun invoke(scheduler: StageWorkflowService) = object : TaskEligibilityPolicy.Logic { + override fun invoke( + task: TaskState + ): TaskEligibilityPolicy.Advice = + if (scheduler.activeTasks.size < limit) + TaskEligibilityPolicy.Advice.ADMIT + else + TaskEligibilityPolicy.Advice.STOP + } + + + override fun toString(): String = "Limit-Active($limit)" +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt new file mode 100644 index 00000000..a53ab8a5 --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt @@ -0,0 +1,43 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.task + +import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.TaskState + +data class LoadTaskEligibilityPolicy(val limit: Double): TaskEligibilityPolicy { + override fun invoke(scheduler: StageWorkflowService) = object : TaskEligibilityPolicy.Logic { + override fun invoke( + task: TaskState + ): TaskEligibilityPolicy.Advice = + if (scheduler.load < limit) + TaskEligibilityPolicy.Advice.ADMIT + else + TaskEligibilityPolicy.Advice.STOP + } + + + override fun toString(): String = "Limit-Load($limit)" +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/NullTaskEligibilityPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/NullTaskEligibilityPolicy.kt new file mode 100644 index 00000000..3dc86e85 --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/NullTaskEligibilityPolicy.kt @@ -0,0 +1,43 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.task + +import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.TaskState + +/** + * A [TaskEligibilityPolicy] that marks tasks as eligible if they are tasks roots within the job. + */ +object NullTaskEligibilityPolicy : TaskEligibilityPolicy { + override fun invoke(scheduler: StageWorkflowService): TaskEligibilityPolicy.Logic = Logic + + private object Logic : TaskEligibilityPolicy.Logic { + override fun invoke( + task: TaskState + ): TaskEligibilityPolicy.Advice = TaskEligibilityPolicy.Advice.ADMIT + } + + override fun toString(): String = "Always" +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskEligibilityPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskEligibilityPolicy.kt new file mode 100644 index 00000000..2aa41a2b --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskEligibilityPolicy.kt @@ -0,0 +1,45 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.task + +import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.TaskState +import java.util.Random + +data class RandomTaskEligibilityPolicy(val probability: Double = 0.5): TaskEligibilityPolicy { + override fun invoke(scheduler: StageWorkflowService) = object : TaskEligibilityPolicy.Logic { + val random = Random(123) + + override fun invoke(task: TaskState): TaskEligibilityPolicy.Advice = + if (random.nextDouble() <= probability || scheduler.activeTasks.isEmpty()) + TaskEligibilityPolicy.Advice.ADMIT + else { + TaskEligibilityPolicy.Advice.DENY + } + } + + + override fun toString(): String = "Random($probability)" +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskOrderPolicy.kt new file mode 100644 index 00000000..60aad9a7 --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskOrderPolicy.kt @@ -0,0 +1,64 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.task + +import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener +import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.TaskState +import com.atlarge.opendc.workflows.workload.Task +import kotlin.random.Random + +/** + * The [RandomTaskOrderPolicy] sorts tasks randomly. + * + * @property random The [Random] instance to use when sorting the list of tasks. + */ +object RandomTaskOrderPolicy : TaskOrderPolicy { + override fun invoke(scheduler: StageWorkflowService): Comparator = + object : Comparator, StageWorkflowSchedulerListener { + private val random = Random(123) + private val ids = HashMap() + + init { + scheduler.addListener(this) + } + + override fun taskReady(task: TaskState) { + ids[task.task] = random.nextInt() + } + + override fun taskFinished(task: TaskState) { + ids.remove(task.task) + } + + override fun compare(o1: TaskState, o2: TaskState): Int { + return compareValuesBy(o1, o2) { ids.getValue(it.task) } + } + } + + override fun toString(): String { + return "Random" + } +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskSortingPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskSortingPolicy.kt deleted file mode 100644 index 1b1d5b44..00000000 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskSortingPolicy.kt +++ /dev/null @@ -1,40 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.workflows.service.stage.task - -import com.atlarge.opendc.workflows.service.StageWorkflowService -import kotlin.random.Random - -/** - * The [RandomTaskSortingPolicy] sorts tasks randomly. - * - * @property random The [Random] instance to use when sorting the list of tasks. - */ -class RandomTaskSortingPolicy(private val random: Random = Random.Default) : TaskSortingPolicy { - override fun invoke( - scheduler: StageWorkflowService, - tasks: Collection - ): List = tasks.shuffled(random) -} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/SubmissionTimeTaskOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/SubmissionTimeTaskOrderPolicy.kt new file mode 100644 index 00000000..998d782b --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/SubmissionTimeTaskOrderPolicy.kt @@ -0,0 +1,41 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.task + +import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.TaskState + +/** + * The [SubmissionTimeTaskOrderPolicy] sorts tasks based on the order of arrival in the queue. + */ +data class SubmissionTimeTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPolicy { + override fun invoke(scheduler: StageWorkflowService) = compareBy { + it.job.submittedAt.let { if (ascending) it else -it } + } + + override fun toString(): String { + return "Submission-Time(${if (ascending) "asc" else "desc"})" + } +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt index 19954d7b..75529005 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt @@ -24,25 +24,49 @@ package com.atlarge.opendc.workflows.service.stage.task -import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.TaskState +import com.atlarge.opendc.workflows.service.stage.StagePolicy /** * A policy interface for determining the eligibility of tasks in a scheduling cycle. */ -interface TaskEligibilityPolicy { - /** - * A method that is invoked at the start of each scheduling cycle. - * - * @param scheduler The scheduler that started the cycle. - */ - fun startCycle(scheduler: StageWorkflowService) {} +interface TaskEligibilityPolicy : StagePolicy { + interface Logic { + /** + * Determine whether the specified [TaskState] is eligible to be scheduled. + * + * @param task The task instance to schedule. + * @return The advice for marking the task. + */ + operator fun invoke(task: TaskState): Advice + } /** - * Determine whether the specified [StageWorkflowService.TaskView] is eligible to be scheduled. + * The advice given to the scheduler by an admission policy. * - * @param scheduler The scheduler that is determining whether the task is eligible. - * @param task The task instance to schedule. - * @return `true` if the task eligible to be scheduled, `false` otherwise. + * @property admit A flag to indicate to the scheduler that the job should be admitted. + * @property stop A flag to indicate the scheduler should immediately stop admitting jobs to the scheduling queue and wait + * for the next scheduling cycle. */ - fun isEligible(scheduler: StageWorkflowService, task: StageWorkflowService.TaskView): Boolean + enum class Advice(val admit: Boolean, val stop: Boolean) { + /** + * Admit the current job to the scheduling queue and continue admitting jobs. + */ + ADMIT(true, false), + + /** + * Admit the current job to the scheduling queue and stop admitting jobs. + */ + ADMIT_LAST(true, true), + + /** + * Deny the current job, but continue admitting jobs. + */ + DENY(false, false), + + /** + * Deny the current job and also stop admitting jobs. + */ + STOP(false, true) + } } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskOrderPolicy.kt new file mode 100644 index 00000000..bfdc7924 --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskOrderPolicy.kt @@ -0,0 +1,34 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.task + +import com.atlarge.opendc.workflows.service.TaskState +import com.atlarge.opendc.workflows.service.stage.StagePolicy + +/** + * This interface represents the **T2** stage of the Reference Architecture for Datacenter Schedulers and provides the + * scheduler with a sorted list of tasks to schedule. + */ +interface TaskOrderPolicy : StagePolicy> diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskSortingPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskSortingPolicy.kt deleted file mode 100644 index aabc44a9..00000000 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskSortingPolicy.kt +++ /dev/null @@ -1,45 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.workflows.service.stage.task - -import com.atlarge.opendc.workflows.service.StageWorkflowService - -/** - * This interface represents the **T2** stage of the Reference Architecture for Datacenter Schedulers and provides the - * scheduler with a sorted list of tasks to schedule. - */ -interface TaskSortingPolicy { - /** - * Sort the given list of tasks on a given criterion. - * - * @param scheduler The scheduler that is sorting the tasks. - * @param tasks The collection of tasks that should be sorted. - * @return The sorted list of tasks. - */ - operator fun invoke( - scheduler: StageWorkflowService, - tasks: Collection - ): List -} -- cgit v1.2.3 From c232f6260e6d41ce5e9ac1c930a050690680c704 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 24 Feb 2020 17:16:39 +0100 Subject: feat: Add support for workflow tasks with known duration This change adds support for workflow tasks that have a known duration. This allows the workflow scheduler to employ heuristics for a faster schedule. --- .../opendc/experiments/sc18/TestExperiment.kt | 18 ++++++------- .../opendc/format/trace/gwf/GwfTraceReader.kt | 4 ++- .../workflows/service/StageWorkflowService.kt | 23 ++++++++++++----- .../atlarge/opendc/workflows/service/TaskState.kt | 1 - .../workflows/service/WorkflowSchedulerMode.kt | 1 - .../service/stage/job/DurationJobOrderPolicy.kt | 12 ++++----- .../service/stage/job/JobAdmissionPolicy.kt | 2 +- .../workflows/service/stage/job/JobOrderPolicy.kt | 1 - .../resource/FunctionalResourceFilterPolicy.kt | 1 - .../service/stage/task/DurationTaskOrderPolicy.kt | 9 ++++--- .../stage/task/LimitPerJobTaskEligibilityPolicy.kt | 2 +- .../stage/task/LimitTaskEligibilityPolicy.kt | 3 +-- .../stage/task/LoadTaskEligibilityPolicy.kt | 3 +-- .../stage/task/RandomTaskEligibilityPolicy.kt | 3 +-- .../service/stage/task/TaskEligibilityPolicy.kt | 2 +- .../com/atlarge/opendc/workflows/workload/Job.kt | 4 ++- .../atlarge/opendc/workflows/workload/Metadata.kt | 30 ++++++++++++++++++++++ .../com/atlarge/opendc/workflows/workload/Task.kt | 4 ++- 18 files changed, 82 insertions(+), 41 deletions(-) create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Metadata.kt diff --git a/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt b/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt index 415221ca..96796c07 100644 --- a/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt +++ b/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt @@ -31,12 +31,12 @@ import com.atlarge.opendc.format.trace.gwf.GwfTraceReader import com.atlarge.opendc.workflows.monitor.WorkflowMonitor import com.atlarge.opendc.workflows.service.StageWorkflowService import com.atlarge.opendc.workflows.service.WorkflowSchedulerMode -import com.atlarge.opendc.workflows.service.stage.job.SubmissionTimeJobOrderPolicy import com.atlarge.opendc.workflows.service.stage.job.NullJobAdmissionPolicy +import com.atlarge.opendc.workflows.service.stage.job.SubmissionTimeJobOrderPolicy import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelectionPolicy -import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceDynamicFilterPolicy -import com.atlarge.opendc.workflows.service.stage.task.FifoTaskSortingPolicy -import com.atlarge.opendc.workflows.service.stage.task.FunctionalTaskEligibilityPolicy +import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy +import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy +import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy import com.atlarge.opendc.workflows.workload.Job import com.atlarge.opendc.workflows.workload.Task import kotlinx.coroutines.channels.Channel @@ -92,11 +92,11 @@ fun main(args: Array) { environment.platforms[0].zones[0].services[ProvisioningService.Key], mode = WorkflowSchedulerMode.Batch(100), jobAdmissionPolicy = NullJobAdmissionPolicy, - jobSortingPolicy = SubmissionTimeJobOrderPolicy(), - taskEligibilityPolicy = FunctionalTaskEligibilityPolicy(), - taskSortingPolicy = FifoTaskSortingPolicy(), - resourceDynamicFilterPolicy = FunctionalResourceDynamicFilterPolicy(), - resourceSelectionPolicy = FirstFitResourceSelectionPolicy() + jobOrderPolicy = SubmissionTimeJobOrderPolicy(), + taskEligibilityPolicy = NullTaskEligibilityPolicy, + taskOrderPolicy = SubmissionTimeTaskOrderPolicy(), + resourceFilterPolicy = FunctionalResourceFilterPolicy, + resourceSelectionPolicy = FirstFitResourceSelectionPolicy ) val reader = GwfTraceReader(File(args[0])) diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReader.kt index 498e2d1d..3a4e2e89 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReader.kt @@ -30,6 +30,7 @@ import com.atlarge.opendc.format.trace.TraceEntry import com.atlarge.opendc.format.trace.TraceReader import com.atlarge.opendc.workflows.workload.Job import com.atlarge.opendc.workflows.workload.Task +import com.atlarge.opendc.workflows.workload.WORKFLOW_TASK_DEADLINE import java.io.BufferedReader import java.io.File import java.io.InputStream @@ -121,7 +122,8 @@ class GwfTraceReader(reader: BufferedReader) : TraceReader { val task = Task( UUID(0L, taskId), "", FlopsApplicationImage(UUID.randomUUID(), "", emptyMap(), flops, cores), - HashSet() + HashSet(), + mapOf(WORKFLOW_TASK_DEADLINE to runtime) ) entry.submissionTime = min(entry.submissionTime, submitTime) (workflow.tasks as MutableSet).add(task) diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt index e19f5446..48f06bcd 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt @@ -38,9 +38,9 @@ import com.atlarge.opendc.workflows.service.stage.resource.ResourceSelectionPoli import com.atlarge.opendc.workflows.service.stage.task.TaskEligibilityPolicy import com.atlarge.opendc.workflows.service.stage.task.TaskOrderPolicy import com.atlarge.opendc.workflows.workload.Job -import kotlinx.coroutines.launch import java.util.PriorityQueue import java.util.Queue +import kotlinx.coroutines.launch /** * A [WorkflowService] that distributes work through a multi-stage process based on the Reference Architecture for @@ -88,7 +88,6 @@ class StageWorkflowService( */ internal val activeTasks: MutableSet = mutableSetOf() - /** * The running tasks by [Server]. */ @@ -104,7 +103,6 @@ class StageWorkflowService( */ internal val available: MutableSet = mutableSetOf() - /** * The maximum number of incoming jobs. */ @@ -180,7 +178,7 @@ class StageWorkflowService( this.taskEligibilityPolicy = taskEligibilityPolicy(this) this.taskQueue = PriorityQueue(1000, taskOrderPolicy(this).thenBy { it.task.uid }) this.resourceFilterPolicy = resourceFilterPolicy(this) - this.resourceSelectionPolicy = resourceSelectionPolicy(this) + this.resourceSelectionPolicy = resourceSelectionPolicy(this) } override suspend fun submit(job: Job, monitor: WorkflowMonitor) { @@ -209,7 +207,6 @@ class StageWorkflowService( requestCycle() } - /** * Indicate to the scheduler that a scheduling cycle is needed. */ @@ -257,6 +254,21 @@ class StageWorkflowService( } } + // T1 Create list of eligible tasks + val taskIterator = incomingTasks.iterator() + while (taskIterator.hasNext()) { + val taskInstance = taskIterator.next() + val advice = taskEligibilityPolicy(taskInstance) + if (advice.stop) { + break + } else if (!advice.admit) { + continue + } + + taskIterator.remove() + taskQueue.add(taskInstance) + } + // T3 Per task while (taskQueue.isNotEmpty()) { val instance = taskQueue.peek() @@ -325,7 +337,6 @@ class StageWorkflowService( rootListener.jobFinished(job) } - fun addListener(listener: StageWorkflowSchedulerListener) { rootListener.listeners += listener } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskState.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskState.kt index a69b5235..acd5731b 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskState.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskState.kt @@ -25,7 +25,6 @@ package com.atlarge.opendc.workflows.service import com.atlarge.opendc.compute.metal.Node -import com.atlarge.opendc.workflows.service.StageWorkflowService.TaskView import com.atlarge.opendc.workflows.workload.Task class TaskState(val job: JobState, val task: Task) { diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt index f5a0f49b..cfec93b5 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt @@ -55,7 +55,6 @@ sealed class WorkflowSchedulerMode : StagePolicy { } } - override fun toString(): String = "Interactive" } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/DurationJobOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/DurationJobOrderPolicy.kt index 0b3f567a..6581b7d3 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/DurationJobOrderPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/DurationJobOrderPolicy.kt @@ -29,6 +29,7 @@ import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener import com.atlarge.opendc.workflows.service.StageWorkflowService import com.atlarge.opendc.workflows.workload.Job import com.atlarge.opendc.workflows.workload.Task +import com.atlarge.opendc.workflows.workload.WORKFLOW_TASK_DEADLINE /** * The [DurationJobOrderPolicy] sorts tasks based on the critical path length of the job. @@ -36,7 +37,7 @@ import com.atlarge.opendc.workflows.workload.Task data class DurationJobOrderPolicy(val ascending: Boolean = true) : JobOrderPolicy { override fun invoke(scheduler: StageWorkflowService): Comparator = object : Comparator, StageWorkflowSchedulerListener { - private val results = HashMap() + private val results = HashMap() init { scheduler.addListener(this) @@ -46,10 +47,10 @@ data class DurationJobOrderPolicy(val ascending: Boolean = true) : JobOrderPolic get() = results[this]!! override fun jobSubmitted(job: JobState) { - results[job.job] = job.job.toposort().sumByDouble { task -> - val estimable = task.application as? Estimable - estimable?.estimate(resources) ?: Duration.POSITIVE_INFINITY - } + results[job.job] = job.job.toposort().map { task -> + val estimable = task.metadata[WORKFLOW_TASK_DEADLINE] as? Long? + estimable ?: Long.MAX_VALUE + }.sum() } override fun jobFinished(job: JobState) { @@ -61,7 +62,6 @@ data class DurationJobOrderPolicy(val ascending: Boolean = true) : JobOrderPolic } } - override fun toString(): String { return "Job-Duration(${if (ascending) "asc" else "desc"})" } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt index a3645523..535d7792 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt @@ -45,7 +45,7 @@ interface JobAdmissionPolicy : StagePolicy { * The advice given to the scheduler by an admission policy. * * @property admit A flag to indicate to the scheduler that the job should be admitted. - * @property stop A flag to indicate the scheduler should immediately stop admitting jobs to the scheduling queue and wait + * @property stop A flag to indicate the scheduler should immediately stop admitting jobs to the scheduling queue and wait * for the next scheduling cycle. */ enum class Advice(val admit: Boolean, val stop: Boolean) { diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobOrderPolicy.kt index 488148af..ba57f064 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobOrderPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobOrderPolicy.kt @@ -31,4 +31,3 @@ import com.atlarge.opendc.workflows.service.stage.StagePolicy * A policy interface for ordering admitted workflows in the scheduling queue. */ interface JobOrderPolicy : StagePolicy> - diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt index 5b0afccf..0e83d8d7 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt @@ -39,6 +39,5 @@ object FunctionalResourceFilterPolicy : ResourceFilterPolicy { hosts.filter { it in scheduler.available } } - override fun toString(): String = "functional" } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationTaskOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationTaskOrderPolicy.kt index 25214367..96ee233e 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationTaskOrderPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationTaskOrderPolicy.kt @@ -27,6 +27,7 @@ package com.atlarge.opendc.workflows.service.stage.task import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener import com.atlarge.opendc.workflows.service.StageWorkflowService import com.atlarge.opendc.workflows.service.TaskState +import com.atlarge.opendc.workflows.workload.WORKFLOW_TASK_DEADLINE import java.util.UUID /** @@ -36,22 +37,22 @@ data class DurationTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPol override fun invoke(scheduler: StageWorkflowService): Comparator = object : Comparator, StageWorkflowSchedulerListener { - private val results = HashMap() + private val results = HashMap() init { scheduler.addListener(this) } override fun taskReady(task: TaskState) { - val estimable = task.task.application as? Estimable - results[task.task.uid] = estimable?.estimate(resources) ?: Duration.POSITIVE_INFINITY + val deadline = task.task.metadata[WORKFLOW_TASK_DEADLINE] as? Long? + results[task.task.uid] = deadline ?: Long.MAX_VALUE } override fun taskFinished(task: TaskState) { results -= task.task.uid } - private val TaskState.duration: Double + private val TaskState.duration: Long get() = results.getValue(task.uid) override fun compare(o1: TaskState, o2: TaskState): Int { diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitPerJobTaskEligibilityPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitPerJobTaskEligibilityPolicy.kt index 330b191e..8202c4cd 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitPerJobTaskEligibilityPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitPerJobTaskEligibilityPolicy.kt @@ -29,7 +29,7 @@ import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener import com.atlarge.opendc.workflows.service.StageWorkflowService import com.atlarge.opendc.workflows.service.TaskState -data class LimitPerJobTaskEligibilityPolicy(val limit: Int): TaskEligibilityPolicy { +data class LimitPerJobTaskEligibilityPolicy(val limit: Int) : TaskEligibilityPolicy { override fun invoke(scheduler: StageWorkflowService): TaskEligibilityPolicy.Logic = object : TaskEligibilityPolicy.Logic, StageWorkflowSchedulerListener { private val active = mutableMapOf() diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitTaskEligibilityPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitTaskEligibilityPolicy.kt index 1e8a8158..29324a27 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitTaskEligibilityPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitTaskEligibilityPolicy.kt @@ -27,7 +27,7 @@ package com.atlarge.opendc.workflows.service.stage.task import com.atlarge.opendc.workflows.service.StageWorkflowService import com.atlarge.opendc.workflows.service.TaskState -data class LimitTaskEligibilityPolicy(val limit: Int): TaskEligibilityPolicy { +data class LimitTaskEligibilityPolicy(val limit: Int) : TaskEligibilityPolicy { override fun invoke(scheduler: StageWorkflowService) = object : TaskEligibilityPolicy.Logic { override fun invoke( task: TaskState @@ -38,6 +38,5 @@ data class LimitTaskEligibilityPolicy(val limit: Int): TaskEligibilityPolicy { TaskEligibilityPolicy.Advice.STOP } - override fun toString(): String = "Limit-Active($limit)" } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt index a53ab8a5..3f5232d4 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt @@ -27,7 +27,7 @@ package com.atlarge.opendc.workflows.service.stage.task import com.atlarge.opendc.workflows.service.StageWorkflowService import com.atlarge.opendc.workflows.service.TaskState -data class LoadTaskEligibilityPolicy(val limit: Double): TaskEligibilityPolicy { +data class LoadTaskEligibilityPolicy(val limit: Double) : TaskEligibilityPolicy { override fun invoke(scheduler: StageWorkflowService) = object : TaskEligibilityPolicy.Logic { override fun invoke( task: TaskState @@ -38,6 +38,5 @@ data class LoadTaskEligibilityPolicy(val limit: Double): TaskEligibilityPolicy { TaskEligibilityPolicy.Advice.STOP } - override fun toString(): String = "Limit-Load($limit)" } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskEligibilityPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskEligibilityPolicy.kt index 2aa41a2b..c15ec741 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskEligibilityPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskEligibilityPolicy.kt @@ -28,7 +28,7 @@ import com.atlarge.opendc.workflows.service.StageWorkflowService import com.atlarge.opendc.workflows.service.TaskState import java.util.Random -data class RandomTaskEligibilityPolicy(val probability: Double = 0.5): TaskEligibilityPolicy { +data class RandomTaskEligibilityPolicy(val probability: Double = 0.5) : TaskEligibilityPolicy { override fun invoke(scheduler: StageWorkflowService) = object : TaskEligibilityPolicy.Logic { val random = Random(123) @@ -40,6 +40,5 @@ data class RandomTaskEligibilityPolicy(val probability: Double = 0.5): TaskEligi } } - override fun toString(): String = "Random($probability)" } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt index 75529005..72a7fdd0 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt @@ -45,7 +45,7 @@ interface TaskEligibilityPolicy : StagePolicy { * The advice given to the scheduler by an admission policy. * * @property admit A flag to indicate to the scheduler that the job should be admitted. - * @property stop A flag to indicate the scheduler should immediately stop admitting jobs to the scheduling queue and wait + * @property stop A flag to indicate the scheduler should immediately stop admitting jobs to the scheduling queue and wait * for the next scheduling cycle. */ enum class Advice(val admit: Boolean, val stop: Boolean) { diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Job.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Job.kt index dece875c..40389ce2 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Job.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Job.kt @@ -35,12 +35,14 @@ import java.util.UUID * @property name The name of this workflow. * @property owner The owner of the workflow. * @property tasks The tasks that are part of this workflow. + * @property metadata Additional metadata for the job. */ data class Job( override val uid: UUID, override val name: String, override val owner: User, - val tasks: Set + val tasks: Set, + val metadata: Map = emptyMap() ) : Workload { override fun equals(other: Any?): Boolean = other is Job && uid == other.uid diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Metadata.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Metadata.kt new file mode 100644 index 00000000..067f1179 --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Metadata.kt @@ -0,0 +1,30 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.workload + +/** + * Meta-data key for the deadline of a task. + */ +const val WORKFLOW_TASK_DEADLINE = "workflow:task:deadline" diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Task.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Task.kt index b5997b35..82521faa 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Task.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Task.kt @@ -35,12 +35,14 @@ import java.util.UUID * @property name The name of this task. * @property image The application image to run as part of this workflow task. * @property dependencies The dependencies of this task in order for it to execute. + * @property metadata Additional metadata for this task. */ data class Task( override val uid: UUID, override val name: String, val image: Image, - val dependencies: Set + val dependencies: Set, + val metadata: Map = emptyMap() ) : Identity { override fun equals(other: Any?): Boolean = other is Task && uid == other.uid -- cgit v1.2.3 From 3dedd93628b7fb73613850ea1b4e870b70b45f27 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 24 Feb 2020 20:23:36 +0100 Subject: docs: Document added scheduling policies --- .../opendc/workflows/service/stage/job/DurationJobOrderPolicy.kt | 2 +- .../opendc/workflows/service/stage/job/LoadJobAdmissionPolicy.kt | 2 +- .../opendc/workflows/service/stage/job/RandomJobOrderPolicy.kt | 3 +++ .../opendc/workflows/service/stage/job/SizeJobOrderPolicy.kt | 2 +- .../workflows/service/stage/job/SubmissionTimeJobOrderPolicy.kt | 2 +- .../service/stage/resource/RandomResourceSelectionPolicy.kt | 3 +++ .../opendc/workflows/service/stage/task/ActiveTaskOrderPolicy.kt | 3 +++ .../workflows/service/stage/task/BalancingTaskEligibilityPolicy.kt | 7 +++++++ .../workflows/service/stage/task/CompletionTaskOrderPolicy.kt | 3 +++ .../workflows/service/stage/task/DependenciesTaskOrderPolicy.kt | 2 +- .../workflows/service/stage/task/DependentsTaskOrderPolicy.kt | 2 +- .../workflows/service/stage/task/DurationHistoryTaskOrderPolicy.kt | 3 +++ .../opendc/workflows/service/stage/task/DurationTaskOrderPolicy.kt | 2 +- .../service/stage/task/LimitPerJobTaskEligibilityPolicy.kt | 3 +++ .../workflows/service/stage/task/LimitTaskEligibilityPolicy.kt | 3 +++ .../workflows/service/stage/task/LoadTaskEligibilityPolicy.kt | 3 +++ .../workflows/service/stage/task/NullTaskEligibilityPolicy.kt | 2 +- .../workflows/service/stage/task/RandomTaskEligibilityPolicy.kt | 3 +++ .../opendc/workflows/service/stage/task/RandomTaskOrderPolicy.kt | 4 +--- .../workflows/service/stage/task/SubmissionTimeTaskOrderPolicy.kt | 2 +- 20 files changed, 44 insertions(+), 12 deletions(-) diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/DurationJobOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/DurationJobOrderPolicy.kt index 6581b7d3..bbdb9f71 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/DurationJobOrderPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/DurationJobOrderPolicy.kt @@ -32,7 +32,7 @@ import com.atlarge.opendc.workflows.workload.Task import com.atlarge.opendc.workflows.workload.WORKFLOW_TASK_DEADLINE /** - * The [DurationJobOrderPolicy] sorts tasks based on the critical path length of the job. + * A [JobOrderPolicy] that orders jobs based on its critical path length. */ data class DurationJobOrderPolicy(val ascending: Boolean = true) : JobOrderPolicy { override fun invoke(scheduler: StageWorkflowService): Comparator = diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/LoadJobAdmissionPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/LoadJobAdmissionPolicy.kt index a58d965f..e1c27472 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/LoadJobAdmissionPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/LoadJobAdmissionPolicy.kt @@ -28,7 +28,7 @@ import com.atlarge.opendc.workflows.service.JobState import com.atlarge.opendc.workflows.service.StageWorkflowService /** - * A [JobAdmissionPolicy] that limits the amount of jobs based on the system load. + * A [JobAdmissionPolicy] that limits the amount of jobs based on the average system load. * * @property limit The maximum load before stopping admission. */ diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobOrderPolicy.kt index c1f23376..14a3d98d 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobOrderPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobOrderPolicy.kt @@ -30,6 +30,9 @@ import com.atlarge.opendc.workflows.service.StageWorkflowService import com.atlarge.opendc.workflows.workload.Job import java.util.Random +/** + * A [JobOrderPolicy] that randomly orders jobs. + */ object RandomJobOrderPolicy : JobOrderPolicy { override fun invoke(scheduler: StageWorkflowService): Comparator = object : Comparator, StageWorkflowSchedulerListener { diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/SizeJobOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/SizeJobOrderPolicy.kt index d6c3155b..3bce43cf 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/SizeJobOrderPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/SizeJobOrderPolicy.kt @@ -28,7 +28,7 @@ import com.atlarge.opendc.workflows.service.JobState import com.atlarge.opendc.workflows.service.StageWorkflowService /** - * The [SizeJobOrderPolicy] sorts tasks based on the order of arrival in the queue. + * A [SizeJobOrderPolicy] that orders jobs based on the number of tasks it has. */ data class SizeJobOrderPolicy(val ascending: Boolean = true) : JobOrderPolicy { override fun invoke(scheduler: StageWorkflowService) = diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/SubmissionTimeJobOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/SubmissionTimeJobOrderPolicy.kt index bf5bff1d..d6e24b2b 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/SubmissionTimeJobOrderPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/SubmissionTimeJobOrderPolicy.kt @@ -28,7 +28,7 @@ import com.atlarge.opendc.workflows.service.JobState import com.atlarge.opendc.workflows.service.StageWorkflowService /** - * The [SubmissionTimeJobOrderPolicy] sorts tasks based on the order of arrival in the queue. + * A [JobOrderPolicy] orders jobs in FIFO order. */ data class SubmissionTimeJobOrderPolicy(val ascending: Boolean = true) : JobOrderPolicy { override fun invoke(scheduler: StageWorkflowService) = diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/RandomResourceSelectionPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/RandomResourceSelectionPolicy.kt index 5f070e27..9b05cbac 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/RandomResourceSelectionPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/RandomResourceSelectionPolicy.kt @@ -28,6 +28,9 @@ import com.atlarge.opendc.compute.metal.Node import com.atlarge.opendc.workflows.service.StageWorkflowService import java.util.Random +/** + * A [ResourceSelectionPolicy] that randomly orders the machines. + */ object RandomResourceSelectionPolicy : ResourceSelectionPolicy { override fun invoke(scheduler: StageWorkflowService) = object : Comparator { private val ids: Map diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/ActiveTaskOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/ActiveTaskOrderPolicy.kt index 15f0d0dc..b084d26c 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/ActiveTaskOrderPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/ActiveTaskOrderPolicy.kt @@ -29,6 +29,9 @@ import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener import com.atlarge.opendc.workflows.service.StageWorkflowService import com.atlarge.opendc.workflows.service.TaskState +/** + * A [TaskOrderPolicy] that orders tasks based on the number of active relative tasks (w.r.t. its job) in the system. + */ data class ActiveTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPolicy { override fun invoke(scheduler: StageWorkflowService): Comparator = object : Comparator, StageWorkflowSchedulerListener { diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/BalancingTaskEligibilityPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/BalancingTaskEligibilityPolicy.kt index f6311644..2255d40c 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/BalancingTaskEligibilityPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/BalancingTaskEligibilityPolicy.kt @@ -30,6 +30,13 @@ import com.atlarge.opendc.workflows.service.StageWorkflowService import com.atlarge.opendc.workflows.service.TaskState import kotlin.math.max +/** + * A [TaskEligibilityPolicy] that balances the tasks based on their job, e.g. do not allow a single job to claim all + * resources of the system. + * + * @property tolerance The maximum difference from the average number of tasks per job in the system as a fraction of + * the average. + */ data class BalancingTaskEligibilityPolicy(val tolerance: Double = 1.5) : TaskEligibilityPolicy { override fun invoke(scheduler: StageWorkflowService): TaskEligibilityPolicy.Logic = object : TaskEligibilityPolicy.Logic, StageWorkflowSchedulerListener { diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/CompletionTaskOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/CompletionTaskOrderPolicy.kt index 29d6f6b5..d0cf1374 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/CompletionTaskOrderPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/CompletionTaskOrderPolicy.kt @@ -29,6 +29,9 @@ import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener import com.atlarge.opendc.workflows.service.StageWorkflowService import com.atlarge.opendc.workflows.service.TaskState +/** + * A [TaskOrderPolicy] that orders tasks based on the number of completed relative tasks. + */ data class CompletionTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPolicy { override fun invoke(scheduler: StageWorkflowService): Comparator = object : Comparator, StageWorkflowSchedulerListener { diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DependenciesTaskOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DependenciesTaskOrderPolicy.kt index e49c4858..73d83d21 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DependenciesTaskOrderPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DependenciesTaskOrderPolicy.kt @@ -28,7 +28,7 @@ import com.atlarge.opendc.workflows.service.StageWorkflowService import com.atlarge.opendc.workflows.service.TaskState /** - * The [DependenciesTaskOrderPolicy] sorts tasks based on the number of dependency tasks it has. + * A [TaskOrderPolicy] that orders tasks based on the number of dependency tasks it has. */ data class DependenciesTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPolicy { override fun invoke(scheduler: StageWorkflowService) = compareBy { diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DependentsTaskOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DependentsTaskOrderPolicy.kt index 9dcaecb2..85b3543f 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DependentsTaskOrderPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DependentsTaskOrderPolicy.kt @@ -28,7 +28,7 @@ import com.atlarge.opendc.workflows.service.StageWorkflowService import com.atlarge.opendc.workflows.service.TaskState /** - * The [DependentsTaskOrderPolicy] sorts tasks based on the number of dependent tasks it has. + * A [TaskOrderPolicy] that orders tasks based on the number of dependent tasks it has. */ data class DependentsTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPolicy { override fun invoke(scheduler: StageWorkflowService) = compareBy { diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationHistoryTaskOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationHistoryTaskOrderPolicy.kt index ea6bf541..426a76a4 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationHistoryTaskOrderPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationHistoryTaskOrderPolicy.kt @@ -29,6 +29,9 @@ import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener import com.atlarge.opendc.workflows.service.StageWorkflowService import com.atlarge.opendc.workflows.service.TaskState +/** + * A [TaskOrderPolicy] that orders tasks based on the average duration of the preceding tasks in the job. + */ data class DurationHistoryTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPolicy { override fun invoke(scheduler: StageWorkflowService): Comparator = diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationTaskOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationTaskOrderPolicy.kt index 96ee233e..23b47891 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationTaskOrderPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationTaskOrderPolicy.kt @@ -31,7 +31,7 @@ import com.atlarge.opendc.workflows.workload.WORKFLOW_TASK_DEADLINE import java.util.UUID /** - * The [DurationTaskOrderPolicy] sorts tasks based on the duration of the task. + * A [TaskOrderPolicy] orders tasks based on the pre-specified (approximate) duration of the task. */ data class DurationTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPolicy { diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitPerJobTaskEligibilityPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitPerJobTaskEligibilityPolicy.kt index 8202c4cd..c039bf6f 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitPerJobTaskEligibilityPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitPerJobTaskEligibilityPolicy.kt @@ -29,6 +29,9 @@ import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener import com.atlarge.opendc.workflows.service.StageWorkflowService import com.atlarge.opendc.workflows.service.TaskState +/** + * A [TaskEligibilityPolicy] that limits the number of active tasks of a job in the system. + */ data class LimitPerJobTaskEligibilityPolicy(val limit: Int) : TaskEligibilityPolicy { override fun invoke(scheduler: StageWorkflowService): TaskEligibilityPolicy.Logic = object : TaskEligibilityPolicy.Logic, StageWorkflowSchedulerListener { diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitTaskEligibilityPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitTaskEligibilityPolicy.kt index 29324a27..75322ef5 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitTaskEligibilityPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitTaskEligibilityPolicy.kt @@ -27,6 +27,9 @@ package com.atlarge.opendc.workflows.service.stage.task import com.atlarge.opendc.workflows.service.StageWorkflowService import com.atlarge.opendc.workflows.service.TaskState +/** + * A [TaskEligibilityPolicy] that limits the total number of active tasks in the system. + */ data class LimitTaskEligibilityPolicy(val limit: Int) : TaskEligibilityPolicy { override fun invoke(scheduler: StageWorkflowService) = object : TaskEligibilityPolicy.Logic { override fun invoke( diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt index 3f5232d4..090f7be7 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt @@ -27,6 +27,9 @@ package com.atlarge.opendc.workflows.service.stage.task import com.atlarge.opendc.workflows.service.StageWorkflowService import com.atlarge.opendc.workflows.service.TaskState +/** + * A [TaskEligibilityPolicy] that limits the number of active tasks in the system based on the average system load. + */ data class LoadTaskEligibilityPolicy(val limit: Double) : TaskEligibilityPolicy { override fun invoke(scheduler: StageWorkflowService) = object : TaskEligibilityPolicy.Logic { override fun invoke( diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/NullTaskEligibilityPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/NullTaskEligibilityPolicy.kt index 3dc86e85..889f2ab5 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/NullTaskEligibilityPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/NullTaskEligibilityPolicy.kt @@ -28,7 +28,7 @@ import com.atlarge.opendc.workflows.service.StageWorkflowService import com.atlarge.opendc.workflows.service.TaskState /** - * A [TaskEligibilityPolicy] that marks tasks as eligible if they are tasks roots within the job. + * A [TaskEligibilityPolicy] that always allows new tasks to enter. */ object NullTaskEligibilityPolicy : TaskEligibilityPolicy { override fun invoke(scheduler: StageWorkflowService): TaskEligibilityPolicy.Logic = Logic diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskEligibilityPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskEligibilityPolicy.kt index c15ec741..d6f49d14 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskEligibilityPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskEligibilityPolicy.kt @@ -28,6 +28,9 @@ import com.atlarge.opendc.workflows.service.StageWorkflowService import com.atlarge.opendc.workflows.service.TaskState import java.util.Random +/** + * A [TaskEligibilityPolicy] that randomly accepts tasks in the system with some [probability]. + */ data class RandomTaskEligibilityPolicy(val probability: Double = 0.5) : TaskEligibilityPolicy { override fun invoke(scheduler: StageWorkflowService) = object : TaskEligibilityPolicy.Logic { val random = Random(123) diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskOrderPolicy.kt index 60aad9a7..4c309085 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskOrderPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskOrderPolicy.kt @@ -31,9 +31,7 @@ import com.atlarge.opendc.workflows.workload.Task import kotlin.random.Random /** - * The [RandomTaskOrderPolicy] sorts tasks randomly. - * - * @property random The [Random] instance to use when sorting the list of tasks. + * A [TaskOrderPolicy] that orders the tasks randomly. */ object RandomTaskOrderPolicy : TaskOrderPolicy { override fun invoke(scheduler: StageWorkflowService): Comparator = diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/SubmissionTimeTaskOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/SubmissionTimeTaskOrderPolicy.kt index 998d782b..a261965f 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/SubmissionTimeTaskOrderPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/SubmissionTimeTaskOrderPolicy.kt @@ -28,7 +28,7 @@ import com.atlarge.opendc.workflows.service.StageWorkflowService import com.atlarge.opendc.workflows.service.TaskState /** - * The [SubmissionTimeTaskOrderPolicy] sorts tasks based on the order of arrival in the queue. + * A [TaskOrderPolicy] that orders tasks based on the order of arrival in the queue. */ data class SubmissionTimeTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPolicy { override fun invoke(scheduler: StageWorkflowService) = compareBy { -- cgit v1.2.3