diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-11-17 13:40:51 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-11-17 13:40:51 +0100 |
| commit | a921c9f847aabe72bdef42574d96f905d9fb2468 (patch) | |
| tree | f0982efeacf4910a73eb3133a54b50352984018f /opendc-workflow | |
| parent | 381c7589cbf01ca6ed321c58c8a3a9cbea6ebd84 (diff) | |
| parent | 1dcdca7e09e34cdde53f6d14db56780688e19eae (diff) | |
merge: Improve workflow service (#43)
This pull request is the first in a series of changes to the workflow service.
This pull request aims to help users setup workflow simulations.
The next pull requests will focus on improving the design of the workflow service.
* Remove WorkflowSchedulerMode
* Add helper tools for workflow simulations (e.g. `WorkflowServiceHelper`)
**Breaking API Changes**
* Removal of `WorkflowSchedulerMode`
* Rename from `ComputeWorkloadRunner` to `ComputeServiceHelper`
Diffstat (limited to 'opendc-workflow')
12 files changed, 445 insertions, 371 deletions
diff --git a/opendc-workflow/opendc-workflow-service/build.gradle.kts b/opendc-workflow/opendc-workflow-service/build.gradle.kts index 43b64b15..4d8b7d7f 100644 --- a/opendc-workflow/opendc-workflow-service/build.gradle.kts +++ b/opendc-workflow/opendc-workflow-service/build.gradle.kts @@ -37,8 +37,9 @@ dependencies { implementation(projects.opendcUtils) implementation(libs.kotlin.logging) + testImplementation(projects.opendcWorkflow.opendcWorkflowWorkload) + testImplementation(projects.opendcCompute.opendcComputeWorkload) testImplementation(projects.opendcSimulator.opendcSimulatorCore) - testImplementation(projects.opendcCompute.opendcComputeSimulator) testImplementation(projects.opendcTrace.opendcTraceApi) testImplementation(projects.opendcTelemetry.opendcTelemetrySdk) testRuntimeOnly(projects.opendcTrace.opendcTraceGwf) diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt index a0248a93..ebace07d 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt @@ -26,29 +26,24 @@ import io.opentelemetry.api.metrics.MeterProvider import org.opendc.compute.api.ComputeClient import org.opendc.workflow.api.Job import org.opendc.workflow.service.internal.WorkflowServiceImpl -import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy import org.opendc.workflow.service.scheduler.job.JobOrderPolicy import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy import java.time.Clock +import java.time.Duration import kotlin.coroutines.CoroutineContext /** - * A service for cloud workflow management. + * A service for cloud workflow execution. * * The workflow scheduler is modelled after the Reference Architecture for Topology Scheduling by Andreadis et al. */ public interface WorkflowService : AutoCloseable { /** - * Submit the specified [Job] to the workflow service for scheduling. + * Submit the specified [Job] and suspend execution until the job is finished. */ - public suspend fun submit(job: Job) - - /** - * Run the specified [Job] and suspend execution until the job is finished. - */ - public suspend fun run(job: Job) + public suspend fun invoke(job: Job) /** * Terminate the lifecycle of the workflow service, stopping all running workflows. @@ -61,10 +56,9 @@ public interface WorkflowService : AutoCloseable { * * @param context The [CoroutineContext] to use in the service. * @param clock The clock instance to use. - * @param tracer The event tracer to use. * @param meterProvider The meter provider to use. * @param compute The compute client to use. - * @param mode The scheduling mode to use. + * @param schedulingQuantum The scheduling quantum to use (minimum duration between scheduling cycles). * @param jobAdmissionPolicy The job admission policy to use. * @param jobOrderPolicy The job order policy to use. * @param taskEligibilityPolicy The task eligibility policy to use. @@ -75,7 +69,7 @@ public interface WorkflowService : AutoCloseable { clock: Clock, meterProvider: MeterProvider, compute: ComputeClient, - mode: WorkflowSchedulerMode, + schedulingQuantum: Duration, jobAdmissionPolicy: JobAdmissionPolicy, jobOrderPolicy: JobOrderPolicy, taskEligibilityPolicy: TaskEligibilityPolicy, @@ -86,7 +80,7 @@ public interface WorkflowService : AutoCloseable { clock, meterProvider, compute, - mode, + schedulingQuantum, jobAdmissionPolicy, jobOrderPolicy, taskEligibilityPolicy, diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/JobState.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/JobState.kt index 1bb67169..d2fc1ed7 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/JobState.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/JobState.kt @@ -23,8 +23,9 @@ package org.opendc.workflow.service.internal import org.opendc.workflow.api.Job +import kotlin.coroutines.Continuation -public class JobState(public val job: Job, public val submittedAt: Long) { +public class JobState(public val job: Job, public val submittedAt: Long, internal val cont: Continuation<Unit>) { /** * A flag to indicate whether this job is finished. */ diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowSchedulerListener.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowSchedulerListener.kt index 29c6aeea..8e76a36e 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowSchedulerListener.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowSchedulerListener.kt @@ -22,16 +22,44 @@ package org.opendc.workflow.service.internal -public interface WorkflowSchedulerListener { - public fun cycleStarted(scheduler: WorkflowServiceImpl) {} - public fun cycleFinished(scheduler: WorkflowServiceImpl) {} +import org.opendc.workflow.service.WorkflowService +/** + * Interface for listening to events emitted by the [WorkflowService]. + */ +public interface WorkflowSchedulerListener { + /** + * This method is invoked when [job] is submitted to the service. + */ public fun jobSubmitted(job: JobState) {} + + /** + * This method is invoked when [job] is started by the service. + */ public fun jobStarted(job: JobState) {} + + /** + * This method is invoked when [job] finishes. + */ public fun jobFinished(job: JobState) {} + /** + * This method is invoked when [task] becomes ready to be scheduled. + */ public fun taskReady(task: TaskState) {} + + /** + * This method is invoked when [task] is assigned to a machine. + */ public fun taskAssigned(task: TaskState) {} + + /** + * This method is invoked when [task] is started. + */ public fun taskStarted(task: TaskState) {} + + /** + * This method is invoked when [task] finishes. + */ public fun taskFinished(task: TaskState) {} } diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt index a0fd3fad..1cadce44 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt @@ -25,20 +25,18 @@ package org.opendc.workflow.service.internal import io.opentelemetry.api.metrics.Meter import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.* -import kotlinx.coroutines.flow.map -import mu.KotlinLogging import org.opendc.compute.api.* +import org.opendc.utils.TimerScheduler import org.opendc.workflow.api.Job import org.opendc.workflow.api.WORKFLOW_TASK_CORES import org.opendc.workflow.service.* -import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy import org.opendc.workflow.service.scheduler.job.JobOrderPolicy import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy import java.time.Clock +import java.time.Duration import java.util.* -import kotlin.coroutines.Continuation import kotlin.coroutines.CoroutineContext import kotlin.coroutines.resume @@ -48,10 +46,10 @@ import kotlin.coroutines.resume */ public class WorkflowServiceImpl( context: CoroutineContext, - internal val clock: Clock, + private val clock: Clock, meterProvider: MeterProvider, private val computeClient: ComputeClient, - mode: WorkflowSchedulerMode, + private val schedulingQuantum: Duration, jobAdmissionPolicy: JobAdmissionPolicy, jobOrderPolicy: JobOrderPolicy, taskEligibilityPolicy: TaskEligibilityPolicy, @@ -60,12 +58,7 @@ public class WorkflowServiceImpl( /** * The [CoroutineScope] of the service bounded by the lifecycle of the service. */ - internal val scope = CoroutineScope(context + Job()) - - /** - * The logger instance to use. - */ - private val logger = KotlinLogging.logger {} + private val scope = CoroutineScope(context + Job()) /** * The [Meter] to collect metrics of this service. @@ -75,22 +68,22 @@ public class WorkflowServiceImpl( /** * The incoming jobs ready to be processed by the scheduler. */ - internal val incomingJobs: MutableSet<JobState> = linkedSetOf() + private val incomingJobs: MutableSet<JobState> = linkedSetOf() /** * The incoming tasks ready to be processed by the scheduler. */ - internal val incomingTasks: MutableSet<TaskState> = linkedSetOf() + private val incomingTasks: MutableSet<TaskState> = linkedSetOf() /** * The job queue. */ - internal val jobQueue: Queue<JobState> + private val jobQueue: Queue<JobState> /** * The task queue. */ - internal val taskQueue: Queue<TaskState> + private val taskQueue: Queue<TaskState> /** * The active jobs in the system. @@ -105,12 +98,7 @@ public class WorkflowServiceImpl( /** * The running tasks by [Server]. */ - internal val taskByServer = mutableMapOf<Server, TaskState>() - - /** - * The continuation of the jobs. - */ - private val conts = mutableMapOf<Job, Continuation<Unit>>() + private val taskByServer = mutableMapOf<Server, TaskState>() /** * The root listener of this scheduler. @@ -119,15 +107,7 @@ public class WorkflowServiceImpl( /** * The listeners to delegate to. */ - val listeners = mutableSetOf<WorkflowSchedulerListener>() - - override fun cycleStarted(scheduler: WorkflowServiceImpl) { - listeners.forEach { it.cycleStarted(scheduler) } - } - - override fun cycleFinished(scheduler: WorkflowServiceImpl) { - listeners.forEach { it.cycleFinished(scheduler) } - } + val listeners = mutableListOf<WorkflowSchedulerListener>() override fun jobSubmitted(job: JobState) { listeners.forEach { it.jobSubmitted(job) } @@ -206,25 +186,27 @@ public class WorkflowServiceImpl( .setUnit("1") .build() - private val mode: WorkflowSchedulerMode.Logic + /** + * The [TimerScheduler] to use for scheduling the scheduler cycles. + */ + private val timerScheduler: TimerScheduler<Unit> = TimerScheduler(scope.coroutineContext, clock) + private val jobAdmissionPolicy: JobAdmissionPolicy.Logic private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic private lateinit var image: Image init { - this.mode = mode(this) this.jobAdmissionPolicy = jobAdmissionPolicy(this) this.jobQueue = PriorityQueue(100, jobOrderPolicy(this).thenBy { it.job.uid }) this.taskEligibilityPolicy = taskEligibilityPolicy(this) this.taskQueue = PriorityQueue(1000, taskOrderPolicy(this).thenBy { it.task.uid }) - scope.launch { - image = computeClient.newImage("workflow-runner") - } + + scope.launch { image = computeClient.newImage("workflow-runner") } } - override suspend fun run(job: Job) { + override suspend fun invoke(job: Job): Unit = suspendCancellableCoroutine { cont -> // J1 Incoming Jobs - val jobInstance = JobState(job, clock.millis()) + val jobInstance = JobState(job, clock.millis(), cont) val instances = job.tasks.associateWith { TaskState(jobInstance, it) } @@ -243,36 +225,61 @@ public class WorkflowServiceImpl( submittedTasks.add(1) } - return suspendCancellableCoroutine { cont -> - instances.values.toCollection(jobInstance.tasks) - incomingJobs += jobInstance - rootListener.jobSubmitted(jobInstance) - conts[job] = cont + instances.values.toCollection(jobInstance.tasks) + incomingJobs += jobInstance + rootListener.jobSubmitted(jobInstance) + submittedJobs.add(1) - submittedJobs.add(1) + requestSchedulingCycle() + } - requestCycle() - } + override fun close() { + scope.cancel() } - override suspend fun submit(job: Job) { - scope.launch { run(job) } + /** + * Register a [WorkflowSchedulerListener]. + */ + public fun addListener(listener: WorkflowSchedulerListener) { + rootListener.listeners += listener } - override fun close() { - scope.cancel() + /** + * Unregister a [WorkflowSchedulerListener]. + */ + public fun removeListener(listener: WorkflowSchedulerListener) { + rootListener.listeners -= listener } /** - * Indicate to the scheduler that a scheduling cycle is needed. + * Indicate that a new scheduling cycle is needed due to a change to the service's state. */ - private fun requestCycle() = mode.requestCycle() + private fun requestSchedulingCycle() { + // Bail out in case we have already requested a new cycle or the queue is empty. + if (timerScheduler.isTimerActive(Unit) || incomingJobs.isEmpty()) { + return + } + + val quantum = schedulingQuantum.toMillis() + if (quantum == 0L) { + doSchedule() + return + } + + // We assume that the provisioner runs at a fixed slot every time quantum (e.g t=0, t=60, t=120). + // This is important because the slices of the VMs need to be aligned. + // We calculate here the delay until the next scheduling slot. + val delay = quantum - (clock.millis() % quantum) + + timerScheduler.startSingleTimer(Unit, delay) { + doSchedule() + } + } /** * Perform a scheduling cycle immediately. */ - @OptIn(ExperimentalCoroutinesApi::class) - internal suspend fun schedule() { + private fun doSchedule() { // J2 Create list of eligible jobs val iterator = incomingJobs.iterator() while (iterator.hasNext()) { @@ -293,8 +300,8 @@ public class WorkflowServiceImpl( } // J4 Per job - while (jobQueue.isNotEmpty()) { - val jobInstance = jobQueue.poll() + while (true) { + val jobInstance = jobQueue.poll() ?: break // Edge-case: job has no tasks if (jobInstance.isFinished) { @@ -361,7 +368,7 @@ public class WorkflowServiceImpl( } } - public override fun onStateChanged(server: Server, newState: ServerState) { + override fun onStateChanged(server: Server, newState: ServerState) { when (newState) { ServerState.PROVISIONING -> {} ServerState.RUNNING -> { @@ -402,7 +409,7 @@ public class WorkflowServiceImpl( finishJob(job) } - requestCycle() + requestSchedulingCycle() } ServerState.DELETED -> { } @@ -416,14 +423,6 @@ public class WorkflowServiceImpl( finishedJobs.add(1) rootListener.jobFinished(job) - conts.remove(job.job)?.resume(Unit) - } - - public fun addListener(listener: WorkflowSchedulerListener) { - rootListener.listeners += listener - } - - public fun removeListener(listener: WorkflowSchedulerListener) { - rootListener.listeners -= listener + job.cont.resume(Unit) } } diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/WorkflowSchedulerMode.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/WorkflowSchedulerMode.kt deleted file mode 100644 index 58e7893f..00000000 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/WorkflowSchedulerMode.kt +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.workflow.service.scheduler - -import kotlinx.coroutines.delay -import kotlinx.coroutines.launch -import org.opendc.workflow.service.internal.WorkflowServiceImpl - -/** - * The operating mode of a workflow scheduler. - */ -public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Logic> { - /** - * The logic for operating the cycles of a workflow scheduler. - */ - public interface Logic { - /** - * Request a new scheduling cycle to be performed. - */ - public fun requestCycle() - } - - /** - * An interactive scheduler immediately triggers a new scheduling cycle when a workflow is received. - */ - public object Interactive : WorkflowSchedulerMode() { - override fun invoke(scheduler: WorkflowServiceImpl): Logic = object : Logic { - override fun requestCycle() { - scheduler.scope.launch { scheduler.schedule() } - } - } - - override fun toString(): String = "Interactive" - } - - /** - * A batch scheduler triggers a scheduling cycle every time quantum if needed. - */ - public data class Batch(val quantum: Long) : WorkflowSchedulerMode() { - private var next: kotlinx.coroutines.Job? = null - - override fun invoke(scheduler: WorkflowServiceImpl): Logic = object : Logic { - override fun requestCycle() { - if (next == null) { - // In batch mode, we assume that the scheduler runs at a fixed slot every time - // quantum (e.g t=0, t=60, t=120). We calculate here the delay until the next scheduling slot. - val delay = quantum - (scheduler.clock.millis() % quantum) - - val job = scheduler.scope.launch { - delay(delay) - next = null - scheduler.schedule() - } - next = job - } - } - } - - override fun toString(): String = "Batch($quantum)" - } - - /** - * A scheduling cycle is triggered at a random point in time. - */ - public data class Random(private val random: java.util.Random = java.util.Random(123)) : WorkflowSchedulerMode() { - private var next: kotlinx.coroutines.Job? = null - - override fun invoke(scheduler: WorkflowServiceImpl): Logic = object : Logic { - override fun requestCycle() { - if (next == null) { - val delay = random.nextInt(200).toLong() - - val job = scheduler.scope.launch { - delay(delay) - next = null - scheduler.schedule() - } - next = job - } - } - } - - override fun toString(): String = "Random" - } -} diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt deleted file mode 100644 index 9ee3736e..00000000 --- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.workflow.service - -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.delay -import kotlinx.coroutines.launch -import org.opendc.simulator.compute.workload.SimFlopsWorkload -import org.opendc.trace.* -import org.opendc.workflow.api.Job -import org.opendc.workflow.api.Task -import org.opendc.workflow.api.WORKFLOW_TASK_CORES -import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE -import java.time.Clock -import java.util.* -import kotlin.collections.HashMap -import kotlin.collections.HashSet -import kotlin.math.max -import kotlin.math.min - -/** - * Helper tool to replay workflow trace. - */ -internal class TraceReplayer(private val trace: Trace) { - /** - * Replay the workload. - */ - public suspend fun replay(clock: Clock, service: WorkflowService) { - val jobs = parseTrace(trace) - - // Sort jobs by their arrival time - (jobs as MutableList<Job>).sortBy { it.metadata["WORKFLOW_SUBMIT_TIME"] as Long } - - // Wait until the trace is started - val startTime = jobs[0].metadata["WORKFLOW_SUBMIT_TIME"] as Long - delay(min(0L, startTime - clock.millis())) - - val offset = startTime - clock.millis() - - coroutineScope { - for (job in jobs) { - val submitTime = job.metadata["WORKFLOW_SUBMIT_TIME"] as Long - delay(max(0, (submitTime - offset) - clock.millis())) - - launch { service.run(job) } - } - } - } - - /** - * Convert [trace] into a list of [Job]s that can be submitted to the workflow service. - */ - public fun parseTrace(trace: Trace): List<Job> { - val table = checkNotNull(trace.getTable(TABLE_TASKS)) - val reader = table.newReader() - - val jobs = mutableMapOf<Long, Job>() - val tasks = mutableMapOf<Long, Task>() - val taskDependencies = mutableMapOf<Task, Set<Long>>() - - try { - while (reader.nextRow()) { - // Bag of tasks without workflow ID all share the same workflow - val workflowId = if (reader.hasColumn(TASK_WORKFLOW_ID)) reader.get(TASK_WORKFLOW_ID).toLong() else 0L - val workflow = jobs.computeIfAbsent(workflowId) { id -> Job(UUID(0L, id), "<unnamed>", HashSet(), HashMap()) } - - val id = reader.get(TASK_ID).toLong() - val grantedCpus = if (reader.hasColumn(TASK_ALLOC_NCPUS)) - reader.getInt(TASK_ALLOC_NCPUS) - else - reader.getInt(TASK_REQ_NCPUS) - val submitTime = reader.get(TASK_SUBMIT_TIME) - val runtime = reader.get(TASK_RUNTIME) - val flops: Long = 4000 * runtime.seconds * grantedCpus - val workload = SimFlopsWorkload(flops) - val task = Task( - UUID(0L, id), - "<unnamed>", - HashSet(), - mapOf( - "workload" to workload, - WORKFLOW_TASK_CORES to grantedCpus, - WORKFLOW_TASK_DEADLINE to runtime.toMillis() - ), - ) - - tasks[id] = task - taskDependencies[task] = reader.get(TASK_PARENTS).map { it.toLong() }.toSet() - - (workflow.metadata as MutableMap<String, Any>).merge("WORKFLOW_SUBMIT_TIME", submitTime.toEpochMilli()) { a, b -> min(a as Long, b as Long) } - (workflow.tasks as MutableSet<Task>).add(task) - } - - // Resolve dependencies for all tasks - for ((task, deps) in taskDependencies) { - for (dep in deps) { - val parent = requireNotNull(tasks[dep]) { "Dependency task with id $dep not found" } - (task.dependencies as MutableSet<Task>).add(parent) - } - } - } finally { - reader.close() - } - - return jobs.values.toList() - } -} diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt index 04f54e58..066e9685 100644 --- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt @@ -22,41 +22,40 @@ package org.opendc.workflow.service -import io.opentelemetry.api.metrics.MeterProvider -import io.opentelemetry.sdk.metrics.SdkMeterProvider import io.opentelemetry.sdk.metrics.export.MetricProducer import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll -import org.opendc.compute.service.ComputeService import org.opendc.compute.service.scheduler.FilterScheduler import org.opendc.compute.service.scheduler.filters.ComputeFilter import org.opendc.compute.service.scheduler.filters.RamFilter import org.opendc.compute.service.scheduler.filters.VCpuFilter import org.opendc.compute.service.scheduler.weights.VCpuWeigher -import org.opendc.compute.simulator.SimHost +import org.opendc.compute.workload.ComputeServiceHelper +import org.opendc.compute.workload.topology.HostSpec import org.opendc.simulator.compute.kernel.SimSpaceSharedHypervisorProvider import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.power.ConstantPowerModel +import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.flow.FlowEngine -import org.opendc.telemetry.sdk.toOtelClock import org.opendc.trace.Trace -import org.opendc.workflow.service.internal.WorkflowServiceImpl -import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy +import org.opendc.workflow.workload.WorkflowSchedulerSpec +import org.opendc.workflow.workload.WorkflowServiceHelper +import org.opendc.workflow.workload.toJobs import java.nio.file.Paths import java.time.Duration import java.util.* /** - * Integration test suite for the [WorkflowServiceImpl]. + * Integration test suite for the [WorkflowService]. */ @DisplayName("WorkflowService") internal class WorkflowServiceTest { @@ -65,60 +64,39 @@ internal class WorkflowServiceTest { */ @Test fun testTrace() = runBlockingSimulation { - val meterProvider: MeterProvider = SdkMeterProvider - .builder() - .setClock(clock.toOtelClock()) - .build() - - val interpreter = FlowEngine(coroutineContext, clock) - val machineModel = createMachineModel() - val hvProvider = SimSpaceSharedHypervisorProvider() - val hosts = List(4) { id -> - SimHost( - UUID(0, id.toLong()), - "node-$id", - machineModel, - emptyMap(), - coroutineContext, - interpreter, - MeterProvider.noop(), - hvProvider, - ) - } - + // Configure the ComputeService that is responsible for mapping virtual machines onto physical hosts + val HOST_COUNT = 4 val computeScheduler = FilterScheduler( filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)), weighers = listOf(VCpuWeigher(1.0, multiplier = 1.0)) ) - val compute = ComputeService(coroutineContext, clock, MeterProvider.noop(), computeScheduler, schedulingQuantum = Duration.ofSeconds(1)) + val computeHelper = ComputeServiceHelper(coroutineContext, clock, computeScheduler, schedulingQuantum = Duration.ofSeconds(1)) - hosts.forEach { compute.addHost(it) } + repeat(HOST_COUNT) { computeHelper.registerHost(createHostSpec(it)) } - val scheduler = WorkflowService( - coroutineContext, - clock, - meterProvider, - compute.newClient(), - mode = WorkflowSchedulerMode.Batch(100), + // Configure the WorkflowService that is responsible for scheduling the workflow tasks onto machines + val workflowScheduler = WorkflowSchedulerSpec( + schedulingQuantum = Duration.ofMillis(100), jobAdmissionPolicy = NullJobAdmissionPolicy, jobOrderPolicy = SubmissionTimeJobOrderPolicy(), taskEligibilityPolicy = NullTaskEligibilityPolicy, taskOrderPolicy = SubmissionTimeTaskOrderPolicy(), ) + val workflowHelper = WorkflowServiceHelper(coroutineContext, clock, computeHelper.service.newClient(), workflowScheduler) - val trace = Trace.open( - Paths.get(checkNotNull(WorkflowServiceTest::class.java.getResource("/trace.gwf")).toURI()), - format = "gwf" - ) - val replayer = TraceReplayer(trace) - - replayer.replay(clock, scheduler) + try { + val trace = Trace.open( + Paths.get(checkNotNull(WorkflowServiceTest::class.java.getResource("/trace.gwf")).toURI()), + format = "gwf" + ) - hosts.forEach(SimHost::close) - scheduler.close() - compute.close() + workflowHelper.replay(trace.toJobs()) + } finally { + workflowHelper.close() + computeHelper.close() + } - val metrics = collectMetrics(meterProvider as MetricProducer) + val metrics = collectMetrics(workflowHelper.metricProducer) assertAll( { assertEquals(758, metrics.jobsSubmitted, "No jobs submitted") }, @@ -126,19 +104,29 @@ internal class WorkflowServiceTest { { assertEquals(metrics.jobsSubmitted, metrics.jobsFinished, "Not all started jobs finished") }, { assertEquals(0, metrics.tasksActive, "Not all started tasks finished") }, { assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") }, - { assertEquals(33213236L, clock.millis()) } + { assertEquals(33214236L, clock.millis()) { "Total duration incorrect" } } ) } /** - * The machine model based on: https://www.spec.org/power_ssj2008/results/res2020q1/power_ssj2008-20191125-01012.html + * Construct a [HostSpec] for a simulated host. */ - private fun createMachineModel(): MachineModel { + private fun createHostSpec(uid: Int): HostSpec { + // Machine model based on: https://www.spec.org/power_ssj2008/results/res2020q1/power_ssj2008-20191125-01012.html val node = ProcessingNode("AMD", "am64", "EPYC 7742", 32) - val cpus = List(node.coreCount) { id -> ProcessingUnit(node, id, 3400.0) } + val cpus = List(node.coreCount) { ProcessingUnit(node, it, 3400.0) } val memory = List(8) { MemoryUnit("Samsung", "Unknown", 2933.0, 16_000) } - return MachineModel(cpus, memory) + val machineModel = MachineModel(cpus, memory) + + return HostSpec( + UUID(0, uid.toLong()), + "host-$uid", + emptyMap(), + machineModel, + SimplePowerDriver(ConstantPowerModel(0.0)), + SimSpaceSharedHypervisorProvider() + ) } class WorkflowMetrics { diff --git a/opendc-workflow/opendc-workflow-workload/build.gradle.kts b/opendc-workflow/opendc-workflow-workload/build.gradle.kts new file mode 100644 index 00000000..dfb77a39 --- /dev/null +++ b/opendc-workflow/opendc-workflow-workload/build.gradle.kts @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +description = "Support library for simulating workflows with OpenDC" + +/* Build configuration */ +plugins { + `kotlin-library-conventions` + `testing-conventions` +} + +dependencies { + api(platform(projects.opendcPlatform)) + api(projects.opendcWorkflow.opendcWorkflowService) + + implementation(projects.opendcSimulator.opendcSimulatorCompute) + implementation(projects.opendcTrace.opendcTraceApi) + implementation(projects.opendcTelemetry.opendcTelemetrySdk) + implementation(libs.opentelemetry.semconv) +} diff --git a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/TraceHelpers.kt b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/TraceHelpers.kt new file mode 100644 index 00000000..73995d08 --- /dev/null +++ b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/TraceHelpers.kt @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("TraceHelpers") +package org.opendc.workflow.workload + +import org.opendc.simulator.compute.workload.SimFlopsWorkload +import org.opendc.trace.* +import org.opendc.workflow.api.Job +import org.opendc.workflow.api.Task +import org.opendc.workflow.api.WORKFLOW_TASK_CORES +import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE +import java.util.* +import kotlin.collections.HashMap +import kotlin.collections.HashSet +import kotlin.math.min + +/** + * Convert [Trace] into a list of [Job]s that can be submitted to the workflow service. + */ +public fun Trace.toJobs(): List<Job> { + val table = checkNotNull(getTable(TABLE_TASKS)) + val reader = table.newReader() + + val jobs = mutableMapOf<Long, Job>() + val tasks = mutableMapOf<Long, Task>() + val taskDependencies = mutableMapOf<Task, Set<Long>>() + + try { + while (reader.nextRow()) { + // Bag of tasks without workflow ID all share the same workflow + val workflowId = if (reader.hasColumn(TASK_WORKFLOW_ID)) reader.get(TASK_WORKFLOW_ID).toLong() else 0L + val workflow = jobs.computeIfAbsent(workflowId) { id -> Job(UUID(0L, id), "<unnamed>", HashSet(), HashMap()) } + + val id = reader.get(TASK_ID).toLong() + val grantedCpus = if (reader.hasColumn(TASK_ALLOC_NCPUS)) + reader.getInt(TASK_ALLOC_NCPUS) + else + reader.getInt(TASK_REQ_NCPUS) + val submitTime = reader.get(TASK_SUBMIT_TIME) + val runtime = reader.get(TASK_RUNTIME) + val flops: Long = 4000 * runtime.seconds * grantedCpus + val workload = SimFlopsWorkload(flops) + val task = Task( + UUID(0L, id), + "<unnamed>", + HashSet(), + mapOf( + "workload" to workload, + WORKFLOW_TASK_CORES to grantedCpus, + WORKFLOW_TASK_DEADLINE to runtime.toMillis() + ), + ) + + tasks[id] = task + taskDependencies[task] = reader.get(TASK_PARENTS).map { it.toLong() }.toSet() + + (workflow.metadata as MutableMap<String, Any>).merge("WORKFLOW_SUBMIT_TIME", submitTime.toEpochMilli()) { a, b -> min(a as Long, b as Long) } + (workflow.tasks as MutableSet<Task>).add(task) + } + + // Resolve dependencies for all tasks + for ((task, deps) in taskDependencies) { + for (dep in deps) { + val parent = requireNotNull(tasks[dep]) { "Dependency task with id $dep not found" } + (task.dependencies as MutableSet<Task>).add(parent) + } + } + } finally { + reader.close() + } + + return jobs.values.toList() +} diff --git a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowSchedulerSpec.kt b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowSchedulerSpec.kt new file mode 100644 index 00000000..d6a375b6 --- /dev/null +++ b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowSchedulerSpec.kt @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.workload + +import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy +import org.opendc.workflow.service.scheduler.job.JobOrderPolicy +import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy +import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy +import java.time.Duration + +/** + * Specification of the scheduling policies of the workflow scheduler. + */ +public data class WorkflowSchedulerSpec( + val schedulingQuantum: Duration, + val jobAdmissionPolicy: JobAdmissionPolicy, + val jobOrderPolicy: JobOrderPolicy, + val taskEligibilityPolicy: TaskEligibilityPolicy, + val taskOrderPolicy: TaskOrderPolicy, +) diff --git a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt new file mode 100644 index 00000000..0198900f --- /dev/null +++ b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt @@ -0,0 +1,123 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.workload + +import io.opentelemetry.sdk.metrics.SdkMeterProvider +import io.opentelemetry.sdk.metrics.export.MetricProducer +import io.opentelemetry.sdk.resources.Resource +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import org.opendc.compute.api.ComputeClient +import org.opendc.telemetry.sdk.toOtelClock +import org.opendc.workflow.api.Job +import org.opendc.workflow.service.WorkflowService +import java.time.Clock +import kotlin.coroutines.CoroutineContext + +/** + * Helper class to simulate workflow-based workloads in OpenDC. + * + * @param context [CoroutineContext] to run the simulation in. + * @param clock [Clock] instance tracking simulation time. + * @param computeClient A [ComputeClient] instance to communicate with the cluster scheduler. + * @param schedulerSpec The configuration of the workflow scheduler. + */ +public class WorkflowServiceHelper( + private val context: CoroutineContext, + private val clock: Clock, + private val computeClient: ComputeClient, + private val schedulerSpec: WorkflowSchedulerSpec +) : AutoCloseable { + /** + * The [WorkflowService] that is constructed by this runner. + */ + public val service: WorkflowService + + /** + * The [MetricProducer] exposed by the [WorkflowService]. + */ + public val metricProducer: MetricProducer + + init { + val resource = Resource.builder() + .put(ResourceAttributes.SERVICE_NAME, "opendc-workflow") + .build() + + val meterProvider = SdkMeterProvider.builder() + .setClock(clock.toOtelClock()) + .setResource(resource) + .build() + metricProducer = meterProvider + + service = WorkflowService( + context, + clock, + meterProvider, + computeClient, + schedulerSpec.schedulingQuantum, + jobAdmissionPolicy = schedulerSpec.jobAdmissionPolicy, + jobOrderPolicy = schedulerSpec.jobOrderPolicy, + taskEligibilityPolicy = schedulerSpec.taskEligibilityPolicy, + taskOrderPolicy = schedulerSpec.taskOrderPolicy, + ) + } + + /** + * Run the specified list of [jobs] using the workflow service and suspend execution until all jobs have + * finished. + */ + public suspend fun replay(jobs: List<Job>) { + // Sort jobs by their arrival time + val orderedJobs = jobs.sortedBy { it.metadata.getOrDefault("WORKFLOW_SUBMIT_TIME", Long.MAX_VALUE) as Long } + if (orderedJobs.isEmpty()) { + return + } + + // Wait until the trace is started + val startTime = orderedJobs[0].metadata.getOrDefault("WORKFLOW_SUBMIT_TIME", Long.MAX_VALUE) as Long + var offset = 0L + + if (startTime != Long.MAX_VALUE) { + offset = startTime - clock.millis() + delay(offset.coerceAtLeast(0)) + } + + coroutineScope { + for (job in orderedJobs) { + val submitTime = job.metadata.getOrDefault("WORKFLOW_SUBMIT_TIME", Long.MAX_VALUE) as Long + if (submitTime != Long.MAX_VALUE) { + delay(((submitTime - offset) - clock.millis()).coerceAtLeast(0)) + } + + launch { service.invoke(job) } + } + } + } + + override fun close() { + computeClient.close() + service.close() + } +} |
