| | | |
|---|---|---|
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-04-04 17:00:31 +0200 |
| committer | GitHub <noreply@github.com> | 2022-04-04 17:00:31 +0200 |
| commit | 38769373c7e89783d33849283586bfa0b62e8251 (patch) | |
| tree | 4fda128ee6b30018c1aa14c584cc53ade80e67f7 /opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner | |
| parent | 6021aa4278bebb34bf5603ead4b5daeabcdc4c19 (diff) | |
| parent | 527ae2230f5c2dd22f496f45d5d8e3bd4acdb854 (diff) | |
merge: Migrate to Quarkus-based web API
This pull request replaces the web API with a Quarkus-based implementation. The OpenDC web API is currently written in Python (using Flask). Although Python is a capable language for building web services, maintaining a third language next to Kotlin/Java and JavaScript introduces several challenges.
For instance, the web API and UI are not integrated with our Gradle-based build pipeline and require additional setup steps before a developer can start working on them. Furthermore, deploying OpenDC requires a Python installation in addition to the JVM.
Converting the web API into a Quarkus application integrates it with our Gradle-based build pipeline and simplifies both development and deployment: working with OpenDC then requires only the JVM and Node.js.
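For illustration, an HTTP endpoint in a Quarkus application is just an annotated class that is picked up at build time. The sketch below shows the general shape in Kotlin; the `ScenarioResource` name and route are made up for this example and are not taken from the OpenDC code base.

```kotlin
import javax.ws.rs.GET
import javax.ws.rs.Path
import javax.ws.rs.Produces
import javax.ws.rs.core.MediaType

// Minimal JAX-RS resource served by Quarkus; no separate web server
// or Python runtime is involved.
@Path("/scenarios")
class ScenarioResource {
    @GET
    @Produces(MediaType.APPLICATION_JSON)
    fun list(): List<String> = listOf("scenario-1", "scenario-2")
}
```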
## Implementation Notes :hammer_and_pick:
* Move build dependencies into version catalog
* Design unified communication protocol (see the sketch after this list)
* Add Quarkus API implementation
* Add new web client implementation
* Update runner to use new web client
* Fix compatibility with React.js UI
* Remove Python build steps from CI pipeline
* Update Docker deployment for new web API
* Remove obsolete database configuration
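As a rough illustration of the unified protocol, the runner-facing messages can be modeled as plain Kotlin data classes along the following lines. The field names are inferred from the runner changes in the diff below and may not match the exact definitions in `org.opendc.web.proto`.

```kotlin
// Sketch only: shapes inferred from the runner diff, not the
// authoritative definitions in org.opendc.web.proto.
enum class JobState { PENDING, CLAIMED, RUNNING, FINISHED, FAILED }

data class Job(val id: Long, val scenario: Scenario) {
    /** Partial update sent by the runner to report progress or results. */
    data class Update(val state: JobState, val results: Map<String, Any>? = null)
}

data class Scenario(
    val id: Long,
    val schedulerName: String,
    val workload: Workload,
    val phenomena: Phenomena
    // portfolio, topology, and other fields omitted in this sketch
)

data class Workload(val trace: Trace, val samplingFraction: Double)
data class Trace(val id: String)
data class Phenomena(val failures: Boolean, val interference: Boolean)
```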
## External Dependencies :four_leaf_clover:
* Quarkus
## Breaking API Changes :warning:
* The new web API supports only SQL databases for storing user data, whereas the current API uses MongoDB. We intend to use H2 for development and Postgres for production.
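For context, the database selection in Quarkus is configuration-driven. A sketch along these lines would use H2 in the dev profile and Postgres in prod; the JDBC URL and environment variable names are illustrative, not the shipped OpenDC configuration.

```properties
# Dev profile: in-memory H2, no external database required.
%dev.quarkus.datasource.db-kind=h2
%dev.quarkus.datasource.jdbc.url=jdbc:h2:mem:opendc

# Prod profile: PostgreSQL, credentials supplied by the environment.
%prod.quarkus.datasource.db-kind=postgresql
%prod.quarkus.datasource.jdbc.url=${OPENDC_DB_URL}
%prod.quarkus.datasource.username=${OPENDC_DB_USER}
%prod.quarkus.datasource.password=${OPENDC_DB_PASSWORD}
```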
Diffstat (limited to 'opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner')
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt | 57 |
| -rw-r--r-- | opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt | 66 |
2 files changed, 62 insertions, 61 deletions
```diff
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
index 94ef8f8e..561dcd59 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
@@ -44,15 +44,15 @@ import org.opendc.simulator.compute.power.SimplePowerDriver
 import org.opendc.simulator.core.runBlockingSimulation
 import org.opendc.telemetry.compute.collectServiceMetrics
 import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
-import org.opendc.web.client.ApiClient
-import org.opendc.web.client.AuthConfiguration
-import org.opendc.web.client.model.Scenario
+import org.opendc.web.client.auth.OpenIdAuthController
+import org.opendc.web.client.runner.OpenDCRunnerClient
+import org.opendc.web.proto.runner.Job
+import org.opendc.web.proto.runner.Scenario
 import java.io.File
 import java.net.URI
 import java.time.Duration
 import java.util.*
-import org.opendc.web.client.model.Portfolio as ClientPortfolio
-import org.opendc.web.client.model.Topology as ClientTopology
+import org.opendc.web.proto.runner.Topology as ClientTopology
 
 private val logger = KotlinLogging.logger {}
 
@@ -134,18 +134,18 @@ class RunnerCli : CliktCommand(name = "runner") {
         .default(60L * 3) // Experiment may run for a maximum of three minutes
 
     /**
-     * Converge a single scenario.
+     * Run a simulation job.
      */
-    private suspend fun runScenario(portfolio: ClientPortfolio, scenario: Scenario, topology: Topology): List<WebComputeMetricExporter.Result> {
-        val id = scenario.id
+    private suspend fun runJob(job: Job, topology: Topology): List<WebComputeMetricExporter.Result> {
+        val id = job.id
+        val scenario = job.scenario
 
         logger.info { "Constructing performance interference model" }
         val workloadLoader = ComputeWorkloadLoader(tracePath)
         val interferenceModel = let {
-            val path = tracePath.resolve(scenario.trace.traceId).resolve("performance-interference-model.json")
-            val operational = scenario.operationalPhenomena
-            val enabled = operational.performanceInterferenceEnabled
+            val path = tracePath.resolve(scenario.workload.trace.id).resolve("performance-interference-model.json")
+            val enabled = scenario.phenomena.interference
 
             if (!enabled || !path.exists()) {
                 return@let null
@@ -154,8 +154,7 @@ class RunnerCli : CliktCommand(name = "runner") {
             VmInterferenceModelReader().read(path.inputStream())
         }
 
-        val targets = portfolio.targets
-        val results = (0 until targets.repeatsPerScenario).map { repeat ->
+        val results = (0 until scenario.portfolio.targets.repeats).map { repeat ->
             logger.info { "Starting repeat $repeat" }
             withTimeout(runTimeout * 1000) {
                 runRepeat(scenario, repeat, topology, workloadLoader, interferenceModel?.withSeed(repeat.toLong()))
@@ -168,7 +167,7 @@ class RunnerCli : CliktCommand(name = "runner") {
     }
 
     /**
-     * Converge a single repeat.
+     * Run a single repeat.
      */
     private suspend fun runRepeat(
         scenario: Scenario,
@@ -181,17 +180,17 @@ class RunnerCli : CliktCommand(name = "runner") {
         try {
             runBlockingSimulation {
-                val workloadName = scenario.trace.traceId
-                val workloadFraction = scenario.trace.loadSamplingFraction
+                val workloadName = scenario.workload.trace.id
+                val workloadFraction = scenario.workload.samplingFraction
                 val seeder = Random(repeat.toLong())
 
-                val operational = scenario.operationalPhenomena
-                val computeScheduler = createComputeScheduler(operational.schedulerName, seeder)
+                val phenomena = scenario.phenomena
+                val computeScheduler = createComputeScheduler(scenario.schedulerName, seeder)
                 val workload = trace(workloadName).sampleByLoad(workloadFraction)
                 val failureModel =
-                    if (operational.failuresEnabled)
+                    if (phenomena.failures)
                         grid5000(Duration.ofDays(7))
                     else
                         null
@@ -203,7 +202,7 @@ class RunnerCli : CliktCommand(name = "runner") {
                     telemetry,
                     computeScheduler,
                     failureModel,
-                    interferenceModel.takeIf { operational.performanceInterferenceEnabled }
+                    interferenceModel
                 )
 
                 telemetry.registerMetricReader(CoroutineMetricReader(this, exporter, exportInterval = Duration.ofHours(1)))
@@ -241,20 +240,19 @@ class RunnerCli : CliktCommand(name = "runner") {
     override fun run(): Unit = runBlocking(Dispatchers.Default) {
         logger.info { "Starting OpenDC web runner" }
 
-        val client = ApiClient(baseUrl = apiUrl, AuthConfiguration(authDomain, authClientId, authClientSecret), authAudience)
+        val client = OpenDCRunnerClient(baseUrl = apiUrl, OpenIdAuthController(authDomain, authClientId, authClientSecret, authAudience))
         val manager = ScenarioManager(client)
 
         logger.info { "Watching for queued scenarios" }
 
         while (true) {
-            val scenario = manager.findNext()
-
-            if (scenario == null) {
+            val job = manager.findNext()
+            if (job == null) {
                 delay(POLL_INTERVAL)
                 continue
             }
 
-            val id = scenario.id
+            val id = job.id
 
             logger.info { "Found queued scenario $id: attempting to claim" }
@@ -273,10 +271,8 @@
             }
 
             try {
-                val scenarioModel = client.getScenario(id)!!
-                val portfolio = client.getPortfolio(scenarioModel.portfolioId)!!
-                val environment = convert(client.getTopology(scenarioModel.topology.topologyId)!!)
-                val results = runScenario(portfolio, scenarioModel, environment)
+                val environment = convert(job.scenario.topology)
+                val results = runJob(job, environment)
 
                 logger.info { "Writing results to database" }
@@ -306,7 +302,8 @@
         val machines = topology.rooms.asSequence()
             .flatMap { room ->
                 room.tiles.flatMap { tile ->
-                    tile.rack?.machines?.map { machine -> tile.rack to machine } ?: emptyList()
+                    val rack = tile.rack
+                    rack?.machines?.map { machine -> rack to machine } ?: emptyList()
                 }
             }
 
         for ((rack, machine) in machines) {
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt
index 1ee835a6..7374f0c9 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt
@@ -22,64 +22,68 @@ package org.opendc.web.runner
 
-import org.opendc.web.client.ApiClient
-import org.opendc.web.client.model.Job
-import org.opendc.web.client.model.SimulationState
+import org.opendc.web.client.runner.OpenDCRunnerClient
+import org.opendc.web.proto.JobState
+import org.opendc.web.proto.runner.Job
 
 /**
  * Manages the queue of scenarios that need to be processed.
  */
-public class ScenarioManager(private val client: ApiClient) {
+class ScenarioManager(private val client: OpenDCRunnerClient) {
     /**
      * Find the next job that the simulator needs to process.
      */
-    public suspend fun findNext(): Job? {
-        return client.getJobs().firstOrNull()
+    fun findNext(): Job? {
+        return client.jobs.queryPending().firstOrNull()
     }
 
     /**
      * Claim the simulation job with the specified id.
     */
-    public suspend fun claim(id: String): Boolean {
-        return client.updateJob(id, SimulationState.CLAIMED)
+    fun claim(id: Long): Boolean {
+        client.jobs.update(id, Job.Update(JobState.CLAIMED)) // TODO Handle conflict
+        return true
     }
 
     /**
     * Update the heartbeat of the specified scenario.
     */
-    public suspend fun heartbeat(id: String) {
-        client.updateJob(id, SimulationState.RUNNING)
+    fun heartbeat(id: Long) {
+        client.jobs.update(id, Job.Update(JobState.RUNNING))
    }
 
     /**
     * Mark the scenario as failed.
     */
-    public suspend fun fail(id: String) {
-        client.updateJob(id, SimulationState.FAILED)
+    fun fail(id: Long) {
+        client.jobs.update(id, Job.Update(JobState.FAILED))
    }
 
     /**
     * Persist the specified results.
     */
-    public suspend fun finish(id: String, results: List<WebComputeMetricExporter.Result>) {
-        client.updateJob(
-            id, SimulationState.FINISHED,
-            mapOf(
-                "total_requested_burst" to results.map { it.totalActiveTime + it.totalIdleTime },
-                "total_granted_burst" to results.map { it.totalActiveTime },
-                "total_overcommitted_burst" to results.map { it.totalStealTime },
-                "total_interfered_burst" to results.map { it.totalLostTime },
-                "mean_cpu_usage" to results.map { it.meanCpuUsage },
-                "mean_cpu_demand" to results.map { it.meanCpuDemand },
-                "mean_num_deployed_images" to results.map { it.meanNumDeployedImages },
-                "max_num_deployed_images" to results.map { it.maxNumDeployedImages },
-                "total_power_draw" to results.map { it.totalPowerDraw },
-                "total_failure_slices" to results.map { it.totalFailureSlices },
-                "total_failure_vm_slices" to results.map { it.totalFailureVmSlices },
-                "total_vms_submitted" to results.map { it.totalVmsSubmitted },
-                "total_vms_queued" to results.map { it.totalVmsQueued },
-                "total_vms_finished" to results.map { it.totalVmsFinished },
-                "total_vms_failed" to results.map { it.totalVmsFailed }
+    public fun finish(id: Long, results: List<WebComputeMetricExporter.Result>) {
+        client.jobs.update(
+            id,
+            Job.Update(
+                JobState.FINISHED,
+                mapOf(
+                    "total_requested_burst" to results.map { it.totalActiveTime + it.totalIdleTime },
+                    "total_granted_burst" to results.map { it.totalActiveTime },
+                    "total_overcommitted_burst" to results.map { it.totalStealTime },
+                    "total_interfered_burst" to results.map { it.totalLostTime },
+                    "mean_cpu_usage" to results.map { it.meanCpuUsage },
+                    "mean_cpu_demand" to results.map { it.meanCpuDemand },
+                    "mean_num_deployed_images" to results.map { it.meanNumDeployedImages },
+                    "max_num_deployed_images" to results.map { it.maxNumDeployedImages },
+                    "total_power_draw" to results.map { it.totalPowerDraw },
+                    "total_failure_slices" to results.map { it.totalFailureSlices },
+                    "total_failure_vm_slices" to results.map { it.totalFailureVmSlices },
+                    "total_vms_submitted" to results.map { it.totalVmsSubmitted },
+                    "total_vms_queued" to results.map { it.totalVmsQueued },
+                    "total_vms_finished" to results.map { it.totalVmsFinished },
+                    "total_vms_failed" to results.map { it.totalVmsFailed }
+                )
             )
         )
     }
```
