summaryrefslogtreecommitdiff
path: root/opendc
diff options
context:
space:
mode:
Diffstat (limited to 'opendc')
-rw-r--r--opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt4
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt42
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerListener.kt39
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt311
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskState.kt69
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskStatus.kt35
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt79
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/StagePolicy.kt (renamed from opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/FifoJobSortingPolicy.kt)15
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/DurationJobOrderPolicy.kt102
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt52
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobOrderPolicy.kt (renamed from opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobSortingPolicy.kt)18
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/LimitJobAdmissionPolicy.kt47
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/LoadJobAdmissionPolicy.kt47
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt13
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobOrderPolicy.kt59
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/SizeJobOrderPolicy.kt40
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/SubmissionTimeJobOrderPolicy.kt (renamed from opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobSortingPolicy.kt)18
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt13
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt44
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/RandomResourceSelectionPolicy.kt (renamed from opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceDynamicFilterPolicy.kt)27
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceFilterPolicy.kt (renamed from opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceDynamicFilterPolicy.kt)36
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt18
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/ActiveTaskOrderPolicy.kt67
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/BalancingTaskEligibilityPolicy.kt71
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/CompletionTaskOrderPolicy.kt63
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DependenciesTaskOrderPolicy.kt41
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DependentsTaskOrderPolicy.kt (renamed from opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskSortingPolicy.kt)21
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationHistoryTaskOrderPolicy.kt69
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationTaskOrderPolicy.kt67
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitPerJobTaskEligibilityPolicy.kt67
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitTaskEligibilityPolicy.kt (renamed from opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskSortingPolicy.kt)32
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt43
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/NullTaskEligibilityPolicy.kt (renamed from opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FunctionalTaskEligibilityPolicy.kt)17
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskEligibilityPolicy.kt45
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskOrderPolicy.kt64
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/SubmissionTimeTaskOrderPolicy.kt41
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt50
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskOrderPolicy.kt (renamed from opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FifoTaskSortingPolicy.kt)15
38 files changed, 1606 insertions, 295 deletions
diff --git a/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt b/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt
index 3dc8be51..415221ca 100644
--- a/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt
+++ b/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt
@@ -31,7 +31,7 @@ import com.atlarge.opendc.format.trace.gwf.GwfTraceReader
import com.atlarge.opendc.workflows.monitor.WorkflowMonitor
import com.atlarge.opendc.workflows.service.StageWorkflowService
import com.atlarge.opendc.workflows.service.WorkflowSchedulerMode
-import com.atlarge.opendc.workflows.service.stage.job.FifoJobSortingPolicy
+import com.atlarge.opendc.workflows.service.stage.job.SubmissionTimeJobOrderPolicy
import com.atlarge.opendc.workflows.service.stage.job.NullJobAdmissionPolicy
import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelectionPolicy
import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceDynamicFilterPolicy
@@ -92,7 +92,7 @@ fun main(args: Array<String>) {
environment.platforms[0].zones[0].services[ProvisioningService.Key],
mode = WorkflowSchedulerMode.Batch(100),
jobAdmissionPolicy = NullJobAdmissionPolicy,
- jobSortingPolicy = FifoJobSortingPolicy(),
+ jobSortingPolicy = SubmissionTimeJobOrderPolicy(),
taskEligibilityPolicy = FunctionalTaskEligibilityPolicy(),
taskSortingPolicy = FifoTaskSortingPolicy(),
resourceDynamicFilterPolicy = FunctionalResourceDynamicFilterPolicy(),
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt
new file mode 100644
index 00000000..b444f91c
--- /dev/null
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt
@@ -0,0 +1,42 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service
+
+import com.atlarge.opendc.workflows.monitor.WorkflowMonitor
+import com.atlarge.opendc.workflows.workload.Job
+
+class JobState(val job: Job, val monitor: WorkflowMonitor, val submittedAt: Long) {
+ /**
+ * A flag to indicate whether this job is finished.
+ */
+ val isFinished: Boolean
+ get() = tasks.isEmpty()
+
+ val tasks: MutableSet<TaskState> = mutableSetOf()
+
+ override fun equals(other: Any?): Boolean = other is JobState && other.job == job
+
+ override fun hashCode(): Int = job.hashCode()
+}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerListener.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerListener.kt
new file mode 100644
index 00000000..73c3e752
--- /dev/null
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerListener.kt
@@ -0,0 +1,39 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service
+
+interface StageWorkflowSchedulerListener {
+ fun cycleStarted(scheduler: StageWorkflowService) {}
+ fun cycleFinished(scheduler: StageWorkflowService) {}
+
+ fun jobSubmitted(job: JobState) {}
+ fun jobStarted(job: JobState) {}
+ fun jobFinished(job: JobState) {}
+
+ fun taskReady(task: TaskState) {}
+ fun taskAssigned(task: TaskState) {}
+ fun taskStarted(task: TaskState) {}
+ fun taskFinished(task: TaskState) {}
+}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
index d7b29c32..e19f5446 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
@@ -32,15 +32,15 @@ import com.atlarge.opendc.compute.metal.Node
import com.atlarge.opendc.compute.metal.service.ProvisioningService
import com.atlarge.opendc.workflows.monitor.WorkflowMonitor
import com.atlarge.opendc.workflows.service.stage.job.JobAdmissionPolicy
-import com.atlarge.opendc.workflows.service.stage.job.JobSortingPolicy
-import com.atlarge.opendc.workflows.service.stage.resource.ResourceDynamicFilterPolicy
+import com.atlarge.opendc.workflows.service.stage.job.JobOrderPolicy
+import com.atlarge.opendc.workflows.service.stage.resource.ResourceFilterPolicy
import com.atlarge.opendc.workflows.service.stage.resource.ResourceSelectionPolicy
import com.atlarge.opendc.workflows.service.stage.task.TaskEligibilityPolicy
-import com.atlarge.opendc.workflows.service.stage.task.TaskSortingPolicy
+import com.atlarge.opendc.workflows.service.stage.task.TaskOrderPolicy
import com.atlarge.opendc.workflows.workload.Job
-import com.atlarge.opendc.workflows.workload.Task
-import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
+import java.util.PriorityQueue
+import java.util.Queue
/**
* A [WorkflowService] that distributes work through a multi-stage process based on the Reference Architecture for
@@ -49,29 +49,50 @@ import kotlinx.coroutines.launch
class StageWorkflowService(
private val ctx: ProcessContext,
private val provisioningService: ProvisioningService,
- private val mode: WorkflowSchedulerMode,
- private val jobAdmissionPolicy: JobAdmissionPolicy,
- private val jobSortingPolicy: JobSortingPolicy,
- private val taskEligibilityPolicy: TaskEligibilityPolicy,
- private val taskSortingPolicy: TaskSortingPolicy,
- private val resourceDynamicFilterPolicy: ResourceDynamicFilterPolicy,
- private val resourceSelectionPolicy: ResourceSelectionPolicy
+ mode: WorkflowSchedulerMode,
+ jobAdmissionPolicy: JobAdmissionPolicy,
+ jobOrderPolicy: JobOrderPolicy,
+ taskEligibilityPolicy: TaskEligibilityPolicy,
+ taskOrderPolicy: TaskOrderPolicy,
+ resourceFilterPolicy: ResourceFilterPolicy,
+ resourceSelectionPolicy: ResourceSelectionPolicy
) : WorkflowService, ServerMonitor {
/**
* The incoming jobs ready to be processed by the scheduler.
*/
- internal val incomingJobs: MutableSet<JobView> = mutableSetOf()
+ internal val incomingJobs: MutableSet<JobState> = linkedSetOf()
+
+ /**
+ * The incoming tasks ready to be processed by the scheduler.
+ */
+ internal val incomingTasks: MutableSet<TaskState> = linkedSetOf()
+
+ /**
+ * The job queue.
+ */
+ internal val jobQueue: Queue<JobState>
+
+ /**
+ * The task queue.
+ */
+ internal val taskQueue: Queue<TaskState>
/**
* The active jobs in the system.
*/
- internal val activeJobs: MutableSet<JobView> = mutableSetOf()
+ internal val activeJobs: MutableSet<JobState> = mutableSetOf()
+
+ /**
+ * The active tasks in the system.
+ */
+ internal val activeTasks: MutableSet<TaskState> = mutableSetOf()
+
/**
* The running tasks by [Server].
*/
- internal val taskByServer = mutableMapOf<Server, TaskView>()
+ internal val taskByServer = mutableMapOf<Server, TaskState>()
/**
* The nodes that are controlled by the service.
@@ -83,18 +104,90 @@ class StageWorkflowService(
*/
internal val available: MutableSet<Node> = mutableSetOf()
+
+ /**
+ * The maximum number of incoming jobs.
+ */
+ private val throttleLimit: Int = 20000
+
+ /**
+ * The load of the system.
+ */
+ internal val load: Double
+ get() = (available.size / nodes.size.toDouble())
+
+ /**
+ * The root listener of this scheduler.
+ */
+ private val rootListener = object : StageWorkflowSchedulerListener {
+ /**
+ * The listeners to delegate to.
+ */
+ val listeners = mutableSetOf<StageWorkflowSchedulerListener>()
+
+ override fun cycleStarted(scheduler: StageWorkflowService) {
+ listeners.forEach { it.cycleStarted(scheduler) }
+ }
+
+ override fun cycleFinished(scheduler: StageWorkflowService) {
+ listeners.forEach { it.cycleFinished(scheduler) }
+ }
+
+ override fun jobSubmitted(job: JobState) {
+ listeners.forEach { it.jobSubmitted(job) }
+ }
+
+ override fun jobStarted(job: JobState) {
+ listeners.forEach { it.jobStarted(job) }
+ }
+
+ override fun jobFinished(job: JobState) {
+ listeners.forEach { it.jobFinished(job) }
+ }
+
+ override fun taskReady(task: TaskState) {
+ listeners.forEach { it.taskReady(task) }
+ }
+
+ override fun taskAssigned(task: TaskState) {
+ listeners.forEach { it.taskAssigned(task) }
+ }
+
+ override fun taskStarted(task: TaskState) {
+ listeners.forEach { it.taskStarted(task) }
+ }
+
+ override fun taskFinished(task: TaskState) {
+ listeners.forEach { it.taskFinished(task) }
+ }
+ }
+
+ private val mode: WorkflowSchedulerMode.Logic
+ private val jobAdmissionPolicy: JobAdmissionPolicy.Logic
+ private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic
+ private val resourceFilterPolicy: ResourceFilterPolicy.Logic
+ private val resourceSelectionPolicy: Comparator<Node>
+
init {
ctx.launch {
nodes = provisioningService.nodes().toList()
available.addAll(nodes)
}
+
+ this.mode = mode(this)
+ this.jobAdmissionPolicy = jobAdmissionPolicy(this)
+ this.jobQueue = PriorityQueue(100, jobOrderPolicy(this).thenBy { it.job.uid })
+ this.taskEligibilityPolicy = taskEligibilityPolicy(this)
+ this.taskQueue = PriorityQueue(1000, taskOrderPolicy(this).thenBy { it.task.uid })
+ this.resourceFilterPolicy = resourceFilterPolicy(this)
+ this.resourceSelectionPolicy = resourceSelectionPolicy(this)
}
override suspend fun submit(job: Job, monitor: WorkflowMonitor) {
// J1 Incoming Jobs
- val jobInstance = JobView(job, monitor)
+ val jobInstance = JobState(job, monitor, ctx.clock.millis())
val instances = job.tasks.associateWith {
- TaskView(jobInstance, it)
+ TaskState(jobInstance, it)
}
for ((task, instance) in instances) {
@@ -105,82 +198,84 @@ class StageWorkflowService(
// If the task has no dependency, it is a root task and can immediately be evaluated
if (instance.isRoot) {
- instance.state = TaskState.READY
+ instance.state = TaskStatus.READY
}
}
- jobInstance.tasks = instances.values.toMutableSet()
+ instances.values.toCollection(jobInstance.tasks)
incomingJobs += jobInstance
+ rootListener.jobSubmitted(jobInstance)
+
requestCycle()
}
- private var next: kotlinx.coroutines.Job? = null
/**
* Indicate to the scheduler that a scheduling cycle is needed.
*/
- private fun requestCycle() {
- when (mode) {
- is WorkflowSchedulerMode.Interactive -> {
- ctx.launch {
- schedule()
- }
- }
- is WorkflowSchedulerMode.Batch -> {
- if (next == null) {
- val job = ctx.launch {
- delay(mode.quantum)
- next = null
- schedule()
- }
- next = job
- }
- }
- }
- }
+ private suspend fun requestCycle() = mode.requestCycle()
/**
* Perform a scheduling cycle immediately.
*/
- private suspend fun schedule() {
+ internal suspend fun schedule() {
// J2 Create list of eligible jobs
- jobAdmissionPolicy.startCycle(this)
- val eligibleJobs = incomingJobs.filter { jobAdmissionPolicy.shouldAdmit(this, it) }
+ val iterator = incomingJobs.iterator()
+ while (iterator.hasNext()) {
+ val jobInstance = iterator.next()
+ val advice = jobAdmissionPolicy(jobInstance)
+ if (advice.stop) {
+ break
+ } else if (!advice.admit) {
+ continue
+ }
- for (jobInstance in eligibleJobs) {
- incomingJobs -= jobInstance
+ iterator.remove()
+ jobQueue.add(jobInstance)
activeJobs += jobInstance
jobInstance.monitor.onJobStart(jobInstance.job, ctx.clock.millis())
+ rootListener.jobStarted(jobInstance)
}
- // J3 Sort jobs on criterion
- val sortedJobs = jobSortingPolicy(this, activeJobs)
-
// J4 Per job
- for (jobInstance in sortedJobs) {
- // T1 Create list of eligible tasks
- taskEligibilityPolicy.startCycle(this)
- val eligibleTasks = jobInstance.tasks.filter { taskEligibilityPolicy.isEligible(this, it) }
-
- // T2 Sort tasks on criterion
- val sortedTasks = taskSortingPolicy(this, eligibleTasks)
-
- // T3 Per task
- for (instance in sortedTasks) {
- val hosts = resourceDynamicFilterPolicy(this, nodes, instance)
- val host = resourceSelectionPolicy.select(this, hosts, instance)
-
- if (host != null) {
- // T4 Submit task to machine
- available -= host
- instance.state = TaskState.ACTIVE
-
- val newHost = provisioningService.deploy(host, instance.task.image, this)
- instance.host = newHost
- taskByServer[newHost.server!!] = instance
- } else {
- return
+ while (jobQueue.isNotEmpty()) {
+ val jobInstance = jobQueue.poll()
+
+ // Edge-case: job has no tasks
+ if (jobInstance.isFinished) {
+ finishJob(jobInstance)
+ }
+
+ // Add job roots to the scheduling queue
+ for (task in jobInstance.tasks) {
+ if (task.state != TaskStatus.READY) {
+ continue
}
+
+ incomingTasks += task
+ rootListener.taskReady(task)
+ }
+ }
+
+ // T3 Per task
+ while (taskQueue.isNotEmpty()) {
+ val instance = taskQueue.peek()
+ val host: Node? = available.firstOrNull()
+
+ if (host != null) {
+ // T4 Submit task to machine
+ available -= host
+ instance.state = TaskStatus.ACTIVE
+
+ val newHost = provisioningService.deploy(host, instance.task.image, this)
+ instance.host = newHost
+ taskByServer[newHost.server!!] = instance
+
+ activeTasks += instance
+ taskQueue.poll()
+ rootListener.taskAssigned(instance)
+ } else {
+ break
}
}
}
@@ -189,19 +284,33 @@ class StageWorkflowService(
when (server.state) {
ServerState.ACTIVE -> {
val task = taskByServer.getValue(server)
+ task.startedAt = ctx.clock.millis()
task.job.monitor.onTaskStart(task.job.job, task.task, ctx.clock.millis())
+ rootListener.taskStarted(task)
}
ServerState.SHUTOFF, ServerState.ERROR -> {
val task = taskByServer.remove(server) ?: throw IllegalStateException()
val job = task.job
- task.state = TaskState.FINISHED
+ task.state = TaskStatus.FINISHED
+ task.finishedAt = ctx.clock.millis()
job.tasks.remove(task)
available += task.host!!
+ activeTasks -= task
job.monitor.onTaskFinish(job.job, task.task, 0, ctx.clock.millis())
+ rootListener.taskFinished(task)
+
+ // Add job roots to the scheduling queue
+ for (dependent in task.dependents) {
+ if (dependent.state != TaskStatus.READY) {
+ continue
+ }
+
+ incomingTasks += dependent
+ rootListener.taskReady(dependent)
+ }
if (job.isFinished) {
- activeJobs -= job
- job.monitor.onJobFinish(job.job, ctx.clock.millis())
+ finishJob(job)
}
requestCycle()
@@ -210,56 +319,18 @@ class StageWorkflowService(
}
}
- class JobView(val job: Job, val monitor: WorkflowMonitor) {
- /**
- * A flag to indicate whether this job is finished.
- */
- val isFinished: Boolean
- get() = tasks.isEmpty()
-
- lateinit var tasks: MutableSet<TaskView>
+ private suspend fun finishJob(job: JobState) {
+ activeJobs -= job
+ job.monitor.onJobFinish(job.job, ctx.clock.millis())
+ rootListener.jobFinished(job)
}
- class TaskView(val job: JobView, val task: Task) {
- /**
- * The dependencies of this task.
- */
- val dependencies = HashSet<TaskView>()
-
- /**
- * The dependents of this task.
- */
- val dependents = HashSet<TaskView>()
-
- /**
- * A flag to indicate whether this workflow task instance is a workflow root.
- */
- val isRoot: Boolean
- get() = dependencies.isEmpty()
-
- var state: TaskState = TaskState.CREATED
- set(value) {
- field = value
-
- // Mark the process as terminated in the graph
- if (value == TaskState.FINISHED) {
- markTerminated()
- }
- }
-
- var host: Node? = null
- /**
- * Mark the specified [TaskView] as terminated.
- */
- private fun markTerminated() {
- for (dependent in dependents) {
- dependent.dependencies.remove(this)
+ fun addListener(listener: StageWorkflowSchedulerListener) {
+ rootListener.listeners += listener
+ }
- if (dependent.isRoot) {
- dependent.state = TaskState.READY
- }
- }
- }
+ fun removeListener(listener: StageWorkflowSchedulerListener) {
+ rootListener.listeners -= listener
}
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskState.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskState.kt
index ee0024f5..a69b5235 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskState.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskState.kt
@@ -1,7 +1,7 @@
/*
* MIT License
*
- * Copyright (c) 2020 atlarge-research
+ * Copyright (c) 2019 atlarge-research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -24,12 +24,63 @@
package com.atlarge.opendc.workflows.service
-/**
- * The state of a workflow task.
- */
-public enum class TaskState {
- CREATED,
- READY,
- ACTIVE,
- FINISHED
+import com.atlarge.opendc.compute.metal.Node
+import com.atlarge.opendc.workflows.service.StageWorkflowService.TaskView
+import com.atlarge.opendc.workflows.workload.Task
+
+class TaskState(val job: JobState, val task: Task) {
+ /**
+ * The moment in time the task was started.
+ */
+ var startedAt: Long = Long.MIN_VALUE
+
+ /**
+ * The moment in time the task was finished.
+ */
+ var finishedAt: Long = Long.MIN_VALUE
+
+ /**
+ * The dependencies of this task.
+ */
+ val dependencies = HashSet<TaskState>()
+
+ /**
+ * The dependents of this task.
+ */
+ val dependents = HashSet<TaskState>()
+
+ /**
+ * A flag to indicate whether this workflow task instance is a workflow root.
+ */
+ val isRoot: Boolean
+ get() = dependencies.isEmpty()
+
+ var state: TaskStatus = TaskStatus.CREATED
+ set(value) {
+ field = value
+
+ // Mark the process as terminated in the graph
+ if (value == TaskStatus.FINISHED) {
+ markTerminated()
+ }
+ }
+
+ var host: Node? = null
+
+ /**
+ * Mark the specified [TaskView] as terminated.
+ */
+ private fun markTerminated() {
+ for (dependent in dependents) {
+ dependent.dependencies.remove(this)
+
+ if (dependent.isRoot) {
+ dependent.state = TaskStatus.READY
+ }
+ }
+ }
+
+ override fun equals(other: Any?): Boolean = other is TaskState && other.job == job && other.task == task
+
+ override fun hashCode(): Int = task.hashCode()
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskStatus.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskStatus.kt
new file mode 100644
index 00000000..c53c6171
--- /dev/null
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskStatus.kt
@@ -0,0 +1,35 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service
+
+/**
+ * The state of a workflow task.
+ */
+public enum class TaskStatus {
+ CREATED,
+ READY,
+ ACTIVE,
+ FINISHED
+}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt
index f5060c5c..f5a0f49b 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt
@@ -24,17 +24,90 @@
package com.atlarge.opendc.workflows.service
+import com.atlarge.odcsim.processContext
+import com.atlarge.opendc.workflows.service.stage.StagePolicy
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.yield
+
/**
* The operating mode of a workflow scheduler.
*/
-sealed class WorkflowSchedulerMode {
+sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Logic> {
+ /**
+ * The logic for operating the cycles of a workflow scheduler.
+ */
+ interface Logic {
+ /**
+ * Request a new scheduling cycle to be performed.
+ */
+ suspend fun requestCycle()
+ }
+
/**
* An interactive scheduler immediately triggers a new scheduling cycle when a workflow is received.
*/
- object Interactive : WorkflowSchedulerMode()
+ object Interactive : WorkflowSchedulerMode() {
+ override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic {
+ override suspend fun requestCycle() {
+ yield()
+ scheduler.schedule()
+ }
+ }
+
+
+ override fun toString(): String = "Interactive"
+ }
/**
* A batch scheduler triggers a scheduling cycle every time quantum if needed.
*/
- data class Batch(val quantum: Long) : WorkflowSchedulerMode()
+ data class Batch(val quantum: Long) : WorkflowSchedulerMode() {
+ private var next: kotlinx.coroutines.Job? = null
+
+ override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic {
+ override suspend fun requestCycle() {
+ val ctx = processContext
+ if (next == null) {
+ // In batch mode, we assume that the scheduler runs at a fixed slot every time
+ // quantum (e.g t=0, t=60, t=120). We calculate here the delay until the next scheduling slot.
+ val delay = quantum - (ctx.clock.millis() % quantum)
+
+ val job = ctx.launch {
+ delay(delay)
+ next = null
+ scheduler.schedule()
+ }
+ next = job
+ }
+ }
+ }
+
+ override fun toString(): String = "Batch($quantum)"
+ }
+
+ /**
+ * A scheduling cycle is triggered at a random point in time.
+ */
+ data class Random(private val random: java.util.Random = java.util.Random(123)) : WorkflowSchedulerMode() {
+ private var next: kotlinx.coroutines.Job? = null
+
+ override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic {
+ override suspend fun requestCycle() {
+ val ctx = processContext
+ if (next == null) {
+ val delay = random.nextInt(200).toLong()
+
+ val job = ctx.launch {
+ delay(delay)
+ next = null
+ scheduler.schedule()
+ }
+ next = job
+ }
+ }
+ }
+
+ override fun toString(): String = "Random"
+ }
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/FifoJobSortingPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/StagePolicy.kt
index 976fbbf3..c7cc3d84 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/FifoJobSortingPolicy.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/StagePolicy.kt
@@ -22,16 +22,17 @@
* SOFTWARE.
*/
-package com.atlarge.opendc.workflows.service.stage.job
+package com.atlarge.opendc.workflows.service.stage
import com.atlarge.opendc.workflows.service.StageWorkflowService
+import java.io.Serializable
/**
- * The [FifoJobSortingPolicy] sorts tasks based on the order of arrival in the queue.
+ * A scheduling stage policy.
*/
-class FifoJobSortingPolicy : JobSortingPolicy {
- override fun invoke(
- scheduler: StageWorkflowService,
- jobs: Collection<StageWorkflowService.JobView>
- ): List<StageWorkflowService.JobView> = jobs.toList()
+interface StagePolicy<T : Any> : Serializable {
+ /**
+ * Build the logic of the stage policy.
+ */
+ operator fun invoke(scheduler: StageWorkflowService): T
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/DurationJobOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/DurationJobOrderPolicy.kt
new file mode 100644
index 00000000..0b3f567a
--- /dev/null
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/DurationJobOrderPolicy.kt
@@ -0,0 +1,102 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service.stage.job
+
+import com.atlarge.opendc.workflows.service.JobState
+import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener
+import com.atlarge.opendc.workflows.service.StageWorkflowService
+import com.atlarge.opendc.workflows.workload.Job
+import com.atlarge.opendc.workflows.workload.Task
+
+/**
+ * The [DurationJobOrderPolicy] sorts tasks based on the critical path length of the job.
+ */
+data class DurationJobOrderPolicy(val ascending: Boolean = true) : JobOrderPolicy {
+ override fun invoke(scheduler: StageWorkflowService): Comparator<JobState> =
+ object : Comparator<JobState>, StageWorkflowSchedulerListener {
+ private val results = HashMap<Job, Double>()
+
+ init {
+ scheduler.addListener(this)
+ }
+
+ private val Job.duration: Long
+ get() = results[this]!!
+
+ override fun jobSubmitted(job: JobState) {
+ results[job.job] = job.job.toposort().sumByDouble { task ->
+ val estimable = task.application as? Estimable
+ estimable?.estimate(resources) ?: Duration.POSITIVE_INFINITY
+ }
+ }
+
+ override fun jobFinished(job: JobState) {
+ results.remove(job.job)
+ }
+
+ override fun compare(o1: JobState, o2: JobState): Int {
+ return compareValuesBy(o1, o2) { it.job.duration }.let { if (ascending) it else -it }
+ }
+ }
+
+
+ override fun toString(): String {
+ return "Job-Duration(${if (ascending) "asc" else "desc"})"
+ }
+}
+
+/**
+ * Create a topological sorting of the tasks in a job.
+ *
+ * @return The list of tasks within the job topologically sorted.
+ */
+fun Job.toposort(): List<Task> {
+ val res = mutableListOf<Task>()
+ val visited = mutableSetOf<Task>()
+ val adjacent = mutableMapOf<Task, MutableList<Task>>()
+
+ for (task in tasks) {
+ for (dependency in task.dependencies) {
+ adjacent.getOrPut(dependency) { mutableListOf() }.add(task)
+ }
+ }
+
+ fun visit(task: Task) {
+ visited.add(task)
+
+ adjacent[task] ?: emptyList<Task>()
+ .asSequence()
+ .filter { it !in visited }
+ .forEach { visit(it) }
+
+ res.add(task)
+ }
+
+ tasks
+ .asSequence()
+ .filter { it !in visited }
+ .forEach { visit(it) }
+ return res
+}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt
index cdaad512..a3645523 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt
@@ -24,25 +24,49 @@
package com.atlarge.opendc.workflows.service.stage.job
-import com.atlarge.opendc.workflows.service.StageWorkflowService
+import com.atlarge.opendc.workflows.service.JobState
+import com.atlarge.opendc.workflows.service.stage.StagePolicy
/**
- * A policy interface for admitting [StageWorkflowService.JobView]s to a scheduling cycle.
+ * A policy interface for admitting [JobState]s to a scheduling cycle.
*/
-interface JobAdmissionPolicy {
- /**
- * A method that is invoked at the start of each scheduling cycle.
- *
- * @param scheduler The scheduler that started the cycle.
- */
- fun startCycle(scheduler: StageWorkflowService) {}
+interface JobAdmissionPolicy : StagePolicy<JobAdmissionPolicy.Logic> {
+ interface Logic {
+ /**
+ * Determine whether the specified [JobState] should be admitted to the scheduling cycle.
+ *
+ * @param job The workflow that has been submitted.
+ * @return The advice for admitting the job.
+ */
+ operator fun invoke(job: JobState): Advice
+ }
/**
- * Determine whether the specified [StageWorkflowService.JobView] should be admitted to the scheduling cycle.
+ * The advice given to the scheduler by an admission policy.
*
- * @param scheduler The scheduler that should admit or reject the job.
- * @param job The workflow that has been submitted.
- * @return `true` if the workflow may be admitted to the scheduling cycle, `false` otherwise.
+ * @property admit A flag to indicate to the scheduler that the job should be admitted.
+ * @property stop A flag to indicate the scheduler should immediately stop admitting jobs to the scheduling queue and wait
+ * for the next scheduling cycle.
*/
- fun shouldAdmit(scheduler: StageWorkflowService, job: StageWorkflowService.JobView): Boolean
+ enum class Advice(val admit: Boolean, val stop: Boolean) {
+ /**
+ * Admit the current job to the scheduling queue and continue admitting jobs.
+ */
+ ADMIT(true, false),
+
+ /**
+ * Admit the current job to the scheduling queue and stop admitting jobs.
+ */
+ ADMIT_LAST(true, true),
+
+ /**
+ * Deny the current job, but continue admitting jobs.
+ */
+ DENY(false, false),
+
+ /**
+ * Deny the current job and also stop admitting jobs.
+ */
+ STOP(false, true)
+ }
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobSortingPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobOrderPolicy.kt
index c3a5dab5..488148af 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobSortingPolicy.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobOrderPolicy.kt
@@ -24,21 +24,11 @@
package com.atlarge.opendc.workflows.service.stage.job
-import com.atlarge.opendc.workflows.service.StageWorkflowService
+import com.atlarge.opendc.workflows.service.JobState
+import com.atlarge.opendc.workflows.service.stage.StagePolicy
/**
* A policy interface for ordering admitted workflows in the scheduling queue.
*/
-interface JobSortingPolicy {
- /**
- * Sort the given collection of jobs on a given criterion.
- *
- * @param scheduler The scheduler that started the cycle.
- * @param jobs The collection of tasks that should be sorted.
- * @return The sorted list of jobs.
- */
- operator fun invoke(
- scheduler: StageWorkflowService,
- jobs: Collection<StageWorkflowService.JobView>
- ): List<StageWorkflowService.JobView>
-}
+interface JobOrderPolicy : StagePolicy<Comparator<JobState>>
+
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/LimitJobAdmissionPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/LimitJobAdmissionPolicy.kt
new file mode 100644
index 00000000..6b1faf20
--- /dev/null
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/LimitJobAdmissionPolicy.kt
@@ -0,0 +1,47 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service.stage.job
+
+import com.atlarge.opendc.workflows.service.JobState
+import com.atlarge.opendc.workflows.service.StageWorkflowService
+
+/**
+ * A [JobAdmissionPolicy] that limits the amount of active jobs in the system.
+ *
+ * @property limit The maximum number of concurrent jobs in the system.
+ */
+data class LimitJobAdmissionPolicy(val limit: Int) : JobAdmissionPolicy {
+ override fun invoke(scheduler: StageWorkflowService) = object : JobAdmissionPolicy.Logic {
+ override fun invoke(
+ job: JobState
+ ): JobAdmissionPolicy.Advice =
+ if (scheduler.activeJobs.size < limit)
+ JobAdmissionPolicy.Advice.ADMIT
+ else
+ JobAdmissionPolicy.Advice.STOP
+ }
+
+ override fun toString(): String = "Limit-Active($limit)"
+}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/LoadJobAdmissionPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/LoadJobAdmissionPolicy.kt
new file mode 100644
index 00000000..a58d965f
--- /dev/null
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/LoadJobAdmissionPolicy.kt
@@ -0,0 +1,47 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service.stage.job
+
+import com.atlarge.opendc.workflows.service.JobState
+import com.atlarge.opendc.workflows.service.StageWorkflowService
+
+/**
+ * A [JobAdmissionPolicy] that limits the amount of jobs based on the system load.
+ *
+ * @property limit The maximum load before stopping admission.
+ */
+data class LoadJobAdmissionPolicy(val limit: Double) : JobAdmissionPolicy {
+ override fun invoke(scheduler: StageWorkflowService) = object : JobAdmissionPolicy.Logic {
+ override fun invoke(
+ job: JobState
+ ): JobAdmissionPolicy.Advice =
+ if (scheduler.load < limit)
+ JobAdmissionPolicy.Advice.ADMIT
+ else
+ JobAdmissionPolicy.Advice.STOP
+ }
+
+ override fun toString(): String = "Limit-Load($limit)"
+}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt
index ad90839c..46888467 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt
@@ -24,17 +24,16 @@
package com.atlarge.opendc.workflows.service.stage.job
+import com.atlarge.opendc.workflows.service.JobState
import com.atlarge.opendc.workflows.service.StageWorkflowService
/**
* A [JobAdmissionPolicy] that admits all jobs.
*/
object NullJobAdmissionPolicy : JobAdmissionPolicy {
- /**
- * Admit every submitted job.
- */
- override fun shouldAdmit(
- scheduler: StageWorkflowService,
- job: StageWorkflowService.JobView
- ): Boolean = true
+ override fun invoke(scheduler: StageWorkflowService) = object : JobAdmissionPolicy.Logic {
+ override fun invoke(job: JobState): JobAdmissionPolicy.Advice = JobAdmissionPolicy.Advice.ADMIT
+ }
+
+ override fun toString(): String = "Always"
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobOrderPolicy.kt
new file mode 100644
index 00000000..c1f23376
--- /dev/null
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobOrderPolicy.kt
@@ -0,0 +1,59 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service.stage.job
+
+import com.atlarge.opendc.workflows.service.JobState
+import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener
+import com.atlarge.opendc.workflows.service.StageWorkflowService
+import com.atlarge.opendc.workflows.workload.Job
+import java.util.Random
+
+object RandomJobOrderPolicy : JobOrderPolicy {
+ override fun invoke(scheduler: StageWorkflowService): Comparator<JobState> =
+ object : Comparator<JobState>, StageWorkflowSchedulerListener {
+ private val random = Random(123)
+ private val ids = HashMap<Job, Int>()
+
+ init {
+ scheduler.addListener(this)
+ }
+
+ override fun jobSubmitted(job: JobState) {
+ ids[job.job] = random.nextInt()
+ }
+
+ override fun jobFinished(job: JobState) {
+ ids.remove(job.job)
+ }
+
+ override fun compare(o1: JobState, o2: JobState): Int {
+ return compareValuesBy(o1, o2) { ids.getValue(it.job) }
+ }
+ }
+
+ override fun toString(): String {
+ return "Random"
+ }
+}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/SizeJobOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/SizeJobOrderPolicy.kt
new file mode 100644
index 00000000..d6c3155b
--- /dev/null
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/SizeJobOrderPolicy.kt
@@ -0,0 +1,40 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service.stage.job
+
+import com.atlarge.opendc.workflows.service.JobState
+import com.atlarge.opendc.workflows.service.StageWorkflowService
+
+/**
+ * The [SizeJobOrderPolicy] sorts tasks based on the order of arrival in the queue.
+ */
+data class SizeJobOrderPolicy(val ascending: Boolean = true) : JobOrderPolicy {
+ override fun invoke(scheduler: StageWorkflowService) =
+ compareBy<JobState> { it.tasks.size.let { if (ascending) it else -it } }
+
+ override fun toString(): String {
+ return "Job-Size(${if (ascending) "asc" else "desc"})"
+ }
+}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobSortingPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/SubmissionTimeJobOrderPolicy.kt
index 9ce2811c..bf5bff1d 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobSortingPolicy.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/SubmissionTimeJobOrderPolicy.kt
@@ -24,17 +24,17 @@
package com.atlarge.opendc.workflows.service.stage.job
+import com.atlarge.opendc.workflows.service.JobState
import com.atlarge.opendc.workflows.service.StageWorkflowService
-import kotlin.random.Random
/**
- * The [RandomJobSortingPolicy] sorts tasks randomly.
- *
- * @property random The [Random] instance to use when sorting the list of tasks.
+ * The [SubmissionTimeJobOrderPolicy] sorts tasks based on the order of arrival in the queue.
*/
-class RandomJobSortingPolicy(private val random: Random = Random.Default) : JobSortingPolicy {
- override fun invoke(
- scheduler: StageWorkflowService,
- jobs: Collection<StageWorkflowService.JobView>
- ): List<StageWorkflowService.JobView> = jobs.shuffled(random)
+data class SubmissionTimeJobOrderPolicy(val ascending: Boolean = true) : JobOrderPolicy {
+ override fun invoke(scheduler: StageWorkflowService) =
+ compareBy<JobState> { it.submittedAt.let { if (ascending) it else -it } }
+
+ override fun toString(): String {
+ return "Submission-Time(${if (ascending) "asc" else "desc"})"
+ }
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt
index e2490214..a5671d45 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt
@@ -30,11 +30,10 @@ import com.atlarge.opendc.workflows.service.StageWorkflowService
/**
* A [ResourceSelectionPolicy] that selects the first machine that is available.
*/
-class FirstFitResourceSelectionPolicy : ResourceSelectionPolicy {
- override fun select(
- scheduler: StageWorkflowService,
- machines: List<Node>,
- task: StageWorkflowService.TaskView
- ): Node? =
- machines.firstOrNull()
+object FirstFitResourceSelectionPolicy : ResourceSelectionPolicy {
+ override fun invoke(scheduler: StageWorkflowService) = object : Comparator<Node> {
+ override fun compare(o1: Node, o2: Node): Int = 1
+ }
+
+ override fun toString(): String = "First-Fit"
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt
new file mode 100644
index 00000000..5b0afccf
--- /dev/null
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt
@@ -0,0 +1,44 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service.stage.resource
+
+import com.atlarge.opendc.compute.metal.Node
+import com.atlarge.opendc.workflows.service.StageWorkflowService
+import com.atlarge.opendc.workflows.service.TaskState
+
+/**
+ * A [ResourceFilterPolicy] based on the amount of cores available on the machine and the cores required for
+ * the task.
+ */
+object FunctionalResourceFilterPolicy : ResourceFilterPolicy {
+ override fun invoke(scheduler: StageWorkflowService): ResourceFilterPolicy.Logic =
+ object : ResourceFilterPolicy.Logic {
+ override fun invoke(hosts: Sequence<Node>, task: TaskState): Sequence<Node> =
+ hosts.filter { it in scheduler.available }
+ }
+
+
+ override fun toString(): String = "functional"
+}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceDynamicFilterPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/RandomResourceSelectionPolicy.kt
index a8f2fda9..5f070e27 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceDynamicFilterPolicy.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/RandomResourceSelectionPolicy.kt
@@ -1,7 +1,7 @@
/*
* MIT License
*
- * Copyright (c) 2019 atlarge-research
+ * Copyright (c) 2020 atlarge-research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -26,18 +26,19 @@ package com.atlarge.opendc.workflows.service.stage.resource
import com.atlarge.opendc.compute.metal.Node
import com.atlarge.opendc.workflows.service.StageWorkflowService
+import java.util.Random
-/**
- * A [ResourceDynamicFilterPolicy] based on the amount of cores available on the machine and the cores required for
- * the task.
- */
-class FunctionalResourceDynamicFilterPolicy : ResourceDynamicFilterPolicy {
- override fun invoke(
- scheduler: StageWorkflowService,
- machines: List<Node>,
- task: StageWorkflowService.TaskView
- ): List<Node> {
- return machines
- .filter { it in scheduler.available }
+object RandomResourceSelectionPolicy : ResourceSelectionPolicy {
+ override fun invoke(scheduler: StageWorkflowService) = object : Comparator<Node> {
+ private val ids: Map<Node, Long>
+
+ init {
+ val random = Random(123)
+ ids = scheduler.nodes.associateWith { random.nextLong() }
+ }
+
+ override fun compare(o1: Node, o2: Node): Int = compareValuesBy(o1, o2) { ids[it] }
}
+
+ override fun toString(): String = "Random"
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceDynamicFilterPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceFilterPolicy.kt
index 8d8ceec2..28ef970f 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceDynamicFilterPolicy.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceFilterPolicy.kt
@@ -1,7 +1,7 @@
/*
* MIT License
*
- * Copyright (c) 2019 atlarge-research
+ * Copyright (c) 2020 atlarge-research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -25,25 +25,23 @@
package com.atlarge.opendc.workflows.service.stage.resource
import com.atlarge.opendc.compute.metal.Node
-import com.atlarge.opendc.workflows.service.StageWorkflowService
+import com.atlarge.opendc.workflows.service.TaskState
+import com.atlarge.opendc.workflows.service.stage.StagePolicy
/**
- * This interface represents the **R4** stage of the Reference Architecture for Schedulers and acts as a filter yielding
- * a list of resources with sufficient resource-capacities, based on fixed or dynamic requirements, and on predicted or
- * monitored information about processing unit availability, memory occupancy, etc.
+ * This interface represents stages **R2**, **R3** and **R4** stage of the Reference Architecture for Schedulers and
+ * acts as a filter yielding a list of resources with sufficient resource-capacities, based on fixed or dynamic
+ * requirements, and on predicted or monitored information about processing unit availability, memory occupancy, etc.
*/
-interface ResourceDynamicFilterPolicy {
- /**
- * Filter the list of machines based on dynamic information.
- *
- * @param scheduler The scheduler to filter the machines.
- * @param machines The list of machines in the system.
- * @param task The task that is to be scheduled.
- * @return The machines on which the task can be scheduled.
- */
- operator fun invoke(
- scheduler: StageWorkflowService,
- machines: List<Node>,
- task: StageWorkflowService.TaskView
- ): List<Node>
+interface ResourceFilterPolicy : StagePolicy<ResourceFilterPolicy.Logic> {
+ interface Logic {
+ /**
+ * Filter the list of machines based on dynamic information.
+ *
+ * @param hosts The hosts to filter.
+ * @param task The task that is to be scheduled.
+ * @return The machines on which the task can be scheduled.
+ */
+ operator fun invoke(hosts: Sequence<Node>, task: TaskState): Sequence<Node>
+ }
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt
index 38fe5886..43053097 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt
@@ -25,24 +25,10 @@
package com.atlarge.opendc.workflows.service.stage.resource
import com.atlarge.opendc.compute.metal.Node
-import com.atlarge.opendc.workflows.service.StageWorkflowService
+import com.atlarge.opendc.workflows.service.stage.StagePolicy
/**
* This interface represents the **R5** stage of the Reference Architecture for Schedulers and matches the the selected
* task with a (set of) resource(s), using policies such as First-Fit, Worst-Fit, and Best-Fit.
*/
-interface ResourceSelectionPolicy {
- /**
- * Select a machine on which the task should be scheduled.
- *
- * @param scheduler The scheduler to select the machine.
- * @param machines The list of machines in the system.
- * @param task The task that is to be scheduled.
- * @return The selected machine or `null` if no machine could be found.
- */
- fun select(
- scheduler: StageWorkflowService,
- machines: List<Node>,
- task: StageWorkflowService.TaskView
- ): Node?
-}
+interface ResourceSelectionPolicy : StagePolicy<Comparator<Node>>
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/ActiveTaskOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/ActiveTaskOrderPolicy.kt
new file mode 100644
index 00000000..15f0d0dc
--- /dev/null
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/ActiveTaskOrderPolicy.kt
@@ -0,0 +1,67 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service.stage.task
+
+import com.atlarge.opendc.workflows.service.JobState
+import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener
+import com.atlarge.opendc.workflows.service.StageWorkflowService
+import com.atlarge.opendc.workflows.service.TaskState
+
+data class ActiveTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPolicy {
+ override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> =
+ object : Comparator<TaskState>, StageWorkflowSchedulerListener {
+ private val active = mutableMapOf<JobState, Int>()
+
+ init {
+ scheduler.addListener(this)
+ }
+
+ override fun jobStarted(job: JobState) {
+ active[job] = 0
+ }
+
+ override fun jobFinished(job: JobState) {
+ active.remove(job)
+ }
+
+ override fun taskAssigned(task: TaskState) {
+ active.merge(task.job, 1, Int::plus)
+ }
+
+ override fun taskFinished(task: TaskState) {
+ active.merge(task.job, -1, Int::plus)
+ }
+
+ override fun compare(o1: TaskState, o2: TaskState): Int {
+ return compareValuesBy(o1, o2) { active.getValue(it.job) }.let {
+ if (ascending) it else -it
+ }
+ }
+ }
+
+ override fun toString(): String {
+ return "Active-Per-Job(${if (ascending) "asc" else "desc"})"
+ }
+}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/BalancingTaskEligibilityPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/BalancingTaskEligibilityPolicy.kt
new file mode 100644
index 00000000..f6311644
--- /dev/null
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/BalancingTaskEligibilityPolicy.kt
@@ -0,0 +1,71 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service.stage.task
+
+import com.atlarge.opendc.workflows.service.JobState
+import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener
+import com.atlarge.opendc.workflows.service.StageWorkflowService
+import com.atlarge.opendc.workflows.service.TaskState
+import kotlin.math.max
+
+data class BalancingTaskEligibilityPolicy(val tolerance: Double = 1.5) : TaskEligibilityPolicy {
+ override fun invoke(scheduler: StageWorkflowService): TaskEligibilityPolicy.Logic =
+ object : TaskEligibilityPolicy.Logic, StageWorkflowSchedulerListener {
+ private val active = mutableMapOf<JobState, Int>()
+
+ init {
+ scheduler.addListener(this)
+ }
+
+ override fun jobStarted(job: JobState) {
+ active[job] = 0
+ }
+
+ override fun jobFinished(job: JobState) {
+ active.remove(job)
+ }
+
+ override fun taskAssigned(task: TaskState) {
+ active.merge(task.job, 1, Int::plus)
+ }
+
+ override fun taskFinished(task: TaskState) {
+ active.merge(task.job, -1, Int::plus)
+ }
+
+ override fun invoke(task: TaskState): TaskEligibilityPolicy.Advice {
+ val activeJobs = scheduler.activeJobs.size
+ val activeTasks = scheduler.activeTasks.size
+ val baseline = max(activeTasks / activeJobs.toDouble(), 1.0)
+ val activeForJob = active[task.job]!!
+ return if ((activeForJob + 1) / baseline < tolerance)
+ TaskEligibilityPolicy.Advice.ADMIT
+ else
+ TaskEligibilityPolicy.Advice.DENY
+ }
+ }
+
+ override fun toString(): String = "Job-Balance($tolerance)"
+}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/CompletionTaskOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/CompletionTaskOrderPolicy.kt
new file mode 100644
index 00000000..29d6f6b5
--- /dev/null
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/CompletionTaskOrderPolicy.kt
@@ -0,0 +1,63 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service.stage.task
+
+import com.atlarge.opendc.workflows.service.JobState
+import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener
+import com.atlarge.opendc.workflows.service.StageWorkflowService
+import com.atlarge.opendc.workflows.service.TaskState
+
+data class CompletionTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPolicy {
+ override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> =
+ object : Comparator<TaskState>, StageWorkflowSchedulerListener {
+ private val finished = mutableMapOf<JobState, Int>()
+
+ init {
+ scheduler.addListener(this)
+ }
+
+ override fun jobStarted(job: JobState) {
+ finished[job] = 0
+ }
+
+ override fun jobFinished(job: JobState) {
+ finished.remove(job)
+ }
+
+ override fun taskFinished(task: TaskState) {
+ finished.merge(task.job, 1, Int::plus)
+ }
+
+ override fun compare(o1: TaskState, o2: TaskState): Int {
+ return compareValuesBy(o1, o2) { finished.getValue(it.job) / it.job.tasks.size.toDouble() }.let {
+ if (ascending) it else -it
+ }
+ }
+ }
+
+ override fun toString(): String {
+ return "Job-Completion(${if (ascending) "asc" else "desc"})"
+ }
+}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DependenciesTaskOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DependenciesTaskOrderPolicy.kt
new file mode 100644
index 00000000..e49c4858
--- /dev/null
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DependenciesTaskOrderPolicy.kt
@@ -0,0 +1,41 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service.stage.task
+
+import com.atlarge.opendc.workflows.service.StageWorkflowService
+import com.atlarge.opendc.workflows.service.TaskState
+
+/**
+ * The [DependenciesTaskOrderPolicy] sorts tasks based on the number of dependency tasks it has.
+ */
+data class DependenciesTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPolicy {
+ override fun invoke(scheduler: StageWorkflowService) = compareBy<TaskState> {
+ it.task.dependencies.size.let { if (ascending) it else -it }
+ }
+
+ override fun toString(): String {
+ return "Task-Dependencies(${if (ascending) "asc" else "desc"})"
+ }
+}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskSortingPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DependentsTaskOrderPolicy.kt
index 1b1d5b44..9dcaecb2 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskSortingPolicy.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DependentsTaskOrderPolicy.kt
@@ -1,7 +1,7 @@
/*
* MIT License
*
- * Copyright (c) 2019 atlarge-research
+ * Copyright (c) 2020 atlarge-research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -25,16 +25,17 @@
package com.atlarge.opendc.workflows.service.stage.task
import com.atlarge.opendc.workflows.service.StageWorkflowService
-import kotlin.random.Random
+import com.atlarge.opendc.workflows.service.TaskState
/**
- * The [RandomTaskSortingPolicy] sorts tasks randomly.
- *
- * @property random The [Random] instance to use when sorting the list of tasks.
+ * The [DependentsTaskOrderPolicy] sorts tasks based on the number of dependent tasks it has.
*/
-class RandomTaskSortingPolicy(private val random: Random = Random.Default) : TaskSortingPolicy {
- override fun invoke(
- scheduler: StageWorkflowService,
- tasks: Collection<StageWorkflowService.TaskView>
- ): List<StageWorkflowService.TaskView> = tasks.shuffled(random)
+data class DependentsTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPolicy {
+ override fun invoke(scheduler: StageWorkflowService) = compareBy<TaskState> {
+ it.dependents.size.let { if (ascending) it else -it }
+ }
+
+ override fun toString(): String {
+ return "Task-Dependents(${if (ascending) "asc" else "desc"})"
+ }
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationHistoryTaskOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationHistoryTaskOrderPolicy.kt
new file mode 100644
index 00000000..ea6bf541
--- /dev/null
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationHistoryTaskOrderPolicy.kt
@@ -0,0 +1,69 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service.stage.task
+
+import com.atlarge.opendc.workflows.service.JobState
+import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener
+import com.atlarge.opendc.workflows.service.StageWorkflowService
+import com.atlarge.opendc.workflows.service.TaskState
+
+data class DurationHistoryTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPolicy {
+
+ override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> =
+ object : Comparator<TaskState>, StageWorkflowSchedulerListener {
+ private val results = HashMap<JobState, MutableList<Long>>()
+
+ init {
+ scheduler.addListener(this)
+ }
+
+ override fun jobStarted(job: JobState) {
+ results[job] = mutableListOf()
+ }
+
+ override fun jobFinished(job: JobState) {
+ results.remove(job)
+ }
+
+ override fun taskFinished(task: TaskState) {
+ results.getValue(task.job) += task.finishedAt - task.startedAt
+ }
+
+ override fun compare(o1: TaskState, o2: TaskState): Int {
+ return compareValuesBy(o1, o2) { key ->
+ val history = results.getValue(key.job)
+ if (history.isEmpty()) {
+ Long.MAX_VALUE
+ } else {
+ history.average()
+ }
+ }.let { if (ascending) it else -it }
+ }
+ }
+
+ override fun toString(): String {
+ return "Task-Duration-History(${if (ascending) "asc" else "desc"})"
+ }
+}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationTaskOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationTaskOrderPolicy.kt
new file mode 100644
index 00000000..25214367
--- /dev/null
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationTaskOrderPolicy.kt
@@ -0,0 +1,67 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service.stage.task
+
+import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener
+import com.atlarge.opendc.workflows.service.StageWorkflowService
+import com.atlarge.opendc.workflows.service.TaskState
+import java.util.UUID
+
+/**
+ * The [DurationTaskOrderPolicy] sorts tasks based on the duration of the task.
+ */
+data class DurationTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPolicy {
+
+ override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> =
+ object : Comparator<TaskState>, StageWorkflowSchedulerListener {
+ private val results = HashMap<UUID, Double>()
+
+ init {
+ scheduler.addListener(this)
+ }
+
+ override fun taskReady(task: TaskState) {
+ val estimable = task.task.application as? Estimable
+ results[task.task.uid] = estimable?.estimate(resources) ?: Duration.POSITIVE_INFINITY
+ }
+
+ override fun taskFinished(task: TaskState) {
+ results -= task.task.uid
+ }
+
+ private val TaskState.duration: Double
+ get() = results.getValue(task.uid)
+
+ override fun compare(o1: TaskState, o2: TaskState): Int {
+ return compareValuesBy(o1, o2) { state -> state.duration }.let {
+ if (ascending) it else -it
+ }
+ }
+ }
+
+ override fun toString(): String {
+ return "Task-Duration(${if (ascending) "asc" else "desc"})"
+ }
+}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitPerJobTaskEligibilityPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitPerJobTaskEligibilityPolicy.kt
new file mode 100644
index 00000000..330b191e
--- /dev/null
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitPerJobTaskEligibilityPolicy.kt
@@ -0,0 +1,67 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service.stage.task
+
+import com.atlarge.opendc.workflows.service.JobState
+import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener
+import com.atlarge.opendc.workflows.service.StageWorkflowService
+import com.atlarge.opendc.workflows.service.TaskState
+
+data class LimitPerJobTaskEligibilityPolicy(val limit: Int): TaskEligibilityPolicy {
+ override fun invoke(scheduler: StageWorkflowService): TaskEligibilityPolicy.Logic =
+ object : TaskEligibilityPolicy.Logic, StageWorkflowSchedulerListener {
+ private val active = mutableMapOf<JobState, Int>()
+
+ init {
+ scheduler.addListener(this)
+ }
+
+ override fun jobStarted(job: JobState) {
+ active[job] = 0
+ }
+
+ override fun jobFinished(job: JobState) {
+ active.remove(job)
+ }
+
+ override fun taskAssigned(task: TaskState) {
+ active.merge(task.job, 1, Int::plus)
+ }
+
+ override fun taskFinished(task: TaskState) {
+ active.merge(task.job, -1, Int::plus)
+ }
+
+ override fun invoke(task: TaskState): TaskEligibilityPolicy.Advice {
+ val activeForJob = active[task.job]!!
+ return if (activeForJob <= limit)
+ TaskEligibilityPolicy.Advice.ADMIT
+ else
+ TaskEligibilityPolicy.Advice.DENY
+ }
+ }
+
+ override fun toString(): String = "Limit-Active-Job($limit)"
+}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskSortingPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitTaskEligibilityPolicy.kt
index aabc44a9..1e8a8158 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskSortingPolicy.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitTaskEligibilityPolicy.kt
@@ -1,7 +1,7 @@
/*
* MIT License
*
- * Copyright (c) 2019 atlarge-research
+ * Copyright (c) 2020 atlarge-research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -25,21 +25,19 @@
package com.atlarge.opendc.workflows.service.stage.task
import com.atlarge.opendc.workflows.service.StageWorkflowService
+import com.atlarge.opendc.workflows.service.TaskState
-/**
- * This interface represents the **T2** stage of the Reference Architecture for Datacenter Schedulers and provides the
- * scheduler with a sorted list of tasks to schedule.
- */
-interface TaskSortingPolicy {
- /**
- * Sort the given list of tasks on a given criterion.
- *
- * @param scheduler The scheduler that is sorting the tasks.
- * @param tasks The collection of tasks that should be sorted.
- * @return The sorted list of tasks.
- */
- operator fun invoke(
- scheduler: StageWorkflowService,
- tasks: Collection<StageWorkflowService.TaskView>
- ): List<StageWorkflowService.TaskView>
+data class LimitTaskEligibilityPolicy(val limit: Int): TaskEligibilityPolicy {
+ override fun invoke(scheduler: StageWorkflowService) = object : TaskEligibilityPolicy.Logic {
+ override fun invoke(
+ task: TaskState
+ ): TaskEligibilityPolicy.Advice =
+ if (scheduler.activeTasks.size < limit)
+ TaskEligibilityPolicy.Advice.ADMIT
+ else
+ TaskEligibilityPolicy.Advice.STOP
+ }
+
+
+ override fun toString(): String = "Limit-Active($limit)"
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt
new file mode 100644
index 00000000..a53ab8a5
--- /dev/null
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt
@@ -0,0 +1,43 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service.stage.task
+
+import com.atlarge.opendc.workflows.service.StageWorkflowService
+import com.atlarge.opendc.workflows.service.TaskState
+
+data class LoadTaskEligibilityPolicy(val limit: Double): TaskEligibilityPolicy {
+ override fun invoke(scheduler: StageWorkflowService) = object : TaskEligibilityPolicy.Logic {
+ override fun invoke(
+ task: TaskState
+ ): TaskEligibilityPolicy.Advice =
+ if (scheduler.load < limit)
+ TaskEligibilityPolicy.Advice.ADMIT
+ else
+ TaskEligibilityPolicy.Advice.STOP
+ }
+
+
+ override fun toString(): String = "Limit-Load($limit)"
+}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FunctionalTaskEligibilityPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/NullTaskEligibilityPolicy.kt
index 72ecbee2..3dc86e85 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FunctionalTaskEligibilityPolicy.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/NullTaskEligibilityPolicy.kt
@@ -1,7 +1,7 @@
/*
* MIT License
*
- * Copyright (c) 2019 atlarge-research
+ * Copyright (c) 2020 atlarge-research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -30,9 +30,14 @@ import com.atlarge.opendc.workflows.service.TaskState
/**
* A [TaskEligibilityPolicy] that marks tasks as eligible if they are tasks roots within the job.
*/
-class FunctionalTaskEligibilityPolicy : TaskEligibilityPolicy {
- override fun isEligible(
- scheduler: StageWorkflowService,
- task: StageWorkflowService.TaskView
- ): Boolean = task.state == TaskState.READY
+object NullTaskEligibilityPolicy : TaskEligibilityPolicy {
+ override fun invoke(scheduler: StageWorkflowService): TaskEligibilityPolicy.Logic = Logic
+
+ private object Logic : TaskEligibilityPolicy.Logic {
+ override fun invoke(
+ task: TaskState
+ ): TaskEligibilityPolicy.Advice = TaskEligibilityPolicy.Advice.ADMIT
+ }
+
+ override fun toString(): String = "Always"
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskEligibilityPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskEligibilityPolicy.kt
new file mode 100644
index 00000000..2aa41a2b
--- /dev/null
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskEligibilityPolicy.kt
@@ -0,0 +1,45 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service.stage.task
+
+import com.atlarge.opendc.workflows.service.StageWorkflowService
+import com.atlarge.opendc.workflows.service.TaskState
+import java.util.Random
+
+data class RandomTaskEligibilityPolicy(val probability: Double = 0.5): TaskEligibilityPolicy {
+ override fun invoke(scheduler: StageWorkflowService) = object : TaskEligibilityPolicy.Logic {
+ val random = Random(123)
+
+ override fun invoke(task: TaskState): TaskEligibilityPolicy.Advice =
+ if (random.nextDouble() <= probability || scheduler.activeTasks.isEmpty())
+ TaskEligibilityPolicy.Advice.ADMIT
+ else {
+ TaskEligibilityPolicy.Advice.DENY
+ }
+ }
+
+
+ override fun toString(): String = "Random($probability)"
+}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskOrderPolicy.kt
new file mode 100644
index 00000000..60aad9a7
--- /dev/null
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskOrderPolicy.kt
@@ -0,0 +1,64 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service.stage.task
+
+import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener
+import com.atlarge.opendc.workflows.service.StageWorkflowService
+import com.atlarge.opendc.workflows.service.TaskState
+import com.atlarge.opendc.workflows.workload.Task
+import kotlin.random.Random
+
+/**
+ * The [RandomTaskOrderPolicy] sorts tasks randomly.
+ *
+ * @property random The [Random] instance to use when sorting the list of tasks.
+ */
+object RandomTaskOrderPolicy : TaskOrderPolicy {
+ override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> =
+ object : Comparator<TaskState>, StageWorkflowSchedulerListener {
+ private val random = Random(123)
+ private val ids = HashMap<Task, Int>()
+
+ init {
+ scheduler.addListener(this)
+ }
+
+ override fun taskReady(task: TaskState) {
+ ids[task.task] = random.nextInt()
+ }
+
+ override fun taskFinished(task: TaskState) {
+ ids.remove(task.task)
+ }
+
+ override fun compare(o1: TaskState, o2: TaskState): Int {
+ return compareValuesBy(o1, o2) { ids.getValue(it.task) }
+ }
+ }
+
+ override fun toString(): String {
+ return "Random"
+ }
+}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/SubmissionTimeTaskOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/SubmissionTimeTaskOrderPolicy.kt
new file mode 100644
index 00000000..998d782b
--- /dev/null
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/SubmissionTimeTaskOrderPolicy.kt
@@ -0,0 +1,41 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service.stage.task
+
+import com.atlarge.opendc.workflows.service.StageWorkflowService
+import com.atlarge.opendc.workflows.service.TaskState
+
+/**
+ * The [SubmissionTimeTaskOrderPolicy] sorts tasks based on the order of arrival in the queue.
+ */
+data class SubmissionTimeTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPolicy {
+ override fun invoke(scheduler: StageWorkflowService) = compareBy<TaskState> {
+ it.job.submittedAt.let { if (ascending) it else -it }
+ }
+
+ override fun toString(): String {
+ return "Submission-Time(${if (ascending) "asc" else "desc"})"
+ }
+}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt
index 19954d7b..75529005 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt
@@ -24,25 +24,49 @@
package com.atlarge.opendc.workflows.service.stage.task
-import com.atlarge.opendc.workflows.service.StageWorkflowService
+import com.atlarge.opendc.workflows.service.TaskState
+import com.atlarge.opendc.workflows.service.stage.StagePolicy
/**
* A policy interface for determining the eligibility of tasks in a scheduling cycle.
*/
-interface TaskEligibilityPolicy {
- /**
- * A method that is invoked at the start of each scheduling cycle.
- *
- * @param scheduler The scheduler that started the cycle.
- */
- fun startCycle(scheduler: StageWorkflowService) {}
+interface TaskEligibilityPolicy : StagePolicy<TaskEligibilityPolicy.Logic> {
+ interface Logic {
+ /**
+ * Determine whether the specified [TaskState] is eligible to be scheduled.
+ *
+ * @param task The task instance to schedule.
+ * @return The advice for marking the task.
+ */
+ operator fun invoke(task: TaskState): Advice
+ }
/**
- * Determine whether the specified [StageWorkflowService.TaskView] is eligible to be scheduled.
+ * The advice given to the scheduler by an admission policy.
*
- * @param scheduler The scheduler that is determining whether the task is eligible.
- * @param task The task instance to schedule.
- * @return `true` if the task eligible to be scheduled, `false` otherwise.
+ * @property admit A flag to indicate to the scheduler that the job should be admitted.
+ * @property stop A flag to indicate the scheduler should immediately stop admitting jobs to the scheduling queue and wait
+ * for the next scheduling cycle.
*/
- fun isEligible(scheduler: StageWorkflowService, task: StageWorkflowService.TaskView): Boolean
+ enum class Advice(val admit: Boolean, val stop: Boolean) {
+ /**
+ * Admit the current job to the scheduling queue and continue admitting jobs.
+ */
+ ADMIT(true, false),
+
+ /**
+ * Admit the current job to the scheduling queue and stop admitting jobs.
+ */
+ ADMIT_LAST(true, true),
+
+ /**
+ * Deny the current job, but continue admitting jobs.
+ */
+ DENY(false, false),
+
+ /**
+ * Deny the current job and also stop admitting jobs.
+ */
+ STOP(false, true)
+ }
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FifoTaskSortingPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskOrderPolicy.kt
index bba81d27..bfdc7924 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FifoTaskSortingPolicy.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskOrderPolicy.kt
@@ -1,7 +1,7 @@
/*
* MIT License
*
- * Copyright (c) 2019 atlarge-research
+ * Copyright (c) 2020 atlarge-research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -24,14 +24,11 @@
package com.atlarge.opendc.workflows.service.stage.task
-import com.atlarge.opendc.workflows.service.StageWorkflowService
+import com.atlarge.opendc.workflows.service.TaskState
+import com.atlarge.opendc.workflows.service.stage.StagePolicy
/**
- * The [FifoTaskSortingPolicy] sorts tasks based on the order of arrival in the queue.
+ * This interface represents the **T2** stage of the Reference Architecture for Datacenter Schedulers and provides the
+ * scheduler with a sorted list of tasks to schedule.
*/
-class FifoTaskSortingPolicy : TaskSortingPolicy {
- override fun invoke(
- scheduler: StageWorkflowService,
- tasks: Collection<StageWorkflowService.TaskView>
- ): List<StageWorkflowService.TaskView> = tasks.toList()
-}
+interface TaskOrderPolicy : StagePolicy<Comparator<TaskState>>