summaryrefslogtreecommitdiff
path: root/simulator
diff options
context:
space:
mode:
Diffstat (limited to 'simulator')
-rw-r--r--simulator/opendc-runner-web/build.gradle.kts7
-rw-r--r--simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt49
-rw-r--r--simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ResultProcessor.kt237
-rw-r--r--simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ScenarioManager.kt33
-rw-r--r--simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt273
-rw-r--r--simulator/opendc-runner-web/src/main/resources/log4j2.xml8
6 files changed, 304 insertions, 303 deletions
diff --git a/simulator/opendc-runner-web/build.gradle.kts b/simulator/opendc-runner-web/build.gradle.kts
index cf437843..a46e430f 100644
--- a/simulator/opendc-runner-web/build.gradle.kts
+++ b/simulator/opendc-runner-web/build.gradle.kts
@@ -42,12 +42,11 @@ dependencies {
implementation("com.github.ajalt:clikt:2.8.0")
implementation("io.github.microutils:kotlin-logging:1.7.10")
+ implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.9.8") {
+ exclude("org.jetbrains.kotlin", module = "kotlin-reflect")
+ }
implementation("org.mongodb:mongodb-driver-sync:4.0.5")
- implementation("org.apache.spark:spark-sql_2.12:3.0.0") {
- exclude(group = "org.slf4j", module = "slf4j-log4j12")
- exclude(group = "log4j")
- }
runtimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:2.13.1")
runtimeOnly("org.apache.logging.log4j:log4j-1.2-api:2.13.1")
diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
index 0b96db3d..80b3bb20 100644
--- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
+++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/Main.kt
@@ -44,7 +44,6 @@ import org.opendc.experiments.sc20.experiment.attachMonitor
import org.opendc.experiments.sc20.experiment.createFailureDomain
import org.opendc.experiments.sc20.experiment.createProvisioner
import org.opendc.experiments.sc20.experiment.model.Workload
-import org.opendc.experiments.sc20.experiment.monitor.ParquetExperimentMonitor
import org.opendc.experiments.sc20.experiment.processTrace
import org.opendc.experiments.sc20.trace.Sc20ParquetTraceReader
import org.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader
@@ -124,27 +123,6 @@ public class RunnerCli : CliktCommand(name = "runner") {
.defaultLazy { File("traces/") }
/**
- * The path to the output directory.
- */
- private val outputPath by option(
- "--output",
- help = "path to the results directory",
- envvar = "OPENDC_OUTPUT"
- )
- .file(canBeFile = false)
- .defaultLazy { File("results/") }
-
- /**
- * The Spark master to connect to.
- */
- private val spark by option(
- "--spark",
- help = "Spark master to connect to",
- envvar = "OPENDC_SPARK"
- )
- .default("local[*]")
-
- /**
* Connect to the user-specified database.
*/
private fun createDatabase(): MongoDatabase {
@@ -165,7 +143,7 @@ public class RunnerCli : CliktCommand(name = "runner") {
/**
* Run a single scenario.
*/
- private suspend fun runScenario(portfolio: Document, scenario: Document, topologies: MongoCollection<Document>) {
+ private suspend fun runScenario(portfolio: Document, scenario: Document, topologies: MongoCollection<Document>): List<WebExperimentMonitor.Result> {
val id = scenario.getObjectId("_id")
logger.info { "Constructing performance interference model" }
@@ -189,12 +167,14 @@ public class RunnerCli : CliktCommand(name = "runner") {
val targets = portfolio.get("targets", Document::class.java)
- repeat(targets.getInteger("repeatsPerScenario")) {
+ val results = (0..targets.getInteger("repeatsPerScenario") - 1).map {
logger.info { "Starting repeat $it" }
runRepeat(scenario, it, topologies, traceReader, performanceInterferenceReader)
}
logger.info { "Finished simulation for scenario $id" }
+
+ return results
}
/**
@@ -206,8 +186,7 @@ public class RunnerCli : CliktCommand(name = "runner") {
topologies: MongoCollection<Document>,
traceReader: Sc20RawParquetTraceReader,
performanceInterferenceReader: Sc20PerformanceInterferenceReader?
- ) {
- val id = scenario.getObjectId("_id")
+ ): WebExperimentMonitor.Result {
val seed = repeat
val traceDocument = scenario.get("trace", Document::class.java)
val workloadName = traceDocument.getString("traceId")
@@ -243,11 +222,7 @@ public class RunnerCli : CliktCommand(name = "runner") {
)
val topologyId = scenario.getEmbedded(listOf("topology", "topologyId"), ObjectId::class.java)
val environment = TopologyParser(topologies, topologyId)
- val monitor = ParquetExperimentMonitor(
- outputPath,
- "scenario_id=$id/run_id=$repeat",
- 4096
- )
+ val monitor = WebExperimentMonitor()
testScope.launch {
val (bareMetalProvisioner, scheduler) = createProvisioner(
@@ -297,6 +272,8 @@ public class RunnerCli : CliktCommand(name = "runner") {
} finally {
monitor.close()
}
+
+ return monitor.getResult()
}
private val POLL_INTERVAL = 5000L // ms = 5 s
@@ -310,9 +287,6 @@ public class RunnerCli : CliktCommand(name = "runner") {
val portfolios = database.getCollection("portfolios")
val topologies = database.getCollection("topologies")
- logger.info { "Launching Spark" }
- val resultProcessor = ResultProcessor(spark, outputPath)
-
logger.info { "Watching for queued scenarios" }
while (true) {
@@ -343,12 +317,11 @@ public class RunnerCli : CliktCommand(name = "runner") {
try {
val portfolio = portfolios.find(Filters.eq("_id", scenario.getObjectId("portfolioId"))).first()!!
- runScenario(portfolio, scenario, topologies)
+ val results = runScenario(portfolio, scenario, topologies)
- logger.info { "Starting result processing" }
+ logger.info { "Writing results to database" }
- val result = resultProcessor.process(id)
- manager.finish(id, result)
+ manager.finish(id, results)
logger.info { "Successfully finished scenario $id" }
} catch (e: Exception) {
diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ResultProcessor.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ResultProcessor.kt
deleted file mode 100644
index 7f122cf7..00000000
--- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ResultProcessor.kt
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.runner.web
-
-import org.apache.spark.sql.Column
-import org.apache.spark.sql.Dataset
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.functions.*
-import org.bson.types.ObjectId
-import java.io.File
-
-/**
- * A helper class for processing the experiment results using Apache Spark.
- */
-public class ResultProcessor(private val master: String, private val outputPath: File) {
- /**
- * Process the results of the scenario with the given [id].
- */
- public fun process(id: ObjectId): Result {
- val spark = SparkSession.builder()
- .master(master)
- .appName("opendc-simulator-$id")
- .config("spark.driver.bindAddress", "0.0.0.0") // Needed to allow the worker to connect to driver
- .orCreate
-
- try {
- val hostMetrics = spark.read().parquet(File(outputPath, "host-metrics/scenario_id=$id").path)
- val provisionerMetrics = spark.read().parquet(File(outputPath, "provisioner-metrics/scenario_id=$id").path)
- val res = aggregate(hostMetrics, provisionerMetrics).collectAsList()
-
- if (res.isEmpty()) {
- return Result(
- emptyList(),
- emptyList(),
- emptyList(),
- emptyList(),
- emptyList(),
- emptyList(),
- emptyList(),
- emptyList(),
- emptyList(),
- emptyList(),
- emptyList(),
- emptyList(),
- emptyList(),
- emptyList(),
- emptyList()
- )
- } else {
- val head = res.first()
- return Result(
- head.getList<Long>(1),
- head.getList<Long>(2),
- head.getList<Long>(3),
- head.getList<Long>(4),
- head.getList<Double>(5),
- head.getList<Double>(6),
- head.getList<Double>(7),
- head.getList<Int>(8),
- head.getList<Long>(9),
- head.getList<Long>(10),
- head.getList<Long>(11),
- head.getList<Int>(12),
- head.getList<Int>(13),
- head.getList<Int>(14),
- head.getList<Int>(15)
- )
- }
- } finally {
- spark.close()
- }
- }
-
- public data class Result(
- public val totalRequestedBurst: List<Long>,
- public val totalGrantedBurst: List<Long>,
- public val totalOvercommittedBurst: List<Long>,
- public val totalInterferedBurst: List<Long>,
- public val meanCpuUsage: List<Double>,
- public val meanCpuDemand: List<Double>,
- public val meanNumDeployedImages: List<Double>,
- public val maxNumDeployedImages: List<Int>,
- public val totalPowerDraw: List<Long>,
- public val totalFailureSlices: List<Long>,
- public val totalFailureVmSlices: List<Long>,
- public val totalVmsSubmitted: List<Int>,
- public val totalVmsQueued: List<Int>,
- public val totalVmsFinished: List<Int>,
- public val totalVmsFailed: List<Int>
- )
-
- /**
- * Perform aggregation of the experiment results.
- */
- private fun aggregate(hostMetrics: Dataset<Row>, provisionerMetrics: Dataset<Row>): Dataset<Row> {
- // Extrapolate the duration of the entries to span the entire trace
- val hostMetricsExtra = hostMetrics
- .withColumn("slice_counts", floor(col("duration") / lit(sliceLength)))
- .withColumn("power_draw", col("power_draw") * col("slice_counts"))
- .withColumn("state_int", states[col("state")])
- .withColumn("state_opposite_int", oppositeStates[col("state")])
- .withColumn("cpu_usage", col("cpu_usage") * col("slice_counts") * col("state_opposite_int"))
- .withColumn("cpu_demand", col("cpu_demand") * col("slice_counts"))
- .withColumn("failure_slice_count", col("slice_counts") * col("state_int"))
- .withColumn("failure_vm_slice_count", col("slice_counts") * col("state_int") * col("vm_count"))
-
- // Process all data in a single run
- val hostMetricsGrouped = hostMetricsExtra.groupBy("run_id")
-
- // Aggregate the summed total metrics
- val systemMetrics = hostMetricsGrouped.agg(
- sum("requested_burst").alias("total_requested_burst"),
- sum("granted_burst").alias("total_granted_burst"),
- sum("overcommissioned_burst").alias("total_overcommitted_burst"),
- sum("interfered_burst").alias("total_interfered_burst"),
- sum("power_draw").alias("total_power_draw"),
- sum("failure_slice_count").alias("total_failure_slices"),
- sum("failure_vm_slice_count").alias("total_failure_vm_slices")
- )
-
- // Aggregate metrics per host
- val hvMetrics = hostMetrics
- .groupBy("run_id", "host_id")
- .agg(
- sum("cpu_usage").alias("mean_cpu_usage"),
- sum("cpu_demand").alias("mean_cpu_demand"),
- avg("vm_count").alias("mean_num_deployed_images"),
- count(lit(1)).alias("num_rows")
- )
- .withColumn("mean_cpu_usage", col("mean_cpu_usage") / col("num_rows"))
- .withColumn("mean_cpu_demand", col("mean_cpu_demand") / col("num_rows"))
- .groupBy("run_id")
- .agg(
- avg("mean_cpu_usage").alias("mean_cpu_usage"),
- avg("mean_cpu_demand").alias("mean_cpu_demand"),
- avg("mean_num_deployed_images").alias("mean_num_deployed_images"),
- max("mean_num_deployed_images").alias("max_num_deployed_images")
- )
-
- // Group the provisioner metrics per run
- val provisionerMetricsGrouped = provisionerMetrics.groupBy("run_id")
-
- // Aggregate the provisioner metrics
- val provisionerMetricsAggregated = provisionerMetricsGrouped.agg(
- max("vm_total_count").alias("total_vms_submitted"),
- max("vm_waiting_count").alias("total_vms_queued"),
- max("vm_active_count").alias("total_vms_running"),
- max("vm_inactive_count").alias("total_vms_finished"),
- max("vm_failed_count").alias("total_vms_failed")
- )
-
- // Join the results into a single data frame
- return systemMetrics
- .join(hvMetrics, "run_id")
- .join(provisionerMetricsAggregated, "run_id")
- .select(
- col("total_requested_burst"),
- col("total_granted_burst"),
- col("total_overcommitted_burst"),
- col("total_interfered_burst"),
- col("mean_cpu_usage"),
- col("mean_cpu_demand"),
- col("mean_num_deployed_images"),
- col("max_num_deployed_images"),
- col("total_power_draw"),
- col("total_failure_slices"),
- col("total_failure_vm_slices"),
- col("total_vms_submitted"),
- col("total_vms_queued"),
- col("total_vms_finished"),
- col("total_vms_failed")
- )
- .groupBy(lit(1))
- .agg(
- // TODO Check if order of values is correct
- collect_list(col("total_requested_burst")).alias("total_requested_burst"),
- collect_list(col("total_granted_burst")).alias("total_granted_burst"),
- collect_list(col("total_overcommitted_burst")).alias("total_overcommitted_burst"),
- collect_list(col("total_interfered_burst")).alias("total_interfered_burst"),
- collect_list(col("mean_cpu_usage")).alias("mean_cpu_usage"),
- collect_list(col("mean_cpu_demand")).alias("mean_cpu_demand"),
- collect_list(col("mean_num_deployed_images")).alias("mean_num_deployed_images"),
- collect_list(col("max_num_deployed_images")).alias("max_num_deployed_images"),
- collect_list(col("total_power_draw")).alias("total_power_draw"),
- collect_list(col("total_failure_slices")).alias("total_failure_slices"),
- collect_list(col("total_failure_vm_slices")).alias("total_failure_vm_slices"),
- collect_list(col("total_vms_submitted")).alias("total_vms_submitted"),
- collect_list(col("total_vms_queued")).alias("total_vms_queued"),
- collect_list(col("total_vms_finished")).alias("total_vms_finished"),
- collect_list(col("total_vms_failed")).alias("total_vms_failed")
- )
- }
-
- // Spark helper functions
- private operator fun Column.times(other: Column): Column = `$times`(other)
- private operator fun Column.div(other: Column): Column = `$div`(other)
- private operator fun Column.get(other: Column): Column = this.apply(other)
-
- private val sliceLength = 5 * 60 * 1000
- private val states = map(
- lit("ERROR"),
- lit(1),
- lit("ACTIVE"),
- lit(0),
- lit("SHUTOFF"),
- lit(0)
- )
- private val oppositeStates = map(
- lit("ERROR"),
- lit(0),
- lit("ACTIVE"),
- lit(1),
- lit("SHUTOFF"),
- lit(1)
- )
-}
diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ScenarioManager.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ScenarioManager.kt
index 14f66757..a3907051 100644
--- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ScenarioManager.kt
+++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/ScenarioManager.kt
@@ -88,28 +88,27 @@ public class ScenarioManager(private val collection: MongoCollection<Document>)
/**
* Persist the specified results.
*/
- public fun finish(id: ObjectId, result: ResultProcessor.Result) {
+ public fun finish(id: ObjectId, results: List<WebExperimentMonitor.Result>) {
collection.findOneAndUpdate(
Filters.eq("_id", id),
Updates.combine(
Updates.set("simulation.state", "FINISHED"),
Updates.unset("simulation.time"),
- Updates.set("results.total_requested_burst", result.totalRequestedBurst),
- Updates.set("results.total_granted_burst", result.totalGrantedBurst),
- Updates.set("results.total_overcommitted_burst", result.totalOvercommittedBurst),
- Updates.set("results.total_interfered_burst", result.totalInterferedBurst),
- Updates.set("results.mean_cpu_usage", result.meanCpuUsage),
- Updates.set("results.mean_cpu_demand", result.meanCpuDemand),
- Updates.set("results.mean_num_deployed_images", result.meanNumDeployedImages),
- Updates.set("results.max_num_deployed_images", result.maxNumDeployedImages),
- Updates.set("results.max_num_deployed_images", result.maxNumDeployedImages),
- Updates.set("results.total_power_draw", result.totalPowerDraw),
- Updates.set("results.total_failure_slices", result.totalFailureSlices),
- Updates.set("results.total_failure_vm_slices", result.totalFailureVmSlices),
- Updates.set("results.total_vms_submitted", result.totalVmsSubmitted),
- Updates.set("results.total_vms_queued", result.totalVmsQueued),
- Updates.set("results.total_vms_finished", result.totalVmsFinished),
- Updates.set("results.total_vms_failed", result.totalVmsFailed)
+ Updates.set("results.total_requested_burst", results.map { it.totalRequestedBurst }),
+ Updates.set("results.total_granted_burst", results.map { it.totalGrantedBurst }),
+ Updates.set("results.total_overcommitted_burst", results.map { it.totalOvercommittedBurst }),
+ Updates.set("results.total_interfered_burst", results.map { it.totalInterferedBurst }),
+ Updates.set("results.mean_cpu_usage", results.map { it.meanCpuUsage }),
+ Updates.set("results.mean_cpu_demand", results.map { it.meanCpuDemand }),
+ Updates.set("results.mean_num_deployed_images", results.map { it.meanNumDeployedImages }),
+ Updates.set("results.max_num_deployed_images", results.map { it.maxNumDeployedImages }),
+ Updates.set("results.total_power_draw", results.map { it.totalPowerDraw }),
+ Updates.set("results.total_failure_slices", results.map { it.totalFailureSlices }),
+ Updates.set("results.total_failure_vm_slices", results.map { it.totalFailureVmSlices }),
+ Updates.set("results.total_vms_submitted", results.map { it.totalVmsSubmitted }),
+ Updates.set("results.total_vms_queued", results.map { it.totalVmsQueued }),
+ Updates.set("results.total_vms_finished", results.map { it.totalVmsFinished }),
+ Updates.set("results.total_vms_failed", results.map { it.totalVmsFailed })
)
)
}
diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt
new file mode 100644
index 00000000..7ef25552
--- /dev/null
+++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt
@@ -0,0 +1,273 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.runner.web
+
+import mu.KotlinLogging
+import org.opendc.compute.core.Server
+import org.opendc.compute.core.ServerState
+import org.opendc.compute.core.virt.driver.VirtDriver
+import org.opendc.compute.core.virt.service.VirtProvisioningEvent
+import org.opendc.experiments.sc20.experiment.monitor.ExperimentMonitor
+import org.opendc.experiments.sc20.telemetry.HostEvent
+import kotlin.math.max
+
+/**
+ * An [ExperimentMonitor] that tracks the aggregate metrics for each repeat.
+ */
+public class WebExperimentMonitor : ExperimentMonitor {
+ private val logger = KotlinLogging.logger {}
+ private val currentHostEvent = mutableMapOf<Server, HostEvent>()
+ private var startTime = -1L
+
+ override fun reportVmStateChange(time: Long, server: Server) {
+ if (startTime < 0) {
+ startTime = time
+
+ // Update timestamp of initial event
+ currentHostEvent.replaceAll { _, v -> v.copy(timestamp = startTime) }
+ }
+ }
+
+ override fun reportHostStateChange(
+ time: Long,
+ driver: VirtDriver,
+ server: Server
+ ) {
+ logger.debug { "Host ${server.uid} changed state ${server.state} [$time]" }
+
+ val previousEvent = currentHostEvent[server]
+
+ val roundedTime = previousEvent?.let {
+ val duration = time - it.timestamp
+ val k = 5 * 60 * 1000L // 5 min in ms
+ val rem = duration % k
+
+ if (rem == 0L) {
+ time
+ } else {
+ it.timestamp + duration + k - rem
+ }
+ } ?: time
+
+ reportHostSlice(
+ roundedTime,
+ 0,
+ 0,
+ 0,
+ 0,
+ 0.0,
+ 0.0,
+ 0,
+ server
+ )
+ }
+
+ private val lastPowerConsumption = mutableMapOf<Server, Double>()
+
+ override fun reportPowerConsumption(host: Server, draw: Double) {
+ lastPowerConsumption[host] = draw
+ }
+
+ override fun reportHostSlice(
+ time: Long,
+ requestedBurst: Long,
+ grantedBurst: Long,
+ overcommissionedBurst: Long,
+ interferedBurst: Long,
+ cpuUsage: Double,
+ cpuDemand: Double,
+ numberOfDeployedImages: Int,
+ hostServer: Server,
+ duration: Long
+ ) {
+ val previousEvent = currentHostEvent[hostServer]
+ when {
+ previousEvent == null -> {
+ val event = HostEvent(
+ time,
+ 5 * 60 * 1000L,
+ hostServer,
+ numberOfDeployedImages,
+ requestedBurst,
+ grantedBurst,
+ overcommissionedBurst,
+ interferedBurst,
+ cpuUsage,
+ cpuDemand,
+ lastPowerConsumption[hostServer] ?: 200.0,
+ hostServer.flavor.cpuCount
+ )
+
+ currentHostEvent[hostServer] = event
+ }
+ previousEvent.timestamp == time -> {
+ val event = HostEvent(
+ time,
+ previousEvent.duration,
+ hostServer,
+ numberOfDeployedImages,
+ requestedBurst,
+ grantedBurst,
+ overcommissionedBurst,
+ interferedBurst,
+ cpuUsage,
+ cpuDemand,
+ lastPowerConsumption[hostServer] ?: 200.0,
+ hostServer.flavor.cpuCount
+ )
+
+ currentHostEvent[hostServer] = event
+ }
+ else -> {
+ processHostEvent(previousEvent)
+
+ val event = HostEvent(
+ time,
+ time - previousEvent.timestamp,
+ hostServer,
+ numberOfDeployedImages,
+ requestedBurst,
+ grantedBurst,
+ overcommissionedBurst,
+ interferedBurst,
+ cpuUsage,
+ cpuDemand,
+ lastPowerConsumption[hostServer] ?: 200.0,
+ hostServer.flavor.cpuCount
+ )
+
+ currentHostEvent[hostServer] = event
+ }
+ }
+ }
+
+ private var hostAggregateMetrics: AggregateHostMetrics = AggregateHostMetrics()
+ private val hostMetrics: MutableMap<Server, HostMetrics> = mutableMapOf()
+
+ private fun processHostEvent(event: HostEvent) {
+ val slices = event.duration / SLICE_LENGTH
+
+ hostAggregateMetrics = AggregateHostMetrics(
+ hostAggregateMetrics.totalRequestedBurst + event.requestedBurst,
+ hostAggregateMetrics.totalGrantedBurst + event.grantedBurst,
+ hostAggregateMetrics.totalOvercommittedBurst + event.overcommissionedBurst,
+ hostAggregateMetrics.totalInterferedBurst + event.interferedBurst,
+ hostAggregateMetrics.totalPowerDraw + (slices * (event.powerDraw / 12)),
+ hostAggregateMetrics.totalFailureSlices + if (event.host.state != ServerState.ACTIVE) slices.toLong() else 0,
+ hostAggregateMetrics.totalFailureVmSlices + if (event.host.state != ServerState.ACTIVE) event.vmCount * slices.toLong() else 0
+ )
+
+ hostMetrics.compute(event.host) { key, prev ->
+ HostMetrics(
+ (event.cpuUsage.takeIf { event.host.state == ServerState.ACTIVE } ?: 0.0) + (prev?.cpuUsage ?: 0.0),
+ (event.cpuDemand.takeIf { event.host.state == ServerState.ACTIVE } ?: 0.0) + (prev?.cpuDemand ?: 0.0),
+ event.vmCount + (prev?.vmCount ?: 0),
+ 1 + (prev?.count ?: 0)
+ )
+ }
+ }
+
+ private val SLICE_LENGTH: Long = 5 * 60 * 1000
+
+ public data class AggregateHostMetrics(
+ val totalRequestedBurst: Long = 0,
+ val totalGrantedBurst: Long = 0,
+ val totalOvercommittedBurst: Long = 0,
+ val totalInterferedBurst: Long = 0,
+ val totalPowerDraw: Double = 0.0,
+ val totalFailureSlices: Long = 0,
+ val totalFailureVmSlices: Long = 0,
+ )
+
+ public data class HostMetrics(
+ val cpuUsage: Double,
+ val cpuDemand: Double,
+ val vmCount: Long,
+ val count: Long
+ )
+
+ private var provisionerMetrics: AggregateProvisionerMetrics = AggregateProvisionerMetrics()
+
+ override fun reportProvisionerMetrics(time: Long, event: VirtProvisioningEvent.MetricsAvailable) {
+ provisionerMetrics = AggregateProvisionerMetrics(
+ max(event.totalVmCount, provisionerMetrics.vmTotalCount),
+ max(event.waitingVmCount, provisionerMetrics.vmWaitingCount),
+ max(event.activeVmCount, provisionerMetrics.vmActiveCount),
+ max(event.inactiveVmCount, provisionerMetrics.vmInactiveCount),
+ max(event.failedVmCount, provisionerMetrics.vmFailedCount),
+ )
+ }
+
+ public data class AggregateProvisionerMetrics(
+ val vmTotalCount: Int = 0,
+ val vmWaitingCount: Int = 0,
+ val vmActiveCount: Int = 0,
+ val vmInactiveCount: Int = 0,
+ val vmFailedCount: Int = 0
+ )
+
+ override fun close() {
+ for ((_, event) in currentHostEvent) {
+ processHostEvent(event)
+ }
+ currentHostEvent.clear()
+ }
+
+ public fun getResult(): Result {
+ return Result(
+ hostAggregateMetrics.totalRequestedBurst,
+ hostAggregateMetrics.totalGrantedBurst,
+ hostAggregateMetrics.totalOvercommittedBurst,
+ hostAggregateMetrics.totalInterferedBurst,
+ hostMetrics.map { it.value.cpuUsage / it.value.count }.average(),
+ hostMetrics.map { it.value.cpuDemand / it.value.count }.average(),
+ hostMetrics.map { it.value.vmCount.toDouble() / it.value.count }.average(),
+ hostMetrics.map { it.value.vmCount.toDouble() / it.value.count }.maxOrNull() ?: 0.0,
+ hostAggregateMetrics.totalPowerDraw,
+ hostAggregateMetrics.totalFailureSlices,
+ hostAggregateMetrics.totalFailureVmSlices,
+ provisionerMetrics.vmTotalCount,
+ provisionerMetrics.vmWaitingCount,
+ provisionerMetrics.vmInactiveCount,
+ provisionerMetrics.vmFailedCount,
+ )
+ }
+
+ public data class Result(
+ public val totalRequestedBurst: Long,
+ public val totalGrantedBurst: Long,
+ public val totalOvercommittedBurst: Long,
+ public val totalInterferedBurst: Long,
+ public val meanCpuUsage: Double,
+ public val meanCpuDemand: Double,
+ public val meanNumDeployedImages: Double,
+ public val maxNumDeployedImages: Double,
+ public val totalPowerDraw: Double,
+ public val totalFailureSlices: Long,
+ public val totalFailureVmSlices: Long,
+ public val totalVmsSubmitted: Int,
+ public val totalVmsQueued: Int,
+ public val totalVmsFinished: Int,
+ public val totalVmsFailed: Int
+ )
+}
diff --git a/simulator/opendc-runner-web/src/main/resources/log4j2.xml b/simulator/opendc-runner-web/src/main/resources/log4j2.xml
index ffc39890..87179332 100644
--- a/simulator/opendc-runner-web/src/main/resources/log4j2.xml
+++ b/simulator/opendc-runner-web/src/main/resources/log4j2.xml
@@ -30,18 +30,12 @@
</Console>
</Appenders>
<Loggers>
- <Logger name="org.opendc" level="info" additivity="false">
+ <Logger name="org.opendc" level="warn" additivity="false">
<AppenderRef ref="Console"/>
</Logger>
<Logger name="org.opendc.runner" level="info" additivity="false">
<AppenderRef ref="Console"/>
</Logger>
- <Logger name="org.apache.hadoop" level="warn" additivity="false">
- <AppenderRef ref="Console"/>
- </Logger>
- <Logger name="org.apache.spark" level="warn" additivity="false">
- <AppenderRef ref="Console"/>
- </Logger>
<Root level="error">
<AppenderRef ref="Console"/>
</Root>