From a4ae44e7f5bbfb293cdce256da3c40f927605ac9 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 15 Jul 2020 18:02:43 +0200 Subject: Add skeleton for web runner --- .../kotlin/com/atlarge/opendc/runner/web/Main.kt | 20 +++++++++ .../src/main/resources/log4j2.xml | 49 ++++++++++++++++++++++ 2 files changed, 69 insertions(+) create mode 100644 simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt create mode 100644 simulator/opendc/opendc-runner-web/src/main/resources/log4j2.xml (limited to 'simulator/opendc/opendc-runner-web/src/main') diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt new file mode 100644 index 00000000..3cee259c --- /dev/null +++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt @@ -0,0 +1,20 @@ +package com.atlarge.opendc.runner.web + +import com.github.ajalt.clikt.core.CliktCommand +import mu.KotlinLogging + +private val logger = KotlinLogging.logger {} + +/** + * Represents the CLI command for starting the OpenDC web runner. + */ +class RunnerCli : CliktCommand(name = "runner") { + override fun run() { + logger.info { "Starting OpenDC web runner" } + } +} + +/** + * Main entry point of the runner. + */ +fun main(args: Array) = RunnerCli().main(args) diff --git a/simulator/opendc/opendc-runner-web/src/main/resources/log4j2.xml b/simulator/opendc/opendc-runner-web/src/main/resources/log4j2.xml new file mode 100644 index 00000000..b5a2bbb5 --- /dev/null +++ b/simulator/opendc/opendc-runner-web/src/main/resources/log4j2.xml @@ -0,0 +1,49 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + -- cgit v1.2.3 From 5d528f6b1902d372eb2ef594bc96712ad74ac361 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 16 Jul 2020 22:04:35 +0200 Subject: Add prototype of web experiment runner This change adds a bridge between the frontend and the new simulator implementation via MongoDB. 
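A condensed sketch of the queue-claim handshake that the ScenarioManager introduced in this commit performs against the scenarios collection. The field names (simulation.state, simulation.time) and the QUEUED/RUNNING states are taken from the diff below; the helper itself is illustrative, not part of the patch, and omits error handling.

import com.mongodb.client.MongoCollection
import com.mongodb.client.model.Filters
import com.mongodb.client.model.Updates
import java.time.Instant
import org.bson.Document

// Sketch only: mirrors ScenarioManager.findNext() and ScenarioManager.claim() from the
// diff below. A scenario is picked up while QUEUED and atomically moved to RUNNING so
// that two concurrently running runners cannot process the same scenario.
fun claimNextScenario(scenarios: MongoCollection<Document>): Document? {
    val scenario = scenarios.find(Filters.eq("simulation.state", "QUEUED")).first() ?: return null

    // findOneAndUpdate returns null if another runner claimed this scenario first.
    scenarios.findOneAndUpdate(
        Filters.and(
            Filters.eq("_id", scenario.getString("_id")),
            Filters.eq("simulation.state", "QUEUED")
        ),
        Updates.combine(
            Updates.set("simulation.state", "RUNNING"),
            Updates.set("simulation.time", Instant.now())
        )
    ) ?: return null

    return scenario
}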
--- .../kotlin/com/atlarge/opendc/runner/web/Main.kt | 285 ++++++++++++++++++++- .../atlarge/opendc/runner/web/ScenarioManager.kt | 77 ++++++ .../atlarge/opendc/runner/web/TopologyParser.kt | 127 +++++++++ 3 files changed, 488 insertions(+), 1 deletion(-) create mode 100644 simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt create mode 100644 simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt (limited to 'simulator/opendc/opendc-runner-web/src/main') diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt index 3cee259c..d70ad6bd 100644 --- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt +++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt @@ -1,16 +1,299 @@ package com.atlarge.opendc.runner.web +import com.atlarge.odcsim.SimulationEngineProvider +import com.atlarge.opendc.compute.virt.service.allocation.* +import com.atlarge.opendc.experiments.sc20.experiment.attachMonitor +import com.atlarge.opendc.experiments.sc20.experiment.createFailureDomain +import com.atlarge.opendc.experiments.sc20.experiment.createProvisioner +import com.atlarge.opendc.experiments.sc20.experiment.model.Workload +import com.atlarge.opendc.experiments.sc20.experiment.monitor.ParquetExperimentMonitor +import com.atlarge.opendc.experiments.sc20.experiment.processTrace +import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader +import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader import com.github.ajalt.clikt.core.CliktCommand +import com.github.ajalt.clikt.parameters.options.* +import com.github.ajalt.clikt.parameters.types.file +import com.github.ajalt.clikt.parameters.types.int +import com.mongodb.MongoClientSettings +import com.mongodb.MongoCredential +import com.mongodb.ServerAddress +import com.mongodb.client.MongoClients +import com.mongodb.client.MongoCollection +import com.mongodb.client.MongoDatabase +import com.mongodb.client.model.Filters +import java.io.File +import java.util.* +import kotlin.random.Random +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.Channel import mu.KotlinLogging +import org.bson.Document private val logger = KotlinLogging.logger {} +/** + * The provider for the simulation engine to use. + */ +private val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() + /** * Represents the CLI command for starting the OpenDC web runner. */ class RunnerCli : CliktCommand(name = "runner") { - override fun run() { + /** + * The name of the database to use. + */ + private val mongoDb by option( + "--mongo-db", + help = "name of the database to use", + envvar = "OPENDC_DB" + ) + .default("opendc") + + /** + * The database host to connect to. + */ + private val mongoHost by option( + "--mongo-host", + help = "database host to connect to", + envvar = "OPENDC_DB_HOST" + ) + .default("localhost") + + /** + * The database port to connect to. + */ + private val mongoPort by option( + "--mongo-port", + help = "database port to connect to", + envvar = "OPENDC_DB_PORT" + ) + .int() + .default(27017) + + /** + * The database user to connect with. 
+ */ + private val mongoUser by option( + "--mongo-user", + help = "database user to connect with", + envvar = "OPENDC_DB_USER" + ) + .default("opendc") + + /** + * The database password to connect with. + */ + private val mongoPassword by option( + "--mongo-password", + help = "database password to connect with", + envvar = "OPENDC_DB_PASSWORD" + ) + .convert { it.toCharArray() } + .required() + + /** + * The path to the traces directory. + */ + private val tracePath by option( + "--traces", + help = "path to the directory containing the traces", + envvar = "OPENDC_TRACES" + ) + .file(canBeFile = false) + .defaultLazy { File("traces/") } + + /** + * The path to the output directory. + */ + private val outputPath by option( + "--output", + help = "path to the results directory" + ) + .file(canBeFile = false) + .defaultLazy { File("results/") } + + /** + * Connect to the user-specified database. + */ + private fun createDatabase(): MongoDatabase { + val credential = MongoCredential.createScramSha1Credential( + mongoUser, + mongoDb, + mongoPassword + ) + + val settings = MongoClientSettings.builder() + .credential(credential) + .applyToClusterSettings { it.hosts(listOf(ServerAddress(mongoHost, mongoPort))) } + .build() + val client = MongoClients.create(settings) + return client.getDatabase(mongoDb) + } + + /** + * Run a single scenario. + */ + private suspend fun runScenario(portfolio: Document, scenario: Document, topologies: MongoCollection) { + val id = scenario.getString("_id") + val traceReader = Sc20RawParquetTraceReader( + File( + tracePath, + scenario.getEmbedded(listOf("trace", "traceId"), String::class.java) + ) + ) + + val targets = portfolio.get("targets", Document::class.java) + + repeat(targets.getInteger("repeatsPerScenario")) { + logger.info { "Starting repeat $it" } + runRepeat(scenario, it, topologies, traceReader) + } + + logger.info { "Finished scenario $id" } + } + + /** + * Run a single repeat. 
+ */ + private suspend fun runRepeat( + scenario: Document, + repeat: Int, + topologies: MongoCollection, + traceReader: Sc20RawParquetTraceReader + ) { + val id = scenario.getString("_id") + val seed = repeat + val traceDocument = scenario.get("trace", Document::class.java) + val workloadName = traceDocument.getString("traceId") + val workloadFraction = traceDocument.get("loadSamplingFraction", Number::class.java).toDouble() + + val seeder = Random(seed) + val system = provider("experiment-$id") + val root = system.newDomain("root") + + val chan = Channel(Channel.CONFLATED) + + val operational = scenario.get("operational", Document::class.java) + val allocationPolicy = + when (val policyName = operational.getString("schedulerName")) { + "mem" -> AvailableMemoryAllocationPolicy() + "mem-inv" -> AvailableMemoryAllocationPolicy(true) + "core-mem" -> AvailableCoreMemoryAllocationPolicy() + "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true) + "active-servers" -> NumberOfActiveServersAllocationPolicy() + "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true) + "provisioned-cores" -> ProvisionedCoresAllocationPolicy() + "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true) + "random" -> RandomAllocationPolicy(Random(seeder.nextInt())) + else -> throw IllegalArgumentException("Unknown policy $policyName") + } + + val trace = Sc20ParquetTraceReader( + listOf(traceReader), + emptyMap(), + Workload(workloadName, workloadFraction), + seed + ) + val topologyId = scenario.getEmbedded(listOf("topology", "topologyId"), String::class.java) + val environment = TopologyParser(topologies, topologyId) + val monitor = ParquetExperimentMonitor( + outputPath, + "scenario_id=$id/run_id=$repeat", + 4096 + ) + + root.launch { + val (bareMetalProvisioner, scheduler) = createProvisioner( + root, + environment, + allocationPolicy + ) + + val failureDomain = if (operational.getBoolean("failuresEnabled")) { + logger.debug("ENABLING failures") + createFailureDomain( + seeder.nextInt(), + operational.getDouble("failureFrequency"), + bareMetalProvisioner, + chan + ) + } else { + null + } + + attachMonitor(scheduler, monitor) + processTrace( + trace, + scheduler, + chan, + monitor, + emptyMap() + ) + + logger.debug("SUBMIT=${scheduler.submittedVms}") + logger.debug("FAIL=${scheduler.unscheduledVms}") + logger.debug("QUEUED=${scheduler.queuedVms}") + logger.debug("RUNNING=${scheduler.runningVms}") + logger.debug("FINISHED=${scheduler.finishedVms}") + + failureDomain?.cancel() + scheduler.terminate() + } + + try { + system.run() + } finally { + system.terminate() + monitor.close() + } + } + + override fun run() = runBlocking(Dispatchers.Default) { logger.info { "Starting OpenDC web runner" } + + logger.info { "Connecting to MongoDB instance" } + val database = createDatabase() + val manager = ScenarioManager(database.getCollection("scenarios")) + val portfolios = database.getCollection("portfolios") + val topologies = database.getCollection("topologies") + + logger.info { "Watching for queued scenarios" } + + while (true) { + val scenario = manager.findNext() + + if (scenario == null) { + delay(5000) + continue + } + + val id = scenario.getString("_id") + + logger.info { "Found queued scenario $id: attempting to claim" } + + if (!manager.claim(id)) { + logger.info { "Failed to claim scenario" } + continue + } + + coroutineScope { + // Launch heartbeat process + launch { + delay(60000) + manager.heartbeat(id) + } + + try { + val portfolio = portfolios.find(Filters.eq("_id", 
scenario.getString("portfolioId"))).first()!! + runScenario(portfolio, scenario, topologies) + manager.finish(id) + } catch (e: Exception) { + logger.warn(e) { "Scenario failed to finish" } + manager.fail(id) + } + } + } } } diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt new file mode 100644 index 00000000..0f375385 --- /dev/null +++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt @@ -0,0 +1,77 @@ +package com.atlarge.opendc.runner.web + +import com.mongodb.client.MongoCollection +import com.mongodb.client.model.Filters +import com.mongodb.client.model.Updates +import java.time.Instant +import org.bson.Document + +/** + * Manages the queue of scenarios that need to be processed. + */ +class ScenarioManager(private val collection: MongoCollection) { + /** + * Find the next scenario that the simulator needs to process. + */ + fun findNext(): Document? { + return collection + .find(Filters.eq("simulation.state", "QUEUED")) + .first() + } + + /** + * Claim the scenario in the database with the specified id. + */ + fun claim(id: String): Boolean { + val res = collection.findOneAndUpdate( + Filters.and( + Filters.eq("_id", id), + Filters.eq("simulation.state", "QUEUED") + ), + Updates.combine( + Updates.set("simulation.state", "RUNNING"), + Updates.set("simulation.time", Instant.now()) + ) + ) + return res != null + } + + /** + * Update the heartbeat of the specified scenario. + */ + fun heartbeat(id: String) { + collection.findOneAndUpdate( + Filters.and( + Filters.eq("_id", id), + Filters.eq("simulation.state", "RUNNING") + ), + Updates.set("simulation.time", Instant.now()) + ) + } + + /** + * Mark the scenario as failed. + */ + fun fail(id: String) { + collection.findOneAndUpdate( + Filters.and( + Filters.eq("_id", id), + Filters.eq("simulation.state", "FAILED") + ), + Updates.set("simulation.time", Instant.now()) + ) + } + + /** + * Mark the scenario as finished. 
+ */ + fun finish(id: String) { + collection.findOneAndUpdate( + Filters.and( + Filters.eq("_id", id), + Filters.eq("simulation.state", "FINISHED") + ), + Updates.set("simulation.time", Instant.now()) + ) + } +} diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt new file mode 100644 index 00000000..499585ec --- /dev/null +++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt @@ -0,0 +1,127 @@ +package com.atlarge.opendc.runner.web + +import com.atlarge.odcsim.Domain +import com.atlarge.opendc.compute.core.MemoryUnit +import com.atlarge.opendc.compute.core.ProcessingNode +import com.atlarge.opendc.compute.core.ProcessingUnit +import com.atlarge.opendc.compute.metal.NODE_CLUSTER +import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver +import com.atlarge.opendc.compute.metal.power.LinearLoadPowerModel +import com.atlarge.opendc.compute.metal.service.ProvisioningService +import com.atlarge.opendc.compute.metal.service.SimpleProvisioningService +import com.atlarge.opendc.core.Environment +import com.atlarge.opendc.core.Platform +import com.atlarge.opendc.core.Zone +import com.atlarge.opendc.core.services.ServiceRegistry +import com.atlarge.opendc.format.environment.EnvironmentReader +import com.mongodb.client.AggregateIterable +import com.mongodb.client.MongoCollection +import com.mongodb.client.model.Aggregates +import com.mongodb.client.model.Field +import com.mongodb.client.model.Filters +import com.mongodb.client.model.Projections +import java.util.* +import kotlinx.coroutines.launch +import org.bson.Document + +/** + * A helper class that converts the MongoDB topology into an OpenDC environment. + */ +class TopologyParser(private val collection: MongoCollection, private val id: String) : EnvironmentReader { + /** + * Parse the topology with the specified [id]. + */ + override suspend fun construct(dom: Domain): Environment { + val nodes = mutableListOf() + val random = Random(0) + + for (machine in fetchMachines(id)) { + val machineId = machine.getString("_id") + val clusterId = machine.getString("rack_id") + val position = machine.getInteger("position") + + val processors = machine.getList("cpus", Document::class.java).flatMap { cpu -> + val cores = cpu.getInteger("numberOfCores") + val speed = cpu.get("clockRateMhz", Number::class.java).toDouble() + // TODO Remove hardcoding of vendor + val node = ProcessingNode("Intel", "amd64", cpu.getString("name"), cores) + List(cores) { coreId -> + ProcessingUnit(node, coreId, speed) + } + } + val memoryUnits = machine.getList("memories", Document::class.java).map { memory -> + MemoryUnit( + "Samsung", + memory.getString("name"), + memory.get("speedMbPerS", Number::class.java).toDouble(), + memory.get("sizeMb", Number::class.java).toLong() + ) + } + nodes.add( + SimpleBareMetalDriver( + dom.newDomain(machineId), + UUID(random.nextLong(), random.nextLong()), + "node-$clusterId-$position", + mapOf(NODE_CLUSTER to clusterId), + processors, + memoryUnits, + // For now we assume a simple linear load model with an idle draw of ~200W and a maximum + // power draw of 350W. 
+ // Source: https://stackoverflow.com/questions/6128960 + LinearLoadPowerModel(200.0, 350.0) + ) + ) + } + + val provisioningService = SimpleProvisioningService(dom.newDomain("provisioner")) + dom.launch { + for (node in nodes) { + provisioningService.create(node) + } + } + + val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService) + + val platform = Platform( + UUID.randomUUID(), "opendc-platform", listOf( + Zone(UUID.randomUUID(), "zone", serviceRegistry) + ) + ) + + return Environment(fetchName(id), null, listOf(platform)) + } + + override fun close() {} + + /** + * Fetch the metadata of the topology. + */ + private fun fetchName(id: String): String { + return collection.aggregate( + listOf( + Aggregates.match(Filters.eq("_id", id)), + Aggregates.project(Projections.include("name")) + ) + ) + .first()!! + .getString("name") + } + + /** + * Fetch a topology from the database with the specified [id]. + */ + private fun fetchMachines(id: String): AggregateIterable { + return collection.aggregate( + listOf( + Aggregates.match(Filters.eq("_id", id)), + Aggregates.project(Projections.fields(Document("racks", "\$rooms.tiles.rack"))), + Aggregates.unwind("\$racks"), + Aggregates.unwind("\$racks"), + Aggregates.replaceRoot("\$racks"), + Aggregates.addFields(Field("machines.rack_id", "\$_id")), + Aggregates.unwind("\$machines"), + Aggregates.replaceRoot("\$machines") + ) + ) + } +} -- cgit v1.2.3 From 9f85e80e40a663e3ebaf46a16f27332c4b7f0b53 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 17 Jul 2020 11:38:39 +0200 Subject: Enable support for failures and perf. interference --- .../kotlin/com/atlarge/opendc/runner/web/Main.kt | 33 ++++++++++++++++------ 1 file changed, 24 insertions(+), 9 deletions(-) (limited to 'simulator/opendc/opendc-runner-web/src/main') diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt index d70ad6bd..fe1913f8 100644 --- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt +++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt @@ -10,6 +10,7 @@ import com.atlarge.opendc.experiments.sc20.experiment.monitor.ParquetExperimentM import com.atlarge.opendc.experiments.sc20.experiment.processTrace import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader +import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader import com.github.ajalt.clikt.core.CliktCommand import com.github.ajalt.clikt.parameters.options.* import com.github.ajalt.clikt.parameters.types.file @@ -136,18 +137,30 @@ class RunnerCli : CliktCommand(name = "runner") { */ private suspend fun runScenario(portfolio: Document, scenario: Document, topologies: MongoCollection) { val id = scenario.getString("_id") - val traceReader = Sc20RawParquetTraceReader( - File( - tracePath, - scenario.getEmbedded(listOf("trace", "traceId"), String::class.java) - ) + + logger.info { "Constructing performance interference model" } + + val traceDir = File( + tracePath, + scenario.getEmbedded(listOf("trace", "traceId"), String::class.java) ) + val traceReader = Sc20RawParquetTraceReader(traceDir) + val performanceInterferenceReader = let { + val path = File(traceDir, "performance-interference-model.json") + val enabled = scenario.getEmbedded(listOf("operational", 
"performanceInterferenceEnabled"), Boolean::class.java) + + if (!enabled || !path.exists()) { + return@let null + } + + path.inputStream().use { Sc20PerformanceInterferenceReader(it) } + } val targets = portfolio.get("targets", Document::class.java) repeat(targets.getInteger("repeatsPerScenario")) { logger.info { "Starting repeat $it" } - runRepeat(scenario, it, topologies, traceReader) + runRepeat(scenario, it, topologies, traceReader, performanceInterferenceReader) } logger.info { "Finished scenario $id" } @@ -160,7 +173,8 @@ class RunnerCli : CliktCommand(name = "runner") { scenario: Document, repeat: Int, topologies: MongoCollection, - traceReader: Sc20RawParquetTraceReader + traceReader: Sc20RawParquetTraceReader, + performanceInterferenceReader: Sc20PerformanceInterferenceReader? ) { val id = scenario.getString("_id") val seed = repeat @@ -189,9 +203,10 @@ class RunnerCli : CliktCommand(name = "runner") { else -> throw IllegalArgumentException("Unknown policy $policyName") } + val performanceInterferenceModel = performanceInterferenceReader?.construct(seeder) ?: emptyMap() val trace = Sc20ParquetTraceReader( listOf(traceReader), - emptyMap(), + performanceInterferenceModel, Workload(workloadName, workloadFraction), seed ) @@ -214,7 +229,7 @@ class RunnerCli : CliktCommand(name = "runner") { logger.debug("ENABLING failures") createFailureDomain( seeder.nextInt(), - operational.getDouble("failureFrequency"), + operational.get("failureFrequency", Number::class.java)?.toDouble() ?: 24.0 * 7, bareMetalProvisioner, chan ) -- cgit v1.2.3 From 0a895abfe307fbb6a28ceac6a07c5ac4863627fd Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 17 Jul 2020 17:25:45 +0200 Subject: Add data processing pipeline via Spark This change adds support for processing the experimental results by means of a Spark data processing pipeline. --- .../kotlin/com/atlarge/opendc/runner/web/Main.kt | 30 +++- .../atlarge/opendc/runner/web/ResultProcessor.kt | 187 +++++++++++++++++++++ .../atlarge/opendc/runner/web/ScenarioManager.kt | 44 +++-- .../src/main/resources/log4j2.xml | 3 + 4 files changed, 245 insertions(+), 19 deletions(-) create mode 100644 simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt (limited to 'simulator/opendc/opendc-runner-web/src/main') diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt index fe1913f8..86696887 100644 --- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt +++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt @@ -109,11 +109,22 @@ class RunnerCli : CliktCommand(name = "runner") { */ private val outputPath by option( "--output", - help = "path to the results directory" + help = "path to the results directory", + envvar = "OPENDC_OUTPUT" ) .file(canBeFile = false) .defaultLazy { File("results/") } + /** + * The Spark master to connect to. + */ + private val spark by option( + "--spark", + help = "Spark master to connect to", + envvar = "OPENDC_SPARK" + ) + .required() + /** * Connect to the user-specified database. 
*/ @@ -147,7 +158,8 @@ class RunnerCli : CliktCommand(name = "runner") { val traceReader = Sc20RawParquetTraceReader(traceDir) val performanceInterferenceReader = let { val path = File(traceDir, "performance-interference-model.json") - val enabled = scenario.getEmbedded(listOf("operational", "performanceInterferenceEnabled"), Boolean::class.java) + val operational = scenario.get("operational", Document::class.java) + val enabled = operational.getBoolean("performanceInterferenceEnabled") if (!enabled || !path.exists()) { return@let null @@ -163,7 +175,7 @@ class RunnerCli : CliktCommand(name = "runner") { runRepeat(scenario, it, topologies, traceReader, performanceInterferenceReader) } - logger.info { "Finished scenario $id" } + logger.info { "Finished simulation for scenario $id" } } /** @@ -266,13 +278,15 @@ class RunnerCli : CliktCommand(name = "runner") { override fun run() = runBlocking(Dispatchers.Default) { logger.info { "Starting OpenDC web runner" } - logger.info { "Connecting to MongoDB instance" } val database = createDatabase() val manager = ScenarioManager(database.getCollection("scenarios")) val portfolios = database.getCollection("portfolios") val topologies = database.getCollection("topologies") + logger.info { "Loading Spark" } + val resultProcessor = ResultProcessor(spark, outputPath) + logger.info { "Watching for queued scenarios" } while (true) { @@ -302,7 +316,13 @@ class RunnerCli : CliktCommand(name = "runner") { try { val portfolio = portfolios.find(Filters.eq("_id", scenario.getString("portfolioId"))).first()!! runScenario(portfolio, scenario, topologies) - manager.finish(id) + + logger.info { "Starting result processing" } + + val result = resultProcessor.process(id) + manager.finish(id, result) + + logger.info { "Successfully finished scenario $id" } } catch (e: Exception) { logger.warn(e) { "Scenario failed to finish" } manager.fail(id) diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt new file mode 100644 index 00000000..39092653 --- /dev/null +++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt @@ -0,0 +1,187 @@ +package com.atlarge.opendc.runner.web + +import java.io.File +import org.apache.spark.sql.Column +import org.apache.spark.sql.Dataset +import org.apache.spark.sql.Row +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.functions.* + +/** + * A helper class for processing the experiment results using Apache Spark. + */ +class ResultProcessor(private val master: String, private val outputPath: File) { + /** + * Process the results of the scenario with the given [id]. 
+ */ + fun process(id: String): Result { + val spark = SparkSession.builder() + .master(master) + .appName("opendc-simulator-$id") + .config("spark.driver.bindAddress", "0.0.0.0") // Needed to allow the worker to connect to driver + .orCreate + + try { + val hostMetrics = spark.read().parquet(File(outputPath, "host-metrics/scenario_id=$id").path) + val provisionerMetrics = spark.read().parquet(File(outputPath, "provisioner-metrics/scenario_id=$id").path) + val res = aggregate(hostMetrics, provisionerMetrics).first() + + return Result( + res.getList(1), + res.getList(2), + res.getList(3), + res.getList(4), + res.getList(5), + res.getList(6), + res.getList(7), + res.getList(8), + res.getList(9), + res.getList(10), + res.getList(11), + res.getList(12), + res.getList(13), + res.getList(14), + res.getList(15) + ) + } finally { + spark.close() + } + } + + data class Result( + val totalRequestedBurst: List, + val totalGrantedBurst: List, + val totalOvercommittedBurst: List, + val totalInterferedBurst: List, + val meanCpuUsage: List, + val meanCpuDemand: List, + val meanNumDeployedImages: List, + val maxNumDeployedImages: List, + val totalPowerDraw: List, + val totalFailureSlices: List, + val totalFailureVmSlices: List, + val totalVmsSubmitted: List, + val totalVmsQueued: List, + val totalVmsFinished: List, + val totalVmsFailed: List + ) + + /** + * Perform aggregation of the experiment results. + */ + private fun aggregate(hostMetrics: Dataset, provisionerMetrics: Dataset): Dataset { + // Extrapolate the duration of the entries to span the entire trace + val hostMetricsExtra = hostMetrics + .withColumn("slice_counts", floor(col("duration") / lit(sliceLength))) + .withColumn("power_draw", col("power_draw") * col("slice_counts")) + .withColumn("state_int", states[col("state")]) + .withColumn("state_opposite_int", oppositeStates[col("state")]) + .withColumn("cpu_usage", col("cpu_usage") * col("slice_counts") * col("state_opposite_int")) + .withColumn("cpu_demand", col("cpu_demand") * col("slice_counts")) + .withColumn("failure_slice_count", col("slice_counts") * col("state_int")) + .withColumn("failure_vm_slice_count", col("slice_counts") * col("state_int") * col("vm_count")) + + // Process all data in a single run + val hostMetricsGrouped = hostMetricsExtra.groupBy("run_id") + + // Aggregate the summed total metrics + val systemMetrics = hostMetricsGrouped.agg( + sum("requested_burst").alias("total_requested_burst"), + sum("granted_burst").alias("total_granted_burst"), + sum("overcommissioned_burst").alias("total_overcommitted_burst"), + sum("interfered_burst").alias("total_interfered_burst"), + sum("power_draw").alias("total_power_draw"), + sum("failure_slice_count").alias("total_failure_slices"), + sum("failure_vm_slice_count").alias("total_failure_vm_slices") + ) + + // Aggregate metrics per host + val hvMetrics = hostMetrics + .groupBy("run_id", "host_id") + .agg( + sum("cpu_usage").alias("mean_cpu_usage"), + sum("cpu_demand").alias("mean_cpu_demand"), + avg("vm_count").alias("mean_num_deployed_images"), + count(lit(1)).alias("num_rows") + ) + .withColumn("mean_cpu_usage", col("mean_cpu_usage") / col("num_rows")) + .withColumn("mean_cpu_demand", col("mean_cpu_demand") / col("num_rows")) + .groupBy("run_id") + .agg( + avg("mean_cpu_usage").alias("mean_cpu_usage"), + avg("mean_cpu_demand").alias("mean_cpu_demand"), + avg("mean_num_deployed_images").alias("mean_num_deployed_images"), + max("mean_num_deployed_images").alias("max_num_deployed_images") + ) + + // Group the provisioner metrics per run 
+ val provisionerMetricsGrouped = provisionerMetrics.groupBy("run_id") + + // Aggregate the provisioner metrics + val provisionerMetricsAggregated = provisionerMetricsGrouped.agg( + max("vm_total_count").alias("total_vms_submitted"), + max("vm_waiting_count").alias("total_vms_queued"), + max("vm_active_count").alias("total_vms_running"), + max("vm_inactive_count").alias("total_vms_finished"), + max("vm_failed_count").alias("total_vms_failed") + ) + + // Join the results into a single data frame + return systemMetrics + .join(hvMetrics, "run_id") + .join(provisionerMetricsAggregated, "run_id") + .select( + col("total_requested_burst"), + col("total_granted_burst"), + col("total_overcommitted_burst"), + col("total_interfered_burst"), + col("mean_cpu_usage"), + col("mean_cpu_demand"), + col("mean_num_deployed_images"), + col("max_num_deployed_images"), + col("total_power_draw"), + col("total_failure_slices"), + col("total_failure_vm_slices"), + col("total_vms_submitted"), + col("total_vms_queued"), + col("total_vms_finished"), + col("total_vms_failed") + ) + .groupBy(lit(1)) + .agg( + // TODO Check if order of values is correct + collect_list(col("total_requested_burst")).alias("total_requested_burst"), + collect_list(col("total_granted_burst")).alias("total_granted_burst"), + collect_list(col("total_overcommitted_burst")).alias("total_overcommitted_burst"), + collect_list(col("total_interfered_burst")).alias("total_interfered_burst"), + collect_list(col("mean_cpu_usage")).alias("mean_cpu_usage"), + collect_list(col("mean_cpu_demand")).alias("mean_cpu_demand"), + collect_list(col("mean_num_deployed_images")).alias("mean_num_deployed_images"), + collect_list(col("max_num_deployed_images")).alias("max_num_deployed_images"), + collect_list(col("total_power_draw")).alias("total_power_draw"), + collect_list(col("total_failure_slices")).alias("total_failure_slices"), + collect_list(col("total_failure_vm_slices")).alias("total_failure_vm_slices"), + collect_list(col("total_vms_submitted")).alias("total_vms_submitted"), + collect_list(col("total_vms_queued")).alias("total_vms_queued"), + collect_list(col("total_vms_finished")).alias("total_vms_finished"), + collect_list(col("total_vms_failed")).alias("total_vms_failed") + ) + } + + // Spark helper functions + operator fun Column.times(other: Column): Column = `$times`(other) + operator fun Column.div(other: Column): Column = `$div`(other) + operator fun Column.get(other: Column): Column = this.apply(other) + + val sliceLength = 5 * 60 * 1000 + val states = map( + lit("ERROR"), lit(1), + lit("ACTIVE"), lit(0), + lit("SHUTOFF"), lit(0) + ) + val oppositeStates = map( + lit("ERROR"), lit(0), + lit("ACTIVE"), lit(1), + lit("SHUTOFF"), lit(1) + ) +} diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt index 0f375385..40ffd282 100644 --- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt +++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt @@ -30,7 +30,7 @@ class ScenarioManager(private val collection: MongoCollection) { ), Updates.combine( Updates.set("simulation.state", "RUNNING"), - Updates.set("simulation.time", Instant.now()) + Updates.set("simulation.heartbeat", Instant.now()) ) ) return res != null @@ -45,7 +45,7 @@ class ScenarioManager(private val collection: MongoCollection) { 
Filters.eq("_id", id), Filters.eq("simulation.state", "RUNNING") ), - Updates.set("simulation.time", Instant.now()) + Updates.set("simulation.heartbeat", Instant.now()) ) } @@ -54,24 +54,40 @@ class ScenarioManager(private val collection: MongoCollection) { */ fun fail(id: String) { collection.findOneAndUpdate( - Filters.and( - Filters.eq("_id", id), - Filters.eq("simulation.state", "FAILED") - ), - Updates.set("simulation.time", Instant.now()) + Filters.eq("_id", id), + Updates.combine( + Updates.set("simulation.state", "FAILED"), + Updates.set("simulation.heartbeat", Instant.now()) + ) ) } /** - * Mark the scenario as finished. + * Persist the specified results. */ - fun finish(id: String) { + fun finish(id: String, result: ResultProcessor.Result) { collection.findOneAndUpdate( - Filters.and( - Filters.eq("_id", id), - Filters.eq("simulation.state", "FINISHED") - ), - Updates.set("simulation.time", Instant.now()) + Filters.eq("_id", id), + Updates.combine( + Updates.set("simulation.state", "FINISHED"), + Updates.unset("simulation.time"), + Updates.set("results.total_requested_burst", result.totalRequestedBurst), + Updates.set("results.total_granted_burst", result.totalGrantedBurst), + Updates.set("results.total_overcommitted_burst", result.totalOvercommittedBurst), + Updates.set("results.total_interfered_burst", result.totalInterferedBurst), + Updates.set("results.mean_cpu_usage", result.meanCpuUsage), + Updates.set("results.mean_cpu_demand", result.meanCpuDemand), + Updates.set("results.mean_num_deployed_images", result.meanNumDeployedImages), + Updates.set("results.max_num_deployed_images", result.maxNumDeployedImages), + Updates.set("results.max_num_deployed_images", result.maxNumDeployedImages), + Updates.set("results.total_power_draw", result.totalPowerDraw), + Updates.set("results.total_failure_slices", result.totalFailureSlices), + Updates.set("results.total_failure_vm_slices", result.totalFailureVmSlices), + Updates.set("results.total_vms_submitted", result.totalVmsSubmitted), + Updates.set("results.total_vms_queued", result.totalVmsQueued), + Updates.set("results.total_vms_finished", result.totalVmsFinished), + Updates.set("results.total_vms_failed", result.totalVmsFailed) + ) ) } } diff --git a/simulator/opendc/opendc-runner-web/src/main/resources/log4j2.xml b/simulator/opendc/opendc-runner-web/src/main/resources/log4j2.xml index b5a2bbb5..1d873554 100644 --- a/simulator/opendc/opendc-runner-web/src/main/resources/log4j2.xml +++ b/simulator/opendc/opendc-runner-web/src/main/resources/log4j2.xml @@ -42,6 +42,9 @@ + + + -- cgit v1.2.3 From bde8b51fc40a02e6e8514ff428a748a133502c34 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sat, 18 Jul 2020 16:47:32 +0200 Subject: Default to local Spark instance --- .../src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'simulator/opendc/opendc-runner-web/src/main') diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt index 86696887..0ff9b870 100644 --- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt +++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt @@ -123,7 +123,7 @@ class RunnerCli : CliktCommand(name = "runner") { help = "Spark master to connect to", envvar = "OPENDC_SPARK" ) - .required() + .default("local[*]") /** * Connect to the 
user-specified database. @@ -284,7 +284,7 @@ class RunnerCli : CliktCommand(name = "runner") { val portfolios = database.getCollection("portfolios") val topologies = database.getCollection("topologies") - logger.info { "Loading Spark" } + logger.info { "Launching Spark" } val resultProcessor = ResultProcessor(spark, outputPath) logger.info { "Watching for queued scenarios" } -- cgit v1.2.3
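Since this final commit changes the --spark default to local[*], the runner no longer needs an external Spark cluster for result processing. The snippet below is an illustrative local launch: the flag names and their environment-variable fallbacks come from RunnerCli in the patches above, while the concrete values (password, directories) are placeholders rather than values from the source.

package com.atlarge.opendc.runner.web

// Illustrative launcher, not part of the patch series: it simply forwards hand-written
// arguments to the RunnerCli defined above. Placeholder values are marked as such.
fun localMain() {
    RunnerCli().main(
        arrayOf(
            "--mongo-host", "localhost",     // OPENDC_DB_HOST
            "--mongo-port", "27017",         // OPENDC_DB_PORT
            "--mongo-db", "opendc",          // OPENDC_DB
            "--mongo-user", "opendc",        // OPENDC_DB_USER
            "--mongo-password", "changeme",  // OPENDC_DB_PASSWORD; required, placeholder value
            "--traces", "traces/",           // OPENDC_TRACES
            "--output", "results/",          // OPENDC_OUTPUT
            "--spark", "local[*]"            // OPENDC_SPARK; the default after this commit
        )
    )
}

Launched this way, the runner polls the scenarios collection every five seconds, claims queued scenarios, writes Parquet metrics under results/, and lets the embedded local Spark session aggregate them back into the scenario document.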