summary | refs | log | tree | commit | diff
path: root/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner
diff options
context:
space:
mode:
author: Fabian Mastenbroek <mail.fabianm@gmail.com>, 2022-03-07 17:58:08 +0100
committer: Fabian Mastenbroek <mail.fabianm@gmail.com>, 2022-04-04 12:48:05 +0200
commit: d12efc754a1611a624d170b4d1fa6085e6bb177b (patch)
tree: 13f70f3c2db7fa82f89a400a9001666bee81ec87 /opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner
parent: abac46fe742484c6e0b90bebe3c86d88231540b2 (diff)
refactor(web/runner): Update runner to use new web client
This change updates the web runner implementation to use the new API client introduced in the previous commit.
Diffstat (limited to 'opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner')
-rw-r--r-- opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt 57
-rw-r--r-- opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt 66
2 files changed, 62 insertions, 61 deletions
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
index 94ef8f8e..561dcd59 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
@@ -44,15 +44,15 @@ import org.opendc.simulator.compute.power.SimplePowerDriver
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.telemetry.compute.collectServiceMetrics
import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
-import org.opendc.web.client.ApiClient
-import org.opendc.web.client.AuthConfiguration
-import org.opendc.web.client.model.Scenario
+import org.opendc.web.client.auth.OpenIdAuthController
+import org.opendc.web.client.runner.OpenDCRunnerClient
+import org.opendc.web.proto.runner.Job
+import org.opendc.web.proto.runner.Scenario
import java.io.File
import java.net.URI
import java.time.Duration
import java.util.*
-import org.opendc.web.client.model.Portfolio as ClientPortfolio
-import org.opendc.web.client.model.Topology as ClientTopology
+import org.opendc.web.proto.runner.Topology as ClientTopology
private val logger = KotlinLogging.logger {}
@@ -134,18 +134,18 @@ class RunnerCli : CliktCommand(name = "runner") {
.default(60L * 3) // Experiment may run for a maximum of three minutes
/**
- * Converge a single scenario.
+ * Run a simulation job.
*/
- private suspend fun runScenario(portfolio: ClientPortfolio, scenario: Scenario, topology: Topology): List<WebComputeMetricExporter.Result> {
- val id = scenario.id
+ private suspend fun runJob(job: Job, topology: Topology): List<WebComputeMetricExporter.Result> {
+ val id = job.id
+ val scenario = job.scenario
logger.info { "Constructing performance interference model" }
val workloadLoader = ComputeWorkloadLoader(tracePath)
val interferenceModel = let {
- val path = tracePath.resolve(scenario.trace.traceId).resolve("performance-interference-model.json")
- val operational = scenario.operationalPhenomena
- val enabled = operational.performanceInterferenceEnabled
+ val path = tracePath.resolve(scenario.workload.trace.id).resolve("performance-interference-model.json")
+ val enabled = scenario.phenomena.interference
if (!enabled || !path.exists()) {
return@let null
@@ -154,8 +154,7 @@ class RunnerCli : CliktCommand(name = "runner") {
VmInterferenceModelReader().read(path.inputStream())
}
- val targets = portfolio.targets
- val results = (0 until targets.repeatsPerScenario).map { repeat ->
+ val results = (0 until scenario.portfolio.targets.repeats).map { repeat ->
logger.info { "Starting repeat $repeat" }
withTimeout(runTimeout * 1000) {
runRepeat(scenario, repeat, topology, workloadLoader, interferenceModel?.withSeed(repeat.toLong()))
@@ -168,7 +167,7 @@ class RunnerCli : CliktCommand(name = "runner") {
}
/**
- * Converge a single repeat.
+ * Run a single repeat.
*/
private suspend fun runRepeat(
scenario: Scenario,
@@ -181,17 +180,17 @@ class RunnerCli : CliktCommand(name = "runner") {
try {
runBlockingSimulation {
- val workloadName = scenario.trace.traceId
- val workloadFraction = scenario.trace.loadSamplingFraction
+ val workloadName = scenario.workload.trace.id
+ val workloadFraction = scenario.workload.samplingFraction
val seeder = Random(repeat.toLong())
- val operational = scenario.operationalPhenomena
- val computeScheduler = createComputeScheduler(operational.schedulerName, seeder)
+ val phenomena = scenario.phenomena
+ val computeScheduler = createComputeScheduler(scenario.schedulerName, seeder)
val workload = trace(workloadName).sampleByLoad(workloadFraction)
val failureModel =
- if (operational.failuresEnabled)
+ if (phenomena.failures)
grid5000(Duration.ofDays(7))
else
null
@@ -203,7 +202,7 @@ class RunnerCli : CliktCommand(name = "runner") {
telemetry,
computeScheduler,
failureModel,
- interferenceModel.takeIf { operational.performanceInterferenceEnabled }
+ interferenceModel
)
telemetry.registerMetricReader(CoroutineMetricReader(this, exporter, exportInterval = Duration.ofHours(1)))
@@ -241,20 +240,19 @@ class RunnerCli : CliktCommand(name = "runner") {
override fun run(): Unit = runBlocking(Dispatchers.Default) {
logger.info { "Starting OpenDC web runner" }
- val client = ApiClient(baseUrl = apiUrl, AuthConfiguration(authDomain, authClientId, authClientSecret), authAudience)
+ val client = OpenDCRunnerClient(baseUrl = apiUrl, OpenIdAuthController(authDomain, authClientId, authClientSecret, authAudience))
val manager = ScenarioManager(client)
logger.info { "Watching for queued scenarios" }
while (true) {
- val scenario = manager.findNext()
-
- if (scenario == null) {
+ val job = manager.findNext()
+ if (job == null) {
delay(POLL_INTERVAL)
continue
}
- val id = scenario.id
+ val id = job.id
logger.info { "Found queued scenario $id: attempting to claim" }
@@ -273,10 +271,8 @@ class RunnerCli : CliktCommand(name = "runner") {
}
try {
- val scenarioModel = client.getScenario(id)!!
- val portfolio = client.getPortfolio(scenarioModel.portfolioId)!!
- val environment = convert(client.getTopology(scenarioModel.topology.topologyId)!!)
- val results = runScenario(portfolio, scenarioModel, environment)
+ val environment = convert(job.scenario.topology)
+ val results = runJob(job, environment)
logger.info { "Writing results to database" }
@@ -306,7 +302,8 @@ class RunnerCli : CliktCommand(name = "runner") {
val machines = topology.rooms.asSequence()
.flatMap { room ->
room.tiles.flatMap { tile ->
- tile.rack?.machines?.map { machine -> tile.rack to machine } ?: emptyList()
+ val rack = tile.rack
+ rack?.machines?.map { machine -> rack to machine } ?: emptyList()
}
}
for ((rack, machine) in machines) {
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt
index 1ee835a6..7374f0c9 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt
@@ -22,64 +22,68 @@
package org.opendc.web.runner
-import org.opendc.web.client.ApiClient
-import org.opendc.web.client.model.Job
-import org.opendc.web.client.model.SimulationState
+import org.opendc.web.client.runner.OpenDCRunnerClient
+import org.opendc.web.proto.JobState
+import org.opendc.web.proto.runner.Job
/**
* Manages the queue of scenarios that need to be processed.
*/
-public class ScenarioManager(private val client: ApiClient) {
+class ScenarioManager(private val client: OpenDCRunnerClient) {
/**
* Find the next job that the simulator needs to process.
*/
- public suspend fun findNext(): Job? {
- return client.getJobs().firstOrNull()
+ fun findNext(): Job? {
+ return client.jobs.queryPending().firstOrNull()
}
/**
* Claim the simulation job with the specified id.
*/
- public suspend fun claim(id: String): Boolean {
- return client.updateJob(id, SimulationState.CLAIMED)
+ fun claim(id: Long): Boolean {
+ client.jobs.update(id, Job.Update(JobState.CLAIMED)) // TODO Handle conflict
+ return true
}
/**
* Update the heartbeat of the specified scenario.
*/
- public suspend fun heartbeat(id: String) {
- client.updateJob(id, SimulationState.RUNNING)
+ fun heartbeat(id: Long) {
+ client.jobs.update(id, Job.Update(JobState.RUNNING))
}
/**
* Mark the scenario as failed.
*/
- public suspend fun fail(id: String) {
- client.updateJob(id, SimulationState.FAILED)
+ fun fail(id: Long) {
+ client.jobs.update(id, Job.Update(JobState.FAILED))
}
/**
* Persist the specified results.
*/
- public suspend fun finish(id: String, results: List<WebComputeMetricExporter.Result>) {
- client.updateJob(
- id, SimulationState.FINISHED,
- mapOf(
- "total_requested_burst" to results.map { it.totalActiveTime + it.totalIdleTime },
- "total_granted_burst" to results.map { it.totalActiveTime },
- "total_overcommitted_burst" to results.map { it.totalStealTime },
- "total_interfered_burst" to results.map { it.totalLostTime },
- "mean_cpu_usage" to results.map { it.meanCpuUsage },
- "mean_cpu_demand" to results.map { it.meanCpuDemand },
- "mean_num_deployed_images" to results.map { it.meanNumDeployedImages },
- "max_num_deployed_images" to results.map { it.maxNumDeployedImages },
- "total_power_draw" to results.map { it.totalPowerDraw },
- "total_failure_slices" to results.map { it.totalFailureSlices },
- "total_failure_vm_slices" to results.map { it.totalFailureVmSlices },
- "total_vms_submitted" to results.map { it.totalVmsSubmitted },
- "total_vms_queued" to results.map { it.totalVmsQueued },
- "total_vms_finished" to results.map { it.totalVmsFinished },
- "total_vms_failed" to results.map { it.totalVmsFailed }
+ public fun finish(id: Long, results: List<WebComputeMetricExporter.Result>) {
+ client.jobs.update(
+ id,
+ Job.Update(
+ JobState.FINISHED,
+ mapOf(
+ "total_requested_burst" to results.map { it.totalActiveTime + it.totalIdleTime },
+ "total_granted_burst" to results.map { it.totalActiveTime },
+ "total_overcommitted_burst" to results.map { it.totalStealTime },
+ "total_interfered_burst" to results.map { it.totalLostTime },
+ "mean_cpu_usage" to results.map { it.meanCpuUsage },
+ "mean_cpu_demand" to results.map { it.meanCpuDemand },
+ "mean_num_deployed_images" to results.map { it.meanNumDeployedImages },
+ "max_num_deployed_images" to results.map { it.maxNumDeployedImages },
+ "total_power_draw" to results.map { it.totalPowerDraw },
+ "total_failure_slices" to results.map { it.totalFailureSlices },
+ "total_failure_vm_slices" to results.map { it.totalFailureVmSlices },
+ "total_vms_submitted" to results.map { it.totalVmsSubmitted },
+ "total_vms_queued" to results.map { it.totalVmsQueued },
+ "total_vms_finished" to results.map { it.totalVmsFinished },
+ "total_vms_failed" to results.map { it.totalVmsFailed }
+ )
)
)
}