diff options
Diffstat (limited to 'opendc-web/opendc-web-runner/src')
5 files changed, 152 insertions, 102 deletions
diff --git a/opendc-web/opendc-web-runner/src/cli/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/cli/kotlin/org/opendc/web/runner/Main.kt index 348a838c..4cfbdd7c 100644 --- a/opendc-web/opendc-web-runner/src/cli/kotlin/org/opendc/web/runner/Main.kt +++ b/opendc-web/opendc-web-runner/src/cli/kotlin/org/opendc/web/runner/Main.kt @@ -114,7 +114,8 @@ class RunnerCli : CliktCommand(name = "opendc-runner") { logger.info { "Starting OpenDC web runner" } val client = OpenDCRunnerClient(baseUrl = apiUrl, OpenIdAuthController(authDomain, authClientId, authClientSecret, authAudience)) - val runner = OpenDCRunner(client, tracePath, parallelism = parallelism) + val manager = JobManager(client) + val runner = OpenDCRunner(manager, tracePath, parallelism = parallelism) logger.info { "Watching for queued scenarios" } runner.run() diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/JobManager.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/JobManager.kt new file mode 100644 index 00000000..50aa03d8 --- /dev/null +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/JobManager.kt @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.web.runner + +import org.opendc.web.client.runner.OpenDCRunnerClient +import org.opendc.web.proto.runner.Job +import org.opendc.web.runner.internal.JobManagerImpl + +/** + * Interface used by the [OpenDCRunner] to manage the available jobs to be processed. + */ +public interface JobManager { + /** + * Find the next job that the simulator needs to process. + */ + public fun findNext(): Job? + + /** + * Claim the simulation job with the specified id. + */ + public fun claim(id: Long): Boolean + + /** + * Update the heartbeat of the specified job. + */ + public fun heartbeat(id: Long) + + /** + * Mark the job as failed. + */ + public fun fail(id: Long) + + /** + * Persist the specified results for the specified job. + */ + public fun finish(id: Long, results: Map<String, Any>) + + public companion object { + /** + * Create a [JobManager] given the specified [client]. + */ + @JvmStatic + @JvmName("create") + public operator fun invoke(client: OpenDCRunnerClient): JobManager { + return JobManagerImpl(client) + } + } +} diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt index 7e0133d0..c958bdb2 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt @@ -36,10 +36,8 @@ import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.LinearPowerModel import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.web.client.runner.OpenDCRunnerClient import org.opendc.web.proto.runner.Job import org.opendc.web.proto.runner.Scenario -import org.opendc.web.runner.internal.JobManager import org.opendc.web.runner.internal.WebComputeMonitor import java.io.File import java.time.Duration @@ -50,14 +48,14 @@ import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory /** * Class to execute the pending jobs via the OpenDC web API. * - * @param client The [OpenDCRunnerClient] to connect to the OpenDC web API. + * @param manager The underlying [JobManager] to manage the available jobs. * @param tracePath The directory where the traces are located. * @param jobTimeout The maximum duration of a simulation job. * @param pollInterval The interval to poll the API with. * @param heartbeatInterval The interval to send a heartbeat to the API server. */ public class OpenDCRunner( - client: OpenDCRunnerClient, + private val manager: JobManager, private val tracePath: File, parallelism: Int = Runtime.getRuntime().availableProcessors(), private val jobTimeout: Duration = Duration.ofMillis(10), @@ -70,11 +68,6 @@ public class OpenDCRunner( private val logger = KotlinLogging.logger {} /** - * Helper class to manage the available jobs. - */ - private val manager = JobManager(client) - - /** * Helper class to load the workloads. */ private val workloadLoader = ComputeWorkloadLoader(tracePath) @@ -142,12 +135,32 @@ public class OpenDCRunner( try { val topology = convertTopology(scenario.topology) val jobs = (0 until scenario.portfolio.targets.repeats).map { repeat -> SimulationTask(scenario, repeat, topology) } - val results = invokeAll(jobs) + val results = invokeAll(jobs).map { it.rawResult } logger.info { "Finished simulation for job $id" } heartbeat.cancel(true) - manager.finish(id, results.map { it.rawResult }) + + manager.finish( + id, + mapOf( + "total_requested_burst" to results.map { it.totalActiveTime + it.totalIdleTime }, + "total_granted_burst" to results.map { it.totalActiveTime }, + "total_overcommitted_burst" to results.map { it.totalStealTime }, + "total_interfered_burst" to results.map { it.totalLostTime }, + "mean_cpu_usage" to results.map { it.meanCpuUsage }, + "mean_cpu_demand" to results.map { it.meanCpuDemand }, + "mean_num_deployed_images" to results.map { it.meanNumDeployedImages }, + "max_num_deployed_images" to results.map { it.maxNumDeployedImages }, + "total_power_draw" to results.map { it.totalPowerDraw }, + "total_failure_slices" to results.map { it.totalFailureSlices }, + "total_failure_vm_slices" to results.map { it.totalFailureVmSlices }, + "total_vms_submitted" to results.map { it.totalVmsSubmitted }, + "total_vms_queued" to results.map { it.totalVmsQueued }, + "total_vms_finished" to results.map { it.totalVmsFinished }, + "total_vms_failed" to results.map { it.totalVmsFailed } + ) + ) } catch (e: Exception) { // Check whether the job failed due to exceeding its time budget if (Thread.interrupted()) { diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManager.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManager.kt deleted file mode 100644 index 99b8aaf1..00000000 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManager.kt +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.web.runner.internal - -import org.opendc.web.client.runner.OpenDCRunnerClient -import org.opendc.web.proto.JobState -import org.opendc.web.proto.runner.Job - -/** - * Helper class to manage the queue of jobs that need to be simulated. - */ -internal class JobManager(private val client: OpenDCRunnerClient) { - /** - * Find the next job that the simulator needs to process. - */ - fun findNext(): Job? { - return client.jobs.queryPending().firstOrNull() - } - - /** - * Claim the simulation job with the specified id. - */ - fun claim(id: Long): Boolean { - client.jobs.update(id, Job.Update(JobState.CLAIMED)) // TODO Handle conflict - return true - } - - /** - * Update the heartbeat of the specified scenario. - */ - fun heartbeat(id: Long) { - client.jobs.update(id, Job.Update(JobState.RUNNING)) - } - - /** - * Mark the scenario as failed. - */ - fun fail(id: Long) { - client.jobs.update(id, Job.Update(JobState.FAILED)) - } - - /** - * Persist the specified results. - */ - fun finish(id: Long, results: List<WebComputeMonitor.Results>) { - client.jobs.update( - id, - Job.Update( - JobState.FINISHED, - mapOf( - "total_requested_burst" to results.map { it.totalActiveTime + it.totalIdleTime }, - "total_granted_burst" to results.map { it.totalActiveTime }, - "total_overcommitted_burst" to results.map { it.totalStealTime }, - "total_interfered_burst" to results.map { it.totalLostTime }, - "mean_cpu_usage" to results.map { it.meanCpuUsage }, - "mean_cpu_demand" to results.map { it.meanCpuDemand }, - "mean_num_deployed_images" to results.map { it.meanNumDeployedImages }, - "max_num_deployed_images" to results.map { it.maxNumDeployedImages }, - "total_power_draw" to results.map { it.totalPowerDraw }, - "total_failure_slices" to results.map { it.totalFailureSlices }, - "total_failure_vm_slices" to results.map { it.totalFailureVmSlices }, - "total_vms_submitted" to results.map { it.totalVmsSubmitted }, - "total_vms_queued" to results.map { it.totalVmsQueued }, - "total_vms_finished" to results.map { it.totalVmsFinished }, - "total_vms_failed" to results.map { it.totalVmsFailed } - ) - ) - ) - } -} diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManagerImpl.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManagerImpl.kt new file mode 100644 index 00000000..39a6851c --- /dev/null +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManagerImpl.kt @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.web.runner.internal + +import org.opendc.web.client.runner.OpenDCRunnerClient +import org.opendc.web.proto.JobState +import org.opendc.web.proto.runner.Job +import org.opendc.web.runner.JobManager + +/** + * Default implementation of [JobManager] that uses the OpenDC client to receive jobs. + */ +internal class JobManagerImpl(private val client: OpenDCRunnerClient) : JobManager { + override fun findNext(): Job? { + return client.jobs.queryPending().firstOrNull() + } + + override fun claim(id: Long): Boolean { + return try { + client.jobs.update(id, Job.Update(JobState.CLAIMED)) + true + } catch (e: IllegalStateException) { + false + } + } + + override fun heartbeat(id: Long) { + client.jobs.update(id, Job.Update(JobState.RUNNING)) + } + + override fun fail(id: Long) { + client.jobs.update(id, Job.Update(JobState.FAILED)) + } + + override fun finish(id: Long, results: Map<String, Any>) { + client.jobs.update(id, Job.Update(JobState.FINISHED, results)) + } +} |
