summaryrefslogtreecommitdiff
path: root/simulator/opendc/opendc-workflows/src/main
diff options
context:
space:
mode:
authorGeorgios Andreadis <info@gandreadis.com>2020-06-29 16:04:57 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2020-08-24 16:18:13 +0200
commit46b06fb446e79c390c01953d31d700b8e73da24d (patch)
treeb2329630ebf2c90d297ba0d3046ccd558d12d042 /simulator/opendc/opendc-workflows/src/main
parentebcacf96fbc1cd16a91523f95dd01db046fb7f90 (diff)
Prepare simulator repository for monorepo
This change prepares the simulator Git repository for the monorepo residing at https://github.com/atlarge-research.com/opendc. To accomodate for this, we move all files into a simulator subdirectory.
Diffstat (limited to 'simulator/opendc/opendc-workflows/src/main')
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt41
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerListener.kt39
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt361
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskState.kt85
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskStatus.kt35
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowEvent.kt76
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt112
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt52
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/StagePolicy.kt38
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/DurationJobOrderPolicy.kt102
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt72
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobOrderPolicy.kt33
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/LimitJobAdmissionPolicy.kt47
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/LoadJobAdmissionPolicy.kt47
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt39
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobOrderPolicy.kt62
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/SizeJobOrderPolicy.kt40
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/SubmissionTimeJobOrderPolicy.kt40
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt39
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt43
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/RandomResourceSelectionPolicy.kt47
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceFilterPolicy.kt47
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt34
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/ActiveTaskOrderPolicy.kt70
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/BalancingTaskEligibilityPolicy.kt78
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/CompletionTaskOrderPolicy.kt66
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DependenciesTaskOrderPolicy.kt41
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DependentsTaskOrderPolicy.kt41
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationHistoryTaskOrderPolicy.kt72
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationTaskOrderPolicy.kt68
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitPerJobTaskEligibilityPolicy.kt70
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitTaskEligibilityPolicy.kt45
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt45
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/NullTaskEligibilityPolicy.kt43
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskEligibilityPolicy.kt47
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskOrderPolicy.kt62
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/SubmissionTimeTaskOrderPolicy.kt41
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt72
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskOrderPolicy.kt34
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Job.kt52
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Metadata.kt30
-rw-r--r--simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Task.kt50
42 files changed, 2558 insertions, 0 deletions
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt
new file mode 100644
index 00000000..1cb2de97
--- /dev/null
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt
@@ -0,0 +1,41 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service
+
+import com.atlarge.opendc.workflows.workload.Job
+
+class JobState(val job: Job, val submittedAt: Long) {
+ /**
+ * A flag to indicate whether this job is finished.
+ */
+ val isFinished: Boolean
+ get() = tasks.isEmpty()
+
+ val tasks: MutableSet<TaskState> = mutableSetOf()
+
+ override fun equals(other: Any?): Boolean = other is JobState && other.job == job
+
+ override fun hashCode(): Int = job.hashCode()
+}
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerListener.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerListener.kt
new file mode 100644
index 00000000..73c3e752
--- /dev/null
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerListener.kt
@@ -0,0 +1,39 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service
+
+interface StageWorkflowSchedulerListener {
+ fun cycleStarted(scheduler: StageWorkflowService) {}
+ fun cycleFinished(scheduler: StageWorkflowService) {}
+
+ fun jobSubmitted(job: JobState) {}
+ fun jobStarted(job: JobState) {}
+ fun jobFinished(job: JobState) {}
+
+ fun taskReady(task: TaskState) {}
+ fun taskAssigned(task: TaskState) {}
+ fun taskStarted(task: TaskState) {}
+ fun taskFinished(task: TaskState) {}
+}
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
new file mode 100644
index 00000000..7a20363c
--- /dev/null
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt
@@ -0,0 +1,361 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service
+
+import com.atlarge.odcsim.Domain
+import com.atlarge.odcsim.flow.EventFlow
+import com.atlarge.odcsim.simulationContext
+import com.atlarge.opendc.compute.core.Server
+import com.atlarge.opendc.compute.core.ServerEvent
+import com.atlarge.opendc.compute.core.ServerState
+import com.atlarge.opendc.compute.metal.Node
+import com.atlarge.opendc.compute.metal.service.ProvisioningService
+import com.atlarge.opendc.workflows.service.stage.job.JobAdmissionPolicy
+import com.atlarge.opendc.workflows.service.stage.job.JobOrderPolicy
+import com.atlarge.opendc.workflows.service.stage.resource.ResourceFilterPolicy
+import com.atlarge.opendc.workflows.service.stage.resource.ResourceSelectionPolicy
+import com.atlarge.opendc.workflows.service.stage.task.TaskEligibilityPolicy
+import com.atlarge.opendc.workflows.service.stage.task.TaskOrderPolicy
+import com.atlarge.opendc.workflows.workload.Job
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.launchIn
+import kotlinx.coroutines.flow.onEach
+import java.util.PriorityQueue
+import java.util.Queue
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.withContext
+
+/**
+ * A [WorkflowService] that distributes work through a multi-stage process based on the Reference Architecture for
+ * Datacenter Scheduling.
+ */
+class StageWorkflowService(
+ private val domain: Domain,
+ private val provisioningService: ProvisioningService,
+ mode: WorkflowSchedulerMode,
+ jobAdmissionPolicy: JobAdmissionPolicy,
+ jobOrderPolicy: JobOrderPolicy,
+ taskEligibilityPolicy: TaskEligibilityPolicy,
+ taskOrderPolicy: TaskOrderPolicy,
+ resourceFilterPolicy: ResourceFilterPolicy,
+ resourceSelectionPolicy: ResourceSelectionPolicy
+) : WorkflowService, CoroutineScope by domain {
+
+ /**
+ * The incoming jobs ready to be processed by the scheduler.
+ */
+ internal val incomingJobs: MutableSet<JobState> = linkedSetOf()
+
+ /**
+ * The incoming tasks ready to be processed by the scheduler.
+ */
+ internal val incomingTasks: MutableSet<TaskState> = linkedSetOf()
+
+ /**
+ * The job queue.
+ */
+ internal val jobQueue: Queue<JobState>
+
+ /**
+ * The task queue.
+ */
+ internal val taskQueue: Queue<TaskState>
+
+ /**
+ * The active jobs in the system.
+ */
+ internal val activeJobs: MutableSet<JobState> = mutableSetOf()
+
+ /**
+ * The active tasks in the system.
+ */
+ internal val activeTasks: MutableSet<TaskState> = mutableSetOf()
+
+ /**
+ * The running tasks by [Server].
+ */
+ internal val taskByServer = mutableMapOf<Server, TaskState>()
+
+ /**
+ * The nodes that are controlled by the service.
+ */
+ internal lateinit var nodes: List<Node>
+
+ /**
+ * The available nodes.
+ */
+ internal val available: MutableSet<Node> = mutableSetOf()
+
+ /**
+ * The maximum number of incoming jobs.
+ */
+ private val throttleLimit: Int = 20000
+
+ /**
+ * The load of the system.
+ */
+ internal val load: Double
+ get() = (available.size / nodes.size.toDouble())
+
+ /**
+ * The root listener of this scheduler.
+ */
+ private val rootListener = object : StageWorkflowSchedulerListener {
+ /**
+ * The listeners to delegate to.
+ */
+ val listeners = mutableSetOf<StageWorkflowSchedulerListener>()
+
+ override fun cycleStarted(scheduler: StageWorkflowService) {
+ listeners.forEach { it.cycleStarted(scheduler) }
+ }
+
+ override fun cycleFinished(scheduler: StageWorkflowService) {
+ listeners.forEach { it.cycleFinished(scheduler) }
+ }
+
+ override fun jobSubmitted(job: JobState) {
+ listeners.forEach { it.jobSubmitted(job) }
+ }
+
+ override fun jobStarted(job: JobState) {
+ listeners.forEach { it.jobStarted(job) }
+ }
+
+ override fun jobFinished(job: JobState) {
+ listeners.forEach { it.jobFinished(job) }
+ }
+
+ override fun taskReady(task: TaskState) {
+ listeners.forEach { it.taskReady(task) }
+ }
+
+ override fun taskAssigned(task: TaskState) {
+ listeners.forEach { it.taskAssigned(task) }
+ }
+
+ override fun taskStarted(task: TaskState) {
+ listeners.forEach { it.taskStarted(task) }
+ }
+
+ override fun taskFinished(task: TaskState) {
+ listeners.forEach { it.taskFinished(task) }
+ }
+ }
+
+ private val mode: WorkflowSchedulerMode.Logic
+ private val jobAdmissionPolicy: JobAdmissionPolicy.Logic
+ private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic
+ private val resourceFilterPolicy: ResourceFilterPolicy.Logic
+ private val resourceSelectionPolicy: Comparator<Node>
+ private val eventFlow = EventFlow<WorkflowEvent>()
+
+ init {
+ domain.launch {
+ nodes = provisioningService.nodes().toList()
+ available.addAll(nodes)
+ }
+
+ this.mode = mode(this)
+ this.jobAdmissionPolicy = jobAdmissionPolicy(this)
+ this.jobQueue = PriorityQueue(100, jobOrderPolicy(this).thenBy { it.job.uid })
+ this.taskEligibilityPolicy = taskEligibilityPolicy(this)
+ this.taskQueue = PriorityQueue(1000, taskOrderPolicy(this).thenBy { it.task.uid })
+ this.resourceFilterPolicy = resourceFilterPolicy(this)
+ this.resourceSelectionPolicy = resourceSelectionPolicy(this)
+ }
+
+ override val events: Flow<WorkflowEvent> = eventFlow
+
+ override suspend fun submit(job: Job) = withContext(domain.coroutineContext) {
+ // J1 Incoming Jobs
+ val jobInstance = JobState(job, simulationContext.clock.millis())
+ val instances = job.tasks.associateWith {
+ TaskState(jobInstance, it)
+ }
+
+ for ((task, instance) in instances) {
+ instance.dependencies.addAll(task.dependencies.map { instances[it]!! })
+ task.dependencies.forEach {
+ instances[it]!!.dependents.add(instance)
+ }
+
+ // If the task has no dependency, it is a root task and can immediately be evaluated
+ if (instance.isRoot) {
+ instance.state = TaskStatus.READY
+ }
+ }
+
+ instances.values.toCollection(jobInstance.tasks)
+ incomingJobs += jobInstance
+ rootListener.jobSubmitted(jobInstance)
+
+ requestCycle()
+ }
+
+ /**
+ * Indicate to the scheduler that a scheduling cycle is needed.
+ */
+ private suspend fun requestCycle() = mode.requestCycle()
+
+ /**
+ * Perform a scheduling cycle immediately.
+ */
+ @OptIn(ExperimentalCoroutinesApi::class)
+ internal suspend fun schedule() {
+ // J2 Create list of eligible jobs
+ val iterator = incomingJobs.iterator()
+ while (iterator.hasNext()) {
+ val jobInstance = iterator.next()
+ val advice = jobAdmissionPolicy(jobInstance)
+ if (advice.stop) {
+ break
+ } else if (!advice.admit) {
+ continue
+ }
+
+ iterator.remove()
+ jobQueue.add(jobInstance)
+ activeJobs += jobInstance
+ eventFlow.emit(WorkflowEvent.JobStarted(this, jobInstance.job, simulationContext.clock.millis()))
+ rootListener.jobStarted(jobInstance)
+ }
+
+ // J4 Per job
+ while (jobQueue.isNotEmpty()) {
+ val jobInstance = jobQueue.poll()
+
+ // Edge-case: job has no tasks
+ if (jobInstance.isFinished) {
+ finishJob(jobInstance)
+ }
+
+ // Add job roots to the scheduling queue
+ for (task in jobInstance.tasks) {
+ if (task.state != TaskStatus.READY) {
+ continue
+ }
+
+ incomingTasks += task
+ rootListener.taskReady(task)
+ }
+ }
+
+ // T1 Create list of eligible tasks
+ val taskIterator = incomingTasks.iterator()
+ while (taskIterator.hasNext()) {
+ val taskInstance = taskIterator.next()
+ val advice = taskEligibilityPolicy(taskInstance)
+ if (advice.stop) {
+ break
+ } else if (!advice.admit) {
+ continue
+ }
+
+ taskIterator.remove()
+ taskQueue.add(taskInstance)
+ }
+
+ // T3 Per task
+ while (taskQueue.isNotEmpty()) {
+ val instance = taskQueue.peek()
+ val host: Node? = available.firstOrNull()
+
+ if (host != null) {
+ // T4 Submit task to machine
+ available -= host
+ instance.state = TaskStatus.ACTIVE
+ val newHost = provisioningService.deploy(host, instance.task.image)
+ val server = newHost.server!!
+ instance.host = newHost
+ taskByServer[server] = instance
+ server.events
+ .onEach { event -> if (event is ServerEvent.StateChanged) stateChanged(event.server) }
+ .launchIn(this)
+
+ activeTasks += instance
+ taskQueue.poll()
+ rootListener.taskAssigned(instance)
+ } else {
+ break
+ }
+ }
+ }
+
+ private suspend fun stateChanged(server: Server) {
+ when (server.state) {
+ ServerState.ACTIVE -> {
+ val task = taskByServer.getValue(server)
+ task.startedAt = simulationContext.clock.millis()
+ eventFlow.emit(WorkflowEvent.TaskStarted(this@StageWorkflowService, task.job.job, task.task, simulationContext.clock.millis()))
+ rootListener.taskStarted(task)
+ }
+ ServerState.SHUTOFF, ServerState.ERROR -> {
+ val task = taskByServer.remove(server) ?: throw IllegalStateException()
+ val job = task.job
+ task.state = TaskStatus.FINISHED
+ task.finishedAt = simulationContext.clock.millis()
+ job.tasks.remove(task)
+ available += task.host!!
+ activeTasks -= task
+ eventFlow.emit(WorkflowEvent.TaskFinished(this@StageWorkflowService, task.job.job, task.task, simulationContext.clock.millis()))
+ rootListener.taskFinished(task)
+
+ // Add job roots to the scheduling queue
+ for (dependent in task.dependents) {
+ if (dependent.state != TaskStatus.READY) {
+ continue
+ }
+
+ incomingTasks += dependent
+ rootListener.taskReady(dependent)
+ }
+
+ if (job.isFinished) {
+ finishJob(job)
+ }
+
+ requestCycle()
+ }
+ else -> throw IllegalStateException()
+ }
+ }
+
+ private suspend fun finishJob(job: JobState) {
+ activeJobs -= job
+ eventFlow.emit(WorkflowEvent.JobFinished(this, job.job, simulationContext.clock.millis()))
+ rootListener.jobFinished(job)
+ }
+
+ fun addListener(listener: StageWorkflowSchedulerListener) {
+ rootListener.listeners += listener
+ }
+
+ fun removeListener(listener: StageWorkflowSchedulerListener) {
+ rootListener.listeners -= listener
+ }
+}
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskState.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskState.kt
new file mode 100644
index 00000000..acd5731b
--- /dev/null
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskState.kt
@@ -0,0 +1,85 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service
+
+import com.atlarge.opendc.compute.metal.Node
+import com.atlarge.opendc.workflows.workload.Task
+
+class TaskState(val job: JobState, val task: Task) {
+ /**
+ * The moment in time the task was started.
+ */
+ var startedAt: Long = Long.MIN_VALUE
+
+ /**
+ * The moment in time the task was finished.
+ */
+ var finishedAt: Long = Long.MIN_VALUE
+
+ /**
+ * The dependencies of this task.
+ */
+ val dependencies = HashSet<TaskState>()
+
+ /**
+ * The dependents of this task.
+ */
+ val dependents = HashSet<TaskState>()
+
+ /**
+ * A flag to indicate whether this workflow task instance is a workflow root.
+ */
+ val isRoot: Boolean
+ get() = dependencies.isEmpty()
+
+ var state: TaskStatus = TaskStatus.CREATED
+ set(value) {
+ field = value
+
+ // Mark the process as terminated in the graph
+ if (value == TaskStatus.FINISHED) {
+ markTerminated()
+ }
+ }
+
+ var host: Node? = null
+
+ /**
+ * Mark the specified [TaskView] as terminated.
+ */
+ private fun markTerminated() {
+ for (dependent in dependents) {
+ dependent.dependencies.remove(this)
+
+ if (dependent.isRoot) {
+ dependent.state = TaskStatus.READY
+ }
+ }
+ }
+
+ override fun equals(other: Any?): Boolean = other is TaskState && other.job == job && other.task == task
+
+ override fun hashCode(): Int = task.hashCode()
+}
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskStatus.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskStatus.kt
new file mode 100644
index 00000000..c53c6171
--- /dev/null
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskStatus.kt
@@ -0,0 +1,35 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service
+
+/**
+ * The state of a workflow task.
+ */
+public enum class TaskStatus {
+ CREATED,
+ READY,
+ ACTIVE,
+ FINISHED
+}
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowEvent.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowEvent.kt
new file mode 100644
index 00000000..2ca5a19d
--- /dev/null
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowEvent.kt
@@ -0,0 +1,76 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service
+
+import com.atlarge.opendc.workflows.workload.Job
+import com.atlarge.opendc.workflows.workload.Task
+
+/**
+ * An event emitted by the [WorkflowService].
+ */
+public sealed class WorkflowEvent {
+ /**
+ * The [WorkflowService] that emitted the event.
+ */
+ public abstract val service: WorkflowService
+
+ /**
+ * This event is emitted when a job has become active.
+ */
+ public data class JobStarted(
+ override val service: WorkflowService,
+ public val job: Job,
+ public val time: Long
+ ) : WorkflowEvent()
+
+ /**
+ * This event is emitted when a job has finished processing.
+ */
+ public data class JobFinished(
+ override val service: WorkflowService,
+ public val job: Job,
+ public val time: Long
+ ) : WorkflowEvent()
+
+ /**
+ * This event is emitted when a task of a job has started processing.
+ */
+ public data class TaskStarted(
+ override val service: WorkflowService,
+ public val job: Job,
+ public val task: Task,
+ public val time: Long
+ ) : WorkflowEvent()
+
+ /**
+ * This event is emitted when a task of a job has started processing.
+ */
+ public data class TaskFinished(
+ override val service: WorkflowService,
+ public val job: Job,
+ public val task: Task,
+ public val time: Long
+ ) : WorkflowEvent()
+}
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt
new file mode 100644
index 00000000..776f0b07
--- /dev/null
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt
@@ -0,0 +1,112 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service
+
+import com.atlarge.odcsim.simulationContext
+import com.atlarge.opendc.workflows.service.stage.StagePolicy
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.yield
+
+/**
+ * The operating mode of a workflow scheduler.
+ */
+sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Logic> {
+ /**
+ * The logic for operating the cycles of a workflow scheduler.
+ */
+ interface Logic {
+ /**
+ * Request a new scheduling cycle to be performed.
+ */
+ suspend fun requestCycle()
+ }
+
+ /**
+ * An interactive scheduler immediately triggers a new scheduling cycle when a workflow is received.
+ */
+ object Interactive : WorkflowSchedulerMode() {
+ override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic {
+ override suspend fun requestCycle() {
+ yield()
+ scheduler.schedule()
+ }
+ }
+
+ override fun toString(): String = "Interactive"
+ }
+
+ /**
+ * A batch scheduler triggers a scheduling cycle every time quantum if needed.
+ */
+ data class Batch(val quantum: Long) : WorkflowSchedulerMode() {
+ private var next: kotlinx.coroutines.Job? = null
+
+ override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic {
+ override suspend fun requestCycle() {
+ val ctx = simulationContext
+ if (next == null) {
+ // In batch mode, we assume that the scheduler runs at a fixed slot every time
+ // quantum (e.g t=0, t=60, t=120). We calculate here the delay until the next scheduling slot.
+ val delay = quantum - (ctx.clock.millis() % quantum)
+
+ val job = ctx.domain.launch {
+ delay(delay)
+ next = null
+ scheduler.schedule()
+ }
+ next = job
+ }
+ }
+ }
+
+ override fun toString(): String = "Batch($quantum)"
+ }
+
+ /**
+ * A scheduling cycle is triggered at a random point in time.
+ */
+ data class Random(private val random: java.util.Random = java.util.Random(123)) : WorkflowSchedulerMode() {
+ private var next: kotlinx.coroutines.Job? = null
+
+ override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic {
+ override suspend fun requestCycle() {
+ val ctx = simulationContext
+ if (next == null) {
+ val delay = random.nextInt(200).toLong()
+
+ val job = ctx.domain.launch {
+ delay(delay)
+ next = null
+ scheduler.schedule()
+ }
+ next = job
+ }
+ }
+ }
+
+ override fun toString(): String = "Random"
+ }
+}
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt
new file mode 100644
index 00000000..38ea49c4
--- /dev/null
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt
@@ -0,0 +1,52 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service
+
+import com.atlarge.opendc.core.services.AbstractServiceKey
+import com.atlarge.opendc.workflows.workload.Job
+import kotlinx.coroutines.flow.Flow
+import java.util.UUID
+
+/**
+ * A service for cloud workflow management.
+ *
+ * The workflow scheduler is modelled after the Reference Architecture for Datacenter Scheduling by Andreadis et al.
+ */
+public interface WorkflowService {
+ /**
+ * Thie events emitted by the workflow scheduler.
+ */
+ public val events: Flow<WorkflowEvent>
+
+ /**
+ * Submit the specified [Job] to the workflow service for scheduling.
+ */
+ public suspend fun submit(job: Job)
+
+ /**
+ * The service key for the workflow scheduler.
+ */
+ companion object Key : AbstractServiceKey<WorkflowService>(UUID.randomUUID(), "workflows")
+}
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/StagePolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/StagePolicy.kt
new file mode 100644
index 00000000..c7cc3d84
--- /dev/null
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/StagePolicy.kt
@@ -0,0 +1,38 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service.stage
+
+import com.atlarge.opendc.workflows.service.StageWorkflowService
+import java.io.Serializable
+
+/**
+ * A scheduling stage policy.
+ */
+interface StagePolicy<T : Any> : Serializable {
+ /**
+ * Build the logic of the stage policy.
+ */
+ operator fun invoke(scheduler: StageWorkflowService): T
+}
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/DurationJobOrderPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/DurationJobOrderPolicy.kt
new file mode 100644
index 00000000..bbdb9f71
--- /dev/null
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/DurationJobOrderPolicy.kt
@@ -0,0 +1,102 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service.stage.job
+
+import com.atlarge.opendc.workflows.service.JobState
+import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener
+import com.atlarge.opendc.workflows.service.StageWorkflowService
+import com.atlarge.opendc.workflows.workload.Job
+import com.atlarge.opendc.workflows.workload.Task
+import com.atlarge.opendc.workflows.workload.WORKFLOW_TASK_DEADLINE
+
+/**
+ * A [JobOrderPolicy] that orders jobs based on its critical path length.
+ */
+data class DurationJobOrderPolicy(val ascending: Boolean = true) : JobOrderPolicy {
+ override fun invoke(scheduler: StageWorkflowService): Comparator<JobState> =
+ object : Comparator<JobState>, StageWorkflowSchedulerListener {
+ private val results = HashMap<Job, Long>()
+
+ init {
+ scheduler.addListener(this)
+ }
+
+ private val Job.duration: Long
+ get() = results[this]!!
+
+ override fun jobSubmitted(job: JobState) {
+ results[job.job] = job.job.toposort().map { task ->
+ val estimable = task.metadata[WORKFLOW_TASK_DEADLINE] as? Long?
+ estimable ?: Long.MAX_VALUE
+ }.sum()
+ }
+
+ override fun jobFinished(job: JobState) {
+ results.remove(job.job)
+ }
+
+ override fun compare(o1: JobState, o2: JobState): Int {
+ return compareValuesBy(o1, o2) { it.job.duration }.let { if (ascending) it else -it }
+ }
+ }
+
+ override fun toString(): String {
+ return "Job-Duration(${if (ascending) "asc" else "desc"})"
+ }
+}
+
+/**
+ * Create a topological sorting of the tasks in a job.
+ *
+ * @return The list of tasks within the job topologically sorted.
+ */
+fun Job.toposort(): List<Task> {
+ val res = mutableListOf<Task>()
+ val visited = mutableSetOf<Task>()
+ val adjacent = mutableMapOf<Task, MutableList<Task>>()
+
+ for (task in tasks) {
+ for (dependency in task.dependencies) {
+ adjacent.getOrPut(dependency) { mutableListOf() }.add(task)
+ }
+ }
+
+ fun visit(task: Task) {
+ visited.add(task)
+
+ adjacent[task] ?: emptyList<Task>()
+ .asSequence()
+ .filter { it !in visited }
+ .forEach { visit(it) }
+
+ res.add(task)
+ }
+
+ tasks
+ .asSequence()
+ .filter { it !in visited }
+ .forEach { visit(it) }
+ return res
+}
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt
new file mode 100644
index 00000000..535d7792
--- /dev/null
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt
@@ -0,0 +1,72 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service.stage.job
+
+import com.atlarge.opendc.workflows.service.JobState
+import com.atlarge.opendc.workflows.service.stage.StagePolicy
+
+/**
+ * A policy interface for admitting [JobState]s to a scheduling cycle.
+ */
+interface JobAdmissionPolicy : StagePolicy<JobAdmissionPolicy.Logic> {
+ interface Logic {
+ /**
+ * Determine whether the specified [JobState] should be admitted to the scheduling cycle.
+ *
+ * @param job The workflow that has been submitted.
+ * @return The advice for admitting the job.
+ */
+ operator fun invoke(job: JobState): Advice
+ }
+
+ /**
+ * The advice given to the scheduler by an admission policy.
+ *
+ * @property admit A flag to indicate to the scheduler that the job should be admitted.
+ * @property stop A flag to indicate the scheduler should immediately stop admitting jobs to the scheduling queue and wait
+ * for the next scheduling cycle.
+ */
+ enum class Advice(val admit: Boolean, val stop: Boolean) {
+ /**
+ * Admit the current job to the scheduling queue and continue admitting jobs.
+ */
+ ADMIT(true, false),
+
+ /**
+ * Admit the current job to the scheduling queue and stop admitting jobs.
+ */
+ ADMIT_LAST(true, true),
+
+ /**
+ * Deny the current job, but continue admitting jobs.
+ */
+ DENY(false, false),
+
+ /**
+ * Deny the current job and also stop admitting jobs.
+ */
+ STOP(false, true)
+ }
+}
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobOrderPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobOrderPolicy.kt
new file mode 100644
index 00000000..ba57f064
--- /dev/null
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobOrderPolicy.kt
@@ -0,0 +1,33 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service.stage.job
+
+import com.atlarge.opendc.workflows.service.JobState
+import com.atlarge.opendc.workflows.service.stage.StagePolicy
+
+/**
+ * A policy interface for ordering admitted workflows in the scheduling queue.
+ */
+interface JobOrderPolicy : StagePolicy<Comparator<JobState>>
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/LimitJobAdmissionPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/LimitJobAdmissionPolicy.kt
new file mode 100644
index 00000000..6b1faf20
--- /dev/null
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/LimitJobAdmissionPolicy.kt
@@ -0,0 +1,47 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service.stage.job
+
+import com.atlarge.opendc.workflows.service.JobState
+import com.atlarge.opendc.workflows.service.StageWorkflowService
+
+/**
+ * A [JobAdmissionPolicy] that limits the amount of active jobs in the system.
+ *
+ * @property limit The maximum number of concurrent jobs in the system.
+ */
+data class LimitJobAdmissionPolicy(val limit: Int) : JobAdmissionPolicy {
+ override fun invoke(scheduler: StageWorkflowService) = object : JobAdmissionPolicy.Logic {
+ override fun invoke(
+ job: JobState
+ ): JobAdmissionPolicy.Advice =
+ if (scheduler.activeJobs.size < limit)
+ JobAdmissionPolicy.Advice.ADMIT
+ else
+ JobAdmissionPolicy.Advice.STOP
+ }
+
+ override fun toString(): String = "Limit-Active($limit)"
+}
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/LoadJobAdmissionPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/LoadJobAdmissionPolicy.kt
new file mode 100644
index 00000000..e1c27472
--- /dev/null
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/LoadJobAdmissionPolicy.kt
@@ -0,0 +1,47 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service.stage.job
+
+import com.atlarge.opendc.workflows.service.JobState
+import com.atlarge.opendc.workflows.service.StageWorkflowService
+
+/**
+ * A [JobAdmissionPolicy] that limits the amount of jobs based on the average system load.
+ *
+ * @property limit The maximum load before stopping admission.
+ */
+data class LoadJobAdmissionPolicy(val limit: Double) : JobAdmissionPolicy {
+ override fun invoke(scheduler: StageWorkflowService) = object : JobAdmissionPolicy.Logic {
+ override fun invoke(
+ job: JobState
+ ): JobAdmissionPolicy.Advice =
+ if (scheduler.load < limit)
+ JobAdmissionPolicy.Advice.ADMIT
+ else
+ JobAdmissionPolicy.Advice.STOP
+ }
+
+ override fun toString(): String = "Limit-Load($limit)"
+}
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt
new file mode 100644
index 00000000..46888467
--- /dev/null
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt
@@ -0,0 +1,39 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service.stage.job
+
+import com.atlarge.opendc.workflows.service.JobState
+import com.atlarge.opendc.workflows.service.StageWorkflowService
+
+/**
+ * A [JobAdmissionPolicy] that admits all jobs.
+ */
+object NullJobAdmissionPolicy : JobAdmissionPolicy {
+ override fun invoke(scheduler: StageWorkflowService) = object : JobAdmissionPolicy.Logic {
+ override fun invoke(job: JobState): JobAdmissionPolicy.Advice = JobAdmissionPolicy.Advice.ADMIT
+ }
+
+ override fun toString(): String = "Always"
+}
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobOrderPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobOrderPolicy.kt
new file mode 100644
index 00000000..14a3d98d
--- /dev/null
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobOrderPolicy.kt
@@ -0,0 +1,62 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service.stage.job
+
+import com.atlarge.opendc.workflows.service.JobState
+import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener
+import com.atlarge.opendc.workflows.service.StageWorkflowService
+import com.atlarge.opendc.workflows.workload.Job
+import java.util.Random
+
+/**
+ * A [JobOrderPolicy] that randomly orders jobs.
+ */
+object RandomJobOrderPolicy : JobOrderPolicy {
+ override fun invoke(scheduler: StageWorkflowService): Comparator<JobState> =
+ object : Comparator<JobState>, StageWorkflowSchedulerListener {
+ private val random = Random(123)
+ private val ids = HashMap<Job, Int>()
+
+ init {
+ scheduler.addListener(this)
+ }
+
+ override fun jobSubmitted(job: JobState) {
+ ids[job.job] = random.nextInt()
+ }
+
+ override fun jobFinished(job: JobState) {
+ ids.remove(job.job)
+ }
+
+ override fun compare(o1: JobState, o2: JobState): Int {
+ return compareValuesBy(o1, o2) { ids.getValue(it.job) }
+ }
+ }
+
+ override fun toString(): String {
+ return "Random"
+ }
+}
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/SizeJobOrderPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/SizeJobOrderPolicy.kt
new file mode 100644
index 00000000..3bce43cf
--- /dev/null
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/SizeJobOrderPolicy.kt
@@ -0,0 +1,40 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service.stage.job
+
+import com.atlarge.opendc.workflows.service.JobState
+import com.atlarge.opendc.workflows.service.StageWorkflowService
+
+/**
+ * A [SizeJobOrderPolicy] that orders jobs based on the number of tasks it has.
+ */
+data class SizeJobOrderPolicy(val ascending: Boolean = true) : JobOrderPolicy {
+ override fun invoke(scheduler: StageWorkflowService) =
+ compareBy<JobState> { it.tasks.size.let { if (ascending) it else -it } }
+
+ override fun toString(): String {
+ return "Job-Size(${if (ascending) "asc" else "desc"})"
+ }
+}
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/SubmissionTimeJobOrderPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/SubmissionTimeJobOrderPolicy.kt
new file mode 100644
index 00000000..d6e24b2b
--- /dev/null
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/SubmissionTimeJobOrderPolicy.kt
@@ -0,0 +1,40 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service.stage.job
+
+import com.atlarge.opendc.workflows.service.JobState
+import com.atlarge.opendc.workflows.service.StageWorkflowService
+
+/**
+ * A [JobOrderPolicy] orders jobs in FIFO order.
+ */
+data class SubmissionTimeJobOrderPolicy(val ascending: Boolean = true) : JobOrderPolicy {
+ override fun invoke(scheduler: StageWorkflowService) =
+ compareBy<JobState> { it.submittedAt.let { if (ascending) it else -it } }
+
+ override fun toString(): String {
+ return "Submission-Time(${if (ascending) "asc" else "desc"})"
+ }
+}
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt
new file mode 100644
index 00000000..a5671d45
--- /dev/null
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt
@@ -0,0 +1,39 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service.stage.resource
+
+import com.atlarge.opendc.compute.metal.Node
+import com.atlarge.opendc.workflows.service.StageWorkflowService
+
+/**
+ * A [ResourceSelectionPolicy] that selects the first machine that is available.
+ */
+object FirstFitResourceSelectionPolicy : ResourceSelectionPolicy {
+ override fun invoke(scheduler: StageWorkflowService) = object : Comparator<Node> {
+ override fun compare(o1: Node, o2: Node): Int = 1
+ }
+
+ override fun toString(): String = "First-Fit"
+}
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt
new file mode 100644
index 00000000..0e83d8d7
--- /dev/null
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt
@@ -0,0 +1,43 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service.stage.resource
+
+import com.atlarge.opendc.compute.metal.Node
+import com.atlarge.opendc.workflows.service.StageWorkflowService
+import com.atlarge.opendc.workflows.service.TaskState
+
+/**
+ * A [ResourceFilterPolicy] based on the amount of cores available on the machine and the cores required for
+ * the task.
+ */
+object FunctionalResourceFilterPolicy : ResourceFilterPolicy {
+ override fun invoke(scheduler: StageWorkflowService): ResourceFilterPolicy.Logic =
+ object : ResourceFilterPolicy.Logic {
+ override fun invoke(hosts: Sequence<Node>, task: TaskState): Sequence<Node> =
+ hosts.filter { it in scheduler.available }
+ }
+
+ override fun toString(): String = "functional"
+}
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/RandomResourceSelectionPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/RandomResourceSelectionPolicy.kt
new file mode 100644
index 00000000..9b05cbac
--- /dev/null
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/RandomResourceSelectionPolicy.kt
@@ -0,0 +1,47 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service.stage.resource
+
+import com.atlarge.opendc.compute.metal.Node
+import com.atlarge.opendc.workflows.service.StageWorkflowService
+import java.util.Random
+
+/**
+ * A [ResourceSelectionPolicy] that randomly orders the machines.
+ */
+object RandomResourceSelectionPolicy : ResourceSelectionPolicy {
+ override fun invoke(scheduler: StageWorkflowService) = object : Comparator<Node> {
+ private val ids: Map<Node, Long>
+
+ init {
+ val random = Random(123)
+ ids = scheduler.nodes.associateWith { random.nextLong() }
+ }
+
+ override fun compare(o1: Node, o2: Node): Int = compareValuesBy(o1, o2) { ids[it] }
+ }
+
+ override fun toString(): String = "Random"
+}
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceFilterPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceFilterPolicy.kt
new file mode 100644
index 00000000..28ef970f
--- /dev/null
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceFilterPolicy.kt
@@ -0,0 +1,47 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service.stage.resource
+
+import com.atlarge.opendc.compute.metal.Node
+import com.atlarge.opendc.workflows.service.TaskState
+import com.atlarge.opendc.workflows.service.stage.StagePolicy
+
+/**
+ * This interface represents stages **R2**, **R3** and **R4** stage of the Reference Architecture for Schedulers and
+ * acts as a filter yielding a list of resources with sufficient resource-capacities, based on fixed or dynamic
+ * requirements, and on predicted or monitored information about processing unit availability, memory occupancy, etc.
+ */
+interface ResourceFilterPolicy : StagePolicy<ResourceFilterPolicy.Logic> {
+ interface Logic {
+ /**
+ * Filter the list of machines based on dynamic information.
+ *
+ * @param hosts The hosts to filter.
+ * @param task The task that is to be scheduled.
+ * @return The machines on which the task can be scheduled.
+ */
+ operator fun invoke(hosts: Sequence<Node>, task: TaskState): Sequence<Node>
+ }
+}
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt
new file mode 100644
index 00000000..43053097
--- /dev/null
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt
@@ -0,0 +1,34 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service.stage.resource
+
+import com.atlarge.opendc.compute.metal.Node
+import com.atlarge.opendc.workflows.service.stage.StagePolicy
+
+/**
+ * This interface represents the **R5** stage of the Reference Architecture for Schedulers and matches the the selected
+ * task with a (set of) resource(s), using policies such as First-Fit, Worst-Fit, and Best-Fit.
+ */
+interface ResourceSelectionPolicy : StagePolicy<Comparator<Node>>
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/ActiveTaskOrderPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/ActiveTaskOrderPolicy.kt
new file mode 100644
index 00000000..b084d26c
--- /dev/null
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/ActiveTaskOrderPolicy.kt
@@ -0,0 +1,70 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service.stage.task
+
+import com.atlarge.opendc.workflows.service.JobState
+import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener
+import com.atlarge.opendc.workflows.service.StageWorkflowService
+import com.atlarge.opendc.workflows.service.TaskState
+
+/**
+ * A [TaskOrderPolicy] that orders tasks based on the number of active relative tasks (w.r.t. its job) in the system.
+ */
+data class ActiveTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPolicy {
+ override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> =
+ object : Comparator<TaskState>, StageWorkflowSchedulerListener {
+ private val active = mutableMapOf<JobState, Int>()
+
+ init {
+ scheduler.addListener(this)
+ }
+
+ override fun jobStarted(job: JobState) {
+ active[job] = 0
+ }
+
+ override fun jobFinished(job: JobState) {
+ active.remove(job)
+ }
+
+ override fun taskAssigned(task: TaskState) {
+ active.merge(task.job, 1, Int::plus)
+ }
+
+ override fun taskFinished(task: TaskState) {
+ active.merge(task.job, -1, Int::plus)
+ }
+
+ override fun compare(o1: TaskState, o2: TaskState): Int {
+ return compareValuesBy(o1, o2) { active.getValue(it.job) }.let {
+ if (ascending) it else -it
+ }
+ }
+ }
+
+ override fun toString(): String {
+ return "Active-Per-Job(${if (ascending) "asc" else "desc"})"
+ }
+}
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/BalancingTaskEligibilityPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/BalancingTaskEligibilityPolicy.kt
new file mode 100644
index 00000000..2255d40c
--- /dev/null
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/BalancingTaskEligibilityPolicy.kt
@@ -0,0 +1,78 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service.stage.task
+
+import com.atlarge.opendc.workflows.service.JobState
+import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener
+import com.atlarge.opendc.workflows.service.StageWorkflowService
+import com.atlarge.opendc.workflows.service.TaskState
+import kotlin.math.max
+
+/**
+ * A [TaskEligibilityPolicy] that balances the tasks based on their job, e.g. do not allow a single job to claim all
+ * resources of the system.
+ *
+ * @property tolerance The maximum difference from the average number of tasks per job in the system as a fraction of
+ * the average.
+ */
+data class BalancingTaskEligibilityPolicy(val tolerance: Double = 1.5) : TaskEligibilityPolicy {
+ override fun invoke(scheduler: StageWorkflowService): TaskEligibilityPolicy.Logic =
+ object : TaskEligibilityPolicy.Logic, StageWorkflowSchedulerListener {
+ private val active = mutableMapOf<JobState, Int>()
+
+ init {
+ scheduler.addListener(this)
+ }
+
+ override fun jobStarted(job: JobState) {
+ active[job] = 0
+ }
+
+ override fun jobFinished(job: JobState) {
+ active.remove(job)
+ }
+
+ override fun taskAssigned(task: TaskState) {
+ active.merge(task.job, 1, Int::plus)
+ }
+
+ override fun taskFinished(task: TaskState) {
+ active.merge(task.job, -1, Int::plus)
+ }
+
+ override fun invoke(task: TaskState): TaskEligibilityPolicy.Advice {
+ val activeJobs = scheduler.activeJobs.size
+ val activeTasks = scheduler.activeTasks.size
+ val baseline = max(activeTasks / activeJobs.toDouble(), 1.0)
+ val activeForJob = active[task.job]!!
+ return if ((activeForJob + 1) / baseline < tolerance)
+ TaskEligibilityPolicy.Advice.ADMIT
+ else
+ TaskEligibilityPolicy.Advice.DENY
+ }
+ }
+
+ override fun toString(): String = "Job-Balance($tolerance)"
+}
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/CompletionTaskOrderPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/CompletionTaskOrderPolicy.kt
new file mode 100644
index 00000000..d0cf1374
--- /dev/null
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/CompletionTaskOrderPolicy.kt
@@ -0,0 +1,66 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service.stage.task
+
+import com.atlarge.opendc.workflows.service.JobState
+import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener
+import com.atlarge.opendc.workflows.service.StageWorkflowService
+import com.atlarge.opendc.workflows.service.TaskState
+
+/**
+ * A [TaskOrderPolicy] that orders tasks based on the number of completed relative tasks.
+ */
+data class CompletionTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPolicy {
+ override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> =
+ object : Comparator<TaskState>, StageWorkflowSchedulerListener {
+ private val finished = mutableMapOf<JobState, Int>()
+
+ init {
+ scheduler.addListener(this)
+ }
+
+ override fun jobStarted(job: JobState) {
+ finished[job] = 0
+ }
+
+ override fun jobFinished(job: JobState) {
+ finished.remove(job)
+ }
+
+ override fun taskFinished(task: TaskState) {
+ finished.merge(task.job, 1, Int::plus)
+ }
+
+ override fun compare(o1: TaskState, o2: TaskState): Int {
+ return compareValuesBy(o1, o2) { finished.getValue(it.job) / it.job.tasks.size.toDouble() }.let {
+ if (ascending) it else -it
+ }
+ }
+ }
+
+ override fun toString(): String {
+ return "Job-Completion(${if (ascending) "asc" else "desc"})"
+ }
+}
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DependenciesTaskOrderPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DependenciesTaskOrderPolicy.kt
new file mode 100644
index 00000000..73d83d21
--- /dev/null
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DependenciesTaskOrderPolicy.kt
@@ -0,0 +1,41 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service.stage.task
+
+import com.atlarge.opendc.workflows.service.StageWorkflowService
+import com.atlarge.opendc.workflows.service.TaskState
+
+/**
+ * A [TaskOrderPolicy] that orders tasks based on the number of dependency tasks it has.
+ */
+data class DependenciesTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPolicy {
+ override fun invoke(scheduler: StageWorkflowService) = compareBy<TaskState> {
+ it.task.dependencies.size.let { if (ascending) it else -it }
+ }
+
+ override fun toString(): String {
+ return "Task-Dependencies(${if (ascending) "asc" else "desc"})"
+ }
+}
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DependentsTaskOrderPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DependentsTaskOrderPolicy.kt
new file mode 100644
index 00000000..85b3543f
--- /dev/null
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DependentsTaskOrderPolicy.kt
@@ -0,0 +1,41 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service.stage.task
+
+import com.atlarge.opendc.workflows.service.StageWorkflowService
+import com.atlarge.opendc.workflows.service.TaskState
+
+/**
+ * A [TaskOrderPolicy] that orders tasks based on the number of dependent tasks it has.
+ */
+data class DependentsTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPolicy {
+ override fun invoke(scheduler: StageWorkflowService) = compareBy<TaskState> {
+ it.dependents.size.let { if (ascending) it else -it }
+ }
+
+ override fun toString(): String {
+ return "Task-Dependents(${if (ascending) "asc" else "desc"})"
+ }
+}
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationHistoryTaskOrderPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationHistoryTaskOrderPolicy.kt
new file mode 100644
index 00000000..426a76a4
--- /dev/null
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationHistoryTaskOrderPolicy.kt
@@ -0,0 +1,72 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service.stage.task
+
+import com.atlarge.opendc.workflows.service.JobState
+import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener
+import com.atlarge.opendc.workflows.service.StageWorkflowService
+import com.atlarge.opendc.workflows.service.TaskState
+
+/**
+ * A [TaskOrderPolicy] that orders tasks based on the average duration of the preceding tasks in the job.
+ */
+data class DurationHistoryTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPolicy {
+
+ override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> =
+ object : Comparator<TaskState>, StageWorkflowSchedulerListener {
+ private val results = HashMap<JobState, MutableList<Long>>()
+
+ init {
+ scheduler.addListener(this)
+ }
+
+ override fun jobStarted(job: JobState) {
+ results[job] = mutableListOf()
+ }
+
+ override fun jobFinished(job: JobState) {
+ results.remove(job)
+ }
+
+ override fun taskFinished(task: TaskState) {
+ results.getValue(task.job) += task.finishedAt - task.startedAt
+ }
+
+ override fun compare(o1: TaskState, o2: TaskState): Int {
+ return compareValuesBy(o1, o2) { key ->
+ val history = results.getValue(key.job)
+ if (history.isEmpty()) {
+ Long.MAX_VALUE
+ } else {
+ history.average()
+ }
+ }.let { if (ascending) it else -it }
+ }
+ }
+
+ override fun toString(): String {
+ return "Task-Duration-History(${if (ascending) "asc" else "desc"})"
+ }
+}
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationTaskOrderPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationTaskOrderPolicy.kt
new file mode 100644
index 00000000..23b47891
--- /dev/null
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationTaskOrderPolicy.kt
@@ -0,0 +1,68 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service.stage.task
+
+import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener
+import com.atlarge.opendc.workflows.service.StageWorkflowService
+import com.atlarge.opendc.workflows.service.TaskState
+import com.atlarge.opendc.workflows.workload.WORKFLOW_TASK_DEADLINE
+import java.util.UUID
+
+/**
+ * A [TaskOrderPolicy] orders tasks based on the pre-specified (approximate) duration of the task.
+ */
+data class DurationTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPolicy {
+
+ override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> =
+ object : Comparator<TaskState>, StageWorkflowSchedulerListener {
+ private val results = HashMap<UUID, Long>()
+
+ init {
+ scheduler.addListener(this)
+ }
+
+ override fun taskReady(task: TaskState) {
+ val deadline = task.task.metadata[WORKFLOW_TASK_DEADLINE] as? Long?
+ results[task.task.uid] = deadline ?: Long.MAX_VALUE
+ }
+
+ override fun taskFinished(task: TaskState) {
+ results -= task.task.uid
+ }
+
+ private val TaskState.duration: Long
+ get() = results.getValue(task.uid)
+
+ override fun compare(o1: TaskState, o2: TaskState): Int {
+ return compareValuesBy(o1, o2) { state -> state.duration }.let {
+ if (ascending) it else -it
+ }
+ }
+ }
+
+ override fun toString(): String {
+ return "Task-Duration(${if (ascending) "asc" else "desc"})"
+ }
+}
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitPerJobTaskEligibilityPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitPerJobTaskEligibilityPolicy.kt
new file mode 100644
index 00000000..c039bf6f
--- /dev/null
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitPerJobTaskEligibilityPolicy.kt
@@ -0,0 +1,70 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service.stage.task
+
+import com.atlarge.opendc.workflows.service.JobState
+import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener
+import com.atlarge.opendc.workflows.service.StageWorkflowService
+import com.atlarge.opendc.workflows.service.TaskState
+
+/**
+ * A [TaskEligibilityPolicy] that limits the number of active tasks of a job in the system.
+ */
+data class LimitPerJobTaskEligibilityPolicy(val limit: Int) : TaskEligibilityPolicy {
+ override fun invoke(scheduler: StageWorkflowService): TaskEligibilityPolicy.Logic =
+ object : TaskEligibilityPolicy.Logic, StageWorkflowSchedulerListener {
+ private val active = mutableMapOf<JobState, Int>()
+
+ init {
+ scheduler.addListener(this)
+ }
+
+ override fun jobStarted(job: JobState) {
+ active[job] = 0
+ }
+
+ override fun jobFinished(job: JobState) {
+ active.remove(job)
+ }
+
+ override fun taskAssigned(task: TaskState) {
+ active.merge(task.job, 1, Int::plus)
+ }
+
+ override fun taskFinished(task: TaskState) {
+ active.merge(task.job, -1, Int::plus)
+ }
+
+ override fun invoke(task: TaskState): TaskEligibilityPolicy.Advice {
+ val activeForJob = active[task.job]!!
+ return if (activeForJob <= limit)
+ TaskEligibilityPolicy.Advice.ADMIT
+ else
+ TaskEligibilityPolicy.Advice.DENY
+ }
+ }
+
+ override fun toString(): String = "Limit-Active-Job($limit)"
+}
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitTaskEligibilityPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitTaskEligibilityPolicy.kt
new file mode 100644
index 00000000..75322ef5
--- /dev/null
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitTaskEligibilityPolicy.kt
@@ -0,0 +1,45 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service.stage.task
+
+import com.atlarge.opendc.workflows.service.StageWorkflowService
+import com.atlarge.opendc.workflows.service.TaskState
+
+/**
+ * A [TaskEligibilityPolicy] that limits the total number of active tasks in the system.
+ */
+data class LimitTaskEligibilityPolicy(val limit: Int) : TaskEligibilityPolicy {
+ override fun invoke(scheduler: StageWorkflowService) = object : TaskEligibilityPolicy.Logic {
+ override fun invoke(
+ task: TaskState
+ ): TaskEligibilityPolicy.Advice =
+ if (scheduler.activeTasks.size < limit)
+ TaskEligibilityPolicy.Advice.ADMIT
+ else
+ TaskEligibilityPolicy.Advice.STOP
+ }
+
+ override fun toString(): String = "Limit-Active($limit)"
+}
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt
new file mode 100644
index 00000000..090f7be7
--- /dev/null
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt
@@ -0,0 +1,45 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service.stage.task
+
+import com.atlarge.opendc.workflows.service.StageWorkflowService
+import com.atlarge.opendc.workflows.service.TaskState
+
+/**
+ * A [TaskEligibilityPolicy] that limits the number of active tasks in the system based on the average system load.
+ */
+data class LoadTaskEligibilityPolicy(val limit: Double) : TaskEligibilityPolicy {
+ override fun invoke(scheduler: StageWorkflowService) = object : TaskEligibilityPolicy.Logic {
+ override fun invoke(
+ task: TaskState
+ ): TaskEligibilityPolicy.Advice =
+ if (scheduler.load < limit)
+ TaskEligibilityPolicy.Advice.ADMIT
+ else
+ TaskEligibilityPolicy.Advice.STOP
+ }
+
+ override fun toString(): String = "Limit-Load($limit)"
+}
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/NullTaskEligibilityPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/NullTaskEligibilityPolicy.kt
new file mode 100644
index 00000000..889f2ab5
--- /dev/null
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/NullTaskEligibilityPolicy.kt
@@ -0,0 +1,43 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service.stage.task
+
+import com.atlarge.opendc.workflows.service.StageWorkflowService
+import com.atlarge.opendc.workflows.service.TaskState
+
+/**
+ * A [TaskEligibilityPolicy] that always allows new tasks to enter.
+ */
+object NullTaskEligibilityPolicy : TaskEligibilityPolicy {
+ override fun invoke(scheduler: StageWorkflowService): TaskEligibilityPolicy.Logic = Logic
+
+ private object Logic : TaskEligibilityPolicy.Logic {
+ override fun invoke(
+ task: TaskState
+ ): TaskEligibilityPolicy.Advice = TaskEligibilityPolicy.Advice.ADMIT
+ }
+
+ override fun toString(): String = "Always"
+}
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskEligibilityPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskEligibilityPolicy.kt
new file mode 100644
index 00000000..d6f49d14
--- /dev/null
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskEligibilityPolicy.kt
@@ -0,0 +1,47 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service.stage.task
+
+import com.atlarge.opendc.workflows.service.StageWorkflowService
+import com.atlarge.opendc.workflows.service.TaskState
+import java.util.Random
+
+/**
+ * A [TaskEligibilityPolicy] that randomly accepts tasks in the system with some [probability].
+ */
+data class RandomTaskEligibilityPolicy(val probability: Double = 0.5) : TaskEligibilityPolicy {
+ override fun invoke(scheduler: StageWorkflowService) = object : TaskEligibilityPolicy.Logic {
+ val random = Random(123)
+
+ override fun invoke(task: TaskState): TaskEligibilityPolicy.Advice =
+ if (random.nextDouble() <= probability || scheduler.activeTasks.isEmpty())
+ TaskEligibilityPolicy.Advice.ADMIT
+ else {
+ TaskEligibilityPolicy.Advice.DENY
+ }
+ }
+
+ override fun toString(): String = "Random($probability)"
+}
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskOrderPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskOrderPolicy.kt
new file mode 100644
index 00000000..4c309085
--- /dev/null
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskOrderPolicy.kt
@@ -0,0 +1,62 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service.stage.task
+
+import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener
+import com.atlarge.opendc.workflows.service.StageWorkflowService
+import com.atlarge.opendc.workflows.service.TaskState
+import com.atlarge.opendc.workflows.workload.Task
+import kotlin.random.Random
+
+/**
+ * A [TaskOrderPolicy] that orders the tasks randomly.
+ */
+object RandomTaskOrderPolicy : TaskOrderPolicy {
+ override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> =
+ object : Comparator<TaskState>, StageWorkflowSchedulerListener {
+ private val random = Random(123)
+ private val ids = HashMap<Task, Int>()
+
+ init {
+ scheduler.addListener(this)
+ }
+
+ override fun taskReady(task: TaskState) {
+ ids[task.task] = random.nextInt()
+ }
+
+ override fun taskFinished(task: TaskState) {
+ ids.remove(task.task)
+ }
+
+ override fun compare(o1: TaskState, o2: TaskState): Int {
+ return compareValuesBy(o1, o2) { ids.getValue(it.task) }
+ }
+ }
+
+ override fun toString(): String {
+ return "Random"
+ }
+}
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/SubmissionTimeTaskOrderPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/SubmissionTimeTaskOrderPolicy.kt
new file mode 100644
index 00000000..a261965f
--- /dev/null
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/SubmissionTimeTaskOrderPolicy.kt
@@ -0,0 +1,41 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service.stage.task
+
+import com.atlarge.opendc.workflows.service.StageWorkflowService
+import com.atlarge.opendc.workflows.service.TaskState
+
+/**
+ * A [TaskOrderPolicy] that orders tasks based on the order of arrival in the queue.
+ */
+data class SubmissionTimeTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPolicy {
+ override fun invoke(scheduler: StageWorkflowService) = compareBy<TaskState> {
+ it.job.submittedAt.let { if (ascending) it else -it }
+ }
+
+ override fun toString(): String {
+ return "Submission-Time(${if (ascending) "asc" else "desc"})"
+ }
+}
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt
new file mode 100644
index 00000000..72a7fdd0
--- /dev/null
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt
@@ -0,0 +1,72 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service.stage.task
+
+import com.atlarge.opendc.workflows.service.TaskState
+import com.atlarge.opendc.workflows.service.stage.StagePolicy
+
+/**
+ * A policy interface for determining the eligibility of tasks in a scheduling cycle.
+ */
+interface TaskEligibilityPolicy : StagePolicy<TaskEligibilityPolicy.Logic> {
+ interface Logic {
+ /**
+ * Determine whether the specified [TaskState] is eligible to be scheduled.
+ *
+ * @param task The task instance to schedule.
+ * @return The advice for marking the task.
+ */
+ operator fun invoke(task: TaskState): Advice
+ }
+
+ /**
+ * The advice given to the scheduler by an admission policy.
+ *
+ * @property admit A flag to indicate to the scheduler that the job should be admitted.
+ * @property stop A flag to indicate the scheduler should immediately stop admitting jobs to the scheduling queue and wait
+ * for the next scheduling cycle.
+ */
+ enum class Advice(val admit: Boolean, val stop: Boolean) {
+ /**
+ * Admit the current job to the scheduling queue and continue admitting jobs.
+ */
+ ADMIT(true, false),
+
+ /**
+ * Admit the current job to the scheduling queue and stop admitting jobs.
+ */
+ ADMIT_LAST(true, true),
+
+ /**
+ * Deny the current job, but continue admitting jobs.
+ */
+ DENY(false, false),
+
+ /**
+ * Deny the current job and also stop admitting jobs.
+ */
+ STOP(false, true)
+ }
+}
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskOrderPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskOrderPolicy.kt
new file mode 100644
index 00000000..bfdc7924
--- /dev/null
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskOrderPolicy.kt
@@ -0,0 +1,34 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.service.stage.task
+
+import com.atlarge.opendc.workflows.service.TaskState
+import com.atlarge.opendc.workflows.service.stage.StagePolicy
+
+/**
+ * This interface represents the **T2** stage of the Reference Architecture for Datacenter Schedulers and provides the
+ * scheduler with a sorted list of tasks to schedule.
+ */
+interface TaskOrderPolicy : StagePolicy<Comparator<TaskState>>
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Job.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Job.kt
new file mode 100644
index 00000000..02969d8a
--- /dev/null
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Job.kt
@@ -0,0 +1,52 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.workload
+
+import com.atlarge.opendc.core.User
+import com.atlarge.opendc.core.workload.Workload
+import java.util.UUID
+
+/**
+ * A workload that represents a directed acyclic graph (DAG) of tasks with control and data dependencies between tasks.
+ *
+ * @property uid A unique identified of this workflow.
+ * @property name The name of this workflow.
+ * @property owner The owner of the workflow.
+ * @property tasks The tasks that are part of this workflow.
+ * @property metadata Additional metadata for the job.
+ */
+data class Job(
+ override val uid: UUID,
+ override val name: String,
+ override val owner: User,
+ val tasks: Set<Task>,
+ val metadata: Map<String, Any> = emptyMap()
+) : Workload {
+ override fun equals(other: Any?): Boolean = other is Job && uid == other.uid
+
+ override fun hashCode(): Int = uid.hashCode()
+
+ override fun toString(): String = "Job(uid=$uid, name=$name, tasks=${tasks.size}, metadata=$metadata)"
+}
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Metadata.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Metadata.kt
new file mode 100644
index 00000000..067f1179
--- /dev/null
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Metadata.kt
@@ -0,0 +1,30 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.workload
+
+/**
+ * Meta-data key for the deadline of a task.
+ */
+const val WORKFLOW_TASK_DEADLINE = "workflow:task:deadline"
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Task.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Task.kt
new file mode 100644
index 00000000..82521faa
--- /dev/null
+++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Task.kt
@@ -0,0 +1,50 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.workflows.workload
+
+import com.atlarge.opendc.compute.core.image.Image
+import com.atlarge.opendc.core.Identity
+import java.util.UUID
+
+/**
+ * A stage of a [Job].
+ *
+ * @property uid A unique identified of this task.
+ * @property name The name of this task.
+ * @property image The application image to run as part of this workflow task.
+ * @property dependencies The dependencies of this task in order for it to execute.
+ * @property metadata Additional metadata for this task.
+ */
+data class Task(
+ override val uid: UUID,
+ override val name: String,
+ val image: Image,
+ val dependencies: Set<Task>,
+ val metadata: Map<String, Any> = emptyMap()
+) : Identity {
+ override fun equals(other: Any?): Boolean = other is Task && uid == other.uid
+
+ override fun hashCode(): Int = uid.hashCode()
+}