summaryrefslogtreecommitdiff
path: root/opendc-workflows/src/main/kotlin/com/atlarge
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2019-05-15 22:37:35 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2019-05-15 22:43:59 +0200
commit7c8a3fd217418c6f956a9315eb13c2a31a9f85a0 (patch)
treee10534cd58f4e2b9a568fbf14e56166fa7585863 /opendc-workflows/src/main/kotlin/com/atlarge
parentb0b1577ace36022faec1a4ed0369f1c1271d5ccd (diff)
feat: Add initial version of OpenDC simulation model
This change adds the initial version of the port of the OpenDC simulation model to version 2.x of the simulator. The simulation model has been reworked to support immutability and event-driven simulation, with speed-ups up to 75x.
Diffstat (limited to 'opendc-workflows/src/main/kotlin/com/atlarge')
-rw-r--r--opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/StageWorkflowScheduler.kt58
-rw-r--r--opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/StageWorkflowSchedulerLogic.kt248
-rw-r--r--opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/WorkflowScheduler.kt47
-rw-r--r--opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/WorkflowSchedulerLogic.kt55
-rw-r--r--opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/WorkflowSchedulerMode.kt42
-rw-r--r--opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/WorkflowService.kt193
-rw-r--r--opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/FifoJobSortingPolicy.kt37
-rw-r--r--opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/JobAdmissionPolicy.kt48
-rw-r--r--opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/JobSortingPolicy.kt44
-rw-r--r--opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/NullJobAdmissionPolicy.kt40
-rw-r--r--opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/RandomJobSortingPolicy.kt40
-rw-r--r--opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/resources/FirstFitResourceSelectionPolicy.kt40
-rw-r--r--opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/resources/FunctionalResourceDynamicFilterPolicy.kt43
-rw-r--r--opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/resources/ResourceDynamicFilterPolicy.kt49
-rw-r--r--opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/resources/ResourceSelectionPolicy.kt48
-rw-r--r--opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/FifoTaskSortingPolicy.kt37
-rw-r--r--opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/FunctionalTaskEligibilityPolicy.kt38
-rw-r--r--opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/RandomTaskSortingPolicy.kt40
-rw-r--r--opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/TaskEligibilityPolicy.kt48
-rw-r--r--opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/TaskSortingPolicy.kt45
-rw-r--r--opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/workload/workflow/Job.kt48
-rw-r--r--opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/workload/workflow/Task.kt48
22 files changed, 1336 insertions, 0 deletions
diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/StageWorkflowScheduler.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/StageWorkflowScheduler.kt
new file mode 100644
index 00000000..45f3c4b0
--- /dev/null
+++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/StageWorkflowScheduler.kt
@@ -0,0 +1,58 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.model.services.workflows
+
+import com.atlarge.odcsim.ActorContext
+import com.atlarge.odcsim.TimerScheduler
+import com.atlarge.opendc.model.services.provisioning.ProvisioningResponse
+import com.atlarge.opendc.model.services.workflows.stages.job.JobAdmissionPolicy
+import com.atlarge.opendc.model.services.workflows.stages.job.JobSortingPolicy
+import com.atlarge.opendc.model.services.workflows.stages.resources.ResourceDynamicFilterPolicy
+import com.atlarge.opendc.model.services.workflows.stages.resources.ResourceSelectionPolicy
+import com.atlarge.opendc.model.services.workflows.stages.task.TaskEligibilityPolicy
+import com.atlarge.opendc.model.services.workflows.stages.task.TaskSortingPolicy
+
+/**
+ * A [WorkflowScheduler] that distributes work through a multi-stage process based on the Reference Architecture for
+ * Datacenter Scheduling.
+ */
+class StageWorkflowScheduler(
+ private val mode: WorkflowSchedulerMode,
+ private val jobAdmissionPolicy: JobAdmissionPolicy,
+ private val jobSortingPolicy: JobSortingPolicy,
+ private val taskEligibilityPolicy: TaskEligibilityPolicy,
+ private val taskSortingPolicy: TaskSortingPolicy,
+ private val resourceDynamicFilterPolicy: ResourceDynamicFilterPolicy,
+ private val resourceSelectionPolicy: ResourceSelectionPolicy
+) : WorkflowScheduler {
+ override fun invoke(
+ ctx: ActorContext<WorkflowMessage>,
+ timers: TimerScheduler<WorkflowMessage>,
+ lease: ProvisioningResponse.Lease
+ ): WorkflowSchedulerLogic {
+ return StageWorkflowSchedulerLogic(ctx, timers, lease, mode, jobAdmissionPolicy,
+ jobSortingPolicy, taskEligibilityPolicy, taskSortingPolicy, resourceDynamicFilterPolicy, resourceSelectionPolicy)
+ }
+}
diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/StageWorkflowSchedulerLogic.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/StageWorkflowSchedulerLogic.kt
new file mode 100644
index 00000000..9d5f4bea
--- /dev/null
+++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/StageWorkflowSchedulerLogic.kt
@@ -0,0 +1,248 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.model.services.workflows
+
+import com.atlarge.odcsim.ActorContext
+import com.atlarge.odcsim.ActorRef
+import com.atlarge.odcsim.TimerScheduler
+import com.atlarge.odcsim.unsafeCast
+import com.atlarge.opendc.model.resources.compute.MachineMessage
+import com.atlarge.opendc.model.resources.compute.MachineRef
+import com.atlarge.opendc.model.resources.compute.scheduling.ProcessState
+import com.atlarge.opendc.model.services.provisioning.ProvisioningResponse
+import com.atlarge.opendc.model.services.resources.HostView
+import com.atlarge.opendc.model.services.workflows.stages.job.JobAdmissionPolicy
+import com.atlarge.opendc.model.services.workflows.stages.job.JobSortingPolicy
+import com.atlarge.opendc.model.services.workflows.stages.resources.ResourceDynamicFilterPolicy
+import com.atlarge.opendc.model.services.workflows.stages.resources.ResourceSelectionPolicy
+import com.atlarge.opendc.model.services.workflows.stages.task.TaskEligibilityPolicy
+import com.atlarge.opendc.model.services.workflows.stages.task.TaskSortingPolicy
+import com.atlarge.opendc.model.workload.application.Application
+import com.atlarge.opendc.model.workload.application.Pid
+import com.atlarge.opendc.model.workload.workflow.Job
+import com.atlarge.opendc.model.workload.workflow.Task
+
+/**
+ * Logic of the [StageWorkflowScheduler].
+ */
+class StageWorkflowSchedulerLogic(
+ ctx: ActorContext<WorkflowMessage>,
+ timers: TimerScheduler<WorkflowMessage>,
+ lease: ProvisioningResponse.Lease,
+ private val mode: WorkflowSchedulerMode,
+ private val jobAdmissionPolicy: JobAdmissionPolicy,
+ private val jobSortingPolicy: JobSortingPolicy,
+ private val taskEligibilityPolicy: TaskEligibilityPolicy,
+ private val taskSortingPolicy: TaskSortingPolicy,
+ private val resourceDynamicFilterPolicy: ResourceDynamicFilterPolicy,
+ private val resourceSelectionPolicy: ResourceSelectionPolicy
+) : WorkflowSchedulerLogic(ctx, timers, lease) {
+
+ /**
+ * The incoming jobs ready to be processed by the scheduler.
+ */
+ internal val incomingJobs: MutableSet<JobView> = mutableSetOf()
+
+ /**
+ * The active jobs in the system.
+ */
+ internal val activeJobs: MutableSet<JobView> = mutableSetOf()
+
+ /**
+ * The running tasks by [Pid].
+ */
+ internal val taskByPid = mutableMapOf<Pid, TaskView>()
+
+ /**
+ * The available processor cores on the leased machines.
+ */
+ internal val machineCores: MutableMap<HostView, Int> = HashMap()
+
+ init {
+ lease.hosts.forEach { machineCores[it] = it.host.cores.count() }
+ }
+
+ override fun submit(job: Job, handler: ActorRef<WorkflowEvent>) {
+ // J1 Incoming Jobs
+ val jobInstance = JobView(job, handler)
+ val instances = job.tasks.associateWith {
+ TaskView(jobInstance, it)
+ }
+
+ for ((task, instance) in instances) {
+ instance.dependencies.addAll(task.dependencies.map { instances[it]!! })
+ task.dependencies.forEach {
+ instances[it]!!.dependents.add(instance)
+ }
+
+ // If the task has no dependency, it is a root task and can immediately be evaluated
+ if (instance.isRoot) {
+ instance.state = ProcessState.READY
+ }
+ }
+
+ jobInstance.tasks = instances.values.toMutableSet()
+ incomingJobs += jobInstance
+ ctx.send(handler, WorkflowEvent.JobSubmitted(ctx.self, job, ctx.time))
+ requestCycle()
+ }
+
+ /**
+ * Indicate to the scheduler that a scheduling cycle is needed.
+ */
+ private fun requestCycle() {
+ when (mode) {
+ is WorkflowSchedulerMode.Interactive -> {
+ schedule()
+ }
+ is WorkflowSchedulerMode.Batch -> {
+ timers.after(mode, mode.quantum) {
+ schedule()
+ }
+ }
+ }
+ }
+
+ /**
+ * Perform a scheduling cycle immediately.
+ */
+ override fun schedule() {
+ // J2 Create list of eligible jobs
+ jobAdmissionPolicy.startCycle(this)
+ val eligibleJobs = incomingJobs.filter { jobAdmissionPolicy.shouldAdmit(this, it) }
+ for (jobInstance in eligibleJobs) {
+ incomingJobs -= jobInstance
+ activeJobs += jobInstance
+ ctx.send(jobInstance.broker, WorkflowEvent.JobStarted(ctx.self, jobInstance.job, ctx.time))
+ }
+
+ // J3 Sort jobs on criterion
+ val sortedJobs = jobSortingPolicy(this, activeJobs)
+
+ // J4 Per job
+ for (jobInstance in sortedJobs) {
+ // T1 Create list of eligible tasks
+ taskEligibilityPolicy.startCycle(this)
+ val eligibleTasks = jobInstance.tasks.filter { taskEligibilityPolicy.isEligible(this, it) }
+
+ // T2 Sort tasks on criterion
+ val sortedTasks = taskSortingPolicy(this, eligibleTasks)
+
+ // T3 Per task
+ for (instance in sortedTasks) {
+ val hosts = resourceDynamicFilterPolicy(this, lease.hosts, instance)
+ val host = resourceSelectionPolicy.select(this, hosts, instance)
+
+ if (host != null) {
+ // T4 Submit task to machine
+ ctx.send(host.ref, MachineMessage.Submit(instance.task.application, instance, ctx.self.unsafeCast()))
+ instance.host = host
+ instance.state = ProcessState.RUNNING // Assume the application starts running
+ machineCores.merge(host, instance.task.application.cores, Int::minus)
+ } else {
+ return
+ }
+ }
+ }
+ }
+
+ override fun onSubmission(instance: MachineRef, application: Application, key: Any, pid: Pid) {
+ val task = key as TaskView
+ task.pid = pid
+ taskByPid[pid] = task
+ ctx.send(task.job.broker, WorkflowEvent.TaskStarted(ctx.self, task.job.job, task.task, ctx.time))
+ }
+
+ override fun onTermination(instance: MachineRef, pid: Pid, status: Int) {
+ val task = taskByPid.remove(pid)!!
+ val job = task.job
+ task.state = ProcessState.TERMINATED
+ job.tasks.remove(task)
+ machineCores.merge(task.host!!, task.task.application.cores, Int::plus)
+ ctx.send(job.broker, WorkflowEvent.TaskFinished(ctx.self, job.job, task.task, status, ctx.time))
+
+ if (job.isFinished) {
+ activeJobs -= job
+ ctx.send(job.broker, WorkflowEvent.JobFinished(ctx.self, job.job, ctx.time))
+ }
+
+ requestCycle()
+ }
+
+ class JobView(val job: Job, val broker: ActorRef<WorkflowEvent>) {
+ /**
+ * A flag to indicate whether this job is finished.
+ */
+ val isFinished: Boolean
+ get() = tasks.isEmpty()
+
+ lateinit var tasks: MutableSet<TaskView>
+ }
+
+ class TaskView(val job: JobView, val task: Task) {
+ /**
+ * The dependencies of this task.
+ */
+ val dependencies = HashSet<TaskView>()
+
+ /**
+ * The dependents of this task.
+ */
+ val dependents = HashSet<TaskView>()
+
+ /**
+ * A flag to indicate whether this workflow task instance is a workflow root.
+ */
+ val isRoot: Boolean
+ get() = dependencies.isEmpty()
+
+ var state: ProcessState = ProcessState.CREATED
+ set(value) {
+ field = value
+
+ // Mark the process as terminated in the graph
+ if (value == ProcessState.TERMINATED) {
+ markTerminated()
+ }
+ }
+
+ var pid: Pid? = null
+
+ var host: HostView? = null
+
+ /**
+ * Mark the specified [TaskView] as terminated.
+ */
+ private fun markTerminated() {
+ for (dependent in dependents) {
+ dependent.dependencies.remove(this)
+
+ if (dependent.isRoot) {
+ dependent.state = ProcessState.READY
+ }
+ }
+ }
+ }
+}
diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/WorkflowScheduler.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/WorkflowScheduler.kt
new file mode 100644
index 00000000..c81085d4
--- /dev/null
+++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/WorkflowScheduler.kt
@@ -0,0 +1,47 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.model.services.workflows
+
+import com.atlarge.odcsim.ActorContext
+import com.atlarge.odcsim.TimerScheduler
+import com.atlarge.opendc.model.services.provisioning.ProvisioningResponse
+
+/**
+ * A factory interface for constructing a [WorkflowSchedulerLogic].
+ */
+interface WorkflowScheduler {
+ /**
+ * Construct a [WorkflowSchedulerLogic] in the given [ActorContext].
+ *
+ * @param ctx The context in which the scheduler runs.
+ * @param timers The timer scheduler to use.
+ * @param lease The resource lease to use.
+ */
+ operator fun invoke(
+ ctx: ActorContext<WorkflowMessage>,
+ timers: TimerScheduler<WorkflowMessage>,
+ lease: ProvisioningResponse.Lease
+ ): WorkflowSchedulerLogic
+}
diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/WorkflowSchedulerLogic.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/WorkflowSchedulerLogic.kt
new file mode 100644
index 00000000..09cb0ef9
--- /dev/null
+++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/WorkflowSchedulerLogic.kt
@@ -0,0 +1,55 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.model.services.workflows
+
+import com.atlarge.odcsim.ActorContext
+import com.atlarge.odcsim.ActorRef
+import com.atlarge.odcsim.TimerScheduler
+import com.atlarge.opendc.model.resources.compute.scheduling.ProcessObserver
+import com.atlarge.opendc.model.services.provisioning.ProvisioningResponse
+import com.atlarge.opendc.model.workload.workflow.Job
+
+/**
+ * A workflow scheduler interface that schedules jobs across machines.
+ *
+ * @property ctx The context in which the scheduler runs.
+ * @property timers The timer scheduler to use.
+ * @property lease The resource lease to use.
+ */
+abstract class WorkflowSchedulerLogic(
+ protected val ctx: ActorContext<WorkflowMessage>,
+ protected val timers: TimerScheduler<WorkflowMessage>,
+ protected val lease: ProvisioningResponse.Lease
+) : ProcessObserver {
+ /**
+ * Submit the specified workflow for scheduling.
+ */
+ abstract fun submit(job: Job, handler: ActorRef<WorkflowEvent>)
+
+ /**
+ * Trigger an immediate scheduling cycle.
+ */
+ abstract fun schedule()
+}
diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/WorkflowSchedulerMode.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/WorkflowSchedulerMode.kt
new file mode 100644
index 00000000..0a4b40e5
--- /dev/null
+++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/WorkflowSchedulerMode.kt
@@ -0,0 +1,42 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.model.services.workflows
+
+import com.atlarge.odcsim.Duration
+
+/**
+ * The operating mode of a workflow scheduler.
+ */
+sealed class WorkflowSchedulerMode {
+ /**
+ * An interactive scheduler immediately triggers a new scheduling cycle when a workflow is received.
+ */
+ object Interactive : WorkflowSchedulerMode()
+
+ /**
+ * A batch scheduler triggers a scheduling cycle every time quantum if needed.
+ */
+ data class Batch(val quantum: Duration) : WorkflowSchedulerMode()
+}
diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/WorkflowService.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/WorkflowService.kt
new file mode 100644
index 00000000..72397203
--- /dev/null
+++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/WorkflowService.kt
@@ -0,0 +1,193 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.model.services.workflows
+
+import com.atlarge.odcsim.ActorRef
+import com.atlarge.odcsim.Behavior
+import com.atlarge.odcsim.Instant
+import com.atlarge.odcsim.TimerScheduler
+import com.atlarge.odcsim.coroutines.actorContext
+import com.atlarge.odcsim.coroutines.dsl.ask
+import com.atlarge.odcsim.coroutines.suspending
+import com.atlarge.odcsim.receiveMessage
+import com.atlarge.odcsim.same
+import com.atlarge.odcsim.unhandled
+import com.atlarge.odcsim.withTimers
+import com.atlarge.opendc.model.Zone
+import com.atlarge.opendc.model.ZoneRef
+import com.atlarge.opendc.model.find
+import com.atlarge.opendc.model.resources.compute.MachineEvent
+import com.atlarge.opendc.model.services.AbstractService
+import com.atlarge.opendc.model.services.Service
+import com.atlarge.opendc.model.services.ServiceProvider
+import com.atlarge.opendc.model.services.provisioning.ProvisioningMessage
+import com.atlarge.opendc.model.services.provisioning.ProvisioningResponse
+import com.atlarge.opendc.model.services.provisioning.ProvisioningService
+import com.atlarge.opendc.model.workload.workflow.Job
+import com.atlarge.opendc.model.workload.workflow.Task
+import java.util.UUID
+
+/**
+ * A service for cloud workflow management.
+ *
+ * The workflow scheduler is modelled after the Reference Architecture for Datacenter Scheduling by Andreadis et al.
+ */
+class WorkflowService(val scheduler: WorkflowScheduler) : ServiceProvider {
+ override val uid: UUID = UUID.randomUUID()
+ override val name: String = "workflows"
+ override val provides: Set<Service<*>> = setOf(WorkflowService)
+ override val dependencies: Set<Service<*>> = setOf(ProvisioningService)
+
+ /**
+ * Build the runtime [Behavior] for the workflow service, responding to messages of shape [WorkflowMessage].
+ */
+ override fun invoke(zone: Zone, ref: ZoneRef): Behavior<WorkflowMessage> = suspending { ctx ->
+ val provisioner = ref.find(ProvisioningService)
+ // Wait for 0.1 sec before asking the provisioner to allow it to initialize. Will return empty response if asked
+ // immediately.
+ val lease: ProvisioningResponse.Lease = actorContext<ProvisioningResponse>().ask(provisioner, after = 0.1) { ProvisioningMessage.Request(Int.MAX_VALUE, it) }
+
+ withTimers<Any> { timers ->
+ @Suppress("UNCHECKED_CAST")
+ val schedulerLogic = scheduler(ctx, timers as TimerScheduler<WorkflowMessage>, lease)
+
+ receiveMessage { msg ->
+ when (msg) {
+ is WorkflowMessage.Submit -> {
+ schedulerLogic.submit(msg.job, msg.broker)
+ same()
+ }
+ is MachineEvent.Submitted -> {
+ schedulerLogic.onSubmission(msg.instance, msg.application, msg.key, msg.pid)
+ same()
+ }
+ is MachineEvent.Terminated -> {
+ schedulerLogic.onTermination(msg.instance, msg.pid, msg.status)
+ same()
+ }
+ else ->
+ unhandled()
+ }
+ }
+ }.narrow()
+ }
+
+ companion object : AbstractService<WorkflowMessage>(UUID.randomUUID(), "workflows")
+}
+
+/**
+ * A reference to the workflow service instance.
+ */
+typealias WorkflowServiceRef = ActorRef<WorkflowMessage>
+
+/**
+ * A message protocol for communicating to the workflow service.
+ */
+sealed class WorkflowMessage {
+ /**
+ * Submit the specified [Job] to the workflow service for scheduling.
+ *
+ * @property job The workflow to submit for scheduling.
+ * @property broker The broker that has submitted this workflow on behalf of a user and that needs to be kept
+ * up-to-date.
+ */
+ data class Submit(val job: Job, val broker: ActorRef<WorkflowEvent>) : WorkflowMessage()
+}
+
+/**
+ * A message protocol used by the workflow service to respond to [WorkflowMessage]s.
+ */
+sealed class WorkflowEvent {
+ /**
+ * Indicate that the specified [Job] was submitted to the workflow service.
+ *
+ * @property service The reference to the service the job was submitted to.
+ * @property job The job that has been submitted.
+ * @property time A timestamp of the moment the job was received.
+ */
+ data class JobSubmitted(
+ val service: WorkflowServiceRef,
+ val job: Job,
+ val time: Instant
+ ) : WorkflowEvent()
+
+ /**
+ * Indicate that the specified [Job] has become active.
+ *
+ * @property service The reference to the service the job was submitted to.
+ * @property job The job that has been submitted.
+ * @property time A timestamp of the moment the job started.
+ */
+ data class JobStarted(
+ val service: WorkflowServiceRef,
+ val job: Job,
+ val time: Instant
+ ) : WorkflowEvent()
+
+ /**
+ * Indicate that the specified [Task] has started processing.
+ *
+ * @property service The reference to the service the job was submitted to.
+ * @property job The job that contains this task.
+ * @property task The task that has started processing.
+ * @property time A timestamp of the moment the task started.
+ */
+ data class TaskStarted(
+ val service: WorkflowServiceRef,
+ val job: Job,
+ val task: Task,
+ val time: Instant
+ ) : WorkflowEvent()
+
+ /**
+ * Indicate that the specified [Task] has started processing.
+ *
+ * @property service The reference to the service the job was submitted to.
+ * @property job The job that contains this task.
+ * @property task The task that has started processing.
+ * @property status The exit code of the task, where zero means successful.
+ * @property time A timestamp of the moment the task finished.
+ */
+ data class TaskFinished(
+ val service: WorkflowServiceRef,
+ val job: Job,
+ val task: Task,
+ val status: Int,
+ val time: Instant
+ ) : WorkflowEvent()
+
+ /**
+ * Indicate that the specified [Job] has finished processing.
+ *
+ * @property service The reference to the service the job was submitted to.
+ * @property job The job that has finished processing.
+ * @property time A timestamp of the moment the task finished.
+ */
+ data class JobFinished(
+ val service: WorkflowServiceRef,
+ val job: Job,
+ val time: Instant
+ ) : WorkflowEvent()
+}
diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/FifoJobSortingPolicy.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/FifoJobSortingPolicy.kt
new file mode 100644
index 00000000..c58d2210
--- /dev/null
+++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/FifoJobSortingPolicy.kt
@@ -0,0 +1,37 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.model.services.workflows.stages.job
+
+import com.atlarge.opendc.model.services.workflows.StageWorkflowSchedulerLogic
+
+/**
+ * The [FifoJobSortingPolicy] sorts tasks based on the order of arrival in the queue.
+ */
+class FifoJobSortingPolicy : JobSortingPolicy {
+ override fun invoke(
+ scheduler: StageWorkflowSchedulerLogic,
+ jobs: Collection<StageWorkflowSchedulerLogic.JobView>
+ ): List<StageWorkflowSchedulerLogic.JobView> = jobs.toList()
+}
diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/JobAdmissionPolicy.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/JobAdmissionPolicy.kt
new file mode 100644
index 00000000..be60fa9b
--- /dev/null
+++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/JobAdmissionPolicy.kt
@@ -0,0 +1,48 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.model.services.workflows.stages.job
+
+import com.atlarge.opendc.model.services.workflows.StageWorkflowSchedulerLogic
+
+/**
+ * A policy interface for admitting [StageWorkflowSchedulerLogic.JobView]s to a scheduling cycle.
+ */
+interface JobAdmissionPolicy {
+ /**
+ * A method that is invoked at the start of each scheduling cycle.
+ *
+ * @param scheduler The scheduler that started the cycle.
+ */
+ fun startCycle(scheduler: StageWorkflowSchedulerLogic) {}
+
+ /**
+ * Determine whether the specified [StageWorkflowSchedulerLogic.JobView] should be admitted to the scheduling cycle.
+ *
+ * @param scheduler The scheduler that should admit or reject the job.
+ * @param job The workflow that has been submitted.
+ * @return `true` if the workflow may be admitted to the scheduling cycle, `false` otherwise.
+ */
+ fun shouldAdmit(scheduler: StageWorkflowSchedulerLogic, job: StageWorkflowSchedulerLogic.JobView): Boolean
+}
diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/JobSortingPolicy.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/JobSortingPolicy.kt
new file mode 100644
index 00000000..3af88aa7
--- /dev/null
+++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/JobSortingPolicy.kt
@@ -0,0 +1,44 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.model.services.workflows.stages.job
+
+import com.atlarge.opendc.model.services.workflows.StageWorkflowSchedulerLogic
+
+/**
+ * A policy interface for ordering admitted workflows in the scheduling queue.
+ */
+interface JobSortingPolicy {
+ /**
+ * Sort the given collection of jobs on a given criterion.
+ *
+ * @param scheduler The scheduler that started the cycle.
+ * @param jobs The collection of tasks that should be sorted.
+ * @return The sorted list of jobs.
+ */
+ operator fun invoke(
+ scheduler: StageWorkflowSchedulerLogic,
+ jobs: Collection<StageWorkflowSchedulerLogic.JobView>
+ ): List<StageWorkflowSchedulerLogic.JobView>
+}
diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/NullJobAdmissionPolicy.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/NullJobAdmissionPolicy.kt
new file mode 100644
index 00000000..5436a1a1
--- /dev/null
+++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/NullJobAdmissionPolicy.kt
@@ -0,0 +1,40 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.model.services.workflows.stages.job
+
+import com.atlarge.opendc.model.services.workflows.StageWorkflowSchedulerLogic
+
+/**
+ * A [JobAdmissionPolicy] that admits all jobs.
+ */
+object NullJobAdmissionPolicy : JobAdmissionPolicy {
+ /**
+ * Admit every submitted job.
+ */
+ override fun shouldAdmit(
+ scheduler: StageWorkflowSchedulerLogic,
+ job: StageWorkflowSchedulerLogic.JobView
+ ): Boolean = true
+}
diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/RandomJobSortingPolicy.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/RandomJobSortingPolicy.kt
new file mode 100644
index 00000000..7da59692
--- /dev/null
+++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/job/RandomJobSortingPolicy.kt
@@ -0,0 +1,40 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.model.services.workflows.stages.job
+
+import com.atlarge.opendc.model.services.workflows.StageWorkflowSchedulerLogic
+import kotlin.random.Random
+
+/**
+ * The [RandomJobSortingPolicy] sorts tasks randomly.
+ *
+ * @property random The [Random] instance to use when sorting the list of tasks.
+ */
+class RandomJobSortingPolicy(private val random: Random = Random.Default) : JobSortingPolicy {
+ override fun invoke(
+ scheduler: StageWorkflowSchedulerLogic,
+ jobs: Collection<StageWorkflowSchedulerLogic.JobView>
+ ): List<StageWorkflowSchedulerLogic.JobView> = jobs.shuffled(random)
+}
diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/resources/FirstFitResourceSelectionPolicy.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/resources/FirstFitResourceSelectionPolicy.kt
new file mode 100644
index 00000000..afaf075d
--- /dev/null
+++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/resources/FirstFitResourceSelectionPolicy.kt
@@ -0,0 +1,40 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.model.services.workflows.stages.resources
+
+import com.atlarge.opendc.model.services.resources.HostView
+import com.atlarge.opendc.model.services.workflows.StageWorkflowSchedulerLogic
+
+/**
+ * A [ResourceSelectionPolicy] that selects the first machine that is available.
+ */
+class FirstFitResourceSelectionPolicy : ResourceSelectionPolicy {
+ override fun select(
+ scheduler: StageWorkflowSchedulerLogic,
+ machines: List<HostView>,
+ task: StageWorkflowSchedulerLogic.TaskView
+ ): HostView? =
+ machines.firstOrNull()
+}
diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/resources/FunctionalResourceDynamicFilterPolicy.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/resources/FunctionalResourceDynamicFilterPolicy.kt
new file mode 100644
index 00000000..3f28a040
--- /dev/null
+++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/resources/FunctionalResourceDynamicFilterPolicy.kt
@@ -0,0 +1,43 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.model.services.workflows.stages.resources
+
+import com.atlarge.opendc.model.services.resources.HostView
+import com.atlarge.opendc.model.services.workflows.StageWorkflowSchedulerLogic
+
+/**
+ * A [ResourceDynamicFilterPolicy] based on the amount of cores available on the machine and the cores required for
+ * the task.
+ */
+class FunctionalResourceDynamicFilterPolicy : ResourceDynamicFilterPolicy {
+ override fun invoke(
+ scheduler: StageWorkflowSchedulerLogic,
+ machines: List<HostView>,
+ task: StageWorkflowSchedulerLogic.TaskView
+ ): List<HostView> {
+ return machines
+ .filter { scheduler.machineCores[it] ?: 0 >= task.task.application.cores }
+ }
+}
diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/resources/ResourceDynamicFilterPolicy.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/resources/ResourceDynamicFilterPolicy.kt
new file mode 100644
index 00000000..f73c0d9e
--- /dev/null
+++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/resources/ResourceDynamicFilterPolicy.kt
@@ -0,0 +1,49 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.model.services.workflows.stages.resources
+
+import com.atlarge.opendc.model.services.resources.HostView
+import com.atlarge.opendc.model.services.workflows.StageWorkflowSchedulerLogic
+
+/**
+ * This interface represents the **R4** stage of the Reference Architecture for Schedulers and acts as a filter yielding
+ * a list of resources with sufficient resource-capacities, based on fixed or dynamic requirements, and on predicted or
+ * monitored information about processing unit availability, memory occupancy, etc.
+ */
+interface ResourceDynamicFilterPolicy {
+ /**
+ * Filter the list of machines based on dynamic information.
+ *
+ * @param scheduler The scheduler to filter the machines.
+ * @param machines The list of machines in the system.
+ * @param task The task that is to be scheduled.
+ * @return The machines on which the task can be scheduled.
+ */
+ operator fun invoke(
+ scheduler: StageWorkflowSchedulerLogic,
+ machines: List<HostView>,
+ task: StageWorkflowSchedulerLogic.TaskView
+ ): List<HostView>
+}
diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/resources/ResourceSelectionPolicy.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/resources/ResourceSelectionPolicy.kt
new file mode 100644
index 00000000..a9172a53
--- /dev/null
+++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/resources/ResourceSelectionPolicy.kt
@@ -0,0 +1,48 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.model.services.workflows.stages.resources
+
+import com.atlarge.opendc.model.services.resources.HostView
+import com.atlarge.opendc.model.services.workflows.StageWorkflowSchedulerLogic
+
+/**
+ * This interface represents the **R5** stage of the Reference Architecture for Schedulers and matches the the selected
+ * task with a (set of) resource(s), using policies such as First-Fit, Worst-Fit, and Best-Fit.
+ */
+interface ResourceSelectionPolicy {
+ /**
+ * Select a machine on which the task should be scheduled.
+ *
+ * @param scheduler The scheduler to select the machine.
+ * @param machines The list of machines in the system.
+ * @param task The task that is to be scheduled.
+ * @return The selected machine or `null` if no machine could be found.
+ */
+ fun select(
+ scheduler: StageWorkflowSchedulerLogic,
+ machines: List<HostView>,
+ task: StageWorkflowSchedulerLogic.TaskView
+ ): HostView?
+}
diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/FifoTaskSortingPolicy.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/FifoTaskSortingPolicy.kt
new file mode 100644
index 00000000..2eb2f6fb
--- /dev/null
+++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/FifoTaskSortingPolicy.kt
@@ -0,0 +1,37 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.model.services.workflows.stages.task
+
+import com.atlarge.opendc.model.services.workflows.StageWorkflowSchedulerLogic
+
+/**
+ * The [FifoTaskSortingPolicy] sorts tasks based on the order of arrival in the queue.
+ */
+class FifoTaskSortingPolicy : TaskSortingPolicy {
+ override fun invoke(
+ scheduler: StageWorkflowSchedulerLogic,
+ tasks: Collection<StageWorkflowSchedulerLogic.TaskView>
+ ): List<StageWorkflowSchedulerLogic.TaskView> = tasks.toList()
+}
diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/FunctionalTaskEligibilityPolicy.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/FunctionalTaskEligibilityPolicy.kt
new file mode 100644
index 00000000..2e7cc8c1
--- /dev/null
+++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/FunctionalTaskEligibilityPolicy.kt
@@ -0,0 +1,38 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.model.services.workflows.stages.task
+
+import com.atlarge.opendc.model.resources.compute.scheduling.ProcessState
+import com.atlarge.opendc.model.services.workflows.StageWorkflowSchedulerLogic
+
+/**
+ * A [TaskEligibilityPolicy] that marks tasks as eligible if they are tasks roots within the job.
+ */
+class FunctionalTaskEligibilityPolicy : TaskEligibilityPolicy {
+ override fun isEligible(
+ scheduler: StageWorkflowSchedulerLogic,
+ task: StageWorkflowSchedulerLogic.TaskView
+ ): Boolean = task.state == ProcessState.READY
+}
diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/RandomTaskSortingPolicy.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/RandomTaskSortingPolicy.kt
new file mode 100644
index 00000000..69462e41
--- /dev/null
+++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/RandomTaskSortingPolicy.kt
@@ -0,0 +1,40 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.model.services.workflows.stages.task
+
+import com.atlarge.opendc.model.services.workflows.StageWorkflowSchedulerLogic
+import kotlin.random.Random
+
+/**
+ * The [RandomTaskSortingPolicy] sorts tasks randomly.
+ *
+ * @property random The [Random] instance to use when sorting the list of tasks.
+ */
+class RandomTaskSortingPolicy(private val random: Random = Random.Default) : TaskSortingPolicy {
+ override fun invoke(
+ scheduler: StageWorkflowSchedulerLogic,
+ tasks: Collection<StageWorkflowSchedulerLogic.TaskView>
+ ): List<StageWorkflowSchedulerLogic.TaskView> = tasks.shuffled(random)
+}
diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/TaskEligibilityPolicy.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/TaskEligibilityPolicy.kt
new file mode 100644
index 00000000..c3c7e725
--- /dev/null
+++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/TaskEligibilityPolicy.kt
@@ -0,0 +1,48 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.model.services.workflows.stages.task
+
+import com.atlarge.opendc.model.services.workflows.StageWorkflowSchedulerLogic
+
+/**
+ * A policy interface for determining the eligibility of tasks in a scheduling cycle.
+ */
+interface TaskEligibilityPolicy {
+ /**
+ * A method that is invoked at the start of each scheduling cycle.
+ *
+ * @param scheduler The scheduler that started the cycle.
+ */
+ fun startCycle(scheduler: StageWorkflowSchedulerLogic) {}
+
+ /**
+ * Determine whether the specified [StageWorkflowSchedulerLogic.TaskView] is eligible to be scheduled.
+ *
+ * @param scheduler The scheduler that is determining whether the task is eligible.
+ * @param task The task instance to schedule.
+ * @return `true` if the task eligible to be scheduled, `false` otherwise.
+ */
+ fun isEligible(scheduler: StageWorkflowSchedulerLogic, task: StageWorkflowSchedulerLogic.TaskView): Boolean
+}
diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/TaskSortingPolicy.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/TaskSortingPolicy.kt
new file mode 100644
index 00000000..3f296d0e
--- /dev/null
+++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/services/workflows/stages/task/TaskSortingPolicy.kt
@@ -0,0 +1,45 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.model.services.workflows.stages.task
+
+import com.atlarge.opendc.model.services.workflows.StageWorkflowSchedulerLogic
+
+/**
+ * This interface represents the **T2** stage of the Reference Architecture for Datacenter Schedulers and provides the
+ * scheduler with a sorted list of tasks to schedule.
+ */
+interface TaskSortingPolicy {
+ /**
+ * Sort the given list of tasks on a given criterion.
+ *
+ * @param scheduler The scheduler that is sorting the tasks.
+ * @param tasks The collection of tasks that should be sorted.
+ * @return The sorted list of tasks.
+ */
+ operator fun invoke(
+ scheduler: StageWorkflowSchedulerLogic,
+ tasks: Collection<StageWorkflowSchedulerLogic.TaskView>
+ ): List<StageWorkflowSchedulerLogic.TaskView>
+}
diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/workload/workflow/Job.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/workload/workflow/Job.kt
new file mode 100644
index 00000000..dd72cf6d
--- /dev/null
+++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/workload/workflow/Job.kt
@@ -0,0 +1,48 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.model.workload.workflow
+
+import com.atlarge.opendc.model.User
+import com.atlarge.opendc.model.workload.Workload
+import java.util.UUID
+
+/**
+ * A workload that represents a directed acyclic graph (DAG) of tasks with control and data dependencies between tasks.
+ *
+ * @property uid A unique identified of this workflow.
+ * @property name The name of this workflow.
+ * @property owner The owner of the workflow.
+ * @property tasks The tasks that are part of this workflow.
+ */
+data class Job(
+ override val uid: UUID,
+ override val name: String,
+ override val owner: User,
+ val tasks: Set<Task>
+) : Workload {
+ override fun equals(other: Any?): Boolean = other is Job && uid == other.uid
+
+ override fun hashCode(): Int = uid.hashCode()
+}
diff --git a/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/workload/workflow/Task.kt b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/workload/workflow/Task.kt
new file mode 100644
index 00000000..0cc3fa0e
--- /dev/null
+++ b/opendc-workflows/src/main/kotlin/com/atlarge/opendc/model/workload/workflow/Task.kt
@@ -0,0 +1,48 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2019 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.model.workload.workflow
+
+import com.atlarge.opendc.model.Identity
+import com.atlarge.opendc.model.workload.application.Application
+import java.util.UUID
+
+/**
+ * A stage of a [Job].
+ *
+ * @property uid A unique identified of this task.
+ * @property name The name of this task.
+ * @property application The application to run as part of this workflow task.
+ * @property dependencies The dependencies of this task in order for it to execute.
+ */
+data class Task(
+ override val uid: UUID,
+ override val name: String,
+ val application: Application,
+ val dependencies: Set<Task>
+) : Identity {
+ override fun equals(other: Any?): Boolean = other is Task && uid == other.uid
+
+ override fun hashCode(): Int = uid.hashCode()
+}