diff options
Diffstat (limited to 'opendc-workflow')
12 files changed, 39 insertions, 27 deletions
diff --git a/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Job.kt b/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Job.kt index 5e8b0b9e..b59ad6da 100644 --- a/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Job.kt +++ b/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Job.kt @@ -22,7 +22,7 @@ package org.opendc.workflow.api -import java.util.* +import java.util.UUID /** * A workload that represents a directed acyclic graph (DAG) of tasks with control and data dependencies between tasks. diff --git a/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Metadata.kt b/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Metadata.kt index db208998..a4e48b85 100644 --- a/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Metadata.kt +++ b/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Metadata.kt @@ -1,7 +1,5 @@ /* - * MIT License - * - * Copyright (c) 2020 atlarge-research + * Copyright (c) 2020 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal diff --git a/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Task.kt b/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Task.kt index d91f9879..f805c210 100644 --- a/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Task.kt +++ b/opendc-workflow/opendc-workflow-api/src/main/kotlin/org/opendc/workflow/api/Task.kt @@ -22,14 +22,13 @@ package org.opendc.workflow.api -import java.util.* +import java.util.UUID /** * A stage of a [Job]. * * @property uid A unique identified of this task. * @property name The name of this task. - * @property image The application image to run as part of this workflow task. * @property dependencies The dependencies of this task in order for it to execute. * @property metadata Additional metadata for this task. */ diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt index 899810a2..b1780896 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt @@ -22,12 +22,19 @@ package org.opendc.workflow.service.internal -import kotlinx.coroutines.* +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.cancel +import kotlinx.coroutines.launch +import kotlinx.coroutines.suspendCancellableCoroutine import org.opendc.common.util.Pacer -import org.opendc.compute.api.* +import org.opendc.compute.api.ComputeClient +import org.opendc.compute.api.Image +import org.opendc.compute.api.Server +import org.opendc.compute.api.ServerState +import org.opendc.compute.api.ServerWatcher import org.opendc.workflow.api.Job import org.opendc.workflow.api.WORKFLOW_TASK_CORES -import org.opendc.workflow.service.* +import org.opendc.workflow.service.WorkflowService import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy import org.opendc.workflow.service.scheduler.job.JobOrderPolicy import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy @@ -35,7 +42,8 @@ import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy import org.opendc.workflow.service.scheduler.telemetry.SchedulerStats import java.time.Clock import java.time.Duration -import java.util.* +import java.util.PriorityQueue +import java.util.Queue import kotlin.coroutines.CoroutineContext import kotlin.coroutines.resume @@ -56,7 +64,7 @@ public class WorkflowServiceImpl( /** * The [CoroutineScope] of the service bounded by the lifecycle of the service. */ - private val scope = CoroutineScope(context + Job()) + private val scope = CoroutineScope(context + kotlinx.coroutines.Job()) /** * The incoming jobs ready to be processed by the scheduler. diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/LimitJobAdmissionPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/LimitJobAdmissionPolicy.kt index 6a0bfeb9..bd416546 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/LimitJobAdmissionPolicy.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/LimitJobAdmissionPolicy.kt @@ -35,10 +35,11 @@ public data class LimitJobAdmissionPolicy(public val limit: Int) : JobAdmissionP override fun invoke( job: JobState ): JobAdmissionPolicy.Advice = - if (scheduler.activeJobs.size < limit) + if (scheduler.activeJobs.size < limit) { JobAdmissionPolicy.Advice.ADMIT - else + } else { JobAdmissionPolicy.Advice.STOP + } } override fun toString(): String = "Limit-Active($limit)" diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/RandomJobOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/RandomJobOrderPolicy.kt index 1b359125..4f8dc05b 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/RandomJobOrderPolicy.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/job/RandomJobOrderPolicy.kt @@ -26,7 +26,7 @@ import org.opendc.workflow.api.Job import org.opendc.workflow.service.internal.JobState import org.opendc.workflow.service.internal.WorkflowSchedulerListener import org.opendc.workflow.service.internal.WorkflowServiceImpl -import java.util.* +import java.util.Random import kotlin.collections.HashMap /** diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/BalancingTaskEligibilityPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/BalancingTaskEligibilityPolicy.kt index 42804f5a..a2c11e4d 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/BalancingTaskEligibilityPolicy.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/BalancingTaskEligibilityPolicy.kt @@ -65,10 +65,11 @@ public data class BalancingTaskEligibilityPolicy(public val tolerance: Double = val activeTasks = scheduler.activeTasks.size val baseline = max(activeTasks / activeJobs.toDouble(), 1.0) val activeForJob = active[task.job]!! - return if ((activeForJob + 1) / baseline < tolerance) + return if ((activeForJob + 1) / baseline < tolerance) { TaskEligibilityPolicy.Advice.ADMIT - else + } else { TaskEligibilityPolicy.Advice.DENY + } } } diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationTaskOrderPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationTaskOrderPolicy.kt index d9fde53a..a2ca2086 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationTaskOrderPolicy.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/DurationTaskOrderPolicy.kt @@ -26,7 +26,7 @@ import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE import org.opendc.workflow.service.internal.TaskState import org.opendc.workflow.service.internal.WorkflowSchedulerListener import org.opendc.workflow.service.internal.WorkflowServiceImpl -import java.util.* +import java.util.UUID import kotlin.collections.HashMap import kotlin.collections.getValue import kotlin.collections.set diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitPerJobTaskEligibilityPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitPerJobTaskEligibilityPolicy.kt index 229460df..b0eb3f2c 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitPerJobTaskEligibilityPolicy.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitPerJobTaskEligibilityPolicy.kt @@ -57,10 +57,11 @@ public data class LimitPerJobTaskEligibilityPolicy(public val limit: Int) : Task override fun invoke(task: TaskState): TaskEligibilityPolicy.Advice { val activeForJob = active[task.job]!! - return if (activeForJob <= limit) + return if (activeForJob <= limit) { TaskEligibilityPolicy.Advice.ADMIT - else + } else { TaskEligibilityPolicy.Advice.DENY + } } } diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitTaskEligibilityPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitTaskEligibilityPolicy.kt index 57aa0d58..d2edc256 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitTaskEligibilityPolicy.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/LimitTaskEligibilityPolicy.kt @@ -33,10 +33,11 @@ public data class LimitTaskEligibilityPolicy(val limit: Int) : TaskEligibilityPo override fun invoke( task: TaskState ): TaskEligibilityPolicy.Advice = - if (scheduler.activeTasks.size < limit) + if (scheduler.activeTasks.size < limit) { TaskEligibilityPolicy.Advice.ADMIT - else + } else { TaskEligibilityPolicy.Advice.STOP + } } override fun toString(): String = "Limit-Active($limit)" diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskEligibilityPolicy.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskEligibilityPolicy.kt index a01439c2..036f3574 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskEligibilityPolicy.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/task/RandomTaskEligibilityPolicy.kt @@ -24,7 +24,7 @@ package org.opendc.workflow.service.scheduler.task import org.opendc.workflow.service.internal.TaskState import org.opendc.workflow.service.internal.WorkflowServiceImpl -import java.util.* +import java.util.Random /** * A [TaskEligibilityPolicy] that randomly accepts tasks in the system with some [probability]. @@ -34,9 +34,9 @@ public data class RandomTaskEligibilityPolicy(val probability: Double = 0.5) : T val random = Random(123) override fun invoke(task: TaskState): TaskEligibilityPolicy.Advice = - if (random.nextDouble() <= probability || scheduler.activeTasks.isEmpty()) + if (random.nextDouble() <= probability || scheduler.activeTasks.isEmpty()) { TaskEligibilityPolicy.Advice.ADMIT - else { + } else { TaskEligibilityPolicy.Advice.DENY } } diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt index e37f489d..977f5677 100644 --- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt @@ -37,7 +37,10 @@ import org.opendc.experiments.compute.setupHosts import org.opendc.experiments.compute.topology.HostSpec import org.opendc.experiments.provisioner.Provisioner import org.opendc.experiments.provisioner.ProvisioningContext -import org.opendc.experiments.workflow.* +import org.opendc.experiments.workflow.WorkflowSchedulerSpec +import org.opendc.experiments.workflow.replay +import org.opendc.experiments.workflow.setupWorkflowService +import org.opendc.experiments.workflow.toJobs import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode @@ -53,7 +56,7 @@ import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy import java.nio.file.Paths import java.time.Duration -import java.util.* +import java.util.UUID /** * Integration test suite for the [WorkflowService]. @@ -90,7 +93,7 @@ internal class WorkflowServiceTest { jobAdmissionPolicy = NullJobAdmissionPolicy, jobOrderPolicy = SubmissionTimeJobOrderPolicy(), taskEligibilityPolicy = NullTaskEligibilityPolicy, - taskOrderPolicy = SubmissionTimeTaskOrderPolicy(), + taskOrderPolicy = SubmissionTimeTaskOrderPolicy() ) ) ) |
