From 7981e9aa3e6854ad593a5af85f8eb56874299d7e Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 6 May 2022 17:45:23 +0200 Subject: refactor(telemetry/compute): Support direct metric access This change introduces a `ComputeMetricReader` class that can be used as a replacement for the `CoroutineMetricReader` class when reading metrics from the Compute service. This implementation operates directly on a `ComputeService` instance, providing better performance. --- .../compute/workload/ComputeServiceHelper.kt | 21 +- .../export/parquet/ParquetComputeMetricExporter.kt | 71 ---- .../export/parquet/ParquetComputeMonitor.kt | 67 ++++ .../org/opendc/experiments/capelin/Portfolio.kt | 40 +- .../experiments/capelin/CapelinIntegrationTest.kt | 148 ++++--- .../opendc-telemetry-compute/build.gradle.kts | 1 + .../telemetry/compute/ComputeMetricReader.kt | 424 +++++++++++++++++++++ .../sdk/metrics/export/CoroutineMetricReader.kt | 1 - .../kotlin/org/opendc/web/runner/OpenDCRunner.kt | 29 +- .../org/opendc/web/runner/internal/JobManager.kt | 2 +- .../runner/internal/WebComputeMetricExporter.kt | 143 ------- .../web/runner/internal/WebComputeMonitor.kt | 142 +++++++ 12 files changed, 759 insertions(+), 330 deletions(-) delete mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMonitor.kt create mode 100644 opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricReader.kt delete mode 100644 opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMetricExporter.kt create mode 100644 opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt index 4b0b343f..21cfdad2 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt @@ -26,6 +26,7 @@ import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.yield +import org.opendc.compute.api.Server import org.opendc.compute.service.ComputeService import org.opendc.compute.service.scheduler.ComputeScheduler import org.opendc.compute.simulator.SimHost @@ -81,9 +82,19 @@ public class ComputeServiceHelper( } /** - * Converge a simulation of the [ComputeService] by replaying the workload trace given by [trace]. + * Run a simulation of the [ComputeService] by replaying the workload trace given by [trace]. + * + * @param trace The trace to simulate. + * @param seed The seed for the simulation. + * @param servers A list to which the created servers is added. + * @param submitImmediately A flag to indicate that the servers are scheduled immediately (so not at their start time). */ - public suspend fun run(trace: List, seed: Long, submitImmediately: Boolean = false) { + public suspend fun run( + trace: List, + seed: Long, + servers: MutableList? = null, + submitImmediately: Boolean = false + ) { val random = Random(seed) val injector = failureModel?.createInjector(context, clock, service, random) val client = service.newClient() @@ -129,12 +140,14 @@ public class ComputeServiceHelper( meta = mapOf("workload" to workload) ) + servers?.add(server) + // Wait for the server reach its end time val endTime = entry.stopTime.toEpochMilli() delay(endTime + workloadOffset - clock.millis() + 5 * 60 * 1000) - // Delete the server after reaching the end-time of the virtual machine - server.delete() + // Stop the server after reaching the end-time of the virtual machine + server.stop() } } } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt deleted file mode 100644 index a46885f4..00000000 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.workload.export.parquet - -import io.opentelemetry.sdk.common.CompletableResultCode -import org.opendc.telemetry.compute.ComputeMetricExporter -import org.opendc.telemetry.compute.ComputeMonitor -import org.opendc.telemetry.compute.table.HostTableReader -import org.opendc.telemetry.compute.table.ServerTableReader -import org.opendc.telemetry.compute.table.ServiceTableReader -import java.io.File - -/** - * A [ComputeMonitor] that logs the events to a Parquet file. - */ -public class ParquetComputeMetricExporter(base: File, partition: String, bufferSize: Int) : ComputeMetricExporter() { - private val serverWriter = ParquetServerDataWriter( - File(base, "server/$partition/data.parquet").also { it.parentFile.mkdirs() }, - bufferSize - ) - - private val hostWriter = ParquetHostDataWriter( - File(base, "host/$partition/data.parquet").also { it.parentFile.mkdirs() }, - bufferSize - ) - - private val serviceWriter = ParquetServiceDataWriter( - File(base, "service/$partition/data.parquet").also { it.parentFile.mkdirs() }, - bufferSize - ) - - override fun record(reader: ServerTableReader) { - serverWriter.write(reader) - } - - override fun record(reader: HostTableReader) { - hostWriter.write(reader) - } - - override fun record(reader: ServiceTableReader) { - serviceWriter.write(reader) - } - - override fun shutdown(): CompletableResultCode { - hostWriter.close() - serviceWriter.close() - serverWriter.close() - - return CompletableResultCode.ofSuccess() - } -} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMonitor.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMonitor.kt new file mode 100644 index 00000000..6c515118 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMonitor.kt @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.export.parquet + +import org.opendc.telemetry.compute.ComputeMonitor +import org.opendc.telemetry.compute.table.HostTableReader +import org.opendc.telemetry.compute.table.ServerTableReader +import org.opendc.telemetry.compute.table.ServiceTableReader +import java.io.File + +/** + * A [ComputeMonitor] that logs the events to a Parquet file. + */ +public class ParquetComputeMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable { + private val serverWriter = ParquetServerDataWriter( + File(base, "server/$partition/data.parquet").also { it.parentFile.mkdirs() }, + bufferSize + ) + + private val hostWriter = ParquetHostDataWriter( + File(base, "host/$partition/data.parquet").also { it.parentFile.mkdirs() }, + bufferSize + ) + + private val serviceWriter = ParquetServiceDataWriter( + File(base, "service/$partition/data.parquet").also { it.parentFile.mkdirs() }, + bufferSize + ) + + override fun record(reader: ServerTableReader) { + serverWriter.write(reader) + } + + override fun record(reader: HostTableReader) { + hostWriter.write(reader) + } + + override fun record(reader: ServiceTableReader) { + serviceWriter.write(reader) + } + + override fun close() { + hostWriter.close() + serviceWriter.close() + serverWriter.close() + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index 0bbf1443..6fd85e8c 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -23,12 +23,14 @@ package org.opendc.experiments.capelin import com.typesafe.config.ConfigFactory +import kotlinx.coroutines.* +import org.opendc.compute.api.Server import org.opendc.compute.workload.ComputeServiceHelper import org.opendc.compute.workload.ComputeWorkloadLoader import org.opendc.compute.workload.createComputeScheduler -import org.opendc.compute.workload.export.parquet.ParquetComputeMetricExporter +import org.opendc.compute.workload.export.parquet.ParquetComputeMonitor import org.opendc.compute.workload.grid5000 -import org.opendc.compute.workload.telemetry.SdkTelemetryManager +import org.opendc.compute.workload.telemetry.NoopTelemetryManager import org.opendc.compute.workload.topology.apply import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Topology @@ -37,7 +39,7 @@ import org.opendc.experiments.capelin.topology.clusterTopology import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader +import org.opendc.telemetry.compute.ComputeMetricReader import java.io.File import java.time.Duration import java.util.* @@ -97,7 +99,7 @@ abstract class Portfolio(name: String) : Experiment(name) { else null val (vms, interferenceModel) = workload.source.resolve(workloadLoader, seeder) - val telemetry = SdkTelemetryManager(clock) + val telemetry = NoopTelemetryManager() val runner = ComputeServiceHelper( coroutineContext, clock, @@ -107,23 +109,35 @@ abstract class Portfolio(name: String) : Experiment(name) { interferenceModel?.withSeed(repeat.toLong()) ) - val exporter = ParquetComputeMetricExporter( - File(config.getString("output-path")), - "portfolio_id=$name/scenario_id=$id/run_id=$repeat", - 4096 - ) - telemetry.registerMetricReader(CoroutineMetricReader(this, exporter)) - val topology = clusterTopology(File(config.getString("env-path"), "${topology.name}.txt")) + val servers = mutableListOf() + val exporter = ComputeMetricReader( + this, + clock, + runner.service, + servers, + ParquetComputeMonitor( + File(config.getString("output-path")), + "portfolio_id=$name/scenario_id=$id/run_id=$repeat", + bufferSize = 4096 + ), + exportInterval = Duration.ofMinutes(5) + ) try { // Instantiate the desired topology runner.apply(topology) - // Run the workload trace - runner.run(vms, seeder.nextLong()) + coroutineScope { + // Run the workload trace + runner.run(vms, seeder.nextLong(), servers) + + // Stop the metric collection + exporter.close() + } } finally { runner.close() + exporter.close() } } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 01b2a8fe..62cdf123 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -26,25 +26,23 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll +import org.opendc.compute.api.Server import org.opendc.compute.service.scheduler.FilterScheduler import org.opendc.compute.service.scheduler.filters.ComputeFilter import org.opendc.compute.service.scheduler.filters.RamFilter import org.opendc.compute.service.scheduler.filters.VCpuFilter import org.opendc.compute.service.scheduler.weights.CoreRamWeigher import org.opendc.compute.workload.* -import org.opendc.compute.workload.telemetry.SdkTelemetryManager +import org.opendc.compute.workload.telemetry.NoopTelemetryManager import org.opendc.compute.workload.topology.Topology import org.opendc.compute.workload.topology.apply import org.opendc.experiments.capelin.topology.clusterTopology import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.telemetry.compute.ComputeMetricExporter +import org.opendc.telemetry.compute.ComputeMetricReader +import org.opendc.telemetry.compute.ComputeMonitor import org.opendc.telemetry.compute.table.HostTableReader -import org.opendc.telemetry.compute.table.ServiceData -import org.opendc.telemetry.compute.table.ServiceTableReader -import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import java.io.File import java.time.Duration -import java.time.Instant import java.util.* /** @@ -54,7 +52,7 @@ class CapelinIntegrationTest { /** * The monitor used to keep track of the metrics. */ - private lateinit var exporter: TestComputeMetricExporter + private lateinit var monitor: TestComputeMonitor /** * The [FilterScheduler] to use for all experiments. @@ -67,11 +65,11 @@ class CapelinIntegrationTest { private lateinit var workloadLoader: ComputeWorkloadLoader /** - * Setup the experimental environment. + * Set up the experimental environment. */ @BeforeEach fun setUp() { - exporter = TestComputeMetricExporter() + monitor = TestComputeMonitor() computeScheduler = FilterScheduler( filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), weighers = listOf(CoreRamWeigher(multiplier = 1.0)) @@ -85,22 +83,22 @@ class CapelinIntegrationTest { @Test fun testLarge() = runBlockingSimulation { val (workload, _) = createTestWorkload(1.0) - val telemetry = SdkTelemetryManager(clock) val runner = ComputeServiceHelper( coroutineContext, clock, - telemetry, + NoopTelemetryManager(), computeScheduler ) val topology = createTopology() - telemetry.registerMetricReader(CoroutineMetricReader(this, exporter)) + val servers = mutableListOf() + val reader = ComputeMetricReader(this, clock, runner.service, servers, monitor) try { runner.apply(topology) - runner.run(workload, 0) + runner.run(workload, 0, servers) - val serviceMetrics = exporter.serviceMetrics + val serviceMetrics = runner.service.getSchedulerStats() println( "Scheduler " + "Success=${serviceMetrics.attemptsSuccess} " + @@ -116,15 +114,15 @@ class CapelinIntegrationTest { { assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") }, { assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") }, { assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") }, - { assertEquals(223393683, this@CapelinIntegrationTest.exporter.idleTime) { "Incorrect idle time" } }, - { assertEquals(66977508, this@CapelinIntegrationTest.exporter.activeTime) { "Incorrect active time" } }, - { assertEquals(3160381, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } }, - { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Incorrect lost time" } }, - { assertEquals(5.840845430827075E9, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } }, + { assertEquals(223393683, this@CapelinIntegrationTest.monitor.idleTime) { "Incorrect idle time" } }, + { assertEquals(66977508, this@CapelinIntegrationTest.monitor.activeTime) { "Incorrect active time" } }, + { assertEquals(3160381, this@CapelinIntegrationTest.monitor.stealTime) { "Incorrect steal time" } }, + { assertEquals(0, this@CapelinIntegrationTest.monitor.lostTime) { "Incorrect lost time" } }, + { assertEquals(5.840845430827075E9, this@CapelinIntegrationTest.monitor.energyUsage, 0.01) { "Incorrect power draw" } }, ) } finally { runner.close() - telemetry.close() + reader.close() } } @@ -135,41 +133,41 @@ class CapelinIntegrationTest { fun testSmall() = runBlockingSimulation { val seed = 1 val (workload, _) = createTestWorkload(0.25, seed) - val telemetry = SdkTelemetryManager(clock) val runner = ComputeServiceHelper( coroutineContext, clock, - telemetry, + NoopTelemetryManager(), computeScheduler ) val topology = createTopology("single") - - telemetry.registerMetricReader(CoroutineMetricReader(this, exporter)) + val servers = mutableListOf() + val reader = ComputeMetricReader(this, clock, runner.service, servers, monitor) try { runner.apply(topology) - runner.run(workload, seed.toLong()) + runner.run(workload, seed.toLong(), servers) + val serviceMetrics = runner.service.getSchedulerStats() println( "Scheduler " + - "Success=${exporter.serviceMetrics.attemptsSuccess} " + - "Failure=${exporter.serviceMetrics.attemptsFailure} " + - "Error=${exporter.serviceMetrics.attemptsError} " + - "Pending=${exporter.serviceMetrics.serversPending} " + - "Active=${exporter.serviceMetrics.serversActive}" + "Success=${serviceMetrics.attemptsSuccess} " + + "Failure=${serviceMetrics.attemptsFailure} " + + "Error=${serviceMetrics.attemptsError} " + + "Pending=${serviceMetrics.serversPending} " + + "Active=${serviceMetrics.serversActive}" ) } finally { runner.close() - telemetry.close() + reader.close() } // Note that these values have been verified beforehand assertAll( - { assertEquals(10999592, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } }, - { assertEquals(9741207, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } }, - { assertEquals(0, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } }, - { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } }, - { assertEquals(7.011413569311495E8, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } } + { assertEquals(10999592, this@CapelinIntegrationTest.monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(9741207, this@CapelinIntegrationTest.monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(0, this@CapelinIntegrationTest.monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(0, this@CapelinIntegrationTest.monitor.lostTime) { "Lost time incorrect" } }, + { assertEquals(7.011413569311495E8, this@CapelinIntegrationTest.monitor.energyUsage, 0.01) { "Incorrect power draw" } } ) } @@ -181,41 +179,41 @@ class CapelinIntegrationTest { val seed = 0 val (workload, interferenceModel) = createTestWorkload(1.0, seed) - val telemetry = SdkTelemetryManager(clock) val simulator = ComputeServiceHelper( coroutineContext, clock, - telemetry, + NoopTelemetryManager(), computeScheduler, interferenceModel = interferenceModel?.withSeed(seed.toLong()) ) val topology = createTopology("single") - - telemetry.registerMetricReader(CoroutineMetricReader(this, exporter)) + val servers = mutableListOf() + val reader = ComputeMetricReader(this, clock, simulator.service, servers, monitor) try { simulator.apply(topology) - simulator.run(workload, seed.toLong()) + simulator.run(workload, seed.toLong(), servers) + val serviceMetrics = simulator.service.getSchedulerStats() println( "Scheduler " + - "Success=${exporter.serviceMetrics.attemptsSuccess} " + - "Failure=${exporter.serviceMetrics.attemptsFailure} " + - "Error=${exporter.serviceMetrics.attemptsError} " + - "Pending=${exporter.serviceMetrics.serversPending} " + - "Active=${exporter.serviceMetrics.serversActive}" + "Success=${serviceMetrics.attemptsSuccess} " + + "Failure=${serviceMetrics.attemptsFailure} " + + "Error=${serviceMetrics.attemptsError} " + + "Pending=${serviceMetrics.serversPending} " + + "Active=${serviceMetrics.serversActive}" ) } finally { simulator.close() - telemetry.close() + reader.close() } // Note that these values have been verified beforehand assertAll( - { assertEquals(6028050, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } }, - { assertEquals(14712749, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } }, - { assertEquals(12532907, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } }, - { assertEquals(467963, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } } + { assertEquals(6028050, this@CapelinIntegrationTest.monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(14712749, this@CapelinIntegrationTest.monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(12532907, this@CapelinIntegrationTest.monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(467963, this@CapelinIntegrationTest.monitor.lostTime) { "Lost time incorrect" } } ) } @@ -225,43 +223,43 @@ class CapelinIntegrationTest { @Test fun testFailures() = runBlockingSimulation { val seed = 1 - val telemetry = SdkTelemetryManager(clock) val simulator = ComputeServiceHelper( coroutineContext, clock, - telemetry, + NoopTelemetryManager(), computeScheduler, grid5000(Duration.ofDays(7)) ) val topology = createTopology("single") val (workload, _) = createTestWorkload(0.25, seed) - - telemetry.registerMetricReader(CoroutineMetricReader(this, exporter)) + val servers = mutableListOf() + val reader = ComputeMetricReader(this, clock, simulator.service, servers, monitor) try { simulator.apply(topology) - simulator.run(workload, seed.toLong()) + simulator.run(workload, seed.toLong(), servers) + val serviceMetrics = simulator.service.getSchedulerStats() println( "Scheduler " + - "Success=${exporter.serviceMetrics.attemptsSuccess} " + - "Failure=${exporter.serviceMetrics.attemptsFailure} " + - "Error=${exporter.serviceMetrics.attemptsError} " + - "Pending=${exporter.serviceMetrics.serversPending} " + - "Active=${exporter.serviceMetrics.serversActive}" + "Success=${serviceMetrics.attemptsSuccess} " + + "Failure=${serviceMetrics.attemptsFailure} " + + "Error=${serviceMetrics.attemptsError} " + + "Pending=${serviceMetrics.serversPending} " + + "Active=${serviceMetrics.serversActive}" ) } finally { simulator.close() - telemetry.close() + reader.close() } // Note that these values have been verified beforehand assertAll( - { assertEquals(10867345, exporter.idleTime) { "Idle time incorrect" } }, - { assertEquals(9607095, exporter.activeTime) { "Active time incorrect" } }, - { assertEquals(0, exporter.stealTime) { "Steal time incorrect" } }, - { assertEquals(0, exporter.lostTime) { "Lost time incorrect" } }, - { assertEquals(2559305056, exporter.uptime) { "Uptime incorrect" } } + { assertEquals(10867345, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(9607095, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, + { assertEquals(2559305056, monitor.uptime) { "Uptime incorrect" } } ) } @@ -281,8 +279,7 @@ class CapelinIntegrationTest { return stream.use { clusterTopology(stream) } } - class TestComputeMetricExporter : ComputeMetricExporter() { - var serviceMetrics: ServiceData = ServiceData(Instant.ofEpochMilli(0), 0, 0, 0, 0, 0, 0, 0) + class TestComputeMonitor : ComputeMonitor { var idleTime = 0L var activeTime = 0L var stealTime = 0L @@ -290,19 +287,6 @@ class CapelinIntegrationTest { var energyUsage = 0.0 var uptime = 0L - override fun record(reader: ServiceTableReader) { - serviceMetrics = ServiceData( - reader.timestamp, - reader.hostsUp, - reader.hostsDown, - reader.serversPending, - reader.serversActive, - reader.attemptsSuccess, - reader.attemptsFailure, - reader.attemptsError - ) - } - override fun record(reader: HostTableReader) { idleTime += reader.cpuIdleTime activeTime += reader.cpuActiveTime diff --git a/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts b/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts index 47e30a14..b476a669 100644 --- a/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts +++ b/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts @@ -30,6 +30,7 @@ plugins { dependencies { api(projects.opendcTelemetry.opendcTelemetrySdk) + implementation(projects.opendcCompute.opendcComputeService) implementation(libs.opentelemetry.semconv) implementation(libs.kotlin.logging) } diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricReader.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricReader.kt new file mode 100644 index 00000000..593203fc --- /dev/null +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricReader.kt @@ -0,0 +1,424 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.telemetry.compute + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import mu.KotlinLogging +import org.opendc.compute.api.Server +import org.opendc.compute.service.ComputeService +import org.opendc.compute.service.driver.Host +import org.opendc.telemetry.compute.table.* +import java.time.Clock +import java.time.Duration +import java.time.Instant + +/** + * A helper class to collect metrics from a [ComputeService] instance and automatically export the metrics every + * export interval. + * + * @param scope The [CoroutineScope] to run the reader in. + * @param clock The virtual clock. + * @param service The [ComputeService] to monitor. + * @param servers The [Server]s to monitor. + * @param monitor The monitor to export the metrics to. + * @param exportInterval The export interval. + */ +public class ComputeMetricReader( + scope: CoroutineScope, + clock: Clock, + private val service: ComputeService, + private val servers: List, + private val monitor: ComputeMonitor, + private val exportInterval: Duration = Duration.ofMinutes(5) +) : AutoCloseable { + private val logger = KotlinLogging.logger {} + + /** + * Aggregator for service metrics. + */ + private val serviceTableReader = ServiceTableReaderImpl(service) + + /** + * Mapping from [Host] instances to [HostTableReaderImpl] + */ + private val hostTableReaders = mutableMapOf() + + /** + * Mapping from [Server] instances to [ServerTableReaderImpl] + */ + private val serverTableReaders = mutableMapOf() + + /** + * The background job that is responsible for collecting the metrics every cycle. + */ + private val job = scope.launch { + val intervalMs = exportInterval.toMillis() + + try { + while (isActive) { + delay(intervalMs) + + try { + val now = clock.instant() + + for (host in service.hosts) { + val reader = hostTableReaders.computeIfAbsent(host) { HostTableReaderImpl(it) } + reader.record(now) + monitor.record(reader) + reader.reset() + } + + for (server in servers) { + val reader = serverTableReaders.computeIfAbsent(server) { ServerTableReaderImpl(service, it) } + reader.record(now) + monitor.record(reader) + reader.reset() + } + + serviceTableReader.record(now) + monitor.record(serviceTableReader) + } catch (cause: Throwable) { + logger.warn(cause) { "Exporter threw an Exception" } + } + } + } finally { + if (monitor is AutoCloseable) { + monitor.close() + } + } + } + + override fun close() { + job.cancel() + } + + /** + * An aggregator for service metrics before they are reported. + */ + private class ServiceTableReaderImpl(private val service: ComputeService) : ServiceTableReader { + private var _timestamp: Instant = Instant.MIN + override val timestamp: Instant + get() = _timestamp + + override val hostsUp: Int + get() = _hostsUp + private var _hostsUp = 0 + + override val hostsDown: Int + get() = _hostsDown + private var _hostsDown = 0 + + override val serversPending: Int + get() = _serversPending + private var _serversPending = 0 + + override val serversActive: Int + get() = _serversActive + private var _serversActive = 0 + + override val attemptsSuccess: Int + get() = _attemptsSuccess + private var _attemptsSuccess = 0 + + override val attemptsFailure: Int + get() = _attemptsFailure + private var _attemptsFailure = 0 + + override val attemptsError: Int + get() = _attemptsError + private var _attemptsError = 0 + + /** + * Record the next cycle. + */ + fun record(now: Instant) { + _timestamp = now + + val stats = service.getSchedulerStats() + _hostsUp = stats.hostsAvailable + _hostsDown = stats.hostsUnavailable + _serversPending = stats.serversPending + _serversActive = stats.serversActive + _attemptsSuccess = stats.attemptsSuccess.toInt() + _attemptsFailure = stats.attemptsFailure.toInt() + _attemptsError = stats.attemptsError.toInt() + } + } + + /** + * An aggregator for host metrics before they are reported. + */ + private class HostTableReaderImpl(host: Host) : HostTableReader { + private val _host = host + + override val host: HostInfo = HostInfo(host.uid.toString(), host.name, "x86", host.model.cpuCount, host.model.memoryCapacity) + + override val timestamp: Instant + get() = _timestamp + private var _timestamp = Instant.MIN + + override val guestsTerminated: Int + get() = _guestsTerminated + private var _guestsTerminated = 0 + + override val guestsRunning: Int + get() = _guestsRunning + private var _guestsRunning = 0 + + override val guestsError: Int + get() = _guestsError + private var _guestsError = 0 + + override val guestsInvalid: Int + get() = _guestsInvalid + private var _guestsInvalid = 0 + + override val cpuLimit: Double + get() = _cpuLimit + private var _cpuLimit = 0.0 + + override val cpuUsage: Double + get() = _cpuUsage + private var _cpuUsage = 0.0 + + override val cpuDemand: Double + get() = _cpuDemand + private var _cpuDemand = 0.0 + + override val cpuUtilization: Double + get() = _cpuUtilization + private var _cpuUtilization = 0.0 + + override val cpuActiveTime: Long + get() = _cpuActiveTime - previousCpuActiveTime + private var _cpuActiveTime = 0L + private var previousCpuActiveTime = 0L + + override val cpuIdleTime: Long + get() = _cpuIdleTime - previousCpuIdleTime + private var _cpuIdleTime = 0L + private var previousCpuIdleTime = 0L + + override val cpuStealTime: Long + get() = _cpuStealTime - previousCpuStealTime + private var _cpuStealTime = 0L + private var previousCpuStealTime = 0L + + override val cpuLostTime: Long + get() = _cpuLostTime - previousCpuLostTime + private var _cpuLostTime = 0L + private var previousCpuLostTime = 0L + + override val powerUsage: Double + get() = _powerUsage + private var _powerUsage = 0.0 + + override val powerTotal: Double + get() = _powerTotal - previousPowerTotal + private var _powerTotal = 0.0 + private var previousPowerTotal = 0.0 + + override val uptime: Long + get() = _uptime - previousUptime + private var _uptime = 0L + private var previousUptime = 0L + + override val downtime: Long + get() = _downtime - previousDowntime + private var _downtime = 0L + private var previousDowntime = 0L + + override val bootTime: Instant? + get() = _bootTime + private var _bootTime: Instant? = null + + /** + * Record the next cycle. + */ + fun record(now: Instant) { + val hostCpuStats = _host.getCpuStats() + val hostSysStats = _host.getSystemStats() + + _timestamp = now + _guestsTerminated = hostSysStats.guestsTerminated + _guestsRunning = hostSysStats.guestsRunning + _guestsError = hostSysStats.guestsError + _guestsInvalid = hostSysStats.guestsInvalid + _cpuLimit = hostCpuStats.capacity + _cpuDemand = hostCpuStats.demand + _cpuUsage = hostCpuStats.usage + _cpuUtilization = hostCpuStats.utilization + _cpuActiveTime = hostCpuStats.activeTime + _cpuIdleTime = hostCpuStats.idleTime + _cpuStealTime = hostCpuStats.stealTime + _cpuLostTime = hostCpuStats.lostTime + _powerUsage = hostSysStats.powerUsage + _powerTotal = hostSysStats.energyUsage + _uptime = hostSysStats.uptime.toMillis() + _downtime = hostSysStats.downtime.toMillis() + _bootTime = hostSysStats.bootTime + } + + /** + * Finish the aggregation for this cycle. + */ + fun reset() { + // Reset intermediate state for next aggregation + previousCpuActiveTime = _cpuActiveTime + previousCpuIdleTime = _cpuIdleTime + previousCpuStealTime = _cpuStealTime + previousCpuLostTime = _cpuLostTime + previousPowerTotal = _powerTotal + previousUptime = _uptime + previousDowntime = _downtime + + _guestsTerminated = 0 + _guestsRunning = 0 + _guestsError = 0 + _guestsInvalid = 0 + + _cpuLimit = 0.0 + _cpuUsage = 0.0 + _cpuDemand = 0.0 + _cpuUtilization = 0.0 + + _powerUsage = 0.0 + } + } + + /** + * An aggregator for server metrics before they are reported. + */ + private class ServerTableReaderImpl(private val service: ComputeService, server: Server) : ServerTableReader { + private val _server = server + + /** + * The static information about this server. + */ + override val server = ServerInfo( + server.uid.toString(), + server.name, + "vm", + "x86", + server.image.uid.toString(), + server.image.name, + server.flavor.cpuCount, + server.flavor.memorySize + ) + + /** + * The [HostInfo] of the host on which the server is hosted. + */ + override var host: HostInfo? = null + private var _host: Host? = null + + private var _timestamp = Instant.MIN + override val timestamp: Instant + get() = _timestamp + + override val uptime: Long + get() = _uptime - previousUptime + private var _uptime: Long = 0 + private var previousUptime = 0L + + override val downtime: Long + get() = _downtime - previousDowntime + private var _downtime: Long = 0 + private var previousDowntime = 0L + + override val provisionTime: Instant? + get() = _provisionTime + private var _provisionTime: Instant? = null + + override val bootTime: Instant? + get() = _bootTime + private var _bootTime: Instant? = null + + override val cpuLimit: Double + get() = _cpuLimit + private var _cpuLimit = 0.0 + + override val cpuActiveTime: Long + get() = _cpuActiveTime - previousCpuActiveTime + private var _cpuActiveTime = 0L + private var previousCpuActiveTime = 0L + + override val cpuIdleTime: Long + get() = _cpuIdleTime - previousCpuIdleTime + private var _cpuIdleTime = 0L + private var previousCpuIdleTime = 0L + + override val cpuStealTime: Long + get() = _cpuStealTime - previousCpuStealTime + private var _cpuStealTime = 0L + private var previousCpuStealTime = 0L + + override val cpuLostTime: Long + get() = _cpuLostTime - previousCpuLostTime + private var _cpuLostTime = 0L + private var previousCpuLostTime = 0L + + /** + * Record the next cycle. + */ + fun record(now: Instant) { + val newHost = service.lookupHost(_server) + if (newHost != null && newHost.uid != _host?.uid) { + _host = newHost + host = HostInfo(newHost.uid.toString(), newHost.name, "x86", newHost.model.cpuCount, newHost.model.memoryCapacity) + } + + val cpuStats = _host?.getCpuStats(_server) + val sysStats = _host?.getSystemStats(_server) + + _timestamp = now + _cpuLimit = cpuStats?.capacity ?: 0.0 + _cpuActiveTime = cpuStats?.activeTime ?: 0 + _cpuIdleTime = cpuStats?.idleTime ?: 0 + _cpuStealTime = cpuStats?.stealTime ?: 0 + _cpuLostTime = cpuStats?.lostTime ?: 0 + _uptime = sysStats?.uptime?.toMillis() ?: 0 + _downtime = sysStats?.downtime?.toMillis() ?: 0 + _provisionTime = _server.launchedAt + _bootTime = sysStats?.bootTime + } + + /** + * Finish the aggregation for this cycle. + */ + fun reset() { + previousUptime = _uptime + previousDowntime = _downtime + previousCpuActiveTime = _cpuActiveTime + previousCpuIdleTime = _cpuIdleTime + previousCpuStealTime = _cpuStealTime + previousCpuLostTime = _cpuLostTime + + _host = null + _cpuLimit = 0.0 + } + } +} diff --git a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt b/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt index a9290c47..ca5da079 100644 --- a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt +++ b/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt @@ -31,7 +31,6 @@ import io.opentelemetry.sdk.metrics.export.MetricReaderFactory import kotlinx.coroutines.* import mu.KotlinLogging import java.time.Duration -import java.util.* /** * A helper class to read the metrics from a list of [MetricProducer]s and automatically export the metrics every diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt index a150de4e..7c0c43ed 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt @@ -23,8 +23,9 @@ package org.opendc.web.runner import mu.KotlinLogging +import org.opendc.compute.api.Server import org.opendc.compute.workload.* -import org.opendc.compute.workload.telemetry.SdkTelemetryManager +import org.opendc.compute.workload.telemetry.NoopTelemetryManager import org.opendc.compute.workload.topology.HostSpec import org.opendc.compute.workload.topology.Topology import org.opendc.compute.workload.topology.apply @@ -35,13 +36,12 @@ import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.LinearPowerModel import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.telemetry.compute.collectServiceMetrics -import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader +import org.opendc.telemetry.compute.ComputeMetricReader import org.opendc.web.client.runner.OpenDCRunnerClient import org.opendc.web.proto.runner.Job import org.opendc.web.proto.runner.Scenario import org.opendc.web.runner.internal.JobManager -import org.opendc.web.runner.internal.WebComputeMetricExporter +import org.opendc.web.runner.internal.WebComputeMonitor import java.io.File import java.time.Duration import java.util.* @@ -180,9 +180,9 @@ public class OpenDCRunner( private val scenario: Scenario, private val repeat: Int, private val topology: Topology, - ) : RecursiveTask() { - override fun compute(): WebComputeMetricExporter.Results { - val exporter = WebComputeMetricExporter() + ) : RecursiveTask() { + override fun compute(): WebComputeMonitor.Results { + val monitor = WebComputeMonitor() // Schedule task that interrupts the simulation if it runs for too long. val currentThread = Thread.currentThread() @@ -206,25 +206,24 @@ public class OpenDCRunner( else null - val telemetry = SdkTelemetryManager(clock) val simulator = ComputeServiceHelper( coroutineContext, clock, - telemetry, + NoopTelemetryManager(), computeScheduler, failureModel, interferenceModel.takeIf { phenomena.interference } ) - - telemetry.registerMetricReader(CoroutineMetricReader(this, exporter, exportInterval = Duration.ofHours(1))) + val servers = mutableListOf() + val reader = ComputeMetricReader(this, clock, simulator.service, servers, monitor) try { // Instantiate the topology onto the simulator simulator.apply(topology) // Run workload trace - simulator.run(vms, seeder.nextLong()) + simulator.run(vms, seeder.nextLong(), servers) - val serviceMetrics = collectServiceMetrics(telemetry.metricProducer) + val serviceMetrics = simulator.service.getSchedulerStats() logger.debug { "Scheduler " + "Success=${serviceMetrics.attemptsSuccess} " + @@ -235,14 +234,14 @@ public class OpenDCRunner( } } finally { simulator.close() - telemetry.close() + reader.close() } } } finally { interruptTask.cancel(false) } - return exporter.collectResults() + return monitor.collectResults() } } diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManager.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManager.kt index 8de0cee4..99b8aaf1 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManager.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/JobManager.kt @@ -62,7 +62,7 @@ internal class JobManager(private val client: OpenDCRunnerClient) { /** * Persist the specified results. */ - fun finish(id: Long, results: List) { + fun finish(id: Long, results: List) { client.jobs.update( id, Job.Update( diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMetricExporter.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMetricExporter.kt deleted file mode 100644 index 04437a5f..00000000 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMetricExporter.kt +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.web.runner.internal - -import org.opendc.telemetry.compute.ComputeMetricExporter -import org.opendc.telemetry.compute.ComputeMonitor -import org.opendc.telemetry.compute.table.HostTableReader -import org.opendc.telemetry.compute.table.ServiceTableReader -import kotlin.math.max -import kotlin.math.roundToLong - -/** - * A [ComputeMonitor] that tracks the aggregate metrics for each repeat. - */ -internal class WebComputeMetricExporter : ComputeMetricExporter() { - override fun record(reader: HostTableReader) { - val slices = reader.downtime / SLICE_LENGTH - - hostAggregateMetrics = AggregateHostMetrics( - hostAggregateMetrics.totalActiveTime + reader.cpuActiveTime, - hostAggregateMetrics.totalIdleTime + reader.cpuIdleTime, - hostAggregateMetrics.totalStealTime + reader.cpuStealTime, - hostAggregateMetrics.totalLostTime + reader.cpuLostTime, - hostAggregateMetrics.totalPowerDraw + reader.powerTotal, - hostAggregateMetrics.totalFailureSlices + slices, - hostAggregateMetrics.totalFailureVmSlices + reader.guestsRunning * slices - ) - - hostMetrics.compute(reader.host.id) { _, prev -> - HostMetrics( - reader.cpuUsage + (prev?.cpuUsage ?: 0.0), - reader.cpuDemand + (prev?.cpuDemand ?: 0.0), - reader.guestsRunning + (prev?.instanceCount ?: 0), - 1 + (prev?.count ?: 0) - ) - } - } - - private var hostAggregateMetrics: AggregateHostMetrics = AggregateHostMetrics() - private val hostMetrics: MutableMap = mutableMapOf() - private val SLICE_LENGTH: Long = 5 * 60L - - private data class AggregateHostMetrics( - val totalActiveTime: Long = 0L, - val totalIdleTime: Long = 0L, - val totalStealTime: Long = 0L, - val totalLostTime: Long = 0L, - val totalPowerDraw: Double = 0.0, - val totalFailureSlices: Double = 0.0, - val totalFailureVmSlices: Double = 0.0, - ) - - private data class HostMetrics( - val cpuUsage: Double, - val cpuDemand: Double, - val instanceCount: Long, - val count: Long - ) - - private var serviceMetrics: AggregateServiceMetrics = AggregateServiceMetrics() - - override fun record(reader: ServiceTableReader) { - serviceMetrics = AggregateServiceMetrics( - max(reader.attemptsSuccess, serviceMetrics.vmTotalCount), - max(reader.serversPending, serviceMetrics.vmWaitingCount), - max(reader.serversActive, serviceMetrics.vmActiveCount), - max(0, serviceMetrics.vmInactiveCount), - max(reader.attemptsFailure, serviceMetrics.vmFailedCount), - ) - } - - private data class AggregateServiceMetrics( - val vmTotalCount: Int = 0, - val vmWaitingCount: Int = 0, - val vmActiveCount: Int = 0, - val vmInactiveCount: Int = 0, - val vmFailedCount: Int = 0 - ) - - /** - * Collect the results of the simulation. - */ - fun collectResults(): Results { - return Results( - hostAggregateMetrics.totalActiveTime, - hostAggregateMetrics.totalIdleTime, - hostAggregateMetrics.totalStealTime, - hostAggregateMetrics.totalLostTime, - hostMetrics.map { it.value.cpuUsage / it.value.count }.average(), - hostMetrics.map { it.value.cpuDemand / it.value.count }.average(), - hostMetrics.map { it.value.instanceCount.toDouble() / it.value.count }.average(), - hostMetrics.map { it.value.instanceCount.toDouble() / it.value.count }.maxOrNull() ?: 0.0, - hostAggregateMetrics.totalPowerDraw, - hostAggregateMetrics.totalFailureSlices.roundToLong(), - hostAggregateMetrics.totalFailureVmSlices.roundToLong(), - serviceMetrics.vmTotalCount, - serviceMetrics.vmWaitingCount, - serviceMetrics.vmInactiveCount, - serviceMetrics.vmFailedCount, - ) - } - - /** - * Structure of the results of a single simulation. - */ - data class Results( - val totalActiveTime: Long, - val totalIdleTime: Long, - val totalStealTime: Long, - val totalLostTime: Long, - val meanCpuUsage: Double, - val meanCpuDemand: Double, - val meanNumDeployedImages: Double, - val maxNumDeployedImages: Double, - val totalPowerDraw: Double, - val totalFailureSlices: Long, - val totalFailureVmSlices: Long, - val totalVmsSubmitted: Int, - val totalVmsQueued: Int, - val totalVmsFinished: Int, - val totalVmsFailed: Int - ) -} diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt new file mode 100644 index 00000000..69350d8c --- /dev/null +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt @@ -0,0 +1,142 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.web.runner.internal + +import org.opendc.telemetry.compute.ComputeMonitor +import org.opendc.telemetry.compute.table.HostTableReader +import org.opendc.telemetry.compute.table.ServiceTableReader +import kotlin.math.max +import kotlin.math.roundToLong + +/** + * A [ComputeMonitor] that tracks the aggregate metrics for each repeat. + */ +internal class WebComputeMonitor : ComputeMonitor { + override fun record(reader: HostTableReader) { + val slices = reader.downtime / SLICE_LENGTH + + hostAggregateMetrics = AggregateHostMetrics( + hostAggregateMetrics.totalActiveTime + reader.cpuActiveTime, + hostAggregateMetrics.totalIdleTime + reader.cpuIdleTime, + hostAggregateMetrics.totalStealTime + reader.cpuStealTime, + hostAggregateMetrics.totalLostTime + reader.cpuLostTime, + hostAggregateMetrics.totalPowerDraw + reader.powerTotal, + hostAggregateMetrics.totalFailureSlices + slices, + hostAggregateMetrics.totalFailureVmSlices + reader.guestsRunning * slices + ) + + hostMetrics.compute(reader.host.id) { _, prev -> + HostMetrics( + reader.cpuUsage + (prev?.cpuUsage ?: 0.0), + reader.cpuDemand + (prev?.cpuDemand ?: 0.0), + reader.guestsRunning + (prev?.instanceCount ?: 0), + 1 + (prev?.count ?: 0) + ) + } + } + + private var hostAggregateMetrics: AggregateHostMetrics = AggregateHostMetrics() + private val hostMetrics: MutableMap = mutableMapOf() + private val SLICE_LENGTH: Long = 5 * 60L + + private data class AggregateHostMetrics( + val totalActiveTime: Long = 0L, + val totalIdleTime: Long = 0L, + val totalStealTime: Long = 0L, + val totalLostTime: Long = 0L, + val totalPowerDraw: Double = 0.0, + val totalFailureSlices: Double = 0.0, + val totalFailureVmSlices: Double = 0.0, + ) + + private data class HostMetrics( + val cpuUsage: Double, + val cpuDemand: Double, + val instanceCount: Long, + val count: Long + ) + + private var serviceMetrics: AggregateServiceMetrics = AggregateServiceMetrics() + + override fun record(reader: ServiceTableReader) { + serviceMetrics = AggregateServiceMetrics( + max(reader.attemptsSuccess, serviceMetrics.vmTotalCount), + max(reader.serversPending, serviceMetrics.vmWaitingCount), + max(reader.serversActive, serviceMetrics.vmActiveCount), + max(0, serviceMetrics.vmInactiveCount), + max(reader.attemptsFailure, serviceMetrics.vmFailedCount), + ) + } + + private data class AggregateServiceMetrics( + val vmTotalCount: Int = 0, + val vmWaitingCount: Int = 0, + val vmActiveCount: Int = 0, + val vmInactiveCount: Int = 0, + val vmFailedCount: Int = 0 + ) + + /** + * Collect the results of the simulation. + */ + fun collectResults(): Results { + return Results( + hostAggregateMetrics.totalActiveTime, + hostAggregateMetrics.totalIdleTime, + hostAggregateMetrics.totalStealTime, + hostAggregateMetrics.totalLostTime, + hostMetrics.map { it.value.cpuUsage / it.value.count }.average(), + hostMetrics.map { it.value.cpuDemand / it.value.count }.average(), + hostMetrics.map { it.value.instanceCount.toDouble() / it.value.count }.average(), + hostMetrics.map { it.value.instanceCount.toDouble() / it.value.count }.maxOrNull() ?: 0.0, + hostAggregateMetrics.totalPowerDraw, + hostAggregateMetrics.totalFailureSlices.roundToLong(), + hostAggregateMetrics.totalFailureVmSlices.roundToLong(), + serviceMetrics.vmTotalCount, + serviceMetrics.vmWaitingCount, + serviceMetrics.vmInactiveCount, + serviceMetrics.vmFailedCount, + ) + } + + /** + * Structure of the results of a single simulation. + */ + data class Results( + val totalActiveTime: Long, + val totalIdleTime: Long, + val totalStealTime: Long, + val totalLostTime: Long, + val meanCpuUsage: Double, + val meanCpuDemand: Double, + val meanNumDeployedImages: Double, + val maxNumDeployedImages: Double, + val totalPowerDraw: Double, + val totalFailureSlices: Long, + val totalFailureVmSlices: Long, + val totalVmsSubmitted: Int, + val totalVmsQueued: Int, + val totalVmsFinished: Int, + val totalVmsFailed: Int + ) +} -- cgit v1.2.3