From a283fac5e4d2a6be229acba191acdcbf7eba6dcd Mon Sep 17 00:00:00 2001
From: Fabian Mastenbroek
Date: Thu, 1 Oct 2020 01:55:20 +0200
Subject: Migrate to org.opendc namespace

This change moves the OpenDC simulator codebase to the org.opendc
namespace, whose domain we control. Previously, we used the com.atlarge
package, whose domain we did not control, which might have led to
difficulties in the future.
---
 simulator/opendc-runner-web/build.gradle.kts        |   2 +-
 .../kotlin/com/atlarge/opendc/runner/web/Main.kt    | 346 -------------------
 .../atlarge/opendc/runner/web/ResultProcessor.kt    | 193 -----------
 .../atlarge/opendc/runner/web/ScenarioManager.kt    |  93 ------
 .../atlarge/opendc/runner/web/TopologyParser.kt     | 131 --------
 .../src/main/kotlin/org/opendc/runner/web/Main.kt   | 368 +++++++++++++++++++++
 .../org/opendc/runner/web/ResultProcessor.kt        | 215 ++++++++++++
 .../org/opendc/runner/web/ScenarioManager.kt        | 115 +++++++
 .../kotlin/org/opendc/runner/web/TopologyParser.kt  | 153 +++++++++
 .../src/main/resources/log4j2.xml                   |   9 +-
 10 files changed, 855 insertions(+), 770 deletions(-)
 delete mode 100644 simulator/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt
 delete mode 100644 simulator/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt
 delete mode 100644 simulator/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt
 delete mode 100644 simulator/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt
 create mode 100644 simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
 create mode 100644 simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ResultProcessor.kt
 create mode 100644 simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ScenarioManager.kt
 create mode 100644 simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt

(limited to 'simulator/opendc-runner-web')

diff --git a/simulator/opendc-runner-web/build.gradle.kts b/simulator/opendc-runner-web/build.gradle.kts
index 8c83db87..7e81347c 100644
--- a/simulator/opendc-runner-web/build.gradle.kts
+++ b/simulator/opendc-runner-web/build.gradle.kts
@@ -29,7 +29,7 @@ plugins {
 }
 
 application {
-    mainClassName = "com.atlarge.opendc.runner.web.MainKt"
+    mainClassName = "org.opendc.runner.web.MainKt"
 }
 
 dependencies {
diff --git a/simulator/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt b/simulator/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt
deleted file mode 100644
index ac4d9087..00000000
--- a/simulator/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt
+++ /dev/null
@@ -1,346 +0,0 @@
-package com.atlarge.opendc.runner.web
-
-import com.atlarge.opendc.compute.virt.service.allocation.*
-import com.atlarge.opendc.experiments.sc20.experiment.attachMonitor
-import com.atlarge.opendc.experiments.sc20.experiment.createFailureDomain
-import com.atlarge.opendc.experiments.sc20.experiment.createProvisioner
-import com.atlarge.opendc.experiments.sc20.experiment.model.Workload
-import com.atlarge.opendc.experiments.sc20.experiment.monitor.ParquetExperimentMonitor
-import com.atlarge.opendc.experiments.sc20.experiment.processTrace
-import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader
-import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader
-import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader
-import
com.github.ajalt.clikt.core.CliktCommand -import com.github.ajalt.clikt.parameters.options.* -import com.github.ajalt.clikt.parameters.types.file -import com.github.ajalt.clikt.parameters.types.int -import com.mongodb.MongoClientSettings -import com.mongodb.MongoCredential -import com.mongodb.ServerAddress -import com.mongodb.client.MongoClients -import com.mongodb.client.MongoCollection -import com.mongodb.client.MongoDatabase -import com.mongodb.client.model.Filters -import kotlinx.coroutines.* -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.test.TestCoroutineScope -import mu.KotlinLogging -import org.bson.Document -import org.opendc.simulator.utils.DelayControllerClockAdapter -import java.io.File -import java.util.* -import kotlin.random.Random - -private val logger = KotlinLogging.logger {} - -/** - * Represents the CLI command for starting the OpenDC web runner. - */ -@OptIn(ExperimentalCoroutinesApi::class) -class RunnerCli : CliktCommand(name = "runner") { - /** - * The name of the database to use. - */ - private val mongoDb by option( - "--mongo-db", - help = "name of the database to use", - envvar = "OPENDC_DB" - ) - .default("opendc") - - /** - * The database host to connect to. - */ - private val mongoHost by option( - "--mongo-host", - help = "database host to connect to", - envvar = "OPENDC_DB_HOST" - ) - .default("localhost") - - /** - * The database port to connect to. - */ - private val mongoPort by option( - "--mongo-port", - help = "database port to connect to", - envvar = "OPENDC_DB_PORT" - ) - .int() - .default(27017) - - /** - * The database user to connect with. - */ - private val mongoUser by option( - "--mongo-user", - help = "database user to connect with", - envvar = "OPENDC_DB_USER" - ) - .default("opendc") - - /** - * The database password to connect with. - */ - private val mongoPassword by option( - "--mongo-password", - help = "database password to connect with", - envvar = "OPENDC_DB_PASSWORD" - ) - .convert { it.toCharArray() } - .required() - - /** - * The path to the traces directory. - */ - private val tracePath by option( - "--traces", - help = "path to the directory containing the traces", - envvar = "OPENDC_TRACES" - ) - .file(canBeFile = false) - .defaultLazy { File("traces/") } - - /** - * The path to the output directory. - */ - private val outputPath by option( - "--output", - help = "path to the results directory", - envvar = "OPENDC_OUTPUT" - ) - .file(canBeFile = false) - .defaultLazy { File("results/") } - - /** - * The Spark master to connect to. - */ - private val spark by option( - "--spark", - help = "Spark master to connect to", - envvar = "OPENDC_SPARK" - ) - .default("local[*]") - - /** - * Connect to the user-specified database. - */ - private fun createDatabase(): MongoDatabase { - val credential = MongoCredential.createScramSha1Credential( - mongoUser, - mongoDb, - mongoPassword - ) - - val settings = MongoClientSettings.builder() - .credential(credential) - .applyToClusterSettings { it.hosts(listOf(ServerAddress(mongoHost, mongoPort))) } - .build() - val client = MongoClients.create(settings) - return client.getDatabase(mongoDb) - } - - /** - * Run a single scenario. 
- */ - private suspend fun runScenario(portfolio: Document, scenario: Document, topologies: MongoCollection) { - val id = scenario.getString("_id") - - logger.info { "Constructing performance interference model" } - - val traceDir = File( - tracePath, - scenario.getEmbedded(listOf("trace", "traceId"), String::class.java) - ) - val traceReader = Sc20RawParquetTraceReader(traceDir) - val performanceInterferenceReader = let { - val path = File(traceDir, "performance-interference-model.json") - val operational = scenario.get("operational", Document::class.java) - val enabled = operational.getBoolean("performanceInterferenceEnabled") - - if (!enabled || !path.exists()) { - return@let null - } - - path.inputStream().use { Sc20PerformanceInterferenceReader(it) } - } - - val targets = portfolio.get("targets", Document::class.java) - - repeat(targets.getInteger("repeatsPerScenario")) { - logger.info { "Starting repeat $it" } - runRepeat(scenario, it, topologies, traceReader, performanceInterferenceReader) - } - - logger.info { "Finished simulation for scenario $id" } - } - - /** - * Run a single repeat. - */ - private suspend fun runRepeat( - scenario: Document, - repeat: Int, - topologies: MongoCollection, - traceReader: Sc20RawParquetTraceReader, - performanceInterferenceReader: Sc20PerformanceInterferenceReader? - ) { - val id = scenario.getString("_id") - val seed = repeat - val traceDocument = scenario.get("trace", Document::class.java) - val workloadName = traceDocument.getString("traceId") - val workloadFraction = traceDocument.get("loadSamplingFraction", Number::class.java).toDouble() - - val seeder = Random(seed) - val testScope = TestCoroutineScope() - val clock = DelayControllerClockAdapter(testScope) - - val chan = Channel(Channel.CONFLATED) - - val operational = scenario.get("operational", Document::class.java) - val allocationPolicy = - when (val policyName = operational.getString("schedulerName")) { - "mem" -> AvailableMemoryAllocationPolicy() - "mem-inv" -> AvailableMemoryAllocationPolicy(true) - "core-mem" -> AvailableCoreMemoryAllocationPolicy() - "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true) - "active-servers" -> NumberOfActiveServersAllocationPolicy() - "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true) - "provisioned-cores" -> ProvisionedCoresAllocationPolicy() - "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true) - "random" -> RandomAllocationPolicy(Random(seeder.nextInt())) - else -> throw IllegalArgumentException("Unknown policy $policyName") - } - - val performanceInterferenceModel = performanceInterferenceReader?.construct(seeder) ?: emptyMap() - val trace = Sc20ParquetTraceReader( - listOf(traceReader), - performanceInterferenceModel, - Workload(workloadName, workloadFraction), - seed - ) - val topologyId = scenario.getEmbedded(listOf("topology", "topologyId"), String::class.java) - val environment = TopologyParser(topologies, topologyId) - val monitor = ParquetExperimentMonitor( - outputPath, - "scenario_id=$id/run_id=$repeat", - 4096 - ) - - testScope.launch { - val (bareMetalProvisioner, scheduler) = createProvisioner( - this, - clock, - environment, - allocationPolicy - ) - - val failureDomain = if (operational.getBoolean("failuresEnabled")) { - logger.debug("ENABLING failures") - createFailureDomain( - testScope, - clock, - seeder.nextInt(), - operational.get("failureFrequency", Number::class.java)?.toDouble() ?: 24.0 * 7, - bareMetalProvisioner, - chan - ) - } else { - null - } - - attachMonitor(this, clock, scheduler, 
monitor) - processTrace( - this, - clock, - trace, - scheduler, - chan, - monitor, - emptyMap() - ) - - logger.debug("SUBMIT=${scheduler.submittedVms}") - logger.debug("FAIL=${scheduler.unscheduledVms}") - logger.debug("QUEUED=${scheduler.queuedVms}") - logger.debug("RUNNING=${scheduler.runningVms}") - logger.debug("FINISHED=${scheduler.finishedVms}") - - failureDomain?.cancel() - scheduler.terminate() - } - - try { - testScope.advanceUntilIdle() - } finally { - monitor.close() - } - } - - val POLL_INTERVAL = 5000L // ms = 5 s - val HEARTBEAT_INTERVAL = 60000L // ms = 1 min - - override fun run() = runBlocking(Dispatchers.Default) { - logger.info { "Starting OpenDC web runner" } - logger.info { "Connecting to MongoDB instance" } - val database = createDatabase() - val manager = ScenarioManager(database.getCollection("scenarios")) - val portfolios = database.getCollection("portfolios") - val topologies = database.getCollection("topologies") - - logger.info { "Launching Spark" } - val resultProcessor = ResultProcessor(spark, outputPath) - - logger.info { "Watching for queued scenarios" } - - while (true) { - val scenario = manager.findNext() - - if (scenario == null) { - delay(POLL_INTERVAL) - continue - } - - val id = scenario.getString("_id") - - logger.info { "Found queued scenario $id: attempting to claim" } - - if (!manager.claim(id)) { - logger.info { "Failed to claim scenario" } - continue - } - - coroutineScope { - // Launch heartbeat process - val heartbeat = launch { - while (true) { - delay(HEARTBEAT_INTERVAL) - manager.heartbeat(id) - } - } - - try { - val portfolio = portfolios.find(Filters.eq("_id", scenario.getString("portfolioId"))).first()!! - runScenario(portfolio, scenario, topologies) - - logger.info { "Starting result processing" } - - val result = resultProcessor.process(id) - manager.finish(id, result) - - logger.info { "Successfully finished scenario $id" } - } catch (e: Exception) { - logger.warn(e) { "Scenario failed to finish" } - manager.fail(id) - } finally { - heartbeat.cancel() - } - } - } - } -} - -/** - * Main entry point of the runner. - */ -fun main(args: Array) = RunnerCli().main(args) diff --git a/simulator/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt b/simulator/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt deleted file mode 100644 index c0b0ac31..00000000 --- a/simulator/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt +++ /dev/null @@ -1,193 +0,0 @@ -package com.atlarge.opendc.runner.web - -import org.apache.spark.sql.Column -import org.apache.spark.sql.Dataset -import org.apache.spark.sql.Row -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.functions.* -import java.io.File - -/** - * A helper class for processing the experiment results using Apache Spark. - */ -class ResultProcessor(private val master: String, private val outputPath: File) { - /** - * Process the results of the scenario with the given [id]. 
- */ - fun process(id: String): Result { - val spark = SparkSession.builder() - .master(master) - .appName("opendc-simulator-$id") - .config("spark.driver.bindAddress", "0.0.0.0") // Needed to allow the worker to connect to driver - .orCreate - - try { - val hostMetrics = spark.read().parquet(File(outputPath, "host-metrics/scenario_id=$id").path) - val provisionerMetrics = spark.read().parquet(File(outputPath, "provisioner-metrics/scenario_id=$id").path) - val res = aggregate(hostMetrics, provisionerMetrics).first() - - return Result( - res.getList(1), - res.getList(2), - res.getList(3), - res.getList(4), - res.getList(5), - res.getList(6), - res.getList(7), - res.getList(8), - res.getList(9), - res.getList(10), - res.getList(11), - res.getList(12), - res.getList(13), - res.getList(14), - res.getList(15) - ) - } finally { - spark.close() - } - } - - data class Result( - val totalRequestedBurst: List, - val totalGrantedBurst: List, - val totalOvercommittedBurst: List, - val totalInterferedBurst: List, - val meanCpuUsage: List, - val meanCpuDemand: List, - val meanNumDeployedImages: List, - val maxNumDeployedImages: List, - val totalPowerDraw: List, - val totalFailureSlices: List, - val totalFailureVmSlices: List, - val totalVmsSubmitted: List, - val totalVmsQueued: List, - val totalVmsFinished: List, - val totalVmsFailed: List - ) - - /** - * Perform aggregation of the experiment results. - */ - private fun aggregate(hostMetrics: Dataset, provisionerMetrics: Dataset): Dataset { - // Extrapolate the duration of the entries to span the entire trace - val hostMetricsExtra = hostMetrics - .withColumn("slice_counts", floor(col("duration") / lit(sliceLength))) - .withColumn("power_draw", col("power_draw") * col("slice_counts")) - .withColumn("state_int", states[col("state")]) - .withColumn("state_opposite_int", oppositeStates[col("state")]) - .withColumn("cpu_usage", col("cpu_usage") * col("slice_counts") * col("state_opposite_int")) - .withColumn("cpu_demand", col("cpu_demand") * col("slice_counts")) - .withColumn("failure_slice_count", col("slice_counts") * col("state_int")) - .withColumn("failure_vm_slice_count", col("slice_counts") * col("state_int") * col("vm_count")) - - // Process all data in a single run - val hostMetricsGrouped = hostMetricsExtra.groupBy("run_id") - - // Aggregate the summed total metrics - val systemMetrics = hostMetricsGrouped.agg( - sum("requested_burst").alias("total_requested_burst"), - sum("granted_burst").alias("total_granted_burst"), - sum("overcommissioned_burst").alias("total_overcommitted_burst"), - sum("interfered_burst").alias("total_interfered_burst"), - sum("power_draw").alias("total_power_draw"), - sum("failure_slice_count").alias("total_failure_slices"), - sum("failure_vm_slice_count").alias("total_failure_vm_slices") - ) - - // Aggregate metrics per host - val hvMetrics = hostMetrics - .groupBy("run_id", "host_id") - .agg( - sum("cpu_usage").alias("mean_cpu_usage"), - sum("cpu_demand").alias("mean_cpu_demand"), - avg("vm_count").alias("mean_num_deployed_images"), - count(lit(1)).alias("num_rows") - ) - .withColumn("mean_cpu_usage", col("mean_cpu_usage") / col("num_rows")) - .withColumn("mean_cpu_demand", col("mean_cpu_demand") / col("num_rows")) - .groupBy("run_id") - .agg( - avg("mean_cpu_usage").alias("mean_cpu_usage"), - avg("mean_cpu_demand").alias("mean_cpu_demand"), - avg("mean_num_deployed_images").alias("mean_num_deployed_images"), - max("mean_num_deployed_images").alias("max_num_deployed_images") - ) - - // Group the provisioner metrics per run 
- val provisionerMetricsGrouped = provisionerMetrics.groupBy("run_id") - - // Aggregate the provisioner metrics - val provisionerMetricsAggregated = provisionerMetricsGrouped.agg( - max("vm_total_count").alias("total_vms_submitted"), - max("vm_waiting_count").alias("total_vms_queued"), - max("vm_active_count").alias("total_vms_running"), - max("vm_inactive_count").alias("total_vms_finished"), - max("vm_failed_count").alias("total_vms_failed") - ) - - // Join the results into a single data frame - return systemMetrics - .join(hvMetrics, "run_id") - .join(provisionerMetricsAggregated, "run_id") - .select( - col("total_requested_burst"), - col("total_granted_burst"), - col("total_overcommitted_burst"), - col("total_interfered_burst"), - col("mean_cpu_usage"), - col("mean_cpu_demand"), - col("mean_num_deployed_images"), - col("max_num_deployed_images"), - col("total_power_draw"), - col("total_failure_slices"), - col("total_failure_vm_slices"), - col("total_vms_submitted"), - col("total_vms_queued"), - col("total_vms_finished"), - col("total_vms_failed") - ) - .groupBy(lit(1)) - .agg( - // TODO Check if order of values is correct - collect_list(col("total_requested_burst")).alias("total_requested_burst"), - collect_list(col("total_granted_burst")).alias("total_granted_burst"), - collect_list(col("total_overcommitted_burst")).alias("total_overcommitted_burst"), - collect_list(col("total_interfered_burst")).alias("total_interfered_burst"), - collect_list(col("mean_cpu_usage")).alias("mean_cpu_usage"), - collect_list(col("mean_cpu_demand")).alias("mean_cpu_demand"), - collect_list(col("mean_num_deployed_images")).alias("mean_num_deployed_images"), - collect_list(col("max_num_deployed_images")).alias("max_num_deployed_images"), - collect_list(col("total_power_draw")).alias("total_power_draw"), - collect_list(col("total_failure_slices")).alias("total_failure_slices"), - collect_list(col("total_failure_vm_slices")).alias("total_failure_vm_slices"), - collect_list(col("total_vms_submitted")).alias("total_vms_submitted"), - collect_list(col("total_vms_queued")).alias("total_vms_queued"), - collect_list(col("total_vms_finished")).alias("total_vms_finished"), - collect_list(col("total_vms_failed")).alias("total_vms_failed") - ) - } - - // Spark helper functions - operator fun Column.times(other: Column): Column = `$times`(other) - operator fun Column.div(other: Column): Column = `$div`(other) - operator fun Column.get(other: Column): Column = this.apply(other) - - val sliceLength = 5 * 60 * 1000 - val states = map( - lit("ERROR"), - lit(1), - lit("ACTIVE"), - lit(0), - lit("SHUTOFF"), - lit(0) - ) - val oppositeStates = map( - lit("ERROR"), - lit(0), - lit("ACTIVE"), - lit(1), - lit("SHUTOFF"), - lit(1) - ) -} diff --git a/simulator/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt b/simulator/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt deleted file mode 100644 index 6ec4995d..00000000 --- a/simulator/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt +++ /dev/null @@ -1,93 +0,0 @@ -package com.atlarge.opendc.runner.web - -import com.mongodb.client.MongoCollection -import com.mongodb.client.model.Filters -import com.mongodb.client.model.Updates -import org.bson.Document -import java.time.Instant - -/** - * Manages the queue of scenarios that need to be processed. - */ -class ScenarioManager(private val collection: MongoCollection) { - /** - * Find the next scenario that the simulator needs to process. 
- */ - fun findNext(): Document? { - return collection - .find(Filters.eq("simulation.state", "QUEUED")) - .first() - } - - /** - * Claim the scenario in the database with the specified id. - */ - fun claim(id: String): Boolean { - val res = collection.findOneAndUpdate( - Filters.and( - Filters.eq("_id", id), - Filters.eq("simulation.state", "QUEUED") - ), - Updates.combine( - Updates.set("simulation.state", "RUNNING"), - Updates.set("simulation.heartbeat", Instant.now()) - ) - ) - return res != null - } - - /** - * Update the heartbeat of the specified scenario. - */ - fun heartbeat(id: String) { - collection.findOneAndUpdate( - Filters.and( - Filters.eq("_id", id), - Filters.eq("simulation.state", "RUNNING") - ), - Updates.set("simulation.heartbeat", Instant.now()) - ) - } - - /** - * Mark the scenario as failed. - */ - fun fail(id: String) { - collection.findOneAndUpdate( - Filters.eq("_id", id), - Updates.combine( - Updates.set("simulation.state", "FAILED"), - Updates.set("simulation.heartbeat", Instant.now()) - ) - ) - } - - /** - * Persist the specified results. - */ - fun finish(id: String, result: ResultProcessor.Result) { - collection.findOneAndUpdate( - Filters.eq("_id", id), - Updates.combine( - Updates.set("simulation.state", "FINISHED"), - Updates.unset("simulation.time"), - Updates.set("results.total_requested_burst", result.totalRequestedBurst), - Updates.set("results.total_granted_burst", result.totalGrantedBurst), - Updates.set("results.total_overcommitted_burst", result.totalOvercommittedBurst), - Updates.set("results.total_interfered_burst", result.totalInterferedBurst), - Updates.set("results.mean_cpu_usage", result.meanCpuUsage), - Updates.set("results.mean_cpu_demand", result.meanCpuDemand), - Updates.set("results.mean_num_deployed_images", result.meanNumDeployedImages), - Updates.set("results.max_num_deployed_images", result.maxNumDeployedImages), - Updates.set("results.max_num_deployed_images", result.maxNumDeployedImages), - Updates.set("results.total_power_draw", result.totalPowerDraw), - Updates.set("results.total_failure_slices", result.totalFailureSlices), - Updates.set("results.total_failure_vm_slices", result.totalFailureVmSlices), - Updates.set("results.total_vms_submitted", result.totalVmsSubmitted), - Updates.set("results.total_vms_queued", result.totalVmsQueued), - Updates.set("results.total_vms_finished", result.totalVmsFinished), - Updates.set("results.total_vms_failed", result.totalVmsFailed) - ) - ) - } -} diff --git a/simulator/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt b/simulator/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt deleted file mode 100644 index f9b1c6c4..00000000 --- a/simulator/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt +++ /dev/null @@ -1,131 +0,0 @@ -package com.atlarge.opendc.runner.web - -import com.atlarge.opendc.compute.core.MemoryUnit -import com.atlarge.opendc.compute.core.ProcessingNode -import com.atlarge.opendc.compute.core.ProcessingUnit -import com.atlarge.opendc.compute.metal.NODE_CLUSTER -import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver -import com.atlarge.opendc.compute.metal.power.LinearLoadPowerModel -import com.atlarge.opendc.compute.metal.service.ProvisioningService -import com.atlarge.opendc.compute.metal.service.SimpleProvisioningService -import com.atlarge.opendc.core.Environment -import com.atlarge.opendc.core.Platform -import com.atlarge.opendc.core.Zone -import 
com.atlarge.opendc.core.services.ServiceRegistry -import com.atlarge.opendc.format.environment.EnvironmentReader -import com.mongodb.client.AggregateIterable -import com.mongodb.client.MongoCollection -import com.mongodb.client.model.Aggregates -import com.mongodb.client.model.Field -import com.mongodb.client.model.Filters -import com.mongodb.client.model.Projections -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.launch -import org.bson.Document -import java.time.Clock -import java.util.* - -/** - * A helper class that converts the MongoDB topology into an OpenDC environment. - */ -class TopologyParser(private val collection: MongoCollection, private val id: String) : EnvironmentReader { - /** - * Parse the topology with the specified [id]. - */ - override suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment { - val nodes = mutableListOf() - val random = Random(0) - - for (machine in fetchMachines(id)) { - val machineId = machine.getString("_id") - val clusterId = machine.getString("rack_id") - val position = machine.getInteger("position") - - val processors = machine.getList("cpus", Document::class.java).flatMap { cpu -> - val cores = cpu.getInteger("numberOfCores") - val speed = cpu.get("clockRateMhz", Number::class.java).toDouble() - // TODO Remove hardcoding of vendor - val node = ProcessingNode("Intel", "amd64", cpu.getString("name"), cores) - List(cores) { coreId -> - ProcessingUnit(node, coreId, speed) - } - } - val memoryUnits = machine.getList("memories", Document::class.java).map { memory -> - MemoryUnit( - "Samsung", - memory.getString("name"), - memory.get("speedMbPerS", Number::class.java).toDouble(), - memory.get("sizeMb", Number::class.java).toLong() - ) - } - nodes.add( - SimpleBareMetalDriver( - coroutineScope, - clock, - UUID(random.nextLong(), random.nextLong()), - "node-$clusterId-$position", - mapOf(NODE_CLUSTER to clusterId), - processors, - memoryUnits, - // For now we assume a simple linear load model with an idle draw of ~200W and a maximum - // power draw of 350W. - // Source: https://stackoverflow.com/questions/6128960 - LinearLoadPowerModel(200.0, 350.0) - ) - ) - } - - val provisioningService = SimpleProvisioningService() - coroutineScope.launch { - for (node in nodes) { - provisioningService.create(node) - } - } - - val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService) - - val platform = Platform( - UUID.randomUUID(), - "opendc-platform", - listOf( - Zone(UUID.randomUUID(), "zone", serviceRegistry) - ) - ) - - return Environment(fetchName(id), null, listOf(platform)) - } - - override fun close() {} - - /** - * Fetch the metadata of the topology. - */ - private fun fetchName(id: String): String { - return collection.aggregate( - listOf( - Aggregates.match(Filters.eq("_id", id)), - Aggregates.project(Projections.include("name")) - ) - ) - .first()!! - .getString("name") - } - - /** - * Fetch a topology from the database with the specified [id]. 
- */ - private fun fetchMachines(id: String): AggregateIterable { - return collection.aggregate( - listOf( - Aggregates.match(Filters.eq("_id", id)), - Aggregates.project(Projections.fields(Document("racks", "\$rooms.tiles.rack"))), - Aggregates.unwind("\$racks"), - Aggregates.unwind("\$racks"), - Aggregates.replaceRoot("\$racks"), - Aggregates.addFields(Field("machines.rack_id", "\$_id")), - Aggregates.unwind("\$machines"), - Aggregates.replaceRoot("\$machines") - ) - ) - } -} diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt new file mode 100644 index 00000000..69b2e69d --- /dev/null +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt @@ -0,0 +1,368 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.runner.web + +import com.github.ajalt.clikt.core.CliktCommand +import com.github.ajalt.clikt.parameters.options.* +import com.github.ajalt.clikt.parameters.types.file +import com.github.ajalt.clikt.parameters.types.int +import com.mongodb.MongoClientSettings +import com.mongodb.MongoCredential +import com.mongodb.ServerAddress +import com.mongodb.client.MongoClients +import com.mongodb.client.MongoCollection +import com.mongodb.client.MongoDatabase +import com.mongodb.client.model.Filters +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.test.TestCoroutineScope +import mu.KotlinLogging +import org.bson.Document +import org.opendc.compute.virt.service.allocation.* +import org.opendc.experiments.sc20.experiment.attachMonitor +import org.opendc.experiments.sc20.experiment.createFailureDomain +import org.opendc.experiments.sc20.experiment.createProvisioner +import org.opendc.experiments.sc20.experiment.model.Workload +import org.opendc.experiments.sc20.experiment.monitor.ParquetExperimentMonitor +import org.opendc.experiments.sc20.experiment.processTrace +import org.opendc.experiments.sc20.trace.Sc20ParquetTraceReader +import org.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader +import org.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader +import org.opendc.simulator.utils.DelayControllerClockAdapter +import java.io.File +import java.util.* +import kotlin.random.Random + +private val logger = KotlinLogging.logger {} + +/** + * Represents the CLI command for starting the OpenDC web runner. 
+ */ +@OptIn(ExperimentalCoroutinesApi::class) +class RunnerCli : CliktCommand(name = "runner") { + /** + * The name of the database to use. + */ + private val mongoDb by option( + "--mongo-db", + help = "name of the database to use", + envvar = "OPENDC_DB" + ) + .default("opendc") + + /** + * The database host to connect to. + */ + private val mongoHost by option( + "--mongo-host", + help = "database host to connect to", + envvar = "OPENDC_DB_HOST" + ) + .default("localhost") + + /** + * The database port to connect to. + */ + private val mongoPort by option( + "--mongo-port", + help = "database port to connect to", + envvar = "OPENDC_DB_PORT" + ) + .int() + .default(27017) + + /** + * The database user to connect with. + */ + private val mongoUser by option( + "--mongo-user", + help = "database user to connect with", + envvar = "OPENDC_DB_USER" + ) + .default("opendc") + + /** + * The database password to connect with. + */ + private val mongoPassword by option( + "--mongo-password", + help = "database password to connect with", + envvar = "OPENDC_DB_PASSWORD" + ) + .convert { it.toCharArray() } + .required() + + /** + * The path to the traces directory. + */ + private val tracePath by option( + "--traces", + help = "path to the directory containing the traces", + envvar = "OPENDC_TRACES" + ) + .file(canBeFile = false) + .defaultLazy { File("traces/") } + + /** + * The path to the output directory. + */ + private val outputPath by option( + "--output", + help = "path to the results directory", + envvar = "OPENDC_OUTPUT" + ) + .file(canBeFile = false) + .defaultLazy { File("results/") } + + /** + * The Spark master to connect to. + */ + private val spark by option( + "--spark", + help = "Spark master to connect to", + envvar = "OPENDC_SPARK" + ) + .default("local[*]") + + /** + * Connect to the user-specified database. + */ + private fun createDatabase(): MongoDatabase { + val credential = MongoCredential.createScramSha1Credential( + mongoUser, + mongoDb, + mongoPassword + ) + + val settings = MongoClientSettings.builder() + .credential(credential) + .applyToClusterSettings { it.hosts(listOf(ServerAddress(mongoHost, mongoPort))) } + .build() + val client = MongoClients.create(settings) + return client.getDatabase(mongoDb) + } + + /** + * Run a single scenario. + */ + private suspend fun runScenario(portfolio: Document, scenario: Document, topologies: MongoCollection) { + val id = scenario.getString("_id") + + logger.info { "Constructing performance interference model" } + + val traceDir = File( + tracePath, + scenario.getEmbedded(listOf("trace", "traceId"), String::class.java) + ) + val traceReader = Sc20RawParquetTraceReader(traceDir) + val performanceInterferenceReader = let { + val path = File(traceDir, "performance-interference-model.json") + val operational = scenario.get("operational", Document::class.java) + val enabled = operational.getBoolean("performanceInterferenceEnabled") + + if (!enabled || !path.exists()) { + return@let null + } + + path.inputStream().use { Sc20PerformanceInterferenceReader(it) } + } + + val targets = portfolio.get("targets", Document::class.java) + + repeat(targets.getInteger("repeatsPerScenario")) { + logger.info { "Starting repeat $it" } + runRepeat(scenario, it, topologies, traceReader, performanceInterferenceReader) + } + + logger.info { "Finished simulation for scenario $id" } + } + + /** + * Run a single repeat. 
+ */ + private suspend fun runRepeat( + scenario: Document, + repeat: Int, + topologies: MongoCollection, + traceReader: Sc20RawParquetTraceReader, + performanceInterferenceReader: Sc20PerformanceInterferenceReader? + ) { + val id = scenario.getString("_id") + val seed = repeat + val traceDocument = scenario.get("trace", Document::class.java) + val workloadName = traceDocument.getString("traceId") + val workloadFraction = traceDocument.get("loadSamplingFraction", Number::class.java).toDouble() + + val seeder = Random(seed) + val testScope = TestCoroutineScope() + val clock = DelayControllerClockAdapter(testScope) + + val chan = Channel(Channel.CONFLATED) + + val operational = scenario.get("operational", Document::class.java) + val allocationPolicy = + when (val policyName = operational.getString("schedulerName")) { + "mem" -> AvailableMemoryAllocationPolicy() + "mem-inv" -> AvailableMemoryAllocationPolicy(true) + "core-mem" -> AvailableCoreMemoryAllocationPolicy() + "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true) + "active-servers" -> NumberOfActiveServersAllocationPolicy() + "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true) + "provisioned-cores" -> ProvisionedCoresAllocationPolicy() + "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true) + "random" -> RandomAllocationPolicy(Random(seeder.nextInt())) + else -> throw IllegalArgumentException("Unknown policy $policyName") + } + + val performanceInterferenceModel = performanceInterferenceReader?.construct(seeder) ?: emptyMap() + val trace = Sc20ParquetTraceReader( + listOf(traceReader), + performanceInterferenceModel, + Workload(workloadName, workloadFraction), + seed + ) + val topologyId = scenario.getEmbedded(listOf("topology", "topologyId"), String::class.java) + val environment = TopologyParser(topologies, topologyId) + val monitor = ParquetExperimentMonitor( + outputPath, + "scenario_id=$id/run_id=$repeat", + 4096 + ) + + testScope.launch { + val (bareMetalProvisioner, scheduler) = createProvisioner( + this, + clock, + environment, + allocationPolicy + ) + + val failureDomain = if (operational.getBoolean("failuresEnabled")) { + logger.debug("ENABLING failures") + createFailureDomain( + testScope, + clock, + seeder.nextInt(), + operational.get("failureFrequency", Number::class.java)?.toDouble() ?: 24.0 * 7, + bareMetalProvisioner, + chan + ) + } else { + null + } + + attachMonitor(this, clock, scheduler, monitor) + processTrace( + this, + clock, + trace, + scheduler, + chan, + monitor, + emptyMap() + ) + + logger.debug("SUBMIT=${scheduler.submittedVms}") + logger.debug("FAIL=${scheduler.unscheduledVms}") + logger.debug("QUEUED=${scheduler.queuedVms}") + logger.debug("RUNNING=${scheduler.runningVms}") + logger.debug("FINISHED=${scheduler.finishedVms}") + + failureDomain?.cancel() + scheduler.terminate() + } + + try { + testScope.advanceUntilIdle() + } finally { + monitor.close() + } + } + + val POLL_INTERVAL = 5000L // ms = 5 s + val HEARTBEAT_INTERVAL = 60000L // ms = 1 min + + override fun run() = runBlocking(Dispatchers.Default) { + logger.info { "Starting OpenDC web runner" } + logger.info { "Connecting to MongoDB instance" } + val database = createDatabase() + val manager = ScenarioManager(database.getCollection("scenarios")) + val portfolios = database.getCollection("portfolios") + val topologies = database.getCollection("topologies") + + logger.info { "Launching Spark" } + val resultProcessor = ResultProcessor(spark, outputPath) + + logger.info { "Watching for queued scenarios" } + + while 
(true) { + val scenario = manager.findNext() + + if (scenario == null) { + delay(POLL_INTERVAL) + continue + } + + val id = scenario.getString("_id") + + logger.info { "Found queued scenario $id: attempting to claim" } + + if (!manager.claim(id)) { + logger.info { "Failed to claim scenario" } + continue + } + + coroutineScope { + // Launch heartbeat process + val heartbeat = launch { + while (true) { + delay(HEARTBEAT_INTERVAL) + manager.heartbeat(id) + } + } + + try { + val portfolio = portfolios.find(Filters.eq("_id", scenario.getString("portfolioId"))).first()!! + runScenario(portfolio, scenario, topologies) + + logger.info { "Starting result processing" } + + val result = resultProcessor.process(id) + manager.finish(id, result) + + logger.info { "Successfully finished scenario $id" } + } catch (e: Exception) { + logger.warn(e) { "Scenario failed to finish" } + manager.fail(id) + } finally { + heartbeat.cancel() + } + } + } + } +} + +/** + * Main entry point of the runner. + */ +fun main(args: Array) = RunnerCli().main(args) diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ResultProcessor.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ResultProcessor.kt new file mode 100644 index 00000000..5cae6aa8 --- /dev/null +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ResultProcessor.kt @@ -0,0 +1,215 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.runner.web + +import org.apache.spark.sql.Column +import org.apache.spark.sql.Dataset +import org.apache.spark.sql.Row +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.functions.* +import java.io.File + +/** + * A helper class for processing the experiment results using Apache Spark. + */ +class ResultProcessor(private val master: String, private val outputPath: File) { + /** + * Process the results of the scenario with the given [id]. 
+ */ + fun process(id: String): Result { + val spark = SparkSession.builder() + .master(master) + .appName("opendc-simulator-$id") + .config("spark.driver.bindAddress", "0.0.0.0") // Needed to allow the worker to connect to driver + .orCreate + + try { + val hostMetrics = spark.read().parquet(File(outputPath, "host-metrics/scenario_id=$id").path) + val provisionerMetrics = spark.read().parquet(File(outputPath, "provisioner-metrics/scenario_id=$id").path) + val res = aggregate(hostMetrics, provisionerMetrics).first() + + return Result( + res.getList(1), + res.getList(2), + res.getList(3), + res.getList(4), + res.getList(5), + res.getList(6), + res.getList(7), + res.getList(8), + res.getList(9), + res.getList(10), + res.getList(11), + res.getList(12), + res.getList(13), + res.getList(14), + res.getList(15) + ) + } finally { + spark.close() + } + } + + data class Result( + val totalRequestedBurst: List, + val totalGrantedBurst: List, + val totalOvercommittedBurst: List, + val totalInterferedBurst: List, + val meanCpuUsage: List, + val meanCpuDemand: List, + val meanNumDeployedImages: List, + val maxNumDeployedImages: List, + val totalPowerDraw: List, + val totalFailureSlices: List, + val totalFailureVmSlices: List, + val totalVmsSubmitted: List, + val totalVmsQueued: List, + val totalVmsFinished: List, + val totalVmsFailed: List + ) + + /** + * Perform aggregation of the experiment results. + */ + private fun aggregate(hostMetrics: Dataset, provisionerMetrics: Dataset): Dataset { + // Extrapolate the duration of the entries to span the entire trace + val hostMetricsExtra = hostMetrics + .withColumn("slice_counts", floor(col("duration") / lit(sliceLength))) + .withColumn("power_draw", col("power_draw") * col("slice_counts")) + .withColumn("state_int", states[col("state")]) + .withColumn("state_opposite_int", oppositeStates[col("state")]) + .withColumn("cpu_usage", col("cpu_usage") * col("slice_counts") * col("state_opposite_int")) + .withColumn("cpu_demand", col("cpu_demand") * col("slice_counts")) + .withColumn("failure_slice_count", col("slice_counts") * col("state_int")) + .withColumn("failure_vm_slice_count", col("slice_counts") * col("state_int") * col("vm_count")) + + // Process all data in a single run + val hostMetricsGrouped = hostMetricsExtra.groupBy("run_id") + + // Aggregate the summed total metrics + val systemMetrics = hostMetricsGrouped.agg( + sum("requested_burst").alias("total_requested_burst"), + sum("granted_burst").alias("total_granted_burst"), + sum("overcommissioned_burst").alias("total_overcommitted_burst"), + sum("interfered_burst").alias("total_interfered_burst"), + sum("power_draw").alias("total_power_draw"), + sum("failure_slice_count").alias("total_failure_slices"), + sum("failure_vm_slice_count").alias("total_failure_vm_slices") + ) + + // Aggregate metrics per host + val hvMetrics = hostMetrics + .groupBy("run_id", "host_id") + .agg( + sum("cpu_usage").alias("mean_cpu_usage"), + sum("cpu_demand").alias("mean_cpu_demand"), + avg("vm_count").alias("mean_num_deployed_images"), + count(lit(1)).alias("num_rows") + ) + .withColumn("mean_cpu_usage", col("mean_cpu_usage") / col("num_rows")) + .withColumn("mean_cpu_demand", col("mean_cpu_demand") / col("num_rows")) + .groupBy("run_id") + .agg( + avg("mean_cpu_usage").alias("mean_cpu_usage"), + avg("mean_cpu_demand").alias("mean_cpu_demand"), + avg("mean_num_deployed_images").alias("mean_num_deployed_images"), + max("mean_num_deployed_images").alias("max_num_deployed_images") + ) + + // Group the provisioner metrics per run 
+ val provisionerMetricsGrouped = provisionerMetrics.groupBy("run_id") + + // Aggregate the provisioner metrics + val provisionerMetricsAggregated = provisionerMetricsGrouped.agg( + max("vm_total_count").alias("total_vms_submitted"), + max("vm_waiting_count").alias("total_vms_queued"), + max("vm_active_count").alias("total_vms_running"), + max("vm_inactive_count").alias("total_vms_finished"), + max("vm_failed_count").alias("total_vms_failed") + ) + + // Join the results into a single data frame + return systemMetrics + .join(hvMetrics, "run_id") + .join(provisionerMetricsAggregated, "run_id") + .select( + col("total_requested_burst"), + col("total_granted_burst"), + col("total_overcommitted_burst"), + col("total_interfered_burst"), + col("mean_cpu_usage"), + col("mean_cpu_demand"), + col("mean_num_deployed_images"), + col("max_num_deployed_images"), + col("total_power_draw"), + col("total_failure_slices"), + col("total_failure_vm_slices"), + col("total_vms_submitted"), + col("total_vms_queued"), + col("total_vms_finished"), + col("total_vms_failed") + ) + .groupBy(lit(1)) + .agg( + // TODO Check if order of values is correct + collect_list(col("total_requested_burst")).alias("total_requested_burst"), + collect_list(col("total_granted_burst")).alias("total_granted_burst"), + collect_list(col("total_overcommitted_burst")).alias("total_overcommitted_burst"), + collect_list(col("total_interfered_burst")).alias("total_interfered_burst"), + collect_list(col("mean_cpu_usage")).alias("mean_cpu_usage"), + collect_list(col("mean_cpu_demand")).alias("mean_cpu_demand"), + collect_list(col("mean_num_deployed_images")).alias("mean_num_deployed_images"), + collect_list(col("max_num_deployed_images")).alias("max_num_deployed_images"), + collect_list(col("total_power_draw")).alias("total_power_draw"), + collect_list(col("total_failure_slices")).alias("total_failure_slices"), + collect_list(col("total_failure_vm_slices")).alias("total_failure_vm_slices"), + collect_list(col("total_vms_submitted")).alias("total_vms_submitted"), + collect_list(col("total_vms_queued")).alias("total_vms_queued"), + collect_list(col("total_vms_finished")).alias("total_vms_finished"), + collect_list(col("total_vms_failed")).alias("total_vms_failed") + ) + } + + // Spark helper functions + operator fun Column.times(other: Column): Column = `$times`(other) + operator fun Column.div(other: Column): Column = `$div`(other) + operator fun Column.get(other: Column): Column = this.apply(other) + + val sliceLength = 5 * 60 * 1000 + val states = map( + lit("ERROR"), + lit(1), + lit("ACTIVE"), + lit(0), + lit("SHUTOFF"), + lit(0) + ) + val oppositeStates = map( + lit("ERROR"), + lit(0), + lit("ACTIVE"), + lit(1), + lit("SHUTOFF"), + lit(1) + ) +} diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ScenarioManager.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ScenarioManager.kt new file mode 100644 index 00000000..0c6c8fee --- /dev/null +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ScenarioManager.kt @@ -0,0 +1,115 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * 
furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.runner.web + +import com.mongodb.client.MongoCollection +import com.mongodb.client.model.Filters +import com.mongodb.client.model.Updates +import org.bson.Document +import java.time.Instant + +/** + * Manages the queue of scenarios that need to be processed. + */ +class ScenarioManager(private val collection: MongoCollection) { + /** + * Find the next scenario that the simulator needs to process. + */ + fun findNext(): Document? { + return collection + .find(Filters.eq("simulation.state", "QUEUED")) + .first() + } + + /** + * Claim the scenario in the database with the specified id. + */ + fun claim(id: String): Boolean { + val res = collection.findOneAndUpdate( + Filters.and( + Filters.eq("_id", id), + Filters.eq("simulation.state", "QUEUED") + ), + Updates.combine( + Updates.set("simulation.state", "RUNNING"), + Updates.set("simulation.heartbeat", Instant.now()) + ) + ) + return res != null + } + + /** + * Update the heartbeat of the specified scenario. + */ + fun heartbeat(id: String) { + collection.findOneAndUpdate( + Filters.and( + Filters.eq("_id", id), + Filters.eq("simulation.state", "RUNNING") + ), + Updates.set("simulation.heartbeat", Instant.now()) + ) + } + + /** + * Mark the scenario as failed. + */ + fun fail(id: String) { + collection.findOneAndUpdate( + Filters.eq("_id", id), + Updates.combine( + Updates.set("simulation.state", "FAILED"), + Updates.set("simulation.heartbeat", Instant.now()) + ) + ) + } + + /** + * Persist the specified results. 
+ */ + fun finish(id: String, result: ResultProcessor.Result) { + collection.findOneAndUpdate( + Filters.eq("_id", id), + Updates.combine( + Updates.set("simulation.state", "FINISHED"), + Updates.unset("simulation.time"), + Updates.set("results.total_requested_burst", result.totalRequestedBurst), + Updates.set("results.total_granted_burst", result.totalGrantedBurst), + Updates.set("results.total_overcommitted_burst", result.totalOvercommittedBurst), + Updates.set("results.total_interfered_burst", result.totalInterferedBurst), + Updates.set("results.mean_cpu_usage", result.meanCpuUsage), + Updates.set("results.mean_cpu_demand", result.meanCpuDemand), + Updates.set("results.mean_num_deployed_images", result.meanNumDeployedImages), + Updates.set("results.max_num_deployed_images", result.maxNumDeployedImages), + Updates.set("results.max_num_deployed_images", result.maxNumDeployedImages), + Updates.set("results.total_power_draw", result.totalPowerDraw), + Updates.set("results.total_failure_slices", result.totalFailureSlices), + Updates.set("results.total_failure_vm_slices", result.totalFailureVmSlices), + Updates.set("results.total_vms_submitted", result.totalVmsSubmitted), + Updates.set("results.total_vms_queued", result.totalVmsQueued), + Updates.set("results.total_vms_finished", result.totalVmsFinished), + Updates.set("results.total_vms_failed", result.totalVmsFailed) + ) + ) + } +} diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt new file mode 100644 index 00000000..884833cb --- /dev/null +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt @@ -0,0 +1,153 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package org.opendc.runner.web + +import com.mongodb.client.AggregateIterable +import com.mongodb.client.MongoCollection +import com.mongodb.client.model.Aggregates +import com.mongodb.client.model.Field +import com.mongodb.client.model.Filters +import com.mongodb.client.model.Projections +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.launch +import org.bson.Document +import org.opendc.compute.core.MemoryUnit +import org.opendc.compute.core.ProcessingNode +import org.opendc.compute.core.ProcessingUnit +import org.opendc.compute.metal.NODE_CLUSTER +import org.opendc.compute.metal.driver.SimpleBareMetalDriver +import org.opendc.compute.metal.power.LinearLoadPowerModel +import org.opendc.compute.metal.service.ProvisioningService +import org.opendc.compute.metal.service.SimpleProvisioningService +import org.opendc.core.Environment +import org.opendc.core.Platform +import org.opendc.core.Zone +import org.opendc.core.services.ServiceRegistry +import org.opendc.format.environment.EnvironmentReader +import java.time.Clock +import java.util.* + +/** + * A helper class that converts the MongoDB topology into an OpenDC environment. + */ +class TopologyParser(private val collection: MongoCollection, private val id: String) : EnvironmentReader { + /** + * Parse the topology with the specified [id]. + */ + override suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment { + val nodes = mutableListOf() + val random = Random(0) + + for (machine in fetchMachines(id)) { + val machineId = machine.getString("_id") + val clusterId = machine.getString("rack_id") + val position = machine.getInteger("position") + + val processors = machine.getList("cpus", Document::class.java).flatMap { cpu -> + val cores = cpu.getInteger("numberOfCores") + val speed = cpu.get("clockRateMhz", Number::class.java).toDouble() + // TODO Remove hardcoding of vendor + val node = ProcessingNode("Intel", "amd64", cpu.getString("name"), cores) + List(cores) { coreId -> + ProcessingUnit(node, coreId, speed) + } + } + val memoryUnits = machine.getList("memories", Document::class.java).map { memory -> + MemoryUnit( + "Samsung", + memory.getString("name"), + memory.get("speedMbPerS", Number::class.java).toDouble(), + memory.get("sizeMb", Number::class.java).toLong() + ) + } + nodes.add( + SimpleBareMetalDriver( + coroutineScope, + clock, + UUID(random.nextLong(), random.nextLong()), + "node-$clusterId-$position", + mapOf(NODE_CLUSTER to clusterId), + processors, + memoryUnits, + // For now we assume a simple linear load model with an idle draw of ~200W and a maximum + // power draw of 350W. + // Source: https://stackoverflow.com/questions/6128960 + LinearLoadPowerModel(200.0, 350.0) + ) + ) + } + + val provisioningService = SimpleProvisioningService() + coroutineScope.launch { + for (node in nodes) { + provisioningService.create(node) + } + } + + val serviceRegistry = ServiceRegistry().put(ProvisioningService, provisioningService) + + val platform = Platform( + UUID.randomUUID(), + "opendc-platform", + listOf( + Zone(UUID.randomUUID(), "zone", serviceRegistry) + ) + ) + + return Environment(fetchName(id), null, listOf(platform)) + } + + override fun close() {} + + /** + * Fetch the metadata of the topology. + */ + private fun fetchName(id: String): String { + return collection.aggregate( + listOf( + Aggregates.match(Filters.eq("_id", id)), + Aggregates.project(Projections.include("name")) + ) + ) + .first()!! 
+            .getString("name")
+    }
+
+    /**
+     * Fetch a topology from the database with the specified [id].
+     */
+    private fun fetchMachines(id: String): AggregateIterable {
+        return collection.aggregate(
+            listOf(
+                Aggregates.match(Filters.eq("_id", id)),
+                Aggregates.project(Projections.fields(Document("racks", "\$rooms.tiles.rack"))),
+                Aggregates.unwind("\$racks"),
+                Aggregates.unwind("\$racks"),
+                Aggregates.replaceRoot("\$racks"),
+                Aggregates.addFields(Field("machines.rack_id", "\$_id")),
+                Aggregates.unwind("\$machines"),
+                Aggregates.replaceRoot("\$machines")
+            )
+        )
+    }
+}
diff --git a/simulator/opendc-runner-web/src/main/resources/log4j2.xml b/simulator/opendc-runner-web/src/main/resources/log4j2.xml
index 1d873554..16cedf34 100644
--- a/simulator/opendc-runner-web/src/main/resources/log4j2.xml
+++ b/simulator/opendc-runner-web/src/main/resources/log4j2.xml
@@ -26,17 +26,14 @@
 - + - + - - - - +
--
cgit v1.2.3
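
For downstream code, the practical effect of this commit is a package rename: classes that previously
lived under the com.atlarge prefix now live under org.opendc, with otherwise unchanged APIs. The
following is a minimal consumer-side sketch, not part of the commit itself: ScenarioManager and the
"opendc"/"scenarios" names are taken from the patch above, while the surrounding wiring (a default
local MongoDB connection) is a hypothetical example.

// Before this change, the class was imported from the com.atlarge namespace:
//   import com.atlarge.opendc.runner.web.ScenarioManager
// After this change, the same class is imported from the org.opendc namespace:
import org.opendc.runner.web.ScenarioManager

import com.mongodb.client.MongoClients

fun main() {
    // Connect to a MongoDB instance on localhost (driver default) and look up the next
    // queued scenario, mirroring what the runner's Main.kt does with CLI-provided settings.
    val database = MongoClients.create().getDatabase("opendc")
    val manager = ScenarioManager(database.getCollection("scenarios"))
    println(manager.findNext())
}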