summaryrefslogtreecommitdiff
path: root/opendc-experiments/opendc-experiments-workflow
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-10-04 10:04:50 +0200
committerGitHub <noreply@github.com>2022-10-04 10:04:50 +0200
commit92cc0908b7ad6c94b08e6016f8815ab07cd1714d (patch)
treeb5edaff69212986265f9edc620e40bb8695f11eb /opendc-experiments/opendc-experiments-workflow
parent2d2a3854d355bd4b074ef651f291d34081e70d96 (diff)
parentbd476d11ab24fe745bb54e97a11133706bb96cb1 (diff)
merge: Add provisioning tool for experiments (#104)
This pull request implements a new tool to help provision and manage the experimental environment. ## Implementation Notes :hammer_and_pick: * Add service registry for cloud services * Add provisioning tool for experiments * Add provisioning step for workflow service * Add provisioners for FaaS service * Use experiment base for Capelin experiments * Use experiment base for web runner * Integrate compute workload classes * Remove Topology interface ## Breaking API Changes :warning: * Removal of the `opendc-compute-workload`, `opendc-faas-workload`, and `opendc-workflow-workload` modules. These are now located inside `opendc-experiments` * Removal of `ComputeServiceHelper`. Use `Provisioner` to provision a `ComputeService`.
Diffstat (limited to 'opendc-experiments/opendc-experiments-workflow')
-rw-r--r--opendc-experiments/opendc-experiments-workflow/build.gradle.kts41
-rw-r--r--opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt130
-rw-r--r--opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSchedulerSpec.kt40
-rw-r--r--opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt66
-rw-r--r--opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSteps.kt40
5 files changed, 317 insertions, 0 deletions
diff --git a/opendc-experiments/opendc-experiments-workflow/build.gradle.kts b/opendc-experiments/opendc-experiments-workflow/build.gradle.kts
new file mode 100644
index 00000000..4fc34d2d
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-workflow/build.gradle.kts
@@ -0,0 +1,41 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+description = "Support library for simulating workflows with OpenDC"
+
+/* Build configuration */
+plugins {
+ `kotlin-library-conventions`
+ `testing-conventions`
+ `jacoco-conventions`
+}
+
+dependencies {
+ api(projects.opendcExperiments.opendcExperimentsBase)
+ api(projects.opendcWorkflow.opendcWorkflowApi)
+
+ implementation(libs.kotlinx.coroutines)
+ implementation(projects.opendcCompute.opendcComputeService)
+ implementation(projects.opendcWorkflow.opendcWorkflowService)
+ implementation(projects.opendcSimulator.opendcSimulatorCompute)
+ implementation(projects.opendcTrace.opendcTraceApi)
+}
diff --git a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt
new file mode 100644
index 00000000..a15d3d5b
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt
@@ -0,0 +1,130 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+@file:JvmName("TraceHelpers")
+package org.opendc.experiments.workflow
+
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import org.opendc.simulator.compute.workload.SimFlopsWorkload
+import org.opendc.trace.*
+import org.opendc.trace.conv.*
+import org.opendc.workflow.api.Job
+import org.opendc.workflow.api.Task
+import org.opendc.workflow.api.WORKFLOW_TASK_CORES
+import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE
+import org.opendc.workflow.service.WorkflowService
+import java.time.Clock
+import java.util.*
+import kotlin.collections.HashMap
+import kotlin.collections.HashSet
+import kotlin.math.min
+
+/**
+ * Convert [Trace] into a list of [Job]s that can be submitted to the workflow service.
+ */
+public fun Trace.toJobs(): List<Job> {
+ val table = checkNotNull(getTable(TABLE_TASKS))
+ val reader = table.newReader()
+
+ val jobs = mutableMapOf<Long, Job>()
+ val tasks = mutableMapOf<Long, Task>()
+ val taskDependencies = mutableMapOf<Task, Set<Long>>()
+
+ try {
+ while (reader.nextRow()) {
+ // Bag of tasks without workflow ID all share the same workflow
+ val workflowId = if (reader.resolve(TASK_WORKFLOW_ID) != -1) reader.getString(TASK_WORKFLOW_ID)!!.toLong() else 0L
+ val workflow = jobs.computeIfAbsent(workflowId) { id -> Job(UUID(0L, id), "<unnamed>", HashSet(), HashMap()) }
+
+ val id = reader.getString(TASK_ID)!!.toLong()
+ val grantedCpus = if (reader.resolve(TASK_ALLOC_NCPUS) != 0)
+ reader.getInt(TASK_ALLOC_NCPUS)
+ else
+ reader.getInt(TASK_REQ_NCPUS)
+ val submitTime = reader.getInstant(TASK_SUBMIT_TIME)!!
+ val runtime = reader.getDuration(TASK_RUNTIME)!!
+ val flops: Long = 4000 * runtime.seconds * grantedCpus
+ val workload = SimFlopsWorkload(flops)
+ val task = Task(
+ UUID(0L, id),
+ "<unnamed>",
+ HashSet(),
+ mapOf(
+ "workload" to workload,
+ WORKFLOW_TASK_CORES to grantedCpus,
+ WORKFLOW_TASK_DEADLINE to runtime.toMillis()
+ ),
+ )
+
+ tasks[id] = task
+ taskDependencies[task] = reader.getSet(TASK_PARENTS, String::class.java)!!.map { it.toLong() }.toSet()
+
+ (workflow.metadata as MutableMap<String, Any>).merge("WORKFLOW_SUBMIT_TIME", submitTime.toEpochMilli()) { a, b -> min(a as Long, b as Long) }
+ (workflow.tasks as MutableSet<Task>).add(task)
+ }
+
+ // Resolve dependencies for all tasks
+ for ((task, deps) in taskDependencies) {
+ for (dep in deps) {
+ val parent = requireNotNull(tasks[dep]) { "Dependency task with id $dep not found" }
+ (task.dependencies as MutableSet<Task>).add(parent)
+ }
+ }
+ } finally {
+ reader.close()
+ }
+
+ return jobs.values.toList()
+}
+
+/**
+ * Helper method to replay the specified list of [jobs] and suspend execution util all jobs have finished.
+ */
+public suspend fun WorkflowService.replay(clock: Clock, jobs: List<Job>) {
+ // Sort jobs by their arrival time
+ val orderedJobs = jobs.sortedBy { it.metadata.getOrDefault("WORKFLOW_SUBMIT_TIME", Long.MAX_VALUE) as Long }
+ if (orderedJobs.isEmpty()) {
+ return
+ }
+
+ // Wait until the trace is started
+ val startTime = orderedJobs[0].metadata.getOrDefault("WORKFLOW_SUBMIT_TIME", Long.MAX_VALUE) as Long
+ var offset = 0L
+
+ if (startTime != Long.MAX_VALUE) {
+ offset = startTime - clock.millis()
+ delay(offset.coerceAtLeast(0))
+ }
+
+ coroutineScope {
+ for (job in orderedJobs) {
+ val submitTime = job.metadata.getOrDefault("WORKFLOW_SUBMIT_TIME", Long.MAX_VALUE) as Long
+ if (submitTime != Long.MAX_VALUE) {
+ delay(((submitTime - offset) - clock.millis()).coerceAtLeast(0))
+ }
+
+ launch { invoke(job) }
+ }
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSchedulerSpec.kt b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSchedulerSpec.kt
new file mode 100644
index 00000000..cb8056a7
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSchedulerSpec.kt
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.workflow
+
+import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy
+import org.opendc.workflow.service.scheduler.job.JobOrderPolicy
+import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy
+import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy
+import java.time.Duration
+
+/**
+ * Specification of the scheduling policies of the workflow scheduler.
+ */
+public data class WorkflowSchedulerSpec(
+ val schedulingQuantum: Duration,
+ val jobAdmissionPolicy: JobAdmissionPolicy,
+ val jobOrderPolicy: JobOrderPolicy,
+ val taskEligibilityPolicy: TaskEligibilityPolicy,
+ val taskOrderPolicy: TaskOrderPolicy,
+)
diff --git a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt
new file mode 100644
index 00000000..a2d6a172
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt
@@ -0,0 +1,66 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.workflow
+
+import org.opendc.compute.service.ComputeService
+import org.opendc.experiments.provisioner.ProvisioningContext
+import org.opendc.experiments.provisioner.ProvisioningStep
+import org.opendc.workflow.service.WorkflowService
+import java.time.Duration
+
+/**
+ * A [ProvisioningStep] that provisions a [WorkflowService].
+ *
+ * @param serviceDomain The domain name under which to register the workflow service.
+ * @param computeService The domain name where the underlying compute service is located.
+ * @param scheduler The configuration of the scheduler of the workflow engine.
+ * @param schedulingQuantum The scheduling quantum of the compute scheduler.
+ */
+public class WorkflowServiceProvisioningStep internal constructor(
+ private val serviceDomain: String,
+ private val computeService: String,
+ private val scheduler: WorkflowSchedulerSpec,
+ private val schedulingQuantum: Duration
+) : ProvisioningStep {
+ override fun apply(ctx: ProvisioningContext): AutoCloseable {
+ val computeService = requireNotNull(ctx.registry.resolve(computeService, ComputeService::class.java)) { "Compute service $computeService does not exist" }
+
+ val client = computeService.newClient()
+ val service = WorkflowService(
+ ctx.coroutineContext,
+ ctx.clock,
+ client,
+ scheduler.schedulingQuantum,
+ jobAdmissionPolicy = scheduler.jobAdmissionPolicy,
+ jobOrderPolicy = scheduler.jobOrderPolicy,
+ taskEligibilityPolicy = scheduler.taskEligibilityPolicy,
+ taskOrderPolicy = scheduler.taskOrderPolicy,
+ )
+ ctx.registry.register(serviceDomain, WorkflowService::class.java, service)
+
+ return AutoCloseable {
+ service.close()
+ client.close()
+ }
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSteps.kt b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSteps.kt
new file mode 100644
index 00000000..7aae3a9f
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSteps.kt
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+@file:JvmName("WorkflowSteps")
+package org.opendc.experiments.workflow
+
+import org.opendc.experiments.provisioner.ProvisioningStep
+import org.opendc.workflow.service.WorkflowService
+import java.time.Duration
+
+/**
+ * Return a [ProvisioningStep] that sets up a [WorkflowService].
+ */
+public fun setupWorkflowService(
+ serviceDomain: String,
+ computeService: String,
+ scheduler: WorkflowSchedulerSpec,
+ schedulingQuantum: Duration = Duration.ofMinutes(5)
+): ProvisioningStep {
+ return WorkflowServiceProvisioningStep(serviceDomain, computeService, scheduler, schedulingQuantum)
+}