From 1cfd967d6d27f339b264449ff2a1adeb705de598 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 16 Nov 2021 15:49:46 +0100 Subject: feat(workflow): Add helper tools for workflow simulations This change adds a new module, opendc-workflow-workload that contains helper code for constructing workflow simulations using OpenDC. --- .../org/opendc/workflow/workload/TraceHelpers.kt | 93 ++++++++++++++++ .../workflow/workload/WorkflowSchedulerSpec.kt | 40 +++++++ .../workflow/workload/WorkflowServiceHelper.kt | 123 +++++++++++++++++++++ 3 files changed, 256 insertions(+) create mode 100644 opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/TraceHelpers.kt create mode 100644 opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowSchedulerSpec.kt create mode 100644 opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt (limited to 'opendc-workflow/opendc-workflow-workload/src') diff --git a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/TraceHelpers.kt b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/TraceHelpers.kt new file mode 100644 index 00000000..73995d08 --- /dev/null +++ b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/TraceHelpers.kt @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("TraceHelpers") +package org.opendc.workflow.workload + +import org.opendc.simulator.compute.workload.SimFlopsWorkload +import org.opendc.trace.* +import org.opendc.workflow.api.Job +import org.opendc.workflow.api.Task +import org.opendc.workflow.api.WORKFLOW_TASK_CORES +import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE +import java.util.* +import kotlin.collections.HashMap +import kotlin.collections.HashSet +import kotlin.math.min + +/** + * Convert [Trace] into a list of [Job]s that can be submitted to the workflow service. + */ +public fun Trace.toJobs(): List { + val table = checkNotNull(getTable(TABLE_TASKS)) + val reader = table.newReader() + + val jobs = mutableMapOf() + val tasks = mutableMapOf() + val taskDependencies = mutableMapOf>() + + try { + while (reader.nextRow()) { + // Bag of tasks without workflow ID all share the same workflow + val workflowId = if (reader.hasColumn(TASK_WORKFLOW_ID)) reader.get(TASK_WORKFLOW_ID).toLong() else 0L + val workflow = jobs.computeIfAbsent(workflowId) { id -> Job(UUID(0L, id), "", HashSet(), HashMap()) } + + val id = reader.get(TASK_ID).toLong() + val grantedCpus = if (reader.hasColumn(TASK_ALLOC_NCPUS)) + reader.getInt(TASK_ALLOC_NCPUS) + else + reader.getInt(TASK_REQ_NCPUS) + val submitTime = reader.get(TASK_SUBMIT_TIME) + val runtime = reader.get(TASK_RUNTIME) + val flops: Long = 4000 * runtime.seconds * grantedCpus + val workload = SimFlopsWorkload(flops) + val task = Task( + UUID(0L, id), + "", + HashSet(), + mapOf( + "workload" to workload, + WORKFLOW_TASK_CORES to grantedCpus, + WORKFLOW_TASK_DEADLINE to runtime.toMillis() + ), + ) + + tasks[id] = task + taskDependencies[task] = reader.get(TASK_PARENTS).map { it.toLong() }.toSet() + + (workflow.metadata as MutableMap).merge("WORKFLOW_SUBMIT_TIME", submitTime.toEpochMilli()) { a, b -> min(a as Long, b as Long) } + (workflow.tasks as MutableSet).add(task) + } + + // Resolve dependencies for all tasks + for ((task, deps) in taskDependencies) { + for (dep in deps) { + val parent = requireNotNull(tasks[dep]) { "Dependency task with id $dep not found" } + (task.dependencies as MutableSet).add(parent) + } + } + } finally { + reader.close() + } + + return jobs.values.toList() +} diff --git a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowSchedulerSpec.kt b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowSchedulerSpec.kt new file mode 100644 index 00000000..b22e16d9 --- /dev/null +++ b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowSchedulerSpec.kt @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.workload + +import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode +import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy +import org.opendc.workflow.service.scheduler.job.JobOrderPolicy +import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy +import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy + +/** + * Specification of the scheduling policies of the workflow scheduler. + */ +public data class WorkflowSchedulerSpec( + val batchMode: WorkflowSchedulerMode, + val jobAdmissionPolicy: JobAdmissionPolicy, + val jobOrderPolicy: JobOrderPolicy, + val taskEligibilityPolicy: TaskEligibilityPolicy, + val taskOrderPolicy: TaskOrderPolicy, +) diff --git a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt new file mode 100644 index 00000000..236a036b --- /dev/null +++ b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt @@ -0,0 +1,123 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.workload + +import io.opentelemetry.sdk.metrics.SdkMeterProvider +import io.opentelemetry.sdk.metrics.export.MetricProducer +import io.opentelemetry.sdk.resources.Resource +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import org.opendc.compute.api.ComputeClient +import org.opendc.telemetry.sdk.toOtelClock +import org.opendc.workflow.api.Job +import org.opendc.workflow.service.WorkflowService +import java.time.Clock +import kotlin.coroutines.CoroutineContext + +/** + * Helper class to simulate workflow-based workloads in OpenDC. + * + * @param context [CoroutineContext] to run the simulation in. + * @param clock [Clock] instance tracking simulation time. + * @param computeClient A [ComputeClient] instance to communicate with the cluster scheduler. + * @param schedulerSpec The configuration of the workflow scheduler. + */ +public class WorkflowServiceHelper( + private val context: CoroutineContext, + private val clock: Clock, + private val computeClient: ComputeClient, + private val schedulerSpec: WorkflowSchedulerSpec + ) : AutoCloseable { + /** + * The [WorkflowService] that is constructed by this runner. + */ + public val service: WorkflowService + + /** + * The [MetricProducer] exposed by the [WorkflowService]. + */ + public val metricProducer: MetricProducer + + init { + val resource = Resource.builder() + .put(ResourceAttributes.SERVICE_NAME, "opendc-workflow") + .build() + + val meterProvider = SdkMeterProvider.builder() + .setClock(clock.toOtelClock()) + .setResource(resource) + .build() + metricProducer = meterProvider + + service = WorkflowService( + context, + clock, + meterProvider, + computeClient, + mode = schedulerSpec.batchMode, + jobAdmissionPolicy = schedulerSpec.jobAdmissionPolicy, + jobOrderPolicy = schedulerSpec.jobOrderPolicy, + taskEligibilityPolicy = schedulerSpec.taskEligibilityPolicy, + taskOrderPolicy = schedulerSpec.taskOrderPolicy, + ) + } + + /** + * Run the specified list of [jobs] using the workflow service and suspend execution until all jobs have + * finished. + */ + public suspend fun replay(jobs: List) { + // Sort jobs by their arrival time + val orderedJobs = jobs.sortedBy { it.metadata.getOrDefault("WORKFLOW_SUBMIT_TIME", Long.MAX_VALUE) as Long } + if (orderedJobs.isEmpty()) { + return + } + + // Wait until the trace is started + val startTime = orderedJobs[0].metadata.getOrDefault("WORKFLOW_SUBMIT_TIME", Long.MAX_VALUE) as Long + var offset = 0L + + if (startTime != Long.MAX_VALUE) { + offset = startTime - clock.millis() + delay(offset.coerceAtLeast(0)) + } + + coroutineScope { + for (job in orderedJobs) { + val submitTime = job.metadata.getOrDefault("WORKFLOW_SUBMIT_TIME", Long.MAX_VALUE) as Long + if (submitTime != Long.MAX_VALUE) { + delay(((submitTime - offset) - clock.millis()).coerceAtLeast(0)) + } + + launch { service.run(job) } + } + } + } + + override fun close() { + computeClient.close() + service.close() + } +} -- cgit v1.2.3