diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-09-28 15:51:05 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-10-03 17:35:58 +0200 |
| commit | c453e27abe54221f76648bc91edadb2efcd1ec07 (patch) | |
| tree | 2eb75de390dc735519c6d29bf2a6d50694436d26 /opendc-experiments/opendc-experiments-workflow/src | |
| parent | 115e37984624a409bc1ad4f54bf10c9537183390 (diff) | |
feat(exp/workflow): Add provisioning step for workflow service
This change adds a new module `opendc-experiments-workflow` that provides
provisioner implementations for experiments to use for setting up and
using the workflow engine in OpenDC.
Diffstat (limited to 'opendc-experiments/opendc-experiments-workflow/src')
4 files changed, 276 insertions, 0 deletions
diff --git a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt new file mode 100644 index 00000000..a15d3d5b --- /dev/null +++ b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt @@ -0,0 +1,130 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("TraceHelpers") +package org.opendc.experiments.workflow + +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import org.opendc.simulator.compute.workload.SimFlopsWorkload +import org.opendc.trace.* +import org.opendc.trace.conv.* +import org.opendc.workflow.api.Job +import org.opendc.workflow.api.Task +import org.opendc.workflow.api.WORKFLOW_TASK_CORES +import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE +import org.opendc.workflow.service.WorkflowService +import java.time.Clock +import java.util.* +import kotlin.collections.HashMap +import kotlin.collections.HashSet +import kotlin.math.min + +/** + * Convert [Trace] into a list of [Job]s that can be submitted to the workflow service. + */ +public fun Trace.toJobs(): List<Job> { + val table = checkNotNull(getTable(TABLE_TASKS)) + val reader = table.newReader() + + val jobs = mutableMapOf<Long, Job>() + val tasks = mutableMapOf<Long, Task>() + val taskDependencies = mutableMapOf<Task, Set<Long>>() + + try { + while (reader.nextRow()) { + // Bag of tasks without workflow ID all share the same workflow + val workflowId = if (reader.resolve(TASK_WORKFLOW_ID) != -1) reader.getString(TASK_WORKFLOW_ID)!!.toLong() else 0L + val workflow = jobs.computeIfAbsent(workflowId) { id -> Job(UUID(0L, id), "<unnamed>", HashSet(), HashMap()) } + + val id = reader.getString(TASK_ID)!!.toLong() + val grantedCpus = if (reader.resolve(TASK_ALLOC_NCPUS) != 0) + reader.getInt(TASK_ALLOC_NCPUS) + else + reader.getInt(TASK_REQ_NCPUS) + val submitTime = reader.getInstant(TASK_SUBMIT_TIME)!! + val runtime = reader.getDuration(TASK_RUNTIME)!! + val flops: Long = 4000 * runtime.seconds * grantedCpus + val workload = SimFlopsWorkload(flops) + val task = Task( + UUID(0L, id), + "<unnamed>", + HashSet(), + mapOf( + "workload" to workload, + WORKFLOW_TASK_CORES to grantedCpus, + WORKFLOW_TASK_DEADLINE to runtime.toMillis() + ), + ) + + tasks[id] = task + taskDependencies[task] = reader.getSet(TASK_PARENTS, String::class.java)!!.map { it.toLong() }.toSet() + + (workflow.metadata as MutableMap<String, Any>).merge("WORKFLOW_SUBMIT_TIME", submitTime.toEpochMilli()) { a, b -> min(a as Long, b as Long) } + (workflow.tasks as MutableSet<Task>).add(task) + } + + // Resolve dependencies for all tasks + for ((task, deps) in taskDependencies) { + for (dep in deps) { + val parent = requireNotNull(tasks[dep]) { "Dependency task with id $dep not found" } + (task.dependencies as MutableSet<Task>).add(parent) + } + } + } finally { + reader.close() + } + + return jobs.values.toList() +} + +/** + * Helper method to replay the specified list of [jobs] and suspend execution util all jobs have finished. + */ +public suspend fun WorkflowService.replay(clock: Clock, jobs: List<Job>) { + // Sort jobs by their arrival time + val orderedJobs = jobs.sortedBy { it.metadata.getOrDefault("WORKFLOW_SUBMIT_TIME", Long.MAX_VALUE) as Long } + if (orderedJobs.isEmpty()) { + return + } + + // Wait until the trace is started + val startTime = orderedJobs[0].metadata.getOrDefault("WORKFLOW_SUBMIT_TIME", Long.MAX_VALUE) as Long + var offset = 0L + + if (startTime != Long.MAX_VALUE) { + offset = startTime - clock.millis() + delay(offset.coerceAtLeast(0)) + } + + coroutineScope { + for (job in orderedJobs) { + val submitTime = job.metadata.getOrDefault("WORKFLOW_SUBMIT_TIME", Long.MAX_VALUE) as Long + if (submitTime != Long.MAX_VALUE) { + delay(((submitTime - offset) - clock.millis()).coerceAtLeast(0)) + } + + launch { invoke(job) } + } + } +} diff --git a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSchedulerSpec.kt b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSchedulerSpec.kt new file mode 100644 index 00000000..cb8056a7 --- /dev/null +++ b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSchedulerSpec.kt @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.workflow + +import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy +import org.opendc.workflow.service.scheduler.job.JobOrderPolicy +import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy +import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy +import java.time.Duration + +/** + * Specification of the scheduling policies of the workflow scheduler. + */ +public data class WorkflowSchedulerSpec( + val schedulingQuantum: Duration, + val jobAdmissionPolicy: JobAdmissionPolicy, + val jobOrderPolicy: JobOrderPolicy, + val taskEligibilityPolicy: TaskEligibilityPolicy, + val taskOrderPolicy: TaskOrderPolicy, +) diff --git a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt new file mode 100644 index 00000000..a2d6a172 --- /dev/null +++ b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.workflow + +import org.opendc.compute.service.ComputeService +import org.opendc.experiments.provisioner.ProvisioningContext +import org.opendc.experiments.provisioner.ProvisioningStep +import org.opendc.workflow.service.WorkflowService +import java.time.Duration + +/** + * A [ProvisioningStep] that provisions a [WorkflowService]. + * + * @param serviceDomain The domain name under which to register the workflow service. + * @param computeService The domain name where the underlying compute service is located. + * @param scheduler The configuration of the scheduler of the workflow engine. + * @param schedulingQuantum The scheduling quantum of the compute scheduler. + */ +public class WorkflowServiceProvisioningStep internal constructor( + private val serviceDomain: String, + private val computeService: String, + private val scheduler: WorkflowSchedulerSpec, + private val schedulingQuantum: Duration +) : ProvisioningStep { + override fun apply(ctx: ProvisioningContext): AutoCloseable { + val computeService = requireNotNull(ctx.registry.resolve(computeService, ComputeService::class.java)) { "Compute service $computeService does not exist" } + + val client = computeService.newClient() + val service = WorkflowService( + ctx.coroutineContext, + ctx.clock, + client, + scheduler.schedulingQuantum, + jobAdmissionPolicy = scheduler.jobAdmissionPolicy, + jobOrderPolicy = scheduler.jobOrderPolicy, + taskEligibilityPolicy = scheduler.taskEligibilityPolicy, + taskOrderPolicy = scheduler.taskOrderPolicy, + ) + ctx.registry.register(serviceDomain, WorkflowService::class.java, service) + + return AutoCloseable { + service.close() + client.close() + } + } +} diff --git a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSteps.kt b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSteps.kt new file mode 100644 index 00000000..7aae3a9f --- /dev/null +++ b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSteps.kt @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("WorkflowSteps") +package org.opendc.experiments.workflow + +import org.opendc.experiments.provisioner.ProvisioningStep +import org.opendc.workflow.service.WorkflowService +import java.time.Duration + +/** + * Return a [ProvisioningStep] that sets up a [WorkflowService]. + */ +public fun setupWorkflowService( + serviceDomain: String, + computeService: String, + scheduler: WorkflowSchedulerSpec, + schedulingQuantum: Duration = Duration.ofMinutes(5) +): ProvisioningStep { + return WorkflowServiceProvisioningStep(serviceDomain, computeService, scheduler, schedulingQuantum) +} |
