From 548e9c0f1c87afe92579d518c31b81ef29c3ab33 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 29 Oct 2020 16:45:21 +0100 Subject: Add support for ObjectId in simulator --- .../src/main/kotlin/org/opendc/runner/web/Main.kt | 11 ++++++----- .../src/main/kotlin/org/opendc/runner/web/ResultProcessor.kt | 3 ++- .../src/main/kotlin/org/opendc/runner/web/ScenarioManager.kt | 9 +++++---- .../src/main/kotlin/org/opendc/runner/web/TopologyParser.kt | 9 +++++---- 4 files changed, 18 insertions(+), 14 deletions(-) (limited to 'simulator') diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt index 26577ef2..d21d000b 100644 --- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt @@ -38,6 +38,7 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.test.TestCoroutineScope import mu.KotlinLogging import org.bson.Document +import org.bson.types.ObjectId import org.opendc.compute.simulator.allocation.* import org.opendc.experiments.sc20.experiment.attachMonitor import org.opendc.experiments.sc20.experiment.createFailureDomain @@ -165,7 +166,7 @@ public class RunnerCli : CliktCommand(name = "runner") { * Run a single scenario. */ private suspend fun runScenario(portfolio: Document, scenario: Document, topologies: MongoCollection) { - val id = scenario.getString("_id") + val id = scenario.getObjectId("_id") logger.info { "Constructing performance interference model" } @@ -206,7 +207,7 @@ public class RunnerCli : CliktCommand(name = "runner") { traceReader: Sc20RawParquetTraceReader, performanceInterferenceReader: Sc20PerformanceInterferenceReader? ) { - val id = scenario.getString("_id") + val id = scenario.getObjectId("_id") val seed = repeat val traceDocument = scenario.get("trace", Document::class.java) val workloadName = traceDocument.getString("traceId") @@ -240,7 +241,7 @@ public class RunnerCli : CliktCommand(name = "runner") { Workload(workloadName, workloadFraction), seed ) - val topologyId = scenario.getEmbedded(listOf("topology", "topologyId"), String::class.java) + val topologyId = scenario.getEmbedded(listOf("topology", "topologyId"), ObjectId::class.java) val environment = TopologyParser(topologies, topologyId) val monitor = ParquetExperimentMonitor( outputPath, @@ -321,7 +322,7 @@ public class RunnerCli : CliktCommand(name = "runner") { continue } - val id = scenario.getString("_id") + val id = scenario.getObjectId("_id") logger.info { "Found queued scenario $id: attempting to claim" } @@ -340,7 +341,7 @@ public class RunnerCli : CliktCommand(name = "runner") { } try { - val portfolio = portfolios.find(Filters.eq("_id", scenario.getString("portfolioId"))).first()!! + val portfolio = portfolios.find(Filters.eq("_id", scenario.getObjectId("portfolioId"))).first()!! runScenario(portfolio, scenario, topologies) logger.info { "Starting result processing" } diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ResultProcessor.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ResultProcessor.kt index f9a3cd2b..979e5d3c 100644 --- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ResultProcessor.kt +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ResultProcessor.kt @@ -27,6 +27,7 @@ import org.apache.spark.sql.Dataset import org.apache.spark.sql.Row import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions.* +import org.bson.types.ObjectId import java.io.File /** @@ -36,7 +37,7 @@ public class ResultProcessor(private val master: String, private val outputPath: /** * Process the results of the scenario with the given [id]. */ - public fun process(id: String): Result { + public fun process(id: ObjectId): Result { val spark = SparkSession.builder() .master(master) .appName("opendc-simulator-$id") diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ScenarioManager.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ScenarioManager.kt index 504fccdc..14f66757 100644 --- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ScenarioManager.kt +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ScenarioManager.kt @@ -26,6 +26,7 @@ import com.mongodb.client.MongoCollection import com.mongodb.client.model.Filters import com.mongodb.client.model.Updates import org.bson.Document +import org.bson.types.ObjectId import java.time.Instant /** @@ -44,7 +45,7 @@ public class ScenarioManager(private val collection: MongoCollection) /** * Claim the scenario in the database with the specified id. */ - public fun claim(id: String): Boolean { + public fun claim(id: ObjectId): Boolean { val res = collection.findOneAndUpdate( Filters.and( Filters.eq("_id", id), @@ -61,7 +62,7 @@ public class ScenarioManager(private val collection: MongoCollection) /** * Update the heartbeat of the specified scenario. */ - public fun heartbeat(id: String) { + public fun heartbeat(id: ObjectId) { collection.findOneAndUpdate( Filters.and( Filters.eq("_id", id), @@ -74,7 +75,7 @@ public class ScenarioManager(private val collection: MongoCollection) /** * Mark the scenario as failed. */ - public fun fail(id: String) { + public fun fail(id: ObjectId) { collection.findOneAndUpdate( Filters.eq("_id", id), Updates.combine( @@ -87,7 +88,7 @@ public class ScenarioManager(private val collection: MongoCollection) /** * Persist the specified results. */ - public fun finish(id: String, result: ResultProcessor.Result) { + public fun finish(id: ObjectId, result: ResultProcessor.Result) { collection.findOneAndUpdate( Filters.eq("_id", id), Updates.combine( diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt index 5e483271..0d1e96e6 100644 --- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt @@ -31,6 +31,7 @@ import com.mongodb.client.model.Projections import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.launch import org.bson.Document +import org.bson.types.ObjectId import org.opendc.compute.core.metal.NODE_CLUSTER import org.opendc.compute.core.metal.service.ProvisioningService import org.opendc.compute.core.metal.service.SimpleProvisioningService @@ -51,7 +52,7 @@ import java.util.* /** * A helper class that converts the MongoDB topology into an OpenDC environment. */ -public class TopologyParser(private val collection: MongoCollection, private val id: String) : EnvironmentReader { +public class TopologyParser(private val collection: MongoCollection, private val id: ObjectId) : EnvironmentReader { /** * Parse the topology with the specified [id]. */ @@ -60,7 +61,7 @@ public class TopologyParser(private val collection: MongoCollection, p val random = Random(0) for (machine in fetchMachines(id)) { - val clusterId = machine.getString("rack_id") + val clusterId = machine.getObjectId("rack_id") val position = machine.getInteger("position") val processors = machine.getList("cpus", Document::class.java).flatMap { cpu -> @@ -121,7 +122,7 @@ public class TopologyParser(private val collection: MongoCollection, p /** * Fetch the metadata of the topology. */ - private fun fetchName(id: String): String { + private fun fetchName(id: ObjectId): String { return collection.aggregate( listOf( Aggregates.match(Filters.eq("_id", id)), @@ -135,7 +136,7 @@ public class TopologyParser(private val collection: MongoCollection, p /** * Fetch a topology from the database with the specified [id]. */ - private fun fetchMachines(id: String): AggregateIterable { + private fun fetchMachines(id: ObjectId): AggregateIterable { return collection.aggregate( listOf( Aggregates.match(Filters.eq("_id", id)), -- cgit v1.2.3 From 1c3ac36f999c4e2d4573aebd4343c097fff2b637 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 29 Oct 2020 16:49:12 +0100 Subject: Update to Gradle 6.7 --- simulator/gradle/wrapper/gradle-wrapper.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'simulator') diff --git a/simulator/gradle/wrapper/gradle-wrapper.properties b/simulator/gradle/wrapper/gradle-wrapper.properties index 12d38de6..be52383e 100644 --- a/simulator/gradle/wrapper/gradle-wrapper.properties +++ b/simulator/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-6.6.1-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-6.7-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -- cgit v1.2.3 From bb62b80585b2453529f45b981617e8321b8ded2c Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 29 Oct 2020 17:02:18 +0100 Subject: Add support for Java 15 build --- simulator/.dockerignore | 2 +- simulator/Dockerfile | 20 +++----------------- .../main/kotlin/kotlin-library-convention.gradle.kts | 4 ++++ 3 files changed, 8 insertions(+), 18 deletions(-) (limited to 'simulator') diff --git a/simulator/.dockerignore b/simulator/.dockerignore index 816d338c..8cbb9e5e 100644 --- a/simulator/.dockerignore +++ b/simulator/.dockerignore @@ -6,5 +6,5 @@ .idea_modules/ .gradle -**/build/ +**/build diff --git a/simulator/Dockerfile b/simulator/Dockerfile index 880af95d..34280a17 100644 --- a/simulator/Dockerfile +++ b/simulator/Dockerfile @@ -1,31 +1,17 @@ -FROM openjdk:14-slim AS staging +FROM openjdk:15-slim MAINTAINER OpenDC Maintainers -# Build staging artifacts for dependency caching -COPY ./ /app -WORKDIR /app -RUN mkdir /staging \ - && cp -r buildSrc/ /staging \ - && cp gradle.properties /staging 2>/dev/null | true \ - && find -name "*.gradle.kts" | xargs cp --parents -t /staging - -FROM openjdk:14-slim AS builder - # Obtain (cache) Gradle wrapper COPY gradlew /app/ COPY gradle /app/gradle WORKDIR /app RUN ./gradlew --version -# Install (cache) project dependencies only -COPY --from=staging /staging/ /app/ -RUN ./gradlew clean build --no-daemon > /dev/null 2>&1 || true - # Build project COPY ./ /app/ RUN ./gradlew --no-daemon :opendc-runner-web:installDist -FROM openjdk:14-slim -COPY --from=builder /app/opendc-runner-web/build/install /app +FROM openjdk:15-slim +COPY --from=0 /app/opendc-runner-web/build/install /app WORKDIR /app CMD opendc-runner-web/bin/opendc-runner-web diff --git a/simulator/buildSrc/src/main/kotlin/kotlin-library-convention.gradle.kts b/simulator/buildSrc/src/main/kotlin/kotlin-library-convention.gradle.kts index 452db573..bbecf346 100644 --- a/simulator/buildSrc/src/main/kotlin/kotlin-library-convention.gradle.kts +++ b/simulator/buildSrc/src/main/kotlin/kotlin-library-convention.gradle.kts @@ -44,6 +44,10 @@ kotlin { explicitApi() } +jacoco { + toolVersion = "0.8.6" +} + tasks.withType().configureEach { kotlinOptions.jvmTarget = "1.8" kotlinOptions.freeCompilerArgs += "-Xopt-in=kotlin.RequiresOptIn" -- cgit v1.2.3 From 40fcd37fb5eb8ea33b7dea95d45cc73018ea22b8 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 29 Oct 2020 21:16:13 +0100 Subject: Support non-ObjectIds and empty traces --- .../src/main/kotlin/org/opendc/runner/web/Main.kt | 1 + .../org/opendc/runner/web/ResultProcessor.kt | 57 +++++++++++++++------- .../kotlin/org/opendc/runner/web/TopologyParser.kt | 2 +- .../src/main/resources/log4j2.xml | 4 +- 4 files changed, 43 insertions(+), 21 deletions(-) (limited to 'simulator') diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt index d21d000b..0b96db3d 100644 --- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt @@ -293,6 +293,7 @@ public class RunnerCli : CliktCommand(name = "runner") { try { testScope.advanceUntilIdle() + testScope.uncaughtExceptions.forEach { it.printStackTrace() } } finally { monitor.close() } diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ResultProcessor.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ResultProcessor.kt index 979e5d3c..7f122cf7 100644 --- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ResultProcessor.kt +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ResultProcessor.kt @@ -47,25 +47,46 @@ public class ResultProcessor(private val master: String, private val outputPath: try { val hostMetrics = spark.read().parquet(File(outputPath, "host-metrics/scenario_id=$id").path) val provisionerMetrics = spark.read().parquet(File(outputPath, "provisioner-metrics/scenario_id=$id").path) - val res = aggregate(hostMetrics, provisionerMetrics).first() + val res = aggregate(hostMetrics, provisionerMetrics).collectAsList() - return Result( - res.getList(1), - res.getList(2), - res.getList(3), - res.getList(4), - res.getList(5), - res.getList(6), - res.getList(7), - res.getList(8), - res.getList(9), - res.getList(10), - res.getList(11), - res.getList(12), - res.getList(13), - res.getList(14), - res.getList(15) - ) + if (res.isEmpty()) { + return Result( + emptyList(), + emptyList(), + emptyList(), + emptyList(), + emptyList(), + emptyList(), + emptyList(), + emptyList(), + emptyList(), + emptyList(), + emptyList(), + emptyList(), + emptyList(), + emptyList(), + emptyList() + ) + } else { + val head = res.first() + return Result( + head.getList(1), + head.getList(2), + head.getList(3), + head.getList(4), + head.getList(5), + head.getList(6), + head.getList(7), + head.getList(8), + head.getList(9), + head.getList(10), + head.getList(11), + head.getList(12), + head.getList(13), + head.getList(14), + head.getList(15) + ) + } } finally { spark.close() } diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt index 0d1e96e6..80bd20f7 100644 --- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/TopologyParser.kt @@ -61,7 +61,7 @@ public class TopologyParser(private val collection: MongoCollection, p val random = Random(0) for (machine in fetchMachines(id)) { - val clusterId = machine.getObjectId("rack_id") + val clusterId = machine.get("rack_id").toString() val position = machine.getInteger("position") val processors = machine.getList("cpus", Document::class.java).flatMap { cpu -> diff --git a/simulator/opendc-runner-web/src/main/resources/log4j2.xml b/simulator/opendc-runner-web/src/main/resources/log4j2.xml index 16cedf34..ffc39890 100644 --- a/simulator/opendc-runner-web/src/main/resources/log4j2.xml +++ b/simulator/opendc-runner-web/src/main/resources/log4j2.xml @@ -30,7 +30,7 @@ - + @@ -39,7 +39,7 @@ - + -- cgit v1.2.3 From 4ec2ace2e1ca37294f6e55c2965f1fc6f98d622c Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 29 Oct 2020 23:45:54 +0100 Subject: Eliminate Spark dependencies from Web Runner This change eliminates the large Spark dependencies from the web runner. Instead, we perform on the fly aggregation of the data and report directly to MongoDB. --- simulator/opendc-runner-web/build.gradle.kts | 7 +- .../src/main/kotlin/org/opendc/runner/web/Main.kt | 49 +--- .../org/opendc/runner/web/ResultProcessor.kt | 237 ------------------ .../org/opendc/runner/web/ScenarioManager.kt | 33 ++- .../org/opendc/runner/web/WebExperimentMonitor.kt | 273 +++++++++++++++++++++ .../src/main/resources/log4j2.xml | 8 +- 6 files changed, 304 insertions(+), 303 deletions(-) delete mode 100644 simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ResultProcessor.kt create mode 100644 simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt (limited to 'simulator') diff --git a/simulator/opendc-runner-web/build.gradle.kts b/simulator/opendc-runner-web/build.gradle.kts index cf437843..a46e430f 100644 --- a/simulator/opendc-runner-web/build.gradle.kts +++ b/simulator/opendc-runner-web/build.gradle.kts @@ -42,12 +42,11 @@ dependencies { implementation("com.github.ajalt:clikt:2.8.0") implementation("io.github.microutils:kotlin-logging:1.7.10") + implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.9.8") { + exclude("org.jetbrains.kotlin", module = "kotlin-reflect") + } implementation("org.mongodb:mongodb-driver-sync:4.0.5") - implementation("org.apache.spark:spark-sql_2.12:3.0.0") { - exclude(group = "org.slf4j", module = "slf4j-log4j12") - exclude(group = "log4j") - } runtimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:2.13.1") runtimeOnly("org.apache.logging.log4j:log4j-1.2-api:2.13.1") diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt index 0b96db3d..80b3bb20 100644 --- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt @@ -44,7 +44,6 @@ import org.opendc.experiments.sc20.experiment.attachMonitor import org.opendc.experiments.sc20.experiment.createFailureDomain import org.opendc.experiments.sc20.experiment.createProvisioner import org.opendc.experiments.sc20.experiment.model.Workload -import org.opendc.experiments.sc20.experiment.monitor.ParquetExperimentMonitor import org.opendc.experiments.sc20.experiment.processTrace import org.opendc.experiments.sc20.trace.Sc20ParquetTraceReader import org.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader @@ -123,27 +122,6 @@ public class RunnerCli : CliktCommand(name = "runner") { .file(canBeFile = false) .defaultLazy { File("traces/") } - /** - * The path to the output directory. - */ - private val outputPath by option( - "--output", - help = "path to the results directory", - envvar = "OPENDC_OUTPUT" - ) - .file(canBeFile = false) - .defaultLazy { File("results/") } - - /** - * The Spark master to connect to. - */ - private val spark by option( - "--spark", - help = "Spark master to connect to", - envvar = "OPENDC_SPARK" - ) - .default("local[*]") - /** * Connect to the user-specified database. */ @@ -165,7 +143,7 @@ public class RunnerCli : CliktCommand(name = "runner") { /** * Run a single scenario. */ - private suspend fun runScenario(portfolio: Document, scenario: Document, topologies: MongoCollection) { + private suspend fun runScenario(portfolio: Document, scenario: Document, topologies: MongoCollection): List { val id = scenario.getObjectId("_id") logger.info { "Constructing performance interference model" } @@ -189,12 +167,14 @@ public class RunnerCli : CliktCommand(name = "runner") { val targets = portfolio.get("targets", Document::class.java) - repeat(targets.getInteger("repeatsPerScenario")) { + val results = (0..targets.getInteger("repeatsPerScenario") - 1).map { logger.info { "Starting repeat $it" } runRepeat(scenario, it, topologies, traceReader, performanceInterferenceReader) } logger.info { "Finished simulation for scenario $id" } + + return results } /** @@ -206,8 +186,7 @@ public class RunnerCli : CliktCommand(name = "runner") { topologies: MongoCollection, traceReader: Sc20RawParquetTraceReader, performanceInterferenceReader: Sc20PerformanceInterferenceReader? - ) { - val id = scenario.getObjectId("_id") + ): WebExperimentMonitor.Result { val seed = repeat val traceDocument = scenario.get("trace", Document::class.java) val workloadName = traceDocument.getString("traceId") @@ -243,11 +222,7 @@ public class RunnerCli : CliktCommand(name = "runner") { ) val topologyId = scenario.getEmbedded(listOf("topology", "topologyId"), ObjectId::class.java) val environment = TopologyParser(topologies, topologyId) - val monitor = ParquetExperimentMonitor( - outputPath, - "scenario_id=$id/run_id=$repeat", - 4096 - ) + val monitor = WebExperimentMonitor() testScope.launch { val (bareMetalProvisioner, scheduler) = createProvisioner( @@ -297,6 +272,8 @@ public class RunnerCli : CliktCommand(name = "runner") { } finally { monitor.close() } + + return monitor.getResult() } private val POLL_INTERVAL = 5000L // ms = 5 s @@ -310,9 +287,6 @@ public class RunnerCli : CliktCommand(name = "runner") { val portfolios = database.getCollection("portfolios") val topologies = database.getCollection("topologies") - logger.info { "Launching Spark" } - val resultProcessor = ResultProcessor(spark, outputPath) - logger.info { "Watching for queued scenarios" } while (true) { @@ -343,12 +317,11 @@ public class RunnerCli : CliktCommand(name = "runner") { try { val portfolio = portfolios.find(Filters.eq("_id", scenario.getObjectId("portfolioId"))).first()!! - runScenario(portfolio, scenario, topologies) + val results = runScenario(portfolio, scenario, topologies) - logger.info { "Starting result processing" } + logger.info { "Writing results to database" } - val result = resultProcessor.process(id) - manager.finish(id, result) + manager.finish(id, results) logger.info { "Successfully finished scenario $id" } } catch (e: Exception) { diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ResultProcessor.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ResultProcessor.kt deleted file mode 100644 index 7f122cf7..00000000 --- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ResultProcessor.kt +++ /dev/null @@ -1,237 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.runner.web - -import org.apache.spark.sql.Column -import org.apache.spark.sql.Dataset -import org.apache.spark.sql.Row -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.functions.* -import org.bson.types.ObjectId -import java.io.File - -/** - * A helper class for processing the experiment results using Apache Spark. - */ -public class ResultProcessor(private val master: String, private val outputPath: File) { - /** - * Process the results of the scenario with the given [id]. - */ - public fun process(id: ObjectId): Result { - val spark = SparkSession.builder() - .master(master) - .appName("opendc-simulator-$id") - .config("spark.driver.bindAddress", "0.0.0.0") // Needed to allow the worker to connect to driver - .orCreate - - try { - val hostMetrics = spark.read().parquet(File(outputPath, "host-metrics/scenario_id=$id").path) - val provisionerMetrics = spark.read().parquet(File(outputPath, "provisioner-metrics/scenario_id=$id").path) - val res = aggregate(hostMetrics, provisionerMetrics).collectAsList() - - if (res.isEmpty()) { - return Result( - emptyList(), - emptyList(), - emptyList(), - emptyList(), - emptyList(), - emptyList(), - emptyList(), - emptyList(), - emptyList(), - emptyList(), - emptyList(), - emptyList(), - emptyList(), - emptyList(), - emptyList() - ) - } else { - val head = res.first() - return Result( - head.getList(1), - head.getList(2), - head.getList(3), - head.getList(4), - head.getList(5), - head.getList(6), - head.getList(7), - head.getList(8), - head.getList(9), - head.getList(10), - head.getList(11), - head.getList(12), - head.getList(13), - head.getList(14), - head.getList(15) - ) - } - } finally { - spark.close() - } - } - - public data class Result( - public val totalRequestedBurst: List, - public val totalGrantedBurst: List, - public val totalOvercommittedBurst: List, - public val totalInterferedBurst: List, - public val meanCpuUsage: List, - public val meanCpuDemand: List, - public val meanNumDeployedImages: List, - public val maxNumDeployedImages: List, - public val totalPowerDraw: List, - public val totalFailureSlices: List, - public val totalFailureVmSlices: List, - public val totalVmsSubmitted: List, - public val totalVmsQueued: List, - public val totalVmsFinished: List, - public val totalVmsFailed: List - ) - - /** - * Perform aggregation of the experiment results. - */ - private fun aggregate(hostMetrics: Dataset, provisionerMetrics: Dataset): Dataset { - // Extrapolate the duration of the entries to span the entire trace - val hostMetricsExtra = hostMetrics - .withColumn("slice_counts", floor(col("duration") / lit(sliceLength))) - .withColumn("power_draw", col("power_draw") * col("slice_counts")) - .withColumn("state_int", states[col("state")]) - .withColumn("state_opposite_int", oppositeStates[col("state")]) - .withColumn("cpu_usage", col("cpu_usage") * col("slice_counts") * col("state_opposite_int")) - .withColumn("cpu_demand", col("cpu_demand") * col("slice_counts")) - .withColumn("failure_slice_count", col("slice_counts") * col("state_int")) - .withColumn("failure_vm_slice_count", col("slice_counts") * col("state_int") * col("vm_count")) - - // Process all data in a single run - val hostMetricsGrouped = hostMetricsExtra.groupBy("run_id") - - // Aggregate the summed total metrics - val systemMetrics = hostMetricsGrouped.agg( - sum("requested_burst").alias("total_requested_burst"), - sum("granted_burst").alias("total_granted_burst"), - sum("overcommissioned_burst").alias("total_overcommitted_burst"), - sum("interfered_burst").alias("total_interfered_burst"), - sum("power_draw").alias("total_power_draw"), - sum("failure_slice_count").alias("total_failure_slices"), - sum("failure_vm_slice_count").alias("total_failure_vm_slices") - ) - - // Aggregate metrics per host - val hvMetrics = hostMetrics - .groupBy("run_id", "host_id") - .agg( - sum("cpu_usage").alias("mean_cpu_usage"), - sum("cpu_demand").alias("mean_cpu_demand"), - avg("vm_count").alias("mean_num_deployed_images"), - count(lit(1)).alias("num_rows") - ) - .withColumn("mean_cpu_usage", col("mean_cpu_usage") / col("num_rows")) - .withColumn("mean_cpu_demand", col("mean_cpu_demand") / col("num_rows")) - .groupBy("run_id") - .agg( - avg("mean_cpu_usage").alias("mean_cpu_usage"), - avg("mean_cpu_demand").alias("mean_cpu_demand"), - avg("mean_num_deployed_images").alias("mean_num_deployed_images"), - max("mean_num_deployed_images").alias("max_num_deployed_images") - ) - - // Group the provisioner metrics per run - val provisionerMetricsGrouped = provisionerMetrics.groupBy("run_id") - - // Aggregate the provisioner metrics - val provisionerMetricsAggregated = provisionerMetricsGrouped.agg( - max("vm_total_count").alias("total_vms_submitted"), - max("vm_waiting_count").alias("total_vms_queued"), - max("vm_active_count").alias("total_vms_running"), - max("vm_inactive_count").alias("total_vms_finished"), - max("vm_failed_count").alias("total_vms_failed") - ) - - // Join the results into a single data frame - return systemMetrics - .join(hvMetrics, "run_id") - .join(provisionerMetricsAggregated, "run_id") - .select( - col("total_requested_burst"), - col("total_granted_burst"), - col("total_overcommitted_burst"), - col("total_interfered_burst"), - col("mean_cpu_usage"), - col("mean_cpu_demand"), - col("mean_num_deployed_images"), - col("max_num_deployed_images"), - col("total_power_draw"), - col("total_failure_slices"), - col("total_failure_vm_slices"), - col("total_vms_submitted"), - col("total_vms_queued"), - col("total_vms_finished"), - col("total_vms_failed") - ) - .groupBy(lit(1)) - .agg( - // TODO Check if order of values is correct - collect_list(col("total_requested_burst")).alias("total_requested_burst"), - collect_list(col("total_granted_burst")).alias("total_granted_burst"), - collect_list(col("total_overcommitted_burst")).alias("total_overcommitted_burst"), - collect_list(col("total_interfered_burst")).alias("total_interfered_burst"), - collect_list(col("mean_cpu_usage")).alias("mean_cpu_usage"), - collect_list(col("mean_cpu_demand")).alias("mean_cpu_demand"), - collect_list(col("mean_num_deployed_images")).alias("mean_num_deployed_images"), - collect_list(col("max_num_deployed_images")).alias("max_num_deployed_images"), - collect_list(col("total_power_draw")).alias("total_power_draw"), - collect_list(col("total_failure_slices")).alias("total_failure_slices"), - collect_list(col("total_failure_vm_slices")).alias("total_failure_vm_slices"), - collect_list(col("total_vms_submitted")).alias("total_vms_submitted"), - collect_list(col("total_vms_queued")).alias("total_vms_queued"), - collect_list(col("total_vms_finished")).alias("total_vms_finished"), - collect_list(col("total_vms_failed")).alias("total_vms_failed") - ) - } - - // Spark helper functions - private operator fun Column.times(other: Column): Column = `$times`(other) - private operator fun Column.div(other: Column): Column = `$div`(other) - private operator fun Column.get(other: Column): Column = this.apply(other) - - private val sliceLength = 5 * 60 * 1000 - private val states = map( - lit("ERROR"), - lit(1), - lit("ACTIVE"), - lit(0), - lit("SHUTOFF"), - lit(0) - ) - private val oppositeStates = map( - lit("ERROR"), - lit(0), - lit("ACTIVE"), - lit(1), - lit("SHUTOFF"), - lit(1) - ) -} diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ScenarioManager.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ScenarioManager.kt index 14f66757..a3907051 100644 --- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ScenarioManager.kt +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ScenarioManager.kt @@ -88,28 +88,27 @@ public class ScenarioManager(private val collection: MongoCollection) /** * Persist the specified results. */ - public fun finish(id: ObjectId, result: ResultProcessor.Result) { + public fun finish(id: ObjectId, results: List) { collection.findOneAndUpdate( Filters.eq("_id", id), Updates.combine( Updates.set("simulation.state", "FINISHED"), Updates.unset("simulation.time"), - Updates.set("results.total_requested_burst", result.totalRequestedBurst), - Updates.set("results.total_granted_burst", result.totalGrantedBurst), - Updates.set("results.total_overcommitted_burst", result.totalOvercommittedBurst), - Updates.set("results.total_interfered_burst", result.totalInterferedBurst), - Updates.set("results.mean_cpu_usage", result.meanCpuUsage), - Updates.set("results.mean_cpu_demand", result.meanCpuDemand), - Updates.set("results.mean_num_deployed_images", result.meanNumDeployedImages), - Updates.set("results.max_num_deployed_images", result.maxNumDeployedImages), - Updates.set("results.max_num_deployed_images", result.maxNumDeployedImages), - Updates.set("results.total_power_draw", result.totalPowerDraw), - Updates.set("results.total_failure_slices", result.totalFailureSlices), - Updates.set("results.total_failure_vm_slices", result.totalFailureVmSlices), - Updates.set("results.total_vms_submitted", result.totalVmsSubmitted), - Updates.set("results.total_vms_queued", result.totalVmsQueued), - Updates.set("results.total_vms_finished", result.totalVmsFinished), - Updates.set("results.total_vms_failed", result.totalVmsFailed) + Updates.set("results.total_requested_burst", results.map { it.totalRequestedBurst }), + Updates.set("results.total_granted_burst", results.map { it.totalGrantedBurst }), + Updates.set("results.total_overcommitted_burst", results.map { it.totalOvercommittedBurst }), + Updates.set("results.total_interfered_burst", results.map { it.totalInterferedBurst }), + Updates.set("results.mean_cpu_usage", results.map { it.meanCpuUsage }), + Updates.set("results.mean_cpu_demand", results.map { it.meanCpuDemand }), + Updates.set("results.mean_num_deployed_images", results.map { it.meanNumDeployedImages }), + Updates.set("results.max_num_deployed_images", results.map { it.maxNumDeployedImages }), + Updates.set("results.total_power_draw", results.map { it.totalPowerDraw }), + Updates.set("results.total_failure_slices", results.map { it.totalFailureSlices }), + Updates.set("results.total_failure_vm_slices", results.map { it.totalFailureVmSlices }), + Updates.set("results.total_vms_submitted", results.map { it.totalVmsSubmitted }), + Updates.set("results.total_vms_queued", results.map { it.totalVmsQueued }), + Updates.set("results.total_vms_finished", results.map { it.totalVmsFinished }), + Updates.set("results.total_vms_failed", results.map { it.totalVmsFailed }) ) ) } diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt new file mode 100644 index 00000000..7ef25552 --- /dev/null +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt @@ -0,0 +1,273 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.runner.web + +import mu.KotlinLogging +import org.opendc.compute.core.Server +import org.opendc.compute.core.ServerState +import org.opendc.compute.core.virt.driver.VirtDriver +import org.opendc.compute.core.virt.service.VirtProvisioningEvent +import org.opendc.experiments.sc20.experiment.monitor.ExperimentMonitor +import org.opendc.experiments.sc20.telemetry.HostEvent +import kotlin.math.max + +/** + * An [ExperimentMonitor] that tracks the aggregate metrics for each repeat. + */ +public class WebExperimentMonitor : ExperimentMonitor { + private val logger = KotlinLogging.logger {} + private val currentHostEvent = mutableMapOf() + private var startTime = -1L + + override fun reportVmStateChange(time: Long, server: Server) { + if (startTime < 0) { + startTime = time + + // Update timestamp of initial event + currentHostEvent.replaceAll { _, v -> v.copy(timestamp = startTime) } + } + } + + override fun reportHostStateChange( + time: Long, + driver: VirtDriver, + server: Server + ) { + logger.debug { "Host ${server.uid} changed state ${server.state} [$time]" } + + val previousEvent = currentHostEvent[server] + + val roundedTime = previousEvent?.let { + val duration = time - it.timestamp + val k = 5 * 60 * 1000L // 5 min in ms + val rem = duration % k + + if (rem == 0L) { + time + } else { + it.timestamp + duration + k - rem + } + } ?: time + + reportHostSlice( + roundedTime, + 0, + 0, + 0, + 0, + 0.0, + 0.0, + 0, + server + ) + } + + private val lastPowerConsumption = mutableMapOf() + + override fun reportPowerConsumption(host: Server, draw: Double) { + lastPowerConsumption[host] = draw + } + + override fun reportHostSlice( + time: Long, + requestedBurst: Long, + grantedBurst: Long, + overcommissionedBurst: Long, + interferedBurst: Long, + cpuUsage: Double, + cpuDemand: Double, + numberOfDeployedImages: Int, + hostServer: Server, + duration: Long + ) { + val previousEvent = currentHostEvent[hostServer] + when { + previousEvent == null -> { + val event = HostEvent( + time, + 5 * 60 * 1000L, + hostServer, + numberOfDeployedImages, + requestedBurst, + grantedBurst, + overcommissionedBurst, + interferedBurst, + cpuUsage, + cpuDemand, + lastPowerConsumption[hostServer] ?: 200.0, + hostServer.flavor.cpuCount + ) + + currentHostEvent[hostServer] = event + } + previousEvent.timestamp == time -> { + val event = HostEvent( + time, + previousEvent.duration, + hostServer, + numberOfDeployedImages, + requestedBurst, + grantedBurst, + overcommissionedBurst, + interferedBurst, + cpuUsage, + cpuDemand, + lastPowerConsumption[hostServer] ?: 200.0, + hostServer.flavor.cpuCount + ) + + currentHostEvent[hostServer] = event + } + else -> { + processHostEvent(previousEvent) + + val event = HostEvent( + time, + time - previousEvent.timestamp, + hostServer, + numberOfDeployedImages, + requestedBurst, + grantedBurst, + overcommissionedBurst, + interferedBurst, + cpuUsage, + cpuDemand, + lastPowerConsumption[hostServer] ?: 200.0, + hostServer.flavor.cpuCount + ) + + currentHostEvent[hostServer] = event + } + } + } + + private var hostAggregateMetrics: AggregateHostMetrics = AggregateHostMetrics() + private val hostMetrics: MutableMap = mutableMapOf() + + private fun processHostEvent(event: HostEvent) { + val slices = event.duration / SLICE_LENGTH + + hostAggregateMetrics = AggregateHostMetrics( + hostAggregateMetrics.totalRequestedBurst + event.requestedBurst, + hostAggregateMetrics.totalGrantedBurst + event.grantedBurst, + hostAggregateMetrics.totalOvercommittedBurst + event.overcommissionedBurst, + hostAggregateMetrics.totalInterferedBurst + event.interferedBurst, + hostAggregateMetrics.totalPowerDraw + (slices * (event.powerDraw / 12)), + hostAggregateMetrics.totalFailureSlices + if (event.host.state != ServerState.ACTIVE) slices.toLong() else 0, + hostAggregateMetrics.totalFailureVmSlices + if (event.host.state != ServerState.ACTIVE) event.vmCount * slices.toLong() else 0 + ) + + hostMetrics.compute(event.host) { key, prev -> + HostMetrics( + (event.cpuUsage.takeIf { event.host.state == ServerState.ACTIVE } ?: 0.0) + (prev?.cpuUsage ?: 0.0), + (event.cpuDemand.takeIf { event.host.state == ServerState.ACTIVE } ?: 0.0) + (prev?.cpuDemand ?: 0.0), + event.vmCount + (prev?.vmCount ?: 0), + 1 + (prev?.count ?: 0) + ) + } + } + + private val SLICE_LENGTH: Long = 5 * 60 * 1000 + + public data class AggregateHostMetrics( + val totalRequestedBurst: Long = 0, + val totalGrantedBurst: Long = 0, + val totalOvercommittedBurst: Long = 0, + val totalInterferedBurst: Long = 0, + val totalPowerDraw: Double = 0.0, + val totalFailureSlices: Long = 0, + val totalFailureVmSlices: Long = 0, + ) + + public data class HostMetrics( + val cpuUsage: Double, + val cpuDemand: Double, + val vmCount: Long, + val count: Long + ) + + private var provisionerMetrics: AggregateProvisionerMetrics = AggregateProvisionerMetrics() + + override fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) { + provisionerMetrics = AggregateProvisionerMetrics( + max(event.totalVmCount, provisionerMetrics.vmTotalCount), + max(event.waitingVmCount, provisionerMetrics.vmWaitingCount), + max(event.activeVmCount, provisionerMetrics.vmActiveCount), + max(event.inactiveVmCount, provisionerMetrics.vmInactiveCount), + max(event.failedVmCount, provisionerMetrics.vmFailedCount), + ) + } + + public data class AggregateProvisionerMetrics( + val vmTotalCount: Int = 0, + val vmWaitingCount: Int = 0, + val vmActiveCount: Int = 0, + val vmInactiveCount: Int = 0, + val vmFailedCount: Int = 0 + ) + + override fun close() { + for ((_, event) in currentHostEvent) { + processHostEvent(event) + } + currentHostEvent.clear() + } + + public fun getResult(): Result { + return Result( + hostAggregateMetrics.totalRequestedBurst, + hostAggregateMetrics.totalGrantedBurst, + hostAggregateMetrics.totalOvercommittedBurst, + hostAggregateMetrics.totalInterferedBurst, + hostMetrics.map { it.value.cpuUsage / it.value.count }.average(), + hostMetrics.map { it.value.cpuDemand / it.value.count }.average(), + hostMetrics.map { it.value.vmCount.toDouble() / it.value.count }.average(), + hostMetrics.map { it.value.vmCount.toDouble() / it.value.count }.maxOrNull() ?: 0.0, + hostAggregateMetrics.totalPowerDraw, + hostAggregateMetrics.totalFailureSlices, + hostAggregateMetrics.totalFailureVmSlices, + provisionerMetrics.vmTotalCount, + provisionerMetrics.vmWaitingCount, + provisionerMetrics.vmInactiveCount, + provisionerMetrics.vmFailedCount, + ) + } + + public data class Result( + public val totalRequestedBurst: Long, + public val totalGrantedBurst: Long, + public val totalOvercommittedBurst: Long, + public val totalInterferedBurst: Long, + public val meanCpuUsage: Double, + public val meanCpuDemand: Double, + public val meanNumDeployedImages: Double, + public val maxNumDeployedImages: Double, + public val totalPowerDraw: Double, + public val totalFailureSlices: Long, + public val totalFailureVmSlices: Long, + public val totalVmsSubmitted: Int, + public val totalVmsQueued: Int, + public val totalVmsFinished: Int, + public val totalVmsFailed: Int + ) +} diff --git a/simulator/opendc-runner-web/src/main/resources/log4j2.xml b/simulator/opendc-runner-web/src/main/resources/log4j2.xml index ffc39890..87179332 100644 --- a/simulator/opendc-runner-web/src/main/resources/log4j2.xml +++ b/simulator/opendc-runner-web/src/main/resources/log4j2.xml @@ -30,18 +30,12 @@ - + - - - - - - -- cgit v1.2.3