From 0a895abfe307fbb6a28ceac6a07c5ac4863627fd Mon Sep 17 00:00:00 2001
From: Fabian Mastenbroek
Date: Fri, 17 Jul 2020 17:25:45 +0200
Subject: Add data processing pipeline via Spark

This change adds support for processing the experimental results by means
of a Spark data processing pipeline.
---
 simulator/.dockerignore                            |  11 ++
 simulator/Dockerfile                               |   2 +-
 .../opendc/opendc-runner-web/build.gradle.kts      |   8 +-
 .../kotlin/com/atlarge/opendc/runner/web/Main.kt   |  30 +++-
 .../atlarge/opendc/runner/web/ResultProcessor.kt   | 187 +++++++++++++++++++++
 .../atlarge/opendc/runner/web/ScenarioManager.kt   |  44 +++--
 .../src/main/resources/log4j2.xml                  |   3 +
 7 files changed, 264 insertions(+), 21 deletions(-)
 create mode 100644 simulator/.dockerignore
 create mode 100644 simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt

diff --git a/simulator/.dockerignore b/simulator/.dockerignore
new file mode 100644
index 00000000..bcbdf2b0
--- /dev/null
+++ b/simulator/.dockerignore
@@ -0,0 +1,11 @@
+# IntelliJ
+/out/
+.idea/
+*/out
+*.iml
+.idea_modules/
+
+### Gradle
+.gradle
+build/
+
diff --git a/simulator/Dockerfile b/simulator/Dockerfile
index c923cddf..7daa8a2e 100644
--- a/simulator/Dockerfile
+++ b/simulator/Dockerfile
@@ -15,7 +15,7 @@ USER root
 WORKDIR $APP_HOME
 
 # Build the application
-RUN gradle --no-daemon assemble installDist
+RUN gradle --no-daemon :opendc:opendc-runner-web:installDist
 
 # Fix permissions
 RUN chown -R gradle:gradle $APP_HOME
diff --git a/simulator/opendc/opendc-runner-web/build.gradle.kts b/simulator/opendc/opendc-runner-web/build.gradle.kts
index 52a59694..6f725de1 100644
--- a/simulator/opendc/opendc-runner-web/build.gradle.kts
+++ b/simulator/opendc/opendc-runner-web/build.gradle.kts
@@ -42,8 +42,14 @@ dependencies {
     implementation("com.github.ajalt:clikt:2.8.0")
     implementation("io.github.microutils:kotlin-logging:1.7.10")
+    implementation("org.mongodb:mongodb-driver-sync:4.0.5")
+    implementation("org.apache.spark:spark-sql_2.12:3.0.0") {
+        exclude(group = "org.slf4j", module = "slf4j-log4j12")
+        exclude(group = "log4j")
+    }
 
-    runtimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:2.13.1")
     runtimeOnly(project(":odcsim:odcsim-engine-omega"))
+    runtimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:2.13.1")
+    runtimeOnly("org.apache.logging.log4j:log4j-1.2-api:2.13.1")
 }
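The runner changes below boil down to a poll-simulate-aggregate-persist loop. The
following sketch summarizes that flow; pollQueuedScenario and runScenario are
hypothetical stand-ins for the real wiring in Main.kt further down, not part of
this change:

    // Simplified sketch of the control flow introduced by this patch.
    // pollQueuedScenario and runScenario are hypothetical stand-ins.
    suspend fun runnerLoop(
        manager: ScenarioManager,
        resultProcessor: ResultProcessor,
        pollQueuedScenario: suspend () -> String?, // yields the id of a claimed QUEUED scenario
        runScenario: suspend (String) -> Unit      // simulates and writes parquet under the output path
    ) {
        while (true) {
            val id = pollQueuedScenario() ?: continue
            try {
                runScenario(id)
                val result = resultProcessor.process(id) // aggregate the run's output with Spark
                manager.finish(id, result)               // persist the aggregates to MongoDB
            } catch (e: Exception) {
                manager.fail(id)                         // mark the scenario FAILED on any error
            }
        }
    }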
diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt
index fe1913f8..86696887 100644
--- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt
+++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt
@@ -109,11 +109,22 @@ class RunnerCli : CliktCommand(name = "runner") {
      */
     private val outputPath by option(
         "--output",
-        help = "path to the results directory"
+        help = "path to the results directory",
+        envvar = "OPENDC_OUTPUT"
     )
         .file(canBeFile = false)
         .defaultLazy { File("results/") }
 
+    /**
+     * The Spark master to connect to.
+     */
+    private val spark by option(
+        "--spark",
+        help = "Spark master to connect to",
+        envvar = "OPENDC_SPARK"
+    )
+        .required()
+
     /**
      * Connect to the user-specified database.
      */
@@ -147,7 +158,8 @@ class RunnerCli : CliktCommand(name = "runner") {
         val traceReader = Sc20RawParquetTraceReader(traceDir)
         val performanceInterferenceReader = let {
             val path = File(traceDir, "performance-interference-model.json")
-            val enabled = scenario.getEmbedded(listOf("operational", "performanceInterferenceEnabled"), Boolean::class.java)
+            val operational = scenario.get("operational", Document::class.java)
+            val enabled = operational.getBoolean("performanceInterferenceEnabled")
 
             if (!enabled || !path.exists()) {
                 return@let null
@@ -163,7 +175,7 @@ class RunnerCli : CliktCommand(name = "runner") {
             runRepeat(scenario, it, topologies, traceReader, performanceInterferenceReader)
         }
 
-        logger.info { "Finished scenario $id" }
+        logger.info { "Finished simulation for scenario $id" }
     }
 
     /**
@@ -266,13 +278,15 @@ class RunnerCli : CliktCommand(name = "runner") {
     override fun run() = runBlocking(Dispatchers.Default) {
         logger.info { "Starting OpenDC web runner" }
 
-        logger.info { "Connecting to MongoDB instance" }
         val database = createDatabase()
         val manager = ScenarioManager(database.getCollection("scenarios"))
         val portfolios = database.getCollection("portfolios")
         val topologies = database.getCollection("topologies")
 
+        logger.info { "Loading Spark" }
+        val resultProcessor = ResultProcessor(spark, outputPath)
+
         logger.info { "Watching for queued scenarios" }
 
         while (true) {
@@ -302,7 +316,13 @@ class RunnerCli : CliktCommand(name = "runner") {
             try {
                 val portfolio = portfolios.find(Filters.eq("_id", scenario.getString("portfolioId"))).first()!!
                 runScenario(portfolio, scenario, topologies)
-                manager.finish(id)
+
+                logger.info { "Starting result processing" }
+
+                val result = resultProcessor.process(id)
+                manager.finish(id, result)
+
+                logger.info { "Successfully finished scenario $id" }
             } catch (e: Exception) {
                 logger.warn(e) { "Scenario failed to finish" }
                 manager.fail(id)
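For reference, the scenario documents this loop consumes combine the fields read
above with the state fields maintained by ScenarioManager further down. A minimal
sketch of that shape and of the nested lookup, using the mongodb-driver-sync API
declared in build.gradle.kts; the ids and values are invented for the example:

    import org.bson.Document

    // Illustrative scenario document; field names are taken from the runner
    // code in this patch, the values are made up.
    val scenario = Document()
        .append("_id", "scenario-1")
        .append("portfolioId", "portfolio-1")
        .append("operational", Document("performanceInterferenceEnabled", true))
        .append("simulation", Document("state", "QUEUED"))

    // The nested lookup performed in Main.kt above:
    val operational = scenario.get("operational", Document::class.java)
    val enabled = operational.getBoolean("performanceInterferenceEnabled")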
diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt
new file mode 100644
index 00000000..39092653
--- /dev/null
+++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt
@@ -0,0 +1,187 @@
+package com.atlarge.opendc.runner.web
+
+import java.io.File
+import org.apache.spark.sql.Column
+import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions.*
+
+/**
+ * A helper class for processing the experiment results using Apache Spark.
+ */
+class ResultProcessor(private val master: String, private val outputPath: File) {
+    /**
+     * Process the results of the scenario with the given [id].
+     */
+    fun process(id: String): Result {
+        val spark = SparkSession.builder()
+            .master(master)
+            .appName("opendc-simulator-$id")
+            .config("spark.driver.bindAddress", "0.0.0.0") // Needed to allow the worker to connect to the driver
+            .orCreate
+
+        try {
+            val hostMetrics = spark.read().parquet(File(outputPath, "host-metrics/scenario_id=$id").path)
+            val provisionerMetrics = spark.read().parquet(File(outputPath, "provisioner-metrics/scenario_id=$id").path)
+            val res = aggregate(hostMetrics, provisionerMetrics).first()
+
+            return Result(
+                res.getList(1),
+                res.getList(2),
+                res.getList(3),
+                res.getList(4),
+                res.getList(5),
+                res.getList(6),
+                res.getList(7),
+                res.getList(8),
+                res.getList(9),
+                res.getList(10),
+                res.getList(11),
+                res.getList(12),
+                res.getList(13),
+                res.getList(14),
+                res.getList(15)
+            )
+        } finally {
+            spark.close()
+        }
+    }
+
+    // One entry per run (collected via collect_list); the element types
+    // follow from the aggregations in [aggregate].
+    data class Result(
+        val totalRequestedBurst: List<Long>,
+        val totalGrantedBurst: List<Long>,
+        val totalOvercommittedBurst: List<Long>,
+        val totalInterferedBurst: List<Long>,
+        val meanCpuUsage: List<Double>,
+        val meanCpuDemand: List<Double>,
+        val meanNumDeployedImages: List<Double>,
+        val maxNumDeployedImages: List<Double>,
+        val totalPowerDraw: List<Double>,
+        val totalFailureSlices: List<Double>,
+        val totalFailureVmSlices: List<Double>,
+        val totalVmsSubmitted: List<Int>,
+        val totalVmsQueued: List<Int>,
+        val totalVmsFinished: List<Int>,
+        val totalVmsFailed: List<Int>
+    )
+
+    /**
+     * Perform aggregation of the experiment results.
+     */
+    private fun aggregate(hostMetrics: Dataset<Row>, provisionerMetrics: Dataset<Row>): Dataset<Row> {
+        // Extrapolate the duration of the entries to span the entire trace
+        val hostMetricsExtra = hostMetrics
+            .withColumn("slice_counts", floor(col("duration") / lit(sliceLength)))
+            .withColumn("power_draw", col("power_draw") * col("slice_counts"))
+            .withColumn("state_int", states[col("state")])
+            .withColumn("state_opposite_int", oppositeStates[col("state")])
+            .withColumn("cpu_usage", col("cpu_usage") * col("slice_counts") * col("state_opposite_int"))
+            .withColumn("cpu_demand", col("cpu_demand") * col("slice_counts"))
+            .withColumn("failure_slice_count", col("slice_counts") * col("state_int"))
+            .withColumn("failure_vm_slice_count", col("slice_counts") * col("state_int") * col("vm_count"))
+
+        // Process all data in a single run
+        val hostMetricsGrouped = hostMetricsExtra.groupBy("run_id")
+
+        // Aggregate the summed total metrics
+        val systemMetrics = hostMetricsGrouped.agg(
+            sum("requested_burst").alias("total_requested_burst"),
+            sum("granted_burst").alias("total_granted_burst"),
+            sum("overcommissioned_burst").alias("total_overcommitted_burst"),
+            sum("interfered_burst").alias("total_interfered_burst"),
+            sum("power_draw").alias("total_power_draw"),
+            sum("failure_slice_count").alias("total_failure_slices"),
+            sum("failure_vm_slice_count").alias("total_failure_vm_slices")
+        )
+
+        // Aggregate metrics per host
+        val hvMetrics = hostMetrics
+            .groupBy("run_id", "host_id")
+            .agg(
+                sum("cpu_usage").alias("mean_cpu_usage"),
+                sum("cpu_demand").alias("mean_cpu_demand"),
+                avg("vm_count").alias("mean_num_deployed_images"),
+                count(lit(1)).alias("num_rows")
+            )
+            .withColumn("mean_cpu_usage", col("mean_cpu_usage") / col("num_rows"))
+            .withColumn("mean_cpu_demand", col("mean_cpu_demand") / col("num_rows"))
+            .groupBy("run_id")
+            .agg(
+                avg("mean_cpu_usage").alias("mean_cpu_usage"),
+                avg("mean_cpu_demand").alias("mean_cpu_demand"),
+                avg("mean_num_deployed_images").alias("mean_num_deployed_images"),
+                max("mean_num_deployed_images").alias("max_num_deployed_images")
+            )
+
+        // Group the provisioner metrics per run
+        val provisionerMetricsGrouped = provisionerMetrics.groupBy("run_id")
+
+        // Aggregate the provisioner metrics
+        val provisionerMetricsAggregated = provisionerMetricsGrouped.agg(
+            max("vm_total_count").alias("total_vms_submitted"),
+            max("vm_waiting_count").alias("total_vms_queued"),
+            max("vm_active_count").alias("total_vms_running"),
+            max("vm_inactive_count").alias("total_vms_finished"),
+            max("vm_failed_count").alias("total_vms_failed")
+        )
+
+        // Join the results into a single data frame
+        return systemMetrics
+            .join(hvMetrics, "run_id")
+            .join(provisionerMetricsAggregated, "run_id")
+            .select(
+                col("total_requested_burst"),
+                col("total_granted_burst"),
+                col("total_overcommitted_burst"),
+                col("total_interfered_burst"),
+                col("mean_cpu_usage"),
+                col("mean_cpu_demand"),
+                col("mean_num_deployed_images"),
+                col("max_num_deployed_images"),
+                col("total_power_draw"),
+                col("total_failure_slices"),
+                col("total_failure_vm_slices"),
+                col("total_vms_submitted"),
+                col("total_vms_queued"),
+                col("total_vms_finished"),
+                col("total_vms_failed")
+            )
+            .groupBy(lit(1))
+            .agg(
+                // TODO Check if order of values is correct
+                collect_list(col("total_requested_burst")).alias("total_requested_burst"),
+                collect_list(col("total_granted_burst")).alias("total_granted_burst"),
+                collect_list(col("total_overcommitted_burst")).alias("total_overcommitted_burst"),
+                collect_list(col("total_interfered_burst")).alias("total_interfered_burst"),
+                collect_list(col("mean_cpu_usage")).alias("mean_cpu_usage"),
+                collect_list(col("mean_cpu_demand")).alias("mean_cpu_demand"),
+                collect_list(col("mean_num_deployed_images")).alias("mean_num_deployed_images"),
+                collect_list(col("max_num_deployed_images")).alias("max_num_deployed_images"),
+                collect_list(col("total_power_draw")).alias("total_power_draw"),
+                collect_list(col("total_failure_slices")).alias("total_failure_slices"),
+                collect_list(col("total_failure_vm_slices")).alias("total_failure_vm_slices"),
+                collect_list(col("total_vms_submitted")).alias("total_vms_submitted"),
+                collect_list(col("total_vms_queued")).alias("total_vms_queued"),
+                collect_list(col("total_vms_finished")).alias("total_vms_finished"),
+                collect_list(col("total_vms_failed")).alias("total_vms_failed")
+            )
+    }
+
+    // Spark helper functions
+    operator fun Column.times(other: Column): Column = `$times`(other)
+    operator fun Column.div(other: Column): Column = `$div`(other)
+    operator fun Column.get(other: Column): Column = this.apply(other)
+
+    val sliceLength = 5 * 60 * 1000
+    val states = map(
+        lit("ERROR"), lit(1),
+        lit("ACTIVE"), lit(0),
+        lit("SHUTOFF"), lit(0)
+    )
+    val oppositeStates = map(
+        lit("ERROR"), lit(0),
+        lit("ACTIVE"), lit(1),
+        lit("SHUTOFF"), lit(1)
+    )
+}
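To make the extrapolation step in aggregate concrete: every host-metrics row covers
some duration, which is converted into a count of 5-minute slices; the two state
maps then gate the columns so that failure metrics only count slices spent in the
ERROR state, while usage metrics only count non-ERROR slices. The same arithmetic
in plain Kotlin, with invented input values:

    // Plain-Kotlin rendering of the slice extrapolation in aggregate().
    const val SLICE_LENGTH_MS = 5 * 60 * 1000L // 5-minute slices, as in ResultProcessor

    fun main() {
        val durationMs = 15 * 60 * 1000L               // a row spanning 15 minutes
        val sliceCounts = durationMs / SLICE_LENGTH_MS // floor(900000 / 300000) = 3

        // state_int and state_opposite_int encode the host state as 1/0 flags
        val stateInt = mapOf("ERROR" to 1, "ACTIVE" to 0, "SHUTOFF" to 0)
        val stateOppositeInt = mapOf("ERROR" to 0, "ACTIVE" to 1, "SHUTOFF" to 1)

        val state = "ERROR"
        val vmCount = 4L
        println(sliceCounts * stateInt.getValue(state))           // failure_slice_count = 3
        println(sliceCounts * stateInt.getValue(state) * vmCount) // failure_vm_slice_count = 12
        println(sliceCounts * stateOppositeInt.getValue(state))   // cpu_usage weight: 0 while failed
    }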
diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt
index 0f375385..40ffd282 100644
--- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt
+++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt
@@ -30,7 +30,7 @@ class ScenarioManager(private val collection: MongoCollection<Document>) {
             ),
             Updates.combine(
                 Updates.set("simulation.state", "RUNNING"),
-                Updates.set("simulation.time", Instant.now())
+                Updates.set("simulation.heartbeat", Instant.now())
             )
         )
         return res != null
@@ -45,7 +45,7 @@ class ScenarioManager(private val collection: MongoCollection<Document>) {
                 Filters.eq("_id", id),
                 Filters.eq("simulation.state", "RUNNING")
             ),
-            Updates.set("simulation.time", Instant.now())
+            Updates.set("simulation.heartbeat", Instant.now())
         )
     }
 
@@ -54,24 +54,40 @@ class ScenarioManager(private val collection: MongoCollection<Document>) {
      */
     fun fail(id: String) {
         collection.findOneAndUpdate(
-            Filters.and(
-                Filters.eq("_id", id),
-                Filters.eq("simulation.state", "FAILED")
-            ),
-            Updates.set("simulation.time", Instant.now())
+            Filters.eq("_id", id),
+            Updates.combine(
+                Updates.set("simulation.state", "FAILED"),
+                Updates.set("simulation.heartbeat", Instant.now())
+            )
         )
     }
 
     /**
-     * Mark the scenario as finished.
+     * Persist the specified results.
      */
-    fun finish(id: String) {
+    fun finish(id: String, result: ResultProcessor.Result) {
         collection.findOneAndUpdate(
-            Filters.and(
-                Filters.eq("_id", id),
-                Filters.eq("simulation.state", "FINISHED")
-            ),
-            Updates.set("simulation.time", Instant.now())
+            Filters.eq("_id", id),
+            Updates.combine(
+                Updates.set("simulation.state", "FINISHED"),
+                Updates.unset("simulation.time"),
+                Updates.set("results.total_requested_burst", result.totalRequestedBurst),
+                Updates.set("results.total_granted_burst", result.totalGrantedBurst),
+                Updates.set("results.total_overcommitted_burst", result.totalOvercommittedBurst),
+                Updates.set("results.total_interfered_burst", result.totalInterferedBurst),
+                Updates.set("results.mean_cpu_usage", result.meanCpuUsage),
+                Updates.set("results.mean_cpu_demand", result.meanCpuDemand),
+                Updates.set("results.mean_num_deployed_images", result.meanNumDeployedImages),
+                Updates.set("results.max_num_deployed_images", result.maxNumDeployedImages),
+                Updates.set("results.total_power_draw", result.totalPowerDraw),
+                Updates.set("results.total_failure_slices", result.totalFailureSlices),
+                Updates.set("results.total_failure_vm_slices", result.totalFailureVmSlices),
+                Updates.set("results.total_vms_submitted", result.totalVmsSubmitted),
+                Updates.set("results.total_vms_queued", result.totalVmsQueued),
+                Updates.set("results.total_vms_finished", result.totalVmsFinished),
+                Updates.set("results.total_vms_failed", result.totalVmsFailed)
+            )
        )
    }
}
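The rename from simulation.time to simulation.heartbeat makes the field's purpose
explicit: it is refreshed while a run is in progress, so runners that die mid-run
can be detected from outside. As an aside, a sweeper built on this field could
requeue stale scenarios; the helper below is hypothetical, not part of this change,
and the 5-minute threshold is an assumption:

    import com.mongodb.client.MongoCollection
    import com.mongodb.client.model.Filters
    import com.mongodb.client.model.Updates
    import java.time.Instant
    import java.time.temporal.ChronoUnit
    import org.bson.Document

    // Hypothetical helper (not part of this patch): requeue scenarios whose
    // runner stopped heart-beating, using the simulation.heartbeat field.
    fun requeueStale(collection: MongoCollection<Document>) {
        collection.updateMany(
            Filters.and(
                Filters.eq("simulation.state", "RUNNING"),
                Filters.lt("simulation.heartbeat", Instant.now().minus(5, ChronoUnit.MINUTES))
            ),
            Updates.set("simulation.state", "QUEUED")
        )
    }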
Filters.eq("_id", id), Filters.eq("simulation.state", "RUNNING") ), - Updates.set("simulation.time", Instant.now()) + Updates.set("simulation.heartbeat", Instant.now()) ) } @@ -54,24 +54,40 @@ class ScenarioManager(private val collection: MongoCollection) { */ fun fail(id: String) { collection.findOneAndUpdate( - Filters.and( - Filters.eq("_id", id), - Filters.eq("simulation.state", "FAILED") - ), - Updates.set("simulation.time", Instant.now()) + Filters.eq("_id", id), + Updates.combine( + Updates.set("simulation.state", "FAILED"), + Updates.set("simulation.heartbeat", Instant.now()) + ) ) } /** - * Mark the scenario as finished. + * Persist the specified results. */ - fun finish(id: String) { + fun finish(id: String, result: ResultProcessor.Result) { collection.findOneAndUpdate( - Filters.and( - Filters.eq("_id", id), - Filters.eq("simulation.state", "FINISHED") - ), - Updates.set("simulation.time", Instant.now()) + Filters.eq("_id", id), + Updates.combine( + Updates.set("simulation.state", "FINISHED"), + Updates.unset("simulation.time"), + Updates.set("results.total_requested_burst", result.totalRequestedBurst), + Updates.set("results.total_granted_burst", result.totalGrantedBurst), + Updates.set("results.total_overcommitted_burst", result.totalOvercommittedBurst), + Updates.set("results.total_interfered_burst", result.totalInterferedBurst), + Updates.set("results.mean_cpu_usage", result.meanCpuUsage), + Updates.set("results.mean_cpu_demand", result.meanCpuDemand), + Updates.set("results.mean_num_deployed_images", result.meanNumDeployedImages), + Updates.set("results.max_num_deployed_images", result.maxNumDeployedImages), + Updates.set("results.max_num_deployed_images", result.maxNumDeployedImages), + Updates.set("results.total_power_draw", result.totalPowerDraw), + Updates.set("results.total_failure_slices", result.totalFailureSlices), + Updates.set("results.total_failure_vm_slices", result.totalFailureVmSlices), + Updates.set("results.total_vms_submitted", result.totalVmsSubmitted), + Updates.set("results.total_vms_queued", result.totalVmsQueued), + Updates.set("results.total_vms_finished", result.totalVmsFinished), + Updates.set("results.total_vms_failed", result.totalVmsFailed) + ) ) } } diff --git a/simulator/opendc/opendc-runner-web/src/main/resources/log4j2.xml b/simulator/opendc/opendc-runner-web/src/main/resources/log4j2.xml index b5a2bbb5..1d873554 100644 --- a/simulator/opendc/opendc-runner-web/src/main/resources/log4j2.xml +++ b/simulator/opendc/opendc-runner-web/src/main/resources/log4j2.xml @@ -42,6 +42,9 @@ + + + -- cgit v1.2.3