diff options
| author | Georgios Andreadis <g.andreadis@student.tudelft.nl> | 2020-02-14 19:11:35 +0100 |
|---|---|---|
| committer | Georgios Andreadis <g.andreadis@student.tudelft.nl> | 2020-02-14 19:11:35 +0100 |
| commit | 04e4bddccc4e06a126f3c6ee2878502323c7116e (patch) | |
| tree | 716253f4c03cf8dc6754430a5f63d0a57061d795 /opendc/opendc-workflows/src | |
| parent | cd293b79ef2066ffcb605b9c625d6ab0a9af1d16 (diff) | |
| parent | b13ba01e967e1a281d58b37cb57986b47ec99dd8 (diff) | |
Merge branch 'feat/cpu-sharing' into 'feat/2.x'
Add basis for VM modeling and fractional space-sharing
See merge request opendc/opendc-simulator!23
Diffstat (limited to 'opendc/opendc-workflows/src')
20 files changed, 166 insertions, 388 deletions
diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerLogic.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/monitor/WorkflowMonitor.kt index 0b3ba828..3c77d57a 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowSchedulerLogic.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/monitor/WorkflowMonitor.kt @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2019 atlarge-research + * Copyright (c) 2020 atlarge-research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,35 +22,32 @@ * SOFTWARE. */ -package com.atlarge.opendc.workflows.service +package com.atlarge.opendc.workflows.monitor -import com.atlarge.odcsim.ProcessContext -import com.atlarge.odcsim.SendRef -import com.atlarge.opendc.core.resources.compute.scheduling.ProcessObserver -import com.atlarge.opendc.core.services.provisioning.ProvisioningResponse import com.atlarge.opendc.workflows.workload.Job -import kotlinx.coroutines.CoroutineScope +import com.atlarge.opendc.workflows.workload.Task /** - * A workflow scheduler interface that schedules jobs across machines. - * - * @property ctx The context in which the scheduler runs. - * @property timers The timer scheduler to use. - * @property lease The resource lease to use. + * An interface for monitoring the progression of workflows. */ -abstract class WorkflowSchedulerLogic( - protected val ctx: ProcessContext, - protected val self: WorkflowServiceRef, - protected val coroutineScope: CoroutineScope, - protected val lease: ProvisioningResponse.Lease -) : ProcessObserver { +public interface WorkflowMonitor { + /** + * This method is invoked when a job has become active. + */ + public suspend fun onJobStart(job: Job, time: Long) + + /** + * This method is invoked when a job has finished processing. + */ + public suspend fun onJobFinish(job: Job, time: Long) + /** - * Submit the specified workflow for scheduling. + * This method is invoked when a task of a job has started processing. */ - abstract suspend fun submit(job: Job, handler: SendRef<WorkflowEvent>) + public suspend fun onTaskStart(job: Job, task: Task, time: Long) /** - * Trigger an immediate scheduling cycle. + * This method is invoked when a task has finished processing. */ - abstract suspend fun schedule() + public suspend fun onTaskFinish(job: Job, task: Task, status: Int, time: Long) } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowScheduler.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowScheduler.kt deleted file mode 100644 index d4240421..00000000 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowScheduler.kt +++ /dev/null @@ -1,59 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.workflows.service - -import com.atlarge.odcsim.ProcessContext -import com.atlarge.opendc.core.services.provisioning.ProvisioningResponse -import com.atlarge.opendc.workflows.service.stage.job.JobAdmissionPolicy -import com.atlarge.opendc.workflows.service.stage.job.JobSortingPolicy -import com.atlarge.opendc.workflows.service.stage.resource.ResourceDynamicFilterPolicy -import com.atlarge.opendc.workflows.service.stage.resource.ResourceSelectionPolicy -import com.atlarge.opendc.workflows.service.stage.task.TaskEligibilityPolicy -import com.atlarge.opendc.workflows.service.stage.task.TaskSortingPolicy -import kotlinx.coroutines.CoroutineScope - -/** - * A [WorkflowScheduler] that distributes work through a multi-stage process based on the Reference Architecture for - * Datacenter Scheduling. - */ -class StageWorkflowScheduler( - private val mode: WorkflowSchedulerMode, - private val jobAdmissionPolicy: JobAdmissionPolicy, - private val jobSortingPolicy: JobSortingPolicy, - private val taskEligibilityPolicy: TaskEligibilityPolicy, - private val taskSortingPolicy: TaskSortingPolicy, - private val resourceDynamicFilterPolicy: ResourceDynamicFilterPolicy, - private val resourceSelectionPolicy: ResourceSelectionPolicy -) : WorkflowScheduler { - override fun invoke( - ctx: ProcessContext, - self: WorkflowServiceRef, - coroutineScope: CoroutineScope, - lease: ProvisioningResponse.Lease - ): WorkflowSchedulerLogic { - return StageWorkflowSchedulerLogic(ctx, self, coroutineScope, lease, mode, jobAdmissionPolicy, - jobSortingPolicy, taskEligibilityPolicy, taskSortingPolicy, resourceDynamicFilterPolicy, resourceSelectionPolicy) - } -} diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerLogic.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt index c6162f5e..d7b29c32 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerLogic.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowService.kt @@ -25,18 +25,12 @@ package com.atlarge.opendc.workflows.service import com.atlarge.odcsim.ProcessContext -import com.atlarge.odcsim.SendPort -import com.atlarge.odcsim.SendRef -import com.atlarge.odcsim.sendOnce -import com.atlarge.opendc.core.resources.compute.MachineEvent -import com.atlarge.opendc.core.resources.compute.MachineMessage -import com.atlarge.opendc.core.resources.compute.MachineRef -import com.atlarge.opendc.core.resources.compute.scheduling.ProcessObserver -import com.atlarge.opendc.core.resources.compute.scheduling.ProcessState -import com.atlarge.opendc.core.services.provisioning.ProvisioningResponse -import com.atlarge.opendc.core.services.resources.HostView -import com.atlarge.opendc.core.workload.application.Application -import com.atlarge.opendc.core.workload.application.Pid +import com.atlarge.opendc.compute.core.Server +import com.atlarge.opendc.compute.core.ServerState +import com.atlarge.opendc.compute.core.monitor.ServerMonitor +import com.atlarge.opendc.compute.metal.Node +import com.atlarge.opendc.compute.metal.service.ProvisioningService +import com.atlarge.opendc.workflows.monitor.WorkflowMonitor import com.atlarge.opendc.workflows.service.stage.job.JobAdmissionPolicy import com.atlarge.opendc.workflows.service.stage.job.JobSortingPolicy import com.atlarge.opendc.workflows.service.stage.resource.ResourceDynamicFilterPolicy @@ -45,18 +39,16 @@ import com.atlarge.opendc.workflows.service.stage.task.TaskEligibilityPolicy import com.atlarge.opendc.workflows.service.stage.task.TaskSortingPolicy import com.atlarge.opendc.workflows.workload.Job import com.atlarge.opendc.workflows.workload.Task -import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.launch /** - * Logic of the [StageWorkflowScheduler]. + * A [WorkflowService] that distributes work through a multi-stage process based on the Reference Architecture for + * Datacenter Scheduling. */ -class StageWorkflowSchedulerLogic( - ctx: ProcessContext, - self: WorkflowServiceRef, - coroutineScope: CoroutineScope, - lease: ProvisioningResponse.Lease, +class StageWorkflowService( + private val ctx: ProcessContext, + private val provisioningService: ProvisioningService, private val mode: WorkflowSchedulerMode, private val jobAdmissionPolicy: JobAdmissionPolicy, private val jobSortingPolicy: JobSortingPolicy, @@ -64,7 +56,7 @@ class StageWorkflowSchedulerLogic( private val taskSortingPolicy: TaskSortingPolicy, private val resourceDynamicFilterPolicy: ResourceDynamicFilterPolicy, private val resourceSelectionPolicy: ResourceSelectionPolicy -) : WorkflowSchedulerLogic(ctx, self, coroutineScope, lease) { +) : WorkflowService, ServerMonitor { /** * The incoming jobs ready to be processed by the scheduler. @@ -77,30 +69,30 @@ class StageWorkflowSchedulerLogic( internal val activeJobs: MutableSet<JobView> = mutableSetOf() /** - * The running tasks by [Pid]. + * The running tasks by [Server]. */ - internal val taskByPid = mutableMapOf<Pid, TaskView>() + internal val taskByServer = mutableMapOf<Server, TaskView>() /** - * The available processor cores on the leased machines. + * The nodes that are controlled by the service. */ - internal val machineCores: MutableMap<HostView, Int> = HashMap() + internal lateinit var nodes: List<Node> - private val brokers: MutableMap<SendRef<WorkflowEvent>, SendPort<WorkflowEvent>> = HashMap() - private val channel = ctx.open<MachineEvent>() + /** + * The available nodes. + */ + internal val available: MutableSet<Node> = mutableSetOf() init { - lease.hosts.forEach { machineCores[it] = it.host.cores.count() } - coroutineScope.launch { - ProcessObserver(ctx, this@StageWorkflowSchedulerLogic, channel.receive) + ctx.launch { + nodes = provisioningService.nodes().toList() + available.addAll(nodes) } } - override suspend fun submit(job: Job, handler: SendRef<WorkflowEvent>) { - val broker = brokers.getOrPut(handler) { ctx.connect(handler) } - + override suspend fun submit(job: Job, monitor: WorkflowMonitor) { // J1 Incoming Jobs - val jobInstance = JobView(job, handler) + val jobInstance = JobView(job, monitor) val instances = job.tasks.associateWith { TaskView(jobInstance, it) } @@ -113,13 +105,12 @@ class StageWorkflowSchedulerLogic( // If the task has no dependency, it is a root task and can immediately be evaluated if (instance.isRoot) { - instance.state = ProcessState.READY + instance.state = TaskState.READY } } jobInstance.tasks = instances.values.toMutableSet() incomingJobs += jobInstance - broker.send(WorkflowEvent.JobSubmitted(self, job, ctx.clock.millis())) requestCycle() } @@ -131,20 +122,18 @@ class StageWorkflowSchedulerLogic( private fun requestCycle() { when (mode) { is WorkflowSchedulerMode.Interactive -> { - coroutineScope.launch { + ctx.launch { schedule() } } is WorkflowSchedulerMode.Batch -> { if (next == null) { - val job = coroutineScope.launch { + val job = ctx.launch { delay(mode.quantum) + next = null schedule() } next = job - job.invokeOnCompletion { - next = null - } } } } @@ -153,14 +142,15 @@ class StageWorkflowSchedulerLogic( /** * Perform a scheduling cycle immediately. */ - override suspend fun schedule() { + private suspend fun schedule() { // J2 Create list of eligible jobs jobAdmissionPolicy.startCycle(this) val eligibleJobs = incomingJobs.filter { jobAdmissionPolicy.shouldAdmit(this, it) } + for (jobInstance in eligibleJobs) { incomingJobs -= jobInstance activeJobs += jobInstance - brokers.getValue(jobInstance.broker).send(WorkflowEvent.JobStarted(self, jobInstance.job, ctx.clock.millis())) + jobInstance.monitor.onJobStart(jobInstance.job, ctx.clock.millis()) } // J3 Sort jobs on criterion @@ -177,15 +167,17 @@ class StageWorkflowSchedulerLogic( // T3 Per task for (instance in sortedTasks) { - val hosts = resourceDynamicFilterPolicy(this, lease.hosts, instance) + val hosts = resourceDynamicFilterPolicy(this, nodes, instance) val host = resourceSelectionPolicy.select(this, hosts, instance) if (host != null) { // T4 Submit task to machine - host.ref.sendOnce(MachineMessage.Submit(instance.task.application, instance, channel.send)) - instance.host = host - instance.state = ProcessState.RUNNING // Assume the application starts running - machineCores.merge(host, instance.task.application.cores, Int::minus) + available -= host + instance.state = TaskState.ACTIVE + + val newHost = provisioningService.deploy(host, instance.task.image, this) + instance.host = newHost + taskByServer[newHost.server!!] = instance } else { return } @@ -193,32 +185,32 @@ class StageWorkflowSchedulerLogic( } } - override fun onSubmission(instance: MachineRef, application: Application, key: Any, pid: Pid) { - val task = key as TaskView - task.pid = pid - taskByPid[pid] = task - - brokers.getValue(task.job.broker).send(WorkflowEvent.TaskStarted(self, task.job.job, task.task, ctx.clock.millis())) - } - - override fun onTermination(instance: MachineRef, pid: Pid, status: Int) { - val task = taskByPid.remove(pid) ?: throw IllegalStateException() - - val job = task.job - task.state = ProcessState.TERMINATED - job.tasks.remove(task) - machineCores.merge(task.host!!, task.task.application.cores, Int::plus) - brokers.getValue(job.broker).send(WorkflowEvent.TaskFinished(self, job.job, task.task, status, ctx.clock.millis())) + override suspend fun onUpdate(server: Server, previousState: ServerState) { + when (server.state) { + ServerState.ACTIVE -> { + val task = taskByServer.getValue(server) + task.job.monitor.onTaskStart(task.job.job, task.task, ctx.clock.millis()) + } + ServerState.SHUTOFF, ServerState.ERROR -> { + val task = taskByServer.remove(server) ?: throw IllegalStateException() + val job = task.job + task.state = TaskState.FINISHED + job.tasks.remove(task) + available += task.host!! + job.monitor.onTaskFinish(job.job, task.task, 0, ctx.clock.millis()) + + if (job.isFinished) { + activeJobs -= job + job.monitor.onJobFinish(job.job, ctx.clock.millis()) + } - if (job.isFinished) { - activeJobs -= job - brokers.getValue(job.broker).send(WorkflowEvent.JobFinished(self, job.job, ctx.clock.millis())) + requestCycle() + } + else -> throw IllegalStateException() } - - requestCycle() } - class JobView(val job: Job, val broker: SendRef<WorkflowEvent>) { + class JobView(val job: Job, val monitor: WorkflowMonitor) { /** * A flag to indicate whether this job is finished. */ @@ -245,19 +237,17 @@ class StageWorkflowSchedulerLogic( val isRoot: Boolean get() = dependencies.isEmpty() - var state: ProcessState = ProcessState.CREATED + var state: TaskState = TaskState.CREATED set(value) { field = value // Mark the process as terminated in the graph - if (value == ProcessState.TERMINATED) { + if (value == TaskState.FINISHED) { markTerminated() } } - var pid: Pid? = null - - var host: HostView? = null + var host: Node? = null /** * Mark the specified [TaskView] as terminated. @@ -267,7 +257,7 @@ class StageWorkflowSchedulerLogic( dependent.dependencies.remove(this) if (dependent.isRoot) { - dependent.state = ProcessState.READY + dependent.state = TaskState.READY } } } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowScheduler.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskState.kt index 6d6d4179..ee0024f5 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowScheduler.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/TaskState.kt @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2019 atlarge-research + * Copyright (c) 2020 atlarge-research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -24,25 +24,12 @@ package com.atlarge.opendc.workflows.service -import com.atlarge.odcsim.ProcessContext -import com.atlarge.opendc.core.services.provisioning.ProvisioningResponse -import kotlinx.coroutines.CoroutineScope - /** - * A factory interface for constructing a [WorkflowSchedulerLogic]. + * The state of a workflow task. */ -interface WorkflowScheduler { - /** - * Construct a [WorkflowSchedulerLogic] in the given [ActorContext]. - * - * @param ctx The context in which the scheduler runs. - * @param timers The timer scheduler to use. - * @param lease The resource lease to use. - */ - operator fun invoke( - ctx: ProcessContext, - self: WorkflowServiceRef, - coroutineScope: CoroutineScope, - lease: ProvisioningResponse.Lease - ): WorkflowSchedulerLogic +public enum class TaskState { + CREATED, + READY, + ACTIVE, + FINISHED } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt index bed6b93b..524f4f9e 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/WorkflowService.kt @@ -24,161 +24,24 @@ package com.atlarge.opendc.workflows.service -import com.atlarge.odcsim.Channel -import com.atlarge.odcsim.ProcessContext -import com.atlarge.odcsim.SendRef -import com.atlarge.odcsim.ask -import com.atlarge.opendc.core.Zone -import com.atlarge.opendc.core.ZoneMessage -import com.atlarge.opendc.core.find -import com.atlarge.opendc.core.resources.compute.MachineEvent -import com.atlarge.opendc.core.services.AbstractService -import com.atlarge.opendc.core.services.Service -import com.atlarge.opendc.core.services.ServiceProvider -import com.atlarge.opendc.core.services.provisioning.ProvisioningMessage -import com.atlarge.opendc.core.services.provisioning.ProvisioningResponse -import com.atlarge.opendc.core.services.provisioning.ProvisioningService +import com.atlarge.opendc.core.services.AbstractServiceKey +import com.atlarge.opendc.workflows.monitor.WorkflowMonitor import com.atlarge.opendc.workflows.workload.Job -import com.atlarge.opendc.workflows.workload.Task import java.util.UUID -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.delay -import kotlinx.coroutines.isActive /** * A service for cloud workflow management. * * The workflow scheduler is modelled after the Reference Architecture for Datacenter Scheduling by Andreadis et al. */ -class WorkflowService(val scheduler: WorkflowScheduler) : ServiceProvider { - override val uid: UUID = UUID.randomUUID() - override val name: String = "workflows" - override val provides: Set<Service<*>> = setOf(WorkflowService) - override val dependencies: Set<Service<*>> = setOf(ProvisioningService) - - /** - * Build the runtime [Behavior] for the workflow service, responding to messages of shape [WorkflowMessage]. - */ - override suspend fun invoke(ctx: ProcessContext, zone: Zone, zoneRef: SendRef<ZoneMessage>, main: Channel<Any>) { - coroutineScope { - val inlet = ctx.listen(main.receive) - val provisioner = zoneRef.find(ProvisioningService) - // Wait for 0.1 sec before asking the provisioner to allow it to initialize. Will return empty response if asked - // immediately. - delay(10) - val lease: ProvisioningResponse.Lease = provisioner.ask { ProvisioningMessage.Request(Int.MAX_VALUE, it) } - val schedulerLogic = scheduler(ctx, main.send, this, lease) - - while (isActive) { - when (val msg = inlet.receive()) { - is WorkflowMessage.Submit -> { - schedulerLogic.submit(msg.job, msg.broker) - } - is MachineEvent.Submitted -> { - schedulerLogic.onSubmission(msg.instance, msg.application, msg.key, msg.pid) - } - is MachineEvent.Terminated -> { - schedulerLogic.onTermination(msg.instance, msg.pid, msg.status) - } - } - } - } - } - - companion object : AbstractService<WorkflowMessage>(UUID.randomUUID(), "workflows") -} - -/** - * A reference to the workflow service instance. - */ -typealias WorkflowServiceRef = SendRef<WorkflowMessage> - -/** - * A message protocol for communicating to the workflow service. - */ -sealed class WorkflowMessage { +public interface WorkflowService { /** * Submit the specified [Job] to the workflow service for scheduling. - * - * @property job The workflow to submit for scheduling. - * @property broker The broker that has submitted this workflow on behalf of a user and that needs to be kept - * up-to-date. - */ - data class Submit(val job: Job, val broker: SendRef<WorkflowEvent>) : WorkflowMessage() -} - -/** - * A message protocol used by the workflow service to respond to [WorkflowMessage]s. - */ -sealed class WorkflowEvent { - /** - * Indicate that the specified [Job] was submitted to the workflow service. - * - * @property service The reference to the service the job was submitted to. - * @property job The job that has been submitted. - * @property time A timestamp of the moment the job was received. - */ - data class JobSubmitted( - val service: WorkflowServiceRef, - val job: Job, - val time: Long - ) : WorkflowEvent() - - /** - * Indicate that the specified [Job] has become active. - * - * @property service The reference to the service the job was submitted to. - * @property job The job that has been submitted. - * @property time A timestamp of the moment the job started. - */ - data class JobStarted( - val service: WorkflowServiceRef, - val job: Job, - val time: Long - ) : WorkflowEvent() - - /** - * Indicate that the specified [Task] has started processing. - * - * @property service The reference to the service the job was submitted to. - * @property job The job that contains this task. - * @property task The task that has started processing. - * @property time A timestamp of the moment the task started. - */ - data class TaskStarted( - val service: WorkflowServiceRef, - val job: Job, - val task: Task, - val time: Long - ) : WorkflowEvent() - - /** - * Indicate that the specified [Task] has started processing. - * - * @property service The reference to the service the job was submitted to. - * @property job The job that contains this task. - * @property task The task that has started processing. - * @property status The exit code of the task, where zero means successful. - * @property time A timestamp of the moment the task finished. */ - data class TaskFinished( - val service: WorkflowServiceRef, - val job: Job, - val task: Task, - val status: Int, - val time: Long - ) : WorkflowEvent() + public suspend fun submit(job: Job, monitor: WorkflowMonitor) /** - * Indicate that the specified [Job] has finished processing. - * - * @property service The reference to the service the job was submitted to. - * @property job The job that has finished processing. - * @property time A timestamp of the moment the task finished. + * The service key for the workflow scheduler. */ - data class JobFinished( - val service: WorkflowServiceRef, - val job: Job, - val time: Long - ) : WorkflowEvent() + companion object Key : AbstractServiceKey<WorkflowService>(UUID.randomUUID(), "workflows") } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/FifoJobSortingPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/FifoJobSortingPolicy.kt index 333ed35a..976fbbf3 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/FifoJobSortingPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/FifoJobSortingPolicy.kt @@ -24,14 +24,14 @@ package com.atlarge.opendc.workflows.service.stage.job -import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic +import com.atlarge.opendc.workflows.service.StageWorkflowService /** * The [FifoJobSortingPolicy] sorts tasks based on the order of arrival in the queue. */ class FifoJobSortingPolicy : JobSortingPolicy { override fun invoke( - scheduler: StageWorkflowSchedulerLogic, - jobs: Collection<StageWorkflowSchedulerLogic.JobView> - ): List<StageWorkflowSchedulerLogic.JobView> = jobs.toList() + scheduler: StageWorkflowService, + jobs: Collection<StageWorkflowService.JobView> + ): List<StageWorkflowService.JobView> = jobs.toList() } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt index d3a5d9a6..cdaad512 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt @@ -24,10 +24,10 @@ package com.atlarge.opendc.workflows.service.stage.job -import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic +import com.atlarge.opendc.workflows.service.StageWorkflowService /** - * A policy interface for admitting [StageWorkflowSchedulerLogic.JobView]s to a scheduling cycle. + * A policy interface for admitting [StageWorkflowService.JobView]s to a scheduling cycle. */ interface JobAdmissionPolicy { /** @@ -35,14 +35,14 @@ interface JobAdmissionPolicy { * * @param scheduler The scheduler that started the cycle. */ - fun startCycle(scheduler: StageWorkflowSchedulerLogic) {} + fun startCycle(scheduler: StageWorkflowService) {} /** - * Determine whether the specified [StageWorkflowSchedulerLogic.JobView] should be admitted to the scheduling cycle. + * Determine whether the specified [StageWorkflowService.JobView] should be admitted to the scheduling cycle. * * @param scheduler The scheduler that should admit or reject the job. * @param job The workflow that has been submitted. * @return `true` if the workflow may be admitted to the scheduling cycle, `false` otherwise. */ - fun shouldAdmit(scheduler: StageWorkflowSchedulerLogic, job: StageWorkflowSchedulerLogic.JobView): Boolean + fun shouldAdmit(scheduler: StageWorkflowService, job: StageWorkflowService.JobView): Boolean } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobSortingPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobSortingPolicy.kt index ada3e693..c3a5dab5 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobSortingPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/JobSortingPolicy.kt @@ -24,7 +24,7 @@ package com.atlarge.opendc.workflows.service.stage.job -import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic +import com.atlarge.opendc.workflows.service.StageWorkflowService /** * A policy interface for ordering admitted workflows in the scheduling queue. @@ -38,7 +38,7 @@ interface JobSortingPolicy { * @return The sorted list of jobs. */ operator fun invoke( - scheduler: StageWorkflowSchedulerLogic, - jobs: Collection<StageWorkflowSchedulerLogic.JobView> - ): List<StageWorkflowSchedulerLogic.JobView> + scheduler: StageWorkflowService, + jobs: Collection<StageWorkflowService.JobView> + ): List<StageWorkflowService.JobView> } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt index f877403b..ad90839c 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt @@ -24,7 +24,7 @@ package com.atlarge.opendc.workflows.service.stage.job -import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic +import com.atlarge.opendc.workflows.service.StageWorkflowService /** * A [JobAdmissionPolicy] that admits all jobs. @@ -34,7 +34,7 @@ object NullJobAdmissionPolicy : JobAdmissionPolicy { * Admit every submitted job. */ override fun shouldAdmit( - scheduler: StageWorkflowSchedulerLogic, - job: StageWorkflowSchedulerLogic.JobView + scheduler: StageWorkflowService, + job: StageWorkflowService.JobView ): Boolean = true } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobSortingPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobSortingPolicy.kt index 30d5c456..9ce2811c 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobSortingPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/job/RandomJobSortingPolicy.kt @@ -24,7 +24,7 @@ package com.atlarge.opendc.workflows.service.stage.job -import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic +import com.atlarge.opendc.workflows.service.StageWorkflowService import kotlin.random.Random /** @@ -34,7 +34,7 @@ import kotlin.random.Random */ class RandomJobSortingPolicy(private val random: Random = Random.Default) : JobSortingPolicy { override fun invoke( - scheduler: StageWorkflowSchedulerLogic, - jobs: Collection<StageWorkflowSchedulerLogic.JobView> - ): List<StageWorkflowSchedulerLogic.JobView> = jobs.shuffled(random) + scheduler: StageWorkflowService, + jobs: Collection<StageWorkflowService.JobView> + ): List<StageWorkflowService.JobView> = jobs.shuffled(random) } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt index c3307063..e2490214 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt @@ -24,17 +24,17 @@ package com.atlarge.opendc.workflows.service.stage.resource -import com.atlarge.opendc.core.services.resources.HostView -import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic +import com.atlarge.opendc.compute.metal.Node +import com.atlarge.opendc.workflows.service.StageWorkflowService /** * A [ResourceSelectionPolicy] that selects the first machine that is available. */ class FirstFitResourceSelectionPolicy : ResourceSelectionPolicy { override fun select( - scheduler: StageWorkflowSchedulerLogic, - machines: List<HostView>, - task: StageWorkflowSchedulerLogic.TaskView - ): HostView? = + scheduler: StageWorkflowService, + machines: List<Node>, + task: StageWorkflowService.TaskView + ): Node? = machines.firstOrNull() } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceDynamicFilterPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceDynamicFilterPolicy.kt index d742f842..a8f2fda9 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceDynamicFilterPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/FunctionalResourceDynamicFilterPolicy.kt @@ -24,8 +24,8 @@ package com.atlarge.opendc.workflows.service.stage.resource -import com.atlarge.opendc.core.services.resources.HostView -import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic +import com.atlarge.opendc.compute.metal.Node +import com.atlarge.opendc.workflows.service.StageWorkflowService /** * A [ResourceDynamicFilterPolicy] based on the amount of cores available on the machine and the cores required for @@ -33,11 +33,11 @@ import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic */ class FunctionalResourceDynamicFilterPolicy : ResourceDynamicFilterPolicy { override fun invoke( - scheduler: StageWorkflowSchedulerLogic, - machines: List<HostView>, - task: StageWorkflowSchedulerLogic.TaskView - ): List<HostView> { + scheduler: StageWorkflowService, + machines: List<Node>, + task: StageWorkflowService.TaskView + ): List<Node> { return machines - .filter { scheduler.machineCores[it] ?: 0 >= task.task.application.cores } + .filter { it in scheduler.available } } } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceDynamicFilterPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceDynamicFilterPolicy.kt index 8a3b5a1e..8d8ceec2 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceDynamicFilterPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceDynamicFilterPolicy.kt @@ -24,8 +24,8 @@ package com.atlarge.opendc.workflows.service.stage.resource -import com.atlarge.opendc.core.services.resources.HostView -import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic +import com.atlarge.opendc.compute.metal.Node +import com.atlarge.opendc.workflows.service.StageWorkflowService /** * This interface represents the **R4** stage of the Reference Architecture for Schedulers and acts as a filter yielding @@ -42,8 +42,8 @@ interface ResourceDynamicFilterPolicy { * @return The machines on which the task can be scheduled. */ operator fun invoke( - scheduler: StageWorkflowSchedulerLogic, - machines: List<HostView>, - task: StageWorkflowSchedulerLogic.TaskView - ): List<HostView> + scheduler: StageWorkflowService, + machines: List<Node>, + task: StageWorkflowService.TaskView + ): List<Node> } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt index 90b2873c..38fe5886 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt @@ -24,8 +24,8 @@ package com.atlarge.opendc.workflows.service.stage.resource -import com.atlarge.opendc.core.services.resources.HostView -import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic +import com.atlarge.opendc.compute.metal.Node +import com.atlarge.opendc.workflows.service.StageWorkflowService /** * This interface represents the **R5** stage of the Reference Architecture for Schedulers and matches the the selected @@ -41,8 +41,8 @@ interface ResourceSelectionPolicy { * @return The selected machine or `null` if no machine could be found. */ fun select( - scheduler: StageWorkflowSchedulerLogic, - machines: List<HostView>, - task: StageWorkflowSchedulerLogic.TaskView - ): HostView? + scheduler: StageWorkflowService, + machines: List<Node>, + task: StageWorkflowService.TaskView + ): Node? } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FifoTaskSortingPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FifoTaskSortingPolicy.kt index 48a1a50d..bba81d27 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FifoTaskSortingPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FifoTaskSortingPolicy.kt @@ -24,14 +24,14 @@ package com.atlarge.opendc.workflows.service.stage.task -import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic +import com.atlarge.opendc.workflows.service.StageWorkflowService /** * The [FifoTaskSortingPolicy] sorts tasks based on the order of arrival in the queue. */ class FifoTaskSortingPolicy : TaskSortingPolicy { override fun invoke( - scheduler: StageWorkflowSchedulerLogic, - tasks: Collection<StageWorkflowSchedulerLogic.TaskView> - ): List<StageWorkflowSchedulerLogic.TaskView> = tasks.toList() + scheduler: StageWorkflowService, + tasks: Collection<StageWorkflowService.TaskView> + ): List<StageWorkflowService.TaskView> = tasks.toList() } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FunctionalTaskEligibilityPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FunctionalTaskEligibilityPolicy.kt index 1672633e..72ecbee2 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FunctionalTaskEligibilityPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/FunctionalTaskEligibilityPolicy.kt @@ -24,15 +24,15 @@ package com.atlarge.opendc.workflows.service.stage.task -import com.atlarge.opendc.core.resources.compute.scheduling.ProcessState -import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic +import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.TaskState /** * A [TaskEligibilityPolicy] that marks tasks as eligible if they are tasks roots within the job. */ class FunctionalTaskEligibilityPolicy : TaskEligibilityPolicy { override fun isEligible( - scheduler: StageWorkflowSchedulerLogic, - task: StageWorkflowSchedulerLogic.TaskView - ): Boolean = task.state == ProcessState.READY + scheduler: StageWorkflowService, + task: StageWorkflowService.TaskView + ): Boolean = task.state == TaskState.READY } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskSortingPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskSortingPolicy.kt index 36ef3a50..1b1d5b44 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskSortingPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/RandomTaskSortingPolicy.kt @@ -24,7 +24,7 @@ package com.atlarge.opendc.workflows.service.stage.task -import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic +import com.atlarge.opendc.workflows.service.StageWorkflowService import kotlin.random.Random /** @@ -34,7 +34,7 @@ import kotlin.random.Random */ class RandomTaskSortingPolicy(private val random: Random = Random.Default) : TaskSortingPolicy { override fun invoke( - scheduler: StageWorkflowSchedulerLogic, - tasks: Collection<StageWorkflowSchedulerLogic.TaskView> - ): List<StageWorkflowSchedulerLogic.TaskView> = tasks.shuffled(random) + scheduler: StageWorkflowService, + tasks: Collection<StageWorkflowService.TaskView> + ): List<StageWorkflowService.TaskView> = tasks.shuffled(random) } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt index 19f0240b..19954d7b 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt @@ -24,7 +24,7 @@ package com.atlarge.opendc.workflows.service.stage.task -import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic +import com.atlarge.opendc.workflows.service.StageWorkflowService /** * A policy interface for determining the eligibility of tasks in a scheduling cycle. @@ -35,14 +35,14 @@ interface TaskEligibilityPolicy { * * @param scheduler The scheduler that started the cycle. */ - fun startCycle(scheduler: StageWorkflowSchedulerLogic) {} + fun startCycle(scheduler: StageWorkflowService) {} /** - * Determine whether the specified [StageWorkflowSchedulerLogic.TaskView] is eligible to be scheduled. + * Determine whether the specified [StageWorkflowService.TaskView] is eligible to be scheduled. * * @param scheduler The scheduler that is determining whether the task is eligible. * @param task The task instance to schedule. * @return `true` if the task eligible to be scheduled, `false` otherwise. */ - fun isEligible(scheduler: StageWorkflowSchedulerLogic, task: StageWorkflowSchedulerLogic.TaskView): Boolean + fun isEligible(scheduler: StageWorkflowService, task: StageWorkflowService.TaskView): Boolean } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskSortingPolicy.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskSortingPolicy.kt index 6a65ed69..aabc44a9 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskSortingPolicy.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/service/stage/task/TaskSortingPolicy.kt @@ -24,7 +24,7 @@ package com.atlarge.opendc.workflows.service.stage.task -import com.atlarge.opendc.workflows.service.StageWorkflowSchedulerLogic +import com.atlarge.opendc.workflows.service.StageWorkflowService /** * This interface represents the **T2** stage of the Reference Architecture for Datacenter Schedulers and provides the @@ -39,7 +39,7 @@ interface TaskSortingPolicy { * @return The sorted list of tasks. */ operator fun invoke( - scheduler: StageWorkflowSchedulerLogic, - tasks: Collection<StageWorkflowSchedulerLogic.TaskView> - ): List<StageWorkflowSchedulerLogic.TaskView> + scheduler: StageWorkflowService, + tasks: Collection<StageWorkflowService.TaskView> + ): List<StageWorkflowService.TaskView> } diff --git a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Task.kt b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Task.kt index 25fe7348..b5997b35 100644 --- a/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Task.kt +++ b/opendc/opendc-workflows/src/main/kotlin/com/atlarge/opendc/workflows/workload/Task.kt @@ -24,8 +24,8 @@ package com.atlarge.opendc.workflows.workload +import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.core.Identity -import com.atlarge.opendc.core.workload.application.Application import java.util.UUID /** @@ -33,13 +33,13 @@ import java.util.UUID * * @property uid A unique identified of this task. * @property name The name of this task. - * @property application The application to run as part of this workflow task. + * @property image The application image to run as part of this workflow task. * @property dependencies The dependencies of this task in order for it to execute. */ data class Task( override val uid: UUID, override val name: String, - val application: Application, + val image: Image, val dependencies: Set<Task> ) : Identity { override fun equals(other: Any?): Boolean = other is Task && uid == other.uid |
