diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-07-16 22:04:35 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-08-24 19:48:06 +0200 |
| commit | 5d528f6b1902d372eb2ef594bc96712ad74ac361 (patch) | |
| tree | cc17c4b72724aba6a1038412f891c2d0e506d6a8 /simulator/opendc/opendc-runner-web/src | |
| parent | a4ae44e7f5bbfb293cdce256da3c40f927605ac9 (diff) | |
Add prototype of web experiment runner
This change adds a bridge between the frontend and the new simulator
implementation via MongoDB.
Diffstat (limited to 'simulator/opendc/opendc-runner-web/src')
3 files changed, 488 insertions, 1 deletions
diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt index 3cee259c..d70ad6bd 100644 --- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt +++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt @@ -1,16 +1,299 @@ package com.atlarge.opendc.runner.web +import com.atlarge.odcsim.SimulationEngineProvider +import com.atlarge.opendc.compute.virt.service.allocation.* +import com.atlarge.opendc.experiments.sc20.experiment.attachMonitor +import com.atlarge.opendc.experiments.sc20.experiment.createFailureDomain +import com.atlarge.opendc.experiments.sc20.experiment.createProvisioner +import com.atlarge.opendc.experiments.sc20.experiment.model.Workload +import com.atlarge.opendc.experiments.sc20.experiment.monitor.ParquetExperimentMonitor +import com.atlarge.opendc.experiments.sc20.experiment.processTrace +import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader +import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader import com.github.ajalt.clikt.core.CliktCommand +import com.github.ajalt.clikt.parameters.options.* +import com.github.ajalt.clikt.parameters.types.file +import com.github.ajalt.clikt.parameters.types.int +import com.mongodb.MongoClientSettings +import com.mongodb.MongoCredential +import com.mongodb.ServerAddress +import com.mongodb.client.MongoClients +import com.mongodb.client.MongoCollection +import com.mongodb.client.MongoDatabase +import com.mongodb.client.model.Filters +import java.io.File +import java.util.* +import kotlin.random.Random +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.Channel import mu.KotlinLogging +import org.bson.Document private val logger = KotlinLogging.logger {} /** + * The provider for the simulation engine to use. + */ +private val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() + +/** * Represents the CLI command for starting the OpenDC web runner. */ class RunnerCli : CliktCommand(name = "runner") { - override fun run() { + /** + * The name of the database to use. + */ + private val mongoDb by option( + "--mongo-db", + help = "name of the database to use", + envvar = "OPENDC_DB" + ) + .default("opendc") + + /** + * The database host to connect to. + */ + private val mongoHost by option( + "--mongo-host", + help = "database host to connect to", + envvar = "OPENDC_DB_HOST" + ) + .default("localhost") + + /** + * The database port to connect to. + */ + private val mongoPort by option( + "--mongo-port", + help = "database port to connect to", + envvar = "OPENDC_DB_PORT" + ) + .int() + .default(27017) + + /** + * The database user to connect with. + */ + private val mongoUser by option( + "--mongo-user", + help = "database user to connect with", + envvar = "OPENDC_DB_USER" + ) + .default("opendc") + + /** + * The database password to connect with. + */ + private val mongoPassword by option( + "--mongo-password", + help = "database password to connect with", + envvar = "OPENDC_DB_PASSWORD" + ) + .convert { it.toCharArray() } + .required() + + /** + * The path to the traces directory. + */ + private val tracePath by option( + "--traces", + help = "path to the directory containing the traces", + envvar = "OPENDC_TRACES" + ) + .file(canBeFile = false) + .defaultLazy { File("traces/") } + + /** + * The path to the output directory. + */ + private val outputPath by option( + "--output", + help = "path to the results directory" + ) + .file(canBeFile = false) + .defaultLazy { File("results/") } + + /** + * Connect to the user-specified database. + */ + private fun createDatabase(): MongoDatabase { + val credential = MongoCredential.createScramSha1Credential( + mongoUser, + mongoDb, + mongoPassword + ) + + val settings = MongoClientSettings.builder() + .credential(credential) + .applyToClusterSettings { it.hosts(listOf(ServerAddress(mongoHost, mongoPort))) } + .build() + val client = MongoClients.create(settings) + return client.getDatabase(mongoDb) + } + + /** + * Run a single scenario. + */ + private suspend fun runScenario(portfolio: Document, scenario: Document, topologies: MongoCollection<Document>) { + val id = scenario.getString("_id") + val traceReader = Sc20RawParquetTraceReader( + File( + tracePath, + scenario.getEmbedded(listOf("trace", "traceId"), String::class.java) + ) + ) + + val targets = portfolio.get("targets", Document::class.java) + + repeat(targets.getInteger("repeatsPerScenario")) { + logger.info { "Starting repeat $it" } + runRepeat(scenario, it, topologies, traceReader) + } + + logger.info { "Finished scenario $id" } + } + + /** + * Run a single repeat. + */ + private suspend fun runRepeat( + scenario: Document, + repeat: Int, + topologies: MongoCollection<Document>, + traceReader: Sc20RawParquetTraceReader + ) { + val id = scenario.getString("_id") + val seed = repeat + val traceDocument = scenario.get("trace", Document::class.java) + val workloadName = traceDocument.getString("traceId") + val workloadFraction = traceDocument.get("loadSamplingFraction", Number::class.java).toDouble() + + val seeder = Random(seed) + val system = provider("experiment-$id") + val root = system.newDomain("root") + + val chan = Channel<Unit>(Channel.CONFLATED) + + val operational = scenario.get("operational", Document::class.java) + val allocationPolicy = + when (val policyName = operational.getString("schedulerName")) { + "mem" -> AvailableMemoryAllocationPolicy() + "mem-inv" -> AvailableMemoryAllocationPolicy(true) + "core-mem" -> AvailableCoreMemoryAllocationPolicy() + "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true) + "active-servers" -> NumberOfActiveServersAllocationPolicy() + "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true) + "provisioned-cores" -> ProvisionedCoresAllocationPolicy() + "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true) + "random" -> RandomAllocationPolicy(Random(seeder.nextInt())) + else -> throw IllegalArgumentException("Unknown policy $policyName") + } + + val trace = Sc20ParquetTraceReader( + listOf(traceReader), + emptyMap(), + Workload(workloadName, workloadFraction), + seed + ) + val topologyId = scenario.getEmbedded(listOf("topology", "topologyId"), String::class.java) + val environment = TopologyParser(topologies, topologyId) + val monitor = ParquetExperimentMonitor( + outputPath, + "scenario_id=$id/run_id=$repeat", + 4096 + ) + + root.launch { + val (bareMetalProvisioner, scheduler) = createProvisioner( + root, + environment, + allocationPolicy + ) + + val failureDomain = if (operational.getBoolean("failuresEnabled")) { + logger.debug("ENABLING failures") + createFailureDomain( + seeder.nextInt(), + operational.getDouble("failureFrequency"), + bareMetalProvisioner, + chan + ) + } else { + null + } + + attachMonitor(scheduler, monitor) + processTrace( + trace, + scheduler, + chan, + monitor, + emptyMap() + ) + + logger.debug("SUBMIT=${scheduler.submittedVms}") + logger.debug("FAIL=${scheduler.unscheduledVms}") + logger.debug("QUEUED=${scheduler.queuedVms}") + logger.debug("RUNNING=${scheduler.runningVms}") + logger.debug("FINISHED=${scheduler.finishedVms}") + + failureDomain?.cancel() + scheduler.terminate() + } + + try { + system.run() + } finally { + system.terminate() + monitor.close() + } + } + + override fun run() = runBlocking(Dispatchers.Default) { logger.info { "Starting OpenDC web runner" } + + logger.info { "Connecting to MongoDB instance" } + val database = createDatabase() + val manager = ScenarioManager(database.getCollection("scenarios")) + val portfolios = database.getCollection("portfolios") + val topologies = database.getCollection("topologies") + + logger.info { "Watching for queued scenarios" } + + while (true) { + val scenario = manager.findNext() + + if (scenario == null) { + delay(5000) + continue + } + + val id = scenario.getString("_id") + + logger.info { "Found queued scenario $id: attempting to claim" } + + if (!manager.claim(id)) { + logger.info { "Failed to claim scenario" } + continue + } + + coroutineScope { + // Launch heartbeat process + launch { + delay(60000) + manager.heartbeat(id) + } + + try { + val portfolio = portfolios.find(Filters.eq("_id", scenario.getString("portfolioId"))).first()!! + runScenario(portfolio, scenario, topologies) + manager.finish(id) + } catch (e: Exception) { + logger.warn(e) { "Scenario failed to finish" } + manager.fail(id) + } + } + } } } diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt new file mode 100644 index 00000000..0f375385 --- /dev/null +++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt @@ -0,0 +1,77 @@ +package com.atlarge.opendc.runner.web + +import com.mongodb.client.MongoCollection +import com.mongodb.client.model.Filters +import com.mongodb.client.model.Updates +import java.time.Instant +import org.bson.Document + +/** + * Manages the queue of scenarios that need to be processed. + */ +class ScenarioManager(private val collection: MongoCollection<Document>) { + /** + * Find the next scenario that the simulator needs to process. + */ + fun findNext(): Document? { + return collection + .find(Filters.eq("simulation.state", "QUEUED")) + .first() + } + + /** + * Claim the scenario in the database with the specified id. + */ + fun claim(id: String): Boolean { + val res = collection.findOneAndUpdate( + Filters.and( + Filters.eq("_id", id), + Filters.eq("simulation.state", "QUEUED") + ), + Updates.combine( + Updates.set("simulation.state", "RUNNING"), + Updates.set("simulation.time", Instant.now()) + ) + ) + return res != null + } + + /** + * Update the heartbeat of the specified scenario. + */ + fun heartbeat(id: String) { + collection.findOneAndUpdate( + Filters.and( + Filters.eq("_id", id), + Filters.eq("simulation.state", "RUNNING") + ), + Updates.set("simulation.time", Instant.now()) + ) + } + + /** + * Mark the scenario as failed. + */ + fun fail(id: String) { + collection.findOneAndUpdate( + Filters.and( + Filters.eq("_id", id), + Filters.eq("simulation.state", "FAILED") + ), + Updates.set("simulation.time", Instant.now()) + ) + } + + /** + * Mark the scenario as finished. + */ + fun finish(id: String) { + collection.findOneAndUpdate( + Filters.and( + Filters.eq("_id", id), + Filters.eq("simulation.state", "FINISHED") + ), + Updates.set("simulation.time", Instant.now()) + ) + } +} diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt new file mode 100644 index 00000000..499585ec --- /dev/null +++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt @@ -0,0 +1,127 @@ +package com.atlarge.opendc.runner.web + +import com.atlarge.odcsim.Domain +import com.atlarge.opendc.compute.core.MemoryUnit +import com.atlarge.opendc.compute.core.ProcessingNode +import com.atlarge.opendc.compute.core.ProcessingUnit +import com.atlarge.opendc.compute.metal.NODE_CLUSTER +import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver +import com.atlarge.opendc.compute.metal.power.LinearLoadPowerModel +import com.atlarge.opendc.compute.metal.service.ProvisioningService +import com.atlarge.opendc.compute.metal.service.SimpleProvisioningService +import com.atlarge.opendc.core.Environment +import com.atlarge.opendc.core.Platform +import com.atlarge.opendc.core.Zone +import com.atlarge.opendc.core.services.ServiceRegistry +import com.atlarge.opendc.format.environment.EnvironmentReader +import com.mongodb.client.AggregateIterable +import com.mongodb.client.MongoCollection +import com.mongodb.client.model.Aggregates +import com.mongodb.client.model.Field +import com.mongodb.client.model.Filters +import com.mongodb.client.model.Projections +import java.util.* +import kotlinx.coroutines.launch +import org.bson.Document + +/** + * A helper class that converts the MongoDB topology into an OpenDC environment. + */ +class TopologyParser(private val collection: MongoCollection<Document>, private val id: String) : EnvironmentReader { + /** + * Parse the topology with the specified [id]. + */ + override suspend fun construct(dom: Domain): Environment { + val nodes = mutableListOf<SimpleBareMetalDriver>() + val random = Random(0) + + for (machine in fetchMachines(id)) { + val machineId = machine.getString("_id") + val clusterId = machine.getString("rack_id") + val position = machine.getInteger("position") + + val processors = machine.getList("cpus", Document::class.java).flatMap { cpu -> + val cores = cpu.getInteger("numberOfCores") + val speed = cpu.get("clockRateMhz", Number::class.java).toDouble() + // TODO Remove hardcoding of vendor + val node = ProcessingNode("Intel", "amd64", cpu.getString("name"), cores) + List(cores) { coreId -> + ProcessingUnit(node, coreId, speed) + } + } + val memoryUnits = machine.getList("memories", Document::class.java).map { memory -> + MemoryUnit( + "Samsung", + memory.getString("name"), + memory.get("speedMbPerS", Number::class.java).toDouble(), + memory.get("sizeMb", Number::class.java).toLong() + ) + } + nodes.add( + SimpleBareMetalDriver( + dom.newDomain(machineId), + UUID(random.nextLong(), random.nextLong()), + "node-$clusterId-$position", + mapOf(NODE_CLUSTER to clusterId), + processors, + memoryUnits, + // For now we assume a simple linear load model with an idle draw of ~200W and a maximum + // power draw of 350W. + // Source: https://stackoverflow.com/questions/6128960 + LinearLoadPowerModel(200.0, 350.0) + ) + ) + } + + val provisioningService = SimpleProvisioningService(dom.newDomain("provisioner")) + dom.launch { + for (node in nodes) { + provisioningService.create(node) + } + } + + val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService) + + val platform = Platform( + UUID.randomUUID(), "opendc-platform", listOf( + Zone(UUID.randomUUID(), "zone", serviceRegistry) + ) + ) + + return Environment(fetchName(id), null, listOf(platform)) + } + + override fun close() {} + + /** + * Fetch the metadata of the topology. + */ + private fun fetchName(id: String): String { + return collection.aggregate( + listOf( + Aggregates.match(Filters.eq("_id", id)), + Aggregates.project(Projections.include("name")) + ) + ) + .first()!! + .getString("name") + } + + /** + * Fetch a topology from the database with the specified [id]. + */ + private fun fetchMachines(id: String): AggregateIterable<Document> { + return collection.aggregate( + listOf( + Aggregates.match(Filters.eq("_id", id)), + Aggregates.project(Projections.fields(Document("racks", "\$rooms.tiles.rack"))), + Aggregates.unwind("\$racks"), + Aggregates.unwind("\$racks"), + Aggregates.replaceRoot("\$racks"), + Aggregates.addFields(Field("machines.rack_id", "\$_id")), + Aggregates.unwind("\$machines"), + Aggregates.replaceRoot("\$machines") + ) + ) + } +} |
