summaryrefslogtreecommitdiff
path: root/opendc-workflow/opendc-workflow-service
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-02 11:50:43 +0200
committerGitHub <noreply@github.com>2021-09-02 11:50:43 +0200
commit05f80bd9fb7caf765e3ebbb70d48d0d5e185bd42 (patch)
tree5fa9501621ad327028c2f2e12c9c367f44f6aebe /opendc-workflow/opendc-workflow-service
parent99f391d11db57c3db3f326958de8f66502969cdb (diff)
parent5935531137a22fdb920921580d491f86adec65c9 (diff)
merge: Add generic trace reading library
This pull request adds a generic trace reading library to OpenDC. The library has been designed to support a wide range of trace formats and uses a streaming approach to improve the performance of reading large traces. * Add trace reading API * Implement API for GWF format * Implement API for SWF format * Implement API for WTF format * Implement API for Bitbrains format * Implement API for Bitbrains Parquet format **Breaking API Changes** * `opendc-format` has been removed in favour of `opendc-trace-*`
Diffstat (limited to 'opendc-workflow/opendc-workflow-service')
-rw-r--r--opendc-workflow/opendc-workflow-service/build.gradle.kts6
-rw-r--r--opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt127
-rw-r--r--opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt (renamed from opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt)75
3 files changed, 165 insertions, 43 deletions
diff --git a/opendc-workflow/opendc-workflow-service/build.gradle.kts b/opendc-workflow/opendc-workflow-service/build.gradle.kts
index bc082dbc..941202d2 100644
--- a/opendc-workflow/opendc-workflow-service/build.gradle.kts
+++ b/opendc-workflow/opendc-workflow-service/build.gradle.kts
@@ -39,11 +39,7 @@ dependencies {
testImplementation(projects.opendcSimulator.opendcSimulatorCore)
testImplementation(projects.opendcCompute.opendcComputeSimulator)
- testImplementation(projects.opendcFormat)
+ testImplementation(projects.opendcTrace.opendcTraceGwf)
testImplementation(projects.opendcTelemetry.opendcTelemetrySdk)
- testImplementation(libs.jackson.module.kotlin) {
- exclude(group = "org.jetbrains.kotlin", module = "kotlin-reflect")
- }
- testImplementation(kotlin("reflect"))
testRuntimeOnly(libs.log4j.slf4j)
}
diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt
new file mode 100644
index 00000000..a390fe08
--- /dev/null
+++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt
@@ -0,0 +1,127 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.workflow.service
+
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import org.opendc.simulator.compute.workload.SimFlopsWorkload
+import org.opendc.trace.*
+import org.opendc.workflow.api.Job
+import org.opendc.workflow.api.Task
+import org.opendc.workflow.api.WORKFLOW_TASK_CORES
+import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE
+import java.time.Clock
+import java.util.*
+import kotlin.collections.HashMap
+import kotlin.collections.HashSet
+import kotlin.math.max
+import kotlin.math.min
+
+/**
+ * Helper tool to replay a workflow trace against a [WorkflowService].
+ *
+ * @param trace The trace whose tasks table is converted into [Job]s and replayed.
+ */
+internal class TraceReplayer(private val trace: Trace) {
+    /**
+     * Replay the workload.
+     *
+     * The jobs of [trace] are submitted to [service] in order of their submission time. Replay starts
+     * immediately: the earliest submission time in the trace is mapped onto the current time of [clock] and
+     * every later job is delayed so that its distance to the first job is preserved.
+     *
+     * All jobs are launched inside a single [coroutineScope], so this method only returns once every
+     * [WorkflowService.run] call has completed. An empty trace is a no-op.
+     *
+     * @param clock The clock used to determine the current (virtual) time.
+     * @param service The workflow service to submit the jobs to.
+     */
+    public suspend fun replay(clock: Clock, service: WorkflowService) {
+        // Sort into a fresh list by arrival time. The list returned by [parseTrace] is read-only, so it must
+        // not be cast to a mutable list and sorted in place.
+        val jobs = parseTrace(trace).sortedBy { it.submitTime }
+        if (jobs.isEmpty()) {
+            return
+        }
+
+        // Offset between trace time and clock time, such that the first job is due right now.
+        // NOTE: There is deliberately no initial wait here. The former `delay(min(0L, ...))` was always invoked
+        // with a non-positive duration and therefore never suspended.
+        val startTime = jobs.first().submitTime
+        val offset = startTime - clock.millis()
+
+        coroutineScope {
+            for (job in jobs) {
+                // Suspend until the (shifted) submission time of the job; never pass a negative duration.
+                delay(max(0, (job.submitTime - offset) - clock.millis()))
+
+                launch { service.run(job) }
+            }
+        }
+    }
+
+    /**
+     * Convert [trace] into a list of [Job]s that can be submitted to the workflow service.
+     *
+     * Every row of the tasks table becomes a [Task]. Tasks are grouped into jobs by their workflow
+     * identifier and the metadata of each job holds the earliest submission time of its tasks under the
+     * `WORKFLOW_SUBMIT_TIME` key.
+     *
+     * @param trace The trace to read the tasks table from.
+     * @return The jobs found in the trace, in no particular order.
+     * @throws IllegalStateException if the trace has no tasks table.
+     * @throws IllegalArgumentException if a task refers to a parent task that is not part of the trace.
+     */
+    public fun parseTrace(trace: Trace): List<Job> {
+        val table = checkNotNull(trace.getTable(TABLE_TASKS))
+        val reader = table.newReader()
+
+        val jobs = mutableMapOf<Long, Job>()
+        val tasks = mutableMapOf<Long, Task>()
+        val taskDependencies = mutableMapOf<Task, Set<Long>>()
+
+        try {
+            while (reader.nextRow()) {
+                // Bag of tasks without workflow ID all share the same workflow
+                val workflowId = if (reader.hasColumn(TASK_WORKFLOW_ID)) reader.getLong(TASK_WORKFLOW_ID) else 0L
+                val workflow = jobs.computeIfAbsent(workflowId) { id -> Job(UUID(0L, id), "<unnamed>", HashSet(), HashMap()) }
+
+                val id = reader.getLong(TASK_ID)
+                // Prefer the number of CPUs that was actually allocated over the number that was requested
+                val grantedCpus = if (reader.hasColumn(TASK_ALLOC_NCPUS))
+                    reader.getInt(TASK_ALLOC_NCPUS)
+                else
+                    reader.getInt(TASK_REQ_NCPUS)
+                val submitTime = reader.getLong(TASK_SUBMIT_TIME)
+                val runtime = reader.getLong(TASK_RUNTIME)
+                // Amount of work of the task, scaled by its runtime and the number of CPUs it ran on
+                val flops: Long = 4000 * runtime * grantedCpus
+                val workload = SimFlopsWorkload(flops)
+                val task = Task(
+                    UUID(0L, id),
+                    "<unnamed>",
+                    HashSet(),
+                    mapOf(
+                        "workload" to workload,
+                        WORKFLOW_TASK_CORES to grantedCpus,
+                        WORKFLOW_TASK_DEADLINE to (runtime * 1000)
+                    ),
+                )
+
+                tasks[id] = task
+                // Parents may appear later in the trace, so dependencies are resolved after all rows are read
+                taskDependencies[task] = reader.get(TASK_PARENTS)
+
+                // NOTE(review): the casts below assume that Job exposes the HashMap/HashSet instances it was
+                // constructed with above - confirm against the Job implementation.
+                (workflow.metadata as MutableMap<String, Any>).merge("WORKFLOW_SUBMIT_TIME", submitTime) { a, b -> min(a as Long, b as Long) }
+                (workflow.tasks as MutableSet<Task>).add(task)
+            }
+
+            // Resolve dependencies for all tasks
+            for ((task, deps) in taskDependencies) {
+                for (dep in deps) {
+                    val parent = requireNotNull(tasks[dep]) { "Dependency task with id $dep not found" }
+                    // NOTE(review): assumes Task exposes the HashSet it was constructed with - confirm.
+                    (task.dependencies as MutableSet<Task>).add(parent)
+                }
+            }
+        } finally {
+            reader.close()
+        }
+
+        return jobs.values.toList()
+    }
+
+    /**
+     * The submission time that [parseTrace] stored in the metadata of this job.
+     */
+    private val Job.submitTime: Long
+        get() = metadata["WORKFLOW_SUBMIT_TIME"] as Long
+}
diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
index d82959e7..07433d1f 100644
--- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt
+++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
@@ -25,9 +25,6 @@ package org.opendc.workflow.service
import io.opentelemetry.api.metrics.MeterProvider
import io.opentelemetry.sdk.metrics.SdkMeterProvider
import io.opentelemetry.sdk.metrics.export.MetricProducer
-import kotlinx.coroutines.coroutineScope
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.launch
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
@@ -39,25 +36,28 @@ import org.opendc.compute.service.scheduler.filters.RamFilter
import org.opendc.compute.service.scheduler.filters.VCpuFilter
import org.opendc.compute.service.scheduler.weights.VCpuWeigher
import org.opendc.compute.simulator.SimHost
-import org.opendc.format.environment.sc18.Sc18EnvironmentReader
-import org.opendc.format.trace.gwf.GwfTraceReader
import org.opendc.simulator.compute.kernel.SimSpaceSharedHypervisorProvider
+import org.opendc.simulator.compute.model.MachineModel
+import org.opendc.simulator.compute.model.MemoryUnit
+import org.opendc.simulator.compute.model.ProcessingNode
+import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.resources.SimResourceInterpreter
import org.opendc.telemetry.sdk.toOtelClock
+import org.opendc.trace.gwf.GwfTraceFormat
import org.opendc.workflow.service.internal.WorkflowServiceImpl
import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode
import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy
import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy
import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy
import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy
-import kotlin.math.max
+import java.util.*
/**
* Integration test suite for the [WorkflowServiceImpl].
*/
@DisplayName("WorkflowService")
-internal class WorkflowServiceIntegrationTest {
+internal class WorkflowServiceTest {
/**
* A large integration test where we check whether all tasks in some trace are executed correctly.
*/
@@ -69,20 +69,20 @@ internal class WorkflowServiceIntegrationTest {
.build()
val interpreter = SimResourceInterpreter(coroutineContext, clock)
- val hosts = Sc18EnvironmentReader(checkNotNull(object {}.javaClass.getResourceAsStream("/environment.json")))
- .use { it.read() }
- .map { def ->
- SimHost(
- def.uid,
- def.name,
- def.model,
- def.meta,
- coroutineContext,
- interpreter,
- MeterProvider.noop().get("opendc-compute-simulator"),
- SimSpaceSharedHypervisorProvider()
- )
- }
+ val machineModel = createMachineModel()
+ val hvProvider = SimSpaceSharedHypervisorProvider()
+ val hosts = List(4) { id ->
+ SimHost(
+ UUID(0, id.toLong()),
+ "node-$id",
+ machineModel,
+ emptyMap(),
+ coroutineContext,
+ interpreter,
+ meterProvider.get("opendc-compute-simulator"),
+ hvProvider,
+ )
+ }
val meter = MeterProvider.noop().get("opendc-compute")
val computeScheduler = FilterScheduler(
@@ -105,23 +105,10 @@ internal class WorkflowServiceIntegrationTest {
taskOrderPolicy = SubmissionTimeTaskOrderPolicy(),
)
- val reader = GwfTraceReader(checkNotNull(object {}.javaClass.getResourceAsStream("/trace.gwf")))
- var offset = Long.MIN_VALUE
-
- coroutineScope {
- while (reader.hasNext()) {
- val entry = reader.next()
+ val trace = GwfTraceFormat().open(checkNotNull(WorkflowServiceTest::class.java.getResource("/trace.gwf")))
+ val replayer = TraceReplayer(trace)
- if (offset < 0) {
- offset = entry.start - clock.millis()
- }
-
- delay(max(0, (entry.start - offset) - clock.millis()))
- launch {
- scheduler.run(entry.workload)
- }
- }
- }
+ replayer.replay(clock, scheduler)
hosts.forEach(SimHost::close)
scheduler.close()
@@ -134,10 +121,22 @@ internal class WorkflowServiceIntegrationTest {
{ assertEquals(0, metrics.jobsActive, "Not all submitted jobs started") },
{ assertEquals(metrics.jobsSubmitted, metrics.jobsFinished, "Not all started jobs finished") },
{ assertEquals(0, metrics.tasksActive, "Not all started tasks finished") },
- { assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") }
+ { assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") },
+ { assertEquals(33213237L, clock.millis()) }
)
}
+    /**
+     * Construct the [MachineModel] that is used for the simulated hosts of this test suite.
+     *
+     * The model is based on: https://www.spec.org/power_ssj2008/results/res2020q1/power_ssj2008-20191125-01012.html
+     */
+    private fun createMachineModel(): MachineModel {
+        val processingNode = ProcessingNode("AMD", "am64", "EPYC 7742", 32)
+
+        return MachineModel(
+            // One processing unit per core of the node, all running at the same frequency
+            List(processingNode.coreCount) { coreId -> ProcessingUnit(processingNode, coreId, 3400.0) },
+            // Eight identical memory units
+            List(8) { MemoryUnit("Samsung", "Unknown", 2933.0, 16_000) },
+        )
+    }
+
class WorkflowMetrics {
var jobsSubmitted = 0L
var jobsActive = 0L