From 30cd010f1f98262aa7f264bb3c3eb6028b8495c5 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 22 Sep 2021 12:43:01 +0200 Subject: refactor(telemetry): Do not require clock for ComputeMetricExporter This change drops the requirement for a clock parameter when constructing a ComputeMetricExporter, since it will now derive the timestamp from the recorded metrics. --- .../org/opendc/experiments/capelin/Portfolio.kt | 10 ++-- .../experiments/capelin/CapelinIntegrationTest.kt | 59 +++++++++++----------- 2 files changed, 33 insertions(+), 36 deletions(-) (limited to 'opendc-experiments/opendc-experiments-capelin') diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index 2201a6b4..21ff3ab0 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -27,7 +27,7 @@ import mu.KotlinLogging import org.opendc.compute.workload.ComputeWorkloadLoader import org.opendc.compute.workload.ComputeWorkloadRunner import org.opendc.compute.workload.createComputeScheduler -import org.opendc.compute.workload.export.parquet.ParquetExportMonitor +import org.opendc.compute.workload.export.parquet.ParquetComputeMetricExporter import org.opendc.compute.workload.grid5000 import org.opendc.compute.workload.topology.apply import org.opendc.compute.workload.util.PerformanceInterferenceReader @@ -39,7 +39,6 @@ import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.collectServiceMetrics import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import java.io.File @@ -120,12 +119,12 @@ abstract class Portfolio(name: String) : Experiment(name) { performanceInterferenceModel ) - val monitor = ParquetExportMonitor( + val exporter = ParquetComputeMetricExporter( File(config.getString("output-path")), "portfolio_id=$name/scenario_id=$id/run_id=$repeat", 4096 ) - val metricReader = CoroutineMetricReader(this, runner.producers, ComputeMetricExporter(clock, monitor)) + val metricReader = CoroutineMetricReader(this, runner.producers, exporter) val topology = clusterTopology(File(config.getString("env-path"), "${topology.name}.txt")) try { @@ -137,10 +136,9 @@ abstract class Portfolio(name: String) : Experiment(name) { } finally { runner.close() metricReader.close() - monitor.close() } - val monitorResults = collectServiceMetrics(clock.instant(), runner.producers[0]) + val monitorResults = collectServiceMetrics(runner.producers[0]) logger.debug { "Scheduler " + "Success=${monitorResults.attemptsSuccess} " + diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index ac2ea646..30cc1466 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -39,7 +39,6 @@ import org.opendc.experiments.capelin.topology.clusterTopology import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.core.runBlockingSimulation import org.opendc.telemetry.compute.ComputeMetricExporter -import org.opendc.telemetry.compute.ComputeMonitor import org.opendc.telemetry.compute.collectServiceMetrics import org.opendc.telemetry.compute.table.HostData import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader @@ -54,7 +53,7 @@ class CapelinIntegrationTest { /** * The monitor used to keep track of the metrics. */ - private lateinit var monitor: TestExperimentReporter + private lateinit var exporter: TestComputeMetricExporter /** * The [FilterScheduler] to use for all experiments. @@ -71,7 +70,7 @@ class CapelinIntegrationTest { */ @BeforeEach fun setUp() { - monitor = TestExperimentReporter() + exporter = TestComputeMetricExporter() computeScheduler = FilterScheduler( filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), weighers = listOf(CoreRamWeigher(multiplier = 1.0)) @@ -91,7 +90,7 @@ class CapelinIntegrationTest { computeScheduler ) val topology = createTopology() - val metricReader = CoroutineMetricReader(this, runner.producers, ComputeMetricExporter(clock, monitor)) + val metricReader = CoroutineMetricReader(this, runner.producers, exporter) try { runner.apply(topology) @@ -101,7 +100,7 @@ class CapelinIntegrationTest { metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.instant(), runner.producers[0]) + val serviceMetrics = collectServiceMetrics(runner.producers[0]) println( "Scheduler " + "Success=${serviceMetrics.attemptsSuccess} " + @@ -117,11 +116,11 @@ class CapelinIntegrationTest { { assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") }, { assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") }, { assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") }, - { assertEquals(223331032, monitor.idleTime) { "Incorrect idle time" } }, - { assertEquals(67006568, monitor.activeTime) { "Incorrect active time" } }, - { assertEquals(3159379, monitor.stealTime) { "Incorrect steal time" } }, - { assertEquals(0, monitor.lostTime) { "Incorrect lost time" } }, - { assertEquals(5.841120890240688E9, monitor.energyUsage, 0.01) { "Incorrect power draw" } }, + { assertEquals(223331032, this@CapelinIntegrationTest.exporter.idleTime) { "Incorrect idle time" } }, + { assertEquals(67006568, this@CapelinIntegrationTest.exporter.activeTime) { "Incorrect active time" } }, + { assertEquals(3159379, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } }, + { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Incorrect lost time" } }, + { assertEquals(5.841120890240688E9, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } }, ) } @@ -139,7 +138,7 @@ class CapelinIntegrationTest { computeScheduler ) val topology = createTopology("single") - val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) + val metricReader = CoroutineMetricReader(this, simulator.producers, exporter) try { simulator.apply(topology) @@ -149,7 +148,7 @@ class CapelinIntegrationTest { metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0]) + val serviceMetrics = collectServiceMetrics(simulator.producers[0]) println( "Scheduler " + "Success=${serviceMetrics.attemptsSuccess} " + @@ -161,10 +160,10 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(10998110, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(9740290, monitor.activeTime) { "Active time incorrect" } }, - { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } }, - { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } } + { assertEquals(10998110, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } }, + { assertEquals(9740290, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } }, + { assertEquals(0, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } }, + { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } } ) } @@ -188,7 +187,7 @@ class CapelinIntegrationTest { interferenceModel = performanceInterferenceModel ) val topology = createTopology("single") - val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) + val metricReader = CoroutineMetricReader(this, simulator.producers, exporter) try { simulator.apply(topology) @@ -198,7 +197,7 @@ class CapelinIntegrationTest { metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0]) + val serviceMetrics = collectServiceMetrics(simulator.producers[0]) println( "Scheduler " + "Success=${serviceMetrics.attemptsSuccess} " + @@ -210,10 +209,10 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(6013899, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(14724501, monitor.activeTime) { "Active time incorrect" } }, - { assertEquals(12530742, monitor.stealTime) { "Steal time incorrect" } }, - { assertEquals(473394, monitor.lostTime) { "Lost time incorrect" } } + { assertEquals(6013899, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } }, + { assertEquals(14724501, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } }, + { assertEquals(12530742, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } }, + { assertEquals(473394, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } } ) } @@ -231,7 +230,7 @@ class CapelinIntegrationTest { ) val topology = createTopology("single") val workload = createTestWorkload(0.25, seed) - val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) + val metricReader = CoroutineMetricReader(this, simulator.producers, exporter) try { simulator.apply(topology) @@ -241,7 +240,7 @@ class CapelinIntegrationTest { metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0]) + val serviceMetrics = collectServiceMetrics(simulator.producers[0]) println( "Scheduler " + "Success=${serviceMetrics.attemptsSuccess} " + @@ -253,11 +252,11 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(11134319, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(9604081, monitor.activeTime) { "Active time incorrect" } }, - { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } }, - { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, - { assertEquals(2559005056, monitor.uptime) { "Uptime incorrect" } } + { assertEquals(11134319, exporter.idleTime) { "Idle time incorrect" } }, + { assertEquals(9604081, exporter.activeTime) { "Active time incorrect" } }, + { assertEquals(0, exporter.stealTime) { "Steal time incorrect" } }, + { assertEquals(0, exporter.lostTime) { "Lost time incorrect" } }, + { assertEquals(2559005056, exporter.uptime) { "Uptime incorrect" } } ) } @@ -277,7 +276,7 @@ class CapelinIntegrationTest { return stream.use { clusterTopology(stream) } } - class TestExperimentReporter : ComputeMonitor { + class TestComputeMetricExporter : ComputeMetricExporter() { var idleTime = 0L var activeTime = 0L var stealTime = 0L -- cgit v1.2.3