diff options
| author | Dante Niewenhuis <d.niewenhuis@hotmail.com> | 2024-03-05 13:23:57 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-03-05 13:23:57 +0100 |
| commit | 5864cbcbfe2eb8c36ca05c3a39c7e5916aeecaec (patch) | |
| tree | 5b2773b8dc21c2e1b526fb70f829c376dd80532a /opendc-workflow | |
| parent | d28002a3c151d198298574312f32f1cb43f3a660 (diff) | |
Updated package versions, updated web server tests. (#207)
* Updated all package versions including kotlin. Updated all web-server tests to run.
* Changed the java version of the tests. OpenDC now only supports java 19.
* small update
* test update
* new update
* updated docker version to 19
* updated docker version to 19
Diffstat (limited to 'opendc-workflow')
26 files changed, 231 insertions, 193 deletions
diff --git a/opendc-workflow/opendc-workflow-api/build.gradle.kts b/opendc-workflow/opendc-workflow-api/build.gradle.kts index 03569d8c..ac94082b 100644 --- a/opendc-workflow/opendc-workflow-api/build.gradle.kts +++ b/opendc-workflow/opendc-workflow-api/build.gradle.kts @@ -22,7 +22,7 @@ description = "Workflow orchestration service API for OpenDC" -/* Build configuration */ +// Build configuration plugins { `kotlin-library-conventions` } diff --git a/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Job.kt b/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Job.kt index b59ad6da..92df6be6 100644 --- a/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Job.kt +++ b/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Job.kt @@ -36,7 +36,7 @@ public data class Job( val uid: UUID, val name: String, val tasks: Set<Task>, - val metadata: Map<String, Any> = emptyMap() + val metadata: Map<String, Any> = emptyMap(), ) { override fun equals(other: Any?): Boolean = other is Job && uid == other.uid diff --git a/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Task.kt b/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Task.kt index f805c210..a14cfd11 100644 --- a/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Task.kt +++ b/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Task.kt @@ -36,7 +36,7 @@ public data class Task( val uid: UUID, val name: String, val dependencies: Set<Task>, - val metadata: Map<String, Any> = emptyMap() + val metadata: Map<String, Any> = emptyMap(), ) { override fun equals(other: Any?): Boolean = other is Task && uid == other.uid diff --git a/opendc-workflow/opendc-workflow-service/build.gradle.kts b/opendc-workflow/opendc-workflow-service/build.gradle.kts index e9e31656..fdfbf82a 100644 --- a/opendc-workflow/opendc-workflow-service/build.gradle.kts +++ b/opendc-workflow/opendc-workflow-service/build.gradle.kts @@ -22,7 +22,7 @@ description = "Workflow orchestration service for OpenDC" -/* Build configuration */ +// Build configuration plugins { `kotlin-library-conventions` } diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt index 07b43b6d..4efc7953 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt @@ -73,7 +73,7 @@ public interface WorkflowService : AutoCloseable { jobAdmissionPolicy: JobAdmissionPolicy, jobOrderPolicy: JobOrderPolicy, taskEligibilityPolicy: TaskEligibilityPolicy, - taskOrderPolicy: TaskOrderPolicy + taskOrderPolicy: TaskOrderPolicy, ): WorkflowService { return WorkflowServiceImpl( dispatcher, @@ -82,7 +82,7 @@ public interface WorkflowService : AutoCloseable { jobAdmissionPolicy, jobOrderPolicy, taskEligibilityPolicy, - taskOrderPolicy + taskOrderPolicy, ) } } diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/TaskStatus.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/TaskStatus.kt index fe941d09..e5475ee6 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/TaskStatus.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/TaskStatus.kt @@ -29,5 +29,5 @@ public enum class TaskStatus { CREATED, READY, ACTIVE, - FINISHED + FINISHED, } diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt index d54584b3..93a55c3d 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt @@ -59,7 +59,7 @@ public class WorkflowServiceImpl( jobAdmissionPolicy: JobAdmissionPolicy, jobOrderPolicy: JobOrderPolicy, taskEligibilityPolicy: TaskEligibilityPolicy, - taskOrderPolicy: TaskOrderPolicy + taskOrderPolicy: TaskOrderPolicy, ) : WorkflowService, ServerWatcher { /** * The [CoroutineScope] of the service bounded by the lifecycle of the service. @@ -109,47 +109,48 @@ public class WorkflowServiceImpl( /** * The root listener of this scheduler. */ - private val rootListener = object : WorkflowSchedulerListener { - /** - * The listeners to delegate to. - */ - val listeners = mutableListOf<WorkflowSchedulerListener>() - - override fun jobSubmitted(job: JobState) { - listeners.forEach { it.jobSubmitted(job) } - } + private val rootListener = + object : WorkflowSchedulerListener { + /** + * The listeners to delegate to. + */ + val listeners = mutableListOf<WorkflowSchedulerListener>() + + override fun jobSubmitted(job: JobState) { + listeners.forEach { it.jobSubmitted(job) } + } - override fun jobStarted(job: JobState) { - listeners.forEach { it.jobStarted(job) } - } + override fun jobStarted(job: JobState) { + listeners.forEach { it.jobStarted(job) } + } - override fun jobFinished(job: JobState) { - listeners.forEach { it.jobFinished(job) } - } + override fun jobFinished(job: JobState) { + listeners.forEach { it.jobFinished(job) } + } - override fun taskReady(task: TaskState) { - listeners.forEach { it.taskReady(task) } - } + override fun taskReady(task: TaskState) { + listeners.forEach { it.taskReady(task) } + } - override fun taskAssigned(task: TaskState) { - listeners.forEach { it.taskAssigned(task) } - } + override fun taskAssigned(task: TaskState) { + listeners.forEach { it.taskAssigned(task) } + } - override fun taskStarted(task: TaskState) { - listeners.forEach { it.taskStarted(task) } - } + override fun taskStarted(task: TaskState) { + listeners.forEach { it.taskStarted(task) } + } - override fun taskFinished(task: TaskState) { - listeners.forEach { it.taskFinished(task) } + override fun taskFinished(task: TaskState) { + listeners.forEach { it.taskFinished(task) } + } } - } - private var _workflowsSubmitted: Int = 0 - private var _workflowsRunning: Int = 0 - private var _workflowsFinished: Int = 0 - private var _tasksSubmitted: Int = 0 - private var _tasksRunning: Int = 0 - private var _tasksFinished: Int = 0 + private var localWorkflowsSubmitted: Int = 0 + private var localWorkflowsRunning: Int = 0 + private var localWorkflowsFinished: Int = 0 + private var localTasksSubmitted: Int = 0 + private var localTasksRunning: Int = 0 + private var localTasksFinished: Int = 0 /** * The [Pacer] to use for scheduling the scheduler cycles. @@ -170,37 +171,46 @@ public class WorkflowServiceImpl( scope.launch { image = computeClient.newImage("workflow-runner") } } - override suspend fun invoke(job: Job): Unit = suspendCancellableCoroutine { cont -> - // J1 Incoming Jobs - val jobInstance = JobState(job, clock.millis(), cont) - val instances = job.tasks.associateWith { - TaskState(jobInstance, it) - } + override suspend fun invoke(job: Job): Unit = + suspendCancellableCoroutine { cont -> + // J1 Incoming Jobs + val jobInstance = JobState(job, clock.millis(), cont) + val instances = + job.tasks.associateWith { + TaskState(jobInstance, it) + } - for ((task, instance) in instances) { - instance.dependencies.addAll(task.dependencies.map { instances[it]!! }) - task.dependencies.forEach { - instances[it]!!.dependents.add(instance) - } + for ((task, instance) in instances) { + instance.dependencies.addAll(task.dependencies.map { instances[it]!! }) + task.dependencies.forEach { + instances[it]!!.dependents.add(instance) + } - // If the task has no dependency, it is a root task and can immediately be evaluated - if (instance.isRoot) { - instance.state = TaskStatus.READY - } + // If the task has no dependency, it is a root task and can immediately be evaluated + if (instance.isRoot) { + instance.state = TaskStatus.READY + } - _tasksSubmitted++ - } + localTasksSubmitted++ + } - instances.values.toCollection(jobInstance.tasks) - incomingJobs += jobInstance - rootListener.jobSubmitted(jobInstance) - _workflowsSubmitted++ + instances.values.toCollection(jobInstance.tasks) + incomingJobs += jobInstance + rootListener.jobSubmitted(jobInstance) + localWorkflowsSubmitted++ - pacer.enqueue() - } + pacer.enqueue() + } override fun getSchedulerStats(): SchedulerStats { - return SchedulerStats(_workflowsSubmitted, _workflowsRunning, _workflowsFinished, _tasksSubmitted, _tasksRunning, _tasksFinished) + return SchedulerStats( + localWorkflowsSubmitted, + localWorkflowsRunning, + localWorkflowsFinished, + localTasksSubmitted, + localTasksRunning, + localTasksFinished, + ) } override fun close() { @@ -240,7 +250,7 @@ public class WorkflowServiceImpl( jobQueue.add(jobInstance) activeJobs += jobInstance - _workflowsRunning++ + localWorkflowsRunning++ rootListener.jobStarted(jobInstance) } @@ -286,18 +296,20 @@ public class WorkflowServiceImpl( val cores = instance.task.metadata[WORKFLOW_TASK_CORES] as? Int ?: 1 val image = image scope.launch { - val flavor = computeClient.newFlavor( - instance.task.name, - cores, - 1000 - ) // TODO How to determine memory usage for workflow task - val server = computeClient.newServer( - instance.task.name, - image, - flavor, - start = false, - meta = instance.task.metadata - ) + val flavor = + computeClient.newFlavor( + instance.task.name, + cores, + 1000, + ) // TODO How to determine memory usage for workflow task + val server = + computeClient.newServer( + instance.task.name, + image, + flavor, + start = false, + meta = instance.task.metadata, + ) instance.state = TaskStatus.ACTIVE instance.server = server @@ -313,13 +325,16 @@ public class WorkflowServiceImpl( } } - override fun onStateChanged(server: Server, newState: ServerState) { + override fun onStateChanged( + server: Server, + newState: ServerState, + ) { when (newState) { ServerState.PROVISIONING -> {} ServerState.RUNNING -> { val task = taskByServer.getValue(server) task.startedAt = clock.millis() - _tasksRunning++ + localTasksRunning++ rootListener.taskStarted(task) } ServerState.TERMINATED, ServerState.ERROR -> { @@ -336,8 +351,8 @@ public class WorkflowServiceImpl( job.tasks.remove(task) activeTasks -= task - _tasksRunning-- - _tasksFinished++ + localTasksRunning-- + localTasksFinished++ rootListener.taskFinished(task) // Add job roots to the scheduling queue @@ -363,8 +378,8 @@ public class WorkflowServiceImpl( private fun finishJob(job: JobState) { activeJobs -= job - _workflowsRunning-- - _workflowsFinished++ + localWorkflowsRunning-- + localWorkflowsFinished++ rootListener.jobFinished(job) job.cont.resume(Unit) diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/DurationJobOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/DurationJobOrderPolicy.kt index 1b5b91b9..7ae3244e 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/DurationJobOrderPolicy.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/DurationJobOrderPolicy.kt @@ -47,17 +47,21 @@ public data class DurationJobOrderPolicy(val ascending: Boolean = true) : JobOrd get() = results[this]!! override fun jobSubmitted(job: JobState) { - results[job.job] = job.job.toposort().map { task -> - val estimable = task.metadata[WORKFLOW_TASK_DEADLINE] as? Long? - estimable ?: Long.MAX_VALUE - }.sum() + results[job.job] = + job.job.toposort().map { task -> + val estimable = task.metadata[WORKFLOW_TASK_DEADLINE] as? Long? + estimable ?: Long.MAX_VALUE + }.sum() } override fun jobFinished(job: JobState) { results.remove(job.job) } - override fun compare(o1: JobState, o2: JobState): Int { + override fun compare( + o1: JobState, + o2: JobState, + ): Int { return compareValuesBy(o1, o2) { it.job.duration }.let { if (ascending) it else -it } } } diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/JobAdmissionPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/JobAdmissionPolicy.kt index ed3acff7..475c8f97 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/JobAdmissionPolicy.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/JobAdmissionPolicy.kt @@ -65,6 +65,6 @@ public interface JobAdmissionPolicy : StagePolicy<JobAdmissionPolicy.Logic> { /** * Deny the current job and also stop admitting jobs. */ - STOP(false, true) + STOP(false, true), } } diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/LimitJobAdmissionPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/LimitJobAdmissionPolicy.kt index bd416546..f1b81259 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/LimitJobAdmissionPolicy.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/LimitJobAdmissionPolicy.kt @@ -31,16 +31,15 @@ import org.opendc.workflow.service.internal.WorkflowServiceImpl * @property limit The maximum number of concurrent jobs in the system. */ public data class LimitJobAdmissionPolicy(public val limit: Int) : JobAdmissionPolicy { - override fun invoke(scheduler: WorkflowServiceImpl): JobAdmissionPolicy.Logic = object : JobAdmissionPolicy.Logic { - override fun invoke( - job: JobState - ): JobAdmissionPolicy.Advice = - if (scheduler.activeJobs.size < limit) { - JobAdmissionPolicy.Advice.ADMIT - } else { - JobAdmissionPolicy.Advice.STOP - } - } + override fun invoke(scheduler: WorkflowServiceImpl): JobAdmissionPolicy.Logic = + object : JobAdmissionPolicy.Logic { + override fun invoke(job: JobState): JobAdmissionPolicy.Advice = + if (scheduler.activeJobs.size < limit) { + JobAdmissionPolicy.Advice.ADMIT + } else { + JobAdmissionPolicy.Advice.STOP + } + } override fun toString(): String = "Limit-Active($limit)" } diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/NullJobAdmissionPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/NullJobAdmissionPolicy.kt index 31f8f8db..731a0047 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/NullJobAdmissionPolicy.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/NullJobAdmissionPolicy.kt @@ -29,9 +29,10 @@ import org.opendc.workflow.service.internal.WorkflowServiceImpl * A [JobAdmissionPolicy] that admits all jobs. */ public object NullJobAdmissionPolicy : JobAdmissionPolicy { - override fun invoke(scheduler: WorkflowServiceImpl): JobAdmissionPolicy.Logic = object : JobAdmissionPolicy.Logic { - override fun invoke(job: JobState): JobAdmissionPolicy.Advice = JobAdmissionPolicy.Advice.ADMIT - } + override fun invoke(scheduler: WorkflowServiceImpl): JobAdmissionPolicy.Logic = + object : JobAdmissionPolicy.Logic { + override fun invoke(job: JobState): JobAdmissionPolicy.Advice = JobAdmissionPolicy.Advice.ADMIT + } override fun toString(): String = "Always" } diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/RandomJobOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/RandomJobOrderPolicy.kt index 4f8dc05b..ea005e97 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/RandomJobOrderPolicy.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/RandomJobOrderPolicy.kt @@ -52,7 +52,10 @@ public object RandomJobOrderPolicy : JobOrderPolicy { ids.remove(job.job) } - override fun compare(o1: JobState, o2: JobState): Int { + override fun compare( + o1: JobState, + o2: JobState, + ): Int { return compareValuesBy(o1, o2) { ids.getValue(it.job) } } } diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/ActiveTaskOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/ActiveTaskOrderPolicy.kt index 821d4964..48fdafe9 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/ActiveTaskOrderPolicy.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/ActiveTaskOrderPolicy.kt @@ -55,7 +55,10 @@ public data class ActiveTaskOrderPolicy(public val ascending: Boolean = true) : active.merge(task.job, -1, Int::plus) } - override fun compare(o1: TaskState, o2: TaskState): Int { + override fun compare( + o1: TaskState, + o2: TaskState, + ): Int { return compareValuesBy(o1, o2) { active.getValue(it.job) }.let { if (ascending) it else -it } diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/CompletionTaskOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/CompletionTaskOrderPolicy.kt index dae7ad99..104de105 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/CompletionTaskOrderPolicy.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/CompletionTaskOrderPolicy.kt @@ -51,7 +51,10 @@ public data class CompletionTaskOrderPolicy(public val ascending: Boolean = true finished.merge(task.job, 1, Int::plus) } - override fun compare(o1: TaskState, o2: TaskState): Int { + override fun compare( + o1: TaskState, + o2: TaskState, + ): Int { return compareValuesBy(o1, o2) { finished.getValue(it.job) / it.job.tasks.size.toDouble() }.let { if (ascending) it else -it } diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DependenciesTaskOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DependenciesTaskOrderPolicy.kt index 7786f6ec..df9ebfad 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DependenciesTaskOrderPolicy.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DependenciesTaskOrderPolicy.kt @@ -29,9 +29,10 @@ import org.opendc.workflow.service.internal.WorkflowServiceImpl * A [TaskOrderPolicy] that orders tasks based on the number of dependency tasks it has. */ public data class DependenciesTaskOrderPolicy(public val ascending: Boolean = true) : TaskOrderPolicy { - override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> = compareBy { - it.task.dependencies.size.let { if (ascending) it else -it } - } + override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> = + compareBy { + it.task.dependencies.size.let { if (ascending) it else -it } + } override fun toString(): String { return "Task-Dependencies(${if (ascending) "asc" else "desc"})" diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DependentsTaskOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DependentsTaskOrderPolicy.kt index 4fb835d7..bbc20348 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DependentsTaskOrderPolicy.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DependentsTaskOrderPolicy.kt @@ -29,9 +29,10 @@ import org.opendc.workflow.service.internal.WorkflowServiceImpl * A [TaskOrderPolicy] that orders tasks based on the number of dependent tasks it has. */ public data class DependentsTaskOrderPolicy(public val ascending: Boolean = true) : TaskOrderPolicy { - override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> = compareBy { - it.dependents.size.let { if (ascending) it else -it } - } + override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> = + compareBy { + it.dependents.size.let { if (ascending) it else -it } + } override fun toString(): String { return "Task-Dependents(${if (ascending) "asc" else "desc"})" diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationHistoryTaskOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationHistoryTaskOrderPolicy.kt index 3a634de7..e21acb41 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationHistoryTaskOrderPolicy.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationHistoryTaskOrderPolicy.kt @@ -51,7 +51,10 @@ public data class DurationHistoryTaskOrderPolicy(public val ascending: Boolean = results.getValue(task.job) += task.finishedAt - task.startedAt } - override fun compare(o1: TaskState, o2: TaskState): Int { + override fun compare( + o1: TaskState, + o2: TaskState, + ): Int { return compareValuesBy(o1, o2) { key -> val history = results.getValue(key.job) if (history.isEmpty()) { diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationTaskOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationTaskOrderPolicy.kt index a2ca2086..170f3394 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationTaskOrderPolicy.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationTaskOrderPolicy.kt @@ -35,7 +35,6 @@ import kotlin.collections.set * A [TaskOrderPolicy] orders tasks based on the pre-specified (approximate) duration of the task. */ public data class DurationTaskOrderPolicy(public val ascending: Boolean = true) : TaskOrderPolicy { - override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> = object : Comparator<TaskState>, WorkflowSchedulerListener { private val results = HashMap<UUID, Long>() @@ -56,7 +55,10 @@ public data class DurationTaskOrderPolicy(public val ascending: Boolean = true) private val TaskState.duration: Long get() = results.getValue(task.uid) - override fun compare(o1: TaskState, o2: TaskState): Int { + override fun compare( + o1: TaskState, + o2: TaskState, + ): Int { return compareValuesBy(o1, o2) { state -> state.duration }.let { if (ascending) it else -it } diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitTaskEligibilityPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitTaskEligibilityPolicy.kt index d2edc256..0e9c93da 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitTaskEligibilityPolicy.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitTaskEligibilityPolicy.kt @@ -29,16 +29,15 @@ import org.opendc.workflow.service.internal.WorkflowServiceImpl * A [TaskEligibilityPolicy] that limits the total number of active tasks in the system. */ public data class LimitTaskEligibilityPolicy(val limit: Int) : TaskEligibilityPolicy { - override fun invoke(scheduler: WorkflowServiceImpl): TaskEligibilityPolicy.Logic = object : TaskEligibilityPolicy.Logic { - override fun invoke( - task: TaskState - ): TaskEligibilityPolicy.Advice = - if (scheduler.activeTasks.size < limit) { - TaskEligibilityPolicy.Advice.ADMIT - } else { - TaskEligibilityPolicy.Advice.STOP - } - } + override fun invoke(scheduler: WorkflowServiceImpl): TaskEligibilityPolicy.Logic = + object : TaskEligibilityPolicy.Logic { + override fun invoke(task: TaskState): TaskEligibilityPolicy.Advice = + if (scheduler.activeTasks.size < limit) { + TaskEligibilityPolicy.Advice.ADMIT + } else { + TaskEligibilityPolicy.Advice.STOP + } + } override fun toString(): String = "Limit-Active($limit)" } diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/NullTaskEligibilityPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/NullTaskEligibilityPolicy.kt index cfe2aeed..50a11784 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/NullTaskEligibilityPolicy.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/NullTaskEligibilityPolicy.kt @@ -32,9 +32,7 @@ public object NullTaskEligibilityPolicy : TaskEligibilityPolicy { override fun invoke(scheduler: WorkflowServiceImpl): TaskEligibilityPolicy.Logic = Logic private object Logic : TaskEligibilityPolicy.Logic { - override fun invoke( - task: TaskState - ): TaskEligibilityPolicy.Advice = TaskEligibilityPolicy.Advice.ADMIT + override fun invoke(task: TaskState): TaskEligibilityPolicy.Advice = TaskEligibilityPolicy.Advice.ADMIT } override fun toString(): String = "Always" diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskEligibilityPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskEligibilityPolicy.kt index 036f3574..a883ac99 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskEligibilityPolicy.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskEligibilityPolicy.kt @@ -30,16 +30,17 @@ import java.util.Random * A [TaskEligibilityPolicy] that randomly accepts tasks in the system with some [probability]. */ public data class RandomTaskEligibilityPolicy(val probability: Double = 0.5) : TaskEligibilityPolicy { - override fun invoke(scheduler: WorkflowServiceImpl): TaskEligibilityPolicy.Logic = object : TaskEligibilityPolicy.Logic { - val random = Random(123) + override fun invoke(scheduler: WorkflowServiceImpl): TaskEligibilityPolicy.Logic = + object : TaskEligibilityPolicy.Logic { + val random = Random(123) - override fun invoke(task: TaskState): TaskEligibilityPolicy.Advice = - if (random.nextDouble() <= probability || scheduler.activeTasks.isEmpty()) { - TaskEligibilityPolicy.Advice.ADMIT - } else { - TaskEligibilityPolicy.Advice.DENY - } - } + override fun invoke(task: TaskState): TaskEligibilityPolicy.Advice = + if (random.nextDouble() <= probability || scheduler.activeTasks.isEmpty()) { + TaskEligibilityPolicy.Advice.ADMIT + } else { + TaskEligibilityPolicy.Advice.DENY + } + } override fun toString(): String = "Random($probability)" } diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskOrderPolicy.kt index c12d6a66..134d22e0 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskOrderPolicy.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskOrderPolicy.kt @@ -49,7 +49,10 @@ public object RandomTaskOrderPolicy : TaskOrderPolicy { ids.remove(task.task) } - override fun compare(o1: TaskState, o2: TaskState): Int { + override fun compare( + o1: TaskState, + o2: TaskState, + ): Int { return compareValuesBy(o1, o2) { ids.getValue(it.task) } } } diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/SubmissionTimeTaskOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/SubmissionTimeTaskOrderPolicy.kt index e9bbf815..3b4bca8f 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/SubmissionTimeTaskOrderPolicy.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/SubmissionTimeTaskOrderPolicy.kt @@ -29,9 +29,10 @@ import org.opendc.workflow.service.internal.WorkflowServiceImpl * A [TaskOrderPolicy] that orders tasks based on the order of arrival in the queue. */ public data class SubmissionTimeTaskOrderPolicy(public val ascending: Boolean = true) : TaskOrderPolicy { - override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> = compareBy { - it.job.submittedAt.let { if (ascending) it else -it } - } + override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> = + compareBy { + it.job.submittedAt.let { if (ascending) it else -it } + } override fun toString(): String { return "Submission-Time(${if (ascending) "asc" else "desc"})" diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/TaskEligibilityPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/TaskEligibilityPolicy.kt index ee31aee2..89ec3847 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/TaskEligibilityPolicy.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/TaskEligibilityPolicy.kt @@ -65,6 +65,6 @@ public interface TaskEligibilityPolicy : StagePolicy<TaskEligibilityPolicy.Logic /** * Deny the current job and also stop admitting jobs. */ - STOP(false, true) + STOP(false, true), } } diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/telemetry/SchedulerStats.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/telemetry/SchedulerStats.kt index 608e82df..c4f180b1 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/telemetry/SchedulerStats.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/telemetry/SchedulerStats.kt @@ -38,5 +38,5 @@ public data class SchedulerStats( val workflowsFinished: Int, val tasksSubmitted: Int, val tasksRunning: Int, - val tasksFinished: Int + val tasksFinished: Int, ) diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt index 68f2e610..1d87417d 100644 --- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt @@ -66,63 +66,64 @@ internal class WorkflowServiceTest { * A large integration test where we check whether all tasks in some trace are executed correctly. */ @Test - fun testTrace() = runSimulation { - val computeService = "compute.opendc.org" - val workflowService = "workflow.opendc.org" + fun testTrace() = + runSimulation { + val computeService = "compute.opendc.org" + val workflowService = "workflow.opendc.org" - Provisioner(dispatcher, seed = 0L).use { provisioner -> - val scheduler: (ProvisioningContext) -> ComputeScheduler = { - FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)), - weighers = listOf(VCpuWeigher(1.0, multiplier = 1.0)) - ) - } - - provisioner.runSteps( - // Configure the ComputeService that is responsible for mapping virtual machines onto physical hosts - setupComputeService(computeService, scheduler, schedulingQuantum = Duration.ofSeconds(1)), - setupHosts(computeService, List(4) { createHostSpec(it) }), - - // Configure the WorkflowService that is responsible for scheduling the workflow tasks onto machines - setupWorkflowService( - workflowService, - computeService, - WorkflowSchedulerSpec( - schedulingQuantum = Duration.ofMillis(100), - jobAdmissionPolicy = NullJobAdmissionPolicy, - jobOrderPolicy = SubmissionTimeJobOrderPolicy(), - taskEligibilityPolicy = NullTaskEligibilityPolicy, - taskOrderPolicy = SubmissionTimeTaskOrderPolicy() + Provisioner(dispatcher, seed = 0L).use { provisioner -> + val scheduler: (ProvisioningContext) -> ComputeScheduler = { + FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)), + weighers = listOf(VCpuWeigher(1.0, multiplier = 1.0)), ) + } + + provisioner.runSteps( + // Configure the ComputeService that is responsible for mapping virtual machines onto physical hosts + setupComputeService(computeService, scheduler, schedulingQuantum = Duration.ofSeconds(1)), + setupHosts(computeService, List(4) { createHostSpec(it) }), + // Configure the WorkflowService that is responsible for scheduling the workflow tasks onto machines + setupWorkflowService( + workflowService, + computeService, + WorkflowSchedulerSpec( + schedulingQuantum = Duration.ofMillis(100), + jobAdmissionPolicy = NullJobAdmissionPolicy, + jobOrderPolicy = SubmissionTimeJobOrderPolicy(), + taskEligibilityPolicy = NullTaskEligibilityPolicy, + taskOrderPolicy = SubmissionTimeTaskOrderPolicy(), + ), + ), ) - ) - val service = provisioner.registry.resolve(workflowService, WorkflowService::class.java)!! + val service = provisioner.registry.resolve(workflowService, WorkflowService::class.java)!! - val trace = Trace.open( - Paths.get(checkNotNull(WorkflowServiceTest::class.java.getResource("/trace.gwf")).toURI()), - format = "gwf" - ) - service.replay(timeSource, trace.toJobs()) + val trace = + Trace.open( + Paths.get(checkNotNull(WorkflowServiceTest::class.java.getResource("/trace.gwf")).toURI()), + format = "gwf", + ) + service.replay(timeSource, trace.toJobs()) - val metrics = service.getSchedulerStats() + val metrics = service.getSchedulerStats() - assertAll( - { assertEquals(758, metrics.workflowsSubmitted, "No jobs submitted") }, - { assertEquals(0, metrics.workflowsRunning, "Not all submitted jobs started") }, - { - assertEquals( - metrics.workflowsSubmitted, - metrics.workflowsFinished, - "Not all started jobs finished" - ) - }, - { assertEquals(0, metrics.tasksRunning, "Not all started tasks finished") }, - { assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") }, - { assertEquals(45975707L, timeSource.millis()) { "Total duration incorrect" } } - ) + assertAll( + { assertEquals(758, metrics.workflowsSubmitted, "No jobs submitted") }, + { assertEquals(0, metrics.workflowsRunning, "Not all submitted jobs started") }, + { + assertEquals( + metrics.workflowsSubmitted, + metrics.workflowsFinished, + "Not all started jobs finished", + ) + }, + { assertEquals(0, metrics.tasksRunning, "Not all started tasks finished") }, + { assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") }, + { assertEquals(45975707L, timeSource.millis()) { "Total duration incorrect" } }, + ) + } } - } /** * Construct a [HostSpec] for a simulated host. @@ -141,7 +142,7 @@ internal class WorkflowServiceTest { emptyMap(), machineModel, SimPsuFactories.noop(), - FlowMultiplexerFactory.forwardingMultiplexer() + FlowMultiplexerFactory.forwardingMultiplexer(), ) } } |
