| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-10-29 23:45:54 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-10-29 23:59:44 +0100 |
| commit | 4ec2ace2e1ca37294f6e55c2965f1fc6f98d622c | |
| tree | 1f2239bb3758ac3099542fe7b06d559d58bdecd5 | |
| parent | eae920b9226ffd9c43699c7f555000d5fc1c623f | |
Eliminate Spark dependencies from Web Runner
This change eliminates the large Spark dependencies from the web runner.
Instead, we perform on-the-fly aggregation of the data and report the
results directly to MongoDB.
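The trade-off behind this commit: the old pipeline buffered raw per-slice samples in Parquet and reduced them afterwards in a Spark batch job, while the new monitor folds every sample into constant-size accumulators as the simulation runs. A minimal sketch of that streaming-aggregation pattern (hypothetical names, not the actual OpenDC interfaces):

```kotlin
// Minimal sketch of on-the-fly aggregation (hypothetical types, not the
// actual OpenDC interfaces): fold each sample into constant-size running
// totals instead of buffering raw rows for a later Spark batch job.
class RunningAggregate {
    private var totalGrantedBurst = 0L
    private var cpuUsageSum = 0.0
    private var sampleCount = 0L

    // Called once per metric sample during the simulation.
    fun record(grantedBurst: Long, cpuUsage: Double) {
        totalGrantedBurst += grantedBurst
        cpuUsageSum += cpuUsage
        sampleCount++
    }

    // One small summary at the end, ready to be written to MongoDB.
    fun summary(): Pair<Long, Double> =
        totalGrantedBurst to if (sampleCount == 0L) 0.0 else cpuUsageSum / sampleCount
}
```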
Diffstat (limited to 'simulator/opendc-runner-web/src/main/kotlin')
4 files changed, 300 insertions(+), 292 deletions(-)
diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
index 0b96db3d..80b3bb20 100644
--- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
+++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
@@ -44,7 +44,6 @@ import org.opendc.experiments.sc20.experiment.attachMonitor
 import org.opendc.experiments.sc20.experiment.createFailureDomain
 import org.opendc.experiments.sc20.experiment.createProvisioner
 import org.opendc.experiments.sc20.experiment.model.Workload
-import org.opendc.experiments.sc20.experiment.monitor.ParquetExperimentMonitor
 import org.opendc.experiments.sc20.experiment.processTrace
 import org.opendc.experiments.sc20.trace.Sc20ParquetTraceReader
 import org.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader
@@ -124,27 +123,6 @@ public class RunnerCli : CliktCommand(name = "runner") {
         .defaultLazy { File("traces/") }

     /**
-     * The path to the output directory.
-     */
-    private val outputPath by option(
-        "--output",
-        help = "path to the results directory",
-        envvar = "OPENDC_OUTPUT"
-    )
-        .file(canBeFile = false)
-        .defaultLazy { File("results/") }
-
-    /**
-     * The Spark master to connect to.
-     */
-    private val spark by option(
-        "--spark",
-        help = "Spark master to connect to",
-        envvar = "OPENDC_SPARK"
-    )
-        .default("local[*]")
-
-    /**
      * Connect to the user-specified database.
      */
     private fun createDatabase(): MongoDatabase {
@@ -165,7 +143,7 @@
     /**
      * Run a single scenario.
      */
-    private suspend fun runScenario(portfolio: Document, scenario: Document, topologies: MongoCollection<Document>) {
+    private suspend fun runScenario(portfolio: Document, scenario: Document, topologies: MongoCollection<Document>): List<WebExperimentMonitor.Result> {
         val id = scenario.getObjectId("_id")

         logger.info { "Constructing performance interference model" }
@@ -189,12 +167,14 @@
         val targets = portfolio.get("targets", Document::class.java)

-        repeat(targets.getInteger("repeatsPerScenario")) {
+        val results = (0..targets.getInteger("repeatsPerScenario") - 1).map {
             logger.info { "Starting repeat $it" }

             runRepeat(scenario, it, topologies, traceReader, performanceInterferenceReader)
         }

         logger.info { "Finished simulation for scenario $id" }
+
+        return results
     }

     /**
@@ -206,8 +186,7 @@
         topologies: MongoCollection<Document>,
         traceReader: Sc20RawParquetTraceReader,
         performanceInterferenceReader: Sc20PerformanceInterferenceReader?
-    ) {
-        val id = scenario.getObjectId("_id")
+    ): WebExperimentMonitor.Result {
         val seed = repeat
         val traceDocument = scenario.get("trace", Document::class.java)
         val workloadName = traceDocument.getString("traceId")
@@ -243,11 +222,7 @@
         )
         val topologyId = scenario.getEmbedded(listOf("topology", "topologyId"), ObjectId::class.java)
         val environment = TopologyParser(topologies, topologyId)
-        val monitor = ParquetExperimentMonitor(
-            outputPath,
-            "scenario_id=$id/run_id=$repeat",
-            4096
-        )
+        val monitor = WebExperimentMonitor()

         testScope.launch {
             val (bareMetalProvisioner, scheduler) = createProvisioner(
@@ -297,6 +272,8 @@
         } finally {
             monitor.close()
         }
+
+        return monitor.getResult()
     }

     private val POLL_INTERVAL = 5000L // ms = 5 s
@@ -310,9 +287,6 @@
         val portfolios = database.getCollection("portfolios")
         val topologies = database.getCollection("topologies")

-        logger.info { "Launching Spark" }
-        val resultProcessor = ResultProcessor(spark, outputPath)
-
         logger.info { "Watching for queued scenarios" }

         while (true) {
@@ -343,12 +317,11 @@
             try {
                 val portfolio = portfolios.find(Filters.eq("_id", scenario.getObjectId("portfolioId"))).first()!!

-                runScenario(portfolio, scenario, topologies)
+                val results = runScenario(portfolio, scenario, topologies)

-                logger.info { "Starting result processing" }
+                logger.info { "Writing results to database" }

-                val result = resultProcessor.process(id)
-                manager.finish(id, result)
+                manager.finish(id, results)

                 logger.info { "Successfully finished scenario $id" }
             } catch (e: Exception) {
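The essential change in `runScenario` above is that `repeat(n)`, which discards the lambda's value, becomes a range `.map` that collects one monitor result per repeat. As an aside, `(0..n - 1).map { ... }` is equivalent to slightly more idiomatic spellings; a sketch with a placeholder body:

```kotlin
// Equivalent ways to collect one result per repeat; (0..n - 1) from the
// diff above denotes the same range as the more idiomatic 0 until n.
fun runRepeat(i: Int): String = "result of repeat $i" // placeholder body

fun collectResults(n: Int): List<String> {
    val a = (0..n - 1).map { runRepeat(it) }  // spelling used in the diff
    val b = (0 until n).map { runRepeat(it) } // half-open range
    val c = List(n) { runRepeat(it) }         // List constructor
    check(a == b && b == c)
    return a
}
```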
diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ResultProcessor.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ResultProcessor.kt
deleted file mode 100644
index 7f122cf7..00000000
--- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ResultProcessor.kt
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.runner.web
-
-import org.apache.spark.sql.Column
-import org.apache.spark.sql.Dataset
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.functions.*
-import org.bson.types.ObjectId
-import java.io.File
-
-/**
- * A helper class for processing the experiment results using Apache Spark.
- */
-public class ResultProcessor(private val master: String, private val outputPath: File) {
-    /**
-     * Process the results of the scenario with the given [id].
-     */
-    public fun process(id: ObjectId): Result {
-        val spark = SparkSession.builder()
-            .master(master)
-            .appName("opendc-simulator-$id")
-            .config("spark.driver.bindAddress", "0.0.0.0") // Needed to allow the worker to connect to driver
-            .orCreate
-
-        try {
-            val hostMetrics = spark.read().parquet(File(outputPath, "host-metrics/scenario_id=$id").path)
-            val provisionerMetrics = spark.read().parquet(File(outputPath, "provisioner-metrics/scenario_id=$id").path)
-            val res = aggregate(hostMetrics, provisionerMetrics).collectAsList()
-
-            if (res.isEmpty()) {
-                return Result(
-                    emptyList(),
-                    emptyList(),
-                    emptyList(),
-                    emptyList(),
-                    emptyList(),
-                    emptyList(),
-                    emptyList(),
-                    emptyList(),
-                    emptyList(),
-                    emptyList(),
-                    emptyList(),
-                    emptyList(),
-                    emptyList(),
-                    emptyList(),
-                    emptyList()
-                )
-            } else {
-                val head = res.first()
-                return Result(
-                    head.getList<Long>(1),
-                    head.getList<Long>(2),
-                    head.getList<Long>(3),
-                    head.getList<Long>(4),
-                    head.getList<Double>(5),
-                    head.getList<Double>(6),
-                    head.getList<Double>(7),
-                    head.getList<Int>(8),
-                    head.getList<Long>(9),
-                    head.getList<Long>(10),
-                    head.getList<Long>(11),
-                    head.getList<Int>(12),
-                    head.getList<Int>(13),
-                    head.getList<Int>(14),
-                    head.getList<Int>(15)
-                )
-            }
-        } finally {
-            spark.close()
-        }
-    }
-
-    public data class Result(
-        public val totalRequestedBurst: List<Long>,
-        public val totalGrantedBurst: List<Long>,
-        public val totalOvercommittedBurst: List<Long>,
-        public val totalInterferedBurst: List<Long>,
-        public val meanCpuUsage: List<Double>,
-        public val meanCpuDemand: List<Double>,
-        public val meanNumDeployedImages: List<Double>,
-        public val maxNumDeployedImages: List<Int>,
-        public val totalPowerDraw: List<Long>,
-        public val totalFailureSlices: List<Long>,
-        public val totalFailureVmSlices: List<Long>,
-        public val totalVmsSubmitted: List<Int>,
-        public val totalVmsQueued: List<Int>,
-        public val totalVmsFinished: List<Int>,
-        public val totalVmsFailed: List<Int>
-    )
-
-    /**
-     * Perform aggregation of the experiment results.
-     */
-    private fun aggregate(hostMetrics: Dataset<Row>, provisionerMetrics: Dataset<Row>): Dataset<Row> {
-        // Extrapolate the duration of the entries to span the entire trace
-        val hostMetricsExtra = hostMetrics
-            .withColumn("slice_counts", floor(col("duration") / lit(sliceLength)))
-            .withColumn("power_draw", col("power_draw") * col("slice_counts"))
-            .withColumn("state_int", states[col("state")])
-            .withColumn("state_opposite_int", oppositeStates[col("state")])
-            .withColumn("cpu_usage", col("cpu_usage") * col("slice_counts") * col("state_opposite_int"))
-            .withColumn("cpu_demand", col("cpu_demand") * col("slice_counts"))
-            .withColumn("failure_slice_count", col("slice_counts") * col("state_int"))
-            .withColumn("failure_vm_slice_count", col("slice_counts") * col("state_int") * col("vm_count"))
-
-        // Process all data in a single run
-        val hostMetricsGrouped = hostMetricsExtra.groupBy("run_id")
-
-        // Aggregate the summed total metrics
-        val systemMetrics = hostMetricsGrouped.agg(
-            sum("requested_burst").alias("total_requested_burst"),
-            sum("granted_burst").alias("total_granted_burst"),
-            sum("overcommissioned_burst").alias("total_overcommitted_burst"),
-            sum("interfered_burst").alias("total_interfered_burst"),
-            sum("power_draw").alias("total_power_draw"),
-            sum("failure_slice_count").alias("total_failure_slices"),
-            sum("failure_vm_slice_count").alias("total_failure_vm_slices")
-        )
-
-        // Aggregate metrics per host
-        val hvMetrics = hostMetrics
-            .groupBy("run_id", "host_id")
-            .agg(
-                sum("cpu_usage").alias("mean_cpu_usage"),
-                sum("cpu_demand").alias("mean_cpu_demand"),
-                avg("vm_count").alias("mean_num_deployed_images"),
-                count(lit(1)).alias("num_rows")
-            )
-            .withColumn("mean_cpu_usage", col("mean_cpu_usage") / col("num_rows"))
-            .withColumn("mean_cpu_demand", col("mean_cpu_demand") / col("num_rows"))
-            .groupBy("run_id")
-            .agg(
-                avg("mean_cpu_usage").alias("mean_cpu_usage"),
-                avg("mean_cpu_demand").alias("mean_cpu_demand"),
-                avg("mean_num_deployed_images").alias("mean_num_deployed_images"),
-                max("mean_num_deployed_images").alias("max_num_deployed_images")
-            )
-
-        // Group the provisioner metrics per run
-        val provisionerMetricsGrouped = provisionerMetrics.groupBy("run_id")
-
-        // Aggregate the provisioner metrics
-        val provisionerMetricsAggregated = provisionerMetricsGrouped.agg(
-            max("vm_total_count").alias("total_vms_submitted"),
-            max("vm_waiting_count").alias("total_vms_queued"),
-            max("vm_active_count").alias("total_vms_running"),
-            max("vm_inactive_count").alias("total_vms_finished"),
-            max("vm_failed_count").alias("total_vms_failed")
-        )
-
-        // Join the results into a single data frame
-        return systemMetrics
-            .join(hvMetrics, "run_id")
-            .join(provisionerMetricsAggregated, "run_id")
-            .select(
-                col("total_requested_burst"),
-                col("total_granted_burst"),
-                col("total_overcommitted_burst"),
-                col("total_interfered_burst"),
-                col("mean_cpu_usage"),
-                col("mean_cpu_demand"),
-                col("mean_num_deployed_images"),
-                col("max_num_deployed_images"),
-                col("total_power_draw"),
-                col("total_failure_slices"),
-                col("total_failure_vm_slices"),
-                col("total_vms_submitted"),
-                col("total_vms_queued"),
-                col("total_vms_finished"),
-                col("total_vms_failed")
-            )
-            .groupBy(lit(1))
-            .agg(
-                // TODO Check if order of values is correct
-                collect_list(col("total_requested_burst")).alias("total_requested_burst"),
-                collect_list(col("total_granted_burst")).alias("total_granted_burst"),
-                collect_list(col("total_overcommitted_burst")).alias("total_overcommitted_burst"),
-                collect_list(col("total_interfered_burst")).alias("total_interfered_burst"),
-                collect_list(col("mean_cpu_usage")).alias("mean_cpu_usage"),
-                collect_list(col("mean_cpu_demand")).alias("mean_cpu_demand"),
-                collect_list(col("mean_num_deployed_images")).alias("mean_num_deployed_images"),
-                collect_list(col("max_num_deployed_images")).alias("max_num_deployed_images"),
-                collect_list(col("total_power_draw")).alias("total_power_draw"),
-                collect_list(col("total_failure_slices")).alias("total_failure_slices"),
-                collect_list(col("total_failure_vm_slices")).alias("total_failure_vm_slices"),
-                collect_list(col("total_vms_submitted")).alias("total_vms_submitted"),
-                collect_list(col("total_vms_queued")).alias("total_vms_queued"),
-                collect_list(col("total_vms_finished")).alias("total_vms_finished"),
-                collect_list(col("total_vms_failed")).alias("total_vms_failed")
-            )
-    }
-
-    // Spark helper functions
-    private operator fun Column.times(other: Column): Column = `$times`(other)
-    private operator fun Column.div(other: Column): Column = `$div`(other)
-    private operator fun Column.get(other: Column): Column = this.apply(other)
-
-    private val sliceLength = 5 * 60 * 1000
-    private val states = map(
-        lit("ERROR"),
-        lit(1),
-        lit("ACTIVE"),
-        lit(0),
-        lit("SHUTOFF"),
-        lit(0)
-    )
-    private val oppositeStates = map(
-        lit("ERROR"),
-        lit(0),
-        lit("ACTIVE"),
-        lit(1),
-        lit("SHUTOFF"),
-        lit(1)
-    )
-}
diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ScenarioManager.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ScenarioManager.kt
index 14f66757..a3907051 100644
--- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ScenarioManager.kt
+++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ScenarioManager.kt
@@ -88,28 +88,27 @@ public class ScenarioManager(private val collection: MongoCollection<Document>)
     /**
      * Persist the specified results.
      */
-    public fun finish(id: ObjectId, result: ResultProcessor.Result) {
+    public fun finish(id: ObjectId, results: List<WebExperimentMonitor.Result>) {
         collection.findOneAndUpdate(
             Filters.eq("_id", id),
             Updates.combine(
                 Updates.set("simulation.state", "FINISHED"),
                 Updates.unset("simulation.time"),
-                Updates.set("results.total_requested_burst", result.totalRequestedBurst),
-                Updates.set("results.total_granted_burst", result.totalGrantedBurst),
-                Updates.set("results.total_overcommitted_burst", result.totalOvercommittedBurst),
-                Updates.set("results.total_interfered_burst", result.totalInterferedBurst),
-                Updates.set("results.mean_cpu_usage", result.meanCpuUsage),
-                Updates.set("results.mean_cpu_demand", result.meanCpuDemand),
-                Updates.set("results.mean_num_deployed_images", result.meanNumDeployedImages),
-                Updates.set("results.max_num_deployed_images", result.maxNumDeployedImages),
-                Updates.set("results.max_num_deployed_images", result.maxNumDeployedImages),
-                Updates.set("results.total_power_draw", result.totalPowerDraw),
-                Updates.set("results.total_failure_slices", result.totalFailureSlices),
-                Updates.set("results.total_failure_vm_slices", result.totalFailureVmSlices),
-                Updates.set("results.total_vms_submitted", result.totalVmsSubmitted),
-                Updates.set("results.total_vms_queued", result.totalVmsQueued),
-                Updates.set("results.total_vms_finished", result.totalVmsFinished),
-                Updates.set("results.total_vms_failed", result.totalVmsFailed)
+                Updates.set("results.total_requested_burst", results.map { it.totalRequestedBurst }),
+                Updates.set("results.total_granted_burst", results.map { it.totalGrantedBurst }),
+                Updates.set("results.total_overcommitted_burst", results.map { it.totalOvercommittedBurst }),
+                Updates.set("results.total_interfered_burst", results.map { it.totalInterferedBurst }),
+                Updates.set("results.mean_cpu_usage", results.map { it.meanCpuUsage }),
+                Updates.set("results.mean_cpu_demand", results.map { it.meanCpuDemand }),
+                Updates.set("results.mean_num_deployed_images", results.map { it.meanNumDeployedImages }),
+                Updates.set("results.max_num_deployed_images", results.map { it.maxNumDeployedImages }),
+                Updates.set("results.total_power_draw", results.map { it.totalPowerDraw }),
+                Updates.set("results.total_failure_slices", results.map { it.totalFailureSlices }),
+                Updates.set("results.total_failure_vm_slices", results.map { it.totalFailureVmSlices }),
+                Updates.set("results.total_vms_submitted", results.map { it.totalVmsSubmitted }),
+                Updates.set("results.total_vms_queued", results.map { it.totalVmsQueued }),
+                Updates.set("results.total_vms_finished", results.map { it.totalVmsFinished }),
+                Updates.set("results.total_vms_failed", results.map { it.totalVmsFailed })
             )
         )
     }
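Each `results.map { it.field }` call transposes the per-repeat results into one array per field, so the stored document keeps the shape that Spark's `collect_list` used to produce. A sketch with hypothetical values:

```kotlin
// Sketch (hypothetical values): a list of per-repeat results is transposed
// into one array per field, matching the old collect_list output shape.
data class RunResult(val meanCpuUsage: Double, val totalVmsFinished: Int)

val results = listOf(RunResult(0.61, 40), RunResult(0.59, 42))
val meanCpuUsage = results.map { it.meanCpuUsage }         // [0.61, 0.59]
val totalVmsFinished = results.map { it.totalVmsFinished } // [40, 42]
```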
diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt
new file mode 100644
index 00000000..7ef25552
--- /dev/null
+++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt
@@ -0,0 +1,273 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.runner.web
+
+import mu.KotlinLogging
+import org.opendc.compute.core.Server
+import org.opendc.compute.core.ServerState
+import org.opendc.compute.core.virt.driver.VirtDriver
+import org.opendc.compute.core.virt.service.VirtProvisioningEvent
+import org.opendc.experiments.sc20.experiment.monitor.ExperimentMonitor
+import org.opendc.experiments.sc20.telemetry.HostEvent
+import kotlin.math.max
+
+/**
+ * An [ExperimentMonitor] that tracks the aggregate metrics for each repeat.
+ */
+public class WebExperimentMonitor : ExperimentMonitor {
+    private val logger = KotlinLogging.logger {}
+    private val currentHostEvent = mutableMapOf<Server, HostEvent>()
+    private var startTime = -1L
+
+    override fun reportVmStateChange(time: Long, server: Server) {
+        if (startTime < 0) {
+            startTime = time
+
+            // Update timestamp of initial event
+            currentHostEvent.replaceAll { _, v -> v.copy(timestamp = startTime) }
+        }
+    }
+
+    override fun reportHostStateChange(
+        time: Long,
+        driver: VirtDriver,
+        server: Server
+    ) {
+        logger.debug { "Host ${server.uid} changed state ${server.state} [$time]" }
+
+        val previousEvent = currentHostEvent[server]
+
+        val roundedTime = previousEvent?.let {
+            val duration = time - it.timestamp
+            val k = 5 * 60 * 1000L // 5 min in ms
+            val rem = duration % k
+
+            if (rem == 0L) {
+                time
+            } else {
+                it.timestamp + duration + k - rem
+            }
+        } ?: time
+
+        reportHostSlice(
+            roundedTime,
+            0,
+            0,
+            0,
+            0,
+            0.0,
+            0.0,
+            0,
+            server
+        )
+    }
+
+    private val lastPowerConsumption = mutableMapOf<Server, Double>()
+
+    override fun reportPowerConsumption(host: Server, draw: Double) {
+        lastPowerConsumption[host] = draw
+    }
+
+    override fun reportHostSlice(
+        time: Long,
+        requestedBurst: Long,
+        grantedBurst: Long,
+        overcommissionedBurst: Long,
+        interferedBurst: Long,
+        cpuUsage: Double,
+        cpuDemand: Double,
+        numberOfDeployedImages: Int,
+        hostServer: Server,
+        duration: Long
+    ) {
+        val previousEvent = currentHostEvent[hostServer]
+        when {
+            previousEvent == null -> {
+                val event = HostEvent(
+                    time,
+                    5 * 60 * 1000L,
+                    hostServer,
+                    numberOfDeployedImages,
+                    requestedBurst,
+                    grantedBurst,
+                    overcommissionedBurst,
+                    interferedBurst,
+                    cpuUsage,
+                    cpuDemand,
+                    lastPowerConsumption[hostServer] ?: 200.0,
+                    hostServer.flavor.cpuCount
+                )
+
+                currentHostEvent[hostServer] = event
+            }
+            previousEvent.timestamp == time -> {
+                val event = HostEvent(
+                    time,
+                    previousEvent.duration,
+                    hostServer,
+                    numberOfDeployedImages,
+                    requestedBurst,
+                    grantedBurst,
+                    overcommissionedBurst,
+                    interferedBurst,
+                    cpuUsage,
+                    cpuDemand,
+                    lastPowerConsumption[hostServer] ?: 200.0,
+                    hostServer.flavor.cpuCount
+                )
+
+                currentHostEvent[hostServer] = event
+            }
+            else -> {
+                processHostEvent(previousEvent)
+
+                val event = HostEvent(
+                    time,
+                    time - previousEvent.timestamp,
+                    hostServer,
+                    numberOfDeployedImages,
+                    requestedBurst,
+                    grantedBurst,
+                    overcommissionedBurst,
+                    interferedBurst,
+                    cpuUsage,
+                    cpuDemand,
+                    lastPowerConsumption[hostServer] ?: 200.0,
+                    hostServer.flavor.cpuCount
+                )
+
+                currentHostEvent[hostServer] = event
+            }
+        }
+    }
+
+    private var hostAggregateMetrics: AggregateHostMetrics = AggregateHostMetrics()
+    private val hostMetrics: MutableMap<Server, HostMetrics> = mutableMapOf()
+
+    private fun processHostEvent(event: HostEvent) {
+        val slices = event.duration / SLICE_LENGTH
+
+        hostAggregateMetrics = AggregateHostMetrics(
+            hostAggregateMetrics.totalRequestedBurst + event.requestedBurst,
+            hostAggregateMetrics.totalGrantedBurst + event.grantedBurst,
+            hostAggregateMetrics.totalOvercommittedBurst + event.overcommissionedBurst,
+            hostAggregateMetrics.totalInterferedBurst + event.interferedBurst,
+            hostAggregateMetrics.totalPowerDraw + (slices * (event.powerDraw / 12)),
+            hostAggregateMetrics.totalFailureSlices + if (event.host.state != ServerState.ACTIVE) slices.toLong() else 0,
+            hostAggregateMetrics.totalFailureVmSlices + if (event.host.state != ServerState.ACTIVE) event.vmCount * slices.toLong() else 0
+        )
+
+        hostMetrics.compute(event.host) { key, prev ->
+            HostMetrics(
+                (event.cpuUsage.takeIf { event.host.state == ServerState.ACTIVE } ?: 0.0) + (prev?.cpuUsage ?: 0.0),
+                (event.cpuDemand.takeIf { event.host.state == ServerState.ACTIVE } ?: 0.0) + (prev?.cpuDemand ?: 0.0),
+                event.vmCount + (prev?.vmCount ?: 0),
+                1 + (prev?.count ?: 0)
+            )
+        }
+    }
+
+    private val SLICE_LENGTH: Long = 5 * 60 * 1000
+
+    public data class AggregateHostMetrics(
+        val totalRequestedBurst: Long = 0,
+        val totalGrantedBurst: Long = 0,
+        val totalOvercommittedBurst: Long = 0,
+        val totalInterferedBurst: Long = 0,
+        val totalPowerDraw: Double = 0.0,
+        val totalFailureSlices: Long = 0,
+        val totalFailureVmSlices: Long = 0,
+    )
+
+    public data class HostMetrics(
+        val cpuUsage: Double,
+        val cpuDemand: Double,
+        val vmCount: Long,
+        val count: Long
+    )
+
+    private var provisionerMetrics: AggregateProvisionerMetrics = AggregateProvisionerMetrics()
+
+    override fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) {
+        provisionerMetrics = AggregateProvisionerMetrics(
+            max(event.totalVmCount, provisionerMetrics.vmTotalCount),
+            max(event.waitingVmCount, provisionerMetrics.vmWaitingCount),
+            max(event.activeVmCount, provisionerMetrics.vmActiveCount),
+            max(event.inactiveVmCount, provisionerMetrics.vmInactiveCount),
+            max(event.failedVmCount, provisionerMetrics.vmFailedCount),
+        )
+    }
+
+    public data class AggregateProvisionerMetrics(
+        val vmTotalCount: Int = 0,
+        val vmWaitingCount: Int = 0,
+        val vmActiveCount: Int = 0,
+        val vmInactiveCount: Int = 0,
+        val vmFailedCount: Int = 0
+    )
+
+    override fun close() {
+        for ((_, event) in currentHostEvent) {
+            processHostEvent(event)
+        }
+        currentHostEvent.clear()
+    }
+
+    public fun getResult(): Result {
+        return Result(
+            hostAggregateMetrics.totalRequestedBurst,
+            hostAggregateMetrics.totalGrantedBurst,
+            hostAggregateMetrics.totalOvercommittedBurst,
+            hostAggregateMetrics.totalInterferedBurst,
+            hostMetrics.map { it.value.cpuUsage / it.value.count }.average(),
+            hostMetrics.map { it.value.cpuDemand / it.value.count }.average(),
+            hostMetrics.map { it.value.vmCount.toDouble() / it.value.count }.average(),
+            hostMetrics.map { it.value.vmCount.toDouble() / it.value.count }.maxOrNull() ?: 0.0,
+            hostAggregateMetrics.totalPowerDraw,
+            hostAggregateMetrics.totalFailureSlices,
+            hostAggregateMetrics.totalFailureVmSlices,
+            provisionerMetrics.vmTotalCount,
+            provisionerMetrics.vmWaitingCount,
+            provisionerMetrics.vmInactiveCount,
+            provisionerMetrics.vmFailedCount,
+        )
+    }
+
+    public data class Result(
+        public val totalRequestedBurst: Long,
+        public val totalGrantedBurst: Long,
+        public val totalOvercommittedBurst: Long,
+        public val totalInterferedBurst: Long,
+        public val meanCpuUsage: Double,
+        public val meanCpuDemand: Double,
+        public val meanNumDeployedImages: Double,
+        public val maxNumDeployedImages: Double,
+        public val totalPowerDraw: Double,
+        public val totalFailureSlices: Long,
+        public val totalFailureVmSlices: Long,
+        public val totalVmsSubmitted: Int,
+        public val totalVmsQueued: Int,
+        public val totalVmsFinished: Int,
+        public val totalVmsFailed: Int
+    )
+}
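One detail of WebExperimentMonitor worth spelling out is the slice arithmetic in `processHostEvent`: durations are counted in 5-minute slices, and since a slice is 1/12 of an hour, `slices * (event.powerDraw / 12)` yields an energy total in watt-hours, assuming the draw is in watts (an assumption not stated in the code; 200.0 is the fallback draw). A worked example with hypothetical values:

```kotlin
// Worked example of the monitor's slice arithmetic (values hypothetical).
const val SLICE_LENGTH: Long = 5 * 60 * 1000 // 5 min in ms

fun main() {
    val duration = 23 * 60 * 1000L         // a 23-minute host event
    val slices = duration / SLICE_LENGTH   // = 4 full slices (integer division)
    val powerDraw = 200.0                  // watts; also the monitor's fallback
    val energy = slices * (powerDraw / 12) // ~66.7 Wh, if the draw is in watts
    println("$slices slices, $energy Wh")
}
```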
