summaryrefslogtreecommitdiff
path: root/opendc-web/opendc-web-runner/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-08-03 11:11:58 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2022-08-03 11:33:44 +0200
commita424aa5e81c31f8cc6ba8846f0a6af29623588d4 (patch)
tree18006e7c5b3c9c2ff4faa9a988a29a8a6c0499c3 /opendc-web/opendc-web-runner/src
parentf6932d264db5b2e185ec7ea7aaec84dfb83f8fe9 (diff)
refactor(web/runner): Support pluggable job manager
This change introduces a new interface `JobManager` that is responsible for communicating with the backend about the available jobs and updating their status when the runner is simulating a job. This manager can be injected into the `OpenDCRunner` class and allows users to provide different sources for the jobs, not only the current REST API.
Diffstat (limited to 'opendc-web/opendc-web-runner/src')
-rw-r--r--opendc-web/opendc-web-runner/src/cli/kotlin/org/opendc/web/runner/Main.kt3
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/JobManager.kt68
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt35
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManager.kt90
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManagerImpl.kt58
5 files changed, 152 insertions, 102 deletions
diff --git a/opendc-web/opendc-web-runner/src/cli/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/cli/kotlin/org/opendc/web/runner/Main.kt
index 348a838c..4cfbdd7c 100644
--- a/opendc-web/opendc-web-runner/src/cli/kotlin/org/opendc/web/runner/Main.kt
+++ b/opendc-web/opendc-web-runner/src/cli/kotlin/org/opendc/web/runner/Main.kt
@@ -114,7 +114,8 @@ class RunnerCli : CliktCommand(name = "opendc-runner") {
logger.info { "Starting OpenDC web runner" }
val client = OpenDCRunnerClient(baseUrl = apiUrl, OpenIdAuthController(authDomain, authClientId, authClientSecret, authAudience))
- val runner = OpenDCRunner(client, tracePath, parallelism = parallelism)
+ val manager = JobManager(client)
+ val runner = OpenDCRunner(manager, tracePath, parallelism = parallelism)
logger.info { "Watching for queued scenarios" }
runner.run()
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/JobManager.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/JobManager.kt
new file mode 100644
index 00000000..50aa03d8
--- /dev/null
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/JobManager.kt
@@ -0,0 +1,68 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.web.runner
+
+import org.opendc.web.client.runner.OpenDCRunnerClient
+import org.opendc.web.proto.runner.Job
+import org.opendc.web.runner.internal.JobManagerImpl
+
+/**
+ * Interface used by the [OpenDCRunner] to manage the available jobs to be processed.
+ */
+public interface JobManager {
+ /**
+ * Find the next job that the simulator needs to process.
+ */
+ public fun findNext(): Job?
+
+ /**
+ * Claim the simulation job with the specified id.
+ */
+ public fun claim(id: Long): Boolean
+
+ /**
+ * Update the heartbeat of the specified job.
+ */
+ public fun heartbeat(id: Long)
+
+ /**
+ * Mark the job as failed.
+ */
+ public fun fail(id: Long)
+
+ /**
+ * Persist the specified results for the specified job.
+ */
+ public fun finish(id: Long, results: Map<String, Any>)
+
+ public companion object {
+ /**
+ * Create a [JobManager] given the specified [client].
+ */
+ @JvmStatic
+ @JvmName("create")
+ public operator fun invoke(client: OpenDCRunnerClient): JobManager {
+ return JobManagerImpl(client)
+ }
+ }
+}
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
index 7e0133d0..c958bdb2 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt
@@ -36,10 +36,8 @@ import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.LinearPowerModel
import org.opendc.simulator.compute.power.SimplePowerDriver
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.web.client.runner.OpenDCRunnerClient
import org.opendc.web.proto.runner.Job
import org.opendc.web.proto.runner.Scenario
-import org.opendc.web.runner.internal.JobManager
import org.opendc.web.runner.internal.WebComputeMonitor
import java.io.File
import java.time.Duration
@@ -50,14 +48,14 @@ import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory
/**
* Class to execute the pending jobs via the OpenDC web API.
*
- * @param client The [OpenDCRunnerClient] to connect to the OpenDC web API.
+ * @param manager The underlying [JobManager] to manage the available jobs.
* @param tracePath The directory where the traces are located.
* @param jobTimeout The maximum duration of a simulation job.
* @param pollInterval The interval to poll the API with.
* @param heartbeatInterval The interval to send a heartbeat to the API server.
*/
public class OpenDCRunner(
- client: OpenDCRunnerClient,
+ private val manager: JobManager,
private val tracePath: File,
parallelism: Int = Runtime.getRuntime().availableProcessors(),
private val jobTimeout: Duration = Duration.ofMillis(10),
@@ -70,11 +68,6 @@ public class OpenDCRunner(
private val logger = KotlinLogging.logger {}
/**
- * Helper class to manage the available jobs.
- */
- private val manager = JobManager(client)
-
- /**
* Helper class to load the workloads.
*/
private val workloadLoader = ComputeWorkloadLoader(tracePath)
@@ -142,12 +135,32 @@ public class OpenDCRunner(
try {
val topology = convertTopology(scenario.topology)
val jobs = (0 until scenario.portfolio.targets.repeats).map { repeat -> SimulationTask(scenario, repeat, topology) }
- val results = invokeAll(jobs)
+ val results = invokeAll(jobs).map { it.rawResult }
logger.info { "Finished simulation for job $id" }
heartbeat.cancel(true)
- manager.finish(id, results.map { it.rawResult })
+
+ manager.finish(
+ id,
+ mapOf(
+ "total_requested_burst" to results.map { it.totalActiveTime + it.totalIdleTime },
+ "total_granted_burst" to results.map { it.totalActiveTime },
+ "total_overcommitted_burst" to results.map { it.totalStealTime },
+ "total_interfered_burst" to results.map { it.totalLostTime },
+ "mean_cpu_usage" to results.map { it.meanCpuUsage },
+ "mean_cpu_demand" to results.map { it.meanCpuDemand },
+ "mean_num_deployed_images" to results.map { it.meanNumDeployedImages },
+ "max_num_deployed_images" to results.map { it.maxNumDeployedImages },
+ "total_power_draw" to results.map { it.totalPowerDraw },
+ "total_failure_slices" to results.map { it.totalFailureSlices },
+ "total_failure_vm_slices" to results.map { it.totalFailureVmSlices },
+ "total_vms_submitted" to results.map { it.totalVmsSubmitted },
+ "total_vms_queued" to results.map { it.totalVmsQueued },
+ "total_vms_finished" to results.map { it.totalVmsFinished },
+ "total_vms_failed" to results.map { it.totalVmsFailed }
+ )
+ )
} catch (e: Exception) {
// Check whether the job failed due to exceeding its time budget
if (Thread.interrupted()) {
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManager.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManager.kt
deleted file mode 100644
index 99b8aaf1..00000000
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManager.kt
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.web.runner.internal
-
-import org.opendc.web.client.runner.OpenDCRunnerClient
-import org.opendc.web.proto.JobState
-import org.opendc.web.proto.runner.Job
-
-/**
- * Helper class to manage the queue of jobs that need to be simulated.
- */
-internal class JobManager(private val client: OpenDCRunnerClient) {
- /**
- * Find the next job that the simulator needs to process.
- */
- fun findNext(): Job? {
- return client.jobs.queryPending().firstOrNull()
- }
-
- /**
- * Claim the simulation job with the specified id.
- */
- fun claim(id: Long): Boolean {
- client.jobs.update(id, Job.Update(JobState.CLAIMED)) // TODO Handle conflict
- return true
- }
-
- /**
- * Update the heartbeat of the specified scenario.
- */
- fun heartbeat(id: Long) {
- client.jobs.update(id, Job.Update(JobState.RUNNING))
- }
-
- /**
- * Mark the scenario as failed.
- */
- fun fail(id: Long) {
- client.jobs.update(id, Job.Update(JobState.FAILED))
- }
-
- /**
- * Persist the specified results.
- */
- fun finish(id: Long, results: List<WebComputeMonitor.Results>) {
- client.jobs.update(
- id,
- Job.Update(
- JobState.FINISHED,
- mapOf(
- "total_requested_burst" to results.map { it.totalActiveTime + it.totalIdleTime },
- "total_granted_burst" to results.map { it.totalActiveTime },
- "total_overcommitted_burst" to results.map { it.totalStealTime },
- "total_interfered_burst" to results.map { it.totalLostTime },
- "mean_cpu_usage" to results.map { it.meanCpuUsage },
- "mean_cpu_demand" to results.map { it.meanCpuDemand },
- "mean_num_deployed_images" to results.map { it.meanNumDeployedImages },
- "max_num_deployed_images" to results.map { it.maxNumDeployedImages },
- "total_power_draw" to results.map { it.totalPowerDraw },
- "total_failure_slices" to results.map { it.totalFailureSlices },
- "total_failure_vm_slices" to results.map { it.totalFailureVmSlices },
- "total_vms_submitted" to results.map { it.totalVmsSubmitted },
- "total_vms_queued" to results.map { it.totalVmsQueued },
- "total_vms_finished" to results.map { it.totalVmsFinished },
- "total_vms_failed" to results.map { it.totalVmsFailed }
- )
- )
- )
- }
-}
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManagerImpl.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManagerImpl.kt
new file mode 100644
index 00000000..39a6851c
--- /dev/null
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManagerImpl.kt
@@ -0,0 +1,58 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.web.runner.internal
+
+import org.opendc.web.client.runner.OpenDCRunnerClient
+import org.opendc.web.proto.JobState
+import org.opendc.web.proto.runner.Job
+import org.opendc.web.runner.JobManager
+
+/**
+ * Default implementation of [JobManager] that uses the OpenDC client to receive jobs.
+ */
+internal class JobManagerImpl(private val client: OpenDCRunnerClient) : JobManager {
+ override fun findNext(): Job? {
+ return client.jobs.queryPending().firstOrNull()
+ }
+
+ override fun claim(id: Long): Boolean {
+ return try {
+ client.jobs.update(id, Job.Update(JobState.CLAIMED))
+ true
+ } catch (e: IllegalStateException) {
+ false
+ }
+ }
+
+ override fun heartbeat(id: Long) {
+ client.jobs.update(id, Job.Update(JobState.RUNNING))
+ }
+
+ override fun fail(id: Long) {
+ client.jobs.update(id, Job.Update(JobState.FAILED))
+ }
+
+ override fun finish(id: Long, results: Map<String, Any>) {
+ client.jobs.update(id, Job.Update(JobState.FINISHED, results))
+ }
+}