From 7981e9aa3e6854ad593a5af85f8eb56874299d7e Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 6 May 2022 17:45:23 +0200 Subject: refactor(telemetry/compute): Support direct metric access This change introduces a `ComputeMetricReader` class that can be used as a replacement for the `CoroutineMetricReader` class when reading metrics from the Compute service. This implementation operates directly on a `ComputeService` instance, providing better performance. --- .../kotlin/org/opendc/web/runner/OpenDCRunner.kt | 29 ++--- .../org/opendc/web/runner/internal/JobManager.kt | 2 +- .../runner/internal/WebComputeMetricExporter.kt | 143 --------------------- .../web/runner/internal/WebComputeMonitor.kt | 142 ++++++++++++++++++++ 4 files changed, 157 insertions(+), 159 deletions(-) delete mode 100644 opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMetricExporter.kt create mode 100644 opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt (limited to 'opendc-web/opendc-web-runner/src') diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt index a150de4e..7c0c43ed 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt @@ -23,8 +23,9 @@ package org.opendc.web.runner import mu.KotlinLogging +import org.opendc.compute.api.Server import org.opendc.compute.workload.* -import org.opendc.compute.workload.telemetry.SdkTelemetryManager +import org.opendc.compute.workload.telemetry.NoopTelemetryManager import org.opendc.compute.workload.topology.HostSpec import org.opendc.compute.workload.topology.Topology import org.opendc.compute.workload.topology.apply @@ -35,13 +36,12 @@ import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.LinearPowerModel import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.telemetry.compute.collectServiceMetrics -import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader +import org.opendc.telemetry.compute.ComputeMetricReader import org.opendc.web.client.runner.OpenDCRunnerClient import org.opendc.web.proto.runner.Job import org.opendc.web.proto.runner.Scenario import org.opendc.web.runner.internal.JobManager -import org.opendc.web.runner.internal.WebComputeMetricExporter +import org.opendc.web.runner.internal.WebComputeMonitor import java.io.File import java.time.Duration import java.util.* @@ -180,9 +180,9 @@ public class OpenDCRunner( private val scenario: Scenario, private val repeat: Int, private val topology: Topology, - ) : RecursiveTask() { - override fun compute(): WebComputeMetricExporter.Results { - val exporter = WebComputeMetricExporter() + ) : RecursiveTask() { + override fun compute(): WebComputeMonitor.Results { + val monitor = WebComputeMonitor() // Schedule task that interrupts the simulation if it runs for too long. val currentThread = Thread.currentThread() @@ -206,25 +206,24 @@ public class OpenDCRunner( else null - val telemetry = SdkTelemetryManager(clock) val simulator = ComputeServiceHelper( coroutineContext, clock, - telemetry, + NoopTelemetryManager(), computeScheduler, failureModel, interferenceModel.takeIf { phenomena.interference } ) - - telemetry.registerMetricReader(CoroutineMetricReader(this, exporter, exportInterval = Duration.ofHours(1))) + val servers = mutableListOf() + val reader = ComputeMetricReader(this, clock, simulator.service, servers, monitor) try { // Instantiate the topology onto the simulator simulator.apply(topology) // Run workload trace - simulator.run(vms, seeder.nextLong()) + simulator.run(vms, seeder.nextLong(), servers) - val serviceMetrics = collectServiceMetrics(telemetry.metricProducer) + val serviceMetrics = simulator.service.getSchedulerStats() logger.debug { "Scheduler " + "Success=${serviceMetrics.attemptsSuccess} " + @@ -235,14 +234,14 @@ public class OpenDCRunner( } } finally { simulator.close() - telemetry.close() + reader.close() } } } finally { interruptTask.cancel(false) } - return exporter.collectResults() + return monitor.collectResults() } } diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManager.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManager.kt index 8de0cee4..99b8aaf1 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManager.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManager.kt @@ -62,7 +62,7 @@ internal class JobManager(private val client: OpenDCRunnerClient) { /** * Persist the specified results. */ - fun finish(id: Long, results: List) { + fun finish(id: Long, results: List) { client.jobs.update( id, Job.Update( diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMetricExporter.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMetricExporter.kt deleted file mode 100644 index 04437a5f..00000000 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMetricExporter.kt +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.web.runner.internal - -import org.opendc.telemetry.compute.ComputeMetricExporter -import org.opendc.telemetry.compute.ComputeMonitor -import org.opendc.telemetry.compute.table.HostTableReader -import org.opendc.telemetry.compute.table.ServiceTableReader -import kotlin.math.max -import kotlin.math.roundToLong - -/** - * A [ComputeMonitor] that tracks the aggregate metrics for each repeat. - */ -internal class WebComputeMetricExporter : ComputeMetricExporter() { - override fun record(reader: HostTableReader) { - val slices = reader.downtime / SLICE_LENGTH - - hostAggregateMetrics = AggregateHostMetrics( - hostAggregateMetrics.totalActiveTime + reader.cpuActiveTime, - hostAggregateMetrics.totalIdleTime + reader.cpuIdleTime, - hostAggregateMetrics.totalStealTime + reader.cpuStealTime, - hostAggregateMetrics.totalLostTime + reader.cpuLostTime, - hostAggregateMetrics.totalPowerDraw + reader.powerTotal, - hostAggregateMetrics.totalFailureSlices + slices, - hostAggregateMetrics.totalFailureVmSlices + reader.guestsRunning * slices - ) - - hostMetrics.compute(reader.host.id) { _, prev -> - HostMetrics( - reader.cpuUsage + (prev?.cpuUsage ?: 0.0), - reader.cpuDemand + (prev?.cpuDemand ?: 0.0), - reader.guestsRunning + (prev?.instanceCount ?: 0), - 1 + (prev?.count ?: 0) - ) - } - } - - private var hostAggregateMetrics: AggregateHostMetrics = AggregateHostMetrics() - private val hostMetrics: MutableMap = mutableMapOf() - private val SLICE_LENGTH: Long = 5 * 60L - - private data class AggregateHostMetrics( - val totalActiveTime: Long = 0L, - val totalIdleTime: Long = 0L, - val totalStealTime: Long = 0L, - val totalLostTime: Long = 0L, - val totalPowerDraw: Double = 0.0, - val totalFailureSlices: Double = 0.0, - val totalFailureVmSlices: Double = 0.0, - ) - - private data class HostMetrics( - val cpuUsage: Double, - val cpuDemand: Double, - val instanceCount: Long, - val count: Long - ) - - private var serviceMetrics: AggregateServiceMetrics = AggregateServiceMetrics() - - override fun record(reader: ServiceTableReader) { - serviceMetrics = AggregateServiceMetrics( - max(reader.attemptsSuccess, serviceMetrics.vmTotalCount), - max(reader.serversPending, serviceMetrics.vmWaitingCount), - max(reader.serversActive, serviceMetrics.vmActiveCount), - max(0, serviceMetrics.vmInactiveCount), - max(reader.attemptsFailure, serviceMetrics.vmFailedCount), - ) - } - - private data class AggregateServiceMetrics( - val vmTotalCount: Int = 0, - val vmWaitingCount: Int = 0, - val vmActiveCount: Int = 0, - val vmInactiveCount: Int = 0, - val vmFailedCount: Int = 0 - ) - - /** - * Collect the results of the simulation. - */ - fun collectResults(): Results { - return Results( - hostAggregateMetrics.totalActiveTime, - hostAggregateMetrics.totalIdleTime, - hostAggregateMetrics.totalStealTime, - hostAggregateMetrics.totalLostTime, - hostMetrics.map { it.value.cpuUsage / it.value.count }.average(), - hostMetrics.map { it.value.cpuDemand / it.value.count }.average(), - hostMetrics.map { it.value.instanceCount.toDouble() / it.value.count }.average(), - hostMetrics.map { it.value.instanceCount.toDouble() / it.value.count }.maxOrNull() ?: 0.0, - hostAggregateMetrics.totalPowerDraw, - hostAggregateMetrics.totalFailureSlices.roundToLong(), - hostAggregateMetrics.totalFailureVmSlices.roundToLong(), - serviceMetrics.vmTotalCount, - serviceMetrics.vmWaitingCount, - serviceMetrics.vmInactiveCount, - serviceMetrics.vmFailedCount, - ) - } - - /** - * Structure of the results of a single simulation. - */ - data class Results( - val totalActiveTime: Long, - val totalIdleTime: Long, - val totalStealTime: Long, - val totalLostTime: Long, - val meanCpuUsage: Double, - val meanCpuDemand: Double, - val meanNumDeployedImages: Double, - val maxNumDeployedImages: Double, - val totalPowerDraw: Double, - val totalFailureSlices: Long, - val totalFailureVmSlices: Long, - val totalVmsSubmitted: Int, - val totalVmsQueued: Int, - val totalVmsFinished: Int, - val totalVmsFailed: Int - ) -} diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt new file mode 100644 index 00000000..69350d8c --- /dev/null +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt @@ -0,0 +1,142 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.web.runner.internal + +import org.opendc.telemetry.compute.ComputeMonitor +import org.opendc.telemetry.compute.table.HostTableReader +import org.opendc.telemetry.compute.table.ServiceTableReader +import kotlin.math.max +import kotlin.math.roundToLong + +/** + * A [ComputeMonitor] that tracks the aggregate metrics for each repeat. + */ +internal class WebComputeMonitor : ComputeMonitor { + override fun record(reader: HostTableReader) { + val slices = reader.downtime / SLICE_LENGTH + + hostAggregateMetrics = AggregateHostMetrics( + hostAggregateMetrics.totalActiveTime + reader.cpuActiveTime, + hostAggregateMetrics.totalIdleTime + reader.cpuIdleTime, + hostAggregateMetrics.totalStealTime + reader.cpuStealTime, + hostAggregateMetrics.totalLostTime + reader.cpuLostTime, + hostAggregateMetrics.totalPowerDraw + reader.powerTotal, + hostAggregateMetrics.totalFailureSlices + slices, + hostAggregateMetrics.totalFailureVmSlices + reader.guestsRunning * slices + ) + + hostMetrics.compute(reader.host.id) { _, prev -> + HostMetrics( + reader.cpuUsage + (prev?.cpuUsage ?: 0.0), + reader.cpuDemand + (prev?.cpuDemand ?: 0.0), + reader.guestsRunning + (prev?.instanceCount ?: 0), + 1 + (prev?.count ?: 0) + ) + } + } + + private var hostAggregateMetrics: AggregateHostMetrics = AggregateHostMetrics() + private val hostMetrics: MutableMap = mutableMapOf() + private val SLICE_LENGTH: Long = 5 * 60L + + private data class AggregateHostMetrics( + val totalActiveTime: Long = 0L, + val totalIdleTime: Long = 0L, + val totalStealTime: Long = 0L, + val totalLostTime: Long = 0L, + val totalPowerDraw: Double = 0.0, + val totalFailureSlices: Double = 0.0, + val totalFailureVmSlices: Double = 0.0, + ) + + private data class HostMetrics( + val cpuUsage: Double, + val cpuDemand: Double, + val instanceCount: Long, + val count: Long + ) + + private var serviceMetrics: AggregateServiceMetrics = AggregateServiceMetrics() + + override fun record(reader: ServiceTableReader) { + serviceMetrics = AggregateServiceMetrics( + max(reader.attemptsSuccess, serviceMetrics.vmTotalCount), + max(reader.serversPending, serviceMetrics.vmWaitingCount), + max(reader.serversActive, serviceMetrics.vmActiveCount), + max(0, serviceMetrics.vmInactiveCount), + max(reader.attemptsFailure, serviceMetrics.vmFailedCount), + ) + } + + private data class AggregateServiceMetrics( + val vmTotalCount: Int = 0, + val vmWaitingCount: Int = 0, + val vmActiveCount: Int = 0, + val vmInactiveCount: Int = 0, + val vmFailedCount: Int = 0 + ) + + /** + * Collect the results of the simulation. + */ + fun collectResults(): Results { + return Results( + hostAggregateMetrics.totalActiveTime, + hostAggregateMetrics.totalIdleTime, + hostAggregateMetrics.totalStealTime, + hostAggregateMetrics.totalLostTime, + hostMetrics.map { it.value.cpuUsage / it.value.count }.average(), + hostMetrics.map { it.value.cpuDemand / it.value.count }.average(), + hostMetrics.map { it.value.instanceCount.toDouble() / it.value.count }.average(), + hostMetrics.map { it.value.instanceCount.toDouble() / it.value.count }.maxOrNull() ?: 0.0, + hostAggregateMetrics.totalPowerDraw, + hostAggregateMetrics.totalFailureSlices.roundToLong(), + hostAggregateMetrics.totalFailureVmSlices.roundToLong(), + serviceMetrics.vmTotalCount, + serviceMetrics.vmWaitingCount, + serviceMetrics.vmInactiveCount, + serviceMetrics.vmFailedCount, + ) + } + + /** + * Structure of the results of a single simulation. + */ + data class Results( + val totalActiveTime: Long, + val totalIdleTime: Long, + val totalStealTime: Long, + val totalLostTime: Long, + val meanCpuUsage: Double, + val meanCpuDemand: Double, + val meanNumDeployedImages: Double, + val maxNumDeployedImages: Double, + val totalPowerDraw: Double, + val totalFailureSlices: Long, + val totalFailureVmSlices: Long, + val totalVmsSubmitted: Int, + val totalVmsQueued: Int, + val totalVmsFinished: Int, + val totalVmsFailed: Int + ) +} -- cgit v1.2.3