From 7981e9aa3e6854ad593a5af85f8eb56874299d7e Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 6 May 2022 17:45:23 +0200 Subject: refactor(telemetry/compute): Support direct metric access This change introduces a `ComputeMetricReader` class that can be used as a replacement for the `CoroutineMetricReader` class when reading metrics from the Compute service. This implementation operates directly on a `ComputeService` instance, providing better performance. --- .../org/opendc/experiments/capelin/Portfolio.kt | 40 ++++-- .../experiments/capelin/CapelinIntegrationTest.kt | 148 +++++++++------------ 2 files changed, 93 insertions(+), 95 deletions(-) (limited to 'opendc-experiments/opendc-experiments-capelin/src') diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index 0bbf1443..6fd85e8c 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -23,12 +23,14 @@ package org.opendc.experiments.capelin import com.typesafe.config.ConfigFactory +import kotlinx.coroutines.* +import org.opendc.compute.api.Server import org.opendc.compute.workload.ComputeServiceHelper import org.opendc.compute.workload.ComputeWorkloadLoader import org.opendc.compute.workload.createComputeScheduler -import org.opendc.compute.workload.export.parquet.ParquetComputeMetricExporter +import org.opendc.compute.workload.export.parquet.ParquetComputeMonitor import org.opendc.compute.workload.grid5000 -import org.opendc.compute.workload.telemetry.SdkTelemetryManager +import org.opendc.compute.workload.telemetry.NoopTelemetryManager import org.opendc.compute.workload.topology.apply import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Topology @@ -37,7 +39,7 @@ import org.opendc.experiments.capelin.topology.clusterTopology import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader +import org.opendc.telemetry.compute.ComputeMetricReader import java.io.File import java.time.Duration import java.util.* @@ -97,7 +99,7 @@ abstract class Portfolio(name: String) : Experiment(name) { else null val (vms, interferenceModel) = workload.source.resolve(workloadLoader, seeder) - val telemetry = SdkTelemetryManager(clock) + val telemetry = NoopTelemetryManager() val runner = ComputeServiceHelper( coroutineContext, clock, @@ -107,23 +109,35 @@ abstract class Portfolio(name: String) : Experiment(name) { interferenceModel?.withSeed(repeat.toLong()) ) - val exporter = ParquetComputeMetricExporter( - File(config.getString("output-path")), - "portfolio_id=$name/scenario_id=$id/run_id=$repeat", - 4096 - ) - telemetry.registerMetricReader(CoroutineMetricReader(this, exporter)) - val topology = clusterTopology(File(config.getString("env-path"), "${topology.name}.txt")) + val servers = mutableListOf() + val exporter = ComputeMetricReader( + this, + clock, + runner.service, + servers, + ParquetComputeMonitor( + File(config.getString("output-path")), + "portfolio_id=$name/scenario_id=$id/run_id=$repeat", + bufferSize = 4096 + ), + exportInterval = Duration.ofMinutes(5) + ) try { // Instantiate the desired topology runner.apply(topology) - // Run the workload trace - runner.run(vms, seeder.nextLong()) + coroutineScope { + // Run the workload trace + runner.run(vms, seeder.nextLong(), servers) + + // Stop the metric collection + exporter.close() + } } finally { runner.close() + exporter.close() } } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 01b2a8fe..62cdf123 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -26,25 +26,23 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll +import org.opendc.compute.api.Server import org.opendc.compute.service.scheduler.FilterScheduler import org.opendc.compute.service.scheduler.filters.ComputeFilter import org.opendc.compute.service.scheduler.filters.RamFilter import org.opendc.compute.service.scheduler.filters.VCpuFilter import org.opendc.compute.service.scheduler.weights.CoreRamWeigher import org.opendc.compute.workload.* -import org.opendc.compute.workload.telemetry.SdkTelemetryManager +import org.opendc.compute.workload.telemetry.NoopTelemetryManager import org.opendc.compute.workload.topology.Topology import org.opendc.compute.workload.topology.apply import org.opendc.experiments.capelin.topology.clusterTopology import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.telemetry.compute.ComputeMetricExporter +import org.opendc.telemetry.compute.ComputeMetricReader +import org.opendc.telemetry.compute.ComputeMonitor import org.opendc.telemetry.compute.table.HostTableReader -import org.opendc.telemetry.compute.table.ServiceData -import org.opendc.telemetry.compute.table.ServiceTableReader -import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import java.io.File import java.time.Duration -import java.time.Instant import java.util.* /** @@ -54,7 +52,7 @@ class CapelinIntegrationTest { /** * The monitor used to keep track of the metrics. */ - private lateinit var exporter: TestComputeMetricExporter + private lateinit var monitor: TestComputeMonitor /** * The [FilterScheduler] to use for all experiments. @@ -67,11 +65,11 @@ class CapelinIntegrationTest { private lateinit var workloadLoader: ComputeWorkloadLoader /** - * Setup the experimental environment. + * Set up the experimental environment. */ @BeforeEach fun setUp() { - exporter = TestComputeMetricExporter() + monitor = TestComputeMonitor() computeScheduler = FilterScheduler( filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), weighers = listOf(CoreRamWeigher(multiplier = 1.0)) @@ -85,22 +83,22 @@ class CapelinIntegrationTest { @Test fun testLarge() = runBlockingSimulation { val (workload, _) = createTestWorkload(1.0) - val telemetry = SdkTelemetryManager(clock) val runner = ComputeServiceHelper( coroutineContext, clock, - telemetry, + NoopTelemetryManager(), computeScheduler ) val topology = createTopology() - telemetry.registerMetricReader(CoroutineMetricReader(this, exporter)) + val servers = mutableListOf() + val reader = ComputeMetricReader(this, clock, runner.service, servers, monitor) try { runner.apply(topology) - runner.run(workload, 0) + runner.run(workload, 0, servers) - val serviceMetrics = exporter.serviceMetrics + val serviceMetrics = runner.service.getSchedulerStats() println( "Scheduler " + "Success=${serviceMetrics.attemptsSuccess} " + @@ -116,15 +114,15 @@ class CapelinIntegrationTest { { assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") }, { assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") }, { assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") }, - { assertEquals(223393683, this@CapelinIntegrationTest.exporter.idleTime) { "Incorrect idle time" } }, - { assertEquals(66977508, this@CapelinIntegrationTest.exporter.activeTime) { "Incorrect active time" } }, - { assertEquals(3160381, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } }, - { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Incorrect lost time" } }, - { assertEquals(5.840845430827075E9, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } }, + { assertEquals(223393683, this@CapelinIntegrationTest.monitor.idleTime) { "Incorrect idle time" } }, + { assertEquals(66977508, this@CapelinIntegrationTest.monitor.activeTime) { "Incorrect active time" } }, + { assertEquals(3160381, this@CapelinIntegrationTest.monitor.stealTime) { "Incorrect steal time" } }, + { assertEquals(0, this@CapelinIntegrationTest.monitor.lostTime) { "Incorrect lost time" } }, + { assertEquals(5.840845430827075E9, this@CapelinIntegrationTest.monitor.energyUsage, 0.01) { "Incorrect power draw" } }, ) } finally { runner.close() - telemetry.close() + reader.close() } } @@ -135,41 +133,41 @@ class CapelinIntegrationTest { fun testSmall() = runBlockingSimulation { val seed = 1 val (workload, _) = createTestWorkload(0.25, seed) - val telemetry = SdkTelemetryManager(clock) val runner = ComputeServiceHelper( coroutineContext, clock, - telemetry, + NoopTelemetryManager(), computeScheduler ) val topology = createTopology("single") - - telemetry.registerMetricReader(CoroutineMetricReader(this, exporter)) + val servers = mutableListOf() + val reader = ComputeMetricReader(this, clock, runner.service, servers, monitor) try { runner.apply(topology) - runner.run(workload, seed.toLong()) + runner.run(workload, seed.toLong(), servers) + val serviceMetrics = runner.service.getSchedulerStats() println( "Scheduler " + - "Success=${exporter.serviceMetrics.attemptsSuccess} " + - "Failure=${exporter.serviceMetrics.attemptsFailure} " + - "Error=${exporter.serviceMetrics.attemptsError} " + - "Pending=${exporter.serviceMetrics.serversPending} " + - "Active=${exporter.serviceMetrics.serversActive}" + "Success=${serviceMetrics.attemptsSuccess} " + + "Failure=${serviceMetrics.attemptsFailure} " + + "Error=${serviceMetrics.attemptsError} " + + "Pending=${serviceMetrics.serversPending} " + + "Active=${serviceMetrics.serversActive}" ) } finally { runner.close() - telemetry.close() + reader.close() } // Note that these values have been verified beforehand assertAll( - { assertEquals(10999592, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } }, - { assertEquals(9741207, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } }, - { assertEquals(0, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } }, - { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } }, - { assertEquals(7.011413569311495E8, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } } + { assertEquals(10999592, this@CapelinIntegrationTest.monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(9741207, this@CapelinIntegrationTest.monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(0, this@CapelinIntegrationTest.monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(0, this@CapelinIntegrationTest.monitor.lostTime) { "Lost time incorrect" } }, + { assertEquals(7.011413569311495E8, this@CapelinIntegrationTest.monitor.energyUsage, 0.01) { "Incorrect power draw" } } ) } @@ -181,41 +179,41 @@ class CapelinIntegrationTest { val seed = 0 val (workload, interferenceModel) = createTestWorkload(1.0, seed) - val telemetry = SdkTelemetryManager(clock) val simulator = ComputeServiceHelper( coroutineContext, clock, - telemetry, + NoopTelemetryManager(), computeScheduler, interferenceModel = interferenceModel?.withSeed(seed.toLong()) ) val topology = createTopology("single") - - telemetry.registerMetricReader(CoroutineMetricReader(this, exporter)) + val servers = mutableListOf() + val reader = ComputeMetricReader(this, clock, simulator.service, servers, monitor) try { simulator.apply(topology) - simulator.run(workload, seed.toLong()) + simulator.run(workload, seed.toLong(), servers) + val serviceMetrics = simulator.service.getSchedulerStats() println( "Scheduler " + - "Success=${exporter.serviceMetrics.attemptsSuccess} " + - "Failure=${exporter.serviceMetrics.attemptsFailure} " + - "Error=${exporter.serviceMetrics.attemptsError} " + - "Pending=${exporter.serviceMetrics.serversPending} " + - "Active=${exporter.serviceMetrics.serversActive}" + "Success=${serviceMetrics.attemptsSuccess} " + + "Failure=${serviceMetrics.attemptsFailure} " + + "Error=${serviceMetrics.attemptsError} " + + "Pending=${serviceMetrics.serversPending} " + + "Active=${serviceMetrics.serversActive}" ) } finally { simulator.close() - telemetry.close() + reader.close() } // Note that these values have been verified beforehand assertAll( - { assertEquals(6028050, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } }, - { assertEquals(14712749, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } }, - { assertEquals(12532907, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } }, - { assertEquals(467963, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } } + { assertEquals(6028050, this@CapelinIntegrationTest.monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(14712749, this@CapelinIntegrationTest.monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(12532907, this@CapelinIntegrationTest.monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(467963, this@CapelinIntegrationTest.monitor.lostTime) { "Lost time incorrect" } } ) } @@ -225,43 +223,43 @@ class CapelinIntegrationTest { @Test fun testFailures() = runBlockingSimulation { val seed = 1 - val telemetry = SdkTelemetryManager(clock) val simulator = ComputeServiceHelper( coroutineContext, clock, - telemetry, + NoopTelemetryManager(), computeScheduler, grid5000(Duration.ofDays(7)) ) val topology = createTopology("single") val (workload, _) = createTestWorkload(0.25, seed) - - telemetry.registerMetricReader(CoroutineMetricReader(this, exporter)) + val servers = mutableListOf() + val reader = ComputeMetricReader(this, clock, simulator.service, servers, monitor) try { simulator.apply(topology) - simulator.run(workload, seed.toLong()) + simulator.run(workload, seed.toLong(), servers) + val serviceMetrics = simulator.service.getSchedulerStats() println( "Scheduler " + - "Success=${exporter.serviceMetrics.attemptsSuccess} " + - "Failure=${exporter.serviceMetrics.attemptsFailure} " + - "Error=${exporter.serviceMetrics.attemptsError} " + - "Pending=${exporter.serviceMetrics.serversPending} " + - "Active=${exporter.serviceMetrics.serversActive}" + "Success=${serviceMetrics.attemptsSuccess} " + + "Failure=${serviceMetrics.attemptsFailure} " + + "Error=${serviceMetrics.attemptsError} " + + "Pending=${serviceMetrics.serversPending} " + + "Active=${serviceMetrics.serversActive}" ) } finally { simulator.close() - telemetry.close() + reader.close() } // Note that these values have been verified beforehand assertAll( - { assertEquals(10867345, exporter.idleTime) { "Idle time incorrect" } }, - { assertEquals(9607095, exporter.activeTime) { "Active time incorrect" } }, - { assertEquals(0, exporter.stealTime) { "Steal time incorrect" } }, - { assertEquals(0, exporter.lostTime) { "Lost time incorrect" } }, - { assertEquals(2559305056, exporter.uptime) { "Uptime incorrect" } } + { assertEquals(10867345, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(9607095, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, + { assertEquals(2559305056, monitor.uptime) { "Uptime incorrect" } } ) } @@ -281,8 +279,7 @@ class CapelinIntegrationTest { return stream.use { clusterTopology(stream) } } - class TestComputeMetricExporter : ComputeMetricExporter() { - var serviceMetrics: ServiceData = ServiceData(Instant.ofEpochMilli(0), 0, 0, 0, 0, 0, 0, 0) + class TestComputeMonitor : ComputeMonitor { var idleTime = 0L var activeTime = 0L var stealTime = 0L @@ -290,19 +287,6 @@ class CapelinIntegrationTest { var energyUsage = 0.0 var uptime = 0L - override fun record(reader: ServiceTableReader) { - serviceMetrics = ServiceData( - reader.timestamp, - reader.hostsUp, - reader.hostsDown, - reader.serversPending, - reader.serversActive, - reader.attemptsSuccess, - reader.attemptsFailure, - reader.attemptsError - ) - } - override fun record(reader: HostTableReader) { idleTime += reader.cpuIdleTime activeTime += reader.cpuActiveTime -- cgit v1.2.3