summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-09-28 15:51:05 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2022-10-03 17:35:58 +0200
commitc453e27abe54221f76648bc91edadb2efcd1ec07 (patch)
tree2eb75de390dc735519c6d29bf2a6d50694436d26
parent115e37984624a409bc1ad4f54bf10c9537183390 (diff)
feat(exp/workflow): Add provisioning step for workflow service
This change adds a new module `opendc-experiments-workflow` that provides provisioner implementations for experiments to use for setting up and using the workflow engine in OpenDC.
-rw-r--r--opendc-experiments/opendc-experiments-compute/build.gradle.kts3
-rw-r--r--opendc-experiments/opendc-experiments-workflow/build.gradle.kts (renamed from opendc-workflow/opendc-workflow-workload/build.gradle.kts)10
-rw-r--r--opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt (renamed from opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/TraceHelpers.kt)40
-rw-r--r--opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSchedulerSpec.kt (renamed from opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowSchedulerSpec.kt)4
-rw-r--r--opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt66
-rw-r--r--opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSteps.kt40
-rw-r--r--opendc-workflow/opendc-workflow-service/build.gradle.kts4
-rw-r--r--opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt96
-rw-r--r--opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt98
-rw-r--r--settings.gradle.kts2
10 files changed, 210 insertions, 153 deletions
diff --git a/opendc-experiments/opendc-experiments-compute/build.gradle.kts b/opendc-experiments/opendc-experiments-compute/build.gradle.kts
index a4ea0b02..70f16199 100644
--- a/opendc-experiments/opendc-experiments-compute/build.gradle.kts
+++ b/opendc-experiments/opendc-experiments-compute/build.gradle.kts
@@ -31,6 +31,5 @@ plugins {
dependencies {
api(projects.opendcExperiments.opendcExperimentsBase)
-
- implementation(projects.opendcCompute.opendcComputeWorkload)
+ api(projects.opendcCompute.opendcComputeWorkload)
}
diff --git a/opendc-workflow/opendc-workflow-workload/build.gradle.kts b/opendc-experiments/opendc-experiments-workflow/build.gradle.kts
index 17eadf29..4fc34d2d 100644
--- a/opendc-workflow/opendc-workflow-workload/build.gradle.kts
+++ b/opendc-experiments/opendc-experiments-workflow/build.gradle.kts
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021 AtLarge Research
+ * Copyright (c) 2022 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -25,11 +25,17 @@ description = "Support library for simulating workflows with OpenDC"
/* Build configuration */
plugins {
`kotlin-library-conventions`
+ `testing-conventions`
+ `jacoco-conventions`
}
dependencies {
- api(projects.opendcWorkflow.opendcWorkflowService)
+ api(projects.opendcExperiments.opendcExperimentsBase)
+ api(projects.opendcWorkflow.opendcWorkflowApi)
+ implementation(libs.kotlinx.coroutines)
+ implementation(projects.opendcCompute.opendcComputeService)
+ implementation(projects.opendcWorkflow.opendcWorkflowService)
implementation(projects.opendcSimulator.opendcSimulatorCompute)
implementation(projects.opendcTrace.opendcTraceApi)
}
diff --git a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/TraceHelpers.kt b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt
index 5f57723b..a15d3d5b 100644
--- a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/TraceHelpers.kt
+++ b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/TraceHelpers.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021 AtLarge Research
+ * Copyright (c) 2022 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -21,8 +21,11 @@
*/
@file:JvmName("TraceHelpers")
-package org.opendc.workflow.workload
+package org.opendc.experiments.workflow
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
import org.opendc.simulator.compute.workload.SimFlopsWorkload
import org.opendc.trace.*
import org.opendc.trace.conv.*
@@ -30,6 +33,8 @@ import org.opendc.workflow.api.Job
import org.opendc.workflow.api.Task
import org.opendc.workflow.api.WORKFLOW_TASK_CORES
import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE
+import org.opendc.workflow.service.WorkflowService
+import java.time.Clock
import java.util.*
import kotlin.collections.HashMap
import kotlin.collections.HashSet
@@ -92,3 +97,34 @@ public fun Trace.toJobs(): List<Job> {
return jobs.values.toList()
}
+
+/**
+ * Helper method to replay the specified list of [jobs] and suspend execution util all jobs have finished.
+ */
+public suspend fun WorkflowService.replay(clock: Clock, jobs: List<Job>) {
+ // Sort jobs by their arrival time
+ val orderedJobs = jobs.sortedBy { it.metadata.getOrDefault("WORKFLOW_SUBMIT_TIME", Long.MAX_VALUE) as Long }
+ if (orderedJobs.isEmpty()) {
+ return
+ }
+
+ // Wait until the trace is started
+ val startTime = orderedJobs[0].metadata.getOrDefault("WORKFLOW_SUBMIT_TIME", Long.MAX_VALUE) as Long
+ var offset = 0L
+
+ if (startTime != Long.MAX_VALUE) {
+ offset = startTime - clock.millis()
+ delay(offset.coerceAtLeast(0))
+ }
+
+ coroutineScope {
+ for (job in orderedJobs) {
+ val submitTime = job.metadata.getOrDefault("WORKFLOW_SUBMIT_TIME", Long.MAX_VALUE) as Long
+ if (submitTime != Long.MAX_VALUE) {
+ delay(((submitTime - offset) - clock.millis()).coerceAtLeast(0))
+ }
+
+ launch { invoke(job) }
+ }
+ }
+}
diff --git a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowSchedulerSpec.kt b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSchedulerSpec.kt
index d6a375b6..cb8056a7 100644
--- a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowSchedulerSpec.kt
+++ b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSchedulerSpec.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021 AtLarge Research
+ * Copyright (c) 2022 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.workflow.workload
+package org.opendc.experiments.workflow
import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy
import org.opendc.workflow.service.scheduler.job.JobOrderPolicy
diff --git a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt
new file mode 100644
index 00000000..a2d6a172
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowServiceProvisioningStep.kt
@@ -0,0 +1,66 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.workflow
+
+import org.opendc.compute.service.ComputeService
+import org.opendc.experiments.provisioner.ProvisioningContext
+import org.opendc.experiments.provisioner.ProvisioningStep
+import org.opendc.workflow.service.WorkflowService
+import java.time.Duration
+
+/**
+ * A [ProvisioningStep] that provisions a [WorkflowService].
+ *
+ * @param serviceDomain The domain name under which to register the workflow service.
+ * @param computeService The domain name where the underlying compute service is located.
+ * @param scheduler The configuration of the scheduler of the workflow engine.
+ * @param schedulingQuantum The scheduling quantum of the compute scheduler.
+ */
+public class WorkflowServiceProvisioningStep internal constructor(
+ private val serviceDomain: String,
+ private val computeService: String,
+ private val scheduler: WorkflowSchedulerSpec,
+ private val schedulingQuantum: Duration
+) : ProvisioningStep {
+ override fun apply(ctx: ProvisioningContext): AutoCloseable {
+ val computeService = requireNotNull(ctx.registry.resolve(computeService, ComputeService::class.java)) { "Compute service $computeService does not exist" }
+
+ val client = computeService.newClient()
+ val service = WorkflowService(
+ ctx.coroutineContext,
+ ctx.clock,
+ client,
+ scheduler.schedulingQuantum,
+ jobAdmissionPolicy = scheduler.jobAdmissionPolicy,
+ jobOrderPolicy = scheduler.jobOrderPolicy,
+ taskEligibilityPolicy = scheduler.taskEligibilityPolicy,
+ taskOrderPolicy = scheduler.taskOrderPolicy,
+ )
+ ctx.registry.register(serviceDomain, WorkflowService::class.java, service)
+
+ return AutoCloseable {
+ service.close()
+ client.close()
+ }
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSteps.kt b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSteps.kt
new file mode 100644
index 00000000..7aae3a9f
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-workflow/src/main/kotlin/org/opendc/experiments/workflow/WorkflowSteps.kt
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+@file:JvmName("WorkflowSteps")
+package org.opendc.experiments.workflow
+
+import org.opendc.experiments.provisioner.ProvisioningStep
+import org.opendc.workflow.service.WorkflowService
+import java.time.Duration
+
+/**
+ * Return a [ProvisioningStep] that sets up a [WorkflowService].
+ */
+public fun setupWorkflowService(
+ serviceDomain: String,
+ computeService: String,
+ scheduler: WorkflowSchedulerSpec,
+ schedulingQuantum: Duration = Duration.ofMinutes(5)
+): ProvisioningStep {
+ return WorkflowServiceProvisioningStep(serviceDomain, computeService, scheduler, schedulingQuantum)
+}
diff --git a/opendc-workflow/opendc-workflow-service/build.gradle.kts b/opendc-workflow/opendc-workflow-service/build.gradle.kts
index b6365885..6908a5af 100644
--- a/opendc-workflow/opendc-workflow-service/build.gradle.kts
+++ b/opendc-workflow/opendc-workflow-service/build.gradle.kts
@@ -33,9 +33,9 @@ dependencies {
implementation(projects.opendcCommon)
implementation(libs.kotlin.logging)
- testImplementation(projects.opendcWorkflow.opendcWorkflowWorkload)
- testImplementation(projects.opendcCompute.opendcComputeWorkload)
testImplementation(projects.opendcSimulator.opendcSimulatorCore)
+ testImplementation(projects.opendcExperiments.opendcExperimentsCompute)
+ testImplementation(projects.opendcExperiments.opendcExperimentsWorkflow)
testImplementation(projects.opendcTrace.opendcTraceApi)
testRuntimeOnly(projects.opendcTrace.opendcTraceGwf)
testRuntimeOnly(libs.log4j.slf4j)
diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
index 0fb8b67c..49496fed 100644
--- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
+++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
@@ -22,17 +22,22 @@
package org.opendc.workflow.service
+import org.junit.jupiter.api.Assertions.assertAll
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.assertAll
+import org.opendc.compute.service.scheduler.ComputeScheduler
import org.opendc.compute.service.scheduler.FilterScheduler
import org.opendc.compute.service.scheduler.filters.ComputeFilter
import org.opendc.compute.service.scheduler.filters.RamFilter
import org.opendc.compute.service.scheduler.filters.VCpuFilter
import org.opendc.compute.service.scheduler.weights.VCpuWeigher
-import org.opendc.compute.workload.ComputeServiceHelper
import org.opendc.compute.workload.topology.HostSpec
+import org.opendc.experiments.compute.setupComputeService
+import org.opendc.experiments.compute.setupHosts
+import org.opendc.experiments.provisioner.Provisioner
+import org.opendc.experiments.provisioner.ProvisioningContext
+import org.opendc.experiments.workflow.*
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
@@ -46,9 +51,6 @@ import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy
import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy
import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy
import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy
-import org.opendc.workflow.workload.WorkflowSchedulerSpec
-import org.opendc.workflow.workload.WorkflowServiceHelper
-import org.opendc.workflow.workload.toJobs
import java.nio.file.Paths
import java.time.Duration
import java.util.*
@@ -63,55 +65,61 @@ internal class WorkflowServiceTest {
*/
@Test
fun testTrace() = runBlockingSimulation {
- // Configure the ComputeService that is responsible for mapping virtual machines onto physical hosts
- val computeScheduler = FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)),
- weighers = listOf(VCpuWeigher(1.0, multiplier = 1.0))
- )
+ val computeService = "compute.opendc.org"
+ val workflowService = "workflow.opendc.org"
- val computeHelper = ComputeServiceHelper(
- coroutineContext,
- clock,
- computeScheduler,
- seed = 0,
- schedulingQuantum = Duration.ofSeconds(1)
- )
+ Provisioner(coroutineContext, clock, seed = 0L).use { provisioner ->
+ val scheduler: (ProvisioningContext) -> ComputeScheduler = {
+ FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)),
+ weighers = listOf(VCpuWeigher(1.0, multiplier = 1.0))
+ )
+ }
- val hostCount = 4
- repeat(hostCount) { computeHelper.registerHost(createHostSpec(it)) }
+ provisioner.runSteps(
+ // Configure the ComputeService that is responsible for mapping virtual machines onto physical hosts
+ setupComputeService(computeService, scheduler, schedulingQuantum = Duration.ofSeconds(1)),
+ setupHosts(computeService, List(4) { createHostSpec(it) }),
- // Configure the WorkflowService that is responsible for scheduling the workflow tasks onto machines
- val workflowScheduler = WorkflowSchedulerSpec(
- schedulingQuantum = Duration.ofMillis(100),
- jobAdmissionPolicy = NullJobAdmissionPolicy,
- jobOrderPolicy = SubmissionTimeJobOrderPolicy(),
- taskEligibilityPolicy = NullTaskEligibilityPolicy,
- taskOrderPolicy = SubmissionTimeTaskOrderPolicy(),
- )
- val workflowHelper = WorkflowServiceHelper(coroutineContext, clock, computeHelper.service.newClient(), workflowScheduler)
+ // Configure the WorkflowService that is responsible for scheduling the workflow tasks onto machines
+ setupWorkflowService(
+ workflowService,
+ computeService,
+ WorkflowSchedulerSpec(
+ schedulingQuantum = Duration.ofMillis(100),
+ jobAdmissionPolicy = NullJobAdmissionPolicy,
+ jobOrderPolicy = SubmissionTimeJobOrderPolicy(),
+ taskEligibilityPolicy = NullTaskEligibilityPolicy,
+ taskOrderPolicy = SubmissionTimeTaskOrderPolicy(),
+ )
+ )
+ )
+
+ val service = provisioner.registry.resolve(workflowService, WorkflowService::class.java)!!
- try {
val trace = Trace.open(
Paths.get(checkNotNull(WorkflowServiceTest::class.java.getResource("/trace.gwf")).toURI()),
format = "gwf"
)
+ service.replay(clock, trace.toJobs())
- workflowHelper.replay(trace.toJobs())
- } finally {
- workflowHelper.close()
- computeHelper.close()
- }
-
- val metrics = workflowHelper.service.getSchedulerStats()
+ val metrics = service.getSchedulerStats()
- assertAll(
- { assertEquals(758, metrics.workflowsSubmitted, "No jobs submitted") },
- { assertEquals(0, metrics.workflowsRunning, "Not all submitted jobs started") },
- { assertEquals(metrics.workflowsSubmitted, metrics.workflowsFinished, "Not all started jobs finished") },
- { assertEquals(0, metrics.tasksRunning, "Not all started tasks finished") },
- { assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") },
- { assertEquals(32649883L, clock.millis()) { "Total duration incorrect" } }
- )
+ assertAll(
+ { assertEquals(758, metrics.workflowsSubmitted, "No jobs submitted") },
+ { assertEquals(0, metrics.workflowsRunning, "Not all submitted jobs started") },
+ {
+ assertEquals(
+ metrics.workflowsSubmitted,
+ metrics.workflowsFinished,
+ "Not all started jobs finished"
+ )
+ },
+ { assertEquals(0, metrics.tasksRunning, "Not all started tasks finished") },
+ { assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") },
+ { assertEquals(32649883L, clock.millis()) { "Total duration incorrect" } }
+ )
+ }
}
/**
diff --git a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt
deleted file mode 100644
index 435d0190..00000000
--- a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.workflow.workload
-
-import kotlinx.coroutines.coroutineScope
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.launch
-import org.opendc.compute.api.ComputeClient
-import org.opendc.workflow.api.Job
-import org.opendc.workflow.service.WorkflowService
-import java.time.Clock
-import kotlin.coroutines.CoroutineContext
-
-/**
- * Helper class to simulate workflow-based workloads in OpenDC.
- *
- * @param context [CoroutineContext] to run the simulation in.
- * @param clock [Clock] instance tracking simulation time.
- * @param computeClient A [ComputeClient] instance to communicate with the cluster scheduler.
- * @param schedulerSpec The configuration of the workflow scheduler.
- */
-public class WorkflowServiceHelper(
- private val context: CoroutineContext,
- private val clock: Clock,
- private val computeClient: ComputeClient,
- private val schedulerSpec: WorkflowSchedulerSpec
-) : AutoCloseable {
- /**
- * The [WorkflowService] that is constructed by this runner.
- */
- public val service: WorkflowService = WorkflowService(
- context,
- clock,
- computeClient,
- schedulerSpec.schedulingQuantum,
- jobAdmissionPolicy = schedulerSpec.jobAdmissionPolicy,
- jobOrderPolicy = schedulerSpec.jobOrderPolicy,
- taskEligibilityPolicy = schedulerSpec.taskEligibilityPolicy,
- taskOrderPolicy = schedulerSpec.taskOrderPolicy,
- )
-
- /**
- * Run the specified list of [jobs] using the workflow service and suspend execution until all jobs have
- * finished.
- */
- public suspend fun replay(jobs: List<Job>) {
- // Sort jobs by their arrival time
- val orderedJobs = jobs.sortedBy { it.metadata.getOrDefault("WORKFLOW_SUBMIT_TIME", Long.MAX_VALUE) as Long }
- if (orderedJobs.isEmpty()) {
- return
- }
-
- // Wait until the trace is started
- val startTime = orderedJobs[0].metadata.getOrDefault("WORKFLOW_SUBMIT_TIME", Long.MAX_VALUE) as Long
- var offset = 0L
-
- if (startTime != Long.MAX_VALUE) {
- offset = startTime - clock.millis()
- delay(offset.coerceAtLeast(0))
- }
-
- coroutineScope {
- for (job in orderedJobs) {
- val submitTime = job.metadata.getOrDefault("WORKFLOW_SUBMIT_TIME", Long.MAX_VALUE) as Long
- if (submitTime != Long.MAX_VALUE) {
- delay(((submitTime - offset) - clock.millis()).coerceAtLeast(0))
- }
-
- launch { service.invoke(job) }
- }
- }
- }
-
- override fun close() {
- computeClient.close()
- service.close()
- }
-}
diff --git a/settings.gradle.kts b/settings.gradle.kts
index d37b96de..92b1eaf3 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -28,13 +28,13 @@ include(":opendc-compute:opendc-compute-simulator")
include(":opendc-compute:opendc-compute-workload")
include(":opendc-workflow:opendc-workflow-api")
include(":opendc-workflow:opendc-workflow-service")
-include(":opendc-workflow:opendc-workflow-workload")
include(":opendc-faas:opendc-faas-api")
include(":opendc-faas:opendc-faas-service")
include(":opendc-faas:opendc-faas-simulator")
include(":opendc-faas:opendc-faas-workload")
include(":opendc-experiments:opendc-experiments-base")
include(":opendc-experiments:opendc-experiments-compute")
+include(":opendc-experiments:opendc-experiments-workflow")
include(":opendc-experiments:opendc-experiments-capelin")
include(":opendc-experiments:opendc-experiments-tf20")
include(":opendc-web:opendc-web-proto")