summaryrefslogtreecommitdiff
path: root/opendc-workflow
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-workflow')
-rw-r--r--opendc-workflow/opendc-workflow-service/build.gradle.kts3
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt20
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/JobState.kt3
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowSchedulerListener.kt34
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt133
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/WorkflowSchedulerMode.kt105
-rw-r--r--opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt127
-rw-r--r--opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt96
-rw-r--r--opendc-workflow/opendc-workflow-workload/build.gradle.kts39
-rw-r--r--opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/TraceHelpers.kt93
-rw-r--r--opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowSchedulerSpec.kt40
-rw-r--r--opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt123
12 files changed, 445 insertions, 371 deletions
diff --git a/opendc-workflow/opendc-workflow-service/build.gradle.kts b/opendc-workflow/opendc-workflow-service/build.gradle.kts
index 43b64b15..4d8b7d7f 100644
--- a/opendc-workflow/opendc-workflow-service/build.gradle.kts
+++ b/opendc-workflow/opendc-workflow-service/build.gradle.kts
@@ -37,8 +37,9 @@ dependencies {
implementation(projects.opendcUtils)
implementation(libs.kotlin.logging)
+ testImplementation(projects.opendcWorkflow.opendcWorkflowWorkload)
+ testImplementation(projects.opendcCompute.opendcComputeWorkload)
testImplementation(projects.opendcSimulator.opendcSimulatorCore)
- testImplementation(projects.opendcCompute.opendcComputeSimulator)
testImplementation(projects.opendcTrace.opendcTraceApi)
testImplementation(projects.opendcTelemetry.opendcTelemetrySdk)
testRuntimeOnly(projects.opendcTrace.opendcTraceGwf)
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
index a0248a93..ebace07d 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
@@ -26,29 +26,24 @@ import io.opentelemetry.api.metrics.MeterProvider
import org.opendc.compute.api.ComputeClient
import org.opendc.workflow.api.Job
import org.opendc.workflow.service.internal.WorkflowServiceImpl
-import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode
import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy
import org.opendc.workflow.service.scheduler.job.JobOrderPolicy
import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy
import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy
import java.time.Clock
+import java.time.Duration
import kotlin.coroutines.CoroutineContext
/**
- * A service for cloud workflow management.
+ * A service for cloud workflow execution.
*
* The workflow scheduler is modelled after the Reference Architecture for Topology Scheduling by Andreadis et al.
*/
public interface WorkflowService : AutoCloseable {
/**
- * Submit the specified [Job] to the workflow service for scheduling.
+ * Submit the specified [Job] and suspend execution until the job is finished.
*/
- public suspend fun submit(job: Job)
-
- /**
- * Run the specified [Job] and suspend execution until the job is finished.
- */
- public suspend fun run(job: Job)
+ public suspend fun invoke(job: Job)
/**
* Terminate the lifecycle of the workflow service, stopping all running workflows.
@@ -61,10 +56,9 @@ public interface WorkflowService : AutoCloseable {
*
* @param context The [CoroutineContext] to use in the service.
* @param clock The clock instance to use.
- * @param tracer The event tracer to use.
* @param meterProvider The meter provider to use.
* @param compute The compute client to use.
- * @param mode The scheduling mode to use.
+ * @param schedulingQuantum The scheduling quantum to use (minimum duration between scheduling cycles).
* @param jobAdmissionPolicy The job admission policy to use.
* @param jobOrderPolicy The job order policy to use.
* @param taskEligibilityPolicy The task eligibility policy to use.
@@ -75,7 +69,7 @@ public interface WorkflowService : AutoCloseable {
clock: Clock,
meterProvider: MeterProvider,
compute: ComputeClient,
- mode: WorkflowSchedulerMode,
+ schedulingQuantum: Duration,
jobAdmissionPolicy: JobAdmissionPolicy,
jobOrderPolicy: JobOrderPolicy,
taskEligibilityPolicy: TaskEligibilityPolicy,
@@ -86,7 +80,7 @@ public interface WorkflowService : AutoCloseable {
clock,
meterProvider,
compute,
- mode,
+ schedulingQuantum,
jobAdmissionPolicy,
jobOrderPolicy,
taskEligibilityPolicy,
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/JobState.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/JobState.kt
index 1bb67169..d2fc1ed7 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/JobState.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/JobState.kt
@@ -23,8 +23,9 @@
package org.opendc.workflow.service.internal
import org.opendc.workflow.api.Job
+import kotlin.coroutines.Continuation
-public class JobState(public val job: Job, public val submittedAt: Long) {
+public class JobState(public val job: Job, public val submittedAt: Long, internal val cont: Continuation<Unit>) {
/**
* A flag to indicate whether this job is finished.
*/
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowSchedulerListener.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowSchedulerListener.kt
index 29c6aeea..8e76a36e 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowSchedulerListener.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowSchedulerListener.kt
@@ -22,16 +22,44 @@
package org.opendc.workflow.service.internal
-public interface WorkflowSchedulerListener {
- public fun cycleStarted(scheduler: WorkflowServiceImpl) {}
- public fun cycleFinished(scheduler: WorkflowServiceImpl) {}
+import org.opendc.workflow.service.WorkflowService
+/**
+ * Interface for listening to events emitted by the [WorkflowService].
+ */
+public interface WorkflowSchedulerListener {
+ /**
+ * This method is invoked when [job] is submitted to the service.
+ */
public fun jobSubmitted(job: JobState) {}
+
+ /**
+ * This method is invoked when [job] is started by the service.
+ */
public fun jobStarted(job: JobState) {}
+
+ /**
+ * This method is invoked when [job] finishes.
+ */
public fun jobFinished(job: JobState) {}
+ /**
+ * This method is invoked when [task] becomes ready to be scheduled.
+ */
public fun taskReady(task: TaskState) {}
+
+ /**
+ * This method is invoked when [task] is assigned to a machine.
+ */
public fun taskAssigned(task: TaskState) {}
+
+ /**
+ * This method is invoked when [task] is started.
+ */
public fun taskStarted(task: TaskState) {}
+
+ /**
+ * This method is invoked when [task] finishes.
+ */
public fun taskFinished(task: TaskState) {}
}
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
index a0fd3fad..1cadce44 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
@@ -25,20 +25,18 @@ package org.opendc.workflow.service.internal
import io.opentelemetry.api.metrics.Meter
import io.opentelemetry.api.metrics.MeterProvider
import kotlinx.coroutines.*
-import kotlinx.coroutines.flow.map
-import mu.KotlinLogging
import org.opendc.compute.api.*
+import org.opendc.utils.TimerScheduler
import org.opendc.workflow.api.Job
import org.opendc.workflow.api.WORKFLOW_TASK_CORES
import org.opendc.workflow.service.*
-import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode
import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy
import org.opendc.workflow.service.scheduler.job.JobOrderPolicy
import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy
import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy
import java.time.Clock
+import java.time.Duration
import java.util.*
-import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.resume
@@ -48,10 +46,10 @@ import kotlin.coroutines.resume
*/
public class WorkflowServiceImpl(
context: CoroutineContext,
- internal val clock: Clock,
+ private val clock: Clock,
meterProvider: MeterProvider,
private val computeClient: ComputeClient,
- mode: WorkflowSchedulerMode,
+ private val schedulingQuantum: Duration,
jobAdmissionPolicy: JobAdmissionPolicy,
jobOrderPolicy: JobOrderPolicy,
taskEligibilityPolicy: TaskEligibilityPolicy,
@@ -60,12 +58,7 @@ public class WorkflowServiceImpl(
/**
* The [CoroutineScope] of the service bounded by the lifecycle of the service.
*/
- internal val scope = CoroutineScope(context + Job())
-
- /**
- * The logger instance to use.
- */
- private val logger = KotlinLogging.logger {}
+ private val scope = CoroutineScope(context + Job())
/**
* The [Meter] to collect metrics of this service.
@@ -75,22 +68,22 @@ public class WorkflowServiceImpl(
/**
* The incoming jobs ready to be processed by the scheduler.
*/
- internal val incomingJobs: MutableSet<JobState> = linkedSetOf()
+ private val incomingJobs: MutableSet<JobState> = linkedSetOf()
/**
* The incoming tasks ready to be processed by the scheduler.
*/
- internal val incomingTasks: MutableSet<TaskState> = linkedSetOf()
+ private val incomingTasks: MutableSet<TaskState> = linkedSetOf()
/**
* The job queue.
*/
- internal val jobQueue: Queue<JobState>
+ private val jobQueue: Queue<JobState>
/**
* The task queue.
*/
- internal val taskQueue: Queue<TaskState>
+ private val taskQueue: Queue<TaskState>
/**
* The active jobs in the system.
@@ -105,12 +98,7 @@ public class WorkflowServiceImpl(
/**
* The running tasks by [Server].
*/
- internal val taskByServer = mutableMapOf<Server, TaskState>()
-
- /**
- * The continuation of the jobs.
- */
- private val conts = mutableMapOf<Job, Continuation<Unit>>()
+ private val taskByServer = mutableMapOf<Server, TaskState>()
/**
* The root listener of this scheduler.
@@ -119,15 +107,7 @@ public class WorkflowServiceImpl(
/**
* The listeners to delegate to.
*/
- val listeners = mutableSetOf<WorkflowSchedulerListener>()
-
- override fun cycleStarted(scheduler: WorkflowServiceImpl) {
- listeners.forEach { it.cycleStarted(scheduler) }
- }
-
- override fun cycleFinished(scheduler: WorkflowServiceImpl) {
- listeners.forEach { it.cycleFinished(scheduler) }
- }
+ val listeners = mutableListOf<WorkflowSchedulerListener>()
override fun jobSubmitted(job: JobState) {
listeners.forEach { it.jobSubmitted(job) }
@@ -206,25 +186,27 @@ public class WorkflowServiceImpl(
.setUnit("1")
.build()
- private val mode: WorkflowSchedulerMode.Logic
+ /**
+ * The [TimerScheduler] to use for scheduling the scheduler cycles.
+ */
+ private val timerScheduler: TimerScheduler<Unit> = TimerScheduler(scope.coroutineContext, clock)
+
private val jobAdmissionPolicy: JobAdmissionPolicy.Logic
private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic
private lateinit var image: Image
init {
- this.mode = mode(this)
this.jobAdmissionPolicy = jobAdmissionPolicy(this)
this.jobQueue = PriorityQueue(100, jobOrderPolicy(this).thenBy { it.job.uid })
this.taskEligibilityPolicy = taskEligibilityPolicy(this)
this.taskQueue = PriorityQueue(1000, taskOrderPolicy(this).thenBy { it.task.uid })
- scope.launch {
- image = computeClient.newImage("workflow-runner")
- }
+
+ scope.launch { image = computeClient.newImage("workflow-runner") }
}
- override suspend fun run(job: Job) {
+ override suspend fun invoke(job: Job): Unit = suspendCancellableCoroutine { cont ->
// J1 Incoming Jobs
- val jobInstance = JobState(job, clock.millis())
+ val jobInstance = JobState(job, clock.millis(), cont)
val instances = job.tasks.associateWith {
TaskState(jobInstance, it)
}
@@ -243,36 +225,61 @@ public class WorkflowServiceImpl(
submittedTasks.add(1)
}
- return suspendCancellableCoroutine { cont ->
- instances.values.toCollection(jobInstance.tasks)
- incomingJobs += jobInstance
- rootListener.jobSubmitted(jobInstance)
- conts[job] = cont
+ instances.values.toCollection(jobInstance.tasks)
+ incomingJobs += jobInstance
+ rootListener.jobSubmitted(jobInstance)
+ submittedJobs.add(1)
- submittedJobs.add(1)
+ requestSchedulingCycle()
+ }
- requestCycle()
- }
+ override fun close() {
+ scope.cancel()
}
- override suspend fun submit(job: Job) {
- scope.launch { run(job) }
+ /**
+ * Register a [WorkflowSchedulerListener].
+ */
+ public fun addListener(listener: WorkflowSchedulerListener) {
+ rootListener.listeners += listener
}
- override fun close() {
- scope.cancel()
+ /**
+ * Unregister a [WorkflowSchedulerListener].
+ */
+ public fun removeListener(listener: WorkflowSchedulerListener) {
+ rootListener.listeners -= listener
}
/**
- * Indicate to the scheduler that a scheduling cycle is needed.
+ * Indicate that a new scheduling cycle is needed due to a change to the service's state.
*/
- private fun requestCycle() = mode.requestCycle()
+ private fun requestSchedulingCycle() {
+ // Bail out in case we have already requested a new cycle or the queue is empty.
+ if (timerScheduler.isTimerActive(Unit) || incomingJobs.isEmpty()) {
+ return
+ }
+
+ val quantum = schedulingQuantum.toMillis()
+ if (quantum == 0L) {
+ doSchedule()
+ return
+ }
+
+ // We assume that the provisioner runs at a fixed slot every time quantum (e.g t=0, t=60, t=120).
+ // This is important because the slices of the VMs need to be aligned.
+ // We calculate here the delay until the next scheduling slot.
+ val delay = quantum - (clock.millis() % quantum)
+
+ timerScheduler.startSingleTimer(Unit, delay) {
+ doSchedule()
+ }
+ }
/**
* Perform a scheduling cycle immediately.
*/
- @OptIn(ExperimentalCoroutinesApi::class)
- internal suspend fun schedule() {
+ private fun doSchedule() {
// J2 Create list of eligible jobs
val iterator = incomingJobs.iterator()
while (iterator.hasNext()) {
@@ -293,8 +300,8 @@ public class WorkflowServiceImpl(
}
// J4 Per job
- while (jobQueue.isNotEmpty()) {
- val jobInstance = jobQueue.poll()
+ while (true) {
+ val jobInstance = jobQueue.poll() ?: break
// Edge-case: job has no tasks
if (jobInstance.isFinished) {
@@ -361,7 +368,7 @@ public class WorkflowServiceImpl(
}
}
- public override fun onStateChanged(server: Server, newState: ServerState) {
+ override fun onStateChanged(server: Server, newState: ServerState) {
when (newState) {
ServerState.PROVISIONING -> {}
ServerState.RUNNING -> {
@@ -402,7 +409,7 @@ public class WorkflowServiceImpl(
finishJob(job)
}
- requestCycle()
+ requestSchedulingCycle()
}
ServerState.DELETED -> {
}
@@ -416,14 +423,6 @@ public class WorkflowServiceImpl(
finishedJobs.add(1)
rootListener.jobFinished(job)
- conts.remove(job.job)?.resume(Unit)
- }
-
- public fun addListener(listener: WorkflowSchedulerListener) {
- rootListener.listeners += listener
- }
-
- public fun removeListener(listener: WorkflowSchedulerListener) {
- rootListener.listeners -= listener
+ job.cont.resume(Unit)
}
}
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/WorkflowSchedulerMode.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/WorkflowSchedulerMode.kt
deleted file mode 100644
index 58e7893f..00000000
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/WorkflowSchedulerMode.kt
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.workflow.service.scheduler
-
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.launch
-import org.opendc.workflow.service.internal.WorkflowServiceImpl
-
-/**
- * The operating mode of a workflow scheduler.
- */
-public sealed class WorkflowSchedulerMode : StagePolicy<WorkflowSchedulerMode.Logic> {
- /**
- * The logic for operating the cycles of a workflow scheduler.
- */
- public interface Logic {
- /**
- * Request a new scheduling cycle to be performed.
- */
- public fun requestCycle()
- }
-
- /**
- * An interactive scheduler immediately triggers a new scheduling cycle when a workflow is received.
- */
- public object Interactive : WorkflowSchedulerMode() {
- override fun invoke(scheduler: WorkflowServiceImpl): Logic = object : Logic {
- override fun requestCycle() {
- scheduler.scope.launch { scheduler.schedule() }
- }
- }
-
- override fun toString(): String = "Interactive"
- }
-
- /**
- * A batch scheduler triggers a scheduling cycle every time quantum if needed.
- */
- public data class Batch(val quantum: Long) : WorkflowSchedulerMode() {
- private var next: kotlinx.coroutines.Job? = null
-
- override fun invoke(scheduler: WorkflowServiceImpl): Logic = object : Logic {
- override fun requestCycle() {
- if (next == null) {
- // In batch mode, we assume that the scheduler runs at a fixed slot every time
- // quantum (e.g t=0, t=60, t=120). We calculate here the delay until the next scheduling slot.
- val delay = quantum - (scheduler.clock.millis() % quantum)
-
- val job = scheduler.scope.launch {
- delay(delay)
- next = null
- scheduler.schedule()
- }
- next = job
- }
- }
- }
-
- override fun toString(): String = "Batch($quantum)"
- }
-
- /**
- * A scheduling cycle is triggered at a random point in time.
- */
- public data class Random(private val random: java.util.Random = java.util.Random(123)) : WorkflowSchedulerMode() {
- private var next: kotlinx.coroutines.Job? = null
-
- override fun invoke(scheduler: WorkflowServiceImpl): Logic = object : Logic {
- override fun requestCycle() {
- if (next == null) {
- val delay = random.nextInt(200).toLong()
-
- val job = scheduler.scope.launch {
- delay(delay)
- next = null
- scheduler.schedule()
- }
- next = job
- }
- }
- }
-
- override fun toString(): String = "Random"
- }
-}
diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt
deleted file mode 100644
index 9ee3736e..00000000
--- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.workflow.service
-
-import kotlinx.coroutines.coroutineScope
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.launch
-import org.opendc.simulator.compute.workload.SimFlopsWorkload
-import org.opendc.trace.*
-import org.opendc.workflow.api.Job
-import org.opendc.workflow.api.Task
-import org.opendc.workflow.api.WORKFLOW_TASK_CORES
-import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE
-import java.time.Clock
-import java.util.*
-import kotlin.collections.HashMap
-import kotlin.collections.HashSet
-import kotlin.math.max
-import kotlin.math.min
-
-/**
- * Helper tool to replay workflow trace.
- */
-internal class TraceReplayer(private val trace: Trace) {
- /**
- * Replay the workload.
- */
- public suspend fun replay(clock: Clock, service: WorkflowService) {
- val jobs = parseTrace(trace)
-
- // Sort jobs by their arrival time
- (jobs as MutableList<Job>).sortBy { it.metadata["WORKFLOW_SUBMIT_TIME"] as Long }
-
- // Wait until the trace is started
- val startTime = jobs[0].metadata["WORKFLOW_SUBMIT_TIME"] as Long
- delay(min(0L, startTime - clock.millis()))
-
- val offset = startTime - clock.millis()
-
- coroutineScope {
- for (job in jobs) {
- val submitTime = job.metadata["WORKFLOW_SUBMIT_TIME"] as Long
- delay(max(0, (submitTime - offset) - clock.millis()))
-
- launch { service.run(job) }
- }
- }
- }
-
- /**
- * Convert [trace] into a list of [Job]s that can be submitted to the workflow service.
- */
- public fun parseTrace(trace: Trace): List<Job> {
- val table = checkNotNull(trace.getTable(TABLE_TASKS))
- val reader = table.newReader()
-
- val jobs = mutableMapOf<Long, Job>()
- val tasks = mutableMapOf<Long, Task>()
- val taskDependencies = mutableMapOf<Task, Set<Long>>()
-
- try {
- while (reader.nextRow()) {
- // Bag of tasks without workflow ID all share the same workflow
- val workflowId = if (reader.hasColumn(TASK_WORKFLOW_ID)) reader.get(TASK_WORKFLOW_ID).toLong() else 0L
- val workflow = jobs.computeIfAbsent(workflowId) { id -> Job(UUID(0L, id), "<unnamed>", HashSet(), HashMap()) }
-
- val id = reader.get(TASK_ID).toLong()
- val grantedCpus = if (reader.hasColumn(TASK_ALLOC_NCPUS))
- reader.getInt(TASK_ALLOC_NCPUS)
- else
- reader.getInt(TASK_REQ_NCPUS)
- val submitTime = reader.get(TASK_SUBMIT_TIME)
- val runtime = reader.get(TASK_RUNTIME)
- val flops: Long = 4000 * runtime.seconds * grantedCpus
- val workload = SimFlopsWorkload(flops)
- val task = Task(
- UUID(0L, id),
- "<unnamed>",
- HashSet(),
- mapOf(
- "workload" to workload,
- WORKFLOW_TASK_CORES to grantedCpus,
- WORKFLOW_TASK_DEADLINE to runtime.toMillis()
- ),
- )
-
- tasks[id] = task
- taskDependencies[task] = reader.get(TASK_PARENTS).map { it.toLong() }.toSet()
-
- (workflow.metadata as MutableMap<String, Any>).merge("WORKFLOW_SUBMIT_TIME", submitTime.toEpochMilli()) { a, b -> min(a as Long, b as Long) }
- (workflow.tasks as MutableSet<Task>).add(task)
- }
-
- // Resolve dependencies for all tasks
- for ((task, deps) in taskDependencies) {
- for (dep in deps) {
- val parent = requireNotNull(tasks[dep]) { "Dependency task with id $dep not found" }
- (task.dependencies as MutableSet<Task>).add(parent)
- }
- }
- } finally {
- reader.close()
- }
-
- return jobs.values.toList()
- }
-}
diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
index 04f54e58..066e9685 100644
--- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
+++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
@@ -22,41 +22,40 @@
package org.opendc.workflow.service
-import io.opentelemetry.api.metrics.MeterProvider
-import io.opentelemetry.sdk.metrics.SdkMeterProvider
import io.opentelemetry.sdk.metrics.export.MetricProducer
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
-import org.opendc.compute.service.ComputeService
import org.opendc.compute.service.scheduler.FilterScheduler
import org.opendc.compute.service.scheduler.filters.ComputeFilter
import org.opendc.compute.service.scheduler.filters.RamFilter
import org.opendc.compute.service.scheduler.filters.VCpuFilter
import org.opendc.compute.service.scheduler.weights.VCpuWeigher
-import org.opendc.compute.simulator.SimHost
+import org.opendc.compute.workload.ComputeServiceHelper
+import org.opendc.compute.workload.topology.HostSpec
import org.opendc.simulator.compute.kernel.SimSpaceSharedHypervisorProvider
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.power.ConstantPowerModel
+import org.opendc.simulator.compute.power.SimplePowerDriver
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.simulator.flow.FlowEngine
-import org.opendc.telemetry.sdk.toOtelClock
import org.opendc.trace.Trace
-import org.opendc.workflow.service.internal.WorkflowServiceImpl
-import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode
import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy
import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy
import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy
import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy
+import org.opendc.workflow.workload.WorkflowSchedulerSpec
+import org.opendc.workflow.workload.WorkflowServiceHelper
+import org.opendc.workflow.workload.toJobs
import java.nio.file.Paths
import java.time.Duration
import java.util.*
/**
- * Integration test suite for the [WorkflowServiceImpl].
+ * Integration test suite for the [WorkflowService].
*/
@DisplayName("WorkflowService")
internal class WorkflowServiceTest {
@@ -65,60 +64,39 @@ internal class WorkflowServiceTest {
*/
@Test
fun testTrace() = runBlockingSimulation {
- val meterProvider: MeterProvider = SdkMeterProvider
- .builder()
- .setClock(clock.toOtelClock())
- .build()
-
- val interpreter = FlowEngine(coroutineContext, clock)
- val machineModel = createMachineModel()
- val hvProvider = SimSpaceSharedHypervisorProvider()
- val hosts = List(4) { id ->
- SimHost(
- UUID(0, id.toLong()),
- "node-$id",
- machineModel,
- emptyMap(),
- coroutineContext,
- interpreter,
- MeterProvider.noop(),
- hvProvider,
- )
- }
-
+ // Configure the ComputeService that is responsible for mapping virtual machines onto physical hosts
+ val HOST_COUNT = 4
val computeScheduler = FilterScheduler(
filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)),
weighers = listOf(VCpuWeigher(1.0, multiplier = 1.0))
)
- val compute = ComputeService(coroutineContext, clock, MeterProvider.noop(), computeScheduler, schedulingQuantum = Duration.ofSeconds(1))
+ val computeHelper = ComputeServiceHelper(coroutineContext, clock, computeScheduler, schedulingQuantum = Duration.ofSeconds(1))
- hosts.forEach { compute.addHost(it) }
+ repeat(HOST_COUNT) { computeHelper.registerHost(createHostSpec(it)) }
- val scheduler = WorkflowService(
- coroutineContext,
- clock,
- meterProvider,
- compute.newClient(),
- mode = WorkflowSchedulerMode.Batch(100),
+ // Configure the WorkflowService that is responsible for scheduling the workflow tasks onto machines
+ val workflowScheduler = WorkflowSchedulerSpec(
+ schedulingQuantum = Duration.ofMillis(100),
jobAdmissionPolicy = NullJobAdmissionPolicy,
jobOrderPolicy = SubmissionTimeJobOrderPolicy(),
taskEligibilityPolicy = NullTaskEligibilityPolicy,
taskOrderPolicy = SubmissionTimeTaskOrderPolicy(),
)
+ val workflowHelper = WorkflowServiceHelper(coroutineContext, clock, computeHelper.service.newClient(), workflowScheduler)
- val trace = Trace.open(
- Paths.get(checkNotNull(WorkflowServiceTest::class.java.getResource("/trace.gwf")).toURI()),
- format = "gwf"
- )
- val replayer = TraceReplayer(trace)
-
- replayer.replay(clock, scheduler)
+ try {
+ val trace = Trace.open(
+ Paths.get(checkNotNull(WorkflowServiceTest::class.java.getResource("/trace.gwf")).toURI()),
+ format = "gwf"
+ )
- hosts.forEach(SimHost::close)
- scheduler.close()
- compute.close()
+ workflowHelper.replay(trace.toJobs())
+ } finally {
+ workflowHelper.close()
+ computeHelper.close()
+ }
- val metrics = collectMetrics(meterProvider as MetricProducer)
+ val metrics = collectMetrics(workflowHelper.metricProducer)
assertAll(
{ assertEquals(758, metrics.jobsSubmitted, "No jobs submitted") },
@@ -126,19 +104,29 @@ internal class WorkflowServiceTest {
{ assertEquals(metrics.jobsSubmitted, metrics.jobsFinished, "Not all started jobs finished") },
{ assertEquals(0, metrics.tasksActive, "Not all started tasks finished") },
{ assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") },
- { assertEquals(33213236L, clock.millis()) }
+ { assertEquals(33214236L, clock.millis()) { "Total duration incorrect" } }
)
}
/**
- * The machine model based on: https://www.spec.org/power_ssj2008/results/res2020q1/power_ssj2008-20191125-01012.html
+ * Construct a [HostSpec] for a simulated host.
*/
- private fun createMachineModel(): MachineModel {
+ private fun createHostSpec(uid: Int): HostSpec {
+ // Machine model based on: https://www.spec.org/power_ssj2008/results/res2020q1/power_ssj2008-20191125-01012.html
val node = ProcessingNode("AMD", "am64", "EPYC 7742", 32)
- val cpus = List(node.coreCount) { id -> ProcessingUnit(node, id, 3400.0) }
+ val cpus = List(node.coreCount) { ProcessingUnit(node, it, 3400.0) }
val memory = List(8) { MemoryUnit("Samsung", "Unknown", 2933.0, 16_000) }
- return MachineModel(cpus, memory)
+ val machineModel = MachineModel(cpus, memory)
+
+ return HostSpec(
+ UUID(0, uid.toLong()),
+ "host-$uid",
+ emptyMap(),
+ machineModel,
+ SimplePowerDriver(ConstantPowerModel(0.0)),
+ SimSpaceSharedHypervisorProvider()
+ )
}
class WorkflowMetrics {
diff --git a/opendc-workflow/opendc-workflow-workload/build.gradle.kts b/opendc-workflow/opendc-workflow-workload/build.gradle.kts
new file mode 100644
index 00000000..dfb77a39
--- /dev/null
+++ b/opendc-workflow/opendc-workflow-workload/build.gradle.kts
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+description = "Support library for simulating workflows with OpenDC"
+
+/* Build configuration */
+plugins {
+ `kotlin-library-conventions`
+ `testing-conventions`
+}
+
+dependencies {
+ api(platform(projects.opendcPlatform))
+ api(projects.opendcWorkflow.opendcWorkflowService)
+
+ implementation(projects.opendcSimulator.opendcSimulatorCompute)
+ implementation(projects.opendcTrace.opendcTraceApi)
+ implementation(projects.opendcTelemetry.opendcTelemetrySdk)
+ implementation(libs.opentelemetry.semconv)
+}
diff --git a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/TraceHelpers.kt b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/TraceHelpers.kt
new file mode 100644
index 00000000..73995d08
--- /dev/null
+++ b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/TraceHelpers.kt
@@ -0,0 +1,93 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+@file:JvmName("TraceHelpers")
+package org.opendc.workflow.workload
+
+import org.opendc.simulator.compute.workload.SimFlopsWorkload
+import org.opendc.trace.*
+import org.opendc.workflow.api.Job
+import org.opendc.workflow.api.Task
+import org.opendc.workflow.api.WORKFLOW_TASK_CORES
+import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE
+import java.util.*
+import kotlin.collections.HashMap
+import kotlin.collections.HashSet
+import kotlin.math.min
+
+/**
+ * Convert [Trace] into a list of [Job]s that can be submitted to the workflow service.
+ */
+public fun Trace.toJobs(): List<Job> {
+ val table = checkNotNull(getTable(TABLE_TASKS))
+ val reader = table.newReader()
+
+ val jobs = mutableMapOf<Long, Job>()
+ val tasks = mutableMapOf<Long, Task>()
+ val taskDependencies = mutableMapOf<Task, Set<Long>>()
+
+ try {
+ while (reader.nextRow()) {
+ // Bag of tasks without workflow ID all share the same workflow
+ val workflowId = if (reader.hasColumn(TASK_WORKFLOW_ID)) reader.get(TASK_WORKFLOW_ID).toLong() else 0L
+ val workflow = jobs.computeIfAbsent(workflowId) { id -> Job(UUID(0L, id), "<unnamed>", HashSet(), HashMap()) }
+
+ val id = reader.get(TASK_ID).toLong()
+ val grantedCpus = if (reader.hasColumn(TASK_ALLOC_NCPUS))
+ reader.getInt(TASK_ALLOC_NCPUS)
+ else
+ reader.getInt(TASK_REQ_NCPUS)
+ val submitTime = reader.get(TASK_SUBMIT_TIME)
+ val runtime = reader.get(TASK_RUNTIME)
+ val flops: Long = 4000 * runtime.seconds * grantedCpus
+ val workload = SimFlopsWorkload(flops)
+ val task = Task(
+ UUID(0L, id),
+ "<unnamed>",
+ HashSet(),
+ mapOf(
+ "workload" to workload,
+ WORKFLOW_TASK_CORES to grantedCpus,
+ WORKFLOW_TASK_DEADLINE to runtime.toMillis()
+ ),
+ )
+
+ tasks[id] = task
+ taskDependencies[task] = reader.get(TASK_PARENTS).map { it.toLong() }.toSet()
+
+ (workflow.metadata as MutableMap<String, Any>).merge("WORKFLOW_SUBMIT_TIME", submitTime.toEpochMilli()) { a, b -> min(a as Long, b as Long) }
+ (workflow.tasks as MutableSet<Task>).add(task)
+ }
+
+ // Resolve dependencies for all tasks
+ for ((task, deps) in taskDependencies) {
+ for (dep in deps) {
+ val parent = requireNotNull(tasks[dep]) { "Dependency task with id $dep not found" }
+ (task.dependencies as MutableSet<Task>).add(parent)
+ }
+ }
+ } finally {
+ reader.close()
+ }
+
+ return jobs.values.toList()
+}
diff --git a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowSchedulerSpec.kt b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowSchedulerSpec.kt
new file mode 100644
index 00000000..d6a375b6
--- /dev/null
+++ b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowSchedulerSpec.kt
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.workflow.workload
+
+import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy
+import org.opendc.workflow.service.scheduler.job.JobOrderPolicy
+import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy
+import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy
+import java.time.Duration
+
+/**
+ * Specification of the scheduling policies of the workflow scheduler.
+ */
+public data class WorkflowSchedulerSpec(
+ val schedulingQuantum: Duration,
+ val jobAdmissionPolicy: JobAdmissionPolicy,
+ val jobOrderPolicy: JobOrderPolicy,
+ val taskEligibilityPolicy: TaskEligibilityPolicy,
+ val taskOrderPolicy: TaskOrderPolicy,
+)
diff --git a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt
new file mode 100644
index 00000000..0198900f
--- /dev/null
+++ b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt
@@ -0,0 +1,123 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.workflow.workload
+
+import io.opentelemetry.sdk.metrics.SdkMeterProvider
+import io.opentelemetry.sdk.metrics.export.MetricProducer
+import io.opentelemetry.sdk.resources.Resource
+import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import org.opendc.compute.api.ComputeClient
+import org.opendc.telemetry.sdk.toOtelClock
+import org.opendc.workflow.api.Job
+import org.opendc.workflow.service.WorkflowService
+import java.time.Clock
+import kotlin.coroutines.CoroutineContext
+
+/**
+ * Helper class to simulate workflow-based workloads in OpenDC.
+ *
+ * @param context [CoroutineContext] to run the simulation in.
+ * @param clock [Clock] instance tracking simulation time.
+ * @param computeClient A [ComputeClient] instance to communicate with the cluster scheduler.
+ * @param schedulerSpec The configuration of the workflow scheduler.
+ */
+public class WorkflowServiceHelper(
+ private val context: CoroutineContext,
+ private val clock: Clock,
+ private val computeClient: ComputeClient,
+ private val schedulerSpec: WorkflowSchedulerSpec
+) : AutoCloseable {
+ /**
+ * The [WorkflowService] that is constructed by this runner.
+ */
+ public val service: WorkflowService
+
+ /**
+ * The [MetricProducer] exposed by the [WorkflowService].
+ */
+ public val metricProducer: MetricProducer
+
+ init {
+ val resource = Resource.builder()
+ .put(ResourceAttributes.SERVICE_NAME, "opendc-workflow")
+ .build()
+
+ val meterProvider = SdkMeterProvider.builder()
+ .setClock(clock.toOtelClock())
+ .setResource(resource)
+ .build()
+ metricProducer = meterProvider
+
+ service = WorkflowService(
+ context,
+ clock,
+ meterProvider,
+ computeClient,
+ schedulerSpec.schedulingQuantum,
+ jobAdmissionPolicy = schedulerSpec.jobAdmissionPolicy,
+ jobOrderPolicy = schedulerSpec.jobOrderPolicy,
+ taskEligibilityPolicy = schedulerSpec.taskEligibilityPolicy,
+ taskOrderPolicy = schedulerSpec.taskOrderPolicy,
+ )
+ }
+
+ /**
+ * Run the specified list of [jobs] using the workflow service and suspend execution until all jobs have
+ * finished.
+ */
+ public suspend fun replay(jobs: List<Job>) {
+ // Sort jobs by their arrival time
+ val orderedJobs = jobs.sortedBy { it.metadata.getOrDefault("WORKFLOW_SUBMIT_TIME", Long.MAX_VALUE) as Long }
+ if (orderedJobs.isEmpty()) {
+ return
+ }
+
+ // Wait until the trace is started
+ val startTime = orderedJobs[0].metadata.getOrDefault("WORKFLOW_SUBMIT_TIME", Long.MAX_VALUE) as Long
+ var offset = 0L
+
+ if (startTime != Long.MAX_VALUE) {
+ offset = startTime - clock.millis()
+ delay(offset.coerceAtLeast(0))
+ }
+
+ coroutineScope {
+ for (job in orderedJobs) {
+ val submitTime = job.metadata.getOrDefault("WORKFLOW_SUBMIT_TIME", Long.MAX_VALUE) as Long
+ if (submitTime != Long.MAX_VALUE) {
+ delay(((submitTime - offset) - clock.millis()).coerceAtLeast(0))
+ }
+
+ launch { service.invoke(job) }
+ }
+ }
+ }
+
+ override fun close() {
+ computeClient.close()
+ service.close()
+ }
+}