summaryrefslogtreecommitdiff
path: root/opendc-workflow
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-workflow')
-rw-r--r--opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Job.kt2
-rw-r--r--opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Metadata.kt4
-rw-r--r--opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Task.kt3
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt18
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/LimitJobAdmissionPolicy.kt5
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/RandomJobOrderPolicy.kt2
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/BalancingTaskEligibilityPolicy.kt5
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationTaskOrderPolicy.kt2
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitPerJobTaskEligibilityPolicy.kt5
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitTaskEligibilityPolicy.kt5
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskEligibilityPolicy.kt6
-rw-r--r--opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt9
12 files changed, 39 insertions, 27 deletions
diff --git a/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Job.kt b/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Job.kt
index 5e8b0b9e..b59ad6da 100644
--- a/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Job.kt
+++ b/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Job.kt
@@ -22,7 +22,7 @@
package org.opendc.workflow.api
-import java.util.*
+import java.util.UUID
/**
* A workload that represents a directed acyclic graph (DAG) of tasks with control and data dependencies between tasks.
diff --git a/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Metadata.kt b/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Metadata.kt
index db208998..a4e48b85 100644
--- a/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Metadata.kt
+++ b/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Metadata.kt
@@ -1,7 +1,5 @@
/*
- * MIT License
- *
- * Copyright (c) 2020 atlarge-research
+ * Copyright (c) 2020 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
diff --git a/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Task.kt b/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Task.kt
index d91f9879..f805c210 100644
--- a/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Task.kt
+++ b/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Task.kt
@@ -22,14 +22,13 @@
package org.opendc.workflow.api
-import java.util.*
+import java.util.UUID
/**
* A stage of a [Job].
*
* @property uid A unique identified of this task.
* @property name The name of this task.
- * @property image The application image to run as part of this workflow task.
* @property dependencies The dependencies of this task in order for it to execute.
* @property metadata Additional metadata for this task.
*/
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
index 899810a2..b1780896 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
@@ -22,12 +22,19 @@
package org.opendc.workflow.service.internal
-import kotlinx.coroutines.*
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.cancel
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.suspendCancellableCoroutine
import org.opendc.common.util.Pacer
-import org.opendc.compute.api.*
+import org.opendc.compute.api.ComputeClient
+import org.opendc.compute.api.Image
+import org.opendc.compute.api.Server
+import org.opendc.compute.api.ServerState
+import org.opendc.compute.api.ServerWatcher
import org.opendc.workflow.api.Job
import org.opendc.workflow.api.WORKFLOW_TASK_CORES
-import org.opendc.workflow.service.*
+import org.opendc.workflow.service.WorkflowService
import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy
import org.opendc.workflow.service.scheduler.job.JobOrderPolicy
import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy
@@ -35,7 +42,8 @@ import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy
import org.opendc.workflow.service.scheduler.telemetry.SchedulerStats
import java.time.Clock
import java.time.Duration
-import java.util.*
+import java.util.PriorityQueue
+import java.util.Queue
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.resume
@@ -56,7 +64,7 @@ public class WorkflowServiceImpl(
/**
* The [CoroutineScope] of the service bounded by the lifecycle of the service.
*/
- private val scope = CoroutineScope(context + Job())
+ private val scope = CoroutineScope(context + kotlinx.coroutines.Job())
/**
* The incoming jobs ready to be processed by the scheduler.
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/LimitJobAdmissionPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/LimitJobAdmissionPolicy.kt
index 6a0bfeb9..bd416546 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/LimitJobAdmissionPolicy.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/LimitJobAdmissionPolicy.kt
@@ -35,10 +35,11 @@ public data class LimitJobAdmissionPolicy(public val limit: Int) : JobAdmissionP
override fun invoke(
job: JobState
): JobAdmissionPolicy.Advice =
- if (scheduler.activeJobs.size < limit)
+ if (scheduler.activeJobs.size < limit) {
JobAdmissionPolicy.Advice.ADMIT
- else
+ } else {
JobAdmissionPolicy.Advice.STOP
+ }
}
override fun toString(): String = "Limit-Active($limit)"
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/RandomJobOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/RandomJobOrderPolicy.kt
index 1b359125..4f8dc05b 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/RandomJobOrderPolicy.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/RandomJobOrderPolicy.kt
@@ -26,7 +26,7 @@ import org.opendc.workflow.api.Job
import org.opendc.workflow.service.internal.JobState
import org.opendc.workflow.service.internal.WorkflowSchedulerListener
import org.opendc.workflow.service.internal.WorkflowServiceImpl
-import java.util.*
+import java.util.Random
import kotlin.collections.HashMap
/**
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/BalancingTaskEligibilityPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/BalancingTaskEligibilityPolicy.kt
index 42804f5a..a2c11e4d 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/BalancingTaskEligibilityPolicy.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/BalancingTaskEligibilityPolicy.kt
@@ -65,10 +65,11 @@ public data class BalancingTaskEligibilityPolicy(public val tolerance: Double =
val activeTasks = scheduler.activeTasks.size
val baseline = max(activeTasks / activeJobs.toDouble(), 1.0)
val activeForJob = active[task.job]!!
- return if ((activeForJob + 1) / baseline < tolerance)
+ return if ((activeForJob + 1) / baseline < tolerance) {
TaskEligibilityPolicy.Advice.ADMIT
- else
+ } else {
TaskEligibilityPolicy.Advice.DENY
+ }
}
}
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationTaskOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationTaskOrderPolicy.kt
index d9fde53a..a2ca2086 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationTaskOrderPolicy.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationTaskOrderPolicy.kt
@@ -26,7 +26,7 @@ import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE
import org.opendc.workflow.service.internal.TaskState
import org.opendc.workflow.service.internal.WorkflowSchedulerListener
import org.opendc.workflow.service.internal.WorkflowServiceImpl
-import java.util.*
+import java.util.UUID
import kotlin.collections.HashMap
import kotlin.collections.getValue
import kotlin.collections.set
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitPerJobTaskEligibilityPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitPerJobTaskEligibilityPolicy.kt
index 229460df..b0eb3f2c 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitPerJobTaskEligibilityPolicy.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitPerJobTaskEligibilityPolicy.kt
@@ -57,10 +57,11 @@ public data class LimitPerJobTaskEligibilityPolicy(public val limit: Int) : Task
override fun invoke(task: TaskState): TaskEligibilityPolicy.Advice {
val activeForJob = active[task.job]!!
- return if (activeForJob <= limit)
+ return if (activeForJob <= limit) {
TaskEligibilityPolicy.Advice.ADMIT
- else
+ } else {
TaskEligibilityPolicy.Advice.DENY
+ }
}
}
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitTaskEligibilityPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitTaskEligibilityPolicy.kt
index 57aa0d58..d2edc256 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitTaskEligibilityPolicy.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitTaskEligibilityPolicy.kt
@@ -33,10 +33,11 @@ public data class LimitTaskEligibilityPolicy(val limit: Int) : TaskEligibilityPo
override fun invoke(
task: TaskState
): TaskEligibilityPolicy.Advice =
- if (scheduler.activeTasks.size < limit)
+ if (scheduler.activeTasks.size < limit) {
TaskEligibilityPolicy.Advice.ADMIT
- else
+ } else {
TaskEligibilityPolicy.Advice.STOP
+ }
}
override fun toString(): String = "Limit-Active($limit)"
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskEligibilityPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskEligibilityPolicy.kt
index a01439c2..036f3574 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskEligibilityPolicy.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskEligibilityPolicy.kt
@@ -24,7 +24,7 @@ package org.opendc.workflow.service.scheduler.task
import org.opendc.workflow.service.internal.TaskState
import org.opendc.workflow.service.internal.WorkflowServiceImpl
-import java.util.*
+import java.util.Random
/**
* A [TaskEligibilityPolicy] that randomly accepts tasks in the system with some [probability].
@@ -34,9 +34,9 @@ public data class RandomTaskEligibilityPolicy(val probability: Double = 0.5) : T
val random = Random(123)
override fun invoke(task: TaskState): TaskEligibilityPolicy.Advice =
- if (random.nextDouble() <= probability || scheduler.activeTasks.isEmpty())
+ if (random.nextDouble() <= probability || scheduler.activeTasks.isEmpty()) {
TaskEligibilityPolicy.Advice.ADMIT
- else {
+ } else {
TaskEligibilityPolicy.Advice.DENY
}
}
diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
index e37f489d..977f5677 100644
--- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
+++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
@@ -37,7 +37,10 @@ import org.opendc.experiments.compute.setupHosts
import org.opendc.experiments.compute.topology.HostSpec
import org.opendc.experiments.provisioner.Provisioner
import org.opendc.experiments.provisioner.ProvisioningContext
-import org.opendc.experiments.workflow.*
+import org.opendc.experiments.workflow.WorkflowSchedulerSpec
+import org.opendc.experiments.workflow.replay
+import org.opendc.experiments.workflow.setupWorkflowService
+import org.opendc.experiments.workflow.toJobs
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
@@ -53,7 +56,7 @@ import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy
import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy
import java.nio.file.Paths
import java.time.Duration
-import java.util.*
+import java.util.UUID
/**
* Integration test suite for the [WorkflowService].
@@ -90,7 +93,7 @@ internal class WorkflowServiceTest {
jobAdmissionPolicy = NullJobAdmissionPolicy,
jobOrderPolicy = SubmissionTimeJobOrderPolicy(),
taskEligibilityPolicy = NullTaskEligibilityPolicy,
- taskOrderPolicy = SubmissionTimeTaskOrderPolicy(),
+ taskOrderPolicy = SubmissionTimeTaskOrderPolicy()
)
)
)