From 92e858e398bf69380dbacebc042dde2bfa8cfe9c Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 14 Feb 2020 12:43:29 +0100 Subject: refactor: Integrate opendc-compute in existing model This change refactors the existing model to use the new interfaces from the opendc-compute module. --- .../opendc/workflows/monitor/WorkflowMonitor.kt | 53 ++++ .../workflows/service/StageWorkflowScheduler.kt | 59 ----- .../service/StageWorkflowSchedulerLogic.kt | 275 --------------------- .../workflows/service/StageWorkflowService.kt | 265 ++++++++++++++++++++ .../atlarge/opendc/workflows/service/TaskState.kt | 35 +++ .../opendc/workflows/service/WorkflowScheduler.kt | 48 ---- .../workflows/service/WorkflowSchedulerLogic.kt | 56 ----- .../opendc/workflows/service/WorkflowService.kt | 149 +---------- .../service/stage/job/FifoJobSortingPolicy.kt | 8 +- .../service/stage/job/JobAdmissionPolicy.kt | 10 +- .../service/stage/job/JobSortingPolicy.kt | 8 +- .../service/stage/job/NullJobAdmissionPolicy.kt | 6 +- .../service/stage/job/RandomJobSortingPolicy.kt | 8 +- .../resource/FirstFitResourceSelectionPolicy.kt | 12 +- .../FunctionalResourceDynamicFilterPolicy.kt | 14 +- .../stage/resource/ResourceDynamicFilterPolicy.kt | 12 +- .../stage/resource/ResourceSelectionPolicy.kt | 12 +- .../service/stage/task/FifoTaskSortingPolicy.kt | 8 +- .../stage/task/FunctionalTaskEligibilityPolicy.kt | 10 +- .../service/stage/task/RandomTaskSortingPolicy.kt | 8 +- .../service/stage/task/TaskEligibilityPolicy.kt | 8 +- .../service/stage/task/TaskSortingPolicy.kt | 8 +- .../com/atlarge/opendc/workflows/workload/Task.kt | 6 +- 23 files changed, 428 insertions(+), 650 deletions(-) create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/monitor/WorkflowMonitor.kt delete mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowScheduler.kt delete mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerLogic.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt create mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskState.kt delete mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowScheduler.kt delete mode 100644 opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerLogic.kt (limited to 'opendc/opendc-workflows/src') diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/monitor/WorkflowMonitor.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/monitor/WorkflowMonitor.kt new file mode 100644 index 00000000..3c77d57a --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/monitor/WorkflowMonitor.kt @@ -0,0 +1,53 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.monitor + +import com.atlarge.opendc.workflows.workload.Job +import com.atlarge.opendc.workflows.workload.Task + +/** + * An interface for monitoring the progression of workflows. + */ +public interface WorkflowMonitor { + /** + * This method is invoked when a job has become active. + */ + public suspend fun onJobStart(job: Job, time: Long) + + /** + * This method is invoked when a job has finished processing. + */ + public suspend fun onJobFinish(job: Job, time: Long) + + /** + * This method is invoked when a task of a job has started processing. + */ + public suspend fun onTaskStart(job: Job, task: Task, time: Long) + + /** + * This method is invoked when a task has finished processing. + */ + public suspend fun onTaskFinish(job: Job, task: Task, status: Int, time: Long) +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowScheduler.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowScheduler.kt deleted file mode 100644 index d4240421..00000000 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowScheduler.kt +++ /dev/null @@ -1,59 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.workflows.service - -import com.atlarge.odcsim.ProcessContext -import com.atlarge.opendc.core.services.provisioning.ProvisioningResponse -import com.atlarge.opendc.workflows.service.stage.job.JobAdmissionPolicy -import com.atlarge.opendc.workflows.service.stage.job.JobSortingPolicy -import com.atlarge.opendc.workflows.service.stage.resource.ResourceDynamicFilterPolicy -import com.atlarge.opendc.workflows.service.stage.resource.ResourceSelectionPolicy -import com.atlarge.opendc.workflows.service.stage.task.TaskEligibilityPolicy -import com.atlarge.opendc.workflows.service.stage.task.TaskSortingPolicy -import kotlinx.coroutines.CoroutineScope - -/** - * A [WorkflowScheduler] that distributes work through a multi-stage process based on the Reference Architecture for - * Datacenter Scheduling. - */ -class StageWorkflowScheduler( - private val mode: WorkflowSchedulerMode, - private val jobAdmissionPolicy: JobAdmissionPolicy, - private val jobSortingPolicy: JobSortingPolicy, - private val taskEligibilityPolicy: TaskEligibilityPolicy, - private val taskSortingPolicy: TaskSortingPolicy, - private val resourceDynamicFilterPolicy: ResourceDynamicFilterPolicy, - private val resourceSelectionPolicy: ResourceSelectionPolicy -) : WorkflowScheduler { - override fun invoke( - ctx: ProcessContext, - self: WorkflowServiceRef, - coroutineScope: CoroutineScope, - lease: ProvisioningResponse.Lease - ): WorkflowSchedulerLogic { - return StageWorkflowSchedulerLogic(ctx, self, coroutineScope, lease, mode, jobAdmissionPolicy, - jobSortingPolicy, taskEligibilityPolicy, taskSortingPolicy, resourceDynamicFilterPolicy, resourceSelectionPolicy) - } -} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerLogic.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerLogic.kt deleted file mode 100644 index c6162f5e..00000000 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerLogic.kt +++ /dev/null @@ -1,275 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.workflows.service - -import com.atlarge.odcsim.ProcessContext -import com.atlarge.odcsim.SendPort -import com.atlarge.odcsim.SendRef -import com.atlarge.odcsim.sendOnce -import com.atlarge.opendc.core.resources.compute.MachineEvent -import com.atlarge.opendc.core.resources.compute.MachineMessage -import com.atlarge.opendc.core.resources.compute.MachineRef -import com.atlarge.opendc.core.resources.compute.scheduling.ProcessObserver -import com.atlarge.opendc.core.resources.compute.scheduling.ProcessState -import com.atlarge.opendc.core.services.provisioning.ProvisioningResponse -import com.atlarge.opendc.core.services.resources.HostView -import com.atlarge.opendc.core.workload.application.Application -import com.atlarge.opendc.core.workload.application.Pid -import com.atlarge.opendc.workflows.service.stage.job.JobAdmissionPolicy -import com.atlarge.opendc.workflows.service.stage.job.JobSortingPolicy -import com.atlarge.opendc.workflows.service.stage.resource.ResourceDynamicFilterPolicy -import com.atlarge.opendc.workflows.service.stage.resource.ResourceSelectionPolicy -import com.atlarge.opendc.workflows.service.stage.task.TaskEligibilityPolicy -import com.atlarge.opendc.workflows.service.stage.task.TaskSortingPolicy -import com.atlarge.opendc.workflows.workload.Job -import com.atlarge.opendc.workflows.workload.Task -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.delay -import kotlinx.coroutines.launch - -/** - * Logic of the [StageWorkflowScheduler]. - */ -class StageWorkflowSchedulerLogic( - ctx: ProcessContext, - self: WorkflowServiceRef, - coroutineScope: CoroutineScope, - lease: ProvisioningResponse.Lease, - private val mode: WorkflowSchedulerMode, - private val jobAdmissionPolicy: JobAdmissionPolicy, - private val jobSortingPolicy: JobSortingPolicy, - private val taskEligibilityPolicy: TaskEligibilityPolicy, - private val taskSortingPolicy: TaskSortingPolicy, - private val resourceDynamicFilterPolicy: ResourceDynamicFilterPolicy, - private val resourceSelectionPolicy: ResourceSelectionPolicy -) : WorkflowSchedulerLogic(ctx, self, coroutineScope, lease) { - - /** - * The incoming jobs ready to be processed by the scheduler. - */ - internal val incomingJobs: MutableSet = mutableSetOf() - - /** - * The active jobs in the system. - */ - internal val activeJobs: MutableSet = mutableSetOf() - - /** - * The running tasks by [Pid]. - */ - internal val taskByPid = mutableMapOf() - - /** - * The available processor cores on the leased machines. - */ - internal val machineCores: MutableMap = HashMap() - - private val brokers: MutableMap, SendPort> = HashMap() - private val channel = ctx.open() - - init { - lease.hosts.forEach { machineCores[it] = it.host.cores.count() } - coroutineScope.launch { - ProcessObserver(ctx, this@StageWorkflowSchedulerLogic, channel.receive) - } - } - - override suspend fun submit(job: Job, handler: SendRef) { - val broker = brokers.getOrPut(handler) { ctx.connect(handler) } - - // J1 Incoming Jobs - val jobInstance = JobView(job, handler) - val instances = job.tasks.associateWith { - TaskView(jobInstance, it) - } - - for ((task, instance) in instances) { - instance.dependencies.addAll(task.dependencies.map { instances[it]!! }) - task.dependencies.forEach { - instances[it]!!.dependents.add(instance) - } - - // If the task has no dependency, it is a root task and can immediately be evaluated - if (instance.isRoot) { - instance.state = ProcessState.READY - } - } - - jobInstance.tasks = instances.values.toMutableSet() - incomingJobs += jobInstance - broker.send(WorkflowEvent.JobSubmitted(self, job, ctx.clock.millis())) - requestCycle() - } - - private var next: kotlinx.coroutines.Job? = null - - /** - * Indicate to the scheduler that a scheduling cycle is needed. - */ - private fun requestCycle() { - when (mode) { - is WorkflowSchedulerMode.Interactive -> { - coroutineScope.launch { - schedule() - } - } - is WorkflowSchedulerMode.Batch -> { - if (next == null) { - val job = coroutineScope.launch { - delay(mode.quantum) - schedule() - } - next = job - job.invokeOnCompletion { - next = null - } - } - } - } - } - - /** - * Perform a scheduling cycle immediately. - */ - override suspend fun schedule() { - // J2 Create list of eligible jobs - jobAdmissionPolicy.startCycle(this) - val eligibleJobs = incomingJobs.filter { jobAdmissionPolicy.shouldAdmit(this, it) } - for (jobInstance in eligibleJobs) { - incomingJobs -= jobInstance - activeJobs += jobInstance - brokers.getValue(jobInstance.broker).send(WorkflowEvent.JobStarted(self, jobInstance.job, ctx.clock.millis())) - } - - // J3 Sort jobs on criterion - val sortedJobs = jobSortingPolicy(this, activeJobs) - - // J4 Per job - for (jobInstance in sortedJobs) { - // T1 Create list of eligible tasks - taskEligibilityPolicy.startCycle(this) - val eligibleTasks = jobInstance.tasks.filter { taskEligibilityPolicy.isEligible(this, it) } - - // T2 Sort tasks on criterion - val sortedTasks = taskSortingPolicy(this, eligibleTasks) - - // T3 Per task - for (instance in sortedTasks) { - val hosts = resourceDynamicFilterPolicy(this, lease.hosts, instance) - val host = resourceSelectionPolicy.select(this, hosts, instance) - - if (host != null) { - // T4 Submit task to machine - host.ref.sendOnce(MachineMessage.Submit(instance.task.application, instance, channel.send)) - instance.host = host - instance.state = ProcessState.RUNNING // Assume the application starts running - machineCores.merge(host, instance.task.application.cores, Int::minus) - } else { - return - } - } - } - } - - override fun onSubmission(instance: MachineRef, application: Application, key: Any, pid: Pid) { - val task = key as TaskView - task.pid = pid - taskByPid[pid] = task - - brokers.getValue(task.job.broker).send(WorkflowEvent.TaskStarted(self, task.job.job, task.task, ctx.clock.millis())) - } - - override fun onTermination(instance: MachineRef, pid: Pid, status: Int) { - val task = taskByPid.remove(pid) ?: throw IllegalStateException() - - val job = task.job - task.state = ProcessState.TERMINATED - job.tasks.remove(task) - machineCores.merge(task.host!!, task.task.application.cores, Int::plus) - brokers.getValue(job.broker).send(WorkflowEvent.TaskFinished(self, job.job, task.task, status, ctx.clock.millis())) - - if (job.isFinished) { - activeJobs -= job - brokers.getValue(job.broker).send(WorkflowEvent.JobFinished(self, job.job, ctx.clock.millis())) - } - - requestCycle() - } - - class JobView(val job: Job, val broker: SendRef) { - /** - * A flag to indicate whether this job is finished. - */ - val isFinished: Boolean - get() = tasks.isEmpty() - - lateinit var tasks: MutableSet - } - - class TaskView(val job: JobView, val task: Task) { - /** - * The dependencies of this task. - */ - val dependencies = HashSet() - - /** - * The dependents of this task. - */ - val dependents = HashSet() - - /** - * A flag to indicate whether this workflow task instance is a workflow root. - */ - val isRoot: Boolean - get() = dependencies.isEmpty() - - var state: ProcessState = ProcessState.CREATED - set(value) { - field = value - - // Mark the process as terminated in the graph - if (value == ProcessState.TERMINATED) { - markTerminated() - } - } - - var pid: Pid? = null - - var host: HostView? = null - - /** - * Mark the specified [TaskView] as terminated. - */ - private fun markTerminated() { - for (dependent in dependents) { - dependent.dependencies.remove(this) - - if (dependent.isRoot) { - dependent.state = ProcessState.READY - } - } - } - } -} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt new file mode 100644 index 00000000..d7b29c32 --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt @@ -0,0 +1,265 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service + +import com.atlarge.odcsim.ProcessContext +import com.atlarge.opendc.compute.core.Server +import com.atlarge.opendc.compute.core.ServerState +import com.atlarge.opendc.compute.core.monitor.ServerMonitor +import com.atlarge.opendc.compute.metal.Node +import com.atlarge.opendc.compute.metal.service.ProvisioningService +import com.atlarge.opendc.workflows.monitor.WorkflowMonitor +import com.atlarge.opendc.workflows.service.stage.job.JobAdmissionPolicy +import com.atlarge.opendc.workflows.service.stage.job.JobSortingPolicy +import com.atlarge.opendc.workflows.service.stage.resource.ResourceDynamicFilterPolicy +import com.atlarge.opendc.workflows.service.stage.resource.ResourceSelectionPolicy +import com.atlarge.opendc.workflows.service.stage.task.TaskEligibilityPolicy +import com.atlarge.opendc.workflows.service.stage.task.TaskSortingPolicy +import com.atlarge.opendc.workflows.workload.Job +import com.atlarge.opendc.workflows.workload.Task +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch + +/** + * A [WorkflowService] that distributes work through a multi-stage process based on the Reference Architecture for + * Datacenter Scheduling. + */ +class StageWorkflowService( + private val ctx: ProcessContext, + private val provisioningService: ProvisioningService, + private val mode: WorkflowSchedulerMode, + private val jobAdmissionPolicy: JobAdmissionPolicy, + private val jobSortingPolicy: JobSortingPolicy, + private val taskEligibilityPolicy: TaskEligibilityPolicy, + private val taskSortingPolicy: TaskSortingPolicy, + private val resourceDynamicFilterPolicy: ResourceDynamicFilterPolicy, + private val resourceSelectionPolicy: ResourceSelectionPolicy +) : WorkflowService, ServerMonitor { + + /** + * The incoming jobs ready to be processed by the scheduler. + */ + internal val incomingJobs: MutableSet = mutableSetOf() + + /** + * The active jobs in the system. + */ + internal val activeJobs: MutableSet = mutableSetOf() + + /** + * The running tasks by [Server]. + */ + internal val taskByServer = mutableMapOf() + + /** + * The nodes that are controlled by the service. + */ + internal lateinit var nodes: List + + /** + * The available nodes. + */ + internal val available: MutableSet = mutableSetOf() + + init { + ctx.launch { + nodes = provisioningService.nodes().toList() + available.addAll(nodes) + } + } + + override suspend fun submit(job: Job, monitor: WorkflowMonitor) { + // J1 Incoming Jobs + val jobInstance = JobView(job, monitor) + val instances = job.tasks.associateWith { + TaskView(jobInstance, it) + } + + for ((task, instance) in instances) { + instance.dependencies.addAll(task.dependencies.map { instances[it]!! }) + task.dependencies.forEach { + instances[it]!!.dependents.add(instance) + } + + // If the task has no dependency, it is a root task and can immediately be evaluated + if (instance.isRoot) { + instance.state = TaskState.READY + } + } + + jobInstance.tasks = instances.values.toMutableSet() + incomingJobs += jobInstance + requestCycle() + } + + private var next: kotlinx.coroutines.Job? = null + + /** + * Indicate to the scheduler that a scheduling cycle is needed. + */ + private fun requestCycle() { + when (mode) { + is WorkflowSchedulerMode.Interactive -> { + ctx.launch { + schedule() + } + } + is WorkflowSchedulerMode.Batch -> { + if (next == null) { + val job = ctx.launch { + delay(mode.quantum) + next = null + schedule() + } + next = job + } + } + } + } + + /** + * Perform a scheduling cycle immediately. + */ + private suspend fun schedule() { + // J2 Create list of eligible jobs + jobAdmissionPolicy.startCycle(this) + val eligibleJobs = incomingJobs.filter { jobAdmissionPolicy.shouldAdmit(this, it) } + + for (jobInstance in eligibleJobs) { + incomingJobs -= jobInstance + activeJobs += jobInstance + jobInstance.monitor.onJobStart(jobInstance.job, ctx.clock.millis()) + } + + // J3 Sort jobs on criterion + val sortedJobs = jobSortingPolicy(this, activeJobs) + + // J4 Per job + for (jobInstance in sortedJobs) { + // T1 Create list of eligible tasks + taskEligibilityPolicy.startCycle(this) + val eligibleTasks = jobInstance.tasks.filter { taskEligibilityPolicy.isEligible(this, it) } + + // T2 Sort tasks on criterion + val sortedTasks = taskSortingPolicy(this, eligibleTasks) + + // T3 Per task + for (instance in sortedTasks) { + val hosts = resourceDynamicFilterPolicy(this, nodes, instance) + val host = resourceSelectionPolicy.select(this, hosts, instance) + + if (host != null) { + // T4 Submit task to machine + available -= host + instance.state = TaskState.ACTIVE + + val newHost = provisioningService.deploy(host, instance.task.image, this) + instance.host = newHost + taskByServer[newHost.server!!] = instance + } else { + return + } + } + } + } + + override suspend fun onUpdate(server: Server, previousState: ServerState) { + when (server.state) { + ServerState.ACTIVE -> { + val task = taskByServer.getValue(server) + task.job.monitor.onTaskStart(task.job.job, task.task, ctx.clock.millis()) + } + ServerState.SHUTOFF, ServerState.ERROR -> { + val task = taskByServer.remove(server) ?: throw IllegalStateException() + val job = task.job + task.state = TaskState.FINISHED + job.tasks.remove(task) + available += task.host!! + job.monitor.onTaskFinish(job.job, task.task, 0, ctx.clock.millis()) + + if (job.isFinished) { + activeJobs -= job + job.monitor.onJobFinish(job.job, ctx.clock.millis()) + } + + requestCycle() + } + else -> throw IllegalStateException() + } + } + + class JobView(val job: Job, val monitor: WorkflowMonitor) { + /** + * A flag to indicate whether this job is finished. + */ + val isFinished: Boolean + get() = tasks.isEmpty() + + lateinit var tasks: MutableSet + } + + class TaskView(val job: JobView, val task: Task) { + /** + * The dependencies of this task. + */ + val dependencies = HashSet() + + /** + * The dependents of this task. + */ + val dependents = HashSet() + + /** + * A flag to indicate whether this workflow task instance is a workflow root. + */ + val isRoot: Boolean + get() = dependencies.isEmpty() + + var state: TaskState = TaskState.CREATED + set(value) { + field = value + + // Mark the process as terminated in the graph + if (value == TaskState.FINISHED) { + markTerminated() + } + } + + var host: Node? = null + + /** + * Mark the specified [TaskView] as terminated. + */ + private fun markTerminated() { + for (dependent in dependents) { + dependent.dependencies.remove(this) + + if (dependent.isRoot) { + dependent.state = TaskState.READY + } + } + } + } +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskState.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskState.kt new file mode 100644 index 00000000..ee0024f5 --- /dev/null +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskState.kt @@ -0,0 +1,35 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.workflows.service + +/** + * The state of a workflow task. + */ +public enum class TaskState { + CREATED, + READY, + ACTIVE, + FINISHED +} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowScheduler.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowScheduler.kt deleted file mode 100644 index 6d6d4179..00000000 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowScheduler.kt +++ /dev/null @@ -1,48 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.workflows.service - -import com.atlarge.odcsim.ProcessContext -import com.atlarge.opendc.core.services.provisioning.ProvisioningResponse -import kotlinx.coroutines.CoroutineScope - -/** - * A factory interface for constructing a [WorkflowSchedulerLogic]. - */ -interface WorkflowScheduler { - /** - * Construct a [WorkflowSchedulerLogic] in the given [ActorContext]. - * - * @param ctx The context in which the scheduler runs. - * @param timers The timer scheduler to use. - * @param lease The resource lease to use. - */ - operator fun invoke( - ctx: ProcessContext, - self: WorkflowServiceRef, - coroutineScope: CoroutineScope, - lease: ProvisioningResponse.Lease - ): WorkflowSchedulerLogic -} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerLogic.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerLogic.kt deleted file mode 100644 index 0b3ba828..00000000 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerLogic.kt +++ /dev/null @@ -1,56 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.workflows.service - -import com.atlarge.odcsim.ProcessContext -import com.atlarge.odcsim.SendRef -import com.atlarge.opendc.core.resources.compute.scheduling.ProcessObserver -import com.atlarge.opendc.core.services.provisioning.ProvisioningResponse -import com.atlarge.opendc.workflows.workload.Job -import kotlinx.coroutines.CoroutineScope - -/** - * A workflow scheduler interface that schedules jobs across machines. - * - * @property ctx The context in which the scheduler runs. - * @property timers The timer scheduler to use. - * @property lease The resource lease to use. - */ -abstract class WorkflowSchedulerLogic( - protected val ctx: ProcessContext, - protected val self: WorkflowServiceRef, - protected val coroutineScope: CoroutineScope, - protected val lease: ProvisioningResponse.Lease -) : ProcessObserver { - /** - * Submit the specified workflow for scheduling. - */ - abstract suspend fun submit(job: Job, handler: SendRef) - - /** - * Trigger an immediate scheduling cycle. - */ - abstract suspend fun schedule() -} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt index bed6b93b..524f4f9e 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt @@ -24,161 +24,24 @@ package com.atlarge.opendc.workflows.service -import com.atlarge.odcsim.Channel -import com.atlarge.odcsim.ProcessContext -import com.atlarge.odcsim.SendRef -import com.atlarge.odcsim.ask -import com.atlarge.opendc.core.Zone -import com.atlarge.opendc.core.ZoneMessage -import com.atlarge.opendc.core.find -import com.atlarge.opendc.core.resources.compute.MachineEvent -import com.atlarge.opendc.core.services.AbstractService -import com.atlarge.opendc.core.services.Service -import com.atlarge.opendc.core.services.ServiceProvider -import com.atlarge.opendc.core.services.provisioning.ProvisioningMessage -import com.atlarge.opendc.core.services.provisioning.ProvisioningResponse -import com.atlarge.opendc.core.services.provisioning.ProvisioningService +import com.atlarge.opendc.core.services.AbstractServiceKey +import com.atlarge.opendc.workflows.monitor.WorkflowMonitor import com.atlarge.opendc.workflows.workload.Job -import com.atlarge.opendc.workflows.workload.Task import java.util.UUID -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.delay -import kotlinx.coroutines.isActive /** * A service for cloud workflow management. * * The workflow scheduler is modelled after the Reference Architecture for Datacenter Scheduling by Andreadis et al. */ -class WorkflowService(val scheduler: WorkflowScheduler) : ServiceProvider { - override val uid: UUID = UUID.randomUUID() - override val name: String = "workflows" - override val provides: Set> = setOf(WorkflowService) - override val dependencies: Set> = setOf(ProvisioningService) - - /** - * Build the runtime [Behavior] for the workflow service, responding to messages of shape [WorkflowMessage]. - */ - override suspend fun invoke(ctx: ProcessContext, zone: Zone, zoneRef: SendRef, main: Channel) { - coroutineScope { - val inlet = ctx.listen(main.receive) - val provisioner = zoneRef.find(ProvisioningService) - // Wait for 0.1 sec before asking the provisioner to allow it to initialize. Will return empty response if asked - // immediately. - delay(10) - val lease: ProvisioningResponse.Lease = provisioner.ask { ProvisioningMessage.Request(Int.MAX_VALUE, it) } - val schedulerLogic = scheduler(ctx, main.send, this, lease) - - while (isActive) { - when (val msg = inlet.receive()) { - is WorkflowMessage.Submit -> { - schedulerLogic.submit(msg.job, msg.broker) - } - is MachineEvent.Submitted -> { - schedulerLogic.onSubmission(msg.instance, msg.application, msg.key, msg.pid) - } - is MachineEvent.Terminated -> { - schedulerLogic.onTermination(msg.instance, msg.pid, msg.status) - } - } - } - } - } - - companion object : AbstractService(UUID.randomUUID(), "workflows") -} - -/** - * A reference to the workflow service instance. - */ -typealias WorkflowServiceRef = SendRef - -/** - * A message protocol for communicating to the workflow service. - */ -sealed class WorkflowMessage { +public interface WorkflowService { /** * Submit the specified [Job] to the workflow service for scheduling. - * - * @property job The workflow to submit for scheduling. - * @property broker The broker that has submitted this workflow on behalf of a user and that needs to be kept - * up-to-date. - */ - data class Submit(val job: Job, val broker: SendRef) : WorkflowMessage() -} - -/** - * A message protocol used by the workflow service to respond to [WorkflowMessage]s. - */ -sealed class WorkflowEvent { - /** - * Indicate that the specified [Job] was submitted to the workflow service. - * - * @property service The reference to the service the job was submitted to. - * @property job The job that has been submitted. - * @property time A timestamp of the moment the job was received. - */ - data class JobSubmitted( - val service: WorkflowServiceRef, - val job: Job, - val time: Long - ) : WorkflowEvent() - - /** - * Indicate that the specified [Job] has become active. - * - * @property service The reference to the service the job was submitted to. - * @property job The job that has been submitted. - * @property time A timestamp of the moment the job started. - */ - data class JobStarted( - val service: WorkflowServiceRef, - val job: Job, - val time: Long - ) : WorkflowEvent() - - /** - * Indicate that the specified [Task] has started processing. - * - * @property service The reference to the service the job was submitted to. - * @property job The job that contains this task. - * @property task The task that has started processing. - * @property time A timestamp of the moment the task started. - */ - data class TaskStarted( - val service: WorkflowServiceRef, - val job: Job, - val task: Task, - val time: Long - ) : WorkflowEvent() - - /** - * Indicate that the specified [Task] has started processing. - * - * @property service The reference to the service the job was submitted to. - * @property job The job that contains this task. - * @property task The task that has started processing. - * @property status The exit code of the task, where zero means successful. - * @property time A timestamp of the moment the task finished. */ - data class TaskFinished( - val service: WorkflowServiceRef, - val job: Job, - val task: Task, - val status: Int, - val time: Long - ) : WorkflowEvent() + public suspend fun submit(job: Job, monitor: WorkflowMonitor) /** - * Indicate that the specified [Job] has finished processing. - * - * @property service The reference to the service the job was submitted to. - * @property job The job that has finished processing. - * @property time A timestamp of the moment the task finished. + * The service key for the workflow scheduler. */ - data class JobFinished( - val service: WorkflowServiceRef, - val job: Job, - val time: Long - ) : WorkflowEvent() + companion object Key : AbstractServiceKey(UUID.randomUUID(), "workflows") } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/FifoJobSortingPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/FifoJobSortingPolicy.kt index 333ed35a..976fbbf3 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/FifoJobSortingPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/FifoJobSortingPolicy.kt @@ -24,14 +24,14 @@ package com.atlarge.opendc.workflows.service.stage.job -import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic +import com.atlarge.opendc.workflows.service.StageWorkflowService /** * The [FifoJobSortingPolicy] sorts tasks based on the order of arrival in the queue. */ class FifoJobSortingPolicy : JobSortingPolicy { override fun invoke( - scheduler: StageWorkflowSchedulerLogic, - jobs: Collection - ): List = jobs.toList() + scheduler: StageWorkflowService, + jobs: Collection + ): List = jobs.toList() } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt index d3a5d9a6..cdaad512 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt @@ -24,10 +24,10 @@ package com.atlarge.opendc.workflows.service.stage.job -import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic +import com.atlarge.opendc.workflows.service.StageWorkflowService /** - * A policy interface for admitting [StageWorkflowSchedulerLogic.JobView]s to a scheduling cycle. + * A policy interface for admitting [StageWorkflowService.JobView]s to a scheduling cycle. */ interface JobAdmissionPolicy { /** @@ -35,14 +35,14 @@ interface JobAdmissionPolicy { * * @param scheduler The scheduler that started the cycle. */ - fun startCycle(scheduler: StageWorkflowSchedulerLogic) {} + fun startCycle(scheduler: StageWorkflowService) {} /** - * Determine whether the specified [StageWorkflowSchedulerLogic.JobView] should be admitted to the scheduling cycle. + * Determine whether the specified [StageWorkflowService.JobView] should be admitted to the scheduling cycle. * * @param scheduler The scheduler that should admit or reject the job. * @param job The workflow that has been submitted. * @return `true` if the workflow may be admitted to the scheduling cycle, `false` otherwise. */ - fun shouldAdmit(scheduler: StageWorkflowSchedulerLogic, job: StageWorkflowSchedulerLogic.JobView): Boolean + fun shouldAdmit(scheduler: StageWorkflowService, job: StageWorkflowService.JobView): Boolean } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobSortingPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobSortingPolicy.kt index ada3e693..c3a5dab5 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobSortingPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobSortingPolicy.kt @@ -24,7 +24,7 @@ package com.atlarge.opendc.workflows.service.stage.job -import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic +import com.atlarge.opendc.workflows.service.StageWorkflowService /** * A policy interface for ordering admitted workflows in the scheduling queue. @@ -38,7 +38,7 @@ interface JobSortingPolicy { * @return The sorted list of jobs. */ operator fun invoke( - scheduler: StageWorkflowSchedulerLogic, - jobs: Collection - ): List + scheduler: StageWorkflowService, + jobs: Collection + ): List } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt index f877403b..ad90839c 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt @@ -24,7 +24,7 @@ package com.atlarge.opendc.workflows.service.stage.job -import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic +import com.atlarge.opendc.workflows.service.StageWorkflowService /** * A [JobAdmissionPolicy] that admits all jobs. @@ -34,7 +34,7 @@ object NullJobAdmissionPolicy : JobAdmissionPolicy { * Admit every submitted job. */ override fun shouldAdmit( - scheduler: StageWorkflowSchedulerLogic, - job: StageWorkflowSchedulerLogic.JobView + scheduler: StageWorkflowService, + job: StageWorkflowService.JobView ): Boolean = true } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobSortingPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobSortingPolicy.kt index 30d5c456..9ce2811c 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobSortingPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobSortingPolicy.kt @@ -24,7 +24,7 @@ package com.atlarge.opendc.workflows.service.stage.job -import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic +import com.atlarge.opendc.workflows.service.StageWorkflowService import kotlin.random.Random /** @@ -34,7 +34,7 @@ import kotlin.random.Random */ class RandomJobSortingPolicy(private val random: Random = Random.Default) : JobSortingPolicy { override fun invoke( - scheduler: StageWorkflowSchedulerLogic, - jobs: Collection - ): List = jobs.shuffled(random) + scheduler: StageWorkflowService, + jobs: Collection + ): List = jobs.shuffled(random) } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt index c3307063..e2490214 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt @@ -24,17 +24,17 @@ package com.atlarge.opendc.workflows.service.stage.resource -import com.atlarge.opendc.core.services.resources.HostView -import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic +import com.atlarge.opendc.compute.metal.Node +import com.atlarge.opendc.workflows.service.StageWorkflowService /** * A [ResourceSelectionPolicy] that selects the first machine that is available. */ class FirstFitResourceSelectionPolicy : ResourceSelectionPolicy { override fun select( - scheduler: StageWorkflowSchedulerLogic, - machines: List, - task: StageWorkflowSchedulerLogic.TaskView - ): HostView? = + scheduler: StageWorkflowService, + machines: List, + task: StageWorkflowService.TaskView + ): Node? = machines.firstOrNull() } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceDynamicFilterPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceDynamicFilterPolicy.kt index d742f842..a8f2fda9 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceDynamicFilterPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceDynamicFilterPolicy.kt @@ -24,8 +24,8 @@ package com.atlarge.opendc.workflows.service.stage.resource -import com.atlarge.opendc.core.services.resources.HostView -import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic +import com.atlarge.opendc.compute.metal.Node +import com.atlarge.opendc.workflows.service.StageWorkflowService /** * A [ResourceDynamicFilterPolicy] based on the amount of cores available on the machine and the cores required for @@ -33,11 +33,11 @@ import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic */ class FunctionalResourceDynamicFilterPolicy : ResourceDynamicFilterPolicy { override fun invoke( - scheduler: StageWorkflowSchedulerLogic, - machines: List, - task: StageWorkflowSchedulerLogic.TaskView - ): List { + scheduler: StageWorkflowService, + machines: List, + task: StageWorkflowService.TaskView + ): List { return machines - .filter { scheduler.machineCores[it] ?: 0 >= task.task.application.cores } + .filter { it in scheduler.available } } } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceDynamicFilterPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceDynamicFilterPolicy.kt index 8a3b5a1e..8d8ceec2 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceDynamicFilterPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceDynamicFilterPolicy.kt @@ -24,8 +24,8 @@ package com.atlarge.opendc.workflows.service.stage.resource -import com.atlarge.opendc.core.services.resources.HostView -import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic +import com.atlarge.opendc.compute.metal.Node +import com.atlarge.opendc.workflows.service.StageWorkflowService /** * This interface represents the **R4** stage of the Reference Architecture for Schedulers and acts as a filter yielding @@ -42,8 +42,8 @@ interface ResourceDynamicFilterPolicy { * @return The machines on which the task can be scheduled. */ operator fun invoke( - scheduler: StageWorkflowSchedulerLogic, - machines: List, - task: StageWorkflowSchedulerLogic.TaskView - ): List + scheduler: StageWorkflowService, + machines: List, + task: StageWorkflowService.TaskView + ): List } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt index 90b2873c..38fe5886 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt @@ -24,8 +24,8 @@ package com.atlarge.opendc.workflows.service.stage.resource -import com.atlarge.opendc.core.services.resources.HostView -import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic +import com.atlarge.opendc.compute.metal.Node +import com.atlarge.opendc.workflows.service.StageWorkflowService /** * This interface represents the **R5** stage of the Reference Architecture for Schedulers and matches the the selected @@ -41,8 +41,8 @@ interface ResourceSelectionPolicy { * @return The selected machine or `null` if no machine could be found. */ fun select( - scheduler: StageWorkflowSchedulerLogic, - machines: List, - task: StageWorkflowSchedulerLogic.TaskView - ): HostView? + scheduler: StageWorkflowService, + machines: List, + task: StageWorkflowService.TaskView + ): Node? } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FifoTaskSortingPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FifoTaskSortingPolicy.kt index 48a1a50d..bba81d27 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FifoTaskSortingPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FifoTaskSortingPolicy.kt @@ -24,14 +24,14 @@ package com.atlarge.opendc.workflows.service.stage.task -import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic +import com.atlarge.opendc.workflows.service.StageWorkflowService /** * The [FifoTaskSortingPolicy] sorts tasks based on the order of arrival in the queue. */ class FifoTaskSortingPolicy : TaskSortingPolicy { override fun invoke( - scheduler: StageWorkflowSchedulerLogic, - tasks: Collection - ): List = tasks.toList() + scheduler: StageWorkflowService, + tasks: Collection + ): List = tasks.toList() } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FunctionalTaskEligibilityPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FunctionalTaskEligibilityPolicy.kt index 1672633e..72ecbee2 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FunctionalTaskEligibilityPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FunctionalTaskEligibilityPolicy.kt @@ -24,15 +24,15 @@ package com.atlarge.opendc.workflows.service.stage.task -import com.atlarge.opendc.core.resources.compute.scheduling.ProcessState -import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic +import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.TaskState /** * A [TaskEligibilityPolicy] that marks tasks as eligible if they are tasks roots within the job. */ class FunctionalTaskEligibilityPolicy : TaskEligibilityPolicy { override fun isEligible( - scheduler: StageWorkflowSchedulerLogic, - task: StageWorkflowSchedulerLogic.TaskView - ): Boolean = task.state == ProcessState.READY + scheduler: StageWorkflowService, + task: StageWorkflowService.TaskView + ): Boolean = task.state == TaskState.READY } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskSortingPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskSortingPolicy.kt index 36ef3a50..1b1d5b44 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskSortingPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskSortingPolicy.kt @@ -24,7 +24,7 @@ package com.atlarge.opendc.workflows.service.stage.task -import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic +import com.atlarge.opendc.workflows.service.StageWorkflowService import kotlin.random.Random /** @@ -34,7 +34,7 @@ import kotlin.random.Random */ class RandomTaskSortingPolicy(private val random: Random = Random.Default) : TaskSortingPolicy { override fun invoke( - scheduler: StageWorkflowSchedulerLogic, - tasks: Collection - ): List = tasks.shuffled(random) + scheduler: StageWorkflowService, + tasks: Collection + ): List = tasks.shuffled(random) } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt index 19f0240b..19954d7b 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt @@ -24,7 +24,7 @@ package com.atlarge.opendc.workflows.service.stage.task -import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic +import com.atlarge.opendc.workflows.service.StageWorkflowService /** * A policy interface for determining the eligibility of tasks in a scheduling cycle. @@ -35,14 +35,14 @@ interface TaskEligibilityPolicy { * * @param scheduler The scheduler that started the cycle. */ - fun startCycle(scheduler: StageWorkflowSchedulerLogic) {} + fun startCycle(scheduler: StageWorkflowService) {} /** - * Determine whether the specified [StageWorkflowSchedulerLogic.TaskView] is eligible to be scheduled. + * Determine whether the specified [StageWorkflowService.TaskView] is eligible to be scheduled. * * @param scheduler The scheduler that is determining whether the task is eligible. * @param task The task instance to schedule. * @return `true` if the task eligible to be scheduled, `false` otherwise. */ - fun isEligible(scheduler: StageWorkflowSchedulerLogic, task: StageWorkflowSchedulerLogic.TaskView): Boolean + fun isEligible(scheduler: StageWorkflowService, task: StageWorkflowService.TaskView): Boolean } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskSortingPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskSortingPolicy.kt index 6a65ed69..aabc44a9 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskSortingPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskSortingPolicy.kt @@ -24,7 +24,7 @@ package com.atlarge.opendc.workflows.service.stage.task -import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic +import com.atlarge.opendc.workflows.service.StageWorkflowService /** * This interface represents the **T2** stage of the Reference Architecture for Datacenter Schedulers and provides the @@ -39,7 +39,7 @@ interface TaskSortingPolicy { * @return The sorted list of tasks. */ operator fun invoke( - scheduler: StageWorkflowSchedulerLogic, - tasks: Collection - ): List + scheduler: StageWorkflowService, + tasks: Collection + ): List } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Task.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Task.kt index 25fe7348..b5997b35 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Task.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Task.kt @@ -24,8 +24,8 @@ package com.atlarge.opendc.workflows.workload +import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.core.Identity -import com.atlarge.opendc.core.workload.application.Application import java.util.UUID /** @@ -33,13 +33,13 @@ import java.util.UUID * * @property uid A unique identified of this task. * @property name The name of this task. - * @property application The application to run as part of this workflow task. + * @property image The application image to run as part of this workflow task. * @property dependencies The dependencies of this task in order for it to execute. */ data class Task( override val uid: UUID, override val name: String, - val application: Application, + val image: Image, val dependencies: Set ) : Identity { override fun equals(other: Any?): Boolean = other is Task && uid == other.uid -- cgit v1.2.3