diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-10-25 14:53:54 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-10-25 14:53:54 +0200 |
| commit | aa9b32f8cd1467e9718959f400f6777e5d71737d (patch) | |
| tree | b88bbede15108c6855d7f94ded4c7054df186a72 /opendc-workflow | |
| parent | eb0e0a3bc557c05a70eead388797ab850ea87366 (diff) | |
| parent | b7a71e5b4aa77b41ef41deec2ace42b67a5a13a7 (diff) | |
merge: Integrate v2.1 progress into public repository
This pull request integrates the changes planned for the v2.1 release of
OpenDC into the public Github repository in order to sync the progress
of both repositories.
Diffstat (limited to 'opendc-workflow')
| -rw-r--r-- | opendc-workflow/opendc-workflow-service/build.gradle.kts | 4 | ||||
| -rw-r--r-- | opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt | 8 | ||||
| -rw-r--r-- | opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt | 20 | ||||
| -rw-r--r-- | opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt | 127 | ||||
| -rw-r--r-- | opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt (renamed from opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt) | 104 |
5 files changed, 200 insertions, 63 deletions
diff --git a/opendc-workflow/opendc-workflow-service/build.gradle.kts b/opendc-workflow/opendc-workflow-service/build.gradle.kts index 5e73222c..43b64b15 100644 --- a/opendc-workflow/opendc-workflow-service/build.gradle.kts +++ b/opendc-workflow/opendc-workflow-service/build.gradle.kts @@ -39,8 +39,8 @@ dependencies { testImplementation(projects.opendcSimulator.opendcSimulatorCore) testImplementation(projects.opendcCompute.opendcComputeSimulator) - testImplementation(projects.opendcFormat) + testImplementation(projects.opendcTrace.opendcTraceApi) testImplementation(projects.opendcTelemetry.opendcTelemetrySdk) - testImplementation(libs.jackson.module.kotlin) + testRuntimeOnly(projects.opendcTrace.opendcTraceGwf) testRuntimeOnly(libs.log4j.slf4j) } diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt index d3358ef1..a0248a93 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt @@ -22,7 +22,7 @@ package org.opendc.workflow.service -import io.opentelemetry.api.metrics.Meter +import io.opentelemetry.api.metrics.MeterProvider import org.opendc.compute.api.ComputeClient import org.opendc.workflow.api.Job import org.opendc.workflow.service.internal.WorkflowServiceImpl @@ -62,7 +62,7 @@ public interface WorkflowService : AutoCloseable { * @param context The [CoroutineContext] to use in the service. * @param clock The clock instance to use. * @param tracer The event tracer to use. - * @param meter The meter to use. + * @param meterProvider The meter provider to use. * @param compute The compute client to use. * @param mode The scheduling mode to use. * @param jobAdmissionPolicy The job admission policy to use. @@ -73,7 +73,7 @@ public interface WorkflowService : AutoCloseable { public operator fun invoke( context: CoroutineContext, clock: Clock, - meter: Meter, + meterProvider: MeterProvider, compute: ComputeClient, mode: WorkflowSchedulerMode, jobAdmissionPolicy: JobAdmissionPolicy, @@ -84,7 +84,7 @@ public interface WorkflowService : AutoCloseable { return WorkflowServiceImpl( context, clock, - meter, + meterProvider, compute, mode, jobAdmissionPolicy, diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt index 32191b8f..a0fd3fad 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt @@ -23,6 +23,7 @@ package org.opendc.workflow.service.internal import io.opentelemetry.api.metrics.Meter +import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.* import kotlinx.coroutines.flow.map import mu.KotlinLogging @@ -48,7 +49,7 @@ import kotlin.coroutines.resume public class WorkflowServiceImpl( context: CoroutineContext, internal val clock: Clock, - private val meter: Meter, + meterProvider: MeterProvider, private val computeClient: ComputeClient, mode: WorkflowSchedulerMode, jobAdmissionPolicy: JobAdmissionPolicy, @@ -67,6 +68,11 @@ public class WorkflowServiceImpl( private val logger = KotlinLogging.logger {} /** + * The [Meter] to collect metrics of this service. + */ + private val meter = meterProvider.get("org.opendc.workflow.service") + + /** * The incoming jobs ready to be processed by the scheduler. */ internal val incomingJobs: MutableSet<JobState> = linkedSetOf() @@ -155,7 +161,7 @@ public class WorkflowServiceImpl( /** * The number of jobs that have been submitted to the service. */ - private val submittedJobs = meter.longCounterBuilder("jobs.submitted") + private val submittedJobs = meter.counterBuilder("jobs.submitted") .setDescription("Number of submitted jobs") .setUnit("1") .build() @@ -163,7 +169,7 @@ public class WorkflowServiceImpl( /** * The number of jobs that are running. */ - private val runningJobs = meter.longUpDownCounterBuilder("jobs.active") + private val runningJobs = meter.upDownCounterBuilder("jobs.active") .setDescription("Number of jobs running") .setUnit("1") .build() @@ -171,7 +177,7 @@ public class WorkflowServiceImpl( /** * The number of jobs that have finished running. */ - private val finishedJobs = meter.longCounterBuilder("jobs.finished") + private val finishedJobs = meter.counterBuilder("jobs.finished") .setDescription("Number of jobs that finished running") .setUnit("1") .build() @@ -179,7 +185,7 @@ public class WorkflowServiceImpl( /** * The number of tasks that have been submitted to the service. */ - private val submittedTasks = meter.longCounterBuilder("tasks.submitted") + private val submittedTasks = meter.counterBuilder("tasks.submitted") .setDescription("Number of submitted tasks") .setUnit("1") .build() @@ -187,7 +193,7 @@ public class WorkflowServiceImpl( /** * The number of jobs that are running. */ - private val runningTasks = meter.longUpDownCounterBuilder("tasks.active") + private val runningTasks = meter.upDownCounterBuilder("tasks.active") .setDescription("Number of tasks running") .setUnit("1") .build() @@ -195,7 +201,7 @@ public class WorkflowServiceImpl( /** * The number of jobs that have finished running. */ - private val finishedTasks = meter.longCounterBuilder("tasks.finished") + private val finishedTasks = meter.counterBuilder("tasks.finished") .setDescription("Number of tasks that finished running") .setUnit("1") .build() diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt new file mode 100644 index 00000000..9ee3736e --- /dev/null +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt @@ -0,0 +1,127 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.service + +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import org.opendc.simulator.compute.workload.SimFlopsWorkload +import org.opendc.trace.* +import org.opendc.workflow.api.Job +import org.opendc.workflow.api.Task +import org.opendc.workflow.api.WORKFLOW_TASK_CORES +import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE +import java.time.Clock +import java.util.* +import kotlin.collections.HashMap +import kotlin.collections.HashSet +import kotlin.math.max +import kotlin.math.min + +/** + * Helper tool to replay workflow trace. + */ +internal class TraceReplayer(private val trace: Trace) { + /** + * Replay the workload. + */ + public suspend fun replay(clock: Clock, service: WorkflowService) { + val jobs = parseTrace(trace) + + // Sort jobs by their arrival time + (jobs as MutableList<Job>).sortBy { it.metadata["WORKFLOW_SUBMIT_TIME"] as Long } + + // Wait until the trace is started + val startTime = jobs[0].metadata["WORKFLOW_SUBMIT_TIME"] as Long + delay(min(0L, startTime - clock.millis())) + + val offset = startTime - clock.millis() + + coroutineScope { + for (job in jobs) { + val submitTime = job.metadata["WORKFLOW_SUBMIT_TIME"] as Long + delay(max(0, (submitTime - offset) - clock.millis())) + + launch { service.run(job) } + } + } + } + + /** + * Convert [trace] into a list of [Job]s that can be submitted to the workflow service. + */ + public fun parseTrace(trace: Trace): List<Job> { + val table = checkNotNull(trace.getTable(TABLE_TASKS)) + val reader = table.newReader() + + val jobs = mutableMapOf<Long, Job>() + val tasks = mutableMapOf<Long, Task>() + val taskDependencies = mutableMapOf<Task, Set<Long>>() + + try { + while (reader.nextRow()) { + // Bag of tasks without workflow ID all share the same workflow + val workflowId = if (reader.hasColumn(TASK_WORKFLOW_ID)) reader.get(TASK_WORKFLOW_ID).toLong() else 0L + val workflow = jobs.computeIfAbsent(workflowId) { id -> Job(UUID(0L, id), "<unnamed>", HashSet(), HashMap()) } + + val id = reader.get(TASK_ID).toLong() + val grantedCpus = if (reader.hasColumn(TASK_ALLOC_NCPUS)) + reader.getInt(TASK_ALLOC_NCPUS) + else + reader.getInt(TASK_REQ_NCPUS) + val submitTime = reader.get(TASK_SUBMIT_TIME) + val runtime = reader.get(TASK_RUNTIME) + val flops: Long = 4000 * runtime.seconds * grantedCpus + val workload = SimFlopsWorkload(flops) + val task = Task( + UUID(0L, id), + "<unnamed>", + HashSet(), + mapOf( + "workload" to workload, + WORKFLOW_TASK_CORES to grantedCpus, + WORKFLOW_TASK_DEADLINE to runtime.toMillis() + ), + ) + + tasks[id] = task + taskDependencies[task] = reader.get(TASK_PARENTS).map { it.toLong() }.toSet() + + (workflow.metadata as MutableMap<String, Any>).merge("WORKFLOW_SUBMIT_TIME", submitTime.toEpochMilli()) { a, b -> min(a as Long, b as Long) } + (workflow.tasks as MutableSet<Task>).add(task) + } + + // Resolve dependencies for all tasks + for ((task, deps) in taskDependencies) { + for (dep in deps) { + val parent = requireNotNull(tasks[dep]) { "Dependency task with id $dep not found" } + (task.dependencies as MutableSet<Task>).add(parent) + } + } + } finally { + reader.close() + } + + return jobs.values.toList() + } +} diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt index a8d3a9e8..04f54e58 100644 --- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt @@ -25,39 +25,41 @@ package org.opendc.workflow.service import io.opentelemetry.api.metrics.MeterProvider import io.opentelemetry.sdk.metrics.SdkMeterProvider import io.opentelemetry.sdk.metrics.export.MetricProducer -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.delay -import kotlinx.coroutines.launch import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll import org.opendc.compute.service.ComputeService import org.opendc.compute.service.scheduler.FilterScheduler -import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter import org.opendc.compute.service.scheduler.filters.ComputeFilter -import org.opendc.compute.service.scheduler.weights.ProvisionedCoresWeigher +import org.opendc.compute.service.scheduler.filters.RamFilter +import org.opendc.compute.service.scheduler.filters.VCpuFilter +import org.opendc.compute.service.scheduler.weights.VCpuWeigher import org.opendc.compute.simulator.SimHost -import org.opendc.format.environment.sc18.Sc18EnvironmentReader -import org.opendc.format.trace.gwf.GwfTraceReader -import org.opendc.simulator.compute.SimSpaceSharedHypervisorProvider +import org.opendc.simulator.compute.kernel.SimSpaceSharedHypervisorProvider +import org.opendc.simulator.compute.model.MachineModel +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingNode +import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.flow.FlowEngine import org.opendc.telemetry.sdk.toOtelClock +import org.opendc.trace.Trace import org.opendc.workflow.service.internal.WorkflowServiceImpl import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy -import kotlin.math.max +import java.nio.file.Paths +import java.time.Duration +import java.util.* /** * Integration test suite for the [WorkflowServiceImpl]. */ -@DisplayName("WorkflowServiceImpl") -@OptIn(ExperimentalCoroutinesApi::class) -internal class WorkflowServiceIntegrationTest { +@DisplayName("WorkflowService") +internal class WorkflowServiceTest { /** * A large integration test where we check whether all tasks in some trace are executed correctly. */ @@ -68,34 +70,34 @@ internal class WorkflowServiceIntegrationTest { .setClock(clock.toOtelClock()) .build() - val hosts = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/environment.json")) - .use { it.read() } - .map { def -> - SimHost( - def.uid, - def.name, - def.model, - def.meta, - coroutineContext, - clock, - MeterProvider.noop().get("opendc-compute-simulator"), - SimSpaceSharedHypervisorProvider() - ) - } - - val meter = MeterProvider.noop().get("opendc-compute") + val interpreter = FlowEngine(coroutineContext, clock) + val machineModel = createMachineModel() + val hvProvider = SimSpaceSharedHypervisorProvider() + val hosts = List(4) { id -> + SimHost( + UUID(0, id.toLong()), + "node-$id", + machineModel, + emptyMap(), + coroutineContext, + interpreter, + MeterProvider.noop(), + hvProvider, + ) + } + val computeScheduler = FilterScheduler( - filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), - weighers = listOf(ProvisionedCoresWeigher() to -1.0) + filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)), + weighers = listOf(VCpuWeigher(1.0, multiplier = 1.0)) ) - val compute = ComputeService(coroutineContext, clock, meter, computeScheduler, schedulingQuantum = 1000) + val compute = ComputeService(coroutineContext, clock, MeterProvider.noop(), computeScheduler, schedulingQuantum = Duration.ofSeconds(1)) hosts.forEach { compute.addHost(it) } val scheduler = WorkflowService( coroutineContext, clock, - meterProvider.get("opendc-workflow"), + meterProvider, compute.newClient(), mode = WorkflowSchedulerMode.Batch(100), jobAdmissionPolicy = NullJobAdmissionPolicy, @@ -104,23 +106,13 @@ internal class WorkflowServiceIntegrationTest { taskOrderPolicy = SubmissionTimeTaskOrderPolicy(), ) - val reader = GwfTraceReader(object {}.javaClass.getResourceAsStream("/trace.gwf")) - var offset = Long.MIN_VALUE - - coroutineScope { - while (reader.hasNext()) { - val entry = reader.next() - - if (offset < 0) { - offset = entry.start - clock.millis() - } + val trace = Trace.open( + Paths.get(checkNotNull(WorkflowServiceTest::class.java.getResource("/trace.gwf")).toURI()), + format = "gwf" + ) + val replayer = TraceReplayer(trace) - delay(max(0, (entry.start - offset) - clock.millis())) - launch { - scheduler.run(entry.workload) - } - } - } + replayer.replay(clock, scheduler) hosts.forEach(SimHost::close) scheduler.close() @@ -133,10 +125,22 @@ internal class WorkflowServiceIntegrationTest { { assertEquals(0, metrics.jobsActive, "Not all submitted jobs started") }, { assertEquals(metrics.jobsSubmitted, metrics.jobsFinished, "Not all started jobs finished") }, { assertEquals(0, metrics.tasksActive, "Not all started tasks finished") }, - { assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") } + { assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") }, + { assertEquals(33213236L, clock.millis()) } ) } + /** + * The machine model based on: https://www.spec.org/power_ssj2008/results/res2020q1/power_ssj2008-20191125-01012.html + */ + private fun createMachineModel(): MachineModel { + val node = ProcessingNode("AMD", "am64", "EPYC 7742", 32) + val cpus = List(node.coreCount) { id -> ProcessingUnit(node, id, 3400.0) } + val memory = List(8) { MemoryUnit("Samsung", "Unknown", 2933.0, 16_000) } + + return MachineModel(cpus, memory) + } + class WorkflowMetrics { var jobsSubmitted = 0L var jobsActive = 0L |
