Diffstat (limited to 'simulator/opendc-runner-web')
6 files changed, 304 insertions, 303 deletions
diff --git a/simulator/opendc-runner-web/build.gradle.kts b/simulator/opendc-runner-web/build.gradle.kts
index cf437843..a46e430f 100644
--- a/simulator/opendc-runner-web/build.gradle.kts
+++ b/simulator/opendc-runner-web/build.gradle.kts
@@ -42,12 +42,11 @@ dependencies {
 
     implementation("com.github.ajalt:clikt:2.8.0")
     implementation("io.github.microutils:kotlin-logging:1.7.10")
+    implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.9.8") {
+        exclude("org.jetbrains.kotlin", module = "kotlin-reflect")
+    }
     implementation("org.mongodb:mongodb-driver-sync:4.0.5")
-    implementation("org.apache.spark:spark-sql_2.12:3.0.0") {
-        exclude(group = "org.slf4j", module = "slf4j-log4j12")
-        exclude(group = "log4j")
-    }
 
     runtimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:2.13.1")
     runtimeOnly("org.apache.logging.log4j:log4j-1.2-api:2.13.1")
 
diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
index 0b96db3d..80b3bb20 100644
--- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
+++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
@@ -44,7 +44,6 @@ import org.opendc.experiments.sc20.experiment.attachMonitor
 import org.opendc.experiments.sc20.experiment.createFailureDomain
 import org.opendc.experiments.sc20.experiment.createProvisioner
 import org.opendc.experiments.sc20.experiment.model.Workload
-import org.opendc.experiments.sc20.experiment.monitor.ParquetExperimentMonitor
 import org.opendc.experiments.sc20.experiment.processTrace
 import org.opendc.experiments.sc20.trace.Sc20ParquetTraceReader
 import org.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader
@@ -124,27 +123,6 @@ public class RunnerCli : CliktCommand(name = "runner") {
         .defaultLazy { File("traces/") }
 
     /**
-     * The path to the output directory.
-     */
-    private val outputPath by option(
-        "--output",
-        help = "path to the results directory",
-        envvar = "OPENDC_OUTPUT"
-    )
-        .file(canBeFile = false)
-        .defaultLazy { File("results/") }
-
-    /**
-     * The Spark master to connect to.
-     */
-    private val spark by option(
-        "--spark",
-        help = "Spark master to connect to",
-        envvar = "OPENDC_SPARK"
-    )
-        .default("local[*]")
-
-    /**
      * Connect to the user-specified database.
      */
     private fun createDatabase(): MongoDatabase {
@@ -165,7 +143,7 @@ public class RunnerCli : CliktCommand(name = "runner") {
     /**
      * Run a single scenario.
      */
-    private suspend fun runScenario(portfolio: Document, scenario: Document, topologies: MongoCollection<Document>) {
+    private suspend fun runScenario(portfolio: Document, scenario: Document, topologies: MongoCollection<Document>): List<WebExperimentMonitor.Result> {
         val id = scenario.getObjectId("_id")
 
         logger.info { "Constructing performance interference model" }
@@ -189,12 +167,14 @@ public class RunnerCli : CliktCommand(name = "runner") {
 
         val targets = portfolio.get("targets", Document::class.java)
 
-        repeat(targets.getInteger("repeatsPerScenario")) {
+        val results = (0..targets.getInteger("repeatsPerScenario") - 1).map {
             logger.info { "Starting repeat $it" }
             runRepeat(scenario, it, topologies, traceReader, performanceInterferenceReader)
         }
 
         logger.info { "Finished simulation for scenario $id" }
+
+        return results
     }
 
     /**
@@ -206,8 +186,7 @@
         topologies: MongoCollection<Document>,
         traceReader: Sc20RawParquetTraceReader,
         performanceInterferenceReader: Sc20PerformanceInterferenceReader?
-    ) {
-        val id = scenario.getObjectId("_id")
+    ): WebExperimentMonitor.Result {
         val seed = repeat
         val traceDocument = scenario.get("trace", Document::class.java)
         val workloadName = traceDocument.getString("traceId")
@@ -243,11 +222,7 @@
         )
         val topologyId = scenario.getEmbedded(listOf("topology", "topologyId"), ObjectId::class.java)
         val environment = TopologyParser(topologies, topologyId)
-        val monitor = ParquetExperimentMonitor(
-            outputPath,
-            "scenario_id=$id/run_id=$repeat",
-            4096
-        )
+        val monitor = WebExperimentMonitor()
 
         testScope.launch {
             val (bareMetalProvisioner, scheduler) = createProvisioner(
@@ -297,6 +272,8 @@
         } finally {
             monitor.close()
         }
+
+        return monitor.getResult()
     }
 
     private val POLL_INTERVAL = 5000L // ms = 5 s
@@ -310,9 +287,6 @@
         val portfolios = database.getCollection("portfolios")
         val topologies = database.getCollection("topologies")
 
-        logger.info { "Launching Spark" }
-        val resultProcessor = ResultProcessor(spark, outputPath)
-
         logger.info { "Watching for queued scenarios" }
 
         while (true) {
@@ -343,12 +317,11 @@
             try {
                 val portfolio = portfolios.find(Filters.eq("_id", scenario.getObjectId("portfolioId"))).first()!!
 
-                runScenario(portfolio, scenario, topologies)
+                val results = runScenario(portfolio, scenario, topologies)
 
-                logger.info { "Starting result processing" }
+                logger.info { "Writing results to database" }
 
-                val result = resultProcessor.process(id)
-                manager.finish(id, result)
+                manager.finish(id, results)
 
                 logger.info { "Successfully finished scenario $id" }
             } catch (e: Exception) {
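A note on the rewritten control flow above: each repeat now returns its own WebExperimentMonitor.Result, and runScenario collects these into a list instead of leaving Parquet output behind for Spark post-processing. A minimal Kotlin sketch of the shape of that change, with simplified stand-in types for the real OpenDC signatures; (0 until n) is equivalent to the diff's (0..n - 1) and avoids the manual -1:

    // Sketch only: Result and runRepeat are simplified stand-ins for the
    // types used in Main.kt.
    data class Result(val totalPowerDraw: Double)

    fun runRepeat(repeat: Int): Result = Result(totalPowerDraw = 0.0)

    fun runScenario(repeatsPerScenario: Int): List<Result> =
        // One Result per repeat, in repeat order.
        (0 until repeatsPerScenario).map { repeat -> runRepeat(repeat) }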
diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ResultProcessor.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ResultProcessor.kt
deleted file mode 100644
index 7f122cf7..00000000
--- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ResultProcessor.kt
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.runner.web
-
-import org.apache.spark.sql.Column
-import org.apache.spark.sql.Dataset
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.functions.*
-import org.bson.types.ObjectId
-import java.io.File
-
-/**
- * A helper class for processing the experiment results using Apache Spark.
- */
-public class ResultProcessor(private val master: String, private val outputPath: File) {
-    /**
-     * Process the results of the scenario with the given [id].
-     */
-    public fun process(id: ObjectId): Result {
-        val spark = SparkSession.builder()
-            .master(master)
-            .appName("opendc-simulator-$id")
-            .config("spark.driver.bindAddress", "0.0.0.0") // Needed to allow the worker to connect to driver
-            .orCreate
-
-        try {
-            val hostMetrics = spark.read().parquet(File(outputPath, "host-metrics/scenario_id=$id").path)
-            val provisionerMetrics = spark.read().parquet(File(outputPath, "provisioner-metrics/scenario_id=$id").path)
-            val res = aggregate(hostMetrics, provisionerMetrics).collectAsList()
-
-            if (res.isEmpty()) {
-                return Result(
-                    emptyList(),
-                    emptyList(),
-                    emptyList(),
-                    emptyList(),
-                    emptyList(),
-                    emptyList(),
-                    emptyList(),
-                    emptyList(),
-                    emptyList(),
-                    emptyList(),
-                    emptyList(),
-                    emptyList(),
-                    emptyList(),
-                    emptyList(),
-                    emptyList()
-                )
-            } else {
-                val head = res.first()
-                return Result(
-                    head.getList<Long>(1),
-                    head.getList<Long>(2),
-                    head.getList<Long>(3),
-                    head.getList<Long>(4),
-                    head.getList<Double>(5),
-                    head.getList<Double>(6),
-                    head.getList<Double>(7),
-                    head.getList<Int>(8),
-                    head.getList<Long>(9),
-                    head.getList<Long>(10),
-                    head.getList<Long>(11),
-                    head.getList<Int>(12),
-                    head.getList<Int>(13),
-                    head.getList<Int>(14),
-                    head.getList<Int>(15)
-                )
-            }
-        } finally {
-            spark.close()
-        }
-    }
-
-    public data class Result(
-        public val totalRequestedBurst: List<Long>,
-        public val totalGrantedBurst: List<Long>,
-        public val totalOvercommittedBurst: List<Long>,
-        public val totalInterferedBurst: List<Long>,
-        public val meanCpuUsage: List<Double>,
-        public val meanCpuDemand: List<Double>,
-        public val meanNumDeployedImages: List<Double>,
-        public val maxNumDeployedImages: List<Int>,
-        public val totalPowerDraw: List<Long>,
-        public val totalFailureSlices: List<Long>,
-        public val totalFailureVmSlices: List<Long>,
-        public val totalVmsSubmitted: List<Int>,
-        public val totalVmsQueued: List<Int>,
-        public val totalVmsFinished: List<Int>,
-        public val totalVmsFailed: List<Int>
-    )
-
-    /**
-     * Perform aggregation of the experiment results.
-     */
-    private fun aggregate(hostMetrics: Dataset<Row>, provisionerMetrics: Dataset<Row>): Dataset<Row> {
-        // Extrapolate the duration of the entries to span the entire trace
-        val hostMetricsExtra = hostMetrics
-            .withColumn("slice_counts", floor(col("duration") / lit(sliceLength)))
-            .withColumn("power_draw", col("power_draw") * col("slice_counts"))
-            .withColumn("state_int", states[col("state")])
-            .withColumn("state_opposite_int", oppositeStates[col("state")])
-            .withColumn("cpu_usage", col("cpu_usage") * col("slice_counts") * col("state_opposite_int"))
-            .withColumn("cpu_demand", col("cpu_demand") * col("slice_counts"))
-            .withColumn("failure_slice_count", col("slice_counts") * col("state_int"))
-            .withColumn("failure_vm_slice_count", col("slice_counts") * col("state_int") * col("vm_count"))
-
-        // Process all data in a single run
-        val hostMetricsGrouped = hostMetricsExtra.groupBy("run_id")
-
-        // Aggregate the summed total metrics
-        val systemMetrics = hostMetricsGrouped.agg(
-            sum("requested_burst").alias("total_requested_burst"),
-            sum("granted_burst").alias("total_granted_burst"),
-            sum("overcommissioned_burst").alias("total_overcommitted_burst"),
-            sum("interfered_burst").alias("total_interfered_burst"),
-            sum("power_draw").alias("total_power_draw"),
-            sum("failure_slice_count").alias("total_failure_slices"),
-            sum("failure_vm_slice_count").alias("total_failure_vm_slices")
-        )
-
-        // Aggregate metrics per host
-        val hvMetrics = hostMetrics
-            .groupBy("run_id", "host_id")
-            .agg(
-                sum("cpu_usage").alias("mean_cpu_usage"),
-                sum("cpu_demand").alias("mean_cpu_demand"),
-                avg("vm_count").alias("mean_num_deployed_images"),
-                count(lit(1)).alias("num_rows")
-            )
-            .withColumn("mean_cpu_usage", col("mean_cpu_usage") / col("num_rows"))
-            .withColumn("mean_cpu_demand", col("mean_cpu_demand") / col("num_rows"))
-            .groupBy("run_id")
-            .agg(
-                avg("mean_cpu_usage").alias("mean_cpu_usage"),
-                avg("mean_cpu_demand").alias("mean_cpu_demand"),
-                avg("mean_num_deployed_images").alias("mean_num_deployed_images"),
-                max("mean_num_deployed_images").alias("max_num_deployed_images")
-            )
-
-        // Group the provisioner metrics per run
-        val provisionerMetricsGrouped = provisionerMetrics.groupBy("run_id")
-
-        // Aggregate the provisioner metrics
-        val provisionerMetricsAggregated = provisionerMetricsGrouped.agg(
-            max("vm_total_count").alias("total_vms_submitted"),
-            max("vm_waiting_count").alias("total_vms_queued"),
-            max("vm_active_count").alias("total_vms_running"),
-            max("vm_inactive_count").alias("total_vms_finished"),
-            max("vm_failed_count").alias("total_vms_failed")
-        )
-
-        // Join the results into a single data frame
-        return systemMetrics
-            .join(hvMetrics, "run_id")
-            .join(provisionerMetricsAggregated, "run_id")
-            .select(
-                col("total_requested_burst"),
-                col("total_granted_burst"),
-                col("total_overcommitted_burst"),
-                col("total_interfered_burst"),
-                col("mean_cpu_usage"),
-                col("mean_cpu_demand"),
-                col("mean_num_deployed_images"),
-                col("max_num_deployed_images"),
-                col("total_power_draw"),
-                col("total_failure_slices"),
-                col("total_failure_vm_slices"),
-                col("total_vms_submitted"),
-                col("total_vms_queued"),
-                col("total_vms_finished"),
-                col("total_vms_failed")
-            )
-            .groupBy(lit(1))
-            .agg(
-                // TODO Check if order of values is correct
-                collect_list(col("total_requested_burst")).alias("total_requested_burst"),
-                collect_list(col("total_granted_burst")).alias("total_granted_burst"),
-                collect_list(col("total_overcommitted_burst")).alias("total_overcommitted_burst"),
-                collect_list(col("total_interfered_burst")).alias("total_interfered_burst"),
-                collect_list(col("mean_cpu_usage")).alias("mean_cpu_usage"),
-                collect_list(col("mean_cpu_demand")).alias("mean_cpu_demand"),
-                collect_list(col("mean_num_deployed_images")).alias("mean_num_deployed_images"),
-                collect_list(col("max_num_deployed_images")).alias("max_num_deployed_images"),
-                collect_list(col("total_power_draw")).alias("total_power_draw"),
-                collect_list(col("total_failure_slices")).alias("total_failure_slices"),
-                collect_list(col("total_failure_vm_slices")).alias("total_failure_vm_slices"),
-                collect_list(col("total_vms_submitted")).alias("total_vms_submitted"),
-                collect_list(col("total_vms_queued")).alias("total_vms_queued"),
-                collect_list(col("total_vms_finished")).alias("total_vms_finished"),
-                collect_list(col("total_vms_failed")).alias("total_vms_failed")
-            )
-    }
-
-    // Spark helper functions
-    private operator fun Column.times(other: Column): Column = `$times`(other)
-    private operator fun Column.div(other: Column): Column = `$div`(other)
-    private operator fun Column.get(other: Column): Column = this.apply(other)
-
-    private val sliceLength = 5 * 60 * 1000
-    private val states = map(
-        lit("ERROR"),
-        lit(1),
-        lit("ACTIVE"),
-        lit(0),
-        lit("SHUTOFF"),
-        lit(0)
-    )
-    private val oppositeStates = map(
-        lit("ERROR"),
-        lit(0),
-        lit("ACTIVE"),
-        lit(1),
-        lit("SHUTOFF"),
-        lit(1)
-    )
-}
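A note on what replaces the deleted Spark job: the new WebExperimentMonitor (added below) keeps per-host running sums in memory and derives the same per-host means when a run finishes, without the floor/collect_list pipeline above. A minimal Kotlin sketch of that mean, with HostMetrics reduced to the two fields involved:

    // Sketch of the per-host mean that replaces the Spark aggregation:
    // running sums divided by sample counts, then averaged across hosts.
    data class HostMetrics(val cpuUsage: Double, val count: Long)

    fun meanCpuUsage(hostMetrics: Collection<HostMetrics>): Double =
        hostMetrics.map { it.cpuUsage / it.count }.average() // NaN when empty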
diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ScenarioManager.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ScenarioManager.kt
index 14f66757..a3907051 100644
--- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ScenarioManager.kt
+++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ScenarioManager.kt
@@ -88,28 +88,27 @@ public class ScenarioManager(private val collection: MongoCollection<Document>)
     /**
      * Persist the specified results.
      */
-    public fun finish(id: ObjectId, result: ResultProcessor.Result) {
+    public fun finish(id: ObjectId, results: List<WebExperimentMonitor.Result>) {
         collection.findOneAndUpdate(
             Filters.eq("_id", id),
             Updates.combine(
                 Updates.set("simulation.state", "FINISHED"),
                 Updates.unset("simulation.time"),
-                Updates.set("results.total_requested_burst", result.totalRequestedBurst),
-                Updates.set("results.total_granted_burst", result.totalGrantedBurst),
-                Updates.set("results.total_overcommitted_burst", result.totalOvercommittedBurst),
-                Updates.set("results.total_interfered_burst", result.totalInterferedBurst),
-                Updates.set("results.mean_cpu_usage", result.meanCpuUsage),
-                Updates.set("results.mean_cpu_demand", result.meanCpuDemand),
-                Updates.set("results.mean_num_deployed_images", result.meanNumDeployedImages),
-                Updates.set("results.max_num_deployed_images", result.maxNumDeployedImages),
-                Updates.set("results.max_num_deployed_images", result.maxNumDeployedImages),
-                Updates.set("results.total_power_draw", result.totalPowerDraw),
-                Updates.set("results.total_failure_slices", result.totalFailureSlices),
-                Updates.set("results.total_failure_vm_slices", result.totalFailureVmSlices),
-                Updates.set("results.total_vms_submitted", result.totalVmsSubmitted),
-                Updates.set("results.total_vms_queued", result.totalVmsQueued),
-                Updates.set("results.total_vms_finished", result.totalVmsFinished),
-                Updates.set("results.total_vms_failed", result.totalVmsFailed)
+                Updates.set("results.total_requested_burst", results.map { it.totalRequestedBurst }),
+                Updates.set("results.total_granted_burst", results.map { it.totalGrantedBurst }),
+                Updates.set("results.total_overcommitted_burst", results.map { it.totalOvercommittedBurst }),
+                Updates.set("results.total_interfered_burst", results.map { it.totalInterferedBurst }),
+                Updates.set("results.mean_cpu_usage", results.map { it.meanCpuUsage }),
+                Updates.set("results.mean_cpu_demand", results.map { it.meanCpuDemand }),
+                Updates.set("results.mean_num_deployed_images", results.map { it.meanNumDeployedImages }),
+                Updates.set("results.max_num_deployed_images", results.map { it.maxNumDeployedImages }),
+                Updates.set("results.total_power_draw", results.map { it.totalPowerDraw }),
+                Updates.set("results.total_failure_slices", results.map { it.totalFailureSlices }),
+                Updates.set("results.total_failure_vm_slices", results.map { it.totalFailureVmSlices }),
+                Updates.set("results.total_vms_submitted", results.map { it.totalVmsSubmitted }),
+                Updates.set("results.total_vms_queued", results.map { it.totalVmsQueued }),
+                Updates.set("results.total_vms_finished", results.map { it.totalVmsFinished }),
+                Updates.set("results.total_vms_failed", results.map { it.totalVmsFailed })
             )
         )
     }
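The updated finish() receives one Result per repeat and transposes the list into one array per metric, preserving the array-per-metric document shape that the old collect_list aggregation produced. A Kotlin sketch of that transposition with two representative fields (the plain map is a stand-in for the real Updates.combine document):

    // Element i of each array comes from repeat i.
    data class Result(val meanCpuUsage: Double, val totalVmsFinished: Int)

    fun toUpdateFields(results: List<Result>): Map<String, List<*>> = mapOf(
        "results.mean_cpu_usage" to results.map { it.meanCpuUsage },
        "results.total_vms_finished" to results.map { it.totalVmsFinished }
    )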
diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt
new file mode 100644
index 00000000..7ef25552
--- /dev/null
+++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt
@@ -0,0 +1,273 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.runner.web
+
+import mu.KotlinLogging
+import org.opendc.compute.core.Server
+import org.opendc.compute.core.ServerState
+import org.opendc.compute.core.virt.driver.VirtDriver
+import org.opendc.compute.core.virt.service.VirtProvisioningEvent
+import org.opendc.experiments.sc20.experiment.monitor.ExperimentMonitor
+import org.opendc.experiments.sc20.telemetry.HostEvent
+import kotlin.math.max
+
+/**
+ * An [ExperimentMonitor] that tracks the aggregate metrics for each repeat.
+ */
+public class WebExperimentMonitor : ExperimentMonitor {
+    private val logger = KotlinLogging.logger {}
+    private val currentHostEvent = mutableMapOf<Server, HostEvent>()
+    private var startTime = -1L
+
+    override fun reportVmStateChange(time: Long, server: Server) {
+        if (startTime < 0) {
+            startTime = time
+
+            // Update timestamp of initial event
+            currentHostEvent.replaceAll { _, v -> v.copy(timestamp = startTime) }
+        }
+    }
+
+    override fun reportHostStateChange(
+        time: Long,
+        driver: VirtDriver,
+        server: Server
+    ) {
+        logger.debug { "Host ${server.uid} changed state ${server.state} [$time]" }
+
+        val previousEvent = currentHostEvent[server]
+
+        val roundedTime = previousEvent?.let {
+            val duration = time - it.timestamp
+            val k = 5 * 60 * 1000L // 5 min in ms
+            val rem = duration % k
+
+            if (rem == 0L) {
+                time
+            } else {
+                it.timestamp + duration + k - rem
+            }
+        } ?: time
+
+        reportHostSlice(
+            roundedTime,
+            0,
+            0,
+            0,
+            0,
+            0.0,
+            0.0,
+            0,
+            server
+        )
+    }
+
+    private val lastPowerConsumption = mutableMapOf<Server, Double>()
+
+    override fun reportPowerConsumption(host: Server, draw: Double) {
+        lastPowerConsumption[host] = draw
+    }
+
+    override fun reportHostSlice(
+        time: Long,
+        requestedBurst: Long,
+        grantedBurst: Long,
+        overcommissionedBurst: Long,
+        interferedBurst: Long,
+        cpuUsage: Double,
+        cpuDemand: Double,
+        numberOfDeployedImages: Int,
+        hostServer: Server,
+        duration: Long
+    ) {
+        val previousEvent = currentHostEvent[hostServer]
+        when {
+            previousEvent == null -> {
+                val event = HostEvent(
+                    time,
+                    5 * 60 * 1000L,
+                    hostServer,
+                    numberOfDeployedImages,
+                    requestedBurst,
+                    grantedBurst,
+                    overcommissionedBurst,
+                    interferedBurst,
+                    cpuUsage,
+                    cpuDemand,
+                    lastPowerConsumption[hostServer] ?: 200.0,
+                    hostServer.flavor.cpuCount
+                )
+
+                currentHostEvent[hostServer] = event
+            }
+            previousEvent.timestamp == time -> {
+                val event = HostEvent(
+                    time,
+                    previousEvent.duration,
+                    hostServer,
+                    numberOfDeployedImages,
+                    requestedBurst,
+                    grantedBurst,
+                    overcommissionedBurst,
+                    interferedBurst,
+                    cpuUsage,
+                    cpuDemand,
+                    lastPowerConsumption[hostServer] ?: 200.0,
+                    hostServer.flavor.cpuCount
+                )
+
+                currentHostEvent[hostServer] = event
+            }
+            else -> {
+                processHostEvent(previousEvent)
+
+                val event = HostEvent(
+                    time,
+                    time - previousEvent.timestamp,
+                    hostServer,
+                    numberOfDeployedImages,
+                    requestedBurst,
+                    grantedBurst,
+                    overcommissionedBurst,
+                    interferedBurst,
+                    cpuUsage,
+                    cpuDemand,
+                    lastPowerConsumption[hostServer] ?: 200.0,
+                    hostServer.flavor.cpuCount
+                )
+
+                currentHostEvent[hostServer] = event
+            }
+        }
+    }
+
+    private var hostAggregateMetrics: AggregateHostMetrics = AggregateHostMetrics()
+    private val hostMetrics: MutableMap<Server, HostMetrics> = mutableMapOf()
+
+    private fun processHostEvent(event: HostEvent) {
+        val slices = event.duration / SLICE_LENGTH
+
+        hostAggregateMetrics = AggregateHostMetrics(
+            hostAggregateMetrics.totalRequestedBurst + event.requestedBurst,
+            hostAggregateMetrics.totalGrantedBurst + event.grantedBurst,
+            hostAggregateMetrics.totalOvercommittedBurst + event.overcommissionedBurst,
+            hostAggregateMetrics.totalInterferedBurst + event.interferedBurst,
+            hostAggregateMetrics.totalPowerDraw + (slices * (event.powerDraw / 12)),
+            hostAggregateMetrics.totalFailureSlices + if (event.host.state != ServerState.ACTIVE) slices.toLong() else 0,
+            hostAggregateMetrics.totalFailureVmSlices + if (event.host.state != ServerState.ACTIVE) event.vmCount * slices.toLong() else 0
+        )
+
+        hostMetrics.compute(event.host) { key, prev ->
+            HostMetrics(
+                (event.cpuUsage.takeIf { event.host.state == ServerState.ACTIVE } ?: 0.0) + (prev?.cpuUsage ?: 0.0),
+                (event.cpuDemand.takeIf { event.host.state == ServerState.ACTIVE } ?: 0.0) + (prev?.cpuDemand ?: 0.0),
+                event.vmCount + (prev?.vmCount ?: 0),
+                1 + (prev?.count ?: 0)
+            )
+        }
+    }
+
+    private val SLICE_LENGTH: Long = 5 * 60 * 1000
+
+    public data class AggregateHostMetrics(
+        val totalRequestedBurst: Long = 0,
+        val totalGrantedBurst: Long = 0,
+        val totalOvercommittedBurst: Long = 0,
+        val totalInterferedBurst: Long = 0,
+        val totalPowerDraw: Double = 0.0,
+        val totalFailureSlices: Long = 0,
+        val totalFailureVmSlices: Long = 0,
+    )
+
+    public data class HostMetrics(
+        val cpuUsage: Double,
+        val cpuDemand: Double,
+        val vmCount: Long,
+        val count: Long
+    )
+
+    private var provisionerMetrics: AggregateProvisionerMetrics = AggregateProvisionerMetrics()
+
+    override fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) {
+        provisionerMetrics = AggregateProvisionerMetrics(
+            max(event.totalVmCount, provisionerMetrics.vmTotalCount),
+            max(event.waitingVmCount, provisionerMetrics.vmWaitingCount),
+            max(event.activeVmCount, provisionerMetrics.vmActiveCount),
+            max(event.inactiveVmCount, provisionerMetrics.vmInactiveCount),
+            max(event.failedVmCount, provisionerMetrics.vmFailedCount),
+        )
+    }
+
+    public data class AggregateProvisionerMetrics(
+        val vmTotalCount: Int = 0,
+        val vmWaitingCount: Int = 0,
+        val vmActiveCount: Int = 0,
+        val vmInactiveCount: Int = 0,
+        val vmFailedCount: Int = 0
+    )
+
+    override fun close() {
+        for ((_, event) in currentHostEvent) {
+            processHostEvent(event)
+        }
+        currentHostEvent.clear()
+    }
+
+    public fun getResult(): Result {
+        return Result(
+            hostAggregateMetrics.totalRequestedBurst,
+            hostAggregateMetrics.totalGrantedBurst,
+            hostAggregateMetrics.totalOvercommittedBurst,
+            hostAggregateMetrics.totalInterferedBurst,
+            hostMetrics.map { it.value.cpuUsage / it.value.count }.average(),
+            hostMetrics.map { it.value.cpuDemand / it.value.count }.average(),
+            hostMetrics.map { it.value.vmCount.toDouble() / it.value.count }.average(),
+            hostMetrics.map { it.value.vmCount.toDouble() / it.value.count }.maxOrNull() ?: 0.0,
+            hostAggregateMetrics.totalPowerDraw,
+            hostAggregateMetrics.totalFailureSlices,
+            hostAggregateMetrics.totalFailureVmSlices,
+            provisionerMetrics.vmTotalCount,
+            provisionerMetrics.vmWaitingCount,
+            provisionerMetrics.vmInactiveCount,
+            provisionerMetrics.vmFailedCount,
+        )
+    }
+
+    public data class Result(
+        public val totalRequestedBurst: Long,
+        public val totalGrantedBurst: Long,
+        public val totalOvercommittedBurst: Long,
+        public val totalInterferedBurst: Long,
+        public val meanCpuUsage: Double,
+        public val meanCpuDemand: Double,
+        public val meanNumDeployedImages: Double,
+        public val maxNumDeployedImages: Double,
+        public val totalPowerDraw: Double,
+        public val totalFailureSlices: Long,
+        public val totalFailureVmSlices: Long,
+        public val totalVmsSubmitted: Int,
+        public val totalVmsQueued: Int,
+        public val totalVmsFinished: Int,
+        public val totalVmsFailed: Int
+    )
+}
diff --git a/simulator/opendc-runner-web/src/main/resources/log4j2.xml b/simulator/opendc-runner-web/src/main/resources/log4j2.xml
index ffc39890..87179332 100644
--- a/simulator/opendc-runner-web/src/main/resources/log4j2.xml
+++ b/simulator/opendc-runner-web/src/main/resources/log4j2.xml
@@ -30,18 +30,12 @@
         </Console>
     </Appenders>
     <Loggers>
-        <Logger name="org.opendc" level="info" additivity="false">
+        <Logger name="org.opendc" level="warn" additivity="false">
             <AppenderRef ref="Console"/>
         </Logger>
         <Logger name="org.opendc.runner" level="info" additivity="false">
             <AppenderRef ref="Console"/>
         </Logger>
-        <Logger name="org.apache.hadoop" level="warn" additivity="false">
-            <AppenderRef ref="Console"/>
-        </Logger>
-        <Logger name="org.apache.spark" level="warn" additivity="false">
-            <AppenderRef ref="Console"/>
-        </Logger>
         <Root level="error">
             <AppenderRef ref="Console"/>
         </Root>
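A note on the new WebExperimentMonitor above: host state changes are aligned to whole 5-minute slices, and reportHostStateChange rounds the elapsed duration since the previous event up to the next slice boundary. The rounding arithmetic, extracted into a standalone sketch (the function and constant names are ours, not part of the commit):

    // Round `time` up so that the duration since `previousTimestamp` is a
    // whole number of 5-minute slices, mirroring reportHostStateChange.
    const val SLICE_LENGTH_MS = 5 * 60 * 1000L // 5 min in ms

    fun roundToSliceBoundary(previousTimestamp: Long, time: Long): Long {
        val duration = time - previousTimestamp
        val rem = duration % SLICE_LENGTH_MS
        return if (rem == 0L) time else previousTimestamp + duration + SLICE_LENGTH_MS - rem
    }

    fun main() {
        // A state change 7 minutes after the previous event rounds up to the
        // 10-minute boundary: prints 600000.
        println(roundToSliceBoundary(0L, 7 * 60 * 1000L))
    }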
