author     Fabian Mastenbroek <mail.fabianm@gmail.com>  2020-07-17 17:25:45 +0200
committer  Fabian Mastenbroek <mail.fabianm@gmail.com>  2020-08-24 19:48:08 +0200
commit     0a895abfe307fbb6a28ceac6a07c5ac4863627fd (patch)
tree       82e0d4f9810c16cf4b1fde247545dde9778ce6d1 /simulator/opendc/opendc-runner-web/src
parent     9f85e80e40a663e3ebaf46a16f27332c4b7f0b53 (diff)
Add data processing pipeline via Spark
This change adds support for processing experimental results through an Apache Spark data processing pipeline. After a simulation run completes, the runner reads the run's Parquet output, aggregates the host and provisioner metrics per run with Spark, and persists the aggregated results on the scenario document in MongoDB.
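
In outline, the runner now hands each completed simulation to the new ResultProcessor and persists the aggregated metrics through ScenarioManager. A minimal sketch of that flow in Kotlin (the Spark master URL, output directory, and scenario id below are placeholders, not values taken from this change):

    // Sketch only: "spark://spark-master:7077", File("results/") and
    // scenarioId are placeholder values.
    val processor = ResultProcessor("spark://spark-master:7077", File("results/"))
    val manager = ScenarioManager(database.getCollection("scenarios"))

    // Aggregate the run's Parquet output with Spark...
    val result = processor.process(scenarioId)
    // ...and persist the aggregated metrics on the scenario document.
    manager.finish(scenarioId, result)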
Diffstat (limited to 'simulator/opendc/opendc-runner-web/src')
-rw-r--r--  simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt              30
-rw-r--r--  simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt   187
-rw-r--r--  simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt    44
-rw-r--r--  simulator/opendc/opendc-runner-web/src/main/resources/log4j2.xml                                        3
4 files changed, 245 insertions, 19 deletions
diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt
index fe1913f8..86696887 100644
--- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt
+++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt
@@ -109,12 +109,23 @@ class RunnerCli : CliktCommand(name = "runner") {
*/
private val outputPath by option(
"--output",
- help = "path to the results directory"
+ help = "path to the results directory",
+ envvar = "OPENDC_OUTPUT"
)
.file(canBeFile = false)
.defaultLazy { File("results/") }
/**
+ * The Spark master to connect to.
+ */
+ private val spark by option(
+ "--spark",
+ help = "Spark master to connect to",
+ envvar = "OPENDC_SPARK"
+ )
+ .required()
+
+ /**
* Connect to the user-specified database.
*/
private fun createDatabase(): MongoDatabase {
@@ -147,7 +158,8 @@ class RunnerCli : CliktCommand(name = "runner") {
val traceReader = Sc20RawParquetTraceReader(traceDir)
val performanceInterferenceReader = let {
val path = File(traceDir, "performance-interference-model.json")
- val enabled = scenario.getEmbedded(listOf("operational", "performanceInterferenceEnabled"), Boolean::class.java)
+ val operational = scenario.get("operational", Document::class.java)
+ val enabled = operational.getBoolean("performanceInterferenceEnabled")
if (!enabled || !path.exists()) {
return@let null
@@ -163,7 +175,7 @@ class RunnerCli : CliktCommand(name = "runner") {
runRepeat(scenario, it, topologies, traceReader, performanceInterferenceReader)
}
- logger.info { "Finished scenario $id" }
+ logger.info { "Finished simulation for scenario $id" }
}
/**
@@ -266,13 +278,15 @@ class RunnerCli : CliktCommand(name = "runner") {
override fun run() = runBlocking(Dispatchers.Default) {
logger.info { "Starting OpenDC web runner" }
-
logger.info { "Connecting to MongoDB instance" }
val database = createDatabase()
val manager = ScenarioManager(database.getCollection("scenarios"))
val portfolios = database.getCollection("portfolios")
val topologies = database.getCollection("topologies")
+ logger.info { "Loading Spark" }
+ val resultProcessor = ResultProcessor(spark, outputPath)
+
logger.info { "Watching for queued scenarios" }
while (true) {
@@ -302,7 +316,13 @@ class RunnerCli : CliktCommand(name = "runner") {
try {
val portfolio = portfolios.find(Filters.eq("_id", scenario.getString("portfolioId"))).first()!!
runScenario(portfolio, scenario, topologies)
- manager.finish(id)
+
+ logger.info { "Starting result processing" }
+
+ val result = resultProcessor.process(id)
+ manager.finish(id, result)
+
+ logger.info { "Successfully finished scenario $id" }
} catch (e: Exception) {
logger.warn(e) { "Scenario failed to finish" }
manager.fail(id)
diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt
new file mode 100644
index 00000000..39092653
--- /dev/null
+++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ResultProcessor.kt
@@ -0,0 +1,187 @@
+package com.atlarge.opendc.runner.web
+
+import java.io.File
+import org.apache.spark.sql.Column
+import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions.*
+
+/**
+ * A helper class for processing the experiment results using Apache Spark.
+ */
+class ResultProcessor(private val master: String, private val outputPath: File) {
+ /**
+ * Process the results of the scenario with the given [id].
+ */
+ fun process(id: String): Result {
+ val spark = SparkSession.builder()
+ .master(master)
+ .appName("opendc-simulator-$id")
+ .config("spark.driver.bindAddress", "0.0.0.0") // Needed to allow the worker to connect to driver
+ .orCreate
+
+ try {
+ val hostMetrics = spark.read().parquet(File(outputPath, "host-metrics/scenario_id=$id").path)
+ val provisionerMetrics = spark.read().parquet(File(outputPath, "provisioner-metrics/scenario_id=$id").path)
+ val res = aggregate(hostMetrics, provisionerMetrics).first()
+
+ return Result(
+ res.getList<Long>(1),
+ res.getList<Long>(2),
+ res.getList<Long>(3),
+ res.getList<Long>(4),
+ res.getList<Double>(5),
+ res.getList<Double>(6),
+ res.getList<Double>(7),
+ res.getList<Int>(8),
+ res.getList<Long>(9),
+ res.getList<Long>(10),
+ res.getList<Long>(11),
+ res.getList<Int>(12),
+ res.getList<Int>(13),
+ res.getList<Int>(14),
+ res.getList<Int>(15)
+ )
+ } finally {
+ spark.close()
+ }
+ }
+
+ data class Result(
+ val totalRequestedBurst: List<Long>,
+ val totalGrantedBurst: List<Long>,
+ val totalOvercommittedBurst: List<Long>,
+ val totalInterferedBurst: List<Long>,
+ val meanCpuUsage: List<Double>,
+ val meanCpuDemand: List<Double>,
+ val meanNumDeployedImages: List<Double>,
+ val maxNumDeployedImages: List<Int>,
+ val totalPowerDraw: List<Long>,
+ val totalFailureSlices: List<Long>,
+ val totalFailureVmSlices: List<Long>,
+ val totalVmsSubmitted: List<Int>,
+ val totalVmsQueued: List<Int>,
+ val totalVmsFinished: List<Int>,
+ val totalVmsFailed: List<Int>
+ )
+
+ /**
+ * Perform aggregation of the experiment results.
+ */
+ private fun aggregate(hostMetrics: Dataset<Row>, provisionerMetrics: Dataset<Row>): Dataset<Row> {
+ // Extrapolate the duration of the entries to span the entire trace
+ val hostMetricsExtra = hostMetrics
+ .withColumn("slice_counts", floor(col("duration") / lit(sliceLength)))
+ .withColumn("power_draw", col("power_draw") * col("slice_counts"))
+ .withColumn("state_int", states[col("state")])
+ .withColumn("state_opposite_int", oppositeStates[col("state")])
+ .withColumn("cpu_usage", col("cpu_usage") * col("slice_counts") * col("state_opposite_int"))
+ .withColumn("cpu_demand", col("cpu_demand") * col("slice_counts"))
+ .withColumn("failure_slice_count", col("slice_counts") * col("state_int"))
+ .withColumn("failure_vm_slice_count", col("slice_counts") * col("state_int") * col("vm_count"))
+
+ // Process all data in a single run
+ val hostMetricsGrouped = hostMetricsExtra.groupBy("run_id")
+
+ // Aggregate the summed total metrics
+ val systemMetrics = hostMetricsGrouped.agg(
+ sum("requested_burst").alias("total_requested_burst"),
+ sum("granted_burst").alias("total_granted_burst"),
+ sum("overcommissioned_burst").alias("total_overcommitted_burst"),
+ sum("interfered_burst").alias("total_interfered_burst"),
+ sum("power_draw").alias("total_power_draw"),
+ sum("failure_slice_count").alias("total_failure_slices"),
+ sum("failure_vm_slice_count").alias("total_failure_vm_slices")
+ )
+
+ // Aggregate metrics per host
+ val hvMetrics = hostMetrics
+ .groupBy("run_id", "host_id")
+ .agg(
+ sum("cpu_usage").alias("mean_cpu_usage"),
+ sum("cpu_demand").alias("mean_cpu_demand"),
+ avg("vm_count").alias("mean_num_deployed_images"),
+ count(lit(1)).alias("num_rows")
+ )
+ .withColumn("mean_cpu_usage", col("mean_cpu_usage") / col("num_rows"))
+ .withColumn("mean_cpu_demand", col("mean_cpu_demand") / col("num_rows"))
+ .groupBy("run_id")
+ .agg(
+ avg("mean_cpu_usage").alias("mean_cpu_usage"),
+ avg("mean_cpu_demand").alias("mean_cpu_demand"),
+ avg("mean_num_deployed_images").alias("mean_num_deployed_images"),
+ max("mean_num_deployed_images").alias("max_num_deployed_images")
+ )
+
+ // Group the provisioner metrics per run
+ val provisionerMetricsGrouped = provisionerMetrics.groupBy("run_id")
+
+ // Aggregate the provisioner metrics
+ val provisionerMetricsAggregated = provisionerMetricsGrouped.agg(
+ max("vm_total_count").alias("total_vms_submitted"),
+ max("vm_waiting_count").alias("total_vms_queued"),
+ max("vm_active_count").alias("total_vms_running"),
+ max("vm_inactive_count").alias("total_vms_finished"),
+ max("vm_failed_count").alias("total_vms_failed")
+ )
+
+ // Join the results into a single data frame
+ return systemMetrics
+ .join(hvMetrics, "run_id")
+ .join(provisionerMetricsAggregated, "run_id")
+ .select(
+ col("total_requested_burst"),
+ col("total_granted_burst"),
+ col("total_overcommitted_burst"),
+ col("total_interfered_burst"),
+ col("mean_cpu_usage"),
+ col("mean_cpu_demand"),
+ col("mean_num_deployed_images"),
+ col("max_num_deployed_images"),
+ col("total_power_draw"),
+ col("total_failure_slices"),
+ col("total_failure_vm_slices"),
+ col("total_vms_submitted"),
+ col("total_vms_queued"),
+ col("total_vms_finished"),
+ col("total_vms_failed")
+ )
+ .groupBy(lit(1))
+ .agg(
+ // TODO Check if order of values is correct
+ collect_list(col("total_requested_burst")).alias("total_requested_burst"),
+ collect_list(col("total_granted_burst")).alias("total_granted_burst"),
+ collect_list(col("total_overcommitted_burst")).alias("total_overcommitted_burst"),
+ collect_list(col("total_interfered_burst")).alias("total_interfered_burst"),
+ collect_list(col("mean_cpu_usage")).alias("mean_cpu_usage"),
+ collect_list(col("mean_cpu_demand")).alias("mean_cpu_demand"),
+ collect_list(col("mean_num_deployed_images")).alias("mean_num_deployed_images"),
+ collect_list(col("max_num_deployed_images")).alias("max_num_deployed_images"),
+ collect_list(col("total_power_draw")).alias("total_power_draw"),
+ collect_list(col("total_failure_slices")).alias("total_failure_slices"),
+ collect_list(col("total_failure_vm_slices")).alias("total_failure_vm_slices"),
+ collect_list(col("total_vms_submitted")).alias("total_vms_submitted"),
+ collect_list(col("total_vms_queued")).alias("total_vms_queued"),
+ collect_list(col("total_vms_finished")).alias("total_vms_finished"),
+ collect_list(col("total_vms_failed")).alias("total_vms_failed")
+ )
+ }
+
+ // Spark helper functions
+ operator fun Column.times(other: Column): Column = `$times`(other)
+ operator fun Column.div(other: Column): Column = `$div`(other)
+ operator fun Column.get(other: Column): Column = this.apply(other)
+
+ val sliceLength = 5 * 60 * 1000
+ val states = map(
+ lit("ERROR"), lit(1),
+ lit("ACTIVE"), lit(0),
+ lit("SHUTOFF"), lit(0)
+ )
+ val oppositeStates = map(
+ lit("ERROR"), lit(0),
+ lit("ACTIVE"), lit(1),
+ lit("SHUTOFF"), lit(1)
+ )
+}
diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt
index 0f375385..40ffd282 100644
--- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt
+++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/ScenarioManager.kt
@@ -30,7 +30,7 @@ class ScenarioManager(private val collection: MongoCollection<Document>) {
),
Updates.combine(
Updates.set("simulation.state", "RUNNING"),
- Updates.set("simulation.time", Instant.now())
+ Updates.set("simulation.heartbeat", Instant.now())
)
)
return res != null
@@ -45,7 +45,7 @@ class ScenarioManager(private val collection: MongoCollection<Document>) {
Filters.eq("_id", id),
Filters.eq("simulation.state", "RUNNING")
),
- Updates.set("simulation.time", Instant.now())
+ Updates.set("simulation.heartbeat", Instant.now())
)
}
@@ -54,24 +54,40 @@ class ScenarioManager(private val collection: MongoCollection<Document>) {
*/
fun fail(id: String) {
collection.findOneAndUpdate(
- Filters.and(
- Filters.eq("_id", id),
- Filters.eq("simulation.state", "FAILED")
- ),
- Updates.set("simulation.time", Instant.now())
+ Filters.eq("_id", id),
+ Updates.combine(
+ Updates.set("simulation.state", "FAILED"),
+ Updates.set("simulation.heartbeat", Instant.now())
+ )
)
}
/**
- * Mark the scenario as finished.
+ * Persist the specified results.
*/
- fun finish(id: String) {
+ fun finish(id: String, result: ResultProcessor.Result) {
collection.findOneAndUpdate(
- Filters.and(
- Filters.eq("_id", id),
- Filters.eq("simulation.state", "FINISHED")
- ),
- Updates.set("simulation.time", Instant.now())
+ Filters.eq("_id", id),
+ Updates.combine(
+ Updates.set("simulation.state", "FINISHED"),
+ Updates.unset("simulation.time"),
+ Updates.set("results.total_requested_burst", result.totalRequestedBurst),
+ Updates.set("results.total_granted_burst", result.totalGrantedBurst),
+ Updates.set("results.total_overcommitted_burst", result.totalOvercommittedBurst),
+ Updates.set("results.total_interfered_burst", result.totalInterferedBurst),
+ Updates.set("results.mean_cpu_usage", result.meanCpuUsage),
+ Updates.set("results.mean_cpu_demand", result.meanCpuDemand),
+ Updates.set("results.mean_num_deployed_images", result.meanNumDeployedImages),
+ Updates.set("results.max_num_deployed_images", result.maxNumDeployedImages),
+ Updates.set("results.max_num_deployed_images", result.maxNumDeployedImages),
+ Updates.set("results.total_power_draw", result.totalPowerDraw),
+ Updates.set("results.total_failure_slices", result.totalFailureSlices),
+ Updates.set("results.total_failure_vm_slices", result.totalFailureVmSlices),
+ Updates.set("results.total_vms_submitted", result.totalVmsSubmitted),
+ Updates.set("results.total_vms_queued", result.totalVmsQueued),
+ Updates.set("results.total_vms_finished", result.totalVmsFinished),
+ Updates.set("results.total_vms_failed", result.totalVmsFailed)
+ )
)
}
}
diff --git a/simulator/opendc/opendc-runner-web/src/main/resources/log4j2.xml b/simulator/opendc/opendc-runner-web/src/main/resources/log4j2.xml
index b5a2bbb5..1d873554 100644
--- a/simulator/opendc/opendc-runner-web/src/main/resources/log4j2.xml
+++ b/simulator/opendc/opendc-runner-web/src/main/resources/log4j2.xml
@@ -42,6 +42,9 @@
<Logger name="org.apache.hadoop" level="warn" additivity="false">
<AppenderRef ref="Console"/>
</Logger>
+ <Logger name="org.apache.spark" level="info" additivity="false">
+ <AppenderRef ref="Console"/>
+ </Logger>
<Root level="error">
<AppenderRef ref="Console"/>
</Root>