diff options
| author | Georgios Andreadis <info@gandreadis.com> | 2020-06-29 16:04:57 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-08-24 16:18:13 +0200 |
| commit | 46b06fb446e79c390c01953d31d700b8e73da24d (patch) | |
| tree | b2329630ebf2c90d297ba0d3046ccd558d12d042 /simulator/opendc/opendc-workflows/src/main | |
| parent | ebcacf96fbc1cd16a91523f95dd01db046fb7f90 (diff) | |
Prepare simulator repository for monorepo
This change prepares the simulator Git repository for the monorepo residing at
https://github.com/atlarge-research.com/opendc. To accomodate for this, we
move all files into a simulator subdirectory.
Diffstat (limited to 'simulator/opendc/opendc-workflows/src/main')
42 files changed, 2558 insertions, 0 deletions
diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt new file mode 100644 index 00000000..1cb2de97 --- /dev/null +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/JobState.kt @@ -0,0 +1,41 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service + +import com.atlarge.opendc.workflows.workload.Job + +class JobState(val job: Job, val submittedAt: Long) { + /** + * A flag to indicate whether this job is finished. + */ + val isFinished: Boolean + get() = tasks.isEmpty() + + val tasks: MutableSet<TaskState> = mutableSetOf() + + override fun equals(other: Any?): Boolean = other is JobState && other.job == job + + override fun hashCode(): Int = job.hashCode() +} diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerListener.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerListener.kt new file mode 100644 index 00000000..73c3e752 --- /dev/null +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerListener.kt @@ -0,0 +1,39 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service + +interface StageWorkflowSchedulerListener { + fun cycleStarted(scheduler: StageWorkflowService) {} + fun cycleFinished(scheduler: StageWorkflowService) {} + + fun jobSubmitted(job: JobState) {} + fun jobStarted(job: JobState) {} + fun jobFinished(job: JobState) {} + + fun taskReady(task: TaskState) {} + fun taskAssigned(task: TaskState) {} + fun taskStarted(task: TaskState) {} + fun taskFinished(task: TaskState) {} +} diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt new file mode 100644 index 00000000..7a20363c --- /dev/null +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt @@ -0,0 +1,361 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service + +import com.atlarge.odcsim.Domain +import com.atlarge.odcsim.flow.EventFlow +import com.atlarge.odcsim.simulationContext +import com.atlarge.opendc.compute.core.Server +import com.atlarge.opendc.compute.core.ServerEvent +import com.atlarge.opendc.compute.core.ServerState +import com.atlarge.opendc.compute.metal.Node +import com.atlarge.opendc.compute.metal.service.ProvisioningService +import com.atlarge.opendc.workflows.service.stage.job.JobAdmissionPolicy +import com.atlarge.opendc.workflows.service.stage.job.JobOrderPolicy +import com.atlarge.opendc.workflows.service.stage.resource.ResourceFilterPolicy +import com.atlarge.opendc.workflows.service.stage.resource.ResourceSelectionPolicy +import com.atlarge.opendc.workflows.service.stage.task.TaskEligibilityPolicy +import com.atlarge.opendc.workflows.service.stage.task.TaskOrderPolicy +import com.atlarge.opendc.workflows.workload.Job +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import java.util.PriorityQueue +import java.util.Queue +import kotlinx.coroutines.launch +import kotlinx.coroutines.withContext + +/** + * A [WorkflowService] that distributes work through a multi-stage process based on the Reference Architecture for + * Datacenter Scheduling. + */ +class StageWorkflowService( + private val domain: Domain, + private val provisioningService: ProvisioningService, + mode: WorkflowSchedulerMode, + jobAdmissionPolicy: JobAdmissionPolicy, + jobOrderPolicy: JobOrderPolicy, + taskEligibilityPolicy: TaskEligibilityPolicy, + taskOrderPolicy: TaskOrderPolicy, + resourceFilterPolicy: ResourceFilterPolicy, + resourceSelectionPolicy: ResourceSelectionPolicy +) : WorkflowService, CoroutineScope by domain { + + /** + * The incoming jobs ready to be processed by the scheduler. + */ + internal val incomingJobs: MutableSet<JobState> = linkedSetOf() + + /** + * The incoming tasks ready to be processed by the scheduler. + */ + internal val incomingTasks: MutableSet<TaskState> = linkedSetOf() + + /** + * The job queue. + */ + internal val jobQueue: Queue<JobState> + + /** + * The task queue. + */ + internal val taskQueue: Queue<TaskState> + + /** + * The active jobs in the system. + */ + internal val activeJobs: MutableSet<JobState> = mutableSetOf() + + /** + * The active tasks in the system. + */ + internal val activeTasks: MutableSet<TaskState> = mutableSetOf() + + /** + * The running tasks by [Server]. + */ + internal val taskByServer = mutableMapOf<Server, TaskState>() + + /** + * The nodes that are controlled by the service. + */ + internal lateinit var nodes: List<Node> + + /** + * The available nodes. + */ + internal val available: MutableSet<Node> = mutableSetOf() + + /** + * The maximum number of incoming jobs. + */ + private val throttleLimit: Int = 20000 + + /** + * The load of the system. + */ + internal val load: Double + get() = (available.size / nodes.size.toDouble()) + + /** + * The root listener of this scheduler. + */ + private val rootListener = object : StageWorkflowSchedulerListener { + /** + * The listeners to delegate to. + */ + val listeners = mutableSetOf<StageWorkflowSchedulerListener>() + + override fun cycleStarted(scheduler: StageWorkflowService) { + listeners.forEach { it.cycleStarted(scheduler) } + } + + override fun cycleFinished(scheduler: StageWorkflowService) { + listeners.forEach { it.cycleFinished(scheduler) } + } + + override fun jobSubmitted(job: JobState) { + listeners.forEach { it.jobSubmitted(job) } + } + + override fun jobStarted(job: JobState) { + listeners.forEach { it.jobStarted(job) } + } + + override fun jobFinished(job: JobState) { + listeners.forEach { it.jobFinished(job) } + } + + override fun taskReady(task: TaskState) { + listeners.forEach { it.taskReady(task) } + } + + override fun taskAssigned(task: TaskState) { + listeners.forEach { it.taskAssigned(task) } + } + + override fun taskStarted(task: TaskState) { + listeners.forEach { it.taskStarted(task) } + } + + override fun taskFinished(task: TaskState) { + listeners.forEach { it.taskFinished(task) } + } + } + + private val mode: WorkflowSchedulerMode.Logic + private val jobAdmissionPolicy: JobAdmissionPolicy.Logic + private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic + private val resourceFilterPolicy: ResourceFilterPolicy.Logic + private val resourceSelectionPolicy: Comparator<Node> + private val eventFlow = EventFlow<WorkflowEvent>() + + init { + domain.launch { + nodes = provisioningService.nodes().toList() + available.addAll(nodes) + } + + this.mode = mode(this) + this.jobAdmissionPolicy = jobAdmissionPolicy(this) + this.jobQueue = PriorityQueue(100, jobOrderPolicy(this).thenBy { it.job.uid }) + this.taskEligibilityPolicy = taskEligibilityPolicy(this) + this.taskQueue = PriorityQueue(1000, taskOrderPolicy(this).thenBy { it.task.uid }) + this.resourceFilterPolicy = resourceFilterPolicy(this) + this.resourceSelectionPolicy = resourceSelectionPolicy(this) + } + + override val events: Flow<WorkflowEvent> = eventFlow + + override suspend fun submit(job: Job) = withContext(domain.coroutineContext) { + // J1 Incoming Jobs + val jobInstance = JobState(job, simulationContext.clock.millis()) + val instances = job.tasks.associateWith { + TaskState(jobInstance, it) + } + + for ((task, instance) in instances) { + instance.dependencies.addAll(task.dependencies.map { instances[it]!! }) + task.dependencies.forEach { + instances[it]!!.dependents.add(instance) + } + + // If the task has no dependency, it is a root task and can immediately be evaluated + if (instance.isRoot) { + instance.state = TaskStatus.READY + } + } + + instances.values.toCollection(jobInstance.tasks) + incomingJobs += jobInstance + rootListener.jobSubmitted(jobInstance) + + requestCycle() + } + + /** + * Indicate to the scheduler that a scheduling cycle is needed. + */ + private suspend fun requestCycle() = mode.requestCycle() + + /** + * Perform a scheduling cycle immediately. + */ + @OptIn(ExperimentalCoroutinesApi::class) + internal suspend fun schedule() { + // J2 Create list of eligible jobs + val iterator = incomingJobs.iterator() + while (iterator.hasNext()) { + val jobInstance = iterator.next() + val advice = jobAdmissionPolicy(jobInstance) + if (advice.stop) { + break + } else if (!advice.admit) { + continue + } + + iterator.remove() + jobQueue.add(jobInstance) + activeJobs += jobInstance + eventFlow.emit(WorkflowEvent.JobStarted(this, jobInstance.job, simulationContext.clock.millis())) + rootListener.jobStarted(jobInstance) + } + + // J4 Per job + while (jobQueue.isNotEmpty()) { + val jobInstance = jobQueue.poll() + + // Edge-case: job has no tasks + if (jobInstance.isFinished) { + finishJob(jobInstance) + } + + // Add job roots to the scheduling queue + for (task in jobInstance.tasks) { + if (task.state != TaskStatus.READY) { + continue + } + + incomingTasks += task + rootListener.taskReady(task) + } + } + + // T1 Create list of eligible tasks + val taskIterator = incomingTasks.iterator() + while (taskIterator.hasNext()) { + val taskInstance = taskIterator.next() + val advice = taskEligibilityPolicy(taskInstance) + if (advice.stop) { + break + } else if (!advice.admit) { + continue + } + + taskIterator.remove() + taskQueue.add(taskInstance) + } + + // T3 Per task + while (taskQueue.isNotEmpty()) { + val instance = taskQueue.peek() + val host: Node? = available.firstOrNull() + + if (host != null) { + // T4 Submit task to machine + available -= host + instance.state = TaskStatus.ACTIVE + val newHost = provisioningService.deploy(host, instance.task.image) + val server = newHost.server!! + instance.host = newHost + taskByServer[server] = instance + server.events + .onEach { event -> if (event is ServerEvent.StateChanged) stateChanged(event.server) } + .launchIn(this) + + activeTasks += instance + taskQueue.poll() + rootListener.taskAssigned(instance) + } else { + break + } + } + } + + private suspend fun stateChanged(server: Server) { + when (server.state) { + ServerState.ACTIVE -> { + val task = taskByServer.getValue(server) + task.startedAt = simulationContext.clock.millis() + eventFlow.emit(WorkflowEvent.TaskStarted(this@StageWorkflowService, task.job.job, task.task, simulationContext.clock.millis())) + rootListener.taskStarted(task) + } + ServerState.SHUTOFF, ServerState.ERROR -> { + val task = taskByServer.remove(server) ?: throw IllegalStateException() + val job = task.job + task.state = TaskStatus.FINISHED + task.finishedAt = simulationContext.clock.millis() + job.tasks.remove(task) + available += task.host!! + activeTasks -= task + eventFlow.emit(WorkflowEvent.TaskFinished(this@StageWorkflowService, task.job.job, task.task, simulationContext.clock.millis())) + rootListener.taskFinished(task) + + // Add job roots to the scheduling queue + for (dependent in task.dependents) { + if (dependent.state != TaskStatus.READY) { + continue + } + + incomingTasks += dependent + rootListener.taskReady(dependent) + } + + if (job.isFinished) { + finishJob(job) + } + + requestCycle() + } + else -> throw IllegalStateException() + } + } + + private suspend fun finishJob(job: JobState) { + activeJobs -= job + eventFlow.emit(WorkflowEvent.JobFinished(this, job.job, simulationContext.clock.millis())) + rootListener.jobFinished(job) + } + + fun addListener(listener: StageWorkflowSchedulerListener) { + rootListener.listeners += listener + } + + fun removeListener(listener: StageWorkflowSchedulerListener) { + rootListener.listeners -= listener + } +} diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskState.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskState.kt new file mode 100644 index 00000000..acd5731b --- /dev/null +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskState.kt @@ -0,0 +1,85 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service + +import com.atlarge.opendc.compute.metal.Node +import com.atlarge.opendc.workflows.workload.Task + +class TaskState(val job: JobState, val task: Task) { + /** + * The moment in time the task was started. + */ + var startedAt: Long = Long.MIN_VALUE + + /** + * The moment in time the task was finished. + */ + var finishedAt: Long = Long.MIN_VALUE + + /** + * The dependencies of this task. + */ + val dependencies = HashSet<TaskState>() + + /** + * The dependents of this task. + */ + val dependents = HashSet<TaskState>() + + /** + * A flag to indicate whether this workflow task instance is a workflow root. + */ + val isRoot: Boolean + get() = dependencies.isEmpty() + + var state: TaskStatus = TaskStatus.CREATED + set(value) { + field = value + + // Mark the process as terminated in the graph + if (value == TaskStatus.FINISHED) { + markTerminated() + } + } + + var host: Node? = null + + /** + * Mark the specified [TaskView] as terminated. + */ + private fun markTerminated() { + for (dependent in dependents) { + dependent.dependencies.remove(this) + + if (dependent.isRoot) { + dependent.state = TaskStatus.READY + } + } + } + + override fun equals(other: Any?): Boolean = other is TaskState && other.job == job && other.task == task + + override fun hashCode(): Int = task.hashCode() +} diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskStatus.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskStatus.kt new file mode 100644 index 00000000..c53c6171 --- /dev/null +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskStatus.kt @@ -0,0 +1,35 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service + +/** + * The state of a workflow task. + */ +public enum class TaskStatus { + CREATED, + READY, + ACTIVE, + FINISHED +} diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowEvent.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowEvent.kt new file mode 100644 index 00000000..2ca5a19d --- /dev/null +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowEvent.kt @@ -0,0 +1,76 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service + +import com.atlarge.opendc.workflows.workload.Job +import com.atlarge.opendc.workflows.workload.Task + +/** + * An event emitted by the [WorkflowService]. + */ +public sealed class WorkflowEvent { + /** + * The [WorkflowService] that emitted the event. + */ + public abstract val service: WorkflowService + + /** + * This event is emitted when a job has become active. + */ + public data class JobStarted( + override val service: WorkflowService, + public val job: Job, + public val time: Long + ) : WorkflowEvent() + + /** + * This event is emitted when a job has finished processing. + */ + public data class JobFinished( + override val service: WorkflowService, + public val job: Job, + public val time: Long + ) : WorkflowEvent() + + /** + * This event is emitted when a task of a job has started processing. + */ + public data class TaskStarted( + override val service: WorkflowService, + public val job: Job, + public val task: Task, + public val time: Long + ) : WorkflowEvent() + + /** + * This event is emitted when a task of a job has started processing. + */ + public data class TaskFinished( + override val service: WorkflowService, + public val job: Job, + public val task: Task, + public val time: Long + ) : WorkflowEvent() +} diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt new file mode 100644 index 00000000..776f0b07 --- /dev/null +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerMode.kt @@ -0,0 +1,112 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service + +import com.atlarge.odcsim.simulationContext +import com.atlarge.opendc.workflows.service.stage.StagePolicy +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.yield + +/** + * The operating mode of a workflow scheduler. + */ +sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Logic> { + /** + * The logic for operating the cycles of a workflow scheduler. + */ + interface Logic { + /** + * Request a new scheduling cycle to be performed. + */ + suspend fun requestCycle() + } + + /** + * An interactive scheduler immediately triggers a new scheduling cycle when a workflow is received. + */ + object Interactive : WorkflowSchedulerMode() { + override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic { + override suspend fun requestCycle() { + yield() + scheduler.schedule() + } + } + + override fun toString(): String = "Interactive" + } + + /** + * A batch scheduler triggers a scheduling cycle every time quantum if needed. + */ + data class Batch(val quantum: Long) : WorkflowSchedulerMode() { + private var next: kotlinx.coroutines.Job? = null + + override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic { + override suspend fun requestCycle() { + val ctx = simulationContext + if (next == null) { + // In batch mode, we assume that the scheduler runs at a fixed slot every time + // quantum (e.g t=0, t=60, t=120). We calculate here the delay until the next scheduling slot. + val delay = quantum - (ctx.clock.millis() % quantum) + + val job = ctx.domain.launch { + delay(delay) + next = null + scheduler.schedule() + } + next = job + } + } + } + + override fun toString(): String = "Batch($quantum)" + } + + /** + * A scheduling cycle is triggered at a random point in time. + */ + data class Random(private val random: java.util.Random = java.util.Random(123)) : WorkflowSchedulerMode() { + private var next: kotlinx.coroutines.Job? = null + + override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic { + override suspend fun requestCycle() { + val ctx = simulationContext + if (next == null) { + val delay = random.nextInt(200).toLong() + + val job = ctx.domain.launch { + delay(delay) + next = null + scheduler.schedule() + } + next = job + } + } + } + + override fun toString(): String = "Random" + } +} diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt new file mode 100644 index 00000000..38ea49c4 --- /dev/null +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt @@ -0,0 +1,52 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service + +import com.atlarge.opendc.core.services.AbstractServiceKey +import com.atlarge.opendc.workflows.workload.Job +import kotlinx.coroutines.flow.Flow +import java.util.UUID + +/** + * A service for cloud workflow management. + * + * The workflow scheduler is modelled after the Reference Architecture for Datacenter Scheduling by Andreadis et al. + */ +public interface WorkflowService { + /** + * Thie events emitted by the workflow scheduler. + */ + public val events: Flow<WorkflowEvent> + + /** + * Submit the specified [Job] to the workflow service for scheduling. + */ + public suspend fun submit(job: Job) + + /** + * The service key for the workflow scheduler. + */ + companion object Key : AbstractServiceKey<WorkflowService>(UUID.randomUUID(), "workflows") +} diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/StagePolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/StagePolicy.kt new file mode 100644 index 00000000..c7cc3d84 --- /dev/null +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/StagePolicy.kt @@ -0,0 +1,38 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage + +import com.atlarge.opendc.workflows.service.StageWorkflowService +import java.io.Serializable + +/** + * A scheduling stage policy. + */ +interface StagePolicy<T : Any> : Serializable { + /** + * Build the logic of the stage policy. + */ + operator fun invoke(scheduler: StageWorkflowService): T +} diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/DurationJobOrderPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/DurationJobOrderPolicy.kt new file mode 100644 index 00000000..bbdb9f71 --- /dev/null +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/DurationJobOrderPolicy.kt @@ -0,0 +1,102 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.job + +import com.atlarge.opendc.workflows.service.JobState +import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener +import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.workload.Job +import com.atlarge.opendc.workflows.workload.Task +import com.atlarge.opendc.workflows.workload.WORKFLOW_TASK_DEADLINE + +/** + * A [JobOrderPolicy] that orders jobs based on its critical path length. + */ +data class DurationJobOrderPolicy(val ascending: Boolean = true) : JobOrderPolicy { + override fun invoke(scheduler: StageWorkflowService): Comparator<JobState> = + object : Comparator<JobState>, StageWorkflowSchedulerListener { + private val results = HashMap<Job, Long>() + + init { + scheduler.addListener(this) + } + + private val Job.duration: Long + get() = results[this]!! + + override fun jobSubmitted(job: JobState) { + results[job.job] = job.job.toposort().map { task -> + val estimable = task.metadata[WORKFLOW_TASK_DEADLINE] as? Long? + estimable ?: Long.MAX_VALUE + }.sum() + } + + override fun jobFinished(job: JobState) { + results.remove(job.job) + } + + override fun compare(o1: JobState, o2: JobState): Int { + return compareValuesBy(o1, o2) { it.job.duration }.let { if (ascending) it else -it } + } + } + + override fun toString(): String { + return "Job-Duration(${if (ascending) "asc" else "desc"})" + } +} + +/** + * Create a topological sorting of the tasks in a job. + * + * @return The list of tasks within the job topologically sorted. + */ +fun Job.toposort(): List<Task> { + val res = mutableListOf<Task>() + val visited = mutableSetOf<Task>() + val adjacent = mutableMapOf<Task, MutableList<Task>>() + + for (task in tasks) { + for (dependency in task.dependencies) { + adjacent.getOrPut(dependency) { mutableListOf() }.add(task) + } + } + + fun visit(task: Task) { + visited.add(task) + + adjacent[task] ?: emptyList<Task>() + .asSequence() + .filter { it !in visited } + .forEach { visit(it) } + + res.add(task) + } + + tasks + .asSequence() + .filter { it !in visited } + .forEach { visit(it) } + return res +} diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt new file mode 100644 index 00000000..535d7792 --- /dev/null +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt @@ -0,0 +1,72 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.job + +import com.atlarge.opendc.workflows.service.JobState +import com.atlarge.opendc.workflows.service.stage.StagePolicy + +/** + * A policy interface for admitting [JobState]s to a scheduling cycle. + */ +interface JobAdmissionPolicy : StagePolicy<JobAdmissionPolicy.Logic> { + interface Logic { + /** + * Determine whether the specified [JobState] should be admitted to the scheduling cycle. + * + * @param job The workflow that has been submitted. + * @return The advice for admitting the job. + */ + operator fun invoke(job: JobState): Advice + } + + /** + * The advice given to the scheduler by an admission policy. + * + * @property admit A flag to indicate to the scheduler that the job should be admitted. + * @property stop A flag to indicate the scheduler should immediately stop admitting jobs to the scheduling queue and wait + * for the next scheduling cycle. + */ + enum class Advice(val admit: Boolean, val stop: Boolean) { + /** + * Admit the current job to the scheduling queue and continue admitting jobs. + */ + ADMIT(true, false), + + /** + * Admit the current job to the scheduling queue and stop admitting jobs. + */ + ADMIT_LAST(true, true), + + /** + * Deny the current job, but continue admitting jobs. + */ + DENY(false, false), + + /** + * Deny the current job and also stop admitting jobs. + */ + STOP(false, true) + } +} diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobOrderPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobOrderPolicy.kt new file mode 100644 index 00000000..ba57f064 --- /dev/null +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobOrderPolicy.kt @@ -0,0 +1,33 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.job + +import com.atlarge.opendc.workflows.service.JobState +import com.atlarge.opendc.workflows.service.stage.StagePolicy + +/** + * A policy interface for ordering admitted workflows in the scheduling queue. + */ +interface JobOrderPolicy : StagePolicy<Comparator<JobState>> diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/LimitJobAdmissionPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/LimitJobAdmissionPolicy.kt new file mode 100644 index 00000000..6b1faf20 --- /dev/null +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/LimitJobAdmissionPolicy.kt @@ -0,0 +1,47 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.job + +import com.atlarge.opendc.workflows.service.JobState +import com.atlarge.opendc.workflows.service.StageWorkflowService + +/** + * A [JobAdmissionPolicy] that limits the amount of active jobs in the system. + * + * @property limit The maximum number of concurrent jobs in the system. + */ +data class LimitJobAdmissionPolicy(val limit: Int) : JobAdmissionPolicy { + override fun invoke(scheduler: StageWorkflowService) = object : JobAdmissionPolicy.Logic { + override fun invoke( + job: JobState + ): JobAdmissionPolicy.Advice = + if (scheduler.activeJobs.size < limit) + JobAdmissionPolicy.Advice.ADMIT + else + JobAdmissionPolicy.Advice.STOP + } + + override fun toString(): String = "Limit-Active($limit)" +} diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/LoadJobAdmissionPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/LoadJobAdmissionPolicy.kt new file mode 100644 index 00000000..e1c27472 --- /dev/null +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/LoadJobAdmissionPolicy.kt @@ -0,0 +1,47 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.job + +import com.atlarge.opendc.workflows.service.JobState +import com.atlarge.opendc.workflows.service.StageWorkflowService + +/** + * A [JobAdmissionPolicy] that limits the amount of jobs based on the average system load. + * + * @property limit The maximum load before stopping admission. + */ +data class LoadJobAdmissionPolicy(val limit: Double) : JobAdmissionPolicy { + override fun invoke(scheduler: StageWorkflowService) = object : JobAdmissionPolicy.Logic { + override fun invoke( + job: JobState + ): JobAdmissionPolicy.Advice = + if (scheduler.load < limit) + JobAdmissionPolicy.Advice.ADMIT + else + JobAdmissionPolicy.Advice.STOP + } + + override fun toString(): String = "Limit-Load($limit)" +} diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt new file mode 100644 index 00000000..46888467 --- /dev/null +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt @@ -0,0 +1,39 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.job + +import com.atlarge.opendc.workflows.service.JobState +import com.atlarge.opendc.workflows.service.StageWorkflowService + +/** + * A [JobAdmissionPolicy] that admits all jobs. + */ +object NullJobAdmissionPolicy : JobAdmissionPolicy { + override fun invoke(scheduler: StageWorkflowService) = object : JobAdmissionPolicy.Logic { + override fun invoke(job: JobState): JobAdmissionPolicy.Advice = JobAdmissionPolicy.Advice.ADMIT + } + + override fun toString(): String = "Always" +} diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobOrderPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobOrderPolicy.kt new file mode 100644 index 00000000..14a3d98d --- /dev/null +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobOrderPolicy.kt @@ -0,0 +1,62 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.job + +import com.atlarge.opendc.workflows.service.JobState +import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener +import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.workload.Job +import java.util.Random + +/** + * A [JobOrderPolicy] that randomly orders jobs. + */ +object RandomJobOrderPolicy : JobOrderPolicy { + override fun invoke(scheduler: StageWorkflowService): Comparator<JobState> = + object : Comparator<JobState>, StageWorkflowSchedulerListener { + private val random = Random(123) + private val ids = HashMap<Job, Int>() + + init { + scheduler.addListener(this) + } + + override fun jobSubmitted(job: JobState) { + ids[job.job] = random.nextInt() + } + + override fun jobFinished(job: JobState) { + ids.remove(job.job) + } + + override fun compare(o1: JobState, o2: JobState): Int { + return compareValuesBy(o1, o2) { ids.getValue(it.job) } + } + } + + override fun toString(): String { + return "Random" + } +} diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/SizeJobOrderPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/SizeJobOrderPolicy.kt new file mode 100644 index 00000000..3bce43cf --- /dev/null +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/SizeJobOrderPolicy.kt @@ -0,0 +1,40 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.job + +import com.atlarge.opendc.workflows.service.JobState +import com.atlarge.opendc.workflows.service.StageWorkflowService + +/** + * A [SizeJobOrderPolicy] that orders jobs based on the number of tasks it has. + */ +data class SizeJobOrderPolicy(val ascending: Boolean = true) : JobOrderPolicy { + override fun invoke(scheduler: StageWorkflowService) = + compareBy<JobState> { it.tasks.size.let { if (ascending) it else -it } } + + override fun toString(): String { + return "Job-Size(${if (ascending) "asc" else "desc"})" + } +} diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/SubmissionTimeJobOrderPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/SubmissionTimeJobOrderPolicy.kt new file mode 100644 index 00000000..d6e24b2b --- /dev/null +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/SubmissionTimeJobOrderPolicy.kt @@ -0,0 +1,40 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.job + +import com.atlarge.opendc.workflows.service.JobState +import com.atlarge.opendc.workflows.service.StageWorkflowService + +/** + * A [JobOrderPolicy] orders jobs in FIFO order. + */ +data class SubmissionTimeJobOrderPolicy(val ascending: Boolean = true) : JobOrderPolicy { + override fun invoke(scheduler: StageWorkflowService) = + compareBy<JobState> { it.submittedAt.let { if (ascending) it else -it } } + + override fun toString(): String { + return "Submission-Time(${if (ascending) "asc" else "desc"})" + } +} diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt new file mode 100644 index 00000000..a5671d45 --- /dev/null +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt @@ -0,0 +1,39 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.resource + +import com.atlarge.opendc.compute.metal.Node +import com.atlarge.opendc.workflows.service.StageWorkflowService + +/** + * A [ResourceSelectionPolicy] that selects the first machine that is available. + */ +object FirstFitResourceSelectionPolicy : ResourceSelectionPolicy { + override fun invoke(scheduler: StageWorkflowService) = object : Comparator<Node> { + override fun compare(o1: Node, o2: Node): Int = 1 + } + + override fun toString(): String = "First-Fit" +} diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt new file mode 100644 index 00000000..0e83d8d7 --- /dev/null +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt @@ -0,0 +1,43 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.resource + +import com.atlarge.opendc.compute.metal.Node +import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.TaskState + +/** + * A [ResourceFilterPolicy] based on the amount of cores available on the machine and the cores required for + * the task. + */ +object FunctionalResourceFilterPolicy : ResourceFilterPolicy { + override fun invoke(scheduler: StageWorkflowService): ResourceFilterPolicy.Logic = + object : ResourceFilterPolicy.Logic { + override fun invoke(hosts: Sequence<Node>, task: TaskState): Sequence<Node> = + hosts.filter { it in scheduler.available } + } + + override fun toString(): String = "functional" +} diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/RandomResourceSelectionPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/RandomResourceSelectionPolicy.kt new file mode 100644 index 00000000..9b05cbac --- /dev/null +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/RandomResourceSelectionPolicy.kt @@ -0,0 +1,47 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.resource + +import com.atlarge.opendc.compute.metal.Node +import com.atlarge.opendc.workflows.service.StageWorkflowService +import java.util.Random + +/** + * A [ResourceSelectionPolicy] that randomly orders the machines. + */ +object RandomResourceSelectionPolicy : ResourceSelectionPolicy { + override fun invoke(scheduler: StageWorkflowService) = object : Comparator<Node> { + private val ids: Map<Node, Long> + + init { + val random = Random(123) + ids = scheduler.nodes.associateWith { random.nextLong() } + } + + override fun compare(o1: Node, o2: Node): Int = compareValuesBy(o1, o2) { ids[it] } + } + + override fun toString(): String = "Random" +} diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceFilterPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceFilterPolicy.kt new file mode 100644 index 00000000..28ef970f --- /dev/null +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceFilterPolicy.kt @@ -0,0 +1,47 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.resource + +import com.atlarge.opendc.compute.metal.Node +import com.atlarge.opendc.workflows.service.TaskState +import com.atlarge.opendc.workflows.service.stage.StagePolicy + +/** + * This interface represents stages **R2**, **R3** and **R4** stage of the Reference Architecture for Schedulers and + * acts as a filter yielding a list of resources with sufficient resource-capacities, based on fixed or dynamic + * requirements, and on predicted or monitored information about processing unit availability, memory occupancy, etc. + */ +interface ResourceFilterPolicy : StagePolicy<ResourceFilterPolicy.Logic> { + interface Logic { + /** + * Filter the list of machines based on dynamic information. + * + * @param hosts The hosts to filter. + * @param task The task that is to be scheduled. + * @return The machines on which the task can be scheduled. + */ + operator fun invoke(hosts: Sequence<Node>, task: TaskState): Sequence<Node> + } +} diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt new file mode 100644 index 00000000..43053097 --- /dev/null +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt @@ -0,0 +1,34 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.resource + +import com.atlarge.opendc.compute.metal.Node +import com.atlarge.opendc.workflows.service.stage.StagePolicy + +/** + * This interface represents the **R5** stage of the Reference Architecture for Schedulers and matches the the selected + * task with a (set of) resource(s), using policies such as First-Fit, Worst-Fit, and Best-Fit. + */ +interface ResourceSelectionPolicy : StagePolicy<Comparator<Node>> diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/ActiveTaskOrderPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/ActiveTaskOrderPolicy.kt new file mode 100644 index 00000000..b084d26c --- /dev/null +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/ActiveTaskOrderPolicy.kt @@ -0,0 +1,70 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.task + +import com.atlarge.opendc.workflows.service.JobState +import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener +import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.TaskState + +/** + * A [TaskOrderPolicy] that orders tasks based on the number of active relative tasks (w.r.t. its job) in the system. + */ +data class ActiveTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPolicy { + override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> = + object : Comparator<TaskState>, StageWorkflowSchedulerListener { + private val active = mutableMapOf<JobState, Int>() + + init { + scheduler.addListener(this) + } + + override fun jobStarted(job: JobState) { + active[job] = 0 + } + + override fun jobFinished(job: JobState) { + active.remove(job) + } + + override fun taskAssigned(task: TaskState) { + active.merge(task.job, 1, Int::plus) + } + + override fun taskFinished(task: TaskState) { + active.merge(task.job, -1, Int::plus) + } + + override fun compare(o1: TaskState, o2: TaskState): Int { + return compareValuesBy(o1, o2) { active.getValue(it.job) }.let { + if (ascending) it else -it + } + } + } + + override fun toString(): String { + return "Active-Per-Job(${if (ascending) "asc" else "desc"})" + } +} diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/BalancingTaskEligibilityPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/BalancingTaskEligibilityPolicy.kt new file mode 100644 index 00000000..2255d40c --- /dev/null +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/BalancingTaskEligibilityPolicy.kt @@ -0,0 +1,78 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.task + +import com.atlarge.opendc.workflows.service.JobState +import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener +import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.TaskState +import kotlin.math.max + +/** + * A [TaskEligibilityPolicy] that balances the tasks based on their job, e.g. do not allow a single job to claim all + * resources of the system. + * + * @property tolerance The maximum difference from the average number of tasks per job in the system as a fraction of + * the average. + */ +data class BalancingTaskEligibilityPolicy(val tolerance: Double = 1.5) : TaskEligibilityPolicy { + override fun invoke(scheduler: StageWorkflowService): TaskEligibilityPolicy.Logic = + object : TaskEligibilityPolicy.Logic, StageWorkflowSchedulerListener { + private val active = mutableMapOf<JobState, Int>() + + init { + scheduler.addListener(this) + } + + override fun jobStarted(job: JobState) { + active[job] = 0 + } + + override fun jobFinished(job: JobState) { + active.remove(job) + } + + override fun taskAssigned(task: TaskState) { + active.merge(task.job, 1, Int::plus) + } + + override fun taskFinished(task: TaskState) { + active.merge(task.job, -1, Int::plus) + } + + override fun invoke(task: TaskState): TaskEligibilityPolicy.Advice { + val activeJobs = scheduler.activeJobs.size + val activeTasks = scheduler.activeTasks.size + val baseline = max(activeTasks / activeJobs.toDouble(), 1.0) + val activeForJob = active[task.job]!! + return if ((activeForJob + 1) / baseline < tolerance) + TaskEligibilityPolicy.Advice.ADMIT + else + TaskEligibilityPolicy.Advice.DENY + } + } + + override fun toString(): String = "Job-Balance($tolerance)" +} diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/CompletionTaskOrderPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/CompletionTaskOrderPolicy.kt new file mode 100644 index 00000000..d0cf1374 --- /dev/null +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/CompletionTaskOrderPolicy.kt @@ -0,0 +1,66 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.task + +import com.atlarge.opendc.workflows.service.JobState +import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener +import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.TaskState + +/** + * A [TaskOrderPolicy] that orders tasks based on the number of completed relative tasks. + */ +data class CompletionTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPolicy { + override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> = + object : Comparator<TaskState>, StageWorkflowSchedulerListener { + private val finished = mutableMapOf<JobState, Int>() + + init { + scheduler.addListener(this) + } + + override fun jobStarted(job: JobState) { + finished[job] = 0 + } + + override fun jobFinished(job: JobState) { + finished.remove(job) + } + + override fun taskFinished(task: TaskState) { + finished.merge(task.job, 1, Int::plus) + } + + override fun compare(o1: TaskState, o2: TaskState): Int { + return compareValuesBy(o1, o2) { finished.getValue(it.job) / it.job.tasks.size.toDouble() }.let { + if (ascending) it else -it + } + } + } + + override fun toString(): String { + return "Job-Completion(${if (ascending) "asc" else "desc"})" + } +} diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DependenciesTaskOrderPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DependenciesTaskOrderPolicy.kt new file mode 100644 index 00000000..73d83d21 --- /dev/null +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DependenciesTaskOrderPolicy.kt @@ -0,0 +1,41 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.task + +import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.TaskState + +/** + * A [TaskOrderPolicy] that orders tasks based on the number of dependency tasks it has. + */ +data class DependenciesTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPolicy { + override fun invoke(scheduler: StageWorkflowService) = compareBy<TaskState> { + it.task.dependencies.size.let { if (ascending) it else -it } + } + + override fun toString(): String { + return "Task-Dependencies(${if (ascending) "asc" else "desc"})" + } +} diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DependentsTaskOrderPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DependentsTaskOrderPolicy.kt new file mode 100644 index 00000000..85b3543f --- /dev/null +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DependentsTaskOrderPolicy.kt @@ -0,0 +1,41 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.task + +import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.TaskState + +/** + * A [TaskOrderPolicy] that orders tasks based on the number of dependent tasks it has. + */ +data class DependentsTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPolicy { + override fun invoke(scheduler: StageWorkflowService) = compareBy<TaskState> { + it.dependents.size.let { if (ascending) it else -it } + } + + override fun toString(): String { + return "Task-Dependents(${if (ascending) "asc" else "desc"})" + } +} diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationHistoryTaskOrderPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationHistoryTaskOrderPolicy.kt new file mode 100644 index 00000000..426a76a4 --- /dev/null +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationHistoryTaskOrderPolicy.kt @@ -0,0 +1,72 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.task + +import com.atlarge.opendc.workflows.service.JobState +import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener +import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.TaskState + +/** + * A [TaskOrderPolicy] that orders tasks based on the average duration of the preceding tasks in the job. + */ +data class DurationHistoryTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPolicy { + + override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> = + object : Comparator<TaskState>, StageWorkflowSchedulerListener { + private val results = HashMap<JobState, MutableList<Long>>() + + init { + scheduler.addListener(this) + } + + override fun jobStarted(job: JobState) { + results[job] = mutableListOf() + } + + override fun jobFinished(job: JobState) { + results.remove(job) + } + + override fun taskFinished(task: TaskState) { + results.getValue(task.job) += task.finishedAt - task.startedAt + } + + override fun compare(o1: TaskState, o2: TaskState): Int { + return compareValuesBy(o1, o2) { key -> + val history = results.getValue(key.job) + if (history.isEmpty()) { + Long.MAX_VALUE + } else { + history.average() + } + }.let { if (ascending) it else -it } + } + } + + override fun toString(): String { + return "Task-Duration-History(${if (ascending) "asc" else "desc"})" + } +} diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationTaskOrderPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationTaskOrderPolicy.kt new file mode 100644 index 00000000..23b47891 --- /dev/null +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/DurationTaskOrderPolicy.kt @@ -0,0 +1,68 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.task + +import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener +import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.TaskState +import com.atlarge.opendc.workflows.workload.WORKFLOW_TASK_DEADLINE +import java.util.UUID + +/** + * A [TaskOrderPolicy] orders tasks based on the pre-specified (approximate) duration of the task. + */ +data class DurationTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPolicy { + + override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> = + object : Comparator<TaskState>, StageWorkflowSchedulerListener { + private val results = HashMap<UUID, Long>() + + init { + scheduler.addListener(this) + } + + override fun taskReady(task: TaskState) { + val deadline = task.task.metadata[WORKFLOW_TASK_DEADLINE] as? Long? + results[task.task.uid] = deadline ?: Long.MAX_VALUE + } + + override fun taskFinished(task: TaskState) { + results -= task.task.uid + } + + private val TaskState.duration: Long + get() = results.getValue(task.uid) + + override fun compare(o1: TaskState, o2: TaskState): Int { + return compareValuesBy(o1, o2) { state -> state.duration }.let { + if (ascending) it else -it + } + } + } + + override fun toString(): String { + return "Task-Duration(${if (ascending) "asc" else "desc"})" + } +} diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitPerJobTaskEligibilityPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitPerJobTaskEligibilityPolicy.kt new file mode 100644 index 00000000..c039bf6f --- /dev/null +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitPerJobTaskEligibilityPolicy.kt @@ -0,0 +1,70 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.task + +import com.atlarge.opendc.workflows.service.JobState +import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener +import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.TaskState + +/** + * A [TaskEligibilityPolicy] that limits the number of active tasks of a job in the system. + */ +data class LimitPerJobTaskEligibilityPolicy(val limit: Int) : TaskEligibilityPolicy { + override fun invoke(scheduler: StageWorkflowService): TaskEligibilityPolicy.Logic = + object : TaskEligibilityPolicy.Logic, StageWorkflowSchedulerListener { + private val active = mutableMapOf<JobState, Int>() + + init { + scheduler.addListener(this) + } + + override fun jobStarted(job: JobState) { + active[job] = 0 + } + + override fun jobFinished(job: JobState) { + active.remove(job) + } + + override fun taskAssigned(task: TaskState) { + active.merge(task.job, 1, Int::plus) + } + + override fun taskFinished(task: TaskState) { + active.merge(task.job, -1, Int::plus) + } + + override fun invoke(task: TaskState): TaskEligibilityPolicy.Advice { + val activeForJob = active[task.job]!! + return if (activeForJob <= limit) + TaskEligibilityPolicy.Advice.ADMIT + else + TaskEligibilityPolicy.Advice.DENY + } + } + + override fun toString(): String = "Limit-Active-Job($limit)" +} diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitTaskEligibilityPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitTaskEligibilityPolicy.kt new file mode 100644 index 00000000..75322ef5 --- /dev/null +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LimitTaskEligibilityPolicy.kt @@ -0,0 +1,45 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.task + +import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.TaskState + +/** + * A [TaskEligibilityPolicy] that limits the total number of active tasks in the system. + */ +data class LimitTaskEligibilityPolicy(val limit: Int) : TaskEligibilityPolicy { + override fun invoke(scheduler: StageWorkflowService) = object : TaskEligibilityPolicy.Logic { + override fun invoke( + task: TaskState + ): TaskEligibilityPolicy.Advice = + if (scheduler.activeTasks.size < limit) + TaskEligibilityPolicy.Advice.ADMIT + else + TaskEligibilityPolicy.Advice.STOP + } + + override fun toString(): String = "Limit-Active($limit)" +} diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt new file mode 100644 index 00000000..090f7be7 --- /dev/null +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt @@ -0,0 +1,45 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.task + +import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.TaskState + +/** + * A [TaskEligibilityPolicy] that limits the number of active tasks in the system based on the average system load. + */ +data class LoadTaskEligibilityPolicy(val limit: Double) : TaskEligibilityPolicy { + override fun invoke(scheduler: StageWorkflowService) = object : TaskEligibilityPolicy.Logic { + override fun invoke( + task: TaskState + ): TaskEligibilityPolicy.Advice = + if (scheduler.load < limit) + TaskEligibilityPolicy.Advice.ADMIT + else + TaskEligibilityPolicy.Advice.STOP + } + + override fun toString(): String = "Limit-Load($limit)" +} diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/NullTaskEligibilityPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/NullTaskEligibilityPolicy.kt new file mode 100644 index 00000000..889f2ab5 --- /dev/null +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/NullTaskEligibilityPolicy.kt @@ -0,0 +1,43 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.task + +import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.TaskState + +/** + * A [TaskEligibilityPolicy] that always allows new tasks to enter. + */ +object NullTaskEligibilityPolicy : TaskEligibilityPolicy { + override fun invoke(scheduler: StageWorkflowService): TaskEligibilityPolicy.Logic = Logic + + private object Logic : TaskEligibilityPolicy.Logic { + override fun invoke( + task: TaskState + ): TaskEligibilityPolicy.Advice = TaskEligibilityPolicy.Advice.ADMIT + } + + override fun toString(): String = "Always" +} diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskEligibilityPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskEligibilityPolicy.kt new file mode 100644 index 00000000..d6f49d14 --- /dev/null +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskEligibilityPolicy.kt @@ -0,0 +1,47 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.task + +import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.TaskState +import java.util.Random + +/** + * A [TaskEligibilityPolicy] that randomly accepts tasks in the system with some [probability]. + */ +data class RandomTaskEligibilityPolicy(val probability: Double = 0.5) : TaskEligibilityPolicy { + override fun invoke(scheduler: StageWorkflowService) = object : TaskEligibilityPolicy.Logic { + val random = Random(123) + + override fun invoke(task: TaskState): TaskEligibilityPolicy.Advice = + if (random.nextDouble() <= probability || scheduler.activeTasks.isEmpty()) + TaskEligibilityPolicy.Advice.ADMIT + else { + TaskEligibilityPolicy.Advice.DENY + } + } + + override fun toString(): String = "Random($probability)" +} diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskOrderPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskOrderPolicy.kt new file mode 100644 index 00000000..4c309085 --- /dev/null +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskOrderPolicy.kt @@ -0,0 +1,62 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.task + +import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerListener +import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.TaskState +import com.atlarge.opendc.workflows.workload.Task +import kotlin.random.Random + +/** + * A [TaskOrderPolicy] that orders the tasks randomly. + */ +object RandomTaskOrderPolicy : TaskOrderPolicy { + override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> = + object : Comparator<TaskState>, StageWorkflowSchedulerListener { + private val random = Random(123) + private val ids = HashMap<Task, Int>() + + init { + scheduler.addListener(this) + } + + override fun taskReady(task: TaskState) { + ids[task.task] = random.nextInt() + } + + override fun taskFinished(task: TaskState) { + ids.remove(task.task) + } + + override fun compare(o1: TaskState, o2: TaskState): Int { + return compareValuesBy(o1, o2) { ids.getValue(it.task) } + } + } + + override fun toString(): String { + return "Random" + } +} diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/SubmissionTimeTaskOrderPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/SubmissionTimeTaskOrderPolicy.kt new file mode 100644 index 00000000..a261965f --- /dev/null +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/SubmissionTimeTaskOrderPolicy.kt @@ -0,0 +1,41 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.task + +import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.TaskState + +/** + * A [TaskOrderPolicy] that orders tasks based on the order of arrival in the queue. + */ +data class SubmissionTimeTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPolicy { + override fun invoke(scheduler: StageWorkflowService) = compareBy<TaskState> { + it.job.submittedAt.let { if (ascending) it else -it } + } + + override fun toString(): String { + return "Submission-Time(${if (ascending) "asc" else "desc"})" + } +} diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt new file mode 100644 index 00000000..72a7fdd0 --- /dev/null +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt @@ -0,0 +1,72 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.task + +import com.atlarge.opendc.workflows.service.TaskState +import com.atlarge.opendc.workflows.service.stage.StagePolicy + +/** + * A policy interface for determining the eligibility of tasks in a scheduling cycle. + */ +interface TaskEligibilityPolicy : StagePolicy<TaskEligibilityPolicy.Logic> { + interface Logic { + /** + * Determine whether the specified [TaskState] is eligible to be scheduled. + * + * @param task The task instance to schedule. + * @return The advice for marking the task. + */ + operator fun invoke(task: TaskState): Advice + } + + /** + * The advice given to the scheduler by an admission policy. + * + * @property admit A flag to indicate to the scheduler that the job should be admitted. + * @property stop A flag to indicate the scheduler should immediately stop admitting jobs to the scheduling queue and wait + * for the next scheduling cycle. + */ + enum class Advice(val admit: Boolean, val stop: Boolean) { + /** + * Admit the current job to the scheduling queue and continue admitting jobs. + */ + ADMIT(true, false), + + /** + * Admit the current job to the scheduling queue and stop admitting jobs. + */ + ADMIT_LAST(true, true), + + /** + * Deny the current job, but continue admitting jobs. + */ + DENY(false, false), + + /** + * Deny the current job and also stop admitting jobs. + */ + STOP(false, true) + } +} diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskOrderPolicy.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskOrderPolicy.kt new file mode 100644 index 00000000..bfdc7924 --- /dev/null +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskOrderPolicy.kt @@ -0,0 +1,34 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service.stage.task + +import com.atlarge.opendc.workflows.service.TaskState +import com.atlarge.opendc.workflows.service.stage.StagePolicy + +/** + * This interface represents the **T2** stage of the Reference Architecture for Datacenter Schedulers and provides the + * scheduler with a sorted list of tasks to schedule. + */ +interface TaskOrderPolicy : StagePolicy<Comparator<TaskState>> diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Job.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Job.kt new file mode 100644 index 00000000..02969d8a --- /dev/null +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Job.kt @@ -0,0 +1,52 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.workload + +import com.atlarge.opendc.core.User +import com.atlarge.opendc.core.workload.Workload +import java.util.UUID + +/** + * A workload that represents a directed acyclic graph (DAG) of tasks with control and data dependencies between tasks. + * + * @property uid A unique identified of this workflow. + * @property name The name of this workflow. + * @property owner The owner of the workflow. + * @property tasks The tasks that are part of this workflow. + * @property metadata Additional metadata for the job. + */ +data class Job( + override val uid: UUID, + override val name: String, + override val owner: User, + val tasks: Set<Task>, + val metadata: Map<String, Any> = emptyMap() +) : Workload { + override fun equals(other: Any?): Boolean = other is Job && uid == other.uid + + override fun hashCode(): Int = uid.hashCode() + + override fun toString(): String = "Job(uid=$uid, name=$name, tasks=${tasks.size}, metadata=$metadata)" +} diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Metadata.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Metadata.kt new file mode 100644 index 00000000..067f1179 --- /dev/null +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Metadata.kt @@ -0,0 +1,30 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.workload + +/** + * Meta-data key for the deadline of a task. + */ +const val WORKFLOW_TASK_DEADLINE = "workflow:task:deadline" diff --git a/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Task.kt b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Task.kt new file mode 100644 index 00000000..82521faa --- /dev/null +++ b/simulator/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Task.kt @@ -0,0 +1,50 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.workload + +import com.atlarge.opendc.compute.core.image.Image +import com.atlarge.opendc.core.Identity +import java.util.UUID + +/** + * A stage of a [Job]. + * + * @property uid A unique identified of this task. + * @property name The name of this task. + * @property image The application image to run as part of this workflow task. + * @property dependencies The dependencies of this task in order for it to execute. + * @property metadata Additional metadata for this task. + */ +data class Task( + override val uid: UUID, + override val name: String, + val image: Image, + val dependencies: Set<Task>, + val metadata: Map<String, Any> = emptyMap() +) : Identity { + override fun equals(other: Any?): Boolean = other is Task && uid == other.uid + + override fun hashCode(): Int = uid.hashCode() +} |
