diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-03-09 20:29:33 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-03-09 20:33:29 +0100 |
| commit | 40e5871e01858a55372bfcb51cf90069c080e751 (patch) | |
| tree | bc880252a935cc0b1558c50fe83f71d21b735d29 | |
| parent | 44ed0023ed783437c3c838780f73e28efe1cc4ca (diff) | |
workflow: Split workflow module in API and service module
51 files changed, 431 insertions, 299 deletions
diff --git a/simulator/README.md b/simulator/README.md index 61ef1d43..d02925d6 100644 --- a/simulator/README.md +++ b/simulator/README.md @@ -23,7 +23,7 @@ This component is responsible for modelling and simulation of datacenters and th - **[opendc-compute](opendc-compute)** The [Infrastructure as a Service](https://en.wikipedia.org/wiki/Infrastructure_as_a_Service) (IaaS) component of OpenDC for computing infrastructure (similar to [Amazon EC2](https://aws.amazon.com/ec2/) and [Google Compute Engine](https://cloud.google.com/compute)). -- **[opendc-workflows](opendc-workflows)** +- **[opendc-workflow](opendc-workflow)** Workflow orchestration service built on top of OpenDC. - **[opendc-format](opendc-format)** Collection of libraries for processing data formats related to (simulation of) cloud computing and datacenters. diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts b/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts index f85d9b19..02e77c7c 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts +++ b/simulator/opendc-experiments/opendc-experiments-sc18/build.gradle.kts @@ -32,7 +32,7 @@ dependencies { api(platform(project(":opendc-platform"))) api(project(":opendc-harness")) implementation(project(":opendc-format")) - implementation(project(":opendc-workflows")) + implementation(project(":opendc-workflow:opendc-workflow-service")) implementation(project(":opendc-simulator:opendc-simulator-core")) implementation(project(":opendc-compute:opendc-compute-simulator")) } diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt index 2be05119..9e305b3d 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt +++ b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/UnderspecificationExperiment.kt @@ -35,13 +35,13 @@ import org.opendc.simulator.compute.SimSpaceSharedHypervisorProvider import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.trace.core.EventTracer import org.opendc.trace.core.enable -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.service.WorkflowEvent -import org.opendc.workflows.service.WorkflowSchedulerMode -import org.opendc.workflows.service.stage.job.NullJobAdmissionPolicy -import org.opendc.workflows.service.stage.job.SubmissionTimeJobOrderPolicy -import org.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy -import org.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy +import org.opendc.workflow.service.WorkflowEvent +import org.opendc.workflow.service.WorkflowService +import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode +import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy +import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy +import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy +import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy import java.io.File import java.io.FileInputStream import kotlin.math.max @@ -106,8 +106,8 @@ public class UnderspecificationExperiment : Experiment("underspecification") { hosts.forEach { compute.addHost(it) } - val scheduler = StageWorkflowService( - testScope, + val scheduler = WorkflowService( + testScope.coroutineContext, clock, tracer, compute.newClient(), diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/WorkflowMetrics.kt b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/WorkflowMetrics.kt index dbd04b87..a8356888 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/WorkflowMetrics.kt +++ b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/WorkflowMetrics.kt @@ -24,7 +24,7 @@ package org.opendc.experiments.sc18 import org.opendc.trace.core.EventStream import org.opendc.trace.core.onEvent -import org.opendc.workflows.service.WorkflowEvent +import org.opendc.workflow.service.WorkflowEvent import java.util.* import kotlin.coroutines.resume import kotlin.coroutines.suspendCoroutine diff --git a/simulator/opendc-format/build.gradle.kts b/simulator/opendc-format/build.gradle.kts index 4c0f6dcd..385e556d 100644 --- a/simulator/opendc-format/build.gradle.kts +++ b/simulator/opendc-format/build.gradle.kts @@ -31,7 +31,7 @@ plugins { dependencies { api(platform(project(":opendc-platform"))) api(project(":opendc-compute:opendc-compute-api")) - api(project(":opendc-workflows")) + api(project(":opendc-workflow:opendc-workflow-api")) implementation(project(":opendc-simulator:opendc-simulator-compute")) implementation(project(":opendc-compute:opendc-compute-simulator")) api("com.fasterxml.jackson.module:jackson-module-kotlin:${versions["jackson-module-kotlin"]}") diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt index f510271b..e68afeb7 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt @@ -25,10 +25,10 @@ package org.opendc.format.trace.gwf import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.workload.SimFlopsWorkload -import org.opendc.workflows.workload.Job -import org.opendc.workflows.workload.Task -import org.opendc.workflows.workload.WORKFLOW_TASK_CORES -import org.opendc.workflows.workload.WORKFLOW_TASK_DEADLINE +import org.opendc.workflow.api.Job +import org.opendc.workflow.api.Task +import org.opendc.workflow.api.WORKFLOW_TASK_CORES +import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE import java.io.BufferedReader import java.io.File import java.io.InputStream diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt index 3d969eb7..feadf61f 100644 --- a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt +++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt @@ -28,10 +28,10 @@ import org.apache.parquet.avro.AvroParquetReader import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.workload.SimFlopsWorkload -import org.opendc.workflows.workload.Job -import org.opendc.workflows.workload.Task -import org.opendc.workflows.workload.WORKFLOW_TASK_CORES -import org.opendc.workflows.workload.WORKFLOW_TASK_DEADLINE +import org.opendc.workflow.api.Job +import org.opendc.workflow.api.Task +import org.opendc.workflow.api.WORKFLOW_TASK_CORES +import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE import java.util.UUID import kotlin.math.min diff --git a/simulator/opendc-workflow/build.gradle.kts b/simulator/opendc-workflow/build.gradle.kts new file mode 100644 index 00000000..3cefa409 --- /dev/null +++ b/simulator/opendc-workflow/build.gradle.kts @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +description = "Workflow orchestration for OpenDC" diff --git a/simulator/opendc-workflow/opendc-workflow-api/build.gradle.kts b/simulator/opendc-workflow/opendc-workflow-api/build.gradle.kts new file mode 100644 index 00000000..d3e67bee --- /dev/null +++ b/simulator/opendc-workflow/opendc-workflow-api/build.gradle.kts @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +description = "Workflow orchestration service API for OpenDC" + +/* Build configuration */ +plugins { + `kotlin-library-conventions` +} + +dependencies { + api(platform(project(":opendc-platform"))) + api(project(":opendc-compute:opendc-compute-api")) + implementation(project(":opendc-utils")) + implementation("io.github.microutils:kotlin-logging") +} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Job.kt b/simulator/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Job.kt index 53116cb6..5e8b0b9e 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Job.kt +++ b/simulator/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Job.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.workflows.workload +package org.opendc.workflow.api import java.util.* diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Metadata.kt b/simulator/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Metadata.kt index 4305aa57..db208998 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Metadata.kt +++ b/simulator/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Metadata.kt @@ -22,7 +22,7 @@ * SOFTWARE. */ -package org.opendc.workflows.workload +package org.opendc.workflow.api /** * Meta-data key for the deadline of a task. diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Task.kt b/simulator/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Task.kt index 4ccefef9..d91f9879 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Task.kt +++ b/simulator/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Task.kt @@ -1,7 +1,5 @@ /* - * MIT License - * - * Copyright (c) 2019 atlarge-research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.workflows.workload +package org.opendc.workflow.api import java.util.* diff --git a/simulator/opendc-workflows/build.gradle.kts b/simulator/opendc-workflow/opendc-workflow-service/build.gradle.kts index 541d379e..12a54235 100644 --- a/simulator/opendc-workflows/build.gradle.kts +++ b/simulator/opendc-workflow/opendc-workflow-service/build.gradle.kts @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,7 +20,7 @@ * SOFTWARE. */ -description = "Workflow service for OpenDC" +description = "Workflow orchestration service for OpenDC" /* Build configuration */ plugins { @@ -30,6 +30,7 @@ plugins { dependencies { api(platform(project(":opendc-platform"))) + api(project(":opendc-workflow:opendc-workflow-api")) api(project(":opendc-compute:opendc-compute-api")) api(project(":opendc-trace:opendc-trace-core")) implementation(project(":opendc-utils")) diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowEvent.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowEvent.kt index bcf93562..bb2ad6c6 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowEvent.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowEvent.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,11 +20,11 @@ * SOFTWARE. */ -package org.opendc.workflows.service +package org.opendc.workflow.service import org.opendc.trace.core.Event -import org.opendc.workflows.workload.Job -import org.opendc.workflows.workload.Task +import org.opendc.workflow.api.Job +import org.opendc.workflow.api.Task /** * An event emitted by the [WorkflowService]. diff --git a/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt new file mode 100644 index 00000000..2f83e376 --- /dev/null +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt @@ -0,0 +1,97 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.service + +import kotlinx.coroutines.flow.Flow +import org.opendc.compute.api.ComputeClient +import org.opendc.trace.core.EventTracer +import org.opendc.workflow.api.Job +import org.opendc.workflow.service.internal.WorkflowServiceImpl +import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode +import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy +import org.opendc.workflow.service.scheduler.job.JobOrderPolicy +import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy +import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy +import java.time.Clock +import kotlin.coroutines.CoroutineContext + +/** + * A service for cloud workflow management. + * + * The workflow scheduler is modelled after the Reference Architecture for Topology Scheduling by Andreadis et al. + */ +public interface WorkflowService : AutoCloseable { + /** + * The events emitted by the workflow scheduler. + */ + public val events: Flow<WorkflowEvent> + + /** + * Submit the specified [Job] to the workflow service for scheduling. + */ + public suspend fun submit(job: Job) + + /** + * Terminate the lifecycle of the workflow service, stopping all running workflows. + */ + public override fun close() + + public companion object { + /** + * Construct a new [WorkflowService] implementation. + * + * @param context The [CoroutineContext] to use in the service. + * @param clock The clock instance to use. + * @param tracer The event tracer to use. + * @param compute The compute client to use. + * @param mode The scheduling mode to use. + * @param jobAdmissionPolicy The job admission policy to use. + * @param jobOrderPolicy The job order policy to use. + * @param taskEligibilityPolicy The task eligibility policy to use. + * @param taskOrderPolicy The task order policy to use. + */ + public operator fun invoke( + context: CoroutineContext, + clock: Clock, + tracer: EventTracer, + compute: ComputeClient, + mode: WorkflowSchedulerMode, + jobAdmissionPolicy: JobAdmissionPolicy, + jobOrderPolicy: JobOrderPolicy, + taskEligibilityPolicy: TaskEligibilityPolicy, + taskOrderPolicy: TaskOrderPolicy + ): WorkflowService { + return WorkflowServiceImpl( + context, + clock, + tracer, + compute, + mode, + jobAdmissionPolicy, + jobOrderPolicy, + taskEligibilityPolicy, + taskOrderPolicy + ) + } + } +} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/JobState.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/JobState.kt index 89849f6a..1bb67169 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/JobState.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/JobState.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,9 +20,9 @@ * SOFTWARE. */ -package org.opendc.workflows.service +package org.opendc.workflow.service.internal -import org.opendc.workflows.workload.Job +import org.opendc.workflow.api.Job public class JobState(public val job: Job, public val submittedAt: Long) { /** diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/TaskState.kt index ef9714c2..c3ce1492 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/TaskState.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,10 +20,10 @@ * SOFTWARE. */ -package org.opendc.workflows.service +package org.opendc.workflow.service.internal import org.opendc.compute.api.Server -import org.opendc.workflows.workload.Task +import org.opendc.workflow.api.Task public class TaskState(public val job: JobState, public val task: Task) { /** @@ -39,12 +39,12 @@ public class TaskState(public val job: JobState, public val task: Task) { /** * The dependencies of this task. */ - public val dependencies: HashSet<TaskState> = HashSet<TaskState>() + public val dependencies: HashSet<TaskState> = HashSet() /** * The dependents of this task. */ - public val dependents: HashSet<TaskState> = HashSet<TaskState>() + public val dependents: HashSet<TaskState> = HashSet() /** * A flag to indicate whether this workflow task instance is a workflow root. diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskStatus.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/TaskStatus.kt index 99f5bb87..fe941d09 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskStatus.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/TaskStatus.kt @@ -1,7 +1,5 @@ /* - * MIT License - * - * Copyright (c) 2020 atlarge-research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.workflows.service +package org.opendc.workflow.service.internal /** * The state of a workflow task. diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerListener.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowSchedulerListener.kt index 18721889..29c6aeea 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerListener.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowSchedulerListener.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,11 +20,11 @@ * SOFTWARE. */ -package org.opendc.workflows.service +package org.opendc.workflow.service.internal -public interface StageWorkflowSchedulerListener { - public fun cycleStarted(scheduler: StageWorkflowService) {} - public fun cycleFinished(scheduler: StageWorkflowService) {} +public interface WorkflowSchedulerListener { + public fun cycleStarted(scheduler: WorkflowServiceImpl) {} + public fun cycleFinished(scheduler: WorkflowServiceImpl) {} public fun jobSubmitted(job: JobState) {} public fun jobStarted(job: JobState) {} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt index c5c4bf97..85a88acd 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,10 +20,11 @@ * SOFTWARE. */ -package org.opendc.workflows.service +package org.opendc.workflow.service.internal import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.cancel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.map import kotlinx.coroutines.launch @@ -32,21 +33,24 @@ import org.opendc.compute.api.* import org.opendc.trace.core.EventTracer import org.opendc.trace.core.consumeAsFlow import org.opendc.trace.core.enable -import org.opendc.workflows.service.stage.job.JobAdmissionPolicy -import org.opendc.workflows.service.stage.job.JobOrderPolicy -import org.opendc.workflows.service.stage.task.TaskEligibilityPolicy -import org.opendc.workflows.service.stage.task.TaskOrderPolicy -import org.opendc.workflows.workload.Job -import org.opendc.workflows.workload.WORKFLOW_TASK_CORES +import org.opendc.workflow.api.Job +import org.opendc.workflow.api.WORKFLOW_TASK_CORES +import org.opendc.workflow.service.* +import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode +import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy +import org.opendc.workflow.service.scheduler.job.JobOrderPolicy +import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy +import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy import java.time.Clock import java.util.* +import kotlin.coroutines.CoroutineContext /** * A [WorkflowService] that distributes work through a multi-stage process based on the Reference Architecture for * Datacenter Scheduling. */ -public class StageWorkflowService( - internal val coroutineScope: CoroutineScope, +public class WorkflowServiceImpl( + context: CoroutineContext, internal val clock: Clock, internal val tracer: EventTracer, private val computeClient: ComputeClient, @@ -57,6 +61,11 @@ public class StageWorkflowService( taskOrderPolicy: TaskOrderPolicy ) : WorkflowService, ServerWatcher { /** + * The [CoroutineScope] of the service bounded by the lifecycle of the service. + */ + internal val scope = CoroutineScope(context) + + /** * The logger instance to use. */ private val logger = KotlinLogging.logger {} @@ -99,17 +108,17 @@ public class StageWorkflowService( /** * The root listener of this scheduler. */ - private val rootListener = object : StageWorkflowSchedulerListener { + private val rootListener = object : WorkflowSchedulerListener { /** * The listeners to delegate to. */ - val listeners = mutableSetOf<StageWorkflowSchedulerListener>() + val listeners = mutableSetOf<WorkflowSchedulerListener>() - override fun cycleStarted(scheduler: StageWorkflowService) { + override fun cycleStarted(scheduler: WorkflowServiceImpl) { listeners.forEach { it.cycleStarted(scheduler) } } - override fun cycleFinished(scheduler: StageWorkflowService) { + override fun cycleFinished(scheduler: WorkflowServiceImpl) { listeners.forEach { it.cycleFinished(scheduler) } } @@ -153,7 +162,7 @@ public class StageWorkflowService( this.jobQueue = PriorityQueue(100, jobOrderPolicy(this).thenBy { it.job.uid }) this.taskEligibilityPolicy = taskEligibilityPolicy(this) this.taskQueue = PriorityQueue(1000, taskOrderPolicy(this).thenBy { it.task.uid }) - coroutineScope.launch { + scope.launch { image = computeClient.newImage("workflow-runner") } } @@ -194,6 +203,10 @@ public class StageWorkflowService( requestCycle() } + override fun close() { + scope.cancel() + } + /** * Indicate to the scheduler that a scheduling cycle is needed. */ @@ -218,7 +231,12 @@ public class StageWorkflowService( iterator.remove() jobQueue.add(jobInstance) activeJobs += jobInstance - tracer.commit(WorkflowEvent.JobStarted(this, jobInstance.job)) + tracer.commit( + WorkflowEvent.JobStarted( + this, + jobInstance.job + ) + ) rootListener.jobStarted(jobInstance) } @@ -263,15 +281,25 @@ public class StageWorkflowService( val cores = instance.task.metadata[WORKFLOW_TASK_CORES] as? Int ?: 1 val image = image - coroutineScope.launch { - val flavor = computeClient.newFlavor(instance.task.name, cores, 1000) // TODO How to determine memory usage for workflow task - val server = computeClient.newServer(instance.task.name, image, flavor, start = false, meta = instance.task.metadata) + scope.launch { + val flavor = computeClient.newFlavor( + instance.task.name, + cores, + 1000 + ) // TODO How to determine memory usage for workflow task + val server = computeClient.newServer( + instance.task.name, + image, + flavor, + start = false, + meta = instance.task.metadata + ) instance.state = TaskStatus.ACTIVE instance.server = server taskByServer[server] = instance - server.watch(this@StageWorkflowService) + server.watch(this@WorkflowServiceImpl) server.start() } @@ -283,13 +311,14 @@ public class StageWorkflowService( public override fun onStateChanged(server: Server, newState: ServerState) { when (newState) { - ServerState.PROVISIONING -> {} + ServerState.PROVISIONING -> { + } ServerState.RUNNING -> { val task = taskByServer.getValue(server) task.startedAt = clock.millis() tracer.commit( WorkflowEvent.TaskStarted( - this@StageWorkflowService, + this@WorkflowServiceImpl, task.job.job, task.task ) @@ -299,7 +328,7 @@ public class StageWorkflowService( ServerState.TERMINATED, ServerState.ERROR -> { val task = taskByServer.remove(server) ?: throw IllegalStateException() - coroutineScope.launch { + scope.launch { server.delete() server.flavor.delete() } @@ -311,7 +340,7 @@ public class StageWorkflowService( activeTasks -= task tracer.commit( WorkflowEvent.TaskFinished( - this@StageWorkflowService, + this@WorkflowServiceImpl, task.job.job, task.task ) @@ -334,7 +363,8 @@ public class StageWorkflowService( requestCycle() } - ServerState.DELETED -> {} + ServerState.DELETED -> { + } else -> throw IllegalStateException() } } @@ -345,11 +375,11 @@ public class StageWorkflowService( rootListener.jobFinished(job) } - public fun addListener(listener: StageWorkflowSchedulerListener) { + public fun addListener(listener: WorkflowSchedulerListener) { rootListener.listeners += listener } - public fun removeListener(listener: StageWorkflowSchedulerListener) { + public fun removeListener(listener: WorkflowSchedulerListener) { rootListener.listeners -= listener } } diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/StagePolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/StagePolicy.kt index d76579f9..359fc223 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/StagePolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/StagePolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,9 +20,9 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage +package org.opendc.workflow.service.scheduler -import org.opendc.workflows.service.StageWorkflowService +import org.opendc.workflow.service.internal.WorkflowServiceImpl import java.io.Serializable /** @@ -32,5 +32,5 @@ public interface StagePolicy<T : Any> : Serializable { /** * Build the logic of the stage policy. */ - public operator fun invoke(scheduler: StageWorkflowService): T + public operator fun invoke(scheduler: WorkflowServiceImpl): T } diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/WorkflowSchedulerMode.kt index cf8f92e0..58e7893f 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowSchedulerMode.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/WorkflowSchedulerMode.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,11 +20,11 @@ * SOFTWARE. */ -package org.opendc.workflows.service +package org.opendc.workflow.service.scheduler import kotlinx.coroutines.delay import kotlinx.coroutines.launch -import org.opendc.workflows.service.stage.StagePolicy +import org.opendc.workflow.service.internal.WorkflowServiceImpl /** * The operating mode of a workflow scheduler. @@ -44,9 +44,9 @@ public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Lo * An interactive scheduler immediately triggers a new scheduling cycle when a workflow is received. */ public object Interactive : WorkflowSchedulerMode() { - override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic { + override fun invoke(scheduler: WorkflowServiceImpl): Logic = object : Logic { override fun requestCycle() { - scheduler.coroutineScope.launch { scheduler.schedule() } + scheduler.scope.launch { scheduler.schedule() } } } @@ -59,14 +59,14 @@ public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Lo public data class Batch(val quantum: Long) : WorkflowSchedulerMode() { private var next: kotlinx.coroutines.Job? = null - override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic { + override fun invoke(scheduler: WorkflowServiceImpl): Logic = object : Logic { override fun requestCycle() { if (next == null) { // In batch mode, we assume that the scheduler runs at a fixed slot every time // quantum (e.g t=0, t=60, t=120). We calculate here the delay until the next scheduling slot. val delay = quantum - (scheduler.clock.millis() % quantum) - val job = scheduler.coroutineScope.launch { + val job = scheduler.scope.launch { delay(delay) next = null scheduler.schedule() @@ -85,12 +85,12 @@ public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Lo public data class Random(private val random: java.util.Random = java.util.Random(123)) : WorkflowSchedulerMode() { private var next: kotlinx.coroutines.Job? = null - override fun invoke(scheduler: StageWorkflowService): Logic = object : Logic { + override fun invoke(scheduler: WorkflowServiceImpl): Logic = object : Logic { override fun requestCycle() { if (next == null) { val delay = random.nextInt(200).toLong() - val job = scheduler.coroutineScope.launch { + val job = scheduler.scope.launch { delay(delay) next = null scheduler.schedule() diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/DurationJobOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/DurationJobOrderPolicy.kt index 1190a408..1b5b91b9 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/DurationJobOrderPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/DurationJobOrderPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,21 +20,23 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.job +package org.opendc.workflow.service.scheduler.job -import org.opendc.workflows.service.JobState -import org.opendc.workflows.service.StageWorkflowSchedulerListener -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.workload.Job -import org.opendc.workflows.workload.Task -import org.opendc.workflows.workload.WORKFLOW_TASK_DEADLINE +import org.opendc.workflow.api.Job +import org.opendc.workflow.api.Task +import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.internal.WorkflowSchedulerListener +import org.opendc.workflow.service.internal.WorkflowServiceImpl /** * A [JobOrderPolicy] that orders jobs based on its critical path length. */ public data class DurationJobOrderPolicy(val ascending: Boolean = true) : JobOrderPolicy { - override fun invoke(scheduler: StageWorkflowService): Comparator<JobState> = - object : Comparator<JobState>, StageWorkflowSchedulerListener { + override fun invoke(scheduler: WorkflowServiceImpl): Comparator<JobState> = + object : + Comparator<JobState>, + WorkflowSchedulerListener { private val results = HashMap<Job, Long>() init { diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/JobAdmissionPolicy.kt index 0e5a42c0..ed3acff7 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/JobAdmissionPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/JobAdmissionPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,10 +20,10 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.job +package org.opendc.workflow.service.scheduler.job -import org.opendc.workflows.service.JobState -import org.opendc.workflows.service.stage.StagePolicy +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.scheduler.StagePolicy /** * A policy interface for admitting [JobState]s to a scheduling cycle. diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/JobOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/JobOrderPolicy.kt index 83d42b2d..adaa6671 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/JobOrderPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/JobOrderPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,10 +20,10 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.job +package org.opendc.workflow.service.scheduler.job -import org.opendc.workflows.service.JobState -import org.opendc.workflows.service.stage.StagePolicy +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.scheduler.StagePolicy /** * A policy interface for ordering admitted workflows in the scheduling queue. diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/LimitJobAdmissionPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/LimitJobAdmissionPolicy.kt index 6f6ccb50..6a0bfeb9 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/LimitJobAdmissionPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/LimitJobAdmissionPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,10 +20,10 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.job +package org.opendc.workflow.service.scheduler.job -import org.opendc.workflows.service.JobState -import org.opendc.workflows.service.StageWorkflowService +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.internal.WorkflowServiceImpl /** * A [JobAdmissionPolicy] that limits the amount of active jobs in the system. @@ -31,7 +31,7 @@ import org.opendc.workflows.service.StageWorkflowService * @property limit The maximum number of concurrent jobs in the system. */ public data class LimitJobAdmissionPolicy(public val limit: Int) : JobAdmissionPolicy { - override fun invoke(scheduler: StageWorkflowService): JobAdmissionPolicy.Logic = object : JobAdmissionPolicy.Logic { + override fun invoke(scheduler: WorkflowServiceImpl): JobAdmissionPolicy.Logic = object : JobAdmissionPolicy.Logic { override fun invoke( job: JobState ): JobAdmissionPolicy.Advice = diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/NullJobAdmissionPolicy.kt index ac74f090..31f8f8db 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/NullJobAdmissionPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/NullJobAdmissionPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,16 +20,16 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.job +package org.opendc.workflow.service.scheduler.job -import org.opendc.workflows.service.JobState -import org.opendc.workflows.service.StageWorkflowService +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.internal.WorkflowServiceImpl /** * A [JobAdmissionPolicy] that admits all jobs. */ public object NullJobAdmissionPolicy : JobAdmissionPolicy { - override fun invoke(scheduler: StageWorkflowService): JobAdmissionPolicy.Logic = object : JobAdmissionPolicy.Logic { + override fun invoke(scheduler: WorkflowServiceImpl): JobAdmissionPolicy.Logic = object : JobAdmissionPolicy.Logic { override fun invoke(job: JobState): JobAdmissionPolicy.Advice = JobAdmissionPolicy.Advice.ADMIT } diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/RandomJobOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/RandomJobOrderPolicy.kt index 6c747261..1b359125 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/RandomJobOrderPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/RandomJobOrderPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,23 +20,23 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.job +package org.opendc.workflow.service.scheduler.job -import org.opendc.workflows.service.JobState -import org.opendc.workflows.service.StageWorkflowSchedulerListener -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.workload.Job +import org.opendc.workflow.api.Job +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.internal.WorkflowSchedulerListener +import org.opendc.workflow.service.internal.WorkflowServiceImpl import java.util.* import kotlin.collections.HashMap -import kotlin.collections.getValue -import kotlin.collections.set /** * A [JobOrderPolicy] that randomly orders jobs. */ public object RandomJobOrderPolicy : JobOrderPolicy { - override fun invoke(scheduler: StageWorkflowService): Comparator<JobState> = - object : Comparator<JobState>, StageWorkflowSchedulerListener { + override fun invoke(scheduler: WorkflowServiceImpl): Comparator<JobState> = + object : + Comparator<JobState>, + WorkflowSchedulerListener { private val random = Random(123) private val ids = HashMap<Job, Int>() diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/SizeJobOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/SizeJobOrderPolicy.kt index c1c244c3..6998606d 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/SizeJobOrderPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/SizeJobOrderPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,16 +20,16 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.job +package org.opendc.workflow.service.scheduler.job -import org.opendc.workflows.service.JobState -import org.opendc.workflows.service.StageWorkflowService +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.internal.WorkflowServiceImpl /** * A [SizeJobOrderPolicy] that orders jobs based on the number of tasks it has. */ public data class SizeJobOrderPolicy(public val ascending: Boolean = true) : JobOrderPolicy { - override fun invoke(scheduler: StageWorkflowService): Comparator<JobState> = + override fun invoke(scheduler: WorkflowServiceImpl): Comparator<JobState> = compareBy { it.tasks.size.let { if (ascending) it else -it } } override fun toString(): String { diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/SubmissionTimeJobOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/SubmissionTimeJobOrderPolicy.kt index 005f8153..53d06023 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/job/SubmissionTimeJobOrderPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/SubmissionTimeJobOrderPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,16 +20,16 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.job +package org.opendc.workflow.service.scheduler.job -import org.opendc.workflows.service.JobState -import org.opendc.workflows.service.StageWorkflowService +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.internal.WorkflowServiceImpl /** * A [JobOrderPolicy] orders jobs in FIFO order. */ public data class SubmissionTimeJobOrderPolicy(public val ascending: Boolean = true) : JobOrderPolicy { - override fun invoke(scheduler: StageWorkflowService): Comparator<JobState> = + override fun invoke(scheduler: WorkflowServiceImpl): Comparator<JobState> = compareBy { it.submittedAt.let { if (ascending) it else -it } } override fun toString(): String { diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/ActiveTaskOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/ActiveTaskOrderPolicy.kt index 6a465746..821d4964 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/ActiveTaskOrderPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/ActiveTaskOrderPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,19 +20,19 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.task +package org.opendc.workflow.service.scheduler.task -import org.opendc.workflows.service.JobState -import org.opendc.workflows.service.StageWorkflowSchedulerListener -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.service.TaskState +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowSchedulerListener +import org.opendc.workflow.service.internal.WorkflowServiceImpl /** * A [TaskOrderPolicy] that orders tasks based on the number of active relative tasks (w.r.t. its job) in the system. */ public data class ActiveTaskOrderPolicy(public val ascending: Boolean = true) : TaskOrderPolicy { - override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> = - object : Comparator<TaskState>, StageWorkflowSchedulerListener { + override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> = + object : Comparator<TaskState>, WorkflowSchedulerListener { private val active = mutableMapOf<JobState, Int>() init { diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/BalancingTaskEligibilityPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/BalancingTaskEligibilityPolicy.kt index f3f19ef5..42804f5a 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/BalancingTaskEligibilityPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/BalancingTaskEligibilityPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,12 +20,12 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.task +package org.opendc.workflow.service.scheduler.task -import org.opendc.workflows.service.JobState -import org.opendc.workflows.service.StageWorkflowSchedulerListener -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.service.TaskState +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowSchedulerListener +import org.opendc.workflow.service.internal.WorkflowServiceImpl import kotlin.math.max /** @@ -36,8 +36,8 @@ import kotlin.math.max * the average. */ public data class BalancingTaskEligibilityPolicy(public val tolerance: Double = 1.5) : TaskEligibilityPolicy { - override fun invoke(scheduler: StageWorkflowService): TaskEligibilityPolicy.Logic = - object : TaskEligibilityPolicy.Logic, StageWorkflowSchedulerListener { + override fun invoke(scheduler: WorkflowServiceImpl): TaskEligibilityPolicy.Logic = + object : TaskEligibilityPolicy.Logic, WorkflowSchedulerListener { private val active = mutableMapOf<JobState, Int>() init { diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/CompletionTaskOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/CompletionTaskOrderPolicy.kt index 0020023f..dae7ad99 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/CompletionTaskOrderPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/CompletionTaskOrderPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,19 +20,19 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.task +package org.opendc.workflow.service.scheduler.task -import org.opendc.workflows.service.JobState -import org.opendc.workflows.service.StageWorkflowSchedulerListener -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.service.TaskState +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowSchedulerListener +import org.opendc.workflow.service.internal.WorkflowServiceImpl /** * A [TaskOrderPolicy] that orders tasks based on the number of completed relative tasks. */ public data class CompletionTaskOrderPolicy(public val ascending: Boolean = true) : TaskOrderPolicy { - override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> = - object : Comparator<TaskState>, StageWorkflowSchedulerListener { + override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> = + object : Comparator<TaskState>, WorkflowSchedulerListener { private val finished = mutableMapOf<JobState, Int>() init { diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/DependenciesTaskOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DependenciesTaskOrderPolicy.kt index a9f5eb84..7786f6ec 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/DependenciesTaskOrderPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DependenciesTaskOrderPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,16 +20,16 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.task +package org.opendc.workflow.service.scheduler.task -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.service.TaskState +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowServiceImpl /** * A [TaskOrderPolicy] that orders tasks based on the number of dependency tasks it has. */ public data class DependenciesTaskOrderPolicy(public val ascending: Boolean = true) : TaskOrderPolicy { - override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> = compareBy<TaskState> { + override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> = compareBy { it.task.dependencies.size.let { if (ascending) it else -it } } diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/DependentsTaskOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DependentsTaskOrderPolicy.kt index e5a9f159..4fb835d7 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/DependentsTaskOrderPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DependentsTaskOrderPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,16 +20,16 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.task +package org.opendc.workflow.service.scheduler.task -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.service.TaskState +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowServiceImpl /** * A [TaskOrderPolicy] that orders tasks based on the number of dependent tasks it has. */ public data class DependentsTaskOrderPolicy(public val ascending: Boolean = true) : TaskOrderPolicy { - override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> = compareBy<TaskState> { + override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> = compareBy { it.dependents.size.let { if (ascending) it else -it } } diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/DurationHistoryTaskOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationHistoryTaskOrderPolicy.kt index 7ce8ccce..3a634de7 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/DurationHistoryTaskOrderPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationHistoryTaskOrderPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,19 +20,19 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.task +package org.opendc.workflow.service.scheduler.task -import org.opendc.workflows.service.JobState -import org.opendc.workflows.service.StageWorkflowSchedulerListener -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.service.TaskState +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowSchedulerListener +import org.opendc.workflow.service.internal.WorkflowServiceImpl /** * A [TaskOrderPolicy] that orders tasks based on the average duration of the preceding tasks in the job. */ public data class DurationHistoryTaskOrderPolicy(public val ascending: Boolean = true) : TaskOrderPolicy { - override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> = - object : Comparator<TaskState>, StageWorkflowSchedulerListener { + override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> = + object : Comparator<TaskState>, WorkflowSchedulerListener { private val results = HashMap<JobState, MutableList<Long>>() init { diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/DurationTaskOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationTaskOrderPolicy.kt index 3674eb01..d9fde53a 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/DurationTaskOrderPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationTaskOrderPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,16 +20,15 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.task +package org.opendc.workflow.service.scheduler.task -import org.opendc.workflows.service.StageWorkflowSchedulerListener -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.service.TaskState -import org.opendc.workflows.workload.WORKFLOW_TASK_DEADLINE +import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowSchedulerListener +import org.opendc.workflow.service.internal.WorkflowServiceImpl import java.util.* import kotlin.collections.HashMap import kotlin.collections.getValue -import kotlin.collections.minusAssign import kotlin.collections.set /** @@ -37,8 +36,8 @@ import kotlin.collections.set */ public data class DurationTaskOrderPolicy(public val ascending: Boolean = true) : TaskOrderPolicy { - override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> = - object : Comparator<TaskState>, StageWorkflowSchedulerListener { + override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> = + object : Comparator<TaskState>, WorkflowSchedulerListener { private val results = HashMap<UUID, Long>() init { diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/LimitPerJobTaskEligibilityPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitPerJobTaskEligibilityPolicy.kt index 2dddbc7c..229460df 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/LimitPerJobTaskEligibilityPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitPerJobTaskEligibilityPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,19 +20,19 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.task +package org.opendc.workflow.service.scheduler.task -import org.opendc.workflows.service.JobState -import org.opendc.workflows.service.StageWorkflowSchedulerListener -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.service.TaskState +import org.opendc.workflow.service.internal.JobState +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowSchedulerListener +import org.opendc.workflow.service.internal.WorkflowServiceImpl /** * A [TaskEligibilityPolicy] that limits the number of active tasks of a job in the system. */ public data class LimitPerJobTaskEligibilityPolicy(public val limit: Int) : TaskEligibilityPolicy { - override fun invoke(scheduler: StageWorkflowService): TaskEligibilityPolicy.Logic = - object : TaskEligibilityPolicy.Logic, StageWorkflowSchedulerListener { + override fun invoke(scheduler: WorkflowServiceImpl): TaskEligibilityPolicy.Logic = + object : TaskEligibilityPolicy.Logic, WorkflowSchedulerListener { private val active = mutableMapOf<JobState, Int>() init { diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/LimitTaskEligibilityPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitTaskEligibilityPolicy.kt index fdc1fd5e..57aa0d58 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/LimitTaskEligibilityPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitTaskEligibilityPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,16 +20,16 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.task +package org.opendc.workflow.service.scheduler.task -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.service.TaskState +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowServiceImpl /** * A [TaskEligibilityPolicy] that limits the total number of active tasks in the system. */ public data class LimitTaskEligibilityPolicy(val limit: Int) : TaskEligibilityPolicy { - override fun invoke(scheduler: StageWorkflowService): TaskEligibilityPolicy.Logic = object : TaskEligibilityPolicy.Logic { + override fun invoke(scheduler: WorkflowServiceImpl): TaskEligibilityPolicy.Logic = object : TaskEligibilityPolicy.Logic { override fun invoke( task: TaskState ): TaskEligibilityPolicy.Advice = diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/NullTaskEligibilityPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/NullTaskEligibilityPolicy.kt index b40f9823..cfe2aeed 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/NullTaskEligibilityPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/NullTaskEligibilityPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,16 +20,16 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.task +package org.opendc.workflow.service.scheduler.task -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.service.TaskState +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowServiceImpl /** * A [TaskEligibilityPolicy] that always allows new tasks to enter. */ public object NullTaskEligibilityPolicy : TaskEligibilityPolicy { - override fun invoke(scheduler: StageWorkflowService): TaskEligibilityPolicy.Logic = Logic + override fun invoke(scheduler: WorkflowServiceImpl): TaskEligibilityPolicy.Logic = Logic private object Logic : TaskEligibilityPolicy.Logic { override fun invoke( diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/RandomTaskEligibilityPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskEligibilityPolicy.kt index a0691b23..a01439c2 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/RandomTaskEligibilityPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskEligibilityPolicy.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,17 +20,17 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.task +package org.opendc.workflow.service.scheduler.task -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.service.TaskState +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowServiceImpl import java.util.* /** * A [TaskEligibilityPolicy] that randomly accepts tasks in the system with some [probability]. */ public data class RandomTaskEligibilityPolicy(val probability: Double = 0.5) : TaskEligibilityPolicy { - override fun invoke(scheduler: StageWorkflowService): TaskEligibilityPolicy.Logic = object : TaskEligibilityPolicy.Logic { + override fun invoke(scheduler: WorkflowServiceImpl): TaskEligibilityPolicy.Logic = object : TaskEligibilityPolicy.Logic { val random = Random(123) override fun invoke(task: TaskState): TaskEligibilityPolicy.Advice = diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/RandomTaskOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskOrderPolicy.kt index 890e7165..c12d6a66 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/RandomTaskOrderPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskOrderPolicy.kt @@ -1,7 +1,5 @@ /* - * MIT License - * - * Copyright (c) 2019 atlarge-research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,20 +20,20 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.task +package org.opendc.workflow.service.scheduler.task -import org.opendc.workflows.service.StageWorkflowSchedulerListener -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.service.TaskState -import org.opendc.workflows.workload.Task +import org.opendc.workflow.api.Task +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowSchedulerListener +import org.opendc.workflow.service.internal.WorkflowServiceImpl import kotlin.random.Random /** * A [TaskOrderPolicy] that orders the tasks randomly. */ public object RandomTaskOrderPolicy : TaskOrderPolicy { - override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> = - object : Comparator<TaskState>, StageWorkflowSchedulerListener { + override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> = + object : Comparator<TaskState>, WorkflowSchedulerListener { private val random = Random(123) private val ids = HashMap<Task, Int>() diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/SubmissionTimeTaskOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/SubmissionTimeTaskOrderPolicy.kt index 6b0199b8..e9bbf815 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/SubmissionTimeTaskOrderPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/SubmissionTimeTaskOrderPolicy.kt @@ -1,7 +1,5 @@ /* - * MIT License - * - * Copyright (c) 2020 atlarge-research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,16 +20,16 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.task +package org.opendc.workflow.service.scheduler.task -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.service.TaskState +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.internal.WorkflowServiceImpl /** * A [TaskOrderPolicy] that orders tasks based on the order of arrival in the queue. */ public data class SubmissionTimeTaskOrderPolicy(public val ascending: Boolean = true) : TaskOrderPolicy { - override fun invoke(scheduler: StageWorkflowService): Comparator<TaskState> = compareBy<TaskState> { + override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> = compareBy { it.job.submittedAt.let { if (ascending) it else -it } } diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/TaskEligibilityPolicy.kt index 37597709..ee31aee2 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/TaskEligibilityPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/TaskEligibilityPolicy.kt @@ -1,7 +1,5 @@ /* - * MIT License - * - * Copyright (c) 2019 atlarge-research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,10 +20,10 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.task +package org.opendc.workflow.service.scheduler.task -import org.opendc.workflows.service.TaskState -import org.opendc.workflows.service.stage.StagePolicy +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.scheduler.StagePolicy /** * A policy interface for determining the eligibility of tasks in a scheduling cycle. diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/TaskOrderPolicy.kt b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/TaskOrderPolicy.kt index 5feac6d0..fffcb765 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/task/TaskOrderPolicy.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/TaskOrderPolicy.kt @@ -1,7 +1,5 @@ /* - * MIT License - * - * Copyright (c) 2020 atlarge-research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,10 +20,10 @@ * SOFTWARE. */ -package org.opendc.workflows.service.stage.task +package org.opendc.workflow.service.scheduler.task -import org.opendc.workflows.service.TaskState -import org.opendc.workflows.service.stage.StagePolicy +import org.opendc.workflow.service.internal.TaskState +import org.opendc.workflow.service.scheduler.StagePolicy /** * This interface represents the **T2** stage of the Reference Architecture for Topology Schedulers and provides the diff --git a/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt b/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/StageWorkflowSchedulerIntegrationTest.kt index eb79162c..2161f5f2 100644 --- a/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt +++ b/simulator/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/StageWorkflowSchedulerIntegrationTest.kt @@ -1,7 +1,5 @@ /* - * MIT License - * - * Copyright (c) 2020 atlarge-research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.workflows.service +package org.opendc.workflow.service import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.delay @@ -43,16 +41,18 @@ import org.opendc.format.trace.gwf.GwfTraceReader import org.opendc.simulator.compute.SimSpaceSharedHypervisorProvider import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.trace.core.EventTracer -import org.opendc.workflows.service.stage.job.NullJobAdmissionPolicy -import org.opendc.workflows.service.stage.job.SubmissionTimeJobOrderPolicy -import org.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy -import org.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy +import org.opendc.workflow.service.internal.WorkflowServiceImpl +import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode +import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy +import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy +import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy +import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy import kotlin.math.max /** - * Integration test suite for the [StageWorkflowService]. + * Integration test suite for the [WorkflowServiceImpl]. */ -@DisplayName("StageWorkflowService") +@DisplayName("WorkflowServiceImpl") @OptIn(ExperimentalCoroutinesApi::class) internal class StageWorkflowSchedulerIntegrationTest { /** @@ -89,8 +89,8 @@ internal class StageWorkflowSchedulerIntegrationTest { hosts.forEach { compute.addHost(it) } - StageWorkflowService( - testScope, + WorkflowService( + testScope.coroutineContext, clock, tracer, compute.newClient(), diff --git a/simulator/opendc-workflows/src/test/resources/environment.json b/simulator/opendc-workflow/opendc-workflow-service/src/test/resources/environment.json index 0965b250..0965b250 100644 --- a/simulator/opendc-workflows/src/test/resources/environment.json +++ b/simulator/opendc-workflow/opendc-workflow-service/src/test/resources/environment.json diff --git a/simulator/opendc-workflows/src/test/resources/log4j2.xml b/simulator/opendc-workflow/opendc-workflow-service/src/test/resources/log4j2.xml index 70a0eacc..70a0eacc 100644 --- a/simulator/opendc-workflows/src/test/resources/log4j2.xml +++ b/simulator/opendc-workflow/opendc-workflow-service/src/test/resources/log4j2.xml diff --git a/simulator/opendc-workflows/src/test/resources/trace.gwf b/simulator/opendc-workflow/opendc-workflow-service/src/test/resources/trace.gwf index d264b9c3..d264b9c3 100644 --- a/simulator/opendc-workflows/src/test/resources/trace.gwf +++ b/simulator/opendc-workflow/opendc-workflow-service/src/test/resources/trace.gwf diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowService.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowService.kt deleted file mode 100644 index c43c72f5..00000000 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowService.kt +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.workflows.service - -import kotlinx.coroutines.flow.Flow -import org.opendc.workflows.workload.Job -import java.util.* - -/** - * A service for cloud workflow management. - * - * The workflow scheduler is modelled after the Reference Architecture for Topology Scheduling by Andreadis et al. - */ -public interface WorkflowService { - /** - * The events emitted by the workflow scheduler. - */ - public val events: Flow<WorkflowEvent> - - /** - * Submit the specified [Job] to the workflow service for scheduling. - */ - public suspend fun submit(job: Job) -} diff --git a/simulator/settings.gradle.kts b/simulator/settings.gradle.kts index 66a55c04..e87dd4d8 100644 --- a/simulator/settings.gradle.kts +++ b/simulator/settings.gradle.kts @@ -25,7 +25,8 @@ include(":opendc-platform") include(":opendc-compute:opendc-compute-api") include(":opendc-compute:opendc-compute-service") include(":opendc-compute:opendc-compute-simulator") -include(":opendc-workflows") +include(":opendc-workflow:opendc-workflow-api") +include(":opendc-workflow:opendc-workflow-service") include(":opendc-format") include(":opendc-experiments:opendc-experiments-sc18") include(":opendc-experiments:opendc-experiments-capelin") |
