diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-02-24 17:16:39 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-02-24 17:23:51 +0100 |
| commit | c232f6260e6d41ce5e9ac1c930a050690680c704 (patch) | |
| tree | 69c3f1bba2e47ef513188825db91903df8cf1ac6 | |
| parent | e5345dc2192f88ba1d5949eca60ae505759038e0 (diff) | |
feat: Add support for workflow tasks with known duration
This change adds support for workflow tasks that have a known duration.
This allows the workflow scheduler to employ heuristics for a faster
schedule.
18 files changed, 82 insertions, 41 deletions
diff --git a/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt b/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt index 415221ca..96796c07 100644 --- a/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt +++ b/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt @@ -31,12 +31,12 @@ import com.atlarge.opendc.format.trace.gwf.GwfTraceReader import com.atlarge.opendc.workflows.monitor.WorkflowMonitor import com.atlarge.opendc.workflows.service.StageWorkflowService import com.atlarge.opendc.workflows.service.WorkflowSchedulerMode -import com.atlarge.opendc.workflows.service.stage.job.SubmissionTimeJobOrderPolicy import com.atlarge.opendc.workflows.service.stage.job.NullJobAdmissionPolicy +import com.atlarge.opendc.workflows.service.stage.job.SubmissionTimeJobOrderPolicy import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelectionPolicy -import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceDynamicFilterPolicy -import com.atlarge.opendc.workflows.service.stage.task.FifoTaskSortingPolicy -import com.atlarge.opendc.workflows.service.stage.task.FunctionalTaskEligibilityPolicy +import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy +import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy +import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy import com.atlarge.opendc.workflows.workload.Job import com.atlarge.opendc.workflows.workload.Task import kotlinx.coroutines.channels.Channel @@ -92,11 +92,11 @@ fun main(args: Array<String>) { environment.platforms[0].zones[0].services[ProvisioningService.Key], mode = WorkflowSchedulerMode.Batch(100), jobAdmissionPolicy = NullJobAdmissionPolicy, - jobSortingPolicy = SubmissionTimeJobOrderPolicy(), - taskEligibilityPolicy = FunctionalTaskEligibilityPolicy(), - taskSortingPolicy = FifoTaskSortingPolicy(), - resourceDynamicFilterPolicy = FunctionalResourceDynamicFilterPolicy(), - resourceSelectionPolicy = FirstFitResourceSelectionPolicy() + jobOrderPolicy = SubmissionTimeJobOrderPolicy(), + taskEligibilityPolicy = NullTaskEligibilityPolicy, + taskOrderPolicy = SubmissionTimeTaskOrderPolicy(), + resourceFilterPolicy = FunctionalResourceFilterPolicy, + resourceSelectionPolicy = FirstFitResourceSelectionPolicy ) val reader = GwfTraceReader(File(args[0])) diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReader.kt index 498e2d1d..3a4e2e89 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/gwf/GwfTraceReader.kt @@ -30,6 +30,7 @@ import com.atlarge.opendc.format.trace.TraceEntry import com.atlarge.opendc.format.trace.TraceReader import com.atlarge.opendc.workflows.workload.Job import com.atlarge.opendc.workflows.workload.Task +import com.atlarge.opendc.workflows.workload.WORKFLOW_TASK_DEADLINE import java.io.BufferedReader import java.io.File import java.io.InputStream @@ -121,7 +122,8 @@ class GwfTraceReader(reader: BufferedReader) : TraceReader<Job> { val task = Task( UUID(0L, taskId), "<unnamed>", FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), flops, cores), - HashSet() + HashSet(), + mapOf(WORKFLOW_TASK_DEADLINE to runtime) ) entry.submissionTime = min(entry.submissionTime, submitTime) (workflow.tasks as MutableSet<Task>).add(task) diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt index e19f5446..48f06bcd 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt @@ -38,9 +38,9 @@ import com.atlarge.opendc.workflows.service.stage.resource.ResourceSelectionPoli import com.atlarge.opendc.workflows.service.stage.task.TaskEligibilityPolicy import com.atlarge.opendc.workflows.service.stage.task.TaskOrderPolicy import com.atlarge.opendc.workflows.workload.Job -import kotlinx.coroutines.launch import java.util.PriorityQueue import java.util.Queue +import kotlinx.coroutines.launch /** * A [WorkflowService] that distributes work through a multi-stage process based on the Reference Architecture for @@ -88,7 +88,6 @@ class StageWorkflowService( */ internal val activeTasks: MutableSet<TaskState> = mutableSetOf() - /** * The running tasks by [Server]. */ @@ -104,7 +103,6 @@ class StageWorkflowService( */ internal val available: MutableSet<Node> = mutableSetOf() - /** * The maximum number of incoming jobs. */ @@ -180,7 +178,7 @@ class StageWorkflowService( this.taskEligibilityPolicy = taskEligibilityPolicy(this) this.taskQueue = PriorityQueue(1000, taskOrderPolicy(this).thenBy { it.task.uid }) this.resourceFilterPolicy = resourceFilterPolicy(this) - this.resourceSelectionPolicy = resourceSelectionPolicy(this) + this.resourceSelectionPolicy = resourceSelectionPolicy(this) } override suspend fun submit(job: Job, monitor: WorkflowMonitor) { @@ -209,7 +207,6 @@ class StageWorkflowService( requestCycle() } - /** * Indicate to the scheduler that a scheduling cycle is needed. */ @@ -257,6 +254,21 @@ class StageWorkflowService( } } + // T1 Create list of eligible tasks + val taskIterator = incomingTasks.iterator() + while (taskIterator.hasNext()) { + val taskInstance = taskIterator.next() + val advice = taskEligibilityPolicy(taskInstance) + if (advice.stop) { + break + } else if (!advice.admit) { + continue + } + + taskIterator.remove() + taskQueue.add(taskInstance) + } + // T3 Per task while (taskQueue.isNotEmpty()) { val instance = taskQueue.peek() @@ -325,7 +337,6 @@ class StageWorkflowService( rootListener.jobFinished(job) } - fun addListener(listener: StageWorkflowSchedulerListener) { rootListener.listeners += listener } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskState.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskState.kt index a69b5235..acd5731b 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskState.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskState.kt @@ -25,7 +25,6 @@ package com.atlarge.opendc.workflows.service import com.atlarge.opendc.compute.metal.Node -import com.atlarge.opendc.workflows.service.StageWorkflowService.TaskView import com.atlarge.opendc.workflows.workload.Task class TaskState(val job: JobState, val task: Task) { diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt index f5a0f49b..cfec93b5 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt @@ -55,7 +55,6 @@ sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Logic> { } } - override fun toString(): String = "Interactive" } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/DurationJobOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/DurationJobOrderPolicy.kt index 0b3f567a..6581b7d3 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/DurationJobOrderPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/DurationJobOrderPolicy.kt @@ -29,6 +29,7 @@ import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener import com.atlarge.opendc.workflows.service.StageWorkflowService import com.atlarge.opendc.workflows.workload.Job import com.atlarge.opendc.workflows.workload.Task +import com.atlarge.opendc.workflows.workload.WORKFLOW_TASK_DEADLINE /** * The [DurationJobOrderPolicy] sorts tasks based on the critical path length of the job. @@ -36,7 +37,7 @@ import com.atlarge.opendc.workflows.workload.Task data class DurationJobOrderPolicy(val ascending: Boolean = true) : JobOrderPolicy { override fun invoke(scheduler: StageWorkflowService): Comparator<JobState> = object : Comparator<JobState>, StageWorkflowSchedulerListener { - private val results = HashMap<Job, Double>() + private val results = HashMap<Job, Long>() init { scheduler.addListener(this) @@ -46,10 +47,10 @@ data class DurationJobOrderPolicy(val ascending: Boolean = true) : JobOrderPolic get() = results[this]!! override fun jobSubmitted(job: JobState) { - results[job.job] = job.job.toposort().sumByDouble { task -> - val estimable = task.application as? Estimable - estimable?.estimate(resources) ?: Duration.POSITIVE_INFINITY - } + results[job.job] = job.job.toposort().map { task -> + val estimable = task.metadata[WORKFLOW_TASK_DEADLINE] as? Long? + estimable ?: Long.MAX_VALUE + }.sum() } override fun jobFinished(job: JobState) { @@ -61,7 +62,6 @@ data class DurationJobOrderPolicy(val ascending: Boolean = true) : JobOrderPolic } } - override fun toString(): String { return "Job-Duration(${if (ascending) "asc" else "desc"})" } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt index a3645523..535d7792 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt @@ -45,7 +45,7 @@ interface JobAdmissionPolicy : StagePolicy<JobAdmissionPolicy.Logic> { * The advice given to the scheduler by an admission policy. * * @property admit A flag to indicate to the scheduler that the job should be admitted. - * @property stop A flag to indicate the scheduler should immediately stop admitting jobs to the scheduling queue and wait + * @property stop A flag to indicate the scheduler should immediately stop admitting jobs to the scheduling queue and wait * for the next scheduling cycle. */ enum class Advice(val admit: Boolean, val stop: Boolean) { diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobOrderPolicy.kt index 488148af..ba57f064 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobOrderPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobOrderPolicy.kt @@ -31,4 +31,3 @@ import com.atlarge.opendc.workflows.service.stage.StagePolicy * A policy interface for ordering admitted workflows in the scheduling queue. */ interface JobOrderPolicy : StagePolicy<Comparator<JobState>> - diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt index 5b0afccf..0e83d8d7 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt @@ -39,6 +39,5 @@ object FunctionalResourceFilterPolicy : ResourceFilterPolicy { hosts.filter { it in scheduler.available } } - override fun toString(): String = "functional" } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationTaskOrderPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationTaskOrderPolicy.kt index 25214367..96ee233e 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationTaskOrderPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationTaskOrderPolicy.kt @@ -27,6 +27,7 @@ package com.atlarge.opendc.workflows.service.stage.task import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener import com.atlarge.opendc.workflows.service.StageWorkflowService import com.atlarge.opendc.workflows.service.TaskState +import com.atlarge.opendc.workflows.workload.WORKFLOW_TASK_DEADLINE import java.util.UUID /** @@ -36,22 +37,22 @@ data class DurationTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPol override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> = object : Comparator<TaskState>, StageWorkflowSchedulerListener { - private val results = HashMap<UUID, Double>() + private val results = HashMap<UUID, Long>() init { scheduler.addListener(this) } override fun taskReady(task: TaskState) { - val estimable = task.task.application as? Estimable - results[task.task.uid] = estimable?.estimate(resources) ?: Duration.POSITIVE_INFINITY + val deadline = task.task.metadata[WORKFLOW_TASK_DEADLINE] as? Long? + results[task.task.uid] = deadline ?: Long.MAX_VALUE } override fun taskFinished(task: TaskState) { results -= task.task.uid } - private val TaskState.duration: Double + private val TaskState.duration: Long get() = results.getValue(task.uid) override fun compare(o1: TaskState, o2: TaskState): Int { diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitPerJobTaskEligibilityPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitPerJobTaskEligibilityPolicy.kt index 330b191e..8202c4cd 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitPerJobTaskEligibilityPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitPerJobTaskEligibilityPolicy.kt @@ -29,7 +29,7 @@ import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener import com.atlarge.opendc.workflows.service.StageWorkflowService import com.atlarge.opendc.workflows.service.TaskState -data class LimitPerJobTaskEligibilityPolicy(val limit: Int): TaskEligibilityPolicy { +data class LimitPerJobTaskEligibilityPolicy(val limit: Int) : TaskEligibilityPolicy { override fun invoke(scheduler: StageWorkflowService): TaskEligibilityPolicy.Logic = object : TaskEligibilityPolicy.Logic, StageWorkflowSchedulerListener { private val active = mutableMapOf<JobState, Int>() diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitTaskEligibilityPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitTaskEligibilityPolicy.kt index 1e8a8158..29324a27 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitTaskEligibilityPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitTaskEligibilityPolicy.kt @@ -27,7 +27,7 @@ package com.atlarge.opendc.workflows.service.stage.task import com.atlarge.opendc.workflows.service.StageWorkflowService import com.atlarge.opendc.workflows.service.TaskState -data class LimitTaskEligibilityPolicy(val limit: Int): TaskEligibilityPolicy { +data class LimitTaskEligibilityPolicy(val limit: Int) : TaskEligibilityPolicy { override fun invoke(scheduler: StageWorkflowService) = object : TaskEligibilityPolicy.Logic { override fun invoke( task: TaskState @@ -38,6 +38,5 @@ data class LimitTaskEligibilityPolicy(val limit: Int): TaskEligibilityPolicy { TaskEligibilityPolicy.Advice.STOP } - override fun toString(): String = "Limit-Active($limit)" } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt index a53ab8a5..3f5232d4 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt @@ -27,7 +27,7 @@ package com.atlarge.opendc.workflows.service.stage.task import com.atlarge.opendc.workflows.service.StageWorkflowService import com.atlarge.opendc.workflows.service.TaskState -data class LoadTaskEligibilityPolicy(val limit: Double): TaskEligibilityPolicy { +data class LoadTaskEligibilityPolicy(val limit: Double) : TaskEligibilityPolicy { override fun invoke(scheduler: StageWorkflowService) = object : TaskEligibilityPolicy.Logic { override fun invoke( task: TaskState @@ -38,6 +38,5 @@ data class LoadTaskEligibilityPolicy(val limit: Double): TaskEligibilityPolicy { TaskEligibilityPolicy.Advice.STOP } - override fun toString(): String = "Limit-Load($limit)" } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskEligibilityPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskEligibilityPolicy.kt index 2aa41a2b..c15ec741 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskEligibilityPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskEligibilityPolicy.kt @@ -28,7 +28,7 @@ import com.atlarge.opendc.workflows.service.StageWorkflowService import com.atlarge.opendc.workflows.service.TaskState import java.util.Random -data class RandomTaskEligibilityPolicy(val probability: Double = 0.5): TaskEligibilityPolicy { +data class RandomTaskEligibilityPolicy(val probability: Double = 0.5) : TaskEligibilityPolicy { override fun invoke(scheduler: StageWorkflowService) = object : TaskEligibilityPolicy.Logic { val random = Random(123) @@ -40,6 +40,5 @@ data class RandomTaskEligibilityPolicy(val probability: Double = 0.5): TaskEligi } } - override fun toString(): String = "Random($probability)" } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt index 75529005..72a7fdd0 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt @@ -45,7 +45,7 @@ interface TaskEligibilityPolicy : StagePolicy<TaskEligibilityPolicy.Logic> { * The advice given to the scheduler by an admission policy. * * @property admit A flag to indicate to the scheduler that the job should be admitted. - * @property stop A flag to indicate the scheduler should immediately stop admitting jobs to the scheduling queue and wait + * @property stop A flag to indicate the scheduler should immediately stop admitting jobs to the scheduling queue and wait * for the next scheduling cycle. */ enum class Advice(val admit: Boolean, val stop: Boolean) { diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Job.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Job.kt index dece875c..40389ce2 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Job.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Job.kt @@ -35,12 +35,14 @@ import java.util.UUID * @property name The name of this workflow. * @property owner The owner of the workflow. * @property tasks The tasks that are part of this workflow. + * @property metadata Additional metadata for the job. */ data class Job( override val uid: UUID, override val name: String, override val owner: User, - val tasks: Set<Task> + val tasks: Set<Task>, + val metadata: Map<String, Any> = emptyMap() ) : Workload { override fun equals(other: Any?): Boolean = other is Job && uid == other.uid diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Metadata.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Metadata.kt new file mode 100644 index 00000000..067f1179 --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Metadata.kt @@ -0,0 +1,30 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.workload + +/** + * Meta-data key for the deadline of a task. + */ +const val WORKFLOW_TASK_DEADLINE = "workflow:task:deadline" diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Task.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Task.kt index b5997b35..82521faa 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Task.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Task.kt @@ -35,12 +35,14 @@ import java.util.UUID * @property name The name of this task. * @property image The application image to run as part of this workflow task. * @property dependencies The dependencies of this task in order for it to execute. + * @property metadata Additional metadata for this task. */ data class Task( override val uid: UUID, override val name: String, val image: Image, - val dependencies: Set<Task> + val dependencies: Set<Task>, + val metadata: Map<String, Any> = emptyMap() ) : Identity { override fun equals(other: Any?): Boolean = other is Task && uid == other.uid |
