diff options
14 files changed, 364 insertions, 199 deletions
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt index 90ee56cb..59203b66 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadRunner.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt @@ -40,25 +40,28 @@ import org.opendc.simulator.flow.FlowEngine import org.opendc.telemetry.compute.* import org.opendc.telemetry.sdk.toOtelClock import java.time.Clock +import java.time.Duration import java.util.* import kotlin.coroutines.CoroutineContext import kotlin.math.max /** - * Helper class to simulated VM-based workloads in OpenDC. + * Helper class to simulate VM-based workloads in OpenDC. * * @param context [CoroutineContext] to run the simulation in. * @param clock [Clock] instance tracking simulation time. * @param scheduler [ComputeScheduler] implementation to use for the service. * @param failureModel A failure model to use for injecting failures. * @param interferenceModel The model to use for performance interference. + * @param schedulingQuantum The scheduling quantum of the scheduler. */ -public class ComputeWorkloadRunner( +public class ComputeServiceHelper( private val context: CoroutineContext, private val clock: Clock, scheduler: ComputeScheduler, private val failureModel: FailureModel? = null, private val interferenceModel: VmInterferenceModel? = null, + schedulingQuantum: Duration = Duration.ofMinutes(5) ) : AutoCloseable { /** * The [ComputeService] that has been configured by the manager. @@ -83,7 +86,7 @@ public class ComputeWorkloadRunner( private val hosts = mutableSetOf<SimHost>() init { - val (service, serviceMeterProvider) = createService(scheduler) + val (service, serviceMeterProvider) = createService(scheduler, schedulingQuantum) this._metricProducers.add(serviceMeterProvider) this.service = service } @@ -91,7 +94,7 @@ public class ComputeWorkloadRunner( /** * Converge a simulation of the [ComputeService] by replaying the workload trace given by [trace]. */ - public suspend fun run(trace: List<VirtualMachine>, seed: Long) { + public suspend fun run(trace: List<VirtualMachine>, seed: Long, submitImmediately: Boolean = false) { val random = Random(seed) val injector = failureModel?.createInjector(context, clock, service, random) val client = service.newClient() @@ -116,7 +119,10 @@ public class ComputeWorkloadRunner( // Make sure the trace entries are ordered by submission time assert(start - offset >= 0) { "Invalid trace order" } - delay(max(0, (start - offset) - now)) + + if (!submitImmediately) { + delay(max(0, (start - offset) - now)) + } launch { val workloadOffset = -offset + 300001 @@ -206,7 +212,7 @@ public class ComputeWorkloadRunner( /** * Construct a [ComputeService] instance. */ - private fun createService(scheduler: ComputeScheduler): Pair<ComputeService, SdkMeterProvider> { + private fun createService(scheduler: ComputeScheduler, schedulingQuantum: Duration): Pair<ComputeService, SdkMeterProvider> { val resource = Resource.builder() .put(ResourceAttributes.SERVICE_NAME, "opendc-compute") .build() @@ -216,7 +222,7 @@ public class ComputeWorkloadRunner( .setResource(resource) .build() - val service = ComputeService(context, clock, meterProvider, scheduler) + val service = ComputeService(context, clock, meterProvider, scheduler, schedulingQuantum) return service to meterProvider } } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/TopologyHelpers.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/TopologyHelpers.kt index 74f9a1f8..de4300c7 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/TopologyHelpers.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/topology/TopologyHelpers.kt @@ -23,12 +23,12 @@ @file:JvmName("TopologyHelpers") package org.opendc.compute.workload.topology -import org.opendc.compute.workload.ComputeWorkloadRunner +import org.opendc.compute.workload.ComputeServiceHelper /** - * Apply the specified [topology] to the given [ComputeWorkloadRunner]. + * Apply the specified [topology] to the given [ComputeServiceHelper]. */ -public fun ComputeWorkloadRunner.apply(topology: Topology, optimize: Boolean = false) { +public fun ComputeServiceHelper.apply(topology: Topology, optimize: Boolean = false) { val hosts = topology.resolve() for (spec in hosts) { registerHost(spec, optimize) diff --git a/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt b/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt index 48a90985..4b35de95 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt @@ -67,7 +67,7 @@ class CapelinBenchmarks { filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), weighers = listOf(CoreRamWeigher(multiplier = 1.0)) ) - val runner = ComputeWorkloadRunner( + val runner = ComputeServiceHelper( coroutineContext, clock, computeScheduler diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index 53c9de11..b548ae58 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -24,8 +24,8 @@ package org.opendc.experiments.capelin import com.typesafe.config.ConfigFactory import mu.KotlinLogging +import org.opendc.compute.workload.ComputeServiceHelper import org.opendc.compute.workload.ComputeWorkloadLoader -import org.opendc.compute.workload.ComputeWorkloadRunner import org.opendc.compute.workload.createComputeScheduler import org.opendc.compute.workload.export.parquet.ParquetComputeMetricExporter import org.opendc.compute.workload.grid5000 @@ -109,7 +109,7 @@ abstract class Portfolio(name: String) : Experiment(name) { grid5000(Duration.ofSeconds((operationalPhenomena.failureFrequency * 60).roundToLong())) else null - val runner = ComputeWorkloadRunner( + val runner = ComputeServiceHelper( coroutineContext, clock, computeScheduler, diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 1a8948f3..eedc3131 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -83,7 +83,7 @@ class CapelinIntegrationTest { @Test fun testLarge() = runBlockingSimulation { val workload = createTestWorkload(1.0) - val runner = ComputeWorkloadRunner( + val runner = ComputeServiceHelper( coroutineContext, clock, computeScheduler @@ -131,7 +131,7 @@ class CapelinIntegrationTest { val seed = 1 val workload = createTestWorkload(0.25, seed) - val simulator = ComputeWorkloadRunner( + val simulator = ComputeServiceHelper( coroutineContext, clock, computeScheduler @@ -180,7 +180,7 @@ class CapelinIntegrationTest { .read(perfInterferenceInput) .withSeed(seed.toLong()) - val simulator = ComputeWorkloadRunner( + val simulator = ComputeServiceHelper( coroutineContext, clock, computeScheduler, @@ -222,7 +222,7 @@ class CapelinIntegrationTest { @Test fun testFailures() = runBlockingSimulation { val seed = 1 - val simulator = ComputeWorkloadRunner( + val simulator = ComputeServiceHelper( coroutineContext, clock, computeScheduler, diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt index a1bc869e..5d6bc37f 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt @@ -185,7 +185,7 @@ class RunnerCli : CliktCommand(name = "runner") { else null - val simulator = ComputeWorkloadRunner( + val simulator = ComputeServiceHelper( coroutineContext, clock, computeScheduler, diff --git a/opendc-workflow/opendc-workflow-service/build.gradle.kts b/opendc-workflow/opendc-workflow-service/build.gradle.kts index 43b64b15..4d8b7d7f 100644 --- a/opendc-workflow/opendc-workflow-service/build.gradle.kts +++ b/opendc-workflow/opendc-workflow-service/build.gradle.kts @@ -37,8 +37,9 @@ dependencies { implementation(projects.opendcUtils) implementation(libs.kotlin.logging) + testImplementation(projects.opendcWorkflow.opendcWorkflowWorkload) + testImplementation(projects.opendcCompute.opendcComputeWorkload) testImplementation(projects.opendcSimulator.opendcSimulatorCore) - testImplementation(projects.opendcCompute.opendcComputeSimulator) testImplementation(projects.opendcTrace.opendcTraceApi) testImplementation(projects.opendcTelemetry.opendcTelemetrySdk) testRuntimeOnly(projects.opendcTrace.opendcTraceGwf) diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt deleted file mode 100644 index 9ee3736e..00000000 --- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.workflow.service - -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.delay -import kotlinx.coroutines.launch -import org.opendc.simulator.compute.workload.SimFlopsWorkload -import org.opendc.trace.* -import org.opendc.workflow.api.Job -import org.opendc.workflow.api.Task -import org.opendc.workflow.api.WORKFLOW_TASK_CORES -import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE -import java.time.Clock -import java.util.* -import kotlin.collections.HashMap -import kotlin.collections.HashSet -import kotlin.math.max -import kotlin.math.min - -/** - * Helper tool to replay workflow trace. - */ -internal class TraceReplayer(private val trace: Trace) { - /** - * Replay the workload. - */ - public suspend fun replay(clock: Clock, service: WorkflowService) { - val jobs = parseTrace(trace) - - // Sort jobs by their arrival time - (jobs as MutableList<Job>).sortBy { it.metadata["WORKFLOW_SUBMIT_TIME"] as Long } - - // Wait until the trace is started - val startTime = jobs[0].metadata["WORKFLOW_SUBMIT_TIME"] as Long - delay(min(0L, startTime - clock.millis())) - - val offset = startTime - clock.millis() - - coroutineScope { - for (job in jobs) { - val submitTime = job.metadata["WORKFLOW_SUBMIT_TIME"] as Long - delay(max(0, (submitTime - offset) - clock.millis())) - - launch { service.run(job) } - } - } - } - - /** - * Convert [trace] into a list of [Job]s that can be submitted to the workflow service. - */ - public fun parseTrace(trace: Trace): List<Job> { - val table = checkNotNull(trace.getTable(TABLE_TASKS)) - val reader = table.newReader() - - val jobs = mutableMapOf<Long, Job>() - val tasks = mutableMapOf<Long, Task>() - val taskDependencies = mutableMapOf<Task, Set<Long>>() - - try { - while (reader.nextRow()) { - // Bag of tasks without workflow ID all share the same workflow - val workflowId = if (reader.hasColumn(TASK_WORKFLOW_ID)) reader.get(TASK_WORKFLOW_ID).toLong() else 0L - val workflow = jobs.computeIfAbsent(workflowId) { id -> Job(UUID(0L, id), "<unnamed>", HashSet(), HashMap()) } - - val id = reader.get(TASK_ID).toLong() - val grantedCpus = if (reader.hasColumn(TASK_ALLOC_NCPUS)) - reader.getInt(TASK_ALLOC_NCPUS) - else - reader.getInt(TASK_REQ_NCPUS) - val submitTime = reader.get(TASK_SUBMIT_TIME) - val runtime = reader.get(TASK_RUNTIME) - val flops: Long = 4000 * runtime.seconds * grantedCpus - val workload = SimFlopsWorkload(flops) - val task = Task( - UUID(0L, id), - "<unnamed>", - HashSet(), - mapOf( - "workload" to workload, - WORKFLOW_TASK_CORES to grantedCpus, - WORKFLOW_TASK_DEADLINE to runtime.toMillis() - ), - ) - - tasks[id] = task - taskDependencies[task] = reader.get(TASK_PARENTS).map { it.toLong() }.toSet() - - (workflow.metadata as MutableMap<String, Any>).merge("WORKFLOW_SUBMIT_TIME", submitTime.toEpochMilli()) { a, b -> min(a as Long, b as Long) } - (workflow.tasks as MutableSet<Task>).add(task) - } - - // Resolve dependencies for all tasks - for ((task, deps) in taskDependencies) { - for (dep in deps) { - val parent = requireNotNull(tasks[dep]) { "Dependency task with id $dep not found" } - (task.dependencies as MutableSet<Task>).add(parent) - } - } - } finally { - reader.close() - } - - return jobs.values.toList() - } -} diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt index 04f54e58..d80c098b 100644 --- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt @@ -22,41 +22,41 @@ package org.opendc.workflow.service -import io.opentelemetry.api.metrics.MeterProvider -import io.opentelemetry.sdk.metrics.SdkMeterProvider import io.opentelemetry.sdk.metrics.export.MetricProducer import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll -import org.opendc.compute.service.ComputeService import org.opendc.compute.service.scheduler.FilterScheduler import org.opendc.compute.service.scheduler.filters.ComputeFilter import org.opendc.compute.service.scheduler.filters.RamFilter import org.opendc.compute.service.scheduler.filters.VCpuFilter import org.opendc.compute.service.scheduler.weights.VCpuWeigher -import org.opendc.compute.simulator.SimHost +import org.opendc.compute.workload.ComputeServiceHelper +import org.opendc.compute.workload.topology.HostSpec import org.opendc.simulator.compute.kernel.SimSpaceSharedHypervisorProvider import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.power.ConstantPowerModel +import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.flow.FlowEngine -import org.opendc.telemetry.sdk.toOtelClock import org.opendc.trace.Trace -import org.opendc.workflow.service.internal.WorkflowServiceImpl import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode import org.opendc.workflow.service.scheduler.job.NullJobAdmissionPolicy import org.opendc.workflow.service.scheduler.job.SubmissionTimeJobOrderPolicy import org.opendc.workflow.service.scheduler.task.NullTaskEligibilityPolicy import org.opendc.workflow.service.scheduler.task.SubmissionTimeTaskOrderPolicy +import org.opendc.workflow.workload.WorkflowSchedulerSpec +import org.opendc.workflow.workload.WorkflowServiceHelper +import org.opendc.workflow.workload.toJobs import java.nio.file.Paths import java.time.Duration import java.util.* /** - * Integration test suite for the [WorkflowServiceImpl]. + * Integration test suite for the [WorkflowService]. */ @DisplayName("WorkflowService") internal class WorkflowServiceTest { @@ -65,60 +65,39 @@ internal class WorkflowServiceTest { */ @Test fun testTrace() = runBlockingSimulation { - val meterProvider: MeterProvider = SdkMeterProvider - .builder() - .setClock(clock.toOtelClock()) - .build() - - val interpreter = FlowEngine(coroutineContext, clock) - val machineModel = createMachineModel() - val hvProvider = SimSpaceSharedHypervisorProvider() - val hosts = List(4) { id -> - SimHost( - UUID(0, id.toLong()), - "node-$id", - machineModel, - emptyMap(), - coroutineContext, - interpreter, - MeterProvider.noop(), - hvProvider, - ) - } - + // Configure the ComputeService that is responsible for mapping virtual machines onto physical hosts + val HOST_COUNT = 4 val computeScheduler = FilterScheduler( filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)), weighers = listOf(VCpuWeigher(1.0, multiplier = 1.0)) ) - val compute = ComputeService(coroutineContext, clock, MeterProvider.noop(), computeScheduler, schedulingQuantum = Duration.ofSeconds(1)) + val computeHelper = ComputeServiceHelper(coroutineContext, clock, computeScheduler, schedulingQuantum = Duration.ofSeconds(1)) - hosts.forEach { compute.addHost(it) } + repeat(HOST_COUNT) { computeHelper.registerHost(createHostSpec(it)) } - val scheduler = WorkflowService( - coroutineContext, - clock, - meterProvider, - compute.newClient(), - mode = WorkflowSchedulerMode.Batch(100), + // Configure the WorkflowService that is responsible for scheduling the workflow tasks onto machines + val workflowScheduler = WorkflowSchedulerSpec( + batchMode = WorkflowSchedulerMode.Batch(100), jobAdmissionPolicy = NullJobAdmissionPolicy, jobOrderPolicy = SubmissionTimeJobOrderPolicy(), taskEligibilityPolicy = NullTaskEligibilityPolicy, taskOrderPolicy = SubmissionTimeTaskOrderPolicy(), ) + val workflowHelper = WorkflowServiceHelper(coroutineContext, clock, computeHelper.service.newClient(), workflowScheduler) - val trace = Trace.open( - Paths.get(checkNotNull(WorkflowServiceTest::class.java.getResource("/trace.gwf")).toURI()), - format = "gwf" - ) - val replayer = TraceReplayer(trace) - - replayer.replay(clock, scheduler) + try { + val trace = Trace.open( + Paths.get(checkNotNull(WorkflowServiceTest::class.java.getResource("/trace.gwf")).toURI()), + format = "gwf" + ) - hosts.forEach(SimHost::close) - scheduler.close() - compute.close() + workflowHelper.replay(trace.toJobs()) + } finally { + workflowHelper.close() + computeHelper.close() + } - val metrics = collectMetrics(meterProvider as MetricProducer) + val metrics = collectMetrics(workflowHelper.metricProducer) assertAll( { assertEquals(758, metrics.jobsSubmitted, "No jobs submitted") }, @@ -126,19 +105,29 @@ internal class WorkflowServiceTest { { assertEquals(metrics.jobsSubmitted, metrics.jobsFinished, "Not all started jobs finished") }, { assertEquals(0, metrics.tasksActive, "Not all started tasks finished") }, { assertEquals(metrics.tasksSubmitted, metrics.tasksFinished, "Not all started tasks finished") }, - { assertEquals(33213236L, clock.millis()) } + { assertEquals(33214236L, clock.millis()) { "Total duration incorrect" } } ) } /** - * The machine model based on: https://www.spec.org/power_ssj2008/results/res2020q1/power_ssj2008-20191125-01012.html + * Construct a [HostSpec] for a simulated host. */ - private fun createMachineModel(): MachineModel { + private fun createHostSpec(uid: Int): HostSpec { + // Machine model based on: https://www.spec.org/power_ssj2008/results/res2020q1/power_ssj2008-20191125-01012.html val node = ProcessingNode("AMD", "am64", "EPYC 7742", 32) - val cpus = List(node.coreCount) { id -> ProcessingUnit(node, id, 3400.0) } + val cpus = List(node.coreCount) { ProcessingUnit(node, it, 3400.0) } val memory = List(8) { MemoryUnit("Samsung", "Unknown", 2933.0, 16_000) } - return MachineModel(cpus, memory) + val machineModel = MachineModel(cpus, memory) + + return HostSpec( + UUID(0, uid.toLong()), + "host-$uid", + emptyMap(), + machineModel, + SimplePowerDriver(ConstantPowerModel(0.0)), + SimSpaceSharedHypervisorProvider() + ) } class WorkflowMetrics { diff --git a/opendc-workflow/opendc-workflow-workload/build.gradle.kts b/opendc-workflow/opendc-workflow-workload/build.gradle.kts new file mode 100644 index 00000000..dfb77a39 --- /dev/null +++ b/opendc-workflow/opendc-workflow-workload/build.gradle.kts @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +description = "Support library for simulating workflows with OpenDC" + +/* Build configuration */ +plugins { + `kotlin-library-conventions` + `testing-conventions` +} + +dependencies { + api(platform(projects.opendcPlatform)) + api(projects.opendcWorkflow.opendcWorkflowService) + + implementation(projects.opendcSimulator.opendcSimulatorCompute) + implementation(projects.opendcTrace.opendcTraceApi) + implementation(projects.opendcTelemetry.opendcTelemetrySdk) + implementation(libs.opentelemetry.semconv) +} diff --git a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/TraceHelpers.kt b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/TraceHelpers.kt new file mode 100644 index 00000000..73995d08 --- /dev/null +++ b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/TraceHelpers.kt @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("TraceHelpers") +package org.opendc.workflow.workload + +import org.opendc.simulator.compute.workload.SimFlopsWorkload +import org.opendc.trace.* +import org.opendc.workflow.api.Job +import org.opendc.workflow.api.Task +import org.opendc.workflow.api.WORKFLOW_TASK_CORES +import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE +import java.util.* +import kotlin.collections.HashMap +import kotlin.collections.HashSet +import kotlin.math.min + +/** + * Convert [Trace] into a list of [Job]s that can be submitted to the workflow service. + */ +public fun Trace.toJobs(): List<Job> { + val table = checkNotNull(getTable(TABLE_TASKS)) + val reader = table.newReader() + + val jobs = mutableMapOf<Long, Job>() + val tasks = mutableMapOf<Long, Task>() + val taskDependencies = mutableMapOf<Task, Set<Long>>() + + try { + while (reader.nextRow()) { + // Bag of tasks without workflow ID all share the same workflow + val workflowId = if (reader.hasColumn(TASK_WORKFLOW_ID)) reader.get(TASK_WORKFLOW_ID).toLong() else 0L + val workflow = jobs.computeIfAbsent(workflowId) { id -> Job(UUID(0L, id), "<unnamed>", HashSet(), HashMap()) } + + val id = reader.get(TASK_ID).toLong() + val grantedCpus = if (reader.hasColumn(TASK_ALLOC_NCPUS)) + reader.getInt(TASK_ALLOC_NCPUS) + else + reader.getInt(TASK_REQ_NCPUS) + val submitTime = reader.get(TASK_SUBMIT_TIME) + val runtime = reader.get(TASK_RUNTIME) + val flops: Long = 4000 * runtime.seconds * grantedCpus + val workload = SimFlopsWorkload(flops) + val task = Task( + UUID(0L, id), + "<unnamed>", + HashSet(), + mapOf( + "workload" to workload, + WORKFLOW_TASK_CORES to grantedCpus, + WORKFLOW_TASK_DEADLINE to runtime.toMillis() + ), + ) + + tasks[id] = task + taskDependencies[task] = reader.get(TASK_PARENTS).map { it.toLong() }.toSet() + + (workflow.metadata as MutableMap<String, Any>).merge("WORKFLOW_SUBMIT_TIME", submitTime.toEpochMilli()) { a, b -> min(a as Long, b as Long) } + (workflow.tasks as MutableSet<Task>).add(task) + } + + // Resolve dependencies for all tasks + for ((task, deps) in taskDependencies) { + for (dep in deps) { + val parent = requireNotNull(tasks[dep]) { "Dependency task with id $dep not found" } + (task.dependencies as MutableSet<Task>).add(parent) + } + } + } finally { + reader.close() + } + + return jobs.values.toList() +} diff --git a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowSchedulerSpec.kt b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowSchedulerSpec.kt new file mode 100644 index 00000000..b22e16d9 --- /dev/null +++ b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowSchedulerSpec.kt @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.workload + +import org.opendc.workflow.service.scheduler.WorkflowSchedulerMode +import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy +import org.opendc.workflow.service.scheduler.job.JobOrderPolicy +import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy +import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy + +/** + * Specification of the scheduling policies of the workflow scheduler. + */ +public data class WorkflowSchedulerSpec( + val batchMode: WorkflowSchedulerMode, + val jobAdmissionPolicy: JobAdmissionPolicy, + val jobOrderPolicy: JobOrderPolicy, + val taskEligibilityPolicy: TaskEligibilityPolicy, + val taskOrderPolicy: TaskOrderPolicy, +) diff --git a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt new file mode 100644 index 00000000..236a036b --- /dev/null +++ b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt @@ -0,0 +1,123 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.workload + +import io.opentelemetry.sdk.metrics.SdkMeterProvider +import io.opentelemetry.sdk.metrics.export.MetricProducer +import io.opentelemetry.sdk.resources.Resource +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import org.opendc.compute.api.ComputeClient +import org.opendc.telemetry.sdk.toOtelClock +import org.opendc.workflow.api.Job +import org.opendc.workflow.service.WorkflowService +import java.time.Clock +import kotlin.coroutines.CoroutineContext + +/** + * Helper class to simulate workflow-based workloads in OpenDC. + * + * @param context [CoroutineContext] to run the simulation in. + * @param clock [Clock] instance tracking simulation time. + * @param computeClient A [ComputeClient] instance to communicate with the cluster scheduler. + * @param schedulerSpec The configuration of the workflow scheduler. + */ +public class WorkflowServiceHelper( + private val context: CoroutineContext, + private val clock: Clock, + private val computeClient: ComputeClient, + private val schedulerSpec: WorkflowSchedulerSpec + ) : AutoCloseable { + /** + * The [WorkflowService] that is constructed by this runner. + */ + public val service: WorkflowService + + /** + * The [MetricProducer] exposed by the [WorkflowService]. + */ + public val metricProducer: MetricProducer + + init { + val resource = Resource.builder() + .put(ResourceAttributes.SERVICE_NAME, "opendc-workflow") + .build() + + val meterProvider = SdkMeterProvider.builder() + .setClock(clock.toOtelClock()) + .setResource(resource) + .build() + metricProducer = meterProvider + + service = WorkflowService( + context, + clock, + meterProvider, + computeClient, + mode = schedulerSpec.batchMode, + jobAdmissionPolicy = schedulerSpec.jobAdmissionPolicy, + jobOrderPolicy = schedulerSpec.jobOrderPolicy, + taskEligibilityPolicy = schedulerSpec.taskEligibilityPolicy, + taskOrderPolicy = schedulerSpec.taskOrderPolicy, + ) + } + + /** + * Run the specified list of [jobs] using the workflow service and suspend execution until all jobs have + * finished. + */ + public suspend fun replay(jobs: List<Job>) { + // Sort jobs by their arrival time + val orderedJobs = jobs.sortedBy { it.metadata.getOrDefault("WORKFLOW_SUBMIT_TIME", Long.MAX_VALUE) as Long } + if (orderedJobs.isEmpty()) { + return + } + + // Wait until the trace is started + val startTime = orderedJobs[0].metadata.getOrDefault("WORKFLOW_SUBMIT_TIME", Long.MAX_VALUE) as Long + var offset = 0L + + if (startTime != Long.MAX_VALUE) { + offset = startTime - clock.millis() + delay(offset.coerceAtLeast(0)) + } + + coroutineScope { + for (job in orderedJobs) { + val submitTime = job.metadata.getOrDefault("WORKFLOW_SUBMIT_TIME", Long.MAX_VALUE) as Long + if (submitTime != Long.MAX_VALUE) { + delay(((submitTime - offset) - clock.millis()).coerceAtLeast(0)) + } + + launch { service.run(job) } + } + } + } + + override fun close() { + computeClient.close() + service.close() + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index d9b3d940..170a687d 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -28,6 +28,7 @@ include(":opendc-compute:opendc-compute-simulator") include(":opendc-compute:opendc-compute-workload") include(":opendc-workflow:opendc-workflow-api") include(":opendc-workflow:opendc-workflow-service") +include(":opendc-workflow:opendc-workflow-workload") include(":opendc-faas:opendc-faas-api") include(":opendc-faas:opendc-faas-service") include(":opendc-faas:opendc-faas-simulator") |
