diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-03-07 17:58:08 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-04-04 12:48:05 +0200 |
| commit | d12efc754a1611a624d170b4d1fa6085e6bb177b (patch) | |
| tree | 13f70f3c2db7fa82f89a400a9001666bee81ec87 /opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner | |
| parent | abac46fe742484c6e0b90bebe3c86d88231540b2 (diff) | |
refactor(web/runner): Update runner to use new web client
This change updates the web runner implementation to use the new API
client introduced in the previous commit.
Diffstat (limited to 'opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner')
| -rw-r--r-- | opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt | 57 | ||||
| -rw-r--r-- | opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt | 66 |
2 files changed, 62 insertions, 61 deletions
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt index 94ef8f8e..561dcd59 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt @@ -44,15 +44,15 @@ import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.core.runBlockingSimulation import org.opendc.telemetry.compute.collectServiceMetrics import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader -import org.opendc.web.client.ApiClient -import org.opendc.web.client.AuthConfiguration -import org.opendc.web.client.model.Scenario +import org.opendc.web.client.auth.OpenIdAuthController +import org.opendc.web.client.runner.OpenDCRunnerClient +import org.opendc.web.proto.runner.Job +import org.opendc.web.proto.runner.Scenario import java.io.File import java.net.URI import java.time.Duration import java.util.* -import org.opendc.web.client.model.Portfolio as ClientPortfolio -import org.opendc.web.client.model.Topology as ClientTopology +import org.opendc.web.proto.runner.Topology as ClientTopology private val logger = KotlinLogging.logger {} @@ -134,18 +134,18 @@ class RunnerCli : CliktCommand(name = "runner") { .default(60L * 3) // Experiment may run for a maximum of three minutes /** - * Converge a single scenario. + * Run a simulation job. */ - private suspend fun runScenario(portfolio: ClientPortfolio, scenario: Scenario, topology: Topology): List<WebComputeMetricExporter.Result> { - val id = scenario.id + private suspend fun runJob(job: Job, topology: Topology): List<WebComputeMetricExporter.Result> { + val id = job.id + val scenario = job.scenario logger.info { "Constructing performance interference model" } val workloadLoader = ComputeWorkloadLoader(tracePath) val interferenceModel = let { - val path = tracePath.resolve(scenario.trace.traceId).resolve("performance-interference-model.json") - val operational = scenario.operationalPhenomena - val enabled = operational.performanceInterferenceEnabled + val path = tracePath.resolve(scenario.workload.trace.id).resolve("performance-interference-model.json") + val enabled = scenario.phenomena.interference if (!enabled || !path.exists()) { return@let null @@ -154,8 +154,7 @@ class RunnerCli : CliktCommand(name = "runner") { VmInterferenceModelReader().read(path.inputStream()) } - val targets = portfolio.targets - val results = (0 until targets.repeatsPerScenario).map { repeat -> + val results = (0 until scenario.portfolio.targets.repeats).map { repeat -> logger.info { "Starting repeat $repeat" } withTimeout(runTimeout * 1000) { runRepeat(scenario, repeat, topology, workloadLoader, interferenceModel?.withSeed(repeat.toLong())) @@ -168,7 +167,7 @@ class RunnerCli : CliktCommand(name = "runner") { } /** - * Converge a single repeat. + * Run a single repeat. */ private suspend fun runRepeat( scenario: Scenario, @@ -181,17 +180,17 @@ class RunnerCli : CliktCommand(name = "runner") { try { runBlockingSimulation { - val workloadName = scenario.trace.traceId - val workloadFraction = scenario.trace.loadSamplingFraction + val workloadName = scenario.workload.trace.id + val workloadFraction = scenario.workload.samplingFraction val seeder = Random(repeat.toLong()) - val operational = scenario.operationalPhenomena - val computeScheduler = createComputeScheduler(operational.schedulerName, seeder) + val phenomena = scenario.phenomena + val computeScheduler = createComputeScheduler(scenario.schedulerName, seeder) val workload = trace(workloadName).sampleByLoad(workloadFraction) val failureModel = - if (operational.failuresEnabled) + if (phenomena.failures) grid5000(Duration.ofDays(7)) else null @@ -203,7 +202,7 @@ class RunnerCli : CliktCommand(name = "runner") { telemetry, computeScheduler, failureModel, - interferenceModel.takeIf { operational.performanceInterferenceEnabled } + interferenceModel ) telemetry.registerMetricReader(CoroutineMetricReader(this, exporter, exportInterval = Duration.ofHours(1))) @@ -241,20 +240,19 @@ class RunnerCli : CliktCommand(name = "runner") { override fun run(): Unit = runBlocking(Dispatchers.Default) { logger.info { "Starting OpenDC web runner" } - val client = ApiClient(baseUrl = apiUrl, AuthConfiguration(authDomain, authClientId, authClientSecret), authAudience) + val client = OpenDCRunnerClient(baseUrl = apiUrl, OpenIdAuthController(authDomain, authClientId, authClientSecret, authAudience)) val manager = ScenarioManager(client) logger.info { "Watching for queued scenarios" } while (true) { - val scenario = manager.findNext() - - if (scenario == null) { + val job = manager.findNext() + if (job == null) { delay(POLL_INTERVAL) continue } - val id = scenario.id + val id = job.id logger.info { "Found queued scenario $id: attempting to claim" } @@ -273,10 +271,8 @@ class RunnerCli : CliktCommand(name = "runner") { } try { - val scenarioModel = client.getScenario(id)!! - val portfolio = client.getPortfolio(scenarioModel.portfolioId)!! - val environment = convert(client.getTopology(scenarioModel.topology.topologyId)!!) - val results = runScenario(portfolio, scenarioModel, environment) + val environment = convert(job.scenario.topology) + val results = runJob(job, environment) logger.info { "Writing results to database" } @@ -306,7 +302,8 @@ class RunnerCli : CliktCommand(name = "runner") { val machines = topology.rooms.asSequence() .flatMap { room -> room.tiles.flatMap { tile -> - tile.rack?.machines?.map { machine -> tile.rack to machine } ?: emptyList() + val rack = tile.rack + rack?.machines?.map { machine -> rack to machine } ?: emptyList() } } for ((rack, machine) in machines) { diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt index 1ee835a6..7374f0c9 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt @@ -22,64 +22,68 @@ package org.opendc.web.runner -import org.opendc.web.client.ApiClient -import org.opendc.web.client.model.Job -import org.opendc.web.client.model.SimulationState +import org.opendc.web.client.runner.OpenDCRunnerClient +import org.opendc.web.proto.JobState +import org.opendc.web.proto.runner.Job /** * Manages the queue of scenarios that need to be processed. */ -public class ScenarioManager(private val client: ApiClient) { +class ScenarioManager(private val client: OpenDCRunnerClient) { /** * Find the next job that the simulator needs to process. */ - public suspend fun findNext(): Job? { - return client.getJobs().firstOrNull() + fun findNext(): Job? { + return client.jobs.queryPending().firstOrNull() } /** * Claim the simulation job with the specified id. */ - public suspend fun claim(id: String): Boolean { - return client.updateJob(id, SimulationState.CLAIMED) + fun claim(id: Long): Boolean { + client.jobs.update(id, Job.Update(JobState.CLAIMED)) // TODO Handle conflict + return true } /** * Update the heartbeat of the specified scenario. */ - public suspend fun heartbeat(id: String) { - client.updateJob(id, SimulationState.RUNNING) + fun heartbeat(id: Long) { + client.jobs.update(id, Job.Update(JobState.RUNNING)) } /** * Mark the scenario as failed. */ - public suspend fun fail(id: String) { - client.updateJob(id, SimulationState.FAILED) + fun fail(id: Long) { + client.jobs.update(id, Job.Update(JobState.FAILED)) } /** * Persist the specified results. */ - public suspend fun finish(id: String, results: List<WebComputeMetricExporter.Result>) { - client.updateJob( - id, SimulationState.FINISHED, - mapOf( - "total_requested_burst" to results.map { it.totalActiveTime + it.totalIdleTime }, - "total_granted_burst" to results.map { it.totalActiveTime }, - "total_overcommitted_burst" to results.map { it.totalStealTime }, - "total_interfered_burst" to results.map { it.totalLostTime }, - "mean_cpu_usage" to results.map { it.meanCpuUsage }, - "mean_cpu_demand" to results.map { it.meanCpuDemand }, - "mean_num_deployed_images" to results.map { it.meanNumDeployedImages }, - "max_num_deployed_images" to results.map { it.maxNumDeployedImages }, - "total_power_draw" to results.map { it.totalPowerDraw }, - "total_failure_slices" to results.map { it.totalFailureSlices }, - "total_failure_vm_slices" to results.map { it.totalFailureVmSlices }, - "total_vms_submitted" to results.map { it.totalVmsSubmitted }, - "total_vms_queued" to results.map { it.totalVmsQueued }, - "total_vms_finished" to results.map { it.totalVmsFinished }, - "total_vms_failed" to results.map { it.totalVmsFailed } + public fun finish(id: Long, results: List<WebComputeMetricExporter.Result>) { + client.jobs.update( + id, + Job.Update( + JobState.FINISHED, + mapOf( + "total_requested_burst" to results.map { it.totalActiveTime + it.totalIdleTime }, + "total_granted_burst" to results.map { it.totalActiveTime }, + "total_overcommitted_burst" to results.map { it.totalStealTime }, + "total_interfered_burst" to results.map { it.totalLostTime }, + "mean_cpu_usage" to results.map { it.meanCpuUsage }, + "mean_cpu_demand" to results.map { it.meanCpuDemand }, + "mean_num_deployed_images" to results.map { it.meanNumDeployedImages }, + "max_num_deployed_images" to results.map { it.maxNumDeployedImages }, + "total_power_draw" to results.map { it.totalPowerDraw }, + "total_failure_slices" to results.map { it.totalFailureSlices }, + "total_failure_vm_slices" to results.map { it.totalFailureVmSlices }, + "total_vms_submitted" to results.map { it.totalVmsSubmitted }, + "total_vms_queued" to results.map { it.totalVmsQueued }, + "total_vms_finished" to results.map { it.totalVmsFinished }, + "total_vms_failed" to results.map { it.totalVmsFailed } + ) ) ) } |
