diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-10-01 01:55:20 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-10-01 10:19:17 +0200 |
| commit | a283fac5e4d2a6be229acba191acdcbf7eba6dcd (patch) | |
| tree | 378ee03042636dde384e4c7eb98aef00f5d3213c /simulator/opendc-workflows/src/main/kotlin/org | |
| parent | 8a9f5573bef3f68316add17c04a47cc4e5fe75fa (diff) | |
Migrate to org.opendc namespace
This change moves the OpenDC simulator codebase to the org.opendc
namespace of which we control the domain. Previously, we used the
com.atlarge package of which we did not control the domain, which might
lead to difficulties in the future.
Diffstat (limited to 'simulator/opendc-workflows/src/main/kotlin/org')
42 files changed, 2501 insertions, 0 deletions
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/JobState.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/JobState.kt new file mode 100644 index 00000000..a8d10d22 --- /dev/null +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/JobState.kt @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflows.service + +import org.opendc.workflows.workload.Job + +class JobState(val job: Job, val submittedAt: Long) { + /** + * A flag to indicate whether this job is finished. + */ + val isFinished: Boolean + get() = tasks.isEmpty() + + val tasks: MutableSet<TaskState> = mutableSetOf() + + override fun equals(other: Any?): Boolean = other is JobState && other.job == job + + override fun hashCode(): Int = job.hashCode() +} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerListener.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerListener.kt new file mode 100644 index 00000000..d03a646c --- /dev/null +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerListener.kt @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflows.service + +interface StageWorkflowSchedulerListener { + fun cycleStarted(scheduler: StageWorkflowService) {} + fun cycleFinished(scheduler: StageWorkflowService) {} + + fun jobSubmitted(job: JobState) {} + fun jobStarted(job: JobState) {} + fun jobFinished(job: JobState) {} + + fun taskReady(task: TaskState) {} + fun taskAssigned(task: TaskState) {} + fun taskStarted(task: TaskState) {} + fun taskFinished(task: TaskState) {} +} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt new file mode 100644 index 00000000..6262c61f --- /dev/null +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt @@ -0,0 +1,371 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflows.service + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.launch +import org.opendc.compute.core.Server +import org.opendc.compute.core.ServerEvent +import org.opendc.compute.core.ServerState +import org.opendc.compute.metal.Node +import org.opendc.compute.metal.service.ProvisioningService +import org.opendc.utils.flow.EventFlow +import org.opendc.workflows.service.stage.job.JobAdmissionPolicy +import org.opendc.workflows.service.stage.job.JobOrderPolicy +import org.opendc.workflows.service.stage.resource.ResourceFilterPolicy +import org.opendc.workflows.service.stage.resource.ResourceSelectionPolicy +import org.opendc.workflows.service.stage.task.TaskEligibilityPolicy +import org.opendc.workflows.service.stage.task.TaskOrderPolicy +import org.opendc.workflows.workload.Job +import java.time.Clock +import java.util.* + +/** + * A [WorkflowService] that distributes work through a multi-stage process based on the Reference Architecture for + * Topology Scheduling. + */ +class StageWorkflowService( + internal val coroutineScope: CoroutineScope, + internal val clock: Clock, + private val provisioningService: ProvisioningService, + mode: WorkflowSchedulerMode, + jobAdmissionPolicy: JobAdmissionPolicy, + jobOrderPolicy: JobOrderPolicy, + taskEligibilityPolicy: TaskEligibilityPolicy, + taskOrderPolicy: TaskOrderPolicy, + resourceFilterPolicy: ResourceFilterPolicy, + resourceSelectionPolicy: ResourceSelectionPolicy +) : WorkflowService { + + /** + * The incoming jobs ready to be processed by the scheduler. + */ + internal val incomingJobs: MutableSet<JobState> = linkedSetOf() + + /** + * The incoming tasks ready to be processed by the scheduler. + */ + internal val incomingTasks: MutableSet<TaskState> = linkedSetOf() + + /** + * The job queue. + */ + internal val jobQueue: Queue<JobState> + + /** + * The task queue. + */ + internal val taskQueue: Queue<TaskState> + + /** + * The active jobs in the system. + */ + internal val activeJobs: MutableSet<JobState> = mutableSetOf() + + /** + * The active tasks in the system. + */ + internal val activeTasks: MutableSet<TaskState> = mutableSetOf() + + /** + * The running tasks by [Server]. + */ + internal val taskByServer = mutableMapOf<Server, TaskState>() + + /** + * The nodes that are controlled by the service. + */ + internal lateinit var nodes: List<Node> + + /** + * The available nodes. + */ + internal val available: MutableSet<Node> = mutableSetOf() + + /** + * The maximum number of incoming jobs. + */ + private val throttleLimit: Int = 20000 + + /** + * The load of the system. + */ + internal val load: Double + get() = (available.size / nodes.size.toDouble()) + + /** + * The root listener of this scheduler. + */ + private val rootListener = object : StageWorkflowSchedulerListener { + /** + * The listeners to delegate to. + */ + val listeners = mutableSetOf<StageWorkflowSchedulerListener>() + + override fun cycleStarted(scheduler: StageWorkflowService) { + listeners.forEach { it.cycleStarted(scheduler) } + } + + override fun cycleFinished(scheduler: StageWorkflowService) { + listeners.forEach { it.cycleFinished(scheduler) } + } + + override fun jobSubmitted(job: JobState) { + listeners.forEach { it.jobSubmitted(job) } + } + + override fun jobStarted(job: JobState) { + listeners.forEach { it.jobStarted(job) } + } + + override fun jobFinished(job: JobState) { + listeners.forEach { it.jobFinished(job) } + } + + override fun taskReady(task: TaskState) { + listeners.forEach { it.taskReady(task) } + } + + override fun taskAssigned(task: TaskState) { + listeners.forEach { it.taskAssigned(task) } + } + + override fun taskStarted(task: TaskState) { + listeners.forEach { it.taskStarted(task) } + } + + override fun taskFinished(task: TaskState) { + listeners.forEach { it.taskFinished(task) } + } + } + + private val mode: WorkflowSchedulerMode.Logic + private val jobAdmissionPolicy: JobAdmissionPolicy.Logic + private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic + private val resourceFilterPolicy: ResourceFilterPolicy.Logic + private val resourceSelectionPolicy: Comparator<Node> + private val eventFlow = EventFlow<WorkflowEvent>() + + init { + coroutineScope.launch { + nodes = provisioningService.nodes().toList() + available.addAll(nodes) + } + + this.mode = mode(this) + this.jobAdmissionPolicy = jobAdmissionPolicy(this) + this.jobQueue = PriorityQueue(100, jobOrderPolicy(this).thenBy { it.job.uid }) + this.taskEligibilityPolicy = taskEligibilityPolicy(this) + this.taskQueue = PriorityQueue(1000, taskOrderPolicy(this).thenBy { it.task.uid }) + this.resourceFilterPolicy = resourceFilterPolicy(this) + this.resourceSelectionPolicy = resourceSelectionPolicy(this) + } + + override val events: Flow<WorkflowEvent> = eventFlow + + override suspend fun submit(job: Job) { + // J1 Incoming Jobs + val jobInstance = JobState(job, clock.millis()) + val instances = job.tasks.associateWith { + TaskState(jobInstance, it) + } + + for ((task, instance) in instances) { + instance.dependencies.addAll(task.dependencies.map { instances[it]!! }) + task.dependencies.forEach { + instances[it]!!.dependents.add(instance) + } + + // If the task has no dependency, it is a root task and can immediately be evaluated + if (instance.isRoot) { + instance.state = TaskStatus.READY + } + } + + instances.values.toCollection(jobInstance.tasks) + incomingJobs += jobInstance + rootListener.jobSubmitted(jobInstance) + + requestCycle() + } + + /** + * Indicate to the scheduler that a scheduling cycle is needed. + */ + private suspend fun requestCycle() = mode.requestCycle() + + /** + * Perform a scheduling cycle immediately. + */ + @OptIn(ExperimentalCoroutinesApi::class) + internal suspend fun schedule() { + // J2 Create list of eligible jobs + val iterator = incomingJobs.iterator() + while (iterator.hasNext()) { + val jobInstance = iterator.next() + val advice = jobAdmissionPolicy(jobInstance) + if (advice.stop) { + break + } else if (!advice.admit) { + continue + } + + iterator.remove() + jobQueue.add(jobInstance) + activeJobs += jobInstance + eventFlow.emit(WorkflowEvent.JobStarted(this, jobInstance.job, clock.millis())) + rootListener.jobStarted(jobInstance) + } + + // J4 Per job + while (jobQueue.isNotEmpty()) { + val jobInstance = jobQueue.poll() + + // Edge-case: job has no tasks + if (jobInstance.isFinished) { + finishJob(jobInstance) + } + + // Add job roots to the scheduling queue + for (task in jobInstance.tasks) { + if (task.state != TaskStatus.READY) { + continue + } + + incomingTasks += task + rootListener.taskReady(task) + } + } + + // T1 Create list of eligible tasks + val taskIterator = incomingTasks.iterator() + while (taskIterator.hasNext()) { + val taskInstance = taskIterator.next() + val advice = taskEligibilityPolicy(taskInstance) + if (advice.stop) { + break + } else if (!advice.admit) { + continue + } + + taskIterator.remove() + taskQueue.add(taskInstance) + } + + // T3 Per task + while (taskQueue.isNotEmpty()) { + val instance = taskQueue.peek() + val host: Node? = available.firstOrNull() + + if (host != null) { + // T4 Submit task to machine + available -= host + instance.state = TaskStatus.ACTIVE + val newHost = provisioningService.deploy(host, instance.task.image) + val server = newHost.server!! + instance.host = newHost + taskByServer[server] = instance + server.events + .onEach { event -> if (event is ServerEvent.StateChanged) stateChanged(event.server) } + .launchIn(coroutineScope) + + activeTasks += instance + taskQueue.poll() + rootListener.taskAssigned(instance) + } else { + break + } + } + } + + private suspend fun stateChanged(server: Server) { + when (server.state) { + ServerState.ACTIVE -> { + val task = taskByServer.getValue(server) + task.startedAt = clock.millis() + eventFlow.emit( + WorkflowEvent.TaskStarted( + this@StageWorkflowService, + task.job.job, + task.task, + clock.millis() + ) + ) + rootListener.taskStarted(task) + } + ServerState.SHUTOFF, ServerState.ERROR -> { + val task = taskByServer.remove(server) ?: throw IllegalStateException() + val job = task.job + task.state = TaskStatus.FINISHED + task.finishedAt = clock.millis() + job.tasks.remove(task) + available += task.host!! + activeTasks -= task + eventFlow.emit( + WorkflowEvent.TaskFinished( + this@StageWorkflowService, + task.job.job, + task.task, + clock.millis() + ) + ) + rootListener.taskFinished(task) + + // Add job roots to the scheduling queue + for (dependent in task.dependents) { + if (dependent.state != TaskStatus.READY) { + continue + } + + incomingTasks += dependent + rootListener.taskReady(dependent) + } + + if (job.isFinished) { + finishJob(job) + } + + requestCycle() + } + else -> throw IllegalStateException() + } + } + + private suspend fun finishJob(job: JobState) { + activeJobs -= job + eventFlow.emit(WorkflowEvent.JobFinished(this, job.job, clock.millis())) + rootListener.jobFinished(job) + } + + fun addListener(listener: StageWorkflowSchedulerListener) { + rootListener.listeners += listener + } + + fun removeListener(listener: StageWorkflowSchedulerListener) { + rootListener.listeners -= listener + } +} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt new file mode 100644 index 00000000..e7795dd5 --- /dev/null +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflows.service + +import org.opendc.compute.metal.Node +import org.opendc.workflows.workload.Task + +class TaskState(val job: JobState, val task: Task) { + /** + * The moment in time the task was started. + */ + var startedAt: Long = Long.MIN_VALUE + + /** + * The moment in time the task was finished. + */ + var finishedAt: Long = Long.MIN_VALUE + + /** + * The dependencies of this task. + */ + val dependencies = HashSet<TaskState>() + + /** + * The dependents of this task. + */ + val dependents = HashSet<TaskState>() + + /** + * A flag to indicate whether this workflow task instance is a workflow root. + */ + val isRoot: Boolean + get() = dependencies.isEmpty() + + var state: TaskStatus = TaskStatus.CREATED + set(value) { + field = value + + // Mark the process as terminated in the graph + if (value == TaskStatus.FINISHED) { + markTerminated() + } + } + + var host: Node? = null + + /** + * Mark the specified [TaskView] as terminated. + */ + private fun markTerminated() { + for (dependent in dependents) { + dependent.dependencies.remove(this) + + if (dependent.isRoot) { + dependent.state = TaskStatus.READY + } + } + } + + override fun equals(other: Any?): Boolean = other is TaskState && other.job == job && other.task == task + + override fun hashCode(): Int = task.hashCode() +} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskStatus.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskStatus.kt new file mode 100644 index 00000000..99f5bb87 --- /dev/null +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskStatus.kt @@ -0,0 +1,35 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflows.service + +/** + * The state of a workflow task. + */ +public enum class TaskStatus { + CREATED, + READY, + ACTIVE, + FINISHED +} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowEvent.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowEvent.kt new file mode 100644 index 00000000..dadccb50 --- /dev/null +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowEvent.kt @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflows.service + +import org.opendc.workflows.workload.Job +import org.opendc.workflows.workload.Task + +/** + * An event emitted by the [WorkflowService]. + */ +public sealed class WorkflowEvent { + /** + * The [WorkflowService] that emitted the event. + */ + public abstract val service: WorkflowService + + /** + * This event is emitted when a job has become active. + */ + public data class JobStarted( + override val service: WorkflowService, + public val job: Job, + public val time: Long + ) : WorkflowEvent() + + /** + * This event is emitted when a job has finished processing. + */ + public data class JobFinished( + override val service: WorkflowService, + public val job: Job, + public val time: Long + ) : WorkflowEvent() + + /** + * This event is emitted when a task of a job has started processing. + */ + public data class TaskStarted( + override val service: WorkflowService, + public val job: Job, + public val task: Task, + public val time: Long + ) : WorkflowEvent() + + /** + * This event is emitted when a task of a job has started processing. + */ + public data class TaskFinished( + override val service: WorkflowService, + public val job: Job, + public val task: Task, + public val time: Long + ) : WorkflowEvent() +} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt new file mode 100644 index 00000000..3eff0062 --- /dev/null +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt @@ -0,0 +1,107 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflows.service + +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.yield +import org.opendc.workflows.service.stage.StagePolicy + +/** + * The operating mode of a workflow scheduler. + */ +sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Logic> { + /** + * The logic for operating the cycles of a workflow scheduler. + */ + interface Logic { + /** + * Request a new scheduling cycle to be performed. + */ + suspend fun requestCycle() + } + + /** + * An interactive scheduler immediately triggers a new scheduling cycle when a workflow is received. + */ + object Interactive : WorkflowSchedulerMode() { + override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic { + override suspend fun requestCycle() { + yield() + scheduler.schedule() + } + } + + override fun toString(): String = "Interactive" + } + + /** + * A batch scheduler triggers a scheduling cycle every time quantum if needed. + */ + data class Batch(val quantum: Long) : WorkflowSchedulerMode() { + private var next: kotlinx.coroutines.Job? = null + + override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic { + override suspend fun requestCycle() { + if (next == null) { + // In batch mode, we assume that the scheduler runs at a fixed slot every time + // quantum (e.g t=0, t=60, t=120). We calculate here the delay until the next scheduling slot. + val delay = quantum - (scheduler.clock.millis() % quantum) + + val job = scheduler.coroutineScope.launch { + delay(delay) + next = null + scheduler.schedule() + } + next = job + } + } + } + + override fun toString(): String = "Batch($quantum)" + } + + /** + * A scheduling cycle is triggered at a random point in time. + */ + data class Random(private val random: java.util.Random = java.util.Random(123)) : WorkflowSchedulerMode() { + private var next: kotlinx.coroutines.Job? = null + + override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic { + override suspend fun requestCycle() { + if (next == null) { + val delay = random.nextInt(200).toLong() + + val job = scheduler.coroutineScope.launch { + delay(delay) + next = null + scheduler.schedule() + } + next = job + } + } + } + + override fun toString(): String = "Random" + } +} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowService.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowService.kt new file mode 100644 index 00000000..17a2d875 --- /dev/null +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowService.kt @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflows.service + +import kotlinx.coroutines.flow.Flow +import org.opendc.core.services.AbstractServiceKey +import org.opendc.workflows.workload.Job +import java.util.* + +/** + * A service for cloud workflow management. + * + * The workflow scheduler is modelled after the Reference Architecture for Topology Scheduling by Andreadis et al. + */ +public interface WorkflowService { + /** + * Thie events emitted by the workflow scheduler. + */ + public val events: Flow<WorkflowEvent> + + /** + * Submit the specified [Job] to the workflow service for scheduling. + */ + public suspend fun submit(job: Job) + + /** + * The service key for the workflow scheduler. + */ + companion object Key : AbstractServiceKey<WorkflowService>(UUID.randomUUID(), "workflows") +} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/StagePolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/StagePolicy.kt new file mode 100644 index 00000000..68a8a424 --- /dev/null +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/StagePolicy.kt @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflows.service.stage + +import org.opendc.workflows.service.StageWorkflowService +import java.io.Serializable + +/** + * A scheduling stage policy. + */ +interface StagePolicy<T : Any> : Serializable { + /** + * Build the logic of the stage policy. + */ + operator fun invoke(scheduler: StageWorkflowService): T +} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/DurationJobOrderPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/DurationJobOrderPolicy.kt new file mode 100644 index 00000000..9ac6a97f --- /dev/null +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/DurationJobOrderPolicy.kt @@ -0,0 +1,100 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflows.service.stage.job + +import org.opendc.workflows.service.JobState +import org.opendc.workflows.service.StageWorkflowSchedulerListener +import org.opendc.workflows.service.StageWorkflowService +import org.opendc.workflows.workload.Job +import org.opendc.workflows.workload.Task +import org.opendc.workflows.workload.WORKFLOW_TASK_DEADLINE + +/** + * A [JobOrderPolicy] that orders jobs based on its critical path length. + */ +data class DurationJobOrderPolicy(val ascending: Boolean = true) : JobOrderPolicy { + override fun invoke(scheduler: StageWorkflowService): Comparator<JobState> = + object : Comparator<JobState>, StageWorkflowSchedulerListener { + private val results = HashMap<Job, Long>() + + init { + scheduler.addListener(this) + } + + private val Job.duration: Long + get() = results[this]!! + + override fun jobSubmitted(job: JobState) { + results[job.job] = job.job.toposort().map { task -> + val estimable = task.metadata[WORKFLOW_TASK_DEADLINE] as? Long? + estimable ?: Long.MAX_VALUE + }.sum() + } + + override fun jobFinished(job: JobState) { + results.remove(job.job) + } + + override fun compare(o1: JobState, o2: JobState): Int { + return compareValuesBy(o1, o2) { it.job.duration }.let { if (ascending) it else -it } + } + } + + override fun toString(): String { + return "Job-Duration(${if (ascending) "asc" else "desc"})" + } +} + +/** + * Create a topological sorting of the tasks in a job. + * + * @return The list of tasks within the job topologically sorted. + */ +fun Job.toposort(): List<Task> { + val res = mutableListOf<Task>() + val visited = mutableSetOf<Task>() + val adjacent = mutableMapOf<Task, MutableList<Task>>() + + for (task in tasks) { + for (dependency in task.dependencies) { + adjacent.getOrPut(dependency) { mutableListOf() }.add(task) + } + } + + fun visit(task: Task) { + visited.add(task) + + adjacent[task] ?: emptyList<Task>() + .asSequence() + .filter { it !in visited } + .forEach { visit(it) } + + res.add(task) + } + + tasks + .asSequence() + .filter { it !in visited } + .forEach { visit(it) } + return res +} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt new file mode 100644 index 00000000..8d45918b --- /dev/null +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflows.service.stage.job + +import org.opendc.workflows.service.JobState +import org.opendc.workflows.service.stage.StagePolicy + +/** + * A policy interface for admitting [JobState]s to a scheduling cycle. + */ +interface JobAdmissionPolicy : StagePolicy<JobAdmissionPolicy.Logic> { + interface Logic { + /** + * Determine whether the specified [JobState] should be admitted to the scheduling cycle. + * + * @param job The workflow that has been submitted. + * @return The advice for admitting the job. + */ + operator fun invoke(job: JobState): Advice + } + + /** + * The advice given to the scheduler by an admission policy. + * + * @property admit A flag to indicate to the scheduler that the job should be admitted. + * @property stop A flag to indicate the scheduler should immediately stop admitting jobs to the scheduling queue and wait + * for the next scheduling cycle. + */ + enum class Advice(val admit: Boolean, val stop: Boolean) { + /** + * Admit the current job to the scheduling queue and continue admitting jobs. + */ + ADMIT(true, false), + + /** + * Admit the current job to the scheduling queue and stop admitting jobs. + */ + ADMIT_LAST(true, true), + + /** + * Deny the current job, but continue admitting jobs. + */ + DENY(false, false), + + /** + * Deny the current job and also stop admitting jobs. + */ + STOP(false, true) + } +} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/JobOrderPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/JobOrderPolicy.kt new file mode 100644 index 00000000..e65a2ea7 --- /dev/null +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/JobOrderPolicy.kt @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflows.service.stage.job + +import org.opendc.workflows.service.JobState +import org.opendc.workflows.service.stage.StagePolicy + +/** + * A policy interface for ordering admitted workflows in the scheduling queue. + */ +interface JobOrderPolicy : StagePolicy<Comparator<JobState>> diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/LimitJobAdmissionPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/LimitJobAdmissionPolicy.kt new file mode 100644 index 00000000..7ee15e6b --- /dev/null +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/LimitJobAdmissionPolicy.kt @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflows.service.stage.job + +import org.opendc.workflows.service.JobState +import org.opendc.workflows.service.StageWorkflowService + +/** + * A [JobAdmissionPolicy] that limits the amount of active jobs in the system. + * + * @property limit The maximum number of concurrent jobs in the system. + */ +data class LimitJobAdmissionPolicy(val limit: Int) : JobAdmissionPolicy { + override fun invoke(scheduler: StageWorkflowService) = object : JobAdmissionPolicy.Logic { + override fun invoke(job: JobState): JobAdmissionPolicy.Advice = + if (scheduler.activeJobs.size < limit) + JobAdmissionPolicy.Advice.ADMIT + else + JobAdmissionPolicy.Advice.STOP + } + + override fun toString(): String = "Limit-Active($limit)" +} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/LoadJobAdmissionPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/LoadJobAdmissionPolicy.kt new file mode 100644 index 00000000..31e6d043 --- /dev/null +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/LoadJobAdmissionPolicy.kt @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflows.service.stage.job + +import org.opendc.workflows.service.JobState +import org.opendc.workflows.service.StageWorkflowService + +/** + * A [JobAdmissionPolicy] that limits the amount of jobs based on the average system load. + * + * @property limit The maximum load before stopping admission. + */ +data class LoadJobAdmissionPolicy(val limit: Double) : JobAdmissionPolicy { + override fun invoke(scheduler: StageWorkflowService) = object : JobAdmissionPolicy.Logic { + override fun invoke(job: JobState): JobAdmissionPolicy.Advice = + if (scheduler.load < limit) + JobAdmissionPolicy.Advice.ADMIT + else + JobAdmissionPolicy.Advice.STOP + } + + override fun toString(): String = "Limit-Load($limit)" +} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt new file mode 100644 index 00000000..e671db52 --- /dev/null +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflows.service.stage.job + +import org.opendc.workflows.service.JobState +import org.opendc.workflows.service.StageWorkflowService + +/** + * A [JobAdmissionPolicy] that admits all jobs. + */ +object NullJobAdmissionPolicy : JobAdmissionPolicy { + override fun invoke(scheduler: StageWorkflowService) = object : JobAdmissionPolicy.Logic { + override fun invoke(job: JobState): JobAdmissionPolicy.Advice = + JobAdmissionPolicy.Advice.ADMIT + } + + override fun toString(): String = "Always" +} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/RandomJobOrderPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/RandomJobOrderPolicy.kt new file mode 100644 index 00000000..7f5abd68 --- /dev/null +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/RandomJobOrderPolicy.kt @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflows.service.stage.job + +import org.opendc.workflows.service.JobState +import org.opendc.workflows.service.StageWorkflowSchedulerListener +import org.opendc.workflows.service.StageWorkflowService +import org.opendc.workflows.workload.Job +import java.util.* +import kotlin.collections.HashMap +import kotlin.collections.getValue +import kotlin.collections.set + +/** + * A [JobOrderPolicy] that randomly orders jobs. + */ +object RandomJobOrderPolicy : JobOrderPolicy { + override fun invoke(scheduler: StageWorkflowService): Comparator<JobState> = + object : Comparator<JobState>, StageWorkflowSchedulerListener { + private val random = Random(123) + private val ids = HashMap<Job, Int>() + + init { + scheduler.addListener(this) + } + + override fun jobSubmitted(job: JobState) { + ids[job.job] = random.nextInt() + } + + override fun jobFinished(job: JobState) { + ids.remove(job.job) + } + + override fun compare(o1: JobState, o2: JobState): Int { + return compareValuesBy(o1, o2) { ids.getValue(it.job) } + } + } + + override fun toString(): String { + return "Random" + } +} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/SizeJobOrderPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/SizeJobOrderPolicy.kt new file mode 100644 index 00000000..05953a9b --- /dev/null +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/SizeJobOrderPolicy.kt @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflows.service.stage.job + +import org.opendc.workflows.service.JobState +import org.opendc.workflows.service.StageWorkflowService + +/** + * A [SizeJobOrderPolicy] that orders jobs based on the number of tasks it has. + */ +data class SizeJobOrderPolicy(val ascending: Boolean = true) : JobOrderPolicy { + override fun invoke(scheduler: StageWorkflowService) = + compareBy<JobState> { it.tasks.size.let { if (ascending) it else -it } } + + override fun toString(): String { + return "Job-Size(${if (ascending) "asc" else "desc"})" + } +} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/SubmissionTimeJobOrderPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/SubmissionTimeJobOrderPolicy.kt new file mode 100644 index 00000000..9a48f934 --- /dev/null +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/SubmissionTimeJobOrderPolicy.kt @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflows.service.stage.job + +import org.opendc.workflows.service.JobState +import org.opendc.workflows.service.StageWorkflowService + +/** + * A [JobOrderPolicy] orders jobs in FIFO order. + */ +data class SubmissionTimeJobOrderPolicy(val ascending: Boolean = true) : JobOrderPolicy { + override fun invoke(scheduler: StageWorkflowService) = + compareBy<JobState> { it.submittedAt.let { if (ascending) it else -it } } + + override fun toString(): String { + return "Submission-Time(${if (ascending) "asc" else "desc"})" + } +} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt new file mode 100644 index 00000000..64b46330 --- /dev/null +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflows.service.stage.resource + +import org.opendc.compute.metal.Node +import org.opendc.workflows.service.StageWorkflowService + +/** + * A [ResourceSelectionPolicy] that selects the first machine that is available. + */ +object FirstFitResourceSelectionPolicy : ResourceSelectionPolicy { + override fun invoke(scheduler: StageWorkflowService) = object : Comparator<Node> { + override fun compare(o1: Node, o2: Node): Int = 1 + } + + override fun toString(): String = "First-Fit" +} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt new file mode 100644 index 00000000..e505539d --- /dev/null +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflows.service.stage.resource + +import org.opendc.compute.metal.Node +import org.opendc.workflows.service.StageWorkflowService +import org.opendc.workflows.service.TaskState + +/** + * A [ResourceFilterPolicy] based on the amount of cores available on the machine and the cores required for + * the task. + */ +object FunctionalResourceFilterPolicy : ResourceFilterPolicy { + override fun invoke(scheduler: StageWorkflowService): ResourceFilterPolicy.Logic = + object : ResourceFilterPolicy.Logic { + override fun invoke(hosts: Sequence<Node>, task: TaskState): Sequence<Node> = + hosts.filter { it in scheduler.available } + } + + override fun toString(): String = "functional" +} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/RandomResourceSelectionPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/RandomResourceSelectionPolicy.kt new file mode 100644 index 00000000..68c78cd6 --- /dev/null +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/RandomResourceSelectionPolicy.kt @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflows.service.stage.resource + +import org.opendc.compute.metal.Node +import org.opendc.workflows.service.StageWorkflowService +import java.util.* + +/** + * A [ResourceSelectionPolicy] that randomly orders the machines. + */ +object RandomResourceSelectionPolicy : ResourceSelectionPolicy { + override fun invoke(scheduler: StageWorkflowService) = object : Comparator<Node> { + private val ids: Map<Node, Long> + + init { + val random = Random(123) + ids = scheduler.nodes.associateWith { random.nextLong() } + } + + override fun compare(o1: Node, o2: Node): Int = compareValuesBy(o1, o2) { ids[it] } + } + + override fun toString(): String = "Random" +} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/ResourceFilterPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/ResourceFilterPolicy.kt new file mode 100644 index 00000000..43744417 --- /dev/null +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/ResourceFilterPolicy.kt @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflows.service.stage.resource + +import org.opendc.compute.metal.Node +import org.opendc.workflows.service.TaskState +import org.opendc.workflows.service.stage.StagePolicy + +/** + * This interface represents stages **R2**, **R3** and **R4** stage of the Reference Architecture for Schedulers and + * acts as a filter yielding a list of resources with sufficient resource-capacities, based on fixed or dynamic + * requirements, and on predicted or monitored information about processing unit availability, memory occupancy, etc. + */ +interface ResourceFilterPolicy : StagePolicy<ResourceFilterPolicy.Logic> { + interface Logic { + /** + * Filter the list of machines based on dynamic information. + * + * @param hosts The hosts to filter. + * @param task The task that is to be scheduled. + * @return The machines on which the task can be scheduled. + */ + operator fun invoke(hosts: Sequence<Node>, task: TaskState): Sequence<Node> + } +} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt new file mode 100644 index 00000000..2cc9bc3b --- /dev/null +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflows.service.stage.resource + +import org.opendc.compute.metal.Node +import org.opendc.workflows.service.stage.StagePolicy + +/** + * This interface represents the **R5** stage of the Reference Architecture for Schedulers and matches the the selected + * task with a (set of) resource(s), using policies such as First-Fit, Worst-Fit, and Best-Fit. + */ +interface ResourceSelectionPolicy : StagePolicy<Comparator<Node>> diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/ActiveTaskOrderPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/ActiveTaskOrderPolicy.kt new file mode 100644 index 00000000..ef2f9db4 --- /dev/null +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/ActiveTaskOrderPolicy.kt @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflows.service.stage.task + +import org.opendc.workflows.service.JobState +import org.opendc.workflows.service.StageWorkflowSchedulerListener +import org.opendc.workflows.service.StageWorkflowService +import org.opendc.workflows.service.TaskState + +/** + * A [TaskOrderPolicy] that orders tasks based on the number of active relative tasks (w.r.t. its job) in the system. + */ +data class ActiveTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPolicy { + override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> = + object : Comparator<TaskState>, StageWorkflowSchedulerListener { + private val active = mutableMapOf<JobState, Int>() + + init { + scheduler.addListener(this) + } + + override fun jobStarted(job: JobState) { + active[job] = 0 + } + + override fun jobFinished(job: JobState) { + active.remove(job) + } + + override fun taskAssigned(task: TaskState) { + active.merge(task.job, 1, Int::plus) + } + + override fun taskFinished(task: TaskState) { + active.merge(task.job, -1, Int::plus) + } + + override fun compare(o1: TaskState, o2: TaskState): Int { + return compareValuesBy(o1, o2) { active.getValue(it.job) }.let { + if (ascending) it else -it + } + } + } + + override fun toString(): String { + return "Active-Per-Job(${if (ascending) "asc" else "desc"})" + } +} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/BalancingTaskEligibilityPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/BalancingTaskEligibilityPolicy.kt new file mode 100644 index 00000000..11ac612e --- /dev/null +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/BalancingTaskEligibilityPolicy.kt @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflows.service.stage.task + +import org.opendc.workflows.service.JobState +import org.opendc.workflows.service.StageWorkflowSchedulerListener +import org.opendc.workflows.service.StageWorkflowService +import org.opendc.workflows.service.TaskState +import kotlin.math.max + +/** + * A [TaskEligibilityPolicy] that balances the tasks based on their job, e.g. do not allow a single job to claim all + * resources of the system. + * + * @property tolerance The maximum difference from the average number of tasks per job in the system as a fraction of + * the average. + */ +data class BalancingTaskEligibilityPolicy(val tolerance: Double = 1.5) : TaskEligibilityPolicy { + override fun invoke(scheduler: StageWorkflowService): TaskEligibilityPolicy.Logic = + object : TaskEligibilityPolicy.Logic, StageWorkflowSchedulerListener { + private val active = mutableMapOf<JobState, Int>() + + init { + scheduler.addListener(this) + } + + override fun jobStarted(job: JobState) { + active[job] = 0 + } + + override fun jobFinished(job: JobState) { + active.remove(job) + } + + override fun taskAssigned(task: TaskState) { + active.merge(task.job, 1, Int::plus) + } + + override fun taskFinished(task: TaskState) { + active.merge(task.job, -1, Int::plus) + } + + override fun invoke(task: TaskState): TaskEligibilityPolicy.Advice { + val activeJobs = scheduler.activeJobs.size + val activeTasks = scheduler.activeTasks.size + val baseline = max(activeTasks / activeJobs.toDouble(), 1.0) + val activeForJob = active[task.job]!! + return if ((activeForJob + 1) / baseline < tolerance) + TaskEligibilityPolicy.Advice.ADMIT + else + TaskEligibilityPolicy.Advice.DENY + } + } + + override fun toString(): String = "Job-Balance($tolerance)" +} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/CompletionTaskOrderPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/CompletionTaskOrderPolicy.kt new file mode 100644 index 00000000..c3e3720a --- /dev/null +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/CompletionTaskOrderPolicy.kt @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflows.service.stage.task + +import org.opendc.workflows.service.JobState +import org.opendc.workflows.service.StageWorkflowSchedulerListener +import org.opendc.workflows.service.StageWorkflowService +import org.opendc.workflows.service.TaskState + +/** + * A [TaskOrderPolicy] that orders tasks based on the number of completed relative tasks. + */ +data class CompletionTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPolicy { + override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> = + object : Comparator<TaskState>, StageWorkflowSchedulerListener { + private val finished = mutableMapOf<JobState, Int>() + + init { + scheduler.addListener(this) + } + + override fun jobStarted(job: JobState) { + finished[job] = 0 + } + + override fun jobFinished(job: JobState) { + finished.remove(job) + } + + override fun taskFinished(task: TaskState) { + finished.merge(task.job, 1, Int::plus) + } + + override fun compare(o1: TaskState, o2: TaskState): Int { + return compareValuesBy(o1, o2) { finished.getValue(it.job) / it.job.tasks.size.toDouble() }.let { + if (ascending) it else -it + } + } + } + + override fun toString(): String { + return "Job-Completion(${if (ascending) "asc" else "desc"})" + } +} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/DependenciesTaskOrderPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/DependenciesTaskOrderPolicy.kt new file mode 100644 index 00000000..60e27118 --- /dev/null +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/DependenciesTaskOrderPolicy.kt @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflows.service.stage.task + +import org.opendc.workflows.service.StageWorkflowService +import org.opendc.workflows.service.TaskState + +/** + * A [TaskOrderPolicy] that orders tasks based on the number of dependency tasks it has. + */ +data class DependenciesTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPolicy { + override fun invoke(scheduler: StageWorkflowService) = compareBy<TaskState> { + it.task.dependencies.size.let { if (ascending) it else -it } + } + + override fun toString(): String { + return "Task-Dependencies(${if (ascending) "asc" else "desc"})" + } +} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/DependentsTaskOrderPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/DependentsTaskOrderPolicy.kt new file mode 100644 index 00000000..97a6dfb0 --- /dev/null +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/DependentsTaskOrderPolicy.kt @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflows.service.stage.task + +import org.opendc.workflows.service.StageWorkflowService +import org.opendc.workflows.service.TaskState + +/** + * A [TaskOrderPolicy] that orders tasks based on the number of dependent tasks it has. + */ +data class DependentsTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPolicy { + override fun invoke(scheduler: StageWorkflowService) = compareBy<TaskState> { + it.dependents.size.let { if (ascending) it else -it } + } + + override fun toString(): String { + return "Task-Dependents(${if (ascending) "asc" else "desc"})" + } +} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/DurationHistoryTaskOrderPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/DurationHistoryTaskOrderPolicy.kt new file mode 100644 index 00000000..9cd83eac --- /dev/null +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/DurationHistoryTaskOrderPolicy.kt @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflows.service.stage.task + +import org.opendc.workflows.service.JobState +import org.opendc.workflows.service.StageWorkflowSchedulerListener +import org.opendc.workflows.service.StageWorkflowService +import org.opendc.workflows.service.TaskState + +/** + * A [TaskOrderPolicy] that orders tasks based on the average duration of the preceding tasks in the job. + */ +data class DurationHistoryTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPolicy { + + override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> = + object : Comparator<TaskState>, StageWorkflowSchedulerListener { + private val results = HashMap<JobState, MutableList<Long>>() + + init { + scheduler.addListener(this) + } + + override fun jobStarted(job: JobState) { + results[job] = mutableListOf() + } + + override fun jobFinished(job: JobState) { + results.remove(job) + } + + override fun taskFinished(task: TaskState) { + results.getValue(task.job) += task.finishedAt - task.startedAt + } + + override fun compare(o1: TaskState, o2: TaskState): Int { + return compareValuesBy(o1, o2) { key -> + val history = results.getValue(key.job) + if (history.isEmpty()) { + Long.MAX_VALUE + } else { + history.average() + } + }.let { if (ascending) it else -it } + } + } + + override fun toString(): String { + return "Task-Duration-History(${if (ascending) "asc" else "desc"})" + } +} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/DurationTaskOrderPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/DurationTaskOrderPolicy.kt new file mode 100644 index 00000000..d5a8a104 --- /dev/null +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/DurationTaskOrderPolicy.kt @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflows.service.stage.task + +import org.opendc.workflows.service.StageWorkflowSchedulerListener +import org.opendc.workflows.service.StageWorkflowService +import org.opendc.workflows.service.TaskState +import org.opendc.workflows.workload.WORKFLOW_TASK_DEADLINE +import java.util.* +import kotlin.collections.HashMap +import kotlin.collections.getValue +import kotlin.collections.minusAssign +import kotlin.collections.set + +/** + * A [TaskOrderPolicy] orders tasks based on the pre-specified (approximate) duration of the task. + */ +data class DurationTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPolicy { + + override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> = + object : Comparator<TaskState>, StageWorkflowSchedulerListener { + private val results = HashMap<UUID, Long>() + + init { + scheduler.addListener(this) + } + + override fun taskReady(task: TaskState) { + val deadline = task.task.metadata[WORKFLOW_TASK_DEADLINE] as? Long? + results[task.task.uid] = deadline ?: Long.MAX_VALUE + } + + override fun taskFinished(task: TaskState) { + results -= task.task.uid + } + + private val TaskState.duration: Long + get() = results.getValue(task.uid) + + override fun compare(o1: TaskState, o2: TaskState): Int { + return compareValuesBy(o1, o2) { state -> state.duration }.let { + if (ascending) it else -it + } + } + } + + override fun toString(): String { + return "Task-Duration(${if (ascending) "asc" else "desc"})" + } +} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/LimitPerJobTaskEligibilityPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/LimitPerJobTaskEligibilityPolicy.kt new file mode 100644 index 00000000..9b06f7d9 --- /dev/null +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/LimitPerJobTaskEligibilityPolicy.kt @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflows.service.stage.task + +import org.opendc.workflows.service.JobState +import org.opendc.workflows.service.StageWorkflowSchedulerListener +import org.opendc.workflows.service.StageWorkflowService +import org.opendc.workflows.service.TaskState + +/** + * A [TaskEligibilityPolicy] that limits the number of active tasks of a job in the system. + */ +data class LimitPerJobTaskEligibilityPolicy(val limit: Int) : TaskEligibilityPolicy { + override fun invoke(scheduler: StageWorkflowService): TaskEligibilityPolicy.Logic = + object : TaskEligibilityPolicy.Logic, StageWorkflowSchedulerListener { + private val active = mutableMapOf<JobState, Int>() + + init { + scheduler.addListener(this) + } + + override fun jobStarted(job: JobState) { + active[job] = 0 + } + + override fun jobFinished(job: JobState) { + active.remove(job) + } + + override fun taskAssigned(task: TaskState) { + active.merge(task.job, 1, Int::plus) + } + + override fun taskFinished(task: TaskState) { + active.merge(task.job, -1, Int::plus) + } + + override fun invoke(task: TaskState): TaskEligibilityPolicy.Advice { + val activeForJob = active[task.job]!! + return if (activeForJob <= limit) + TaskEligibilityPolicy.Advice.ADMIT + else + TaskEligibilityPolicy.Advice.DENY + } + } + + override fun toString(): String = "Limit-Active-Job($limit)" +} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/LimitTaskEligibilityPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/LimitTaskEligibilityPolicy.kt new file mode 100644 index 00000000..e0ac3bc4 --- /dev/null +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/LimitTaskEligibilityPolicy.kt @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflows.service.stage.task + +import org.opendc.workflows.service.StageWorkflowService +import org.opendc.workflows.service.TaskState + +/** + * A [TaskEligibilityPolicy] that limits the total number of active tasks in the system. + */ +data class LimitTaskEligibilityPolicy(val limit: Int) : TaskEligibilityPolicy { + override fun invoke(scheduler: StageWorkflowService) = object : TaskEligibilityPolicy.Logic { + override fun invoke( + task: TaskState + ): TaskEligibilityPolicy.Advice = + if (scheduler.activeTasks.size < limit) + TaskEligibilityPolicy.Advice.ADMIT + else + TaskEligibilityPolicy.Advice.STOP + } + + override fun toString(): String = "Limit-Active($limit)" +} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt new file mode 100644 index 00000000..e1f0a0b7 --- /dev/null +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/LoadTaskEligibilityPolicy.kt @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflows.service.stage.task + +import org.opendc.workflows.service.StageWorkflowService +import org.opendc.workflows.service.TaskState + +/** + * A [TaskEligibilityPolicy] that limits the number of active tasks in the system based on the average system load. + */ +data class LoadTaskEligibilityPolicy(val limit: Double) : TaskEligibilityPolicy { + override fun invoke(scheduler: StageWorkflowService) = object : TaskEligibilityPolicy.Logic { + override fun invoke( + task: TaskState + ): TaskEligibilityPolicy.Advice = + if (scheduler.load < limit) + TaskEligibilityPolicy.Advice.ADMIT + else + TaskEligibilityPolicy.Advice.STOP + } + + override fun toString(): String = "Limit-Load($limit)" +} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/NullTaskEligibilityPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/NullTaskEligibilityPolicy.kt new file mode 100644 index 00000000..4f34b692 --- /dev/null +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/NullTaskEligibilityPolicy.kt @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflows.service.stage.task + +import org.opendc.workflows.service.StageWorkflowService +import org.opendc.workflows.service.TaskState + +/** + * A [TaskEligibilityPolicy] that always allows new tasks to enter. + */ +object NullTaskEligibilityPolicy : TaskEligibilityPolicy { + override fun invoke(scheduler: StageWorkflowService): TaskEligibilityPolicy.Logic = Logic + + private object Logic : TaskEligibilityPolicy.Logic { + override fun invoke( + task: TaskState + ): TaskEligibilityPolicy.Advice = TaskEligibilityPolicy.Advice.ADMIT + } + + override fun toString(): String = "Always" +} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/RandomTaskEligibilityPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/RandomTaskEligibilityPolicy.kt new file mode 100644 index 00000000..8a2e26ad --- /dev/null +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/RandomTaskEligibilityPolicy.kt @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflows.service.stage.task + +import org.opendc.workflows.service.StageWorkflowService +import org.opendc.workflows.service.TaskState +import java.util.* + +/** + * A [TaskEligibilityPolicy] that randomly accepts tasks in the system with some [probability]. + */ +data class RandomTaskEligibilityPolicy(val probability: Double = 0.5) : TaskEligibilityPolicy { + override fun invoke(scheduler: StageWorkflowService) = object : TaskEligibilityPolicy.Logic { + val random = Random(123) + + override fun invoke(task: TaskState): TaskEligibilityPolicy.Advice = + if (random.nextDouble() <= probability || scheduler.activeTasks.isEmpty()) + TaskEligibilityPolicy.Advice.ADMIT + else { + TaskEligibilityPolicy.Advice.DENY + } + } + + override fun toString(): String = "Random($probability)" +} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/RandomTaskOrderPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/RandomTaskOrderPolicy.kt new file mode 100644 index 00000000..df03ba80 --- /dev/null +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/RandomTaskOrderPolicy.kt @@ -0,0 +1,62 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflows.service.stage.task + +import org.opendc.workflows.service.StageWorkflowSchedulerListener +import org.opendc.workflows.service.StageWorkflowService +import org.opendc.workflows.service.TaskState +import org.opendc.workflows.workload.Task +import kotlin.random.Random + +/** + * A [TaskOrderPolicy] that orders the tasks randomly. + */ +object RandomTaskOrderPolicy : TaskOrderPolicy { + override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> = + object : Comparator<TaskState>, StageWorkflowSchedulerListener { + private val random = Random(123) + private val ids = HashMap<Task, Int>() + + init { + scheduler.addListener(this) + } + + override fun taskReady(task: TaskState) { + ids[task.task] = random.nextInt() + } + + override fun taskFinished(task: TaskState) { + ids.remove(task.task) + } + + override fun compare(o1: TaskState, o2: TaskState): Int { + return compareValuesBy(o1, o2) { ids.getValue(it.task) } + } + } + + override fun toString(): String { + return "Random" + } +} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/SubmissionTimeTaskOrderPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/SubmissionTimeTaskOrderPolicy.kt new file mode 100644 index 00000000..e6727e8a --- /dev/null +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/SubmissionTimeTaskOrderPolicy.kt @@ -0,0 +1,41 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflows.service.stage.task + +import org.opendc.workflows.service.StageWorkflowService +import org.opendc.workflows.service.TaskState + +/** + * A [TaskOrderPolicy] that orders tasks based on the order of arrival in the queue. + */ +data class SubmissionTimeTaskOrderPolicy(val ascending: Boolean = true) : TaskOrderPolicy { + override fun invoke(scheduler: StageWorkflowService) = compareBy<TaskState> { + it.job.submittedAt.let { if (ascending) it else -it } + } + + override fun toString(): String { + return "Submission-Time(${if (ascending) "asc" else "desc"})" + } +} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt new file mode 100644 index 00000000..1eb2fab0 --- /dev/null +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt @@ -0,0 +1,72 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflows.service.stage.task + +import org.opendc.workflows.service.TaskState +import org.opendc.workflows.service.stage.StagePolicy + +/** + * A policy interface for determining the eligibility of tasks in a scheduling cycle. + */ +interface TaskEligibilityPolicy : StagePolicy<TaskEligibilityPolicy.Logic> { + interface Logic { + /** + * Determine whether the specified [TaskState] is eligible to be scheduled. + * + * @param task The task instance to schedule. + * @return The advice for marking the task. + */ + operator fun invoke(task: TaskState): Advice + } + + /** + * The advice given to the scheduler by an admission policy. + * + * @property admit A flag to indicate to the scheduler that the job should be admitted. + * @property stop A flag to indicate the scheduler should immediately stop admitting jobs to the scheduling queue and wait + * for the next scheduling cycle. + */ + enum class Advice(val admit: Boolean, val stop: Boolean) { + /** + * Admit the current job to the scheduling queue and continue admitting jobs. + */ + ADMIT(true, false), + + /** + * Admit the current job to the scheduling queue and stop admitting jobs. + */ + ADMIT_LAST(true, true), + + /** + * Deny the current job, but continue admitting jobs. + */ + DENY(false, false), + + /** + * Deny the current job and also stop admitting jobs. + */ + STOP(false, true) + } +} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/TaskOrderPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/TaskOrderPolicy.kt new file mode 100644 index 00000000..0a3ce077 --- /dev/null +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/TaskOrderPolicy.kt @@ -0,0 +1,34 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflows.service.stage.task + +import org.opendc.workflows.service.TaskState +import org.opendc.workflows.service.stage.StagePolicy + +/** + * This interface represents the **T2** stage of the Reference Architecture for Topology Schedulers and provides the + * scheduler with a sorted list of tasks to schedule. + */ +interface TaskOrderPolicy : StagePolicy<Comparator<TaskState>> diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Job.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Job.kt new file mode 100644 index 00000000..30285507 --- /dev/null +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Job.kt @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflows.workload + +import org.opendc.core.User +import org.opendc.core.workload.Workload +import java.util.* + +/** + * A workload that represents a directed acyclic graph (DAG) of tasks with control and data dependencies between tasks. + * + * @property uid A unique identified of this workflow. + * @property name The name of this workflow. + * @property owner The owner of the workflow. + * @property tasks The tasks that are part of this workflow. + * @property metadata Additional metadata for the job. + */ +data class Job( + override val uid: UUID, + override val name: String, + override val owner: User, + val tasks: Set<Task>, + val metadata: Map<String, Any> = emptyMap() +) : Workload { + override fun equals(other: Any?): Boolean = other is Job && uid == other.uid + + override fun hashCode(): Int = uid.hashCode() + + override fun toString(): String = "Job(uid=$uid, name=$name, tasks=${tasks.size}, metadata=$metadata)" +} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Metadata.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Metadata.kt new file mode 100644 index 00000000..99bd1cd3 --- /dev/null +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Metadata.kt @@ -0,0 +1,30 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflows.workload + +/** + * Meta-data key for the deadline of a task. + */ +const val WORKFLOW_TASK_DEADLINE = "workflow:task:deadline" diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Task.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Task.kt new file mode 100644 index 00000000..864eede5 --- /dev/null +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Task.kt @@ -0,0 +1,50 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflows.workload + +import org.opendc.compute.core.image.Image +import org.opendc.core.Identity +import java.util.* + +/** + * A stage of a [Job]. + * + * @property uid A unique identified of this task. + * @property name The name of this task. + * @property image The application image to run as part of this workflow task. + * @property dependencies The dependencies of this task in order for it to execute. + * @property metadata Additional metadata for this task. + */ +data class Task( + override val uid: UUID, + override val name: String, + val image: Image, + val dependencies: Set<Task>, + val metadata: Map<String, Any> = emptyMap() +) : Identity { + override fun equals(other: Any?): Boolean = other is Task && uid == other.uid + + override fun hashCode(): Int = uid.hashCode() +} |
