summaryrefslogtreecommitdiff
path: root/opendc-workflow
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-workflow')
-rw-r--r--opendc-workflow/opendc-workflow-service/build.gradle.kts4
-rw-r--r--opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt96
-rw-r--r--opendc-workflow/opendc-workflow-workload/build.gradle.kts35
-rw-r--r--opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/TraceHelpers.kt94
-rw-r--r--opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowSchedulerSpec.kt40
-rw-r--r--opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt98
6 files changed, 54 insertions, 313 deletions
diff --git a/opendc-workflow/opendc-workflow-service/build.gradle.kts b/opendc-workflow/opendc-workflow-service/build.gradle.kts
index b6365885..6908a5af 100644
--- a/opendc-workflow/opendc-workflow-service/build.gradle.kts
+++ b/opendc-workflow/opendc-workflow-service/build.gradle.kts
@@ -33,9 +33,9 @@ dependencies {
implementation(projects.opendcCommon)
implementation(libs.kotlin.logging)
- testImplementation(projects.opendcWorkflow.opendcWorkflowWorkload)
- testImplementation(projects.opendcCompute.opendcComputeWorkload)
testImplementation(projects.opendcSimulator.opendcSimulatorCore)
+ testImplementation(projects.opendcExperiments.opendcExperimentsCompute)
+ testImplementation(projects.opendcExperiments.opendcExperimentsWorkflow)
testImplementation(projects.opendcTrace.opendcTraceApi)
testRuntimeOnly(projects.opendcTrace.opendcTraceGwf)
testRuntimeOnly(libs.log4j.slf4j)
diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
index 0fb8b67c..49496fed 100644
--- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
+++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
@@ -22,17 +22,22 @@
package org.opendc.workflow.service
+import org.junit.jupiter.api.Assertions.assertAll
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.assertAll
+import org.opendc.compute.service.scheduler.ComputeScheduler
import org.opendc.compute.service.scheduler.FilterScheduler
import org.opendc.compute.service.scheduler.filters.ComputeFilter
import org.opendc.compute.service.scheduler.filters.RamFilter
import org.opendc.compute.service.scheduler.filters.VCpuFilter
import org.opendc.compute.service.scheduler.weights.VCpuWeigher
-import org.opendc.compute.workload.ComputeServiceHelper
import org.opendc.compute.workload.topology.HostSpec
+import org.opendc.experiments.compute.setupComputeService
+import org.opendc.experiments.compute.setupHosts
+import org.opendc.experiments.provisioner.Provisioner
+import org.opendc.experiments.provisioner.ProvisioningContext
+import org.opendc.experiments.workflow.*
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
@@ -46,9 +51,6 @@ import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy
import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy
import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy
import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy
-import org.opendc.workflow.workload.WorkflowSchedulerSpec
-import org.opendc.workflow.workload.WorkflowServiceHelper
-import org.opendc.workflow.workload.toJobs
import java.nio.file.Paths
import java.time.Duration
import java.util.*
@@ -63,55 +65,61 @@ internal class WorkflowServiceTest {
*/
@Test
fun testTrace() = runBlockingSimulation {
- // Configure the ComputeService that is responsible for mapping virtual machines onto physical hosts
- val computeScheduler = FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)),
- weighers = listOf(VCpuWeigher(1.0, multiplier = 1.0))
- )
+ val computeService = "compute.opendc.org"
+ val workflowService = "workflow.opendc.org"
- val computeHelper = ComputeServiceHelper(
- coroutineContext,
- clock,
- computeScheduler,
- seed = 0,
- schedulingQuantum = Duration.ofSeconds(1)
- )
+ Provisioner(coroutineContext, clock, seed = 0L).use { provisioner ->
+ val scheduler: (ProvisioningContext) -> ComputeScheduler = {
+ FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)),
+ weighers = listOf(VCpuWeigher(1.0, multiplier = 1.0))
+ )
+ }
- val hostCount = 4
- repeat(hostCount) { computeHelper.registerHost(createHostSpec(it)) }
+ provisioner.runSteps(
+ // Configure the ComputeService that is responsible for mapping virtual machines onto physical hosts
+ setupComputeService(computeService, scheduler, schedulingQuantum = Duration.ofSeconds(1)),
+ setupHosts(computeService, List(4) { createHostSpec(it) }),
- // Configure the WorkflowService that is responsible for scheduling the workflow tasks onto machines
- val workflowScheduler = WorkflowSchedulerSpec(
- schedulingQuantum = Duration.ofMillis(100),
- jobAdmissionPolicy = NullJobAdmissionPolicy,
- jobOrderPolicy = SubmissionTimeJobOrderPolicy(),
- taskEligibilityPolicy = NullTaskEligibilityPolicy,
- taskOrderPolicy = SubmissionTimeTaskOrderPolicy(),
- )
- val workflowHelper = WorkflowServiceHelper(coroutineContext, clock, computeHelper.service.newClient(), workflowScheduler)
+ // Configure the WorkflowService that is responsible for scheduling the workflow tasks onto machines
+ setupWorkflowService(
+ workflowService,
+ computeService,
+ WorkflowSchedulerSpec(
+ schedulingQuantum = Duration.ofMillis(100),
+ jobAdmissionPolicy = NullJobAdmissionPolicy,
+ jobOrderPolicy = SubmissionTimeJobOrderPolicy(),
+ taskEligibilityPolicy = NullTaskEligibilityPolicy,
+ taskOrderPolicy = SubmissionTimeTaskOrderPolicy(),
+ )
+ )
+ )
+
+ val service = provisioner.registry.resolve(workflowService, WorkflowService::class.java)!!
- try {
val trace = Trace.open(
Paths.get(checkNotNull(WorkflowServiceTest::class.java.getResource("/trace.gwf")).toURI()),
format = "gwf"
)
+ service.replay(clock, trace.toJobs())
- workflowHelper.replay(trace.toJobs())
- } finally {
- workflowHelper.close()
- computeHelper.close()
- }
-
- val metrics = workflowHelper.service.getSchedulerStats()
+ val metrics = service.getSchedulerStats()
- assertAll(
- { assertEquals(758, metrics.workflowsSubmitted, "No jobs submitted") },
- { assertEquals(0, metrics.workflowsRunning, "Not all submitted jobs started") },
- { assertEquals(metrics.workflowsSubmitted, metrics.workflowsFinished, "Not all started jobs finished") },
- { assertEquals(0, metrics.tasksRunning, "Not all started tasks finished") },
- { assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") },
- { assertEquals(32649883L, clock.millis()) { "Total duration incorrect" } }
- )
+ assertAll(
+ { assertEquals(758, metrics.workflowsSubmitted, "No jobs submitted") },
+ { assertEquals(0, metrics.workflowsRunning, "Not all submitted jobs started") },
+ {
+ assertEquals(
+ metrics.workflowsSubmitted,
+ metrics.workflowsFinished,
+ "Not all started jobs finished"
+ )
+ },
+ { assertEquals(0, metrics.tasksRunning, "Not all started tasks finished") },
+ { assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") },
+ { assertEquals(32649883L, clock.millis()) { "Total duration incorrect" } }
+ )
+ }
}
/**
diff --git a/opendc-workflow/opendc-workflow-workload/build.gradle.kts b/opendc-workflow/opendc-workflow-workload/build.gradle.kts
deleted file mode 100644
index 17eadf29..00000000
--- a/opendc-workflow/opendc-workflow-workload/build.gradle.kts
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-description = "Support library for simulating workflows with OpenDC"
-
-/* Build configuration */
-plugins {
- `kotlin-library-conventions`
-}
-
-dependencies {
- api(projects.opendcWorkflow.opendcWorkflowService)
-
- implementation(projects.opendcSimulator.opendcSimulatorCompute)
- implementation(projects.opendcTrace.opendcTraceApi)
-}
diff --git a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/TraceHelpers.kt b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/TraceHelpers.kt
deleted file mode 100644
index 5f57723b..00000000
--- a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/TraceHelpers.kt
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-@file:JvmName("TraceHelpers")
-package org.opendc.workflow.workload
-
-import org.opendc.simulator.compute.workload.SimFlopsWorkload
-import org.opendc.trace.*
-import org.opendc.trace.conv.*
-import org.opendc.workflow.api.Job
-import org.opendc.workflow.api.Task
-import org.opendc.workflow.api.WORKFLOW_TASK_CORES
-import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE
-import java.util.*
-import kotlin.collections.HashMap
-import kotlin.collections.HashSet
-import kotlin.math.min
-
-/**
- * Convert [Trace] into a list of [Job]s that can be submitted to the workflow service.
- */
-public fun Trace.toJobs(): List<Job> {
- val table = checkNotNull(getTable(TABLE_TASKS))
- val reader = table.newReader()
-
- val jobs = mutableMapOf<Long, Job>()
- val tasks = mutableMapOf<Long, Task>()
- val taskDependencies = mutableMapOf<Task, Set<Long>>()
-
- try {
- while (reader.nextRow()) {
- // Bag of tasks without workflow ID all share the same workflow
- val workflowId = if (reader.resolve(TASK_WORKFLOW_ID) != -1) reader.getString(TASK_WORKFLOW_ID)!!.toLong() else 0L
- val workflow = jobs.computeIfAbsent(workflowId) { id -> Job(UUID(0L, id), "<unnamed>", HashSet(), HashMap()) }
-
- val id = reader.getString(TASK_ID)!!.toLong()
- val grantedCpus = if (reader.resolve(TASK_ALLOC_NCPUS) != 0)
- reader.getInt(TASK_ALLOC_NCPUS)
- else
- reader.getInt(TASK_REQ_NCPUS)
- val submitTime = reader.getInstant(TASK_SUBMIT_TIME)!!
- val runtime = reader.getDuration(TASK_RUNTIME)!!
- val flops: Long = 4000 * runtime.seconds * grantedCpus
- val workload = SimFlopsWorkload(flops)
- val task = Task(
- UUID(0L, id),
- "<unnamed>",
- HashSet(),
- mapOf(
- "workload" to workload,
- WORKFLOW_TASK_CORES to grantedCpus,
- WORKFLOW_TASK_DEADLINE to runtime.toMillis()
- ),
- )
-
- tasks[id] = task
- taskDependencies[task] = reader.getSet(TASK_PARENTS, String::class.java)!!.map { it.toLong() }.toSet()
-
- (workflow.metadata as MutableMap<String, Any>).merge("WORKFLOW_SUBMIT_TIME", submitTime.toEpochMilli()) { a, b -> min(a as Long, b as Long) }
- (workflow.tasks as MutableSet<Task>).add(task)
- }
-
- // Resolve dependencies for all tasks
- for ((task, deps) in taskDependencies) {
- for (dep in deps) {
- val parent = requireNotNull(tasks[dep]) { "Dependency task with id $dep not found" }
- (task.dependencies as MutableSet<Task>).add(parent)
- }
- }
- } finally {
- reader.close()
- }
-
- return jobs.values.toList()
-}
diff --git a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowSchedulerSpec.kt b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowSchedulerSpec.kt
deleted file mode 100644
index d6a375b6..00000000
--- a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowSchedulerSpec.kt
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.workflow.workload
-
-import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy
-import org.opendc.workflow.service.scheduler.job.JobOrderPolicy
-import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy
-import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy
-import java.time.Duration
-
-/**
- * Specification of the scheduling policies of the workflow scheduler.
- */
-public data class WorkflowSchedulerSpec(
- val schedulingQuantum: Duration,
- val jobAdmissionPolicy: JobAdmissionPolicy,
- val jobOrderPolicy: JobOrderPolicy,
- val taskEligibilityPolicy: TaskEligibilityPolicy,
- val taskOrderPolicy: TaskOrderPolicy,
-)
diff --git a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt
deleted file mode 100644
index 435d0190..00000000
--- a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.workflow.workload
-
-import kotlinx.coroutines.coroutineScope
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.launch
-import org.opendc.compute.api.ComputeClient
-import org.opendc.workflow.api.Job
-import org.opendc.workflow.service.WorkflowService
-import java.time.Clock
-import kotlin.coroutines.CoroutineContext
-
-/**
- * Helper class to simulate workflow-based workloads in OpenDC.
- *
- * @param context [CoroutineContext] to run the simulation in.
- * @param clock [Clock] instance tracking simulation time.
- * @param computeClient A [ComputeClient] instance to communicate with the cluster scheduler.
- * @param schedulerSpec The configuration of the workflow scheduler.
- */
-public class WorkflowServiceHelper(
- private val context: CoroutineContext,
- private val clock: Clock,
- private val computeClient: ComputeClient,
- private val schedulerSpec: WorkflowSchedulerSpec
-) : AutoCloseable {
- /**
- * The [WorkflowService] that is constructed by this runner.
- */
- public val service: WorkflowService = WorkflowService(
- context,
- clock,
- computeClient,
- schedulerSpec.schedulingQuantum,
- jobAdmissionPolicy = schedulerSpec.jobAdmissionPolicy,
- jobOrderPolicy = schedulerSpec.jobOrderPolicy,
- taskEligibilityPolicy = schedulerSpec.taskEligibilityPolicy,
- taskOrderPolicy = schedulerSpec.taskOrderPolicy,
- )
-
- /**
- * Run the specified list of [jobs] using the workflow service and suspend execution until all jobs have
- * finished.
- */
- public suspend fun replay(jobs: List<Job>) {
- // Sort jobs by their arrival time
- val orderedJobs = jobs.sortedBy { it.metadata.getOrDefault("WORKFLOW_SUBMIT_TIME", Long.MAX_VALUE) as Long }
- if (orderedJobs.isEmpty()) {
- return
- }
-
- // Wait until the trace is started
- val startTime = orderedJobs[0].metadata.getOrDefault("WORKFLOW_SUBMIT_TIME", Long.MAX_VALUE) as Long
- var offset = 0L
-
- if (startTime != Long.MAX_VALUE) {
- offset = startTime - clock.millis()
- delay(offset.coerceAtLeast(0))
- }
-
- coroutineScope {
- for (job in orderedJobs) {
- val submitTime = job.metadata.getOrDefault("WORKFLOW_SUBMIT_TIME", Long.MAX_VALUE) as Long
- if (submitTime != Long.MAX_VALUE) {
- delay(((submitTime - offset) - clock.millis()).coerceAtLeast(0))
- }
-
- launch { service.invoke(job) }
- }
- }
- }
-
- override fun close() {
- computeClient.close()
- service.close()
- }
-}