diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-09-28 15:51:05 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-10-03 17:35:58 +0200 |
| commit | c453e27abe54221f76648bc91edadb2efcd1ec07 (patch) | |
| tree | 2eb75de390dc735519c6d29bf2a6d50694436d26 /opendc-workflow | |
| parent | 115e37984624a409bc1ad4f54bf10c9537183390 (diff) | |
feat(exp/workflow): Add provisioning step for workflow service
This change adds a new module `opendc-experiments-workflow` that provides
provisioner implementations for experiments to use for setting up and
using the workflow engine in OpenDC.
Diffstat (limited to 'opendc-workflow')
6 files changed, 54 insertions, 313 deletions
diff --git a/opendc-workflow/opendc-workflow-service/build.gradle.kts b/opendc-workflow/opendc-workflow-service/build.gradle.kts index b6365885..6908a5af 100644 --- a/opendc-workflow/opendc-workflow-service/build.gradle.kts +++ b/opendc-workflow/opendc-workflow-service/build.gradle.kts @@ -33,9 +33,9 @@ dependencies { implementation(projects.opendcCommon) implementation(libs.kotlin.logging) - testImplementation(projects.opendcWorkflow.opendcWorkflowWorkload) - testImplementation(projects.opendcCompute.opendcComputeWorkload) testImplementation(projects.opendcSimulator.opendcSimulatorCore) + testImplementation(projects.opendcExperiments.opendcExperimentsCompute) + testImplementation(projects.opendcExperiments.opendcExperimentsWorkflow) testImplementation(projects.opendcTrace.opendcTraceApi) testRuntimeOnly(projects.opendcTrace.opendcTraceGwf) testRuntimeOnly(libs.log4j.slf4j) diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt index 0fb8b67c..49496fed 100644 --- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt @@ -22,17 +22,22 @@ package org.opendc.workflow.service +import org.junit.jupiter.api.Assertions.assertAll import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertAll +import org.opendc.compute.service.scheduler.ComputeScheduler import org.opendc.compute.service.scheduler.FilterScheduler import org.opendc.compute.service.scheduler.filters.ComputeFilter import org.opendc.compute.service.scheduler.filters.RamFilter import org.opendc.compute.service.scheduler.filters.VCpuFilter import org.opendc.compute.service.scheduler.weights.VCpuWeigher -import org.opendc.compute.workload.ComputeServiceHelper import org.opendc.compute.workload.topology.HostSpec +import org.opendc.experiments.compute.setupComputeService +import org.opendc.experiments.compute.setupHosts +import org.opendc.experiments.provisioner.Provisioner +import org.opendc.experiments.provisioner.ProvisioningContext +import org.opendc.experiments.workflow.* import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode @@ -46,9 +51,6 @@ import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy -import org.opendc.workflow.workload.WorkflowSchedulerSpec -import org.opendc.workflow.workload.WorkflowServiceHelper -import org.opendc.workflow.workload.toJobs import java.nio.file.Paths import java.time.Duration import java.util.* @@ -63,55 +65,61 @@ internal class WorkflowServiceTest { */ @Test fun testTrace() = runBlockingSimulation { - // Configure the ComputeService that is responsible for mapping virtual machines onto physical hosts - val computeScheduler = FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)), - weighers = listOf(VCpuWeigher(1.0, multiplier = 1.0)) - ) + val computeService = "compute.opendc.org" + val workflowService = "workflow.opendc.org" - val computeHelper = ComputeServiceHelper( - coroutineContext, - clock, - computeScheduler, - seed = 0, - schedulingQuantum = Duration.ofSeconds(1) - ) + Provisioner(coroutineContext, clock, seed = 0L).use { provisioner -> + val scheduler: (ProvisioningContext) -> ComputeScheduler = { + FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)), + weighers = listOf(VCpuWeigher(1.0, multiplier = 1.0)) + ) + } - val hostCount = 4 - repeat(hostCount) { computeHelper.registerHost(createHostSpec(it)) } + provisioner.runSteps( + // Configure the ComputeService that is responsible for mapping virtual machines onto physical hosts + setupComputeService(computeService, scheduler, schedulingQuantum = Duration.ofSeconds(1)), + setupHosts(computeService, List(4) { createHostSpec(it) }), - // Configure the WorkflowService that is responsible for scheduling the workflow tasks onto machines - val workflowScheduler = WorkflowSchedulerSpec( - schedulingQuantum = Duration.ofMillis(100), - jobAdmissionPolicy = NullJobAdmissionPolicy, - jobOrderPolicy = SubmissionTimeJobOrderPolicy(), - taskEligibilityPolicy = NullTaskEligibilityPolicy, - taskOrderPolicy = SubmissionTimeTaskOrderPolicy(), - ) - val workflowHelper = WorkflowServiceHelper(coroutineContext, clock, computeHelper.service.newClient(), workflowScheduler) + // Configure the WorkflowService that is responsible for scheduling the workflow tasks onto machines + setupWorkflowService( + workflowService, + computeService, + WorkflowSchedulerSpec( + schedulingQuantum = Duration.ofMillis(100), + jobAdmissionPolicy = NullJobAdmissionPolicy, + jobOrderPolicy = SubmissionTimeJobOrderPolicy(), + taskEligibilityPolicy = NullTaskEligibilityPolicy, + taskOrderPolicy = SubmissionTimeTaskOrderPolicy(), + ) + ) + ) + + val service = provisioner.registry.resolve(workflowService, WorkflowService::class.java)!! - try { val trace = Trace.open( Paths.get(checkNotNull(WorkflowServiceTest::class.java.getResource("/trace.gwf")).toURI()), format = "gwf" ) + service.replay(clock, trace.toJobs()) - workflowHelper.replay(trace.toJobs()) - } finally { - workflowHelper.close() - computeHelper.close() - } - - val metrics = workflowHelper.service.getSchedulerStats() + val metrics = service.getSchedulerStats() - assertAll( - { assertEquals(758, metrics.workflowsSubmitted, "No jobs submitted") }, - { assertEquals(0, metrics.workflowsRunning, "Not all submitted jobs started") }, - { assertEquals(metrics.workflowsSubmitted, metrics.workflowsFinished, "Not all started jobs finished") }, - { assertEquals(0, metrics.tasksRunning, "Not all started tasks finished") }, - { assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") }, - { assertEquals(32649883L, clock.millis()) { "Total duration incorrect" } } - ) + assertAll( + { assertEquals(758, metrics.workflowsSubmitted, "No jobs submitted") }, + { assertEquals(0, metrics.workflowsRunning, "Not all submitted jobs started") }, + { + assertEquals( + metrics.workflowsSubmitted, + metrics.workflowsFinished, + "Not all started jobs finished" + ) + }, + { assertEquals(0, metrics.tasksRunning, "Not all started tasks finished") }, + { assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") }, + { assertEquals(32649883L, clock.millis()) { "Total duration incorrect" } } + ) + } } /** diff --git a/opendc-workflow/opendc-workflow-workload/build.gradle.kts b/opendc-workflow/opendc-workflow-workload/build.gradle.kts deleted file mode 100644 index 17eadf29..00000000 --- a/opendc-workflow/opendc-workflow-workload/build.gradle.kts +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -description = "Support library for simulating workflows with OpenDC" - -/* Build configuration */ -plugins { - `kotlin-library-conventions` -} - -dependencies { - api(projects.opendcWorkflow.opendcWorkflowService) - - implementation(projects.opendcSimulator.opendcSimulatorCompute) - implementation(projects.opendcTrace.opendcTraceApi) -} diff --git a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/TraceHelpers.kt b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/TraceHelpers.kt deleted file mode 100644 index 5f57723b..00000000 --- a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/TraceHelpers.kt +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -@file:JvmName("TraceHelpers") -package org.opendc.workflow.workload - -import org.opendc.simulator.compute.workload.SimFlopsWorkload -import org.opendc.trace.* -import org.opendc.trace.conv.* -import org.opendc.workflow.api.Job -import org.opendc.workflow.api.Task -import org.opendc.workflow.api.WORKFLOW_TASK_CORES -import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE -import java.util.* -import kotlin.collections.HashMap -import kotlin.collections.HashSet -import kotlin.math.min - -/** - * Convert [Trace] into a list of [Job]s that can be submitted to the workflow service. - */ -public fun Trace.toJobs(): List<Job> { - val table = checkNotNull(getTable(TABLE_TASKS)) - val reader = table.newReader() - - val jobs = mutableMapOf<Long, Job>() - val tasks = mutableMapOf<Long, Task>() - val taskDependencies = mutableMapOf<Task, Set<Long>>() - - try { - while (reader.nextRow()) { - // Bag of tasks without workflow ID all share the same workflow - val workflowId = if (reader.resolve(TASK_WORKFLOW_ID) != -1) reader.getString(TASK_WORKFLOW_ID)!!.toLong() else 0L - val workflow = jobs.computeIfAbsent(workflowId) { id -> Job(UUID(0L, id), "<unnamed>", HashSet(), HashMap()) } - - val id = reader.getString(TASK_ID)!!.toLong() - val grantedCpus = if (reader.resolve(TASK_ALLOC_NCPUS) != 0) - reader.getInt(TASK_ALLOC_NCPUS) - else - reader.getInt(TASK_REQ_NCPUS) - val submitTime = reader.getInstant(TASK_SUBMIT_TIME)!! - val runtime = reader.getDuration(TASK_RUNTIME)!! - val flops: Long = 4000 * runtime.seconds * grantedCpus - val workload = SimFlopsWorkload(flops) - val task = Task( - UUID(0L, id), - "<unnamed>", - HashSet(), - mapOf( - "workload" to workload, - WORKFLOW_TASK_CORES to grantedCpus, - WORKFLOW_TASK_DEADLINE to runtime.toMillis() - ), - ) - - tasks[id] = task - taskDependencies[task] = reader.getSet(TASK_PARENTS, String::class.java)!!.map { it.toLong() }.toSet() - - (workflow.metadata as MutableMap<String, Any>).merge("WORKFLOW_SUBMIT_TIME", submitTime.toEpochMilli()) { a, b -> min(a as Long, b as Long) } - (workflow.tasks as MutableSet<Task>).add(task) - } - - // Resolve dependencies for all tasks - for ((task, deps) in taskDependencies) { - for (dep in deps) { - val parent = requireNotNull(tasks[dep]) { "Dependency task with id $dep not found" } - (task.dependencies as MutableSet<Task>).add(parent) - } - } - } finally { - reader.close() - } - - return jobs.values.toList() -} diff --git a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowSchedulerSpec.kt b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowSchedulerSpec.kt deleted file mode 100644 index d6a375b6..00000000 --- a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowSchedulerSpec.kt +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.workflow.workload - -import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy -import org.opendc.workflow.service.scheduler.job.JobOrderPolicy -import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy -import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy -import java.time.Duration - -/** - * Specification of the scheduling policies of the workflow scheduler. - */ -public data class WorkflowSchedulerSpec( - val schedulingQuantum: Duration, - val jobAdmissionPolicy: JobAdmissionPolicy, - val jobOrderPolicy: JobOrderPolicy, - val taskEligibilityPolicy: TaskEligibilityPolicy, - val taskOrderPolicy: TaskOrderPolicy, -) diff --git a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt deleted file mode 100644 index 435d0190..00000000 --- a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.workflow.workload - -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.delay -import kotlinx.coroutines.launch -import org.opendc.compute.api.ComputeClient -import org.opendc.workflow.api.Job -import org.opendc.workflow.service.WorkflowService -import java.time.Clock -import kotlin.coroutines.CoroutineContext - -/** - * Helper class to simulate workflow-based workloads in OpenDC. - * - * @param context [CoroutineContext] to run the simulation in. - * @param clock [Clock] instance tracking simulation time. - * @param computeClient A [ComputeClient] instance to communicate with the cluster scheduler. - * @param schedulerSpec The configuration of the workflow scheduler. - */ -public class WorkflowServiceHelper( - private val context: CoroutineContext, - private val clock: Clock, - private val computeClient: ComputeClient, - private val schedulerSpec: WorkflowSchedulerSpec -) : AutoCloseable { - /** - * The [WorkflowService] that is constructed by this runner. - */ - public val service: WorkflowService = WorkflowService( - context, - clock, - computeClient, - schedulerSpec.schedulingQuantum, - jobAdmissionPolicy = schedulerSpec.jobAdmissionPolicy, - jobOrderPolicy = schedulerSpec.jobOrderPolicy, - taskEligibilityPolicy = schedulerSpec.taskEligibilityPolicy, - taskOrderPolicy = schedulerSpec.taskOrderPolicy, - ) - - /** - * Run the specified list of [jobs] using the workflow service and suspend execution until all jobs have - * finished. - */ - public suspend fun replay(jobs: List<Job>) { - // Sort jobs by their arrival time - val orderedJobs = jobs.sortedBy { it.metadata.getOrDefault("WORKFLOW_SUBMIT_TIME", Long.MAX_VALUE) as Long } - if (orderedJobs.isEmpty()) { - return - } - - // Wait until the trace is started - val startTime = orderedJobs[0].metadata.getOrDefault("WORKFLOW_SUBMIT_TIME", Long.MAX_VALUE) as Long - var offset = 0L - - if (startTime != Long.MAX_VALUE) { - offset = startTime - clock.millis() - delay(offset.coerceAtLeast(0)) - } - - coroutineScope { - for (job in orderedJobs) { - val submitTime = job.metadata.getOrDefault("WORKFLOW_SUBMIT_TIME", Long.MAX_VALUE) as Long - if (submitTime != Long.MAX_VALUE) { - delay(((submitTime - offset) - clock.millis()).coerceAtLeast(0)) - } - - launch { service.invoke(job) } - } - } - } - - override fun close() { - computeClient.close() - service.close() - } -} |
