diff options
Diffstat (limited to 'opendc-workflow/opendc-workflow-service')
| -rw-r--r-- | opendc-workflow/opendc-workflow-service/build.gradle.kts | 6 | ||||
| -rw-r--r-- | opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt | 127 | ||||
| -rw-r--r-- | opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt (renamed from opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt) | 75 |
3 files changed, 165 insertions, 43 deletions
diff --git a/opendc-workflow/opendc-workflow-service/build.gradle.kts b/opendc-workflow/opendc-workflow-service/build.gradle.kts index bc082dbc..941202d2 100644 --- a/opendc-workflow/opendc-workflow-service/build.gradle.kts +++ b/opendc-workflow/opendc-workflow-service/build.gradle.kts @@ -39,11 +39,7 @@ dependencies { testImplementation(projects.opendcSimulator.opendcSimulatorCore) testImplementation(projects.opendcCompute.opendcComputeSimulator) - testImplementation(projects.opendcFormat) + testImplementation(projects.opendcTrace.opendcTraceGwf) testImplementation(projects.opendcTelemetry.opendcTelemetrySdk) - testImplementation(libs.jackson.module.kotlin) { - exclude(group = "org.jetbrains.kotlin", module = "kotlin-reflect") - } - testImplementation(kotlin("reflect")) testRuntimeOnly(libs.log4j.slf4j) } diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt new file mode 100644 index 00000000..a390fe08 --- /dev/null +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt @@ -0,0 +1,127 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.service + +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import org.opendc.simulator.compute.workload.SimFlopsWorkload +import org.opendc.trace.* +import org.opendc.workflow.api.Job +import org.opendc.workflow.api.Task +import org.opendc.workflow.api.WORKFLOW_TASK_CORES +import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE +import java.time.Clock +import java.util.* +import kotlin.collections.HashMap +import kotlin.collections.HashSet +import kotlin.math.max +import kotlin.math.min + +/** + * Helper tool to replay workflow trace. + */ +internal class TraceReplayer(private val trace: Trace) { + /** + * Replay the workload. + */ + public suspend fun replay(clock: Clock, service: WorkflowService) { + val jobs = parseTrace(trace) + + // Sort jobs by their arrival time + (jobs as MutableList<Job>).sortBy { it.metadata["WORKFLOW_SUBMIT_TIME"] as Long } + + // Wait until the trace is started + val startTime = jobs[0].metadata["WORKFLOW_SUBMIT_TIME"] as Long + delay(min(0L, startTime - clock.millis())) + + val offset = startTime - clock.millis() + + coroutineScope { + for (job in jobs) { + val submitTime = job.metadata["WORKFLOW_SUBMIT_TIME"] as Long + delay(max(0, (submitTime - offset) - clock.millis())) + + launch { service.run(job) } + } + } + } + + /** + * Convert [trace] into a list of [Job]s that can be submitted to the workflow service. + */ + public fun parseTrace(trace: Trace): List<Job> { + val table = checkNotNull(trace.getTable(TABLE_TASKS)) + val reader = table.newReader() + + val jobs = mutableMapOf<Long, Job>() + val tasks = mutableMapOf<Long, Task>() + val taskDependencies = mutableMapOf<Task, Set<Long>>() + + try { + while (reader.nextRow()) { + // Bag of tasks without workflow ID all share the same workflow + val workflowId = if (reader.hasColumn(TASK_WORKFLOW_ID)) reader.getLong(TASK_WORKFLOW_ID) else 0L + val workflow = jobs.computeIfAbsent(workflowId) { id -> Job(UUID(0L, id), "<unnamed>", HashSet(), HashMap()) } + + val id = reader.getLong(TASK_ID) + val grantedCpus = if (reader.hasColumn(TASK_ALLOC_NCPUS)) + reader.getInt(TASK_ALLOC_NCPUS) + else + reader.getInt(TASK_REQ_NCPUS) + val submitTime = reader.getLong(TASK_SUBMIT_TIME) + val runtime = reader.getLong(TASK_RUNTIME) + val flops: Long = 4000 * runtime * grantedCpus + val workload = SimFlopsWorkload(flops) + val task = Task( + UUID(0L, id), + "<unnamed>", + HashSet(), + mapOf( + "workload" to workload, + WORKFLOW_TASK_CORES to grantedCpus, + WORKFLOW_TASK_DEADLINE to (runtime * 1000) + ), + ) + + tasks[id] = task + taskDependencies[task] = reader.get(TASK_PARENTS) + + (workflow.metadata as MutableMap<String, Any>).merge("WORKFLOW_SUBMIT_TIME", submitTime) { a, b -> min(a as Long, b as Long) } + (workflow.tasks as MutableSet<Task>).add(task) + } + + // Resolve dependencies for all tasks + for ((task, deps) in taskDependencies) { + for (dep in deps) { + val parent = requireNotNull(tasks[dep]) { "Dependency task with id $dep not found" } + (task.dependencies as MutableSet<Task>).add(parent) + } + } + } finally { + reader.close() + } + + return jobs.values.toList() + } +} diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt index d82959e7..07433d1f 100644 --- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt @@ -25,9 +25,6 @@ package org.opendc.workflow.service import io.opentelemetry.api.metrics.MeterProvider import io.opentelemetry.sdk.metrics.SdkMeterProvider import io.opentelemetry.sdk.metrics.export.MetricProducer -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.delay -import kotlinx.coroutines.launch import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.Test @@ -39,25 +36,28 @@ import org.opendc.compute.service.scheduler.filters.RamFilter import org.opendc.compute.service.scheduler.filters.VCpuFilter import org.opendc.compute.service.scheduler.weights.VCpuWeigher import org.opendc.compute.simulator.SimHost -import org.opendc.format.environment.sc18.Sc18EnvironmentReader -import org.opendc.format.trace.gwf.GwfTraceReader import org.opendc.simulator.compute.kernel.SimSpaceSharedHypervisorProvider +import org.opendc.simulator.compute.model.MachineModel +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingNode +import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.resources.SimResourceInterpreter import org.opendc.telemetry.sdk.toOtelClock +import org.opendc.trace.gwf.GwfTraceFormat import org.opendc.workflow.service.internal.WorkflowServiceImpl import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy -import kotlin.math.max +import java.util.* /** * Integration test suite for the [WorkflowServiceImpl]. */ @DisplayName("WorkflowService") -internal class WorkflowServiceIntegrationTest { +internal class WorkflowServiceTest { /** * A large integration test where we check whether all tasks in some trace are executed correctly. */ @@ -69,20 +69,20 @@ internal class WorkflowServiceIntegrationTest { .build() val interpreter = SimResourceInterpreter(coroutineContext, clock) - val hosts = Sc18EnvironmentReader(checkNotNull(object {}.javaClass.getResourceAsStream("/environment.json"))) - .use { it.read() } - .map { def -> - SimHost( - def.uid, - def.name, - def.model, - def.meta, - coroutineContext, - interpreter, - MeterProvider.noop().get("opendc-compute-simulator"), - SimSpaceSharedHypervisorProvider() - ) - } + val machineModel = createMachineModel() + val hvProvider = SimSpaceSharedHypervisorProvider() + val hosts = List(4) { id -> + SimHost( + UUID(0, id.toLong()), + "node-$id", + machineModel, + emptyMap(), + coroutineContext, + interpreter, + meterProvider.get("opendc-compute-simulator"), + hvProvider, + ) + } val meter = MeterProvider.noop().get("opendc-compute") val computeScheduler = FilterScheduler( @@ -105,23 +105,10 @@ internal class WorkflowServiceIntegrationTest { taskOrderPolicy = SubmissionTimeTaskOrderPolicy(), ) - val reader = GwfTraceReader(checkNotNull(object {}.javaClass.getResourceAsStream("/trace.gwf"))) - var offset = Long.MIN_VALUE - - coroutineScope { - while (reader.hasNext()) { - val entry = reader.next() + val trace = GwfTraceFormat().open(checkNotNull(WorkflowServiceTest::class.java.getResource("/trace.gwf"))) + val replayer = TraceReplayer(trace) - if (offset < 0) { - offset = entry.start - clock.millis() - } - - delay(max(0, (entry.start - offset) - clock.millis())) - launch { - scheduler.run(entry.workload) - } - } - } + replayer.replay(clock, scheduler) hosts.forEach(SimHost::close) scheduler.close() @@ -134,10 +121,22 @@ internal class WorkflowServiceIntegrationTest { { assertEquals(0, metrics.jobsActive, "Not all submitted jobs started") }, { assertEquals(metrics.jobsSubmitted, metrics.jobsFinished, "Not all started jobs finished") }, { assertEquals(0, metrics.tasksActive, "Not all started tasks finished") }, - { assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") } + { assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") }, + { assertEquals(33213237L, clock.millis()) } ) } + /** + * The machine model based on: https://www.spec.org/power_ssj2008/results/res2020q1/power_ssj2008-20191125-01012.html + */ + private fun createMachineModel(): MachineModel { + val node = ProcessingNode("AMD", "am64", "EPYC 7742", 32) + val cpus = List(node.coreCount) { id -> ProcessingUnit(node, id, 3400.0) } + val memory = List(8) { MemoryUnit("Samsung", "Unknown", 2933.0, 16_000) } + + return MachineModel(cpus, memory) + } + class WorkflowMetrics { var jobsSubmitted = 0L var jobsActive = 0L |
