From ebc6cdd08a0d8dac045305839421b726ae5c91b3 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 29 Jul 2022 17:06:24 +0200 Subject: fix(web/runner): Use correct context ClassLoader for ForkJoinPool This change updates the OpenDC web runner implementation to use the correct context ClassLoader for simulation jobs running inside a ForkJoinPool. By default, the ForkJoinPool will use the system class loader which does not have access to the services needed by the web runner. --- .../src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) (limited to 'opendc-web/opendc-web-runner/src') diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt index 9c9a866d..ccc7f03a 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt @@ -45,6 +45,7 @@ import java.io.File import java.time.Duration import java.util.* import java.util.concurrent.* +import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory /** * Class to execute the pending jobs via the OpenDC web API. @@ -81,7 +82,7 @@ public class OpenDCRunner( /** * The [ForkJoinPool] that is used to execute the simulation jobs. */ - private val pool = ForkJoinPool(parallelism) + private val pool = ForkJoinPool(parallelism, RunnerThreadFactory(Thread.currentThread().contextClassLoader), null, false) /** * A [ScheduledExecutorService] to manage the heartbeat of simulation jobs as well as tracking the deadline of @@ -303,4 +304,15 @@ public class OpenDCRunner( override fun toString(): String = "WebRunnerTopologyFactory" } } + + /** + * A custom [ForkJoinWorkerThreadFactory] that uses the [ClassLoader] specified by the runner. 
+ */ + private class RunnerThreadFactory(private val classLoader: ClassLoader) : ForkJoinWorkerThreadFactory { + override fun newThread(pool: ForkJoinPool): ForkJoinWorkerThread = object : ForkJoinWorkerThread(pool) { + init { + contextClassLoader = classLoader + } + } + } } -- cgit v1.2.3 From fab42945e8e5a1e9a8296f5e4bcbe476a6a5bbd6 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sat, 30 Jul 2022 13:11:38 +0200 Subject: fix(web/runner): Gracefully exit on interrupt This change updates the web runner implementation to gracefully exit the current thread when interrupted. --- .../src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) (limited to 'opendc-web/opendc-web-runner/src') diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt index ccc7f03a..7e0133d0 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt @@ -98,11 +98,6 @@ public class OpenDCRunner( override fun run() { try { while (true) { - // Check if anyone has interrupted the thread - if (Thread.interrupted()) { - throw InterruptedException() - } - val job = manager.findNext() if (job == null) { Thread.sleep(pollInterval.toMillis()) @@ -120,6 +115,8 @@ public class OpenDCRunner( pool.submit(JobAction(job)) } + } catch (_: InterruptedException) { + // Gracefully exit when the thread is interrupted } finally { workloadLoader.reset() -- cgit v1.2.3 From 5deb055565606c94fc29bd594832586f3dfdf3de Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 2 Aug 2022 10:07:17 +0200 Subject: fix(web/runner): Prevent reporting NaN values This change fixes an issue with the OpenDC web runner where it would report NaN values for some of the metrics due to the topology being empty. 
This in turn causes issues in the frontend. --- .../main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'opendc-web/opendc-web-runner/src') diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt index 01002c70..4c3d1cfa 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt @@ -105,9 +105,9 @@ internal class WebComputeMonitor : ComputeMonitor { hostAggregateMetrics.totalIdleTime, hostAggregateMetrics.totalStealTime, hostAggregateMetrics.totalLostTime, - hostMetrics.map { it.value.cpuUsage / it.value.count }.average(), - hostMetrics.map { it.value.cpuDemand / it.value.count }.average(), - hostMetrics.map { it.value.instanceCount.toDouble() / it.value.count }.average(), + hostMetrics.map { it.value.cpuUsage / it.value.count }.average().let { if (it.isNaN()) 0.0 else it }, + hostMetrics.map { it.value.cpuDemand / it.value.count }.average().let { if (it.isNaN()) 0.0 else it }, + hostMetrics.map { it.value.instanceCount.toDouble() / it.value.count }.average().let { if (it.isNaN()) 0.0 else it }, hostMetrics.map { it.value.instanceCount.toDouble() / it.value.count }.maxOrNull() ?: 0.0, hostAggregateMetrics.totalPowerDraw, hostAggregateMetrics.totalFailureSlices.roundToLong(), -- cgit v1.2.3 From a424aa5e81c31f8cc6ba8846f0a6af29623588d4 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 3 Aug 2022 11:11:58 +0200 Subject: refactor(web/runner): Support pluggable job manager This change introduces a new interface `JobManager` that is responsible for communicating with the backend about the available jobs and updating their status when the runner is simulating a job. 
This manager can be injected into the `OpenDCRunner` class and allows users to provide different sources for the jobs, not only the current REST API. --- .../src/cli/kotlin/org/opendc/web/runner/Main.kt | 3 +- .../kotlin/org/opendc/web/runner/JobManager.kt | 68 ++++++++++++++++ .../kotlin/org/opendc/web/runner/OpenDCRunner.kt | 35 ++++++--- .../org/opendc/web/runner/internal/JobManager.kt | 90 ---------------------- .../opendc/web/runner/internal/JobManagerImpl.kt | 58 ++++++++++++++ 5 files changed, 152 insertions(+), 102 deletions(-) create mode 100644 opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/JobManager.kt delete mode 100644 opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManager.kt create mode 100644 opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManagerImpl.kt (limited to 'opendc-web/opendc-web-runner/src') diff --git a/opendc-web/opendc-web-runner/src/cli/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/cli/kotlin/org/opendc/web/runner/Main.kt index 348a838c..4cfbdd7c 100644 --- a/opendc-web/opendc-web-runner/src/cli/kotlin/org/opendc/web/runner/Main.kt +++ b/opendc-web/opendc-web-runner/src/cli/kotlin/org/opendc/web/runner/Main.kt @@ -114,7 +114,8 @@ class RunnerCli : CliktCommand(name = "opendc-runner") { logger.info { "Starting OpenDC web runner" } val client = OpenDCRunnerClient(baseUrl = apiUrl, OpenIdAuthController(authDomain, authClientId, authClientSecret, authAudience)) - val runner = OpenDCRunner(client, tracePath, parallelism = parallelism) + val manager = JobManager(client) + val runner = OpenDCRunner(manager, tracePath, parallelism = parallelism) logger.info { "Watching for queued scenarios" } runner.run() diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/JobManager.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/JobManager.kt new file mode 100644 index 00000000..50aa03d8 --- 
/dev/null +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/JobManager.kt @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.web.runner + +import org.opendc.web.client.runner.OpenDCRunnerClient +import org.opendc.web.proto.runner.Job +import org.opendc.web.runner.internal.JobManagerImpl + +/** + * Interface used by the [OpenDCRunner] to manage the available jobs to be processed. + */ +public interface JobManager { + /** + * Find the next job that the simulator needs to process. + */ + public fun findNext(): Job? + + /** + * Claim the simulation job with the specified id. + */ + public fun claim(id: Long): Boolean + + /** + * Update the heartbeat of the specified job. + */ + public fun heartbeat(id: Long) + + /** + * Mark the job as failed. 
+ */ + public fun fail(id: Long) + + /** + * Persist the specified results for the specified job. + */ + public fun finish(id: Long, results: Map) + + public companion object { + /** + * Create a [JobManager] given the specified [client]. + */ + @JvmStatic + @JvmName("create") + public operator fun invoke(client: OpenDCRunnerClient): JobManager { + return JobManagerImpl(client) + } + } +} diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt index 7e0133d0..c958bdb2 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt @@ -36,10 +36,8 @@ import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.LinearPowerModel import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.web.client.runner.OpenDCRunnerClient import org.opendc.web.proto.runner.Job import org.opendc.web.proto.runner.Scenario -import org.opendc.web.runner.internal.JobManager import org.opendc.web.runner.internal.WebComputeMonitor import java.io.File import java.time.Duration @@ -50,14 +48,14 @@ import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory /** * Class to execute the pending jobs via the OpenDC web API. * - * @param client The [OpenDCRunnerClient] to connect to the OpenDC web API. + * @param manager The underlying [JobManager] to manage the available jobs. * @param tracePath The directory where the traces are located. * @param jobTimeout The maximum duration of a simulation job. * @param pollInterval The interval to poll the API with. * @param heartbeatInterval The interval to send a heartbeat to the API server. 
*/ public class OpenDCRunner( - client: OpenDCRunnerClient, + private val manager: JobManager, private val tracePath: File, parallelism: Int = Runtime.getRuntime().availableProcessors(), private val jobTimeout: Duration = Duration.ofMillis(10), @@ -69,11 +67,6 @@ public class OpenDCRunner( */ private val logger = KotlinLogging.logger {} - /** - * Helper class to manage the available jobs. - */ - private val manager = JobManager(client) - /** * Helper class to load the workloads. */ @@ -142,12 +135,32 @@ public class OpenDCRunner( try { val topology = convertTopology(scenario.topology) val jobs = (0 until scenario.portfolio.targets.repeats).map { repeat -> SimulationTask(scenario, repeat, topology) } - val results = invokeAll(jobs) + val results = invokeAll(jobs).map { it.rawResult } logger.info { "Finished simulation for job $id" } heartbeat.cancel(true) - manager.finish(id, results.map { it.rawResult }) + + manager.finish( + id, + mapOf( + "total_requested_burst" to results.map { it.totalActiveTime + it.totalIdleTime }, + "total_granted_burst" to results.map { it.totalActiveTime }, + "total_overcommitted_burst" to results.map { it.totalStealTime }, + "total_interfered_burst" to results.map { it.totalLostTime }, + "mean_cpu_usage" to results.map { it.meanCpuUsage }, + "mean_cpu_demand" to results.map { it.meanCpuDemand }, + "mean_num_deployed_images" to results.map { it.meanNumDeployedImages }, + "max_num_deployed_images" to results.map { it.maxNumDeployedImages }, + "total_power_draw" to results.map { it.totalPowerDraw }, + "total_failure_slices" to results.map { it.totalFailureSlices }, + "total_failure_vm_slices" to results.map { it.totalFailureVmSlices }, + "total_vms_submitted" to results.map { it.totalVmsSubmitted }, + "total_vms_queued" to results.map { it.totalVmsQueued }, + "total_vms_finished" to results.map { it.totalVmsFinished }, + "total_vms_failed" to results.map { it.totalVmsFailed } + ) + ) } catch (e: Exception) { // Check whether the job failed 
due to exceeding its time budget if (Thread.interrupted()) { diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManager.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManager.kt deleted file mode 100644 index 99b8aaf1..00000000 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManager.kt +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.web.runner.internal - -import org.opendc.web.client.runner.OpenDCRunnerClient -import org.opendc.web.proto.JobState -import org.opendc.web.proto.runner.Job - -/** - * Helper class to manage the queue of jobs that need to be simulated. - */ -internal class JobManager(private val client: OpenDCRunnerClient) { - /** - * Find the next job that the simulator needs to process. - */ - fun findNext(): Job? 
{ - return client.jobs.queryPending().firstOrNull() - } - - /** - * Claim the simulation job with the specified id. - */ - fun claim(id: Long): Boolean { - client.jobs.update(id, Job.Update(JobState.CLAIMED)) // TODO Handle conflict - return true - } - - /** - * Update the heartbeat of the specified scenario. - */ - fun heartbeat(id: Long) { - client.jobs.update(id, Job.Update(JobState.RUNNING)) - } - - /** - * Mark the scenario as failed. - */ - fun fail(id: Long) { - client.jobs.update(id, Job.Update(JobState.FAILED)) - } - - /** - * Persist the specified results. - */ - fun finish(id: Long, results: List) { - client.jobs.update( - id, - Job.Update( - JobState.FINISHED, - mapOf( - "total_requested_burst" to results.map { it.totalActiveTime + it.totalIdleTime }, - "total_granted_burst" to results.map { it.totalActiveTime }, - "total_overcommitted_burst" to results.map { it.totalStealTime }, - "total_interfered_burst" to results.map { it.totalLostTime }, - "mean_cpu_usage" to results.map { it.meanCpuUsage }, - "mean_cpu_demand" to results.map { it.meanCpuDemand }, - "mean_num_deployed_images" to results.map { it.meanNumDeployedImages }, - "max_num_deployed_images" to results.map { it.maxNumDeployedImages }, - "total_power_draw" to results.map { it.totalPowerDraw }, - "total_failure_slices" to results.map { it.totalFailureSlices }, - "total_failure_vm_slices" to results.map { it.totalFailureVmSlices }, - "total_vms_submitted" to results.map { it.totalVmsSubmitted }, - "total_vms_queued" to results.map { it.totalVmsQueued }, - "total_vms_finished" to results.map { it.totalVmsFinished }, - "total_vms_failed" to results.map { it.totalVmsFailed } - ) - ) - ) - } -} diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManagerImpl.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManagerImpl.kt new file mode 100644 index 00000000..39a6851c --- /dev/null +++ 
b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManagerImpl.kt @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.web.runner.internal + +import org.opendc.web.client.runner.OpenDCRunnerClient +import org.opendc.web.proto.JobState +import org.opendc.web.proto.runner.Job +import org.opendc.web.runner.JobManager + +/** + * Default implementation of [JobManager] that uses the OpenDC client to receive jobs. + */ +internal class JobManagerImpl(private val client: OpenDCRunnerClient) : JobManager { + override fun findNext(): Job? 
{ + return client.jobs.queryPending().firstOrNull() + } + + override fun claim(id: Long): Boolean { + return try { + client.jobs.update(id, Job.Update(JobState.CLAIMED)) + true + } catch (e: IllegalStateException) { + false + } + } + + override fun heartbeat(id: Long) { + client.jobs.update(id, Job.Update(JobState.RUNNING)) + } + + override fun fail(id: Long) { + client.jobs.update(id, Job.Update(JobState.FAILED)) + } + + override fun finish(id: Long, results: Map) { + client.jobs.update(id, Job.Update(JobState.FINISHED, results)) + } +} -- cgit v1.2.3