summaryrefslogtreecommitdiff
path: root/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-07-02 17:52:12 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-07-05 11:52:03 +0200
commit6752b6d50faab447b3edc13bddf14f53401392f1 (patch)
tree953ed9998107f46d5892addc7266e39b3484fdfa /opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner
parentfa7ffd9d1594a5bc9dba4fc65af0a4100988341b (diff)
runner: Use public API for scheduling simulation jobs
This change updates the web runner to not require direct database access for scheduling simulation jobs. Instead, the runner polls the public REST API for available jobs and reports its results through there.
Diffstat (limited to 'opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner')
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt404
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt86
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebExperimentMonitor.kt189
3 files changed, 679 insertions, 0 deletions
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
new file mode 100644
index 00000000..5b5ef802
--- /dev/null
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
@@ -0,0 +1,404 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.web.runner
+
+import com.github.ajalt.clikt.core.CliktCommand
+import com.github.ajalt.clikt.parameters.options.*
+import com.github.ajalt.clikt.parameters.types.file
+import com.github.ajalt.clikt.parameters.types.long
+import io.opentelemetry.api.metrics.MeterProvider
+import io.opentelemetry.sdk.metrics.SdkMeterProvider
+import io.opentelemetry.sdk.metrics.export.MetricProducer
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.Channel
+import mu.KotlinLogging
+import org.opendc.compute.service.scheduler.FilterScheduler
+import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter
+import org.opendc.compute.service.scheduler.filters.ComputeFilter
+import org.opendc.compute.service.scheduler.weights.*
+import org.opendc.experiments.capelin.*
+import org.opendc.experiments.capelin.model.Workload
+import org.opendc.experiments.capelin.trace.ParquetTraceReader
+import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader
+import org.opendc.experiments.capelin.trace.RawParquetTraceReader
+import org.opendc.format.environment.EnvironmentReader
+import org.opendc.format.environment.MachineDef
+import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
+import org.opendc.simulator.compute.model.MachineModel
+import org.opendc.simulator.compute.model.MemoryUnit
+import org.opendc.simulator.compute.model.ProcessingNode
+import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.power.LinearPowerModel
+import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.telemetry.sdk.toOtelClock
+import org.opendc.web.client.ApiClient
+import org.opendc.web.client.AuthConfiguration
+import org.opendc.web.client.model.Scenario
+import org.opendc.web.client.model.Topology
+import java.io.File
+import java.net.URI
+import java.util.*
+import kotlin.random.Random
+import kotlin.random.asJavaRandom
+import org.opendc.web.client.model.Portfolio as ClientPortfolio
+
+private val logger = KotlinLogging.logger {}
+
+/**
+ * Represents the CLI command for starting the OpenDC web runner.
+ */
+class RunnerCli : CliktCommand(name = "runner") {
+ /**
+ * The URL to the OpenDC API.
+ */
+ private val apiUrl by option(
+ "--api-url",
+ help = "url to the OpenDC API",
+ envvar = "OPENDC_API_URL"
+ )
+ .convert { URI(it) }
+ .default(URI("https://api.opendc.org/v2"))
+
+ /**
+ * The auth domain to use.
+ */
+ private val authDomain by option(
+ "--auth-domain",
+ help = "auth domain of the OpenDC API",
+ envvar = "AUTH0_DOMAIN"
+ )
+ .required()
+
+ /**
+ * The auth client ID to use.
+ */
+ private val authClientId by option(
+ "--auth-id",
+ help = "auth client id of the OpenDC API",
+ envvar = "AUTH0_CLIENT_ID"
+ )
+ .required()
+
+ /**
+ * The auth client secret to use.
+ */
+ private val authClientSecret by option(
+ "--auth-secret",
+ help = "auth client secret of the OpenDC API",
+ envvar = "AUTH0_CLIENT_SECRET"
+ )
+ .required()
+
+ /**
+ * The path to the traces directory.
+ */
+ private val tracePath by option(
+ "--traces",
+ help = "path to the directory containing the traces",
+ envvar = "OPENDC_TRACES"
+ )
+ .file(canBeFile = false)
+ .defaultLazy { File("traces/") }
+
+ /**
+ * The maximum duration of a single experiment run.
+ */
+ private val runTimeout by option(
+ "--run-timeout",
+ help = "maximum duration of experiment in seconds",
+ envvar = "OPENDC_RUN_TIMEOUT"
+ )
+ .long()
+ .default(60L * 3) // Experiment may run for a maximum of three minutes
+
+ /**
+ * Run a single scenario.
+ */
+ private suspend fun runScenario(portfolio: ClientPortfolio, scenario: Scenario, environment: EnvironmentReader): List<WebExperimentMonitor.Result> {
+ val id = scenario.id
+
+ logger.info { "Constructing performance interference model" }
+
+ val traceDir = File(
+ tracePath,
+ scenario.trace.traceId
+ )
+ val traceReader = RawParquetTraceReader(traceDir)
+ val interferenceGroups = let {
+ val path = File(traceDir, "performance-interference-model.json")
+ val operational = scenario.operationalPhenomena
+ val enabled = operational.performanceInterferenceEnabled
+
+ if (!enabled || !path.exists()) {
+ return@let null
+ }
+
+ PerformanceInterferenceReader(path.inputStream()).use { reader -> reader.read() }
+ }
+
+ val targets = portfolio.targets
+ val results = (0 until targets.repeatsPerScenario).map { repeat ->
+ logger.info { "Starting repeat $repeat" }
+ withTimeout(runTimeout * 1000) {
+ val interferenceModel = interferenceGroups?.let { VmInterferenceModel(it, Random(repeat.toLong()).asJavaRandom()) }
+ runRepeat(scenario, repeat, environment, traceReader, interferenceModel)
+ }
+ }
+
+ logger.info { "Finished simulation for scenario $id" }
+
+ return results
+ }
+
+ /**
+ * Run a single repeat.
+ */
+ private suspend fun runRepeat(
+ scenario: Scenario,
+ repeat: Int,
+ environment: EnvironmentReader,
+ traceReader: RawParquetTraceReader,
+ interferenceModel: VmInterferenceModel?
+ ): WebExperimentMonitor.Result {
+ val monitor = WebExperimentMonitor()
+
+ try {
+ runBlockingSimulation {
+ val seed = repeat
+ val workloadName = scenario.trace.traceId
+ val workloadFraction = scenario.trace.loadSamplingFraction
+
+ val seeder = Random(seed)
+
+ val chan = Channel<Unit>(Channel.CONFLATED)
+
+ val meterProvider: MeterProvider = SdkMeterProvider
+ .builder()
+ .setClock(clock.toOtelClock())
+ .build()
+ val metricProducer = meterProvider as MetricProducer
+
+ val operational = scenario.operationalPhenomena
+ val allocationPolicy =
+ when (val policyName = operational.schedulerName) {
+ "mem" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(MemoryWeigher() to -1.0)
+ )
+ "mem-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(MemoryWeigher() to 1.0)
+ )
+ "core-mem" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(CoreMemoryWeigher() to -1.0)
+ )
+ "core-mem-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(CoreMemoryWeigher() to 1.0)
+ )
+ "active-servers" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(ProvisionedCoresWeigher() to -1.0)
+ )
+ "active-servers-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(InstanceCountWeigher() to 1.0)
+ )
+ "provisioned-cores" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(ProvisionedCoresWeigher() to -1.0)
+ )
+ "provisioned-cores-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(ProvisionedCoresWeigher() to 1.0)
+ )
+ "random" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(RandomWeigher(java.util.Random(seeder.nextLong())) to 1.0)
+ )
+ else -> throw IllegalArgumentException("Unknown policy $policyName")
+ }
+
+ val trace = ParquetTraceReader(
+ listOf(traceReader),
+ Workload(workloadName, workloadFraction),
+ seed
+ )
+ val failureFrequency = if (operational.failuresEnabled) 24.0 * 7 else 0.0
+
+ withComputeService(clock, meterProvider, environment, allocationPolicy, interferenceModel) { scheduler ->
+ val failureDomain = if (failureFrequency > 0) {
+ logger.debug { "ENABLING failures" }
+ createFailureDomain(
+ this,
+ clock,
+ seeder.nextInt(),
+ failureFrequency,
+ scheduler,
+ chan
+ )
+ } else {
+ null
+ }
+
+ withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
+ processTrace(
+ clock,
+ trace,
+ scheduler,
+ chan,
+ monitor
+ )
+ }
+
+ failureDomain?.cancel()
+ }
+
+ val monitorResults = collectMetrics(metricProducer)
+ logger.debug { "Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}" }
+ }
+ } catch (cause: Throwable) {
+ logger.warn(cause) { "Experiment failed" }
+ }
+
+ return monitor.getResult()
+ }
+
+ private val POLL_INTERVAL = 30000L // ms = 30 s
+ private val HEARTBEAT_INTERVAL = 60000L // ms = 1 min
+
+ override fun run(): Unit = runBlocking(Dispatchers.Default) {
+ logger.info { "Starting OpenDC web runner" }
+
+ val client = ApiClient(baseUrl = apiUrl, AuthConfiguration(authDomain, authClientId, authClientSecret))
+ val manager = ScenarioManager(client)
+
+ logger.info { "Watching for queued scenarios" }
+
+ while (true) {
+ val scenario = manager.findNext()
+
+ if (scenario == null) {
+ delay(POLL_INTERVAL)
+ continue
+ }
+
+ val id = scenario.id
+
+ logger.info { "Found queued scenario $id: attempting to claim" }
+
+ if (!manager.claim(id)) {
+ logger.info { "Failed to claim scenario" }
+ continue
+ }
+
+ coroutineScope {
+ // Launch heartbeat process
+ val heartbeat = launch {
+ while (true) {
+ manager.heartbeat(id)
+ delay(HEARTBEAT_INTERVAL)
+ }
+ }
+
+ try {
+ val scenarioModel = client.getScenario(id)!!
+ val portfolio = client.getPortfolio(scenarioModel.portfolioId)!!
+ val environment = convert(client.getTopology(scenarioModel.topology.topologyId)!!)
+ val results = runScenario(portfolio, scenarioModel, environment)
+
+ logger.info { "Writing results to database" }
+
+ manager.finish(id, results)
+
+ logger.info { "Successfully finished scenario $id" }
+ } catch (e: Exception) {
+ logger.error(e) { "Scenario failed to finish" }
+ manager.fail(id)
+ } finally {
+ heartbeat.cancel()
+ }
+ }
+ }
+ }
+
+ /**
+ * Convert the specified [topology] into an [EnvironmentReader] understood by Capelin.
+ */
+ private fun convert(topology: Topology): EnvironmentReader {
+ val nodes = mutableListOf<MachineDef>()
+ val random = Random(0)
+
+ val machines = topology.rooms.asSequence()
+ .flatMap { room ->
+ room.tiles.flatMap { tile ->
+ tile.rack?.machines?.map { machine -> tile.rack to machine } ?: emptyList()
+ }
+ }
+ for ((rack, machine) in machines) {
+ val clusterId = rack.id
+ val position = machine.position
+
+ val processors = machine.cpus.flatMap { cpu ->
+ val cores = cpu.numberOfCores
+ val speed = cpu.clockRateMhz
+ // TODO Remove hard coding of vendor
+ val node = ProcessingNode("Intel", "amd64", cpu.name, cores)
+ List(cores) { coreId ->
+ ProcessingUnit(node, coreId, speed)
+ }
+ }
+ val memoryUnits = machine.memory.map { memory ->
+ MemoryUnit(
+ "Samsung",
+ memory.name,
+ memory.speedMbPerS,
+ memory.sizeMb.toLong()
+ )
+ }
+
+ val energyConsumptionW = machine.cpus.sumOf { it.energyConsumptionW }
+
+ nodes.add(
+ MachineDef(
+ UUID(random.nextLong(), random.nextLong()),
+ "node-$clusterId-$position",
+ mapOf("cluster" to clusterId),
+ MachineModel(processors, memoryUnits),
+ LinearPowerModel(2 * energyConsumptionW, energyConsumptionW * 0.5)
+ )
+ )
+ }
+
+ return object : EnvironmentReader {
+ override fun read(): List<MachineDef> = nodes
+ override fun close() {}
+ }
+ }
+}
+
+/**
+ * Main entry point of the runner.
+ */
+fun main(args: Array<String>): Unit = RunnerCli().main(args)
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt
new file mode 100644
index 00000000..4044cec9
--- /dev/null
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt
@@ -0,0 +1,86 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.web.runner
+
+import org.opendc.web.client.ApiClient
+import org.opendc.web.client.model.Job
+import org.opendc.web.client.model.SimulationState
+
+/**
+ * Manages the queue of scenarios that need to be processed.
+ */
+public class ScenarioManager(private val client: ApiClient) {
+ /**
+ * Find the next job that the simulator needs to process.
+ */
+ public suspend fun findNext(): Job? {
+ return client.getJobs().firstOrNull()
+ }
+
+ /**
+ * Claim the simulation job with the specified id.
+ */
+ public suspend fun claim(id: String): Boolean {
+ return client.updateJob(id, SimulationState.CLAIMED)
+ }
+
+ /**
+ * Update the heartbeat of the specified scenario.
+ */
+ public suspend fun heartbeat(id: String) {
+ client.updateJob(id, SimulationState.RUNNING)
+ }
+
+ /**
+ * Mark the scenario as failed.
+ */
+ public suspend fun fail(id: String) {
+ client.updateJob(id, SimulationState.FAILED)
+ }
+
+ /**
+ * Persist the specified results.
+ */
+ public suspend fun finish(id: String, results: List<WebExperimentMonitor.Result>) {
+ client.updateJob(
+ id, SimulationState.FINISHED,
+ mapOf(
+ "total_requested_burst" to results.map { it.totalRequestedBurst },
+ "total_granted_burst" to results.map { it.totalGrantedBurst },
+ "total_overcommitted_burst" to results.map { it.totalOvercommittedBurst },
+ "total_interfered_burst" to results.map { it.totalInterferedBurst },
+ "mean_cpu_usage" to results.map { it.meanCpuUsage },
+ "mean_cpu_demand" to results.map { it.meanCpuDemand },
+ "mean_num_deployed_images" to results.map { it.meanNumDeployedImages },
+ "max_num_deployed_images" to results.map { it.maxNumDeployedImages },
+ "total_power_draw" to results.map { it.totalPowerDraw },
+ "total_failure_slices" to results.map { it.totalFailureSlices },
+ "total_failure_vm_slices" to results.map { it.totalFailureVmSlices },
+ "total_vms_submitted" to results.map { it.totalVmsSubmitted },
+ "total_vms_queued" to results.map { it.totalVmsQueued },
+ "total_vms_finished" to results.map { it.totalVmsFinished },
+ "total_vms_failed" to results.map { it.totalVmsFailed }
+ )
+ )
+ }
+}
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebExperimentMonitor.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebExperimentMonitor.kt
new file mode 100644
index 00000000..d4445810
--- /dev/null
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebExperimentMonitor.kt
@@ -0,0 +1,189 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.web.runner
+
+import mu.KotlinLogging
+import org.opendc.compute.api.Server
+import org.opendc.compute.api.ServerState
+import org.opendc.compute.service.driver.Host
+import org.opendc.compute.service.driver.HostState
+import org.opendc.experiments.capelin.monitor.ExperimentMonitor
+import org.opendc.experiments.capelin.telemetry.HostEvent
+import kotlin.math.max
+
+/**
+ * An [ExperimentMonitor] that tracks the aggregate metrics for each repeat.
+ */
+public class WebExperimentMonitor : ExperimentMonitor {
+ private val logger = KotlinLogging.logger {}
+
+ override fun reportVmStateChange(time: Long, server: Server, newState: ServerState) {}
+
+ override fun reportHostStateChange(time: Long, host: Host, newState: HostState) {
+ logger.debug { "Host ${host.uid} changed state $newState [$time]" }
+ }
+
+ override fun reportHostSlice(
+ time: Long,
+ requestedBurst: Long,
+ grantedBurst: Long,
+ overcommissionedBurst: Long,
+ interferedBurst: Long,
+ cpuUsage: Double,
+ cpuDemand: Double,
+ powerDraw: Double,
+ numberOfDeployedImages: Int,
+ host: Host,
+ ) {
+ processHostEvent(
+ HostEvent(
+ time,
+ 5 * 60 * 1000L,
+ host,
+ numberOfDeployedImages,
+ requestedBurst,
+ grantedBurst,
+ overcommissionedBurst,
+ interferedBurst,
+ cpuUsage,
+ cpuDemand,
+ powerDraw,
+ host.model.cpuCount
+ )
+ )
+ }
+
+ private var hostAggregateMetrics: AggregateHostMetrics = AggregateHostMetrics()
+ private val hostMetrics: MutableMap<Host, HostMetrics> = mutableMapOf()
+
+ private fun processHostEvent(event: HostEvent) {
+ val slices = event.duration / SLICE_LENGTH
+
+ hostAggregateMetrics = AggregateHostMetrics(
+ hostAggregateMetrics.totalRequestedBurst + event.requestedBurst,
+ hostAggregateMetrics.totalGrantedBurst + event.grantedBurst,
+ hostAggregateMetrics.totalOvercommittedBurst + event.overcommissionedBurst,
+ hostAggregateMetrics.totalInterferedBurst + event.interferedBurst,
+ hostAggregateMetrics.totalPowerDraw + (event.duration * event.powerDraw) / 3600,
+ hostAggregateMetrics.totalFailureSlices + if (event.host.state != HostState.UP) slices else 0,
+ hostAggregateMetrics.totalFailureVmSlices + if (event.host.state != HostState.UP) event.vmCount * slices else 0
+ )
+
+ hostMetrics.compute(event.host) { _, prev ->
+ HostMetrics(
+ (event.cpuUsage.takeIf { event.host.state == HostState.UP } ?: 0.0) + (prev?.cpuUsage ?: 0.0),
+ (event.cpuDemand.takeIf { event.host.state == HostState.UP } ?: 0.0) + (prev?.cpuDemand ?: 0.0),
+ event.vmCount + (prev?.vmCount ?: 0),
+ 1 + (prev?.count ?: 0)
+ )
+ }
+ }
+
+ private val SLICE_LENGTH: Long = 5 * 60 * 1000
+
+ public data class AggregateHostMetrics(
+ val totalRequestedBurst: Long = 0,
+ val totalGrantedBurst: Long = 0,
+ val totalOvercommittedBurst: Long = 0,
+ val totalInterferedBurst: Long = 0,
+ val totalPowerDraw: Double = 0.0,
+ val totalFailureSlices: Long = 0,
+ val totalFailureVmSlices: Long = 0,
+ )
+
+ public data class HostMetrics(
+ val cpuUsage: Double,
+ val cpuDemand: Double,
+ val vmCount: Long,
+ val count: Long
+ )
+
+ private var provisionerMetrics: AggregateProvisionerMetrics = AggregateProvisionerMetrics()
+
+ override fun reportProvisionerMetrics(
+ time: Long,
+ totalHostCount: Int,
+ availableHostCount: Int,
+ totalVmCount: Int,
+ activeVmCount: Int,
+ inactiveVmCount: Int,
+ waitingVmCount: Int,
+ failedVmCount: Int
+ ) {
+ provisionerMetrics = AggregateProvisionerMetrics(
+ max(totalVmCount, provisionerMetrics.vmTotalCount),
+ max(waitingVmCount, provisionerMetrics.vmWaitingCount),
+ max(activeVmCount, provisionerMetrics.vmActiveCount),
+ max(inactiveVmCount, provisionerMetrics.vmInactiveCount),
+ max(failedVmCount, provisionerMetrics.vmFailedCount),
+ )
+ }
+
+ public data class AggregateProvisionerMetrics(
+ val vmTotalCount: Int = 0,
+ val vmWaitingCount: Int = 0,
+ val vmActiveCount: Int = 0,
+ val vmInactiveCount: Int = 0,
+ val vmFailedCount: Int = 0
+ )
+
+ override fun close() {}
+
+ public fun getResult(): Result {
+ return Result(
+ hostAggregateMetrics.totalRequestedBurst,
+ hostAggregateMetrics.totalGrantedBurst,
+ hostAggregateMetrics.totalOvercommittedBurst,
+ hostAggregateMetrics.totalInterferedBurst,
+ hostMetrics.map { it.value.cpuUsage / it.value.count }.average(),
+ hostMetrics.map { it.value.cpuDemand / it.value.count }.average(),
+ hostMetrics.map { it.value.vmCount.toDouble() / it.value.count }.average(),
+ hostMetrics.map { it.value.vmCount.toDouble() / it.value.count }.maxOrNull() ?: 0.0,
+ hostAggregateMetrics.totalPowerDraw,
+ hostAggregateMetrics.totalFailureSlices,
+ hostAggregateMetrics.totalFailureVmSlices,
+ provisionerMetrics.vmTotalCount,
+ provisionerMetrics.vmWaitingCount,
+ provisionerMetrics.vmInactiveCount,
+ provisionerMetrics.vmFailedCount,
+ )
+ }
+
+ public data class Result(
+ public val totalRequestedBurst: Long,
+ public val totalGrantedBurst: Long,
+ public val totalOvercommittedBurst: Long,
+ public val totalInterferedBurst: Long,
+ public val meanCpuUsage: Double,
+ public val meanCpuDemand: Double,
+ public val meanNumDeployedImages: Double,
+ public val maxNumDeployedImages: Double,
+ public val totalPowerDraw: Double,
+ public val totalFailureSlices: Long,
+ public val totalFailureVmSlices: Long,
+ public val totalVmsSubmitted: Int,
+ public val totalVmsQueued: Int,
+ public val totalVmsFinished: Int,
+ public val totalVmsFailed: Int
+ )
+}