summaryrefslogtreecommitdiff
path: root/opendc-workflow
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-workflow')
-rw-r--r--opendc-workflow/opendc-workflow-service/build.gradle.kts4
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt8
-rw-r--r--opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt20
-rw-r--r--opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt127
-rw-r--r--opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt (renamed from opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt)104
5 files changed, 200 insertions, 63 deletions
diff --git a/opendc-workflow/opendc-workflow-service/build.gradle.kts b/opendc-workflow/opendc-workflow-service/build.gradle.kts
index 5e73222c..43b64b15 100644
--- a/opendc-workflow/opendc-workflow-service/build.gradle.kts
+++ b/opendc-workflow/opendc-workflow-service/build.gradle.kts
@@ -39,8 +39,8 @@ dependencies {
testImplementation(projects.opendcSimulator.opendcSimulatorCore)
testImplementation(projects.opendcCompute.opendcComputeSimulator)
- testImplementation(projects.opendcFormat)
+ testImplementation(projects.opendcTrace.opendcTraceApi)
testImplementation(projects.opendcTelemetry.opendcTelemetrySdk)
- testImplementation(libs.jackson.module.kotlin)
+ testRuntimeOnly(projects.opendcTrace.opendcTraceGwf)
testRuntimeOnly(libs.log4j.slf4j)
}
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
index d3358ef1..a0248a93 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt
@@ -22,7 +22,7 @@
package org.opendc.workflow.service
-import io.opentelemetry.api.metrics.Meter
+import io.opentelemetry.api.metrics.MeterProvider
import org.opendc.compute.api.ComputeClient
import org.opendc.workflow.api.Job
import org.opendc.workflow.service.internal.WorkflowServiceImpl
@@ -62,7 +62,7 @@ public interface WorkflowService : AutoCloseable {
* @param context The [CoroutineContext] to use in the service.
* @param clock The clock instance to use.
* @param tracer The event tracer to use.
- * @param meter The meter to use.
+ * @param meterProvider The meter provider to use.
* @param compute The compute client to use.
* @param mode The scheduling mode to use.
* @param jobAdmissionPolicy The job admission policy to use.
@@ -73,7 +73,7 @@ public interface WorkflowService : AutoCloseable {
public operator fun invoke(
context: CoroutineContext,
clock: Clock,
- meter: Meter,
+ meterProvider: MeterProvider,
compute: ComputeClient,
mode: WorkflowSchedulerMode,
jobAdmissionPolicy: JobAdmissionPolicy,
@@ -84,7 +84,7 @@ public interface WorkflowService : AutoCloseable {
return WorkflowServiceImpl(
context,
clock,
- meter,
+ meterProvider,
compute,
mode,
jobAdmissionPolicy,
diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
index 32191b8f..a0fd3fad 100644
--- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
+++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt
@@ -23,6 +23,7 @@
package org.opendc.workflow.service.internal
import io.opentelemetry.api.metrics.Meter
+import io.opentelemetry.api.metrics.MeterProvider
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.map
import mu.KotlinLogging
@@ -48,7 +49,7 @@ import kotlin.coroutines.resume
public class WorkflowServiceImpl(
context: CoroutineContext,
internal val clock: Clock,
- private val meter: Meter,
+ meterProvider: MeterProvider,
private val computeClient: ComputeClient,
mode: WorkflowSchedulerMode,
jobAdmissionPolicy: JobAdmissionPolicy,
@@ -67,6 +68,11 @@ public class WorkflowServiceImpl(
private val logger = KotlinLogging.logger {}
/**
+ * The [Meter] to collect metrics of this service.
+ */
+ private val meter = meterProvider.get("org.opendc.workflow.service")
+
+ /**
* The incoming jobs ready to be processed by the scheduler.
*/
internal val incomingJobs: MutableSet<JobState> = linkedSetOf()
@@ -155,7 +161,7 @@ public class WorkflowServiceImpl(
/**
* The number of jobs that have been submitted to the service.
*/
- private val submittedJobs = meter.longCounterBuilder("jobs.submitted")
+ private val submittedJobs = meter.counterBuilder("jobs.submitted")
.setDescription("Number of submitted jobs")
.setUnit("1")
.build()
@@ -163,7 +169,7 @@ public class WorkflowServiceImpl(
/**
* The number of jobs that are running.
*/
- private val runningJobs = meter.longUpDownCounterBuilder("jobs.active")
+ private val runningJobs = meter.upDownCounterBuilder("jobs.active")
.setDescription("Number of jobs running")
.setUnit("1")
.build()
@@ -171,7 +177,7 @@ public class WorkflowServiceImpl(
/**
* The number of jobs that have finished running.
*/
- private val finishedJobs = meter.longCounterBuilder("jobs.finished")
+ private val finishedJobs = meter.counterBuilder("jobs.finished")
.setDescription("Number of jobs that finished running")
.setUnit("1")
.build()
@@ -179,7 +185,7 @@ public class WorkflowServiceImpl(
/**
* The number of tasks that have been submitted to the service.
*/
- private val submittedTasks = meter.longCounterBuilder("tasks.submitted")
+ private val submittedTasks = meter.counterBuilder("tasks.submitted")
.setDescription("Number of submitted tasks")
.setUnit("1")
.build()
@@ -187,7 +193,7 @@ public class WorkflowServiceImpl(
/**
* The number of jobs that are running.
*/
- private val runningTasks = meter.longUpDownCounterBuilder("tasks.active")
+ private val runningTasks = meter.upDownCounterBuilder("tasks.active")
.setDescription("Number of tasks running")
.setUnit("1")
.build()
@@ -195,7 +201,7 @@ public class WorkflowServiceImpl(
/**
* The number of jobs that have finished running.
*/
- private val finishedTasks = meter.longCounterBuilder("tasks.finished")
+ private val finishedTasks = meter.counterBuilder("tasks.finished")
.setDescription("Number of tasks that finished running")
.setUnit("1")
.build()
diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt
new file mode 100644
index 00000000..9ee3736e
--- /dev/null
+++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt
@@ -0,0 +1,127 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.workflow.service
+
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import org.opendc.simulator.compute.workload.SimFlopsWorkload
+import org.opendc.trace.*
+import org.opendc.workflow.api.Job
+import org.opendc.workflow.api.Task
+import org.opendc.workflow.api.WORKFLOW_TASK_CORES
+import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE
+import java.time.Clock
+import java.util.*
+import kotlin.collections.HashMap
+import kotlin.collections.HashSet
+import kotlin.math.max
+import kotlin.math.min
+
+/**
+ * Helper tool to replay workflow trace.
+ */
+internal class TraceReplayer(private val trace: Trace) {
+ /**
+ * Replay the workload.
+ */
+ public suspend fun replay(clock: Clock, service: WorkflowService) {
+ val jobs = parseTrace(trace)
+
+ // Sort jobs by their arrival time
+ (jobs as MutableList<Job>).sortBy { it.metadata["WORKFLOW_SUBMIT_TIME"] as Long }
+
+ // Wait until the trace is started
+ val startTime = jobs[0].metadata["WORKFLOW_SUBMIT_TIME"] as Long
+ delay(min(0L, startTime - clock.millis()))
+
+ val offset = startTime - clock.millis()
+
+ coroutineScope {
+ for (job in jobs) {
+ val submitTime = job.metadata["WORKFLOW_SUBMIT_TIME"] as Long
+ delay(max(0, (submitTime - offset) - clock.millis()))
+
+ launch { service.run(job) }
+ }
+ }
+ }
+
+ /**
+ * Convert [trace] into a list of [Job]s that can be submitted to the workflow service.
+ */
+ public fun parseTrace(trace: Trace): List<Job> {
+ val table = checkNotNull(trace.getTable(TABLE_TASKS))
+ val reader = table.newReader()
+
+ val jobs = mutableMapOf<Long, Job>()
+ val tasks = mutableMapOf<Long, Task>()
+ val taskDependencies = mutableMapOf<Task, Set<Long>>()
+
+ try {
+ while (reader.nextRow()) {
+ // Bag of tasks without workflow ID all share the same workflow
+ val workflowId = if (reader.hasColumn(TASK_WORKFLOW_ID)) reader.get(TASK_WORKFLOW_ID).toLong() else 0L
+ val workflow = jobs.computeIfAbsent(workflowId) { id -> Job(UUID(0L, id), "<unnamed>", HashSet(), HashMap()) }
+
+ val id = reader.get(TASK_ID).toLong()
+ val grantedCpus = if (reader.hasColumn(TASK_ALLOC_NCPUS))
+ reader.getInt(TASK_ALLOC_NCPUS)
+ else
+ reader.getInt(TASK_REQ_NCPUS)
+ val submitTime = reader.get(TASK_SUBMIT_TIME)
+ val runtime = reader.get(TASK_RUNTIME)
+ val flops: Long = 4000 * runtime.seconds * grantedCpus
+ val workload = SimFlopsWorkload(flops)
+ val task = Task(
+ UUID(0L, id),
+ "<unnamed>",
+ HashSet(),
+ mapOf(
+ "workload" to workload,
+ WORKFLOW_TASK_CORES to grantedCpus,
+ WORKFLOW_TASK_DEADLINE to runtime.toMillis()
+ ),
+ )
+
+ tasks[id] = task
+ taskDependencies[task] = reader.get(TASK_PARENTS).map { it.toLong() }.toSet()
+
+ (workflow.metadata as MutableMap<String, Any>).merge("WORKFLOW_SUBMIT_TIME", submitTime.toEpochMilli()) { a, b -> min(a as Long, b as Long) }
+ (workflow.tasks as MutableSet<Task>).add(task)
+ }
+
+ // Resolve dependencies for all tasks
+ for ((task, deps) in taskDependencies) {
+ for (dep in deps) {
+ val parent = requireNotNull(tasks[dep]) { "Dependency task with id $dep not found" }
+ (task.dependencies as MutableSet<Task>).add(parent)
+ }
+ }
+ } finally {
+ reader.close()
+ }
+
+ return jobs.values.toList()
+ }
+}
diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
index a8d3a9e8..04f54e58 100644
--- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt
+++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
@@ -25,39 +25,41 @@ package org.opendc.workflow.service
import io.opentelemetry.api.metrics.MeterProvider
import io.opentelemetry.sdk.metrics.SdkMeterProvider
import io.opentelemetry.sdk.metrics.export.MetricProducer
-import kotlinx.coroutines.ExperimentalCoroutinesApi
-import kotlinx.coroutines.coroutineScope
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.launch
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
import org.opendc.compute.service.ComputeService
import org.opendc.compute.service.scheduler.FilterScheduler
-import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter
import org.opendc.compute.service.scheduler.filters.ComputeFilter
-import org.opendc.compute.service.scheduler.weights.ProvisionedCoresWeigher
+import org.opendc.compute.service.scheduler.filters.RamFilter
+import org.opendc.compute.service.scheduler.filters.VCpuFilter
+import org.opendc.compute.service.scheduler.weights.VCpuWeigher
import org.opendc.compute.simulator.SimHost
-import org.opendc.format.environment.sc18.Sc18EnvironmentReader
-import org.opendc.format.trace.gwf.GwfTraceReader
-import org.opendc.simulator.compute.SimSpaceSharedHypervisorProvider
+import org.opendc.simulator.compute.kernel.SimSpaceSharedHypervisorProvider
+import org.opendc.simulator.compute.model.MachineModel
+import org.opendc.simulator.compute.model.MemoryUnit
+import org.opendc.simulator.compute.model.ProcessingNode
+import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.flow.FlowEngine
import org.opendc.telemetry.sdk.toOtelClock
+import org.opendc.trace.Trace
import org.opendc.workflow.service.internal.WorkflowServiceImpl
import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode
import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy
import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy
import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy
import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy
-import kotlin.math.max
+import java.nio.file.Paths
+import java.time.Duration
+import java.util.*
/**
* Integration test suite for the [WorkflowServiceImpl].
*/
-@DisplayName("WorkflowServiceImpl")
-@OptIn(ExperimentalCoroutinesApi::class)
-internal class WorkflowServiceIntegrationTest {
+@DisplayName("WorkflowService")
+internal class WorkflowServiceTest {
/**
* A large integration test where we check whether all tasks in some trace are executed correctly.
*/
@@ -68,34 +70,34 @@ internal class WorkflowServiceIntegrationTest {
.setClock(clock.toOtelClock())
.build()
- val hosts = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/environment.json"))
- .use { it.read() }
- .map { def ->
- SimHost(
- def.uid,
- def.name,
- def.model,
- def.meta,
- coroutineContext,
- clock,
- MeterProvider.noop().get("opendc-compute-simulator"),
- SimSpaceSharedHypervisorProvider()
- )
- }
-
- val meter = MeterProvider.noop().get("opendc-compute")
+ val interpreter = FlowEngine(coroutineContext, clock)
+ val machineModel = createMachineModel()
+ val hvProvider = SimSpaceSharedHypervisorProvider()
+ val hosts = List(4) { id ->
+ SimHost(
+ UUID(0, id.toLong()),
+ "node-$id",
+ machineModel,
+ emptyMap(),
+ coroutineContext,
+ interpreter,
+ MeterProvider.noop(),
+ hvProvider,
+ )
+ }
+
val computeScheduler = FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(ProvisionedCoresWeigher() to -1.0)
+ filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)),
+ weighers = listOf(VCpuWeigher(1.0, multiplier = 1.0))
)
- val compute = ComputeService(coroutineContext, clock, meter, computeScheduler, schedulingQuantum = 1000)
+ val compute = ComputeService(coroutineContext, clock, MeterProvider.noop(), computeScheduler, schedulingQuantum = Duration.ofSeconds(1))
hosts.forEach { compute.addHost(it) }
val scheduler = WorkflowService(
coroutineContext,
clock,
- meterProvider.get("opendc-workflow"),
+ meterProvider,
compute.newClient(),
mode = WorkflowSchedulerMode.Batch(100),
jobAdmissionPolicy = NullJobAdmissionPolicy,
@@ -104,23 +106,13 @@ internal class WorkflowServiceIntegrationTest {
taskOrderPolicy = SubmissionTimeTaskOrderPolicy(),
)
- val reader = GwfTraceReader(object {}.javaClass.getResourceAsStream("/trace.gwf"))
- var offset = Long.MIN_VALUE
-
- coroutineScope {
- while (reader.hasNext()) {
- val entry = reader.next()
-
- if (offset < 0) {
- offset = entry.start - clock.millis()
- }
+ val trace = Trace.open(
+ Paths.get(checkNotNull(WorkflowServiceTest::class.java.getResource("/trace.gwf")).toURI()),
+ format = "gwf"
+ )
+ val replayer = TraceReplayer(trace)
- delay(max(0, (entry.start - offset) - clock.millis()))
- launch {
- scheduler.run(entry.workload)
- }
- }
- }
+ replayer.replay(clock, scheduler)
hosts.forEach(SimHost::close)
scheduler.close()
@@ -133,10 +125,22 @@ internal class WorkflowServiceIntegrationTest {
{ assertEquals(0, metrics.jobsActive, "Not all submitted jobs started") },
{ assertEquals(metrics.jobsSubmitted, metrics.jobsFinished, "Not all started jobs finished") },
{ assertEquals(0, metrics.tasksActive, "Not all started tasks finished") },
- { assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") }
+ { assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") },
+ { assertEquals(33213236L, clock.millis()) }
)
}
+ /**
+ * The machine model based on: https://www.spec.org/power_ssj2008/results/res2020q1/power_ssj2008-20191125-01012.html
+ */
+ private fun createMachineModel(): MachineModel {
+ val node = ProcessingNode("AMD", "am64", "EPYC 7742", 32)
+ val cpus = List(node.coreCount) { id -> ProcessingUnit(node, id, 3400.0) }
+ val memory = List(8) { MemoryUnit("Samsung", "Unknown", 2933.0, 16_000) }
+
+ return MachineModel(cpus, memory)
+ }
+
class WorkflowMetrics {
var jobsSubmitted = 0L
var jobsActive = 0L