diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-08-03 14:42:49 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2022-08-03 14:42:49 +0200 |
| commit | 2692a032b8ecbe2ddccfb88628cf37af56c3ea36 (patch) | |
| tree | 5dde4331c0a10dd8e843c75cf68d2c7d3cd45906 /opendc-web/opendc-web-runner/src | |
| parent | b023b085425c0d0f7952c2c331576b8d0fc8c857 (diff) | |
| parent | 87df18bb691260ee69d2e48cf1598e6a4acc329b (diff) | |
merge: Simplify OpenDC deployment process
This pull request attempts to simplify the deployment process necessary for
deploying OpenDC locally or using Docker. There was currently not a clear
and simple way to deploy OpenDC locally, yet the Docker-based deployment
was also out-of-sync.
## Implementation Notes :hammer_and_pick:
* Address several bugs in the OpenDC web runner
* Fix dependency related issues
* Rename `opendc-web-api` to `opendc-web-server`
* Create a local distribution of `opendc-web-server`.
* Fix Docker deployment
* Update deployment guide
## External Dependencies :four_leaf_clover:
* Quarkus 2.11.1
* `jandex-gradle-plugin` 0.13.2
## Breaking API Changes :warning:
* `TraceFormat.installedProviders` has been changed to `TraceFormat.getInstalledProviders`.
The list of installed providers is now not cached at first access, but queried every time the
method is invoked and its results depend on the caller context (e.g., context class loader).
* `OpenDCRunner` now requires a `JobManager` as constructor argument, which can be
constructed as follows: `JobManager.create(client)`
* `opendc-web-api` is now renamed to `opendc-web-server`.
Diffstat (limited to 'opendc-web/opendc-web-runner/src')
6 files changed, 170 insertions, 111 deletions
diff --git a/opendc-web/opendc-web-runner/src/cli/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/cli/kotlin/org/opendc/web/runner/Main.kt index 348a838c..4cfbdd7c 100644 --- a/opendc-web/opendc-web-runner/src/cli/kotlin/org/opendc/web/runner/Main.kt +++ b/opendc-web/opendc-web-runner/src/cli/kotlin/org/opendc/web/runner/Main.kt @@ -114,7 +114,8 @@ class RunnerCli : CliktCommand(name = "opendc-runner") { logger.info { "Starting OpenDC web runner" } val client = OpenDCRunnerClient(baseUrl = apiUrl, OpenIdAuthController(authDomain, authClientId, authClientSecret, authAudience)) - val runner = OpenDCRunner(client, tracePath, parallelism = parallelism) + val manager = JobManager(client) + val runner = OpenDCRunner(manager, tracePath, parallelism = parallelism) logger.info { "Watching for queued scenarios" } runner.run() diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/JobManager.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/JobManager.kt new file mode 100644 index 00000000..50aa03d8 --- /dev/null +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/JobManager.kt @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.web.runner + +import org.opendc.web.client.runner.OpenDCRunnerClient +import org.opendc.web.proto.runner.Job +import org.opendc.web.runner.internal.JobManagerImpl + +/** + * Interface used by the [OpenDCRunner] to manage the available jobs to be processed. + */ +public interface JobManager { + /** + * Find the next job that the simulator needs to process. + */ + public fun findNext(): Job? + + /** + * Claim the simulation job with the specified id. + */ + public fun claim(id: Long): Boolean + + /** + * Update the heartbeat of the specified job. + */ + public fun heartbeat(id: Long) + + /** + * Mark the job as failed. + */ + public fun fail(id: Long) + + /** + * Persist the specified results for the specified job. + */ + public fun finish(id: Long, results: Map<String, Any>) + + public companion object { + /** + * Create a [JobManager] given the specified [client]. + */ + @JvmStatic + @JvmName("create") + public operator fun invoke(client: OpenDCRunnerClient): JobManager { + return JobManagerImpl(client) + } + } +} diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt index 9c9a866d..c958bdb2 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt @@ -36,27 +36,26 @@ import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.LinearPowerModel import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.web.client.runner.OpenDCRunnerClient import org.opendc.web.proto.runner.Job import org.opendc.web.proto.runner.Scenario -import org.opendc.web.runner.internal.JobManager import org.opendc.web.runner.internal.WebComputeMonitor import java.io.File import java.time.Duration import java.util.* import java.util.concurrent.* +import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory /** * Class to execute the pending jobs via the OpenDC web API. * - * @param client The [OpenDCRunnerClient] to connect to the OpenDC web API. + * @param manager The underlying [JobManager] to manage the available jobs. * @param tracePath The directory where the traces are located. * @param jobTimeout The maximum duration of a simulation job. * @param pollInterval The interval to poll the API with. * @param heartbeatInterval The interval to send a heartbeat to the API server. */ public class OpenDCRunner( - client: OpenDCRunnerClient, + private val manager: JobManager, private val tracePath: File, parallelism: Int = Runtime.getRuntime().availableProcessors(), private val jobTimeout: Duration = Duration.ofMillis(10), @@ -69,11 +68,6 @@ public class OpenDCRunner( private val logger = KotlinLogging.logger {} /** - * Helper class to manage the available jobs. - */ - private val manager = JobManager(client) - - /** * Helper class to load the workloads. */ private val workloadLoader = ComputeWorkloadLoader(tracePath) @@ -81,7 +75,7 @@ public class OpenDCRunner( /** * The [ForkJoinPool] that is used to execute the simulation jobs. */ - private val pool = ForkJoinPool(parallelism) + private val pool = ForkJoinPool(parallelism, RunnerThreadFactory(Thread.currentThread().contextClassLoader), null, false) /** * A [ScheduledExecutorService] to manage the heartbeat of simulation jobs as well as tracking the deadline of @@ -97,11 +91,6 @@ public class OpenDCRunner( override fun run() { try { while (true) { - // Check if anyone has interrupted the thread - if (Thread.interrupted()) { - throw InterruptedException() - } - val job = manager.findNext() if (job == null) { Thread.sleep(pollInterval.toMillis()) @@ -119,6 +108,8 @@ public class OpenDCRunner( pool.submit(JobAction(job)) } + } catch (_: InterruptedException) { + // Gracefully exit when the thread is interrupted } finally { workloadLoader.reset() @@ -144,12 +135,32 @@ public class OpenDCRunner( try { val topology = convertTopology(scenario.topology) val jobs = (0 until scenario.portfolio.targets.repeats).map { repeat -> SimulationTask(scenario, repeat, topology) } - val results = invokeAll(jobs) + val results = invokeAll(jobs).map { it.rawResult } logger.info { "Finished simulation for job $id" } heartbeat.cancel(true) - manager.finish(id, results.map { it.rawResult }) + + manager.finish( + id, + mapOf( + "total_requested_burst" to results.map { it.totalActiveTime + it.totalIdleTime }, + "total_granted_burst" to results.map { it.totalActiveTime }, + "total_overcommitted_burst" to results.map { it.totalStealTime }, + "total_interfered_burst" to results.map { it.totalLostTime }, + "mean_cpu_usage" to results.map { it.meanCpuUsage }, + "mean_cpu_demand" to results.map { it.meanCpuDemand }, + "mean_num_deployed_images" to results.map { it.meanNumDeployedImages }, + "max_num_deployed_images" to results.map { it.maxNumDeployedImages }, + "total_power_draw" to results.map { it.totalPowerDraw }, + "total_failure_slices" to results.map { it.totalFailureSlices }, + "total_failure_vm_slices" to results.map { it.totalFailureVmSlices }, + "total_vms_submitted" to results.map { it.totalVmsSubmitted }, + "total_vms_queued" to results.map { it.totalVmsQueued }, + "total_vms_finished" to results.map { it.totalVmsFinished }, + "total_vms_failed" to results.map { it.totalVmsFailed } + ) + ) } catch (e: Exception) { // Check whether the job failed due to exceeding its time budget if (Thread.interrupted()) { @@ -303,4 +314,15 @@ public class OpenDCRunner( override fun toString(): String = "WebRunnerTopologyFactory" } } + + /** + * A custom [ForkJoinWorkerThreadFactory] that uses the [ClassLoader] of specified by the runner. + */ + private class RunnerThreadFactory(private val classLoader: ClassLoader) : ForkJoinWorkerThreadFactory { + override fun newThread(pool: ForkJoinPool): ForkJoinWorkerThread = object : ForkJoinWorkerThread(pool) { + init { + contextClassLoader = classLoader + } + } + } } diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManager.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManager.kt deleted file mode 100644 index 99b8aaf1..00000000 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManager.kt +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.web.runner.internal - -import org.opendc.web.client.runner.OpenDCRunnerClient -import org.opendc.web.proto.JobState -import org.opendc.web.proto.runner.Job - -/** - * Helper class to manage the queue of jobs that need to be simulated. - */ -internal class JobManager(private val client: OpenDCRunnerClient) { - /** - * Find the next job that the simulator needs to process. - */ - fun findNext(): Job? { - return client.jobs.queryPending().firstOrNull() - } - - /** - * Claim the simulation job with the specified id. - */ - fun claim(id: Long): Boolean { - client.jobs.update(id, Job.Update(JobState.CLAIMED)) // TODO Handle conflict - return true - } - - /** - * Update the heartbeat of the specified scenario. - */ - fun heartbeat(id: Long) { - client.jobs.update(id, Job.Update(JobState.RUNNING)) - } - - /** - * Mark the scenario as failed. - */ - fun fail(id: Long) { - client.jobs.update(id, Job.Update(JobState.FAILED)) - } - - /** - * Persist the specified results. - */ - fun finish(id: Long, results: List<WebComputeMonitor.Results>) { - client.jobs.update( - id, - Job.Update( - JobState.FINISHED, - mapOf( - "total_requested_burst" to results.map { it.totalActiveTime + it.totalIdleTime }, - "total_granted_burst" to results.map { it.totalActiveTime }, - "total_overcommitted_burst" to results.map { it.totalStealTime }, - "total_interfered_burst" to results.map { it.totalLostTime }, - "mean_cpu_usage" to results.map { it.meanCpuUsage }, - "mean_cpu_demand" to results.map { it.meanCpuDemand }, - "mean_num_deployed_images" to results.map { it.meanNumDeployedImages }, - "max_num_deployed_images" to results.map { it.maxNumDeployedImages }, - "total_power_draw" to results.map { it.totalPowerDraw }, - "total_failure_slices" to results.map { it.totalFailureSlices }, - "total_failure_vm_slices" to results.map { it.totalFailureVmSlices }, - "total_vms_submitted" to results.map { it.totalVmsSubmitted }, - "total_vms_queued" to results.map { it.totalVmsQueued }, - "total_vms_finished" to results.map { it.totalVmsFinished }, - "total_vms_failed" to results.map { it.totalVmsFailed } - ) - ) - ) - } -} diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManagerImpl.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManagerImpl.kt new file mode 100644 index 00000000..39a6851c --- /dev/null +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManagerImpl.kt @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.web.runner.internal + +import org.opendc.web.client.runner.OpenDCRunnerClient +import org.opendc.web.proto.JobState +import org.opendc.web.proto.runner.Job +import org.opendc.web.runner.JobManager + +/** + * Default implementation of [JobManager] that uses the OpenDC client to receive jobs. + */ +internal class JobManagerImpl(private val client: OpenDCRunnerClient) : JobManager { + override fun findNext(): Job? { + return client.jobs.queryPending().firstOrNull() + } + + override fun claim(id: Long): Boolean { + return try { + client.jobs.update(id, Job.Update(JobState.CLAIMED)) + true + } catch (e: IllegalStateException) { + false + } + } + + override fun heartbeat(id: Long) { + client.jobs.update(id, Job.Update(JobState.RUNNING)) + } + + override fun fail(id: Long) { + client.jobs.update(id, Job.Update(JobState.FAILED)) + } + + override fun finish(id: Long, results: Map<String, Any>) { + client.jobs.update(id, Job.Update(JobState.FINISHED, results)) + } +} diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt index 01002c70..4c3d1cfa 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt @@ -105,9 +105,9 @@ internal class WebComputeMonitor : ComputeMonitor { hostAggregateMetrics.totalIdleTime, hostAggregateMetrics.totalStealTime, hostAggregateMetrics.totalLostTime, - hostMetrics.map { it.value.cpuUsage / it.value.count }.average(), - hostMetrics.map { it.value.cpuDemand / it.value.count }.average(), - hostMetrics.map { it.value.instanceCount.toDouble() / it.value.count }.average(), + hostMetrics.map { it.value.cpuUsage / it.value.count }.average().let { if (it.isNaN()) 0.0 else it }, + hostMetrics.map { it.value.cpuDemand / it.value.count }.average().let { if (it.isNaN()) 0.0 else it }, + hostMetrics.map { it.value.instanceCount.toDouble() / it.value.count }.average().let { if (it.isNaN()) 0.0 else it }, hostMetrics.map { it.value.instanceCount.toDouble() / it.value.count }.maxOrNull() ?: 0.0, hostAggregateMetrics.totalPowerDraw, hostAggregateMetrics.totalFailureSlices.roundToLong(), |
