summaryrefslogtreecommitdiff
path: root/opendc-web/opendc-web-runner/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-08-27 16:41:55 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-07 14:34:30 +0200
commitaaedd4f3eed83d0c3ebc829fec08a1749a2bfba4 (patch)
tree3b43c8da1ab285c4b965a6042215fb694f6ee909 /opendc-web/opendc-web-runner/src
parentbefec2f1ddf3a6e6d15d9d1b9fd1ecbbc4f38960 (diff)
refactor(capelin): Move metric collection outside Capelin code
This change moves the metric collection out of the Capelin codebase and into a separate module, so that other modules can also benefit from the compute metric collection code.
Diffstat (limited to 'opendc-web/opendc-web-runner/src')
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt20
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt10
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt145
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebExperimentMonitor.kt191
4 files changed, 164 insertions, 202 deletions
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
index 53d50357..65527141 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
@@ -47,6 +47,8 @@ import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.LinearPowerModel
import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.telemetry.compute.collectServiceMetrics
+import org.opendc.telemetry.compute.withMonitor
import org.opendc.telemetry.sdk.toOtelClock
import org.opendc.web.client.ApiClient
import org.opendc.web.client.AuthConfiguration
@@ -131,7 +133,7 @@ class RunnerCli : CliktCommand(name = "runner") {
/**
* Run a single scenario.
*/
- private suspend fun runScenario(portfolio: ClientPortfolio, scenario: Scenario, environment: EnvironmentReader): List<WebExperimentMonitor.Result> {
+ private suspend fun runScenario(portfolio: ClientPortfolio, scenario: Scenario, environment: EnvironmentReader): List<WebComputeMonitor.Result> {
val id = scenario.id
logger.info { "Constructing performance interference model" }
@@ -176,8 +178,8 @@ class RunnerCli : CliktCommand(name = "runner") {
environment: EnvironmentReader,
traceReader: RawParquetTraceReader,
interferenceModel: VmInterferenceModel?
- ): WebExperimentMonitor.Result {
- val monitor = WebExperimentMonitor()
+ ): WebComputeMonitor.Result {
+ val monitor = WebComputeMonitor()
try {
runBlockingSimulation {
@@ -220,7 +222,7 @@ class RunnerCli : CliktCommand(name = "runner") {
null
}
- withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
+ withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
processTrace(
clock,
trace,
@@ -233,8 +235,14 @@ class RunnerCli : CliktCommand(name = "runner") {
failureDomain?.cancel()
}
- val monitorResults = collectMetrics(metricProducer)
- logger.debug { "Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}" }
+ val monitorResults = collectServiceMetrics(clock.millis(), metricProducer)
+ logger.debug {
+ "Finish " +
+ "SUBMIT=${monitorResults.instanceCount} " +
+ "FAIL=${monitorResults.failedInstanceCount} " +
+ "QUEUE=${monitorResults.queuedInstanceCount} " +
+ "RUNNING=${monitorResults.runningInstanceCount}"
+ }
}
} catch (cause: Throwable) {
logger.warn(cause) { "Experiment failed" }
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt
index 4044cec9..e0e3488f 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt
@@ -61,14 +61,14 @@ public class ScenarioManager(private val client: ApiClient) {
/**
* Persist the specified results.
*/
- public suspend fun finish(id: String, results: List<WebExperimentMonitor.Result>) {
+ public suspend fun finish(id: String, results: List<WebComputeMonitor.Result>) {
client.updateJob(
id, SimulationState.FINISHED,
mapOf(
- "total_requested_burst" to results.map { it.totalRequestedBurst },
- "total_granted_burst" to results.map { it.totalGrantedBurst },
- "total_overcommitted_burst" to results.map { it.totalOvercommittedBurst },
- "total_interfered_burst" to results.map { it.totalInterferedBurst },
+ "total_requested_burst" to results.map { it.totalWork },
+ "total_granted_burst" to results.map { it.totalGrantedWork },
+ "total_overcommitted_burst" to results.map { it.totalOvercommittedWork },
+ "total_interfered_burst" to results.map { it.totalInterferedWork },
"mean_cpu_usage" to results.map { it.meanCpuUsage },
"mean_cpu_demand" to results.map { it.meanCpuDemand },
"mean_num_deployed_images" to results.map { it.meanNumDeployedImages },
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt
new file mode 100644
index 00000000..c8e58dde
--- /dev/null
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt
@@ -0,0 +1,145 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.web.runner
+
+import mu.KotlinLogging
+import org.opendc.compute.service.driver.Host
+import org.opendc.compute.service.driver.HostState
+import org.opendc.telemetry.compute.ComputeMonitor
+import org.opendc.telemetry.compute.table.HostData
+import org.opendc.telemetry.compute.table.ServiceData
+import kotlin.math.max
+
+/**
+ * A [ComputeMonitor] that tracks the aggregate metrics for each repeat.
+ *
+ * Host samples are folded into running totals (across all hosts) and into per-host sums, while
+ * service samples are folded into running maxima. Call [getResult] to obtain a [Result] built from
+ * the state accumulated so far.
+ */
+public class WebComputeMonitor : ComputeMonitor {
+    private val logger = KotlinLogging.logger {}
+
+    /**
+     * Log a state transition of [host] to [newState] at the given [time].
+     */
+    override fun onStateChange(time: Long, host: Host, newState: HostState) {
+        logger.debug { "Host ${host.uid} changed state $newState [$time]" }
+    }
+
+    /**
+     * Fold a single host sample into the running aggregates.
+     *
+     * Each sample is treated as covering a fixed window of five minutes; the sample itself is not
+     * consulted for its duration.
+     */
+    override fun record(data: HostData) {
+        val duration = 5 * 60 * 1000L
+        val slices = duration / SLICE_LENGTH
+        // Read the host state once so that every decision below is based on the same observation.
+        val isUp = data.host.state == HostState.UP
+
+        hostAggregateMetrics = AggregateHostMetrics(
+            totalWork = hostAggregateMetrics.totalWork + data.totalWork,
+            totalGrantedWork = hostAggregateMetrics.totalGrantedWork + data.grantedWork,
+            totalOvercommittedWork = hostAggregateMetrics.totalOvercommittedWork + data.overcommittedWork,
+            // Interfered work must be accumulated from the interfered work of the sample, not from
+            // the overcommitted work (which is already accounted for in the line above).
+            totalInterferedWork = hostAggregateMetrics.totalInterferedWork + data.interferedWork,
+            // NOTE(review): the division by 3600 looks like a unit conversion - confirm the intended unit.
+            totalPowerDraw = hostAggregateMetrics.totalPowerDraw + (duration * data.powerDraw) / 3600,
+            // Slices only count as failure slices while the host is not UP.
+            totalFailureSlices = hostAggregateMetrics.totalFailureSlices + if (isUp) 0L else slices,
+            totalFailureVmSlices = hostAggregateMetrics.totalFailureVmSlices + if (isUp) 0L else data.instanceCount * slices,
+        )
+
+        hostMetrics.compute(data.host) { _, prev ->
+            HostMetrics(
+                // CPU usage and demand of a host that is not UP contribute zero to the per-host sums,
+                // but the sample is still counted, so it lowers the per-host mean.
+                (if (isUp) data.cpuUsage else 0.0) + (prev?.cpuUsage ?: 0.0),
+                (if (isUp) data.cpuDemand else 0.0) + (prev?.cpuDemand ?: 0.0),
+                data.instanceCount + (prev?.instanceCount ?: 0),
+                1 + (prev?.count ?: 0)
+            )
+        }
+    }
+
+    /** Running totals over all host samples recorded so far. */
+    private var hostAggregateMetrics: AggregateHostMetrics = AggregateHostMetrics()
+
+    /** Per-host sums of the recorded samples, keyed by host. */
+    private val hostMetrics: MutableMap<Host, HostMetrics> = mutableMapOf()
+
+    /** Length of a single slice (five minutes expressed in milliseconds). */
+    private val SLICE_LENGTH: Long = 5 * 60 * 1000
+
+    /**
+     * Totals accumulated over all recorded host samples.
+     */
+    data class AggregateHostMetrics(
+        val totalWork: Double = 0.0,
+        val totalGrantedWork: Double = 0.0,
+        val totalOvercommittedWork: Double = 0.0,
+        val totalInterferedWork: Double = 0.0,
+        val totalPowerDraw: Double = 0.0,
+        val totalFailureSlices: Long = 0,
+        val totalFailureVmSlices: Long = 0,
+    )
+
+    /**
+     * Sums accumulated for a single host, together with the number of samples ([count]) they cover.
+     */
+    data class HostMetrics(
+        val cpuUsage: Double,
+        val cpuDemand: Double,
+        val instanceCount: Long,
+        val count: Long
+    )
+
+    /** Highest service-level counters observed so far. */
+    private var serviceMetrics: AggregateServiceMetrics = AggregateServiceMetrics()
+
+    /**
+     * Fold a service sample into the aggregates by keeping, per counter, the maximum of the sample
+     * and the value recorded so far.
+     */
+    override fun record(data: ServiceData) {
+        serviceMetrics = AggregateServiceMetrics(
+            vmTotalCount = max(data.instanceCount, serviceMetrics.vmTotalCount),
+            vmWaitingCount = max(data.queuedInstanceCount, serviceMetrics.vmWaitingCount),
+            vmActiveCount = max(data.runningInstanceCount, serviceMetrics.vmActiveCount),
+            vmInactiveCount = max(data.finishedInstanceCount, serviceMetrics.vmInactiveCount),
+            vmFailedCount = max(data.failedInstanceCount, serviceMetrics.vmFailedCount),
+        )
+    }
+
+    /**
+     * Maximum value seen for each service-level counter.
+     */
+    public data class AggregateServiceMetrics(
+        val vmTotalCount: Int = 0,
+        val vmWaitingCount: Int = 0,
+        val vmActiveCount: Int = 0,
+        val vmInactiveCount: Int = 0,
+        val vmFailedCount: Int = 0
+    )
+
+    /**
+     * Build the [Result] for the samples recorded so far.
+     *
+     * The mean values are averages over hosts of the per-host mean. If no host sample has been
+     * recorded, these averages are computed over an empty collection and are therefore `NaN`,
+     * whereas [Result.maxNumDeployedImages] falls back to `0.0`.
+     */
+    public fun getResult(): Result {
+        // Mean number of instances per sample, computed for every host.
+        val meanInstancesPerHost = hostMetrics.map { it.value.instanceCount.toDouble() / it.value.count }
+
+        return Result(
+            totalWork = hostAggregateMetrics.totalWork,
+            totalGrantedWork = hostAggregateMetrics.totalGrantedWork,
+            totalOvercommittedWork = hostAggregateMetrics.totalOvercommittedWork,
+            totalInterferedWork = hostAggregateMetrics.totalInterferedWork,
+            meanCpuUsage = hostMetrics.map { it.value.cpuUsage / it.value.count }.average(),
+            meanCpuDemand = hostMetrics.map { it.value.cpuDemand / it.value.count }.average(),
+            meanNumDeployedImages = meanInstancesPerHost.average(),
+            maxNumDeployedImages = meanInstancesPerHost.maxOrNull() ?: 0.0,
+            totalPowerDraw = hostAggregateMetrics.totalPowerDraw,
+            totalFailureSlices = hostAggregateMetrics.totalFailureSlices,
+            totalFailureVmSlices = hostAggregateMetrics.totalFailureVmSlices,
+            totalVmsSubmitted = serviceMetrics.vmTotalCount,
+            totalVmsQueued = serviceMetrics.vmWaitingCount,
+            // The active (running) count is tracked in the service metrics but is not part of the result.
+            totalVmsFinished = serviceMetrics.vmInactiveCount,
+            totalVmsFailed = serviceMetrics.vmFailedCount,
+        )
+    }
+
+    /**
+     * The aggregate metrics of a single repeat as produced by [getResult].
+     */
+    data class Result(
+        val totalWork: Double,
+        val totalGrantedWork: Double,
+        val totalOvercommittedWork: Double,
+        val totalInterferedWork: Double,
+        val meanCpuUsage: Double,
+        val meanCpuDemand: Double,
+        val meanNumDeployedImages: Double,
+        val maxNumDeployedImages: Double,
+        val totalPowerDraw: Double,
+        val totalFailureSlices: Long,
+        val totalFailureVmSlices: Long,
+        val totalVmsSubmitted: Int,
+        val totalVmsQueued: Int,
+        val totalVmsFinished: Int,
+        val totalVmsFailed: Int
+    )
+}
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebExperimentMonitor.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebExperimentMonitor.kt
deleted file mode 100644
index 281c8dbb..00000000
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebExperimentMonitor.kt
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.web.runner
-
-import mu.KotlinLogging
-import org.opendc.compute.api.Server
-import org.opendc.compute.api.ServerState
-import org.opendc.compute.service.driver.Host
-import org.opendc.compute.service.driver.HostState
-import org.opendc.experiments.capelin.monitor.ExperimentMonitor
-import org.opendc.experiments.capelin.telemetry.HostEvent
-import kotlin.math.max
-
-/**
- * An [ExperimentMonitor] that tracks the aggregate metrics for each repeat.
- */
-public class WebExperimentMonitor : ExperimentMonitor {
- private val logger = KotlinLogging.logger {}
-
- override fun reportVmStateChange(time: Long, server: Server, newState: ServerState) {}
-
- override fun reportHostStateChange(time: Long, host: Host, newState: HostState) {
- logger.debug { "Host ${host.uid} changed state $newState [$time]" }
- }
-
- override fun reportHostData(
- time: Long,
- totalWork: Double,
- grantedWork: Double,
- overcommittedWork: Double,
- interferedWork: Double,
- cpuUsage: Double,
- cpuDemand: Double,
- powerDraw: Double,
- instanceCount: Int,
- uptime: Long,
- downtime: Long,
- host: Host,
- ) {
- processHostEvent(
- HostEvent(
- time,
- 5 * 60 * 1000L,
- host,
- instanceCount,
- totalWork.toLong(),
- grantedWork.toLong(),
- overcommittedWork.toLong(),
- interferedWork.toLong(),
- cpuUsage,
- cpuDemand,
- powerDraw,
- host.model.cpuCount
- )
- )
- }
-
- private var hostAggregateMetrics: AggregateHostMetrics = AggregateHostMetrics()
- private val hostMetrics: MutableMap<Host, HostMetrics> = mutableMapOf()
-
- private fun processHostEvent(event: HostEvent) {
- val slices = event.duration / SLICE_LENGTH
-
- hostAggregateMetrics = AggregateHostMetrics(
- hostAggregateMetrics.totalRequestedBurst + event.requestedBurst,
- hostAggregateMetrics.totalGrantedBurst + event.grantedBurst,
- hostAggregateMetrics.totalOvercommittedBurst + event.overcommissionedBurst,
- hostAggregateMetrics.totalInterferedBurst + event.interferedBurst,
- hostAggregateMetrics.totalPowerDraw + (event.duration * event.powerDraw) / 3600,
- hostAggregateMetrics.totalFailureSlices + if (event.host.state != HostState.UP) slices else 0,
- hostAggregateMetrics.totalFailureVmSlices + if (event.host.state != HostState.UP) event.vmCount * slices else 0
- )
-
- hostMetrics.compute(event.host) { _, prev ->
- HostMetrics(
- (event.cpuUsage.takeIf { event.host.state == HostState.UP } ?: 0.0) + (prev?.cpuUsage ?: 0.0),
- (event.cpuDemand.takeIf { event.host.state == HostState.UP } ?: 0.0) + (prev?.cpuDemand ?: 0.0),
- event.vmCount + (prev?.vmCount ?: 0),
- 1 + (prev?.count ?: 0)
- )
- }
- }
-
- private val SLICE_LENGTH: Long = 5 * 60 * 1000
-
- public data class AggregateHostMetrics(
- val totalRequestedBurst: Long = 0,
- val totalGrantedBurst: Long = 0,
- val totalOvercommittedBurst: Long = 0,
- val totalInterferedBurst: Long = 0,
- val totalPowerDraw: Double = 0.0,
- val totalFailureSlices: Long = 0,
- val totalFailureVmSlices: Long = 0,
- )
-
- public data class HostMetrics(
- val cpuUsage: Double,
- val cpuDemand: Double,
- val vmCount: Long,
- val count: Long
- )
-
- private var provisionerMetrics: AggregateProvisionerMetrics = AggregateProvisionerMetrics()
-
- override fun reportServiceData(
- time: Long,
- totalHostCount: Int,
- availableHostCount: Int,
- totalVmCount: Int,
- activeVmCount: Int,
- inactiveVmCount: Int,
- waitingVmCount: Int,
- failedVmCount: Int
- ) {
- provisionerMetrics = AggregateProvisionerMetrics(
- max(totalVmCount, provisionerMetrics.vmTotalCount),
- max(waitingVmCount, provisionerMetrics.vmWaitingCount),
- max(activeVmCount, provisionerMetrics.vmActiveCount),
- max(inactiveVmCount, provisionerMetrics.vmInactiveCount),
- max(failedVmCount, provisionerMetrics.vmFailedCount),
- )
- }
-
- public data class AggregateProvisionerMetrics(
- val vmTotalCount: Int = 0,
- val vmWaitingCount: Int = 0,
- val vmActiveCount: Int = 0,
- val vmInactiveCount: Int = 0,
- val vmFailedCount: Int = 0
- )
-
- override fun close() {}
-
- public fun getResult(): Result {
- return Result(
- hostAggregateMetrics.totalRequestedBurst,
- hostAggregateMetrics.totalGrantedBurst,
- hostAggregateMetrics.totalOvercommittedBurst,
- hostAggregateMetrics.totalInterferedBurst,
- hostMetrics.map { it.value.cpuUsage / it.value.count }.average(),
- hostMetrics.map { it.value.cpuDemand / it.value.count }.average(),
- hostMetrics.map { it.value.vmCount.toDouble() / it.value.count }.average(),
- hostMetrics.map { it.value.vmCount.toDouble() / it.value.count }.maxOrNull() ?: 0.0,
- hostAggregateMetrics.totalPowerDraw,
- hostAggregateMetrics.totalFailureSlices,
- hostAggregateMetrics.totalFailureVmSlices,
- provisionerMetrics.vmTotalCount,
- provisionerMetrics.vmWaitingCount,
- provisionerMetrics.vmInactiveCount,
- provisionerMetrics.vmFailedCount,
- )
- }
-
- public data class Result(
- public val totalRequestedBurst: Long,
- public val totalGrantedBurst: Long,
- public val totalOvercommittedBurst: Long,
- public val totalInterferedBurst: Long,
- public val meanCpuUsage: Double,
- public val meanCpuDemand: Double,
- public val meanNumDeployedImages: Double,
- public val maxNumDeployedImages: Double,
- public val totalPowerDraw: Double,
- public val totalFailureSlices: Long,
- public val totalFailureVmSlices: Long,
- public val totalVmsSubmitted: Int,
- public val totalVmsQueued: Int,
- public val totalVmsFinished: Int,
- public val totalVmsFailed: Int
- )
-}