summaryrefslogtreecommitdiff
path: root/opendc-experiments/opendc-experiments-workflow/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-09-28 15:51:05 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2022-10-03 17:35:58 +0200
commitc453e27abe54221f76648bc91edadb2efcd1ec07 (patch)
tree2eb75de390dc735519c6d29bf2a6d50694436d26 /opendc-experiments/opendc-experiments-workflow/src
parent115e37984624a409bc1ad4f54bf10c9537183390 (diff)
feat(exp/workflow): Add provisioning step for workflow service
This change adds a new module `opendc-experiments-workflow` that provides provisioner implementations for experiments to use for setting up and using the workflow engine in OpenDC.
Diffstat (limited to 'opendc-experiments/opendc-experiments-workflow/src')
-rw-r--r--opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt130
-rw-r--r--opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSchedulerSpec.kt40
-rw-r--r--opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt66
-rw-r--r--opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSteps.kt40
4 files changed, 276 insertions, 0 deletions
diff --git a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt
new file mode 100644
index 00000000..a15d3d5b
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt
@@ -0,0 +1,130 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+@file:JvmName("TraceHelpers")
+package org.opendc.experiments.workflow
+
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import org.opendc.simulator.compute.workload.SimFlopsWorkload
+import org.opendc.trace.*
+import org.opendc.trace.conv.*
+import org.opendc.workflow.api.Job
+import org.opendc.workflow.api.Task
+import org.opendc.workflow.api.WORKFLOW_TASK_CORES
+import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE
+import org.opendc.workflow.service.WorkflowService
+import java.time.Clock
+import java.util.*
+import kotlin.collections.HashMap
+import kotlin.collections.HashSet
+import kotlin.math.min
+
+/**
+ * Convert [Trace] into a list of [Job]s that can be submitted to the workflow service.
+ */
+public fun Trace.toJobs(): List<Job> {
+ val table = checkNotNull(getTable(TABLE_TASKS))
+ val reader = table.newReader()
+
+ val jobs = mutableMapOf<Long, Job>()
+ val tasks = mutableMapOf<Long, Task>()
+ val taskDependencies = mutableMapOf<Task, Set<Long>>()
+
+ try {
+ while (reader.nextRow()) {
+ // Bag of tasks without workflow ID all share the same workflow
+ val workflowId = if (reader.resolve(TASK_WORKFLOW_ID) != -1) reader.getString(TASK_WORKFLOW_ID)!!.toLong() else 0L
+ val workflow = jobs.computeIfAbsent(workflowId) { id -> Job(UUID(0L, id), "<unnamed>", HashSet(), HashMap()) }
+
+ val id = reader.getString(TASK_ID)!!.toLong()
+ val grantedCpus = if (reader.resolve(TASK_ALLOC_NCPUS) != 0)
+ reader.getInt(TASK_ALLOC_NCPUS)
+ else
+ reader.getInt(TASK_REQ_NCPUS)
+ val submitTime = reader.getInstant(TASK_SUBMIT_TIME)!!
+ val runtime = reader.getDuration(TASK_RUNTIME)!!
+ val flops: Long = 4000 * runtime.seconds * grantedCpus
+ val workload = SimFlopsWorkload(flops)
+ val task = Task(
+ UUID(0L, id),
+ "<unnamed>",
+ HashSet(),
+ mapOf(
+ "workload" to workload,
+ WORKFLOW_TASK_CORES to grantedCpus,
+ WORKFLOW_TASK_DEADLINE to runtime.toMillis()
+ ),
+ )
+
+ tasks[id] = task
+ taskDependencies[task] = reader.getSet(TASK_PARENTS, String::class.java)!!.map { it.toLong() }.toSet()
+
+ (workflow.metadata as MutableMap<String, Any>).merge("WORKFLOW_SUBMIT_TIME", submitTime.toEpochMilli()) { a, b -> min(a as Long, b as Long) }
+ (workflow.tasks as MutableSet<Task>).add(task)
+ }
+
+ // Resolve dependencies for all tasks
+ for ((task, deps) in taskDependencies) {
+ for (dep in deps) {
+ val parent = requireNotNull(tasks[dep]) { "Dependency task with id $dep not found" }
+ (task.dependencies as MutableSet<Task>).add(parent)
+ }
+ }
+ } finally {
+ reader.close()
+ }
+
+ return jobs.values.toList()
+}
+
+/**
+ * Helper method to replay the specified list of [jobs] and suspend execution util all jobs have finished.
+ */
+public suspend fun WorkflowService.replay(clock: Clock, jobs: List<Job>) {
+ // Sort jobs by their arrival time
+ val orderedJobs = jobs.sortedBy { it.metadata.getOrDefault("WORKFLOW_SUBMIT_TIME", Long.MAX_VALUE) as Long }
+ if (orderedJobs.isEmpty()) {
+ return
+ }
+
+ // Wait until the trace is started
+ val startTime = orderedJobs[0].metadata.getOrDefault("WORKFLOW_SUBMIT_TIME", Long.MAX_VALUE) as Long
+ var offset = 0L
+
+ if (startTime != Long.MAX_VALUE) {
+ offset = startTime - clock.millis()
+ delay(offset.coerceAtLeast(0))
+ }
+
+ coroutineScope {
+ for (job in orderedJobs) {
+ val submitTime = job.metadata.getOrDefault("WORKFLOW_SUBMIT_TIME", Long.MAX_VALUE) as Long
+ if (submitTime != Long.MAX_VALUE) {
+ delay(((submitTime - offset) - clock.millis()).coerceAtLeast(0))
+ }
+
+ launch { invoke(job) }
+ }
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSchedulerSpec.kt b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSchedulerSpec.kt
new file mode 100644
index 00000000..cb8056a7
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSchedulerSpec.kt
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.workflow
+
+import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy
+import org.opendc.workflow.service.scheduler.job.JobOrderPolicy
+import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy
+import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy
+import java.time.Duration
+
+/**
+ * Specification of the scheduling policies of the workflow scheduler.
+ */
+public data class WorkflowSchedulerSpec(
+ val schedulingQuantum: Duration,
+ val jobAdmissionPolicy: JobAdmissionPolicy,
+ val jobOrderPolicy: JobOrderPolicy,
+ val taskEligibilityPolicy: TaskEligibilityPolicy,
+ val taskOrderPolicy: TaskOrderPolicy,
+)
diff --git a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt
new file mode 100644
index 00000000..a2d6a172
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt
@@ -0,0 +1,66 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.workflow
+
+import org.opendc.compute.service.ComputeService
+import org.opendc.experiments.provisioner.ProvisioningContext
+import org.opendc.experiments.provisioner.ProvisioningStep
+import org.opendc.workflow.service.WorkflowService
+import java.time.Duration
+
+/**
+ * A [ProvisioningStep] that provisions a [WorkflowService].
+ *
+ * @param serviceDomain The domain name under which to register the workflow service.
+ * @param computeService The domain name where the underlying compute service is located.
+ * @param scheduler The configuration of the scheduler of the workflow engine.
+ * @param schedulingQuantum The scheduling quantum of the compute scheduler.
+ */
+public class WorkflowServiceProvisioningStep internal constructor(
+ private val serviceDomain: String,
+ private val computeService: String,
+ private val scheduler: WorkflowSchedulerSpec,
+ private val schedulingQuantum: Duration
+) : ProvisioningStep {
+ override fun apply(ctx: ProvisioningContext): AutoCloseable {
+ val computeService = requireNotNull(ctx.registry.resolve(computeService, ComputeService::class.java)) { "Compute service $computeService does not exist" }
+
+ val client = computeService.newClient()
+ val service = WorkflowService(
+ ctx.coroutineContext,
+ ctx.clock,
+ client,
+ scheduler.schedulingQuantum,
+ jobAdmissionPolicy = scheduler.jobAdmissionPolicy,
+ jobOrderPolicy = scheduler.jobOrderPolicy,
+ taskEligibilityPolicy = scheduler.taskEligibilityPolicy,
+ taskOrderPolicy = scheduler.taskOrderPolicy,
+ )
+ ctx.registry.register(serviceDomain, WorkflowService::class.java, service)
+
+ return AutoCloseable {
+ service.close()
+ client.close()
+ }
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSteps.kt b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSteps.kt
new file mode 100644
index 00000000..7aae3a9f
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSteps.kt
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+@file:JvmName("WorkflowSteps")
+package org.opendc.experiments.workflow
+
+import org.opendc.experiments.provisioner.ProvisioningStep
+import org.opendc.workflow.service.WorkflowService
+import java.time.Duration
+
+/**
+ * Return a [ProvisioningStep] that sets up a [WorkflowService].
+ */
+public fun setupWorkflowService(
+ serviceDomain: String,
+ computeService: String,
+ scheduler: WorkflowSchedulerSpec,
+ schedulingQuantum: Duration = Duration.ofMinutes(5)
+): ProvisioningStep {
+ return WorkflowServiceProvisioningStep(serviceDomain, computeService, scheduler, schedulingQuantum)
+}