diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-10-25 14:53:54 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-10-25 14:53:54 +0200 |
| commit | aa9b32f8cd1467e9718959f400f6777e5d71737d (patch) | |
| tree | b88bbede15108c6855d7f94ded4c7054df186a72 /opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner | |
| parent | eb0e0a3bc557c05a70eead388797ab850ea87366 (diff) | |
| parent | b7a71e5b4aa77b41ef41deec2ace42b67a5a13a7 (diff) | |
merge: Integrate v2.1 progress into public repository
This pull request integrates the changes planned for the v2.1 release of
OpenDC into the public GitHub repository in order to sync the progress
of both repositories.
Diffstat (limited to 'opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner')
3 files changed, 571 insertions, 0 deletions
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt new file mode 100644 index 00000000..59308e11 --- /dev/null +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt @@ -0,0 +1,348 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package org.opendc.web.runner + +import com.github.ajalt.clikt.core.CliktCommand +import com.github.ajalt.clikt.parameters.options.* +import com.github.ajalt.clikt.parameters.types.file +import com.github.ajalt.clikt.parameters.types.long +import kotlinx.coroutines.* +import mu.KotlinLogging +import org.opendc.compute.workload.* +import org.opendc.compute.workload.topology.HostSpec +import org.opendc.compute.workload.topology.Topology +import org.opendc.compute.workload.topology.apply +import org.opendc.compute.workload.util.PerformanceInterferenceReader +import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel +import org.opendc.simulator.compute.model.MachineModel +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingNode +import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.power.LinearPowerModel +import org.opendc.simulator.compute.power.SimplePowerDriver +import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.telemetry.compute.collectServiceMetrics +import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader +import org.opendc.web.client.ApiClient +import org.opendc.web.client.AuthConfiguration +import org.opendc.web.client.model.Scenario +import java.io.File +import java.net.URI +import java.time.Duration +import java.util.* +import org.opendc.web.client.model.Portfolio as ClientPortfolio +import org.opendc.web.client.model.Topology as ClientTopology + +private val logger = KotlinLogging.logger {} + +/** + * Represents the CLI command for starting the OpenDC web runner. + */ +class RunnerCli : CliktCommand(name = "runner") { + /** + * The URL to the OpenDC API. + */ + private val apiUrl by option( + "--api-url", + help = "url to the OpenDC API", + envvar = "OPENDC_API_URL" + ) + .convert { URI(it) } + .default(URI("https://api.opendc.org/v2")) + + /** + * The auth domain to use. 
+ */ + private val authDomain by option( + "--auth-domain", + help = "auth domain of the OpenDC API", + envvar = "AUTH0_DOMAIN" + ) + .required() + + /** + * The auth client ID to use. + */ + private val authClientId by option( + "--auth-id", + help = "auth client id of the OpenDC API", + envvar = "AUTH0_CLIENT_ID" + ) + .required() + + /** + * The auth client secret to use. + */ + private val authClientSecret by option( + "--auth-secret", + help = "auth client secret of the OpenDC API", + envvar = "AUTH0_CLIENT_SECRET" + ) + .required() + + /** + * The path to the traces directory. + */ + private val tracePath by option( + "--traces", + help = "path to the directory containing the traces", + envvar = "OPENDC_TRACES" + ) + .file(canBeFile = false) + .defaultLazy { File("traces/") } + + /** + * The maximum duration of a single experiment run. + */ + private val runTimeout by option( + "--run-timeout", + help = "maximum duration of experiment in seconds", + envvar = "OPENDC_RUN_TIMEOUT" + ) + .long() + .default(60L * 3) // Experiment may run for a maximum of three minutes + + /** + * Converge a single scenario. 
+ */ + private suspend fun runScenario(portfolio: ClientPortfolio, scenario: Scenario, topology: Topology): List<WebComputeMetricExporter.Result> { + val id = scenario.id + + logger.info { "Constructing performance interference model" } + + val workloadLoader = ComputeWorkloadLoader(tracePath) + val interferenceGroups = let { + val path = tracePath.resolve(scenario.trace.traceId).resolve("performance-interference-model.json") + val operational = scenario.operationalPhenomena + val enabled = operational.performanceInterferenceEnabled + + if (!enabled || !path.exists()) { + return@let null + } + + PerformanceInterferenceReader().read(path.inputStream()) + } + + val targets = portfolio.targets + val results = (0 until targets.repeatsPerScenario).map { repeat -> + logger.info { "Starting repeat $repeat" } + withTimeout(runTimeout * 1000) { + val interferenceModel = interferenceGroups?.let { VmInterferenceModel(it, Random(repeat.toLong())) } + runRepeat(scenario, repeat, topology, workloadLoader, interferenceModel) + } + } + + logger.info { "Finished simulation for scenario $id" } + + return results + } + + /** + * Converge a single repeat. + */ + private suspend fun runRepeat( + scenario: Scenario, + repeat: Int, + topology: Topology, + workloadLoader: ComputeWorkloadLoader, + interferenceModel: VmInterferenceModel? 
+ ): WebComputeMetricExporter.Result { + val exporter = WebComputeMetricExporter() + + try { + runBlockingSimulation { + val workloadName = scenario.trace.traceId + val workloadFraction = scenario.trace.loadSamplingFraction + + val seeder = Random(repeat.toLong()) + + val operational = scenario.operationalPhenomena + val computeScheduler = createComputeScheduler(operational.schedulerName, seeder) + val workload = trace(workloadName).sampleByLoad(workloadFraction) + + val failureModel = + if (operational.failuresEnabled) + grid5000(Duration.ofDays(7)) + else + null + + val simulator = ComputeWorkloadRunner( + coroutineContext, + clock, + computeScheduler, + failureModel, + interferenceModel.takeIf { operational.performanceInterferenceEnabled } + ) + + val metricReader = CoroutineMetricReader(this, simulator.producers, exporter, exportInterval = Duration.ofHours(1)) + + try { + // Instantiate the topology onto the simulator + simulator.apply(topology) + // Converge workload trace + simulator.run(workload.resolve(workloadLoader, seeder), seeder.nextLong()) + } finally { + simulator.close() + metricReader.close() + } + + val serviceMetrics = collectServiceMetrics(simulator.producers[0]) + logger.debug { + "Scheduler " + + "Success=${serviceMetrics.attemptsSuccess} " + + "Failure=${serviceMetrics.attemptsFailure} " + + "Error=${serviceMetrics.attemptsError} " + + "Pending=${serviceMetrics.serversPending} " + + "Active=${serviceMetrics.serversActive}" + } + } + } catch (cause: Throwable) { + logger.warn(cause) { "Experiment failed" } + } + + return exporter.getResult() + } + + private val POLL_INTERVAL = 30000L // ms = 30 s + private val HEARTBEAT_INTERVAL = 60000L // ms = 1 min + + override fun run(): Unit = runBlocking(Dispatchers.Default) { + logger.info { "Starting OpenDC web runner" } + + val client = ApiClient(baseUrl = apiUrl, AuthConfiguration(authDomain, authClientId, authClientSecret)) + val manager = ScenarioManager(client) + + logger.info { "Watching for 
queued scenarios" } + + while (true) { + val scenario = manager.findNext() + + if (scenario == null) { + delay(POLL_INTERVAL) + continue + } + + val id = scenario.id + + logger.info { "Found queued scenario $id: attempting to claim" } + + if (!manager.claim(id)) { + logger.info { "Failed to claim scenario" } + continue + } + + coroutineScope { + // Launch heartbeat process + val heartbeat = launch { + while (true) { + manager.heartbeat(id) + delay(HEARTBEAT_INTERVAL) + } + } + + try { + val scenarioModel = client.getScenario(id)!! + val portfolio = client.getPortfolio(scenarioModel.portfolioId)!! + val environment = convert(client.getTopology(scenarioModel.topology.topologyId)!!) + val results = runScenario(portfolio, scenarioModel, environment) + + logger.info { "Writing results to database" } + + manager.finish(id, results) + + logger.info { "Successfully finished scenario $id" } + } catch (e: Exception) { + logger.error(e) { "Scenario failed to finish" } + manager.fail(id) + } finally { + heartbeat.cancel() + } + } + } + } + + /** + * Convert the specified [topology] into an [Topology] understood by OpenDC. 
+ */ + private fun convert(topology: ClientTopology): Topology { + return object : Topology { + + override fun resolve(): List<HostSpec> { + val res = mutableListOf<HostSpec>() + val random = Random(0) + + val machines = topology.rooms.asSequence() + .flatMap { room -> + room.tiles.flatMap { tile -> + tile.rack?.machines?.map { machine -> tile.rack to machine } ?: emptyList() + } + } + for ((rack, machine) in machines) { + val clusterId = rack.id + val position = machine.position + + val processors = machine.cpus.flatMap { cpu -> + val cores = cpu.numberOfCores + val speed = cpu.clockRateMhz + // TODO Remove hard coding of vendor + val node = ProcessingNode("Intel", "amd64", cpu.name, cores) + List(cores) { coreId -> + ProcessingUnit(node, coreId, speed) + } + } + val memoryUnits = machine.memory.map { memory -> + MemoryUnit( + "Samsung", + memory.name, + memory.speedMbPerS, + memory.sizeMb.toLong() + ) + } + + val energyConsumptionW = machine.cpus.sumOf { it.energyConsumptionW } + val powerModel = LinearPowerModel(2 * energyConsumptionW, energyConsumptionW * 0.5) + val powerDriver = SimplePowerDriver(powerModel) + + val spec = HostSpec( + UUID(random.nextLong(), random.nextLong()), + "node-$clusterId-$position", + mapOf("cluster" to clusterId), + MachineModel(processors, memoryUnits), + powerDriver + ) + + res += spec + } + + return res + } + + override fun toString(): String = "WebRunnerTopologyFactory" + } + } +} + +/** + * Main entry point of the runner. 
+ */ +fun main(args: Array<String>): Unit = RunnerCli().main(args) diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt new file mode 100644 index 00000000..1ee835a6 --- /dev/null +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.web.runner + +import org.opendc.web.client.ApiClient +import org.opendc.web.client.model.Job +import org.opendc.web.client.model.SimulationState + +/** + * Manages the queue of scenarios that need to be processed. + */ +public class ScenarioManager(private val client: ApiClient) { + /** + * Find the next job that the simulator needs to process. + */ + public suspend fun findNext(): Job? 
{ + return client.getJobs().firstOrNull() + } + + /** + * Claim the simulation job with the specified id. + */ + public suspend fun claim(id: String): Boolean { + return client.updateJob(id, SimulationState.CLAIMED) + } + + /** + * Update the heartbeat of the specified scenario. + */ + public suspend fun heartbeat(id: String) { + client.updateJob(id, SimulationState.RUNNING) + } + + /** + * Mark the scenario as failed. + */ + public suspend fun fail(id: String) { + client.updateJob(id, SimulationState.FAILED) + } + + /** + * Persist the specified results. + */ + public suspend fun finish(id: String, results: List<WebComputeMetricExporter.Result>) { + client.updateJob( + id, SimulationState.FINISHED, + mapOf( + "total_requested_burst" to results.map { it.totalActiveTime + it.totalIdleTime }, + "total_granted_burst" to results.map { it.totalActiveTime }, + "total_overcommitted_burst" to results.map { it.totalStealTime }, + "total_interfered_burst" to results.map { it.totalLostTime }, + "mean_cpu_usage" to results.map { it.meanCpuUsage }, + "mean_cpu_demand" to results.map { it.meanCpuDemand }, + "mean_num_deployed_images" to results.map { it.meanNumDeployedImages }, + "max_num_deployed_images" to results.map { it.maxNumDeployedImages }, + "total_power_draw" to results.map { it.totalPowerDraw }, + "total_failure_slices" to results.map { it.totalFailureSlices }, + "total_failure_vm_slices" to results.map { it.totalFailureVmSlices }, + "total_vms_submitted" to results.map { it.totalVmsSubmitted }, + "total_vms_queued" to results.map { it.totalVmsQueued }, + "total_vms_finished" to results.map { it.totalVmsFinished }, + "total_vms_failed" to results.map { it.totalVmsFailed } + ) + ) + } +} diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMetricExporter.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMetricExporter.kt new file mode 100644 index 00000000..7913660d --- /dev/null +++ 
b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMetricExporter.kt @@ -0,0 +1,137 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.web.runner + +import org.opendc.telemetry.compute.ComputeMetricExporter +import org.opendc.telemetry.compute.ComputeMonitor +import org.opendc.telemetry.compute.table.HostData +import org.opendc.telemetry.compute.table.ServiceData +import kotlin.math.max +import kotlin.math.roundToLong + +/** + * A [ComputeMonitor] that tracks the aggregate metrics for each repeat. 
+ */ +class WebComputeMetricExporter : ComputeMetricExporter() { + override fun record(data: HostData) { + val slices = data.downtime / SLICE_LENGTH + + hostAggregateMetrics = AggregateHostMetrics( + hostAggregateMetrics.totalActiveTime + data.cpuActiveTime, + hostAggregateMetrics.totalIdleTime + data.cpuIdleTime, + hostAggregateMetrics.totalStealTime + data.cpuStealTime, + hostAggregateMetrics.totalLostTime + data.cpuLostTime, + hostAggregateMetrics.totalPowerDraw + data.powerTotal, + hostAggregateMetrics.totalFailureSlices + slices, + hostAggregateMetrics.totalFailureVmSlices + data.guestsRunning * slices + ) + + hostMetrics.compute(data.host.id) { _, prev -> + HostMetrics( + data.cpuUsage + (prev?.cpuUsage ?: 0.0), + data.cpuDemand + (prev?.cpuDemand ?: 0.0), + data.guestsRunning + (prev?.instanceCount ?: 0), + 1 + (prev?.count ?: 0) + ) + } + } + + private var hostAggregateMetrics: AggregateHostMetrics = AggregateHostMetrics() + private val hostMetrics: MutableMap<String, HostMetrics> = mutableMapOf() + private val SLICE_LENGTH: Long = 5 * 60L + + data class AggregateHostMetrics( + val totalActiveTime: Long = 0L, + val totalIdleTime: Long = 0L, + val totalStealTime: Long = 0L, + val totalLostTime: Long = 0L, + val totalPowerDraw: Double = 0.0, + val totalFailureSlices: Double = 0.0, + val totalFailureVmSlices: Double = 0.0, + ) + + data class HostMetrics( + val cpuUsage: Double, + val cpuDemand: Double, + val instanceCount: Long, + val count: Long + ) + + private var serviceMetrics: AggregateServiceMetrics = AggregateServiceMetrics() + + override fun record(data: ServiceData) { + serviceMetrics = AggregateServiceMetrics( + max(data.attemptsSuccess, serviceMetrics.vmTotalCount), + max(data.serversPending, serviceMetrics.vmWaitingCount), + max(data.serversActive, serviceMetrics.vmActiveCount), + max(0, serviceMetrics.vmInactiveCount), + max(data.attemptsFailure, serviceMetrics.vmFailedCount), + ) + } + + data class AggregateServiceMetrics( + val vmTotalCount: Int 
= 0, + val vmWaitingCount: Int = 0, + val vmActiveCount: Int = 0, + val vmInactiveCount: Int = 0, + val vmFailedCount: Int = 0 + ) + + fun getResult(): Result { + return Result( + hostAggregateMetrics.totalActiveTime, + hostAggregateMetrics.totalIdleTime, + hostAggregateMetrics.totalStealTime, + hostAggregateMetrics.totalLostTime, + hostMetrics.map { it.value.cpuUsage / it.value.count }.average(), + hostMetrics.map { it.value.cpuDemand / it.value.count }.average(), + hostMetrics.map { it.value.instanceCount.toDouble() / it.value.count }.average(), + hostMetrics.map { it.value.instanceCount.toDouble() / it.value.count }.maxOrNull() ?: 0.0, + hostAggregateMetrics.totalPowerDraw, + hostAggregateMetrics.totalFailureSlices.roundToLong(), + hostAggregateMetrics.totalFailureVmSlices.roundToLong(), + serviceMetrics.vmTotalCount, + serviceMetrics.vmWaitingCount, + serviceMetrics.vmInactiveCount, + serviceMetrics.vmFailedCount, + ) + } + + data class Result( + val totalActiveTime: Long, + val totalIdleTime: Long, + val totalStealTime: Long, + val totalLostTime: Long, + val meanCpuUsage: Double, + val meanCpuDemand: Double, + val meanNumDeployedImages: Double, + val maxNumDeployedImages: Double, + val totalPowerDraw: Double, + val totalFailureSlices: Long, + val totalFailureVmSlices: Long, + val totalVmsSubmitted: Int, + val totalVmsQueued: Int, + val totalVmsFinished: Int, + val totalVmsFailed: Int + ) +} |
