summaryrefslogtreecommitdiff
path: root/opendc
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2020-02-24 17:16:39 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2020-02-24 17:23:51 +0100
commitc232f6260e6d41ce5e9ac1c930a050690680c704 (patch)
tree69c3f1bba2e47ef513188825db91903df8cf1ac6 /opendc
parente5345dc2192f88ba1d5949eca60ae505759038e0 (diff)
feat: Add support for workflow tasks with known duration
This change adds support for workflow tasks that have a known duration. This allows the workflow scheduler to employ heuristics for a faster schedule.
Diffstat (limited to 'opendc')
-rw-r--r--opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt18
-rw-r--r--opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReader.kt4
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt23
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskState.kt1
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt1
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/DurationJobOrderPolicy.kt12
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt2
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobOrderPolicy.kt1
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt1
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationTaskOrderPolicy.kt9
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitPerJobTaskEligibilityPolicy.kt2
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitTaskEligibilityPolicy.kt3
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt3
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskEligibilityPolicy.kt3
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt2
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Job.kt4
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Metadata.kt30
-rw-r--r--opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Task.kt4
18 files changed, 82 insertions, 41 deletions
diff --git a/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt b/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt
index 415221ca..96796c07 100644
--- a/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt
+++ b/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt
@@ -31,12 +31,12 @@ import com.atlarge.opendc.format.trace.gwf.GwfTraceReader
import com.atlarge.opendc.workflows.monitor.WorkflowMonitor
import com.atlarge.opendc.workflows.service.StageWorkflowService
import com.atlarge.opendc.workflows.service.WorkflowSchedulerMode
-import com.atlarge.opendc.workflows.service.stage.job.SubmissionTimeJobOrderPolicy
import com.atlarge.opendc.workflows.service.stage.job.NullJobAdmissionPolicy
+import com.atlarge.opendc.workflows.service.stage.job.SubmissionTimeJobOrderPolicy
import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelectionPolicy
-import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceDynamicFilterPolicy
-import com.atlarge.opendc.workflows.service.stage.task.FifoTaskSortingPolicy
-import com.atlarge.opendc.workflows.service.stage.task.FunctionalTaskEligibilityPolicy
+import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy
+import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy
+import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy
import com.atlarge.opendc.workflows.workload.Job
import com.atlarge.opendc.workflows.workload.Task
import kotlinx.coroutines.channels.Channel
@@ -92,11 +92,11 @@ fun main(args: Array<String>) {
environment.platforms[0].zones[0].services[ProvisioningService.Key],
mode = WorkflowSchedulerMode.Batch(100),
jobAdmissionPolicy = NullJobAdmissionPolicy,
- jobSortingPolicy = SubmissionTimeJobOrderPolicy(),
- taskEligibilityPolicy = FunctionalTaskEligibilityPolicy(),
- taskSortingPolicy = FifoTaskSortingPolicy(),
- resourceDynamicFilterPolicy = FunctionalResourceDynamicFilterPolicy(),
- resourceSelectionPolicy = FirstFitResourceSelectionPolicy()
+ jobOrderPolicy = SubmissionTimeJobOrderPolicy(),
+ taskEligibilityPolicy = NullTaskEligibilityPolicy,
+ taskOrderPolicy = SubmissionTimeTaskOrderPolicy(),
+ resourceFilterPolicy = FunctionalResourceFilterPolicy,
+ resourceSelectionPolicy = FirstFitResourceSelectionPolicy
)
val reader = GwfTraceReader(File(args[0]))
diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReader.kt
index 498e2d1d..3a4e2e89 100644
--- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReader.kt
+++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReader.kt
@@ -30,6 +30,7 @@ import com.atlarge.opendc.format.trace.TraceEntry
import com.atlarge.opendc.format.trace.TraceReader
import com.atlarge.opendc.workflows.workload.Job
import com.atlarge.opendc.workflows.workload.Task
+import com.atlarge.opendc.workflows.workload.WORKFLOW_TASK_DEADLINE
import java.io.BufferedReader
import java.io.File
import java.io.InputStream
@@ -121,7 +122,8 @@ class GwfTraceReader(reader: BufferedReader) : TraceReader<Job> {
val task = Task(
UUID(0L, taskId), "<unnamed>",
FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), flops, cores),
- HashSet()
+ HashSet(),
+ mapOf(WORKFLOW_TASK_DEADLINE to runtime)
)
entry.submissionTime = min(entry.submissionTime, submitTime)
(workflow.tasks as MutableSet<Task>).add(task)
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
index e19f5446..48f06bcd 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
@@ -38,9 +38,9 @@ import com.atlarge.opendc.workflows.service.stage.resource.ResourceSelectionPoli
import com.atlarge.opendc.workflows.service.stage.task.TaskEligibilityPolicy
import com.atlarge.opendc.workflows.service.stage.task.TaskOrderPolicy
import com.atlarge.opendc.workflows.workload.Job
-import kotlinx.coroutines.launch
import java.util.PriorityQueue
import java.util.Queue
+import kotlinx.coroutines.launch
/**
* A [WorkflowService] that distributes work through a multi-stage process based on the Reference Architecture for
@@ -88,7 +88,6 @@ class StageWorkflowService(
*/
internal val activeTasks: MutableSet<TaskState> = mutableSetOf()
-
/**
* The running tasks by [Server].
*/
@@ -104,7 +103,6 @@ class StageWorkflowService(
*/
internal val available: MutableSet<Node> = mutableSetOf()
-
/**
* The maximum number of incoming jobs.
*/
@@ -180,7 +178,7 @@ class StageWorkflowService(
this.taskEligibilityPolicy = taskEligibilityPolicy(this)
this.taskQueue = PriorityQueue(1000, taskOrderPolicy(this).thenBy { it.task.uid })
this.resourceFilterPolicy = resourceFilterPolicy(this)
- this.resourceSelectionPolicy = resourceSelectionPolicy(this)
+ this.resourceSelectionPolicy = resourceSelectionPolicy(this)
}
override suspend fun submit(job: Job, monitor: WorkflowMonitor) {
@@ -209,7 +207,6 @@ class StageWorkflowService(
requestCycle()
}
-
/**
* Indicate to the scheduler that a scheduling cycle is needed.
*/
@@ -257,6 +254,21 @@ class StageWorkflowService(
}
}
+ // T1 Create list of eligible tasks
+ val taskIterator = incomingTasks.iterator()
+ while (taskIterator.hasNext()) {
+ val taskInstance = taskIterator.next()
+ val advice = taskEligibilityPolicy(taskInstance)
+ if (advice.stop) {
+ break
+ } else if (!advice.admit) {
+ continue
+ }
+
+ taskIterator.remove()
+ taskQueue.add(taskInstance)
+ }
+
// T3 Per task
while (taskQueue.isNotEmpty()) {
val instance = taskQueue.peek()
@@ -325,7 +337,6 @@ class StageWorkflowService(
rootListener.jobFinished(job)
}
-
fun addListener(listener: StageWorkflowSchedulerListener) {
rootListener.listeners += listener
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskState.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskState.kt
index a69b5235..acd5731b 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskState.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskState.kt
@@ -25,7 +25,6 @@
package com.atlarge.opendc.workflows.service
import com.atlarge.opendc.compute.metal.Node
-import com.atlarge.opendc.workflows.service.StageWorkflowService.TaskView
import com.atlarge.opendc.workflows.workload.Task
class TaskState(val job: JobState, val task: Task) {
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt
index f5a0f49b..cfec93b5 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt
@@ -55,7 +55,6 @@ sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Logic> {
}
}
-
override fun toString(): String = "Interactive"
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/DurationJobOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/DurationJobOrderPolicy.kt
index 0b3f567a..6581b7d3 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/DurationJobOrderPolicy.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/DurationJobOrderPolicy.kt
@@ -29,6 +29,7 @@ import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener
import com.atlarge.opendc.workflows.service.StageWorkflowService
import com.atlarge.opendc.workflows.workload.Job
import com.atlarge.opendc.workflows.workload.Task
+import com.atlarge.opendc.workflows.workload.WORKFLOW_TASK_DEADLINE
/**
* The [DurationJobOrderPolicy] sorts tasks based on the critical path length of the job.
@@ -36,7 +37,7 @@ import com.atlarge.opendc.workflows.workload.Task
data class DurationJobOrderPolicy(val ascending: Boolean = true) : JobOrderPolicy {
override fun invoke(scheduler: StageWorkflowService): Comparator<JobState> =
object : Comparator<JobState>, StageWorkflowSchedulerListener {
- private val results = HashMap<Job, Double>()
+ private val results = HashMap<Job, Long>()
init {
scheduler.addListener(this)
@@ -46,10 +47,10 @@ data class DurationJobOrderPolicy(val ascending: Boolean = true) : JobOrderPolic
get() = results[this]!!
override fun jobSubmitted(job: JobState) {
- results[job.job] = job.job.toposort().sumByDouble { task ->
- val estimable = task.application as? Estimable
- estimable?.estimate(resources) ?: Duration.POSITIVE_INFINITY
- }
+ results[job.job] = job.job.toposort().map { task ->
+ val estimable = task.metadata[WORKFLOW_TASK_DEADLINE] as? Long?
+ estimable ?: Long.MAX_VALUE
+ }.sum()
}
override fun jobFinished(job: JobState) {
@@ -61,7 +62,6 @@ data class DurationJobOrderPolicy(val ascending: Boolean = true) : JobOrderPolic
}
}
-
override fun toString(): String {
return "Job-Duration(${if (ascending) "asc" else "desc"})"
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt
index a3645523..535d7792 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt
@@ -45,7 +45,7 @@ interface JobAdmissionPolicy : StagePolicy<JobAdmissionPolicy.Logic> {
* The advice given to the scheduler by an admission policy.
*
* @property admit A flag to indicate to the scheduler that the job should be admitted.
- * @property stop A flag to indicate the scheduler should immediately stop admitting jobs to the scheduling queue and wait
+ * @property stop A flag to indicate the scheduler should immediately stop admitting jobs to the scheduling queue and wait
* for the next scheduling cycle.
*/
enum class Advice(val admit: Boolean, val stop: Boolean) {
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobOrderPolicy.kt
index 488148af..ba57f064 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobOrderPolicy.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobOrderPolicy.kt
@@ -31,4 +31,3 @@ import com.atlarge.opendc.workflows.service.stage.StagePolicy
* A policy interface for ordering admitted workflows in the scheduling queue.
*/
interface JobOrderPolicy : StagePolicy<Comparator<JobState>>
-
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt
index 5b0afccf..0e83d8d7 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt
@@ -39,6 +39,5 @@ object FunctionalResourceFilterPolicy : ResourceFilterPolicy {
hosts.filter { it in scheduler.available }
}
-
override fun toString(): String = "functional"
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationTaskOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationTaskOrderPolicy.kt
index 25214367..96ee233e 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationTaskOrderPolicy.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationTaskOrderPolicy.kt
@@ -27,6 +27,7 @@ package com.atlarge.opendc.workflows.service.stage.task
import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener
import com.atlarge.opendc.workflows.service.StageWorkflowService
import com.atlarge.opendc.workflows.service.TaskState
+import com.atlarge.opendc.workflows.workload.WORKFLOW_TASK_DEADLINE
import java.util.UUID
/**
@@ -36,22 +37,22 @@ data class DurationTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPol
override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> =
object : Comparator<TaskState>, StageWorkflowSchedulerListener {
- private val results = HashMap<UUID, Double>()
+ private val results = HashMap<UUID, Long>()
init {
scheduler.addListener(this)
}
override fun taskReady(task: TaskState) {
- val estimable = task.task.application as? Estimable
- results[task.task.uid] = estimable?.estimate(resources) ?: Duration.POSITIVE_INFINITY
+ val deadline = task.task.metadata[WORKFLOW_TASK_DEADLINE] as? Long?
+ results[task.task.uid] = deadline ?: Long.MAX_VALUE
}
override fun taskFinished(task: TaskState) {
results -= task.task.uid
}
- private val TaskState.duration: Double
+ private val TaskState.duration: Long
get() = results.getValue(task.uid)
override fun compare(o1: TaskState, o2: TaskState): Int {
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitPerJobTaskEligibilityPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitPerJobTaskEligibilityPolicy.kt
index 330b191e..8202c4cd 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitPerJobTaskEligibilityPolicy.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitPerJobTaskEligibilityPolicy.kt
@@ -29,7 +29,7 @@ import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener
import com.atlarge.opendc.workflows.service.StageWorkflowService
import com.atlarge.opendc.workflows.service.TaskState
-data class LimitPerJobTaskEligibilityPolicy(val limit: Int): TaskEligibilityPolicy {
+data class LimitPerJobTaskEligibilityPolicy(val limit: Int) : TaskEligibilityPolicy {
override fun invoke(scheduler: StageWorkflowService): TaskEligibilityPolicy.Logic =
object : TaskEligibilityPolicy.Logic, StageWorkflowSchedulerListener {
private val active = mutableMapOf<JobState, Int>()
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitTaskEligibilityPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitTaskEligibilityPolicy.kt
index 1e8a8158..29324a27 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitTaskEligibilityPolicy.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitTaskEligibilityPolicy.kt
@@ -27,7 +27,7 @@ package com.atlarge.opendc.workflows.service.stage.task
import com.atlarge.opendc.workflows.service.StageWorkflowService
import com.atlarge.opendc.workflows.service.TaskState
-data class LimitTaskEligibilityPolicy(val limit: Int): TaskEligibilityPolicy {
+data class LimitTaskEligibilityPolicy(val limit: Int) : TaskEligibilityPolicy {
override fun invoke(scheduler: StageWorkflowService) = object : TaskEligibilityPolicy.Logic {
override fun invoke(
task: TaskState
@@ -38,6 +38,5 @@ data class LimitTaskEligibilityPolicy(val limit: Int): TaskEligibilityPolicy {
TaskEligibilityPolicy.Advice.STOP
}
-
override fun toString(): String = "Limit-Active($limit)"
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt
index a53ab8a5..3f5232d4 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt
@@ -27,7 +27,7 @@ package com.atlarge.opendc.workflows.service.stage.task
import com.atlarge.opendc.workflows.service.StageWorkflowService
import com.atlarge.opendc.workflows.service.TaskState
-data class LoadTaskEligibilityPolicy(val limit: Double): TaskEligibilityPolicy {
+data class LoadTaskEligibilityPolicy(val limit: Double) : TaskEligibilityPolicy {
override fun invoke(scheduler: StageWorkflowService) = object : TaskEligibilityPolicy.Logic {
override fun invoke(
task: TaskState
@@ -38,6 +38,5 @@ data class LoadTaskEligibilityPolicy(val limit: Double): TaskEligibilityPolicy {
TaskEligibilityPolicy.Advice.STOP
}
-
override fun toString(): String = "Limit-Load($limit)"
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskEligibilityPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskEligibilityPolicy.kt
index 2aa41a2b..c15ec741 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskEligibilityPolicy.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskEligibilityPolicy.kt
@@ -28,7 +28,7 @@ import com.atlarge.opendc.workflows.service.StageWorkflowService
import com.atlarge.opendc.workflows.service.TaskState
import java.util.Random
-data class RandomTaskEligibilityPolicy(val probability: Double = 0.5): TaskEligibilityPolicy {
+data class RandomTaskEligibilityPolicy(val probability: Double = 0.5) : TaskEligibilityPolicy {
override fun invoke(scheduler: StageWorkflowService) = object : TaskEligibilityPolicy.Logic {
val random = Random(123)
@@ -40,6 +40,5 @@ data class RandomTaskEligibilityPolicy(val probability: Double = 0.5): TaskEligi
}
}
-
override fun toString(): String = "Random($probability)"
}
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt
index 75529005..72a7fdd0 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt
@@ -45,7 +45,7 @@ interface TaskEligibilityPolicy : StagePolicy<TaskEligibilityPolicy.Logic> {
* The advice given to the scheduler by an admission policy.
*
* @property admit A flag to indicate to the scheduler that the job should be admitted.
- * @property stop A flag to indicate the scheduler should immediately stop admitting jobs to the scheduling queue and wait
+ * @property stop A flag to indicate the scheduler should immediately stop admitting jobs to the scheduling queue and wait
* for the next scheduling cycle.
*/
enum class Advice(val admit: Boolean, val stop: Boolean) {
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Job.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Job.kt
index dece875c..40389ce2 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Job.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Job.kt
@@ -35,12 +35,14 @@ import java.util.UUID
* @property name The name of this workflow.
* @property owner The owner of the workflow.
* @property tasks The tasks that are part of this workflow.
+ * @property metadata Additional metadata for the job.
*/
data class Job(
override val uid: UUID,
override val name: String,
override val owner: User,
- val tasks: Set<Task>
+ val tasks: Set<Task>,
+ val metadata: Map<String, Any> = emptyMap()
) : Workload {
override fun equals(other: Any?): Boolean = other is Job && uid == other.uid
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Metadata.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Metadata.kt
new file mode 100644
index 00000000..067f1179
--- /dev/null
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Metadata.kt
@@ -0,0 +1,30 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.workload
+
+/**
+ * Meta-data key for the deadline of a task.
+ */
+const val WORKFLOW_TASK_DEADLINE = "workflow:task:deadline"
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Task.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Task.kt
index b5997b35..82521faa 100644
--- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Task.kt
+++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Task.kt
@@ -35,12 +35,14 @@ import java.util.UUID
* @property name The name of this task.
* @property image The application image to run as part of this workflow task.
* @property dependencies The dependencies of this task in order for it to execute.
+ * @property metadata Additional metadata for this task.
*/
data class Task(
override val uid: UUID,
override val name: String,
val image: Image,
- val dependencies: Set<Task>
+ val dependencies: Set<Task>,
+ val metadata: Map<String, Any> = emptyMap()
) : Identity {
override fun equals(other: Any?): Boolean = other is Task && uid == other.uid