summaryrefslogtreecommitdiff
path: root/opendc-workflow/opendc-workflow-service/src/main
diff options
context:
space:
mode:
authorDante Niewenhuis <d.niewenhuis@hotmail.com>2024-03-05 13:23:57 +0100
committerGitHub <noreply@github.com>2024-03-05 13:23:57 +0100
commit5864cbcbfe2eb8c36ca05c3a39c7e5916aeecaec (patch)
tree5b2773b8dc21c2e1b526fb70f829c376dd80532a /opendc-workflow/opendc-workflow-service/src/main
parentd28002a3c151d198298574312f32f1cb43f3a660 (diff)
Updated package versions, updated web server tests. (#207)
* Updated all package versions including kotlin. Updated all web-server tests to run. * Changed the java version of the tests. OpenDC now only supports java 19. * small update * test update * new update * updated docker version to 19 * updated docker version to 19
Diffstat (limited to 'opendc-workflow/opendc-workflow-service/src/main')
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt4
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/TaskStatus.kt2
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt169
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/DurationJobOrderPolicy.kt14
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/JobAdmissionPolicy.kt2
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/LimitJobAdmissionPolicy.kt19
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/NullJobAdmissionPolicy.kt7
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/RandomJobOrderPolicy.kt5
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/ActiveTaskOrderPolicy.kt5
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/CompletionTaskOrderPolicy.kt5
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DependenciesTaskOrderPolicy.kt7
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DependentsTaskOrderPolicy.kt7
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationHistoryTaskOrderPolicy.kt5
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationTaskOrderPolicy.kt6
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitTaskEligibilityPolicy.kt19
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/NullTaskEligibilityPolicy.kt4
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskEligibilityPolicy.kt19
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskOrderPolicy.kt5
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/SubmissionTimeTaskOrderPolicy.kt7
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/TaskEligibilityPolicy.kt2
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/telemetry/SchedulerStats.kt2
21 files changed, 176 insertions, 139 deletions
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
index 07b43b6d..4efc7953 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
@@ -73,7 +73,7 @@ public interface WorkflowService : AutoCloseable {
jobAdmissionPolicy: JobAdmissionPolicy,
jobOrderPolicy: JobOrderPolicy,
taskEligibilityPolicy: TaskEligibilityPolicy,
- taskOrderPolicy: TaskOrderPolicy
+ taskOrderPolicy: TaskOrderPolicy,
): WorkflowService {
return WorkflowServiceImpl(
dispatcher,
@@ -82,7 +82,7 @@ public interface WorkflowService : AutoCloseable {
jobAdmissionPolicy,
jobOrderPolicy,
taskEligibilityPolicy,
- taskOrderPolicy
+ taskOrderPolicy,
)
}
}
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/TaskStatus.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/TaskStatus.kt
index fe941d09..e5475ee6 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/TaskStatus.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/TaskStatus.kt
@@ -29,5 +29,5 @@ public enum class TaskStatus {
CREATED,
READY,
ACTIVE,
- FINISHED
+ FINISHED,
}
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
index d54584b3..93a55c3d 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
@@ -59,7 +59,7 @@ public class WorkflowServiceImpl(
jobAdmissionPolicy: JobAdmissionPolicy,
jobOrderPolicy: JobOrderPolicy,
taskEligibilityPolicy: TaskEligibilityPolicy,
- taskOrderPolicy: TaskOrderPolicy
+ taskOrderPolicy: TaskOrderPolicy,
) : WorkflowService, ServerWatcher {
/**
* The [CoroutineScope] of the service bounded by the lifecycle of the service.
@@ -109,47 +109,48 @@ public class WorkflowServiceImpl(
/**
* The root listener of this scheduler.
*/
- private val rootListener = object : WorkflowSchedulerListener {
- /**
- * The listeners to delegate to.
- */
- val listeners = mutableListOf<WorkflowSchedulerListener>()
-
- override fun jobSubmitted(job: JobState) {
- listeners.forEach { it.jobSubmitted(job) }
- }
+ private val rootListener =
+ object : WorkflowSchedulerListener {
+ /**
+ * The listeners to delegate to.
+ */
+ val listeners = mutableListOf<WorkflowSchedulerListener>()
+
+ override fun jobSubmitted(job: JobState) {
+ listeners.forEach { it.jobSubmitted(job) }
+ }
- override fun jobStarted(job: JobState) {
- listeners.forEach { it.jobStarted(job) }
- }
+ override fun jobStarted(job: JobState) {
+ listeners.forEach { it.jobStarted(job) }
+ }
- override fun jobFinished(job: JobState) {
- listeners.forEach { it.jobFinished(job) }
- }
+ override fun jobFinished(job: JobState) {
+ listeners.forEach { it.jobFinished(job) }
+ }
- override fun taskReady(task: TaskState) {
- listeners.forEach { it.taskReady(task) }
- }
+ override fun taskReady(task: TaskState) {
+ listeners.forEach { it.taskReady(task) }
+ }
- override fun taskAssigned(task: TaskState) {
- listeners.forEach { it.taskAssigned(task) }
- }
+ override fun taskAssigned(task: TaskState) {
+ listeners.forEach { it.taskAssigned(task) }
+ }
- override fun taskStarted(task: TaskState) {
- listeners.forEach { it.taskStarted(task) }
- }
+ override fun taskStarted(task: TaskState) {
+ listeners.forEach { it.taskStarted(task) }
+ }
- override fun taskFinished(task: TaskState) {
- listeners.forEach { it.taskFinished(task) }
+ override fun taskFinished(task: TaskState) {
+ listeners.forEach { it.taskFinished(task) }
+ }
}
- }
- private var _workflowsSubmitted: Int = 0
- private var _workflowsRunning: Int = 0
- private var _workflowsFinished: Int = 0
- private var _tasksSubmitted: Int = 0
- private var _tasksRunning: Int = 0
- private var _tasksFinished: Int = 0
+ private var localWorkflowsSubmitted: Int = 0
+ private var localWorkflowsRunning: Int = 0
+ private var localWorkflowsFinished: Int = 0
+ private var localTasksSubmitted: Int = 0
+ private var localTasksRunning: Int = 0
+ private var localTasksFinished: Int = 0
/**
* The [Pacer] to use for scheduling the scheduler cycles.
@@ -170,37 +171,46 @@ public class WorkflowServiceImpl(
scope.launch { image = computeClient.newImage("workflow-runner") }
}
- override suspend fun invoke(job: Job): Unit = suspendCancellableCoroutine { cont ->
- // J1 Incoming Jobs
- val jobInstance = JobState(job, clock.millis(), cont)
- val instances = job.tasks.associateWith {
- TaskState(jobInstance, it)
- }
+ override suspend fun invoke(job: Job): Unit =
+ suspendCancellableCoroutine { cont ->
+ // J1 Incoming Jobs
+ val jobInstance = JobState(job, clock.millis(), cont)
+ val instances =
+ job.tasks.associateWith {
+ TaskState(jobInstance, it)
+ }
- for ((task, instance) in instances) {
- instance.dependencies.addAll(task.dependencies.map { instances[it]!! })
- task.dependencies.forEach {
- instances[it]!!.dependents.add(instance)
- }
+ for ((task, instance) in instances) {
+ instance.dependencies.addAll(task.dependencies.map { instances[it]!! })
+ task.dependencies.forEach {
+ instances[it]!!.dependents.add(instance)
+ }
- // If the task has no dependency, it is a root task and can immediately be evaluated
- if (instance.isRoot) {
- instance.state = TaskStatus.READY
- }
+ // If the task has no dependency, it is a root task and can immediately be evaluated
+ if (instance.isRoot) {
+ instance.state = TaskStatus.READY
+ }
- _tasksSubmitted++
- }
+ localTasksSubmitted++
+ }
- instances.values.toCollection(jobInstance.tasks)
- incomingJobs += jobInstance
- rootListener.jobSubmitted(jobInstance)
- _workflowsSubmitted++
+ instances.values.toCollection(jobInstance.tasks)
+ incomingJobs += jobInstance
+ rootListener.jobSubmitted(jobInstance)
+ localWorkflowsSubmitted++
- pacer.enqueue()
- }
+ pacer.enqueue()
+ }
override fun getSchedulerStats(): SchedulerStats {
- return SchedulerStats(_workflowsSubmitted, _workflowsRunning, _workflowsFinished, _tasksSubmitted, _tasksRunning, _tasksFinished)
+ return SchedulerStats(
+ localWorkflowsSubmitted,
+ localWorkflowsRunning,
+ localWorkflowsFinished,
+ localTasksSubmitted,
+ localTasksRunning,
+ localTasksFinished,
+ )
}
override fun close() {
@@ -240,7 +250,7 @@ public class WorkflowServiceImpl(
jobQueue.add(jobInstance)
activeJobs += jobInstance
- _workflowsRunning++
+ localWorkflowsRunning++
rootListener.jobStarted(jobInstance)
}
@@ -286,18 +296,20 @@ public class WorkflowServiceImpl(
val cores = instance.task.metadata[WORKFLOW_TASK_CORES] as? Int ?: 1
val image = image
scope.launch {
- val flavor = computeClient.newFlavor(
- instance.task.name,
- cores,
- 1000
- ) // TODO How to determine memory usage for workflow task
- val server = computeClient.newServer(
- instance.task.name,
- image,
- flavor,
- start = false,
- meta = instance.task.metadata
- )
+ val flavor =
+ computeClient.newFlavor(
+ instance.task.name,
+ cores,
+ 1000,
+ ) // TODO How to determine memory usage for workflow task
+ val server =
+ computeClient.newServer(
+ instance.task.name,
+ image,
+ flavor,
+ start = false,
+ meta = instance.task.metadata,
+ )
instance.state = TaskStatus.ACTIVE
instance.server = server
@@ -313,13 +325,16 @@ public class WorkflowServiceImpl(
}
}
- override fun onStateChanged(server: Server, newState: ServerState) {
+ override fun onStateChanged(
+ server: Server,
+ newState: ServerState,
+ ) {
when (newState) {
ServerState.PROVISIONING -> {}
ServerState.RUNNING -> {
val task = taskByServer.getValue(server)
task.startedAt = clock.millis()
- _tasksRunning++
+ localTasksRunning++
rootListener.taskStarted(task)
}
ServerState.TERMINATED, ServerState.ERROR -> {
@@ -336,8 +351,8 @@ public class WorkflowServiceImpl(
job.tasks.remove(task)
activeTasks -= task
- _tasksRunning--
- _tasksFinished++
+ localTasksRunning--
+ localTasksFinished++
rootListener.taskFinished(task)
// Add job roots to the scheduling queue
@@ -363,8 +378,8 @@ public class WorkflowServiceImpl(
private fun finishJob(job: JobState) {
activeJobs -= job
- _workflowsRunning--
- _workflowsFinished++
+ localWorkflowsRunning--
+ localWorkflowsFinished++
rootListener.jobFinished(job)
job.cont.resume(Unit)
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/DurationJobOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/DurationJobOrderPolicy.kt
index 1b5b91b9..7ae3244e 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/DurationJobOrderPolicy.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/DurationJobOrderPolicy.kt
@@ -47,17 +47,21 @@ public data class DurationJobOrderPolicy(val ascending: Boolean = true) : JobOrd
get() = results[this]!!
override fun jobSubmitted(job: JobState) {
- results[job.job] = job.job.toposort().map { task ->
- val estimable = task.metadata[WORKFLOW_TASK_DEADLINE] as? Long?
- estimable ?: Long.MAX_VALUE
- }.sum()
+ results[job.job] =
+ job.job.toposort().map { task ->
+ val estimable = task.metadata[WORKFLOW_TASK_DEADLINE] as? Long?
+ estimable ?: Long.MAX_VALUE
+ }.sum()
}
override fun jobFinished(job: JobState) {
results.remove(job.job)
}
- override fun compare(o1: JobState, o2: JobState): Int {
+ override fun compare(
+ o1: JobState,
+ o2: JobState,
+ ): Int {
return compareValuesBy(o1, o2) { it.job.duration }.let { if (ascending) it else -it }
}
}
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/JobAdmissionPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/JobAdmissionPolicy.kt
index ed3acff7..475c8f97 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/JobAdmissionPolicy.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/JobAdmissionPolicy.kt
@@ -65,6 +65,6 @@ public interface JobAdmissionPolicy : StagePolicy<JobAdmissionPolicy.Logic> {
/**
* Deny the current job and also stop admitting jobs.
*/
- STOP(false, true)
+ STOP(false, true),
}
}
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/LimitJobAdmissionPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/LimitJobAdmissionPolicy.kt
index bd416546..f1b81259 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/LimitJobAdmissionPolicy.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/LimitJobAdmissionPolicy.kt
@@ -31,16 +31,15 @@ import org.opendc.workflow.service.internal.WorkflowServiceImpl
* @property limit The maximum number of concurrent jobs in the system.
*/
public data class LimitJobAdmissionPolicy(public val limit: Int) : JobAdmissionPolicy {
- override fun invoke(scheduler: WorkflowServiceImpl): JobAdmissionPolicy.Logic = object : JobAdmissionPolicy.Logic {
- override fun invoke(
- job: JobState
- ): JobAdmissionPolicy.Advice =
- if (scheduler.activeJobs.size < limit) {
- JobAdmissionPolicy.Advice.ADMIT
- } else {
- JobAdmissionPolicy.Advice.STOP
- }
- }
+ override fun invoke(scheduler: WorkflowServiceImpl): JobAdmissionPolicy.Logic =
+ object : JobAdmissionPolicy.Logic {
+ override fun invoke(job: JobState): JobAdmissionPolicy.Advice =
+ if (scheduler.activeJobs.size < limit) {
+ JobAdmissionPolicy.Advice.ADMIT
+ } else {
+ JobAdmissionPolicy.Advice.STOP
+ }
+ }
override fun toString(): String = "Limit-Active($limit)"
}
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/NullJobAdmissionPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/NullJobAdmissionPolicy.kt
index 31f8f8db..731a0047 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/NullJobAdmissionPolicy.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/NullJobAdmissionPolicy.kt
@@ -29,9 +29,10 @@ import org.opendc.workflow.service.internal.WorkflowServiceImpl
* A [JobAdmissionPolicy] that admits all jobs.
*/
public object NullJobAdmissionPolicy : JobAdmissionPolicy {
- override fun invoke(scheduler: WorkflowServiceImpl): JobAdmissionPolicy.Logic = object : JobAdmissionPolicy.Logic {
- override fun invoke(job: JobState): JobAdmissionPolicy.Advice = JobAdmissionPolicy.Advice.ADMIT
- }
+ override fun invoke(scheduler: WorkflowServiceImpl): JobAdmissionPolicy.Logic =
+ object : JobAdmissionPolicy.Logic {
+ override fun invoke(job: JobState): JobAdmissionPolicy.Advice = JobAdmissionPolicy.Advice.ADMIT
+ }
override fun toString(): String = "Always"
}
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/RandomJobOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/RandomJobOrderPolicy.kt
index 4f8dc05b..ea005e97 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/RandomJobOrderPolicy.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/RandomJobOrderPolicy.kt
@@ -52,7 +52,10 @@ public object RandomJobOrderPolicy : JobOrderPolicy {
ids.remove(job.job)
}
- override fun compare(o1: JobState, o2: JobState): Int {
+ override fun compare(
+ o1: JobState,
+ o2: JobState,
+ ): Int {
return compareValuesBy(o1, o2) { ids.getValue(it.job) }
}
}
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/ActiveTaskOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/ActiveTaskOrderPolicy.kt
index 821d4964..48fdafe9 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/ActiveTaskOrderPolicy.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/ActiveTaskOrderPolicy.kt
@@ -55,7 +55,10 @@ public data class ActiveTaskOrderPolicy(public val ascending: Boolean = true) :
active.merge(task.job, -1, Int::plus)
}
- override fun compare(o1: TaskState, o2: TaskState): Int {
+ override fun compare(
+ o1: TaskState,
+ o2: TaskState,
+ ): Int {
return compareValuesBy(o1, o2) { active.getValue(it.job) }.let {
if (ascending) it else -it
}
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/CompletionTaskOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/CompletionTaskOrderPolicy.kt
index dae7ad99..104de105 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/CompletionTaskOrderPolicy.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/CompletionTaskOrderPolicy.kt
@@ -51,7 +51,10 @@ public data class CompletionTaskOrderPolicy(public val ascending: Boolean = true
finished.merge(task.job, 1, Int::plus)
}
- override fun compare(o1: TaskState, o2: TaskState): Int {
+ override fun compare(
+ o1: TaskState,
+ o2: TaskState,
+ ): Int {
return compareValuesBy(o1, o2) { finished.getValue(it.job) / it.job.tasks.size.toDouble() }.let {
if (ascending) it else -it
}
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DependenciesTaskOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DependenciesTaskOrderPolicy.kt
index 7786f6ec..df9ebfad 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DependenciesTaskOrderPolicy.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DependenciesTaskOrderPolicy.kt
@@ -29,9 +29,10 @@ import org.opendc.workflow.service.internal.WorkflowServiceImpl
* A [TaskOrderPolicy] that orders tasks based on the number of dependency tasks it has.
*/
public data class DependenciesTaskOrderPolicy(public val ascending: Boolean = true) : TaskOrderPolicy {
- override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> = compareBy {
- it.task.dependencies.size.let { if (ascending) it else -it }
- }
+ override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> =
+ compareBy {
+ it.task.dependencies.size.let { if (ascending) it else -it }
+ }
override fun toString(): String {
return "Task-Dependencies(${if (ascending) "asc" else "desc"})"
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DependentsTaskOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DependentsTaskOrderPolicy.kt
index 4fb835d7..bbc20348 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DependentsTaskOrderPolicy.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DependentsTaskOrderPolicy.kt
@@ -29,9 +29,10 @@ import org.opendc.workflow.service.internal.WorkflowServiceImpl
* A [TaskOrderPolicy] that orders tasks based on the number of dependent tasks it has.
*/
public data class DependentsTaskOrderPolicy(public val ascending: Boolean = true) : TaskOrderPolicy {
- override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> = compareBy {
- it.dependents.size.let { if (ascending) it else -it }
- }
+ override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> =
+ compareBy {
+ it.dependents.size.let { if (ascending) it else -it }
+ }
override fun toString(): String {
return "Task-Dependents(${if (ascending) "asc" else "desc"})"
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationHistoryTaskOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationHistoryTaskOrderPolicy.kt
index 3a634de7..e21acb41 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationHistoryTaskOrderPolicy.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationHistoryTaskOrderPolicy.kt
@@ -51,7 +51,10 @@ public data class DurationHistoryTaskOrderPolicy(public val ascending: Boolean =
results.getValue(task.job) += task.finishedAt - task.startedAt
}
- override fun compare(o1: TaskState, o2: TaskState): Int {
+ override fun compare(
+ o1: TaskState,
+ o2: TaskState,
+ ): Int {
return compareValuesBy(o1, o2) { key ->
val history = results.getValue(key.job)
if (history.isEmpty()) {
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationTaskOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationTaskOrderPolicy.kt
index a2ca2086..170f3394 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationTaskOrderPolicy.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationTaskOrderPolicy.kt
@@ -35,7 +35,6 @@ import kotlin.collections.set
* A [TaskOrderPolicy] orders tasks based on the pre-specified (approximate) duration of the task.
*/
public data class DurationTaskOrderPolicy(public val ascending: Boolean = true) : TaskOrderPolicy {
-
override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> =
object : Comparator<TaskState>, WorkflowSchedulerListener {
private val results = HashMap<UUID, Long>()
@@ -56,7 +55,10 @@ public data class DurationTaskOrderPolicy(public val ascending: Boolean = true)
private val TaskState.duration: Long
get() = results.getValue(task.uid)
- override fun compare(o1: TaskState, o2: TaskState): Int {
+ override fun compare(
+ o1: TaskState,
+ o2: TaskState,
+ ): Int {
return compareValuesBy(o1, o2) { state -> state.duration }.let {
if (ascending) it else -it
}
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitTaskEligibilityPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitTaskEligibilityPolicy.kt
index d2edc256..0e9c93da 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitTaskEligibilityPolicy.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitTaskEligibilityPolicy.kt
@@ -29,16 +29,15 @@ import org.opendc.workflow.service.internal.WorkflowServiceImpl
* A [TaskEligibilityPolicy] that limits the total number of active tasks in the system.
*/
public data class LimitTaskEligibilityPolicy(val limit: Int) : TaskEligibilityPolicy {
- override fun invoke(scheduler: WorkflowServiceImpl): TaskEligibilityPolicy.Logic = object : TaskEligibilityPolicy.Logic {
- override fun invoke(
- task: TaskState
- ): TaskEligibilityPolicy.Advice =
- if (scheduler.activeTasks.size < limit) {
- TaskEligibilityPolicy.Advice.ADMIT
- } else {
- TaskEligibilityPolicy.Advice.STOP
- }
- }
+ override fun invoke(scheduler: WorkflowServiceImpl): TaskEligibilityPolicy.Logic =
+ object : TaskEligibilityPolicy.Logic {
+ override fun invoke(task: TaskState): TaskEligibilityPolicy.Advice =
+ if (scheduler.activeTasks.size < limit) {
+ TaskEligibilityPolicy.Advice.ADMIT
+ } else {
+ TaskEligibilityPolicy.Advice.STOP
+ }
+ }
override fun toString(): String = "Limit-Active($limit)"
}
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/NullTaskEligibilityPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/NullTaskEligibilityPolicy.kt
index cfe2aeed..50a11784 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/NullTaskEligibilityPolicy.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/NullTaskEligibilityPolicy.kt
@@ -32,9 +32,7 @@ public object NullTaskEligibilityPolicy : TaskEligibilityPolicy {
override fun invoke(scheduler: WorkflowServiceImpl): TaskEligibilityPolicy.Logic = Logic
private object Logic : TaskEligibilityPolicy.Logic {
- override fun invoke(
- task: TaskState
- ): TaskEligibilityPolicy.Advice = TaskEligibilityPolicy.Advice.ADMIT
+ override fun invoke(task: TaskState): TaskEligibilityPolicy.Advice = TaskEligibilityPolicy.Advice.ADMIT
}
override fun toString(): String = "Always"
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskEligibilityPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskEligibilityPolicy.kt
index 036f3574..a883ac99 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskEligibilityPolicy.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskEligibilityPolicy.kt
@@ -30,16 +30,17 @@ import java.util.Random
* A [TaskEligibilityPolicy] that randomly accepts tasks in the system with some [probability].
*/
public data class RandomTaskEligibilityPolicy(val probability: Double = 0.5) : TaskEligibilityPolicy {
- override fun invoke(scheduler: WorkflowServiceImpl): TaskEligibilityPolicy.Logic = object : TaskEligibilityPolicy.Logic {
- val random = Random(123)
+ override fun invoke(scheduler: WorkflowServiceImpl): TaskEligibilityPolicy.Logic =
+ object : TaskEligibilityPolicy.Logic {
+ val random = Random(123)
- override fun invoke(task: TaskState): TaskEligibilityPolicy.Advice =
- if (random.nextDouble() <= probability || scheduler.activeTasks.isEmpty()) {
- TaskEligibilityPolicy.Advice.ADMIT
- } else {
- TaskEligibilityPolicy.Advice.DENY
- }
- }
+ override fun invoke(task: TaskState): TaskEligibilityPolicy.Advice =
+ if (random.nextDouble() <= probability || scheduler.activeTasks.isEmpty()) {
+ TaskEligibilityPolicy.Advice.ADMIT
+ } else {
+ TaskEligibilityPolicy.Advice.DENY
+ }
+ }
override fun toString(): String = "Random($probability)"
}
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskOrderPolicy.kt
index c12d6a66..134d22e0 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskOrderPolicy.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskOrderPolicy.kt
@@ -49,7 +49,10 @@ public object RandomTaskOrderPolicy : TaskOrderPolicy {
ids.remove(task.task)
}
- override fun compare(o1: TaskState, o2: TaskState): Int {
+ override fun compare(
+ o1: TaskState,
+ o2: TaskState,
+ ): Int {
return compareValuesBy(o1, o2) { ids.getValue(it.task) }
}
}
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/SubmissionTimeTaskOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/SubmissionTimeTaskOrderPolicy.kt
index e9bbf815..3b4bca8f 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/SubmissionTimeTaskOrderPolicy.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/SubmissionTimeTaskOrderPolicy.kt
@@ -29,9 +29,10 @@ import org.opendc.workflow.service.internal.WorkflowServiceImpl
* A [TaskOrderPolicy] that orders tasks based on the order of arrival in the queue.
*/
public data class SubmissionTimeTaskOrderPolicy(public val ascending: Boolean = true) : TaskOrderPolicy {
- override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> = compareBy {
- it.job.submittedAt.let { if (ascending) it else -it }
- }
+ override fun invoke(scheduler: WorkflowServiceImpl): Comparator<TaskState> =
+ compareBy {
+ it.job.submittedAt.let { if (ascending) it else -it }
+ }
override fun toString(): String {
return "Submission-Time(${if (ascending) "asc" else "desc"})"
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/TaskEligibilityPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/TaskEligibilityPolicy.kt
index ee31aee2..89ec3847 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/TaskEligibilityPolicy.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/TaskEligibilityPolicy.kt
@@ -65,6 +65,6 @@ public interface TaskEligibilityPolicy : StagePolicy<TaskEligibilityPolicy.Logic
/**
* Deny the current job and also stop admitting jobs.
*/
- STOP(false, true)
+ STOP(false, true),
}
}
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/telemetry/SchedulerStats.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/telemetry/SchedulerStats.kt
index 608e82df..c4f180b1 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/telemetry/SchedulerStats.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/telemetry/SchedulerStats.kt
@@ -38,5 +38,5 @@ public data class SchedulerStats(
val workflowsFinished: Int,
val tasksSubmitted: Int,
val tasksRunning: Int,
- val tasksFinished: Int
+ val tasksFinished: Int,
)