summaryrefslogtreecommitdiff
path: root/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc
diff options
context:
space:
mode:
authorDante Niewenhuis <d.niewenhuis@hotmail.com>2024-03-05 13:23:57 +0100
committerGitHub <noreply@github.com>2024-03-05 13:23:57 +0100
commit5864cbcbfe2eb8c36ca05c3a39c7e5916aeecaec (patch)
tree5b2773b8dc21c2e1b526fb70f829c376dd80532a /opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc
parentd28002a3c151d198298574312f32f1cb43f3a660 (diff)
Updated package versions, updated web server tests. (#207)
* Updated all package versions including kotlin. Updated all web-server tests to run. * Changed the java version of the tests. OpenDC now only supports java 19. * small update * test update * new update * updated docker version to 19 * updated docker version to 19
Diffstat (limited to 'opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc')
-rw-r--r--opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt40
-rw-r--r--opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSchedulerSpec.kt2
-rw-r--r--opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt26
-rw-r--r--opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSteps.kt2
4 files changed, 41 insertions, 29 deletions
diff --git a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt
index 2037dad4..e396901c 100644
--- a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt
+++ b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt
@@ -66,30 +66,35 @@ public fun Trace.toJobs(): List<Job> {
val workflow = jobs.computeIfAbsent(workflowId) { id -> Job(UUID(0L, id), "<unnamed>", HashSet(), HashMap()) }
val id = reader.getString(TASK_ID)!!.toLong()
- val grantedCpus = if (reader.resolve(TASK_ALLOC_NCPUS) != 0) {
- reader.getInt(TASK_ALLOC_NCPUS)
- } else {
- reader.getInt(TASK_REQ_NCPUS)
- }
+ val grantedCpus =
+ if (reader.resolve(TASK_ALLOC_NCPUS) != 0) {
+ reader.getInt(TASK_ALLOC_NCPUS)
+ } else {
+ reader.getInt(TASK_REQ_NCPUS)
+ }
val submitTime = reader.getInstant(TASK_SUBMIT_TIME)!!
val runtime = reader.getDuration(TASK_RUNTIME)!!
val flops: Long = 4000 * runtime.seconds * grantedCpus
val workload = SimWorkloads.flops(flops, 1.0)
- val task = Task(
- UUID(0L, id),
- "<unnamed>",
- HashSet(),
- mapOf(
- "workload" to workload,
- WORKFLOW_TASK_CORES to grantedCpus,
- WORKFLOW_TASK_DEADLINE to runtime.toMillis()
+ val task =
+ Task(
+ UUID(0L, id),
+ "<unnamed>",
+ HashSet(),
+ mapOf(
+ "workload" to workload,
+ WORKFLOW_TASK_CORES to grantedCpus,
+ WORKFLOW_TASK_DEADLINE to runtime.toMillis(),
+ ),
)
- )
tasks[id] = task
taskDependencies[task] = reader.getSet(TASK_PARENTS, String::class.java)!!.map { it.toLong() }.toSet()
- (workflow.metadata as MutableMap<String, Any>).merge("WORKFLOW_SUBMIT_TIME", submitTime.toEpochMilli()) { a, b -> min(a as Long, b as Long) }
+ (workflow.metadata as MutableMap<String, Any>).merge(
+ "WORKFLOW_SUBMIT_TIME",
+ submitTime.toEpochMilli(),
+ ) { a, b -> min(a as Long, b as Long) }
(workflow.tasks as MutableSet<Task>).add(task)
}
@@ -110,7 +115,10 @@ public fun Trace.toJobs(): List<Job> {
/**
* Helper method to replay the specified list of [jobs] and suspend execution util all jobs have finished.
*/
-public suspend fun WorkflowService.replay(clock: InstantSource, jobs: List<Job>) {
+public suspend fun WorkflowService.replay(
+ clock: InstantSource,
+ jobs: List<Job>,
+) {
// Sort jobs by their arrival time
val orderedJobs = jobs.sortedBy { it.metadata.getOrDefault("WORKFLOW_SUBMIT_TIME", Long.MAX_VALUE) as Long }
if (orderedJobs.isEmpty()) {
diff --git a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSchedulerSpec.kt b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSchedulerSpec.kt
index 8bd087e7..cb8056a7 100644
--- a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSchedulerSpec.kt
+++ b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSchedulerSpec.kt
@@ -36,5 +36,5 @@ public data class WorkflowSchedulerSpec(
val jobAdmissionPolicy: JobAdmissionPolicy,
val jobOrderPolicy: JobOrderPolicy,
val taskEligibilityPolicy: TaskEligibilityPolicy,
- val taskOrderPolicy: TaskOrderPolicy
+ val taskOrderPolicy: TaskOrderPolicy,
)
diff --git a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt
index 862ebf3d..af2a4871 100644
--- a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt
+++ b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt
@@ -40,21 +40,25 @@ public class WorkflowServiceProvisioningStep internal constructor(
private val serviceDomain: String,
private val computeService: String,
private val scheduler: WorkflowSchedulerSpec,
- private val schedulingQuantum: Duration
+ private val schedulingQuantum: Duration,
) : ProvisioningStep {
override fun apply(ctx: ProvisioningContext): AutoCloseable {
- val computeService = requireNotNull(ctx.registry.resolve(computeService, ComputeService::class.java)) { "Compute service $computeService does not exist" }
+ val computeService =
+ requireNotNull(
+ ctx.registry.resolve(computeService, ComputeService::class.java),
+ ) { "Compute service $computeService does not exist" }
val client = computeService.newClient()
- val service = WorkflowService(
- ctx.dispatcher,
- client,
- scheduler.schedulingQuantum,
- jobAdmissionPolicy = scheduler.jobAdmissionPolicy,
- jobOrderPolicy = scheduler.jobOrderPolicy,
- taskEligibilityPolicy = scheduler.taskEligibilityPolicy,
- taskOrderPolicy = scheduler.taskOrderPolicy
- )
+ val service =
+ WorkflowService(
+ ctx.dispatcher,
+ client,
+ scheduler.schedulingQuantum,
+ jobAdmissionPolicy = scheduler.jobAdmissionPolicy,
+ jobOrderPolicy = scheduler.jobOrderPolicy,
+ taskEligibilityPolicy = scheduler.taskEligibilityPolicy,
+ taskOrderPolicy = scheduler.taskOrderPolicy,
+ )
ctx.registry.register(serviceDomain, WorkflowService::class.java, service)
return AutoCloseable {
diff --git a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSteps.kt b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSteps.kt
index efcbf889..bfcf3734 100644
--- a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSteps.kt
+++ b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSteps.kt
@@ -35,7 +35,7 @@ public fun setupWorkflowService(
serviceDomain: String,
computeService: String,
scheduler: WorkflowSchedulerSpec,
- schedulingQuantum: Duration = Duration.ofMinutes(5)
+ schedulingQuantum: Duration = Duration.ofMinutes(5),
): ProvisioningStep {
return WorkflowServiceProvisioningStep(serviceDomain, computeService, scheduler, schedulingQuantum)
}