diff options
Diffstat (limited to 'opendc-workflow')
| -rw-r--r-- | opendc-workflow/opendc-workflow-service/build.gradle.kts | 4 | ||||
| -rw-r--r-- | opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt | 8 | ||||
| -rw-r--r-- | opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt | 20 | ||||
| -rw-r--r-- | opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt | 127 | ||||
| -rw-r--r-- | opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt (renamed from opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt) | 104 |
5 files changed, 200 insertions, 63 deletions
diff --git a/opendc-workflow/opendc-workflow-service/build.gradle.kts b/opendc-workflow/opendc-workflow-service/build.gradle.kts index 5e73222c..43b64b15 100644 --- a/opendc-workflow/opendc-workflow-service/build.gradle.kts +++ b/opendc-workflow/opendc-workflow-service/build.gradle.kts @@ -39,8 +39,8 @@ dependencies { testImplementation(projects.opendcSimulator.opendcSimulatorCore) testImplementation(projects.opendcCompute.opendcComputeSimulator) - testImplementation(projects.opendcFormat) + testImplementation(projects.opendcTrace.opendcTraceApi) testImplementation(projects.opendcTelemetry.opendcTelemetrySdk) - testImplementation(libs.jackson.module.kotlin) + testRuntimeOnly(projects.opendcTrace.opendcTraceGwf) testRuntimeOnly(libs.log4j.slf4j) } diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt index d3358ef1..a0248a93 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt @@ -22,7 +22,7 @@ package org.opendc.workflow.service -import io.opentelemetry.api.metrics.Meter +import io.opentelemetry.api.metrics.MeterProvider import org.opendc.compute.api.ComputeClient import org.opendc.workflow.api.Job import org.opendc.workflow.service.internal.WorkflowServiceImpl @@ -62,7 +62,7 @@ public interface WorkflowService : AutoCloseable { * @param context The [CoroutineContext] to use in the service. * @param clock The clock instance to use. * @param tracer The event tracer to use. - * @param meter The meter to use. + * @param meterProvider The meter provider to use. * @param compute The compute client to use. * @param mode The scheduling mode to use. * @param jobAdmissionPolicy The job admission policy to use. @@ -73,7 +73,7 @@ public interface WorkflowService : AutoCloseable { public operator fun invoke( context: CoroutineContext, clock: Clock, - meter: Meter, + meterProvider: MeterProvider, compute: ComputeClient, mode: WorkflowSchedulerMode, jobAdmissionPolicy: JobAdmissionPolicy, @@ -84,7 +84,7 @@ public interface WorkflowService : AutoCloseable { return WorkflowServiceImpl( context, clock, - meter, + meterProvider, compute, mode, jobAdmissionPolicy, diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt index 32191b8f..a0fd3fad 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt @@ -23,6 +23,7 @@ package org.opendc.workflow.service.internal import io.opentelemetry.api.metrics.Meter +import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.* import kotlinx.coroutines.flow.map import mu.KotlinLogging @@ -48,7 +49,7 @@ import kotlin.coroutines.resume public class WorkflowServiceImpl( context: CoroutineContext, internal val clock: Clock, - private val meter: Meter, + meterProvider: MeterProvider, private val computeClient: ComputeClient, mode: WorkflowSchedulerMode, jobAdmissionPolicy: JobAdmissionPolicy, @@ -67,6 +68,11 @@ public class WorkflowServiceImpl( private val logger = KotlinLogging.logger {} /** + * The [Meter] to collect metrics of this service. + */ + private val meter = meterProvider.get("org.opendc.workflow.service") + + /** * The incoming jobs ready to be processed by the scheduler. */ internal val incomingJobs: MutableSet<JobState> = linkedSetOf() @@ -155,7 +161,7 @@ public class WorkflowServiceImpl( /** * The number of jobs that have been submitted to the service. */ - private val submittedJobs = meter.longCounterBuilder("jobs.submitted") + private val submittedJobs = meter.counterBuilder("jobs.submitted") .setDescription("Number of submitted jobs") .setUnit("1") .build() @@ -163,7 +169,7 @@ public class WorkflowServiceImpl( /** * The number of jobs that are running. */ - private val runningJobs = meter.longUpDownCounterBuilder("jobs.active") + private val runningJobs = meter.upDownCounterBuilder("jobs.active") .setDescription("Number of jobs running") .setUnit("1") .build() @@ -171,7 +177,7 @@ public class WorkflowServiceImpl( /** * The number of jobs that have finished running. */ - private val finishedJobs = meter.longCounterBuilder("jobs.finished") + private val finishedJobs = meter.counterBuilder("jobs.finished") .setDescription("Number of jobs that finished running") .setUnit("1") .build() @@ -179,7 +185,7 @@ public class WorkflowServiceImpl( /** * The number of tasks that have been submitted to the service. */ - private val submittedTasks = meter.longCounterBuilder("tasks.submitted") + private val submittedTasks = meter.counterBuilder("tasks.submitted") .setDescription("Number of submitted tasks") .setUnit("1") .build() @@ -187,7 +193,7 @@ public class WorkflowServiceImpl( /** * The number of jobs that are running. */ - private val runningTasks = meter.longUpDownCounterBuilder("tasks.active") + private val runningTasks = meter.upDownCounterBuilder("tasks.active") .setDescription("Number of tasks running") .setUnit("1") .build() @@ -195,7 +201,7 @@ public class WorkflowServiceImpl( /** * The number of jobs that have finished running. */ - private val finishedTasks = meter.longCounterBuilder("tasks.finished") + private val finishedTasks = meter.counterBuilder("tasks.finished") .setDescription("Number of tasks that finished running") .setUnit("1") .build() diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt new file mode 100644 index 00000000..9ee3736e --- /dev/null +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt @@ -0,0 +1,127 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.service + +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import org.opendc.simulator.compute.workload.SimFlopsWorkload +import org.opendc.trace.* +import org.opendc.workflow.api.Job +import org.opendc.workflow.api.Task +import org.opendc.workflow.api.WORKFLOW_TASK_CORES +import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE +import java.time.Clock +import java.util.* +import kotlin.collections.HashMap +import kotlin.collections.HashSet +import kotlin.math.max +import kotlin.math.min + +/** + * Helper tool to replay workflow trace. + */ +internal class TraceReplayer(private val trace: Trace) { + /** + * Replay the workload. + */ + public suspend fun replay(clock: Clock, service: WorkflowService) { + val jobs = parseTrace(trace) + + // Sort jobs by their arrival time + (jobs as MutableList<Job>).sortBy { it.metadata["WORKFLOW_SUBMIT_TIME"] as Long } + + // Wait until the trace is started + val startTime = jobs[0].metadata["WORKFLOW_SUBMIT_TIME"] as Long + delay(min(0L, startTime - clock.millis())) + + val offset = startTime - clock.millis() + + coroutineScope { + for (job in jobs) { + val submitTime = job.metadata["WORKFLOW_SUBMIT_TIME"] as Long + delay(max(0, (submitTime - offset) - clock.millis())) + + launch { service.run(job) } + } + } + } + + /** + * Convert [trace] into a list of [Job]s that can be submitted to the workflow service. + */ + public fun parseTrace(trace: Trace): List<Job> { + val table = checkNotNull(trace.getTable(TABLE_TASKS)) + val reader = table.newReader() + + val jobs = mutableMapOf<Long, Job>() + val tasks = mutableMapOf<Long, Task>() + val taskDependencies = mutableMapOf<Task, Set<Long>>() + + try { + while (reader.nextRow()) { + // Bag of tasks without workflow ID all share the same workflow + val workflowId = if (reader.hasColumn(TASK_WORKFLOW_ID)) reader.get(TASK_WORKFLOW_ID).toLong() else 0L + val workflow = jobs.computeIfAbsent(workflowId) { id -> Job(UUID(0L, id), "<unnamed>", HashSet(), HashMap()) } + + val id = reader.get(TASK_ID).toLong() + val grantedCpus = if (reader.hasColumn(TASK_ALLOC_NCPUS)) + reader.getInt(TASK_ALLOC_NCPUS) + else + reader.getInt(TASK_REQ_NCPUS) + val submitTime = reader.get(TASK_SUBMIT_TIME) + val runtime = reader.get(TASK_RUNTIME) + val flops: Long = 4000 * runtime.seconds * grantedCpus + val workload = SimFlopsWorkload(flops) + val task = Task( + UUID(0L, id), + "<unnamed>", + HashSet(), + mapOf( + "workload" to workload, + WORKFLOW_TASK_CORES to grantedCpus, + WORKFLOW_TASK_DEADLINE to runtime.toMillis() + ), + ) + + tasks[id] = task + taskDependencies[task] = reader.get(TASK_PARENTS).map { it.toLong() }.toSet() + + (workflow.metadata as MutableMap<String, Any>).merge("WORKFLOW_SUBMIT_TIME", submitTime.toEpochMilli()) { a, b -> min(a as Long, b as Long) } + (workflow.tasks as MutableSet<Task>).add(task) + } + + // Resolve dependencies for all tasks + for ((task, deps) in taskDependencies) { + for (dep in deps) { + val parent = requireNotNull(tasks[dep]) { "Dependency task with id $dep not found" } + (task.dependencies as MutableSet<Task>).add(parent) + } + } + } finally { + reader.close() + } + + return jobs.values.toList() + } +} diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt index a8d3a9e8..04f54e58 100644 --- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceIntegrationTest.kt +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt @@ -25,39 +25,41 @@ package org.opendc.workflow.service import io.opentelemetry.api.metrics.MeterProvider import io.opentelemetry.sdk.metrics.SdkMeterProvider import io.opentelemetry.sdk.metrics.export.MetricProducer -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.delay -import kotlinx.coroutines.launch import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll import org.opendc.compute.service.ComputeService import org.opendc.compute.service.scheduler.FilterScheduler -import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter import org.opendc.compute.service.scheduler.filters.ComputeFilter -import org.opendc.compute.service.scheduler.weights.ProvisionedCoresWeigher +import org.opendc.compute.service.scheduler.filters.RamFilter +import org.opendc.compute.service.scheduler.filters.VCpuFilter +import org.opendc.compute.service.scheduler.weights.VCpuWeigher import org.opendc.compute.simulator.SimHost -import org.opendc.format.environment.sc18.Sc18EnvironmentReader -import org.opendc.format.trace.gwf.GwfTraceReader -import org.opendc.simulator.compute.SimSpaceSharedHypervisorProvider +import org.opendc.simulator.compute.kernel.SimSpaceSharedHypervisorProvider +import org.opendc.simulator.compute.model.MachineModel +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingNode +import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.flow.FlowEngine import org.opendc.telemetry.sdk.toOtelClock +import org.opendc.trace.Trace import org.opendc.workflow.service.internal.WorkflowServiceImpl import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy -import kotlin.math.max +import java.nio.file.Paths +import java.time.Duration +import java.util.* /** * Integration test suite for the [WorkflowServiceImpl]. */ -@DisplayName("WorkflowServiceImpl") -@OptIn(ExperimentalCoroutinesApi::class) -internal class WorkflowServiceIntegrationTest { +@DisplayName("WorkflowService") +internal class WorkflowServiceTest { /** * A large integration test where we check whether all tasks in some trace are executed correctly. */ @@ -68,34 +70,34 @@ internal class WorkflowServiceIntegrationTest { .setClock(clock.toOtelClock()) .build() - val hosts = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/environment.json")) - .use { it.read() } - .map { def -> - SimHost( - def.uid, - def.name, - def.model, - def.meta, - coroutineContext, - clock, - MeterProvider.noop().get("opendc-compute-simulator"), - SimSpaceSharedHypervisorProvider() - ) - } - - val meter = MeterProvider.noop().get("opendc-compute") + val interpreter = FlowEngine(coroutineContext, clock) + val machineModel = createMachineModel() + val hvProvider = SimSpaceSharedHypervisorProvider() + val hosts = List(4) { id -> + SimHost( + UUID(0, id.toLong()), + "node-$id", + machineModel, + emptyMap(), + coroutineContext, + interpreter, + MeterProvider.noop(), + hvProvider, + ) + } + val computeScheduler = FilterScheduler( - filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), - weighers = listOf(ProvisionedCoresWeigher() to -1.0) + filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)), + weighers = listOf(VCpuWeigher(1.0, multiplier = 1.0)) ) - val compute = ComputeService(coroutineContext, clock, meter, computeScheduler, schedulingQuantum = 1000) + val compute = ComputeService(coroutineContext, clock, MeterProvider.noop(), computeScheduler, schedulingQuantum = Duration.ofSeconds(1)) hosts.forEach { compute.addHost(it) } val scheduler = WorkflowService( coroutineContext, clock, - meterProvider.get("opendc-workflow"), + meterProvider, compute.newClient(), mode = WorkflowSchedulerMode.Batch(100), jobAdmissionPolicy = NullJobAdmissionPolicy, @@ -104,23 +106,13 @@ internal class WorkflowServiceIntegrationTest { taskOrderPolicy = SubmissionTimeTaskOrderPolicy(), ) - val reader = GwfTraceReader(object {}.javaClass.getResourceAsStream("/trace.gwf")) - var offset = Long.MIN_VALUE - - coroutineScope { - while (reader.hasNext()) { - val entry = reader.next() - - if (offset < 0) { - offset = entry.start - clock.millis() - } + val trace = Trace.open( + Paths.get(checkNotNull(WorkflowServiceTest::class.java.getResource("/trace.gwf")).toURI()), + format = "gwf" + ) + val replayer = TraceReplayer(trace) - delay(max(0, (entry.start - offset) - clock.millis())) - launch { - scheduler.run(entry.workload) - } - } - } + replayer.replay(clock, scheduler) hosts.forEach(SimHost::close) scheduler.close() @@ -133,10 +125,22 @@ internal class WorkflowServiceIntegrationTest { { assertEquals(0, metrics.jobsActive, "Not all submitted jobs started") }, { assertEquals(metrics.jobsSubmitted, metrics.jobsFinished, "Not all started jobs finished") }, { assertEquals(0, metrics.tasksActive, "Not all started tasks finished") }, - { assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") } + { assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") }, + { assertEquals(33213236L, clock.millis()) } ) } + /** + * The machine model based on: https://www.spec.org/power_ssj2008/results/res2020q1/power_ssj2008-20191125-01012.html + */ + private fun createMachineModel(): MachineModel { + val node = ProcessingNode("AMD", "am64", "EPYC 7742", 32) + val cpus = List(node.coreCount) { id -> ProcessingUnit(node, id, 3400.0) } + val memory = List(8) { MemoryUnit("Samsung", "Unknown", 2933.0, 16_000) } + + return MachineModel(cpus, memory) + } + class WorkflowMetrics { var jobsSubmitted = 0L var jobsActive = 0L |
