diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-10-09 16:10:00 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-02-15 18:08:37 +0100 |
| commit | 02c215ad57e1e4d56c54d22be58e1845bdeebf25 (patch) | |
| tree | 7794b53ca3bb6fa197a118cee92114135be15def /opendc-experiments/opendc-experiments-capelin/src/test | |
| parent | 48c04fb74ee170f58f292b077c62b4da237f507e (diff) | |
refactor: Update OpenTelemetry to version 1.11
This change updates the OpenDC codebase to use OpenTelemetry v1.11,
which stabilizes the metrics API. This stabilization brings quite a few
breaking changes, so significant changes are necessary inside the OpenDC
codebase.
Diffstat (limited to 'opendc-experiments/opendc-experiments-capelin/src/test')
| -rw-r--r-- | opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt | 159 |
1 files changed, 92 insertions, 67 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index eedc3131..aefd8304 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -32,17 +32,20 @@ import org.opendc.compute.service.scheduler.filters.RamFilter import org.opendc.compute.service.scheduler.filters.VCpuFilter import org.opendc.compute.service.scheduler.weights.CoreRamWeigher import org.opendc.compute.workload.* +import org.opendc.compute.workload.telemetry.SdkTelemetryManager import org.opendc.compute.workload.topology.Topology import org.opendc.compute.workload.topology.apply import org.opendc.compute.workload.util.VmInterferenceModelReader import org.opendc.experiments.capelin.topology.clusterTopology import org.opendc.simulator.core.runBlockingSimulation import org.opendc.telemetry.compute.ComputeMetricExporter -import org.opendc.telemetry.compute.collectServiceMetrics import org.opendc.telemetry.compute.table.HostTableReader +import org.opendc.telemetry.compute.table.ServiceData +import org.opendc.telemetry.compute.table.ServiceTableReader import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import java.io.File import java.time.Duration +import java.time.Instant import java.util.* /** @@ -83,44 +86,47 @@ class CapelinIntegrationTest { @Test fun testLarge() = runBlockingSimulation { val workload = createTestWorkload(1.0) + val telemetry = SdkTelemetryManager(clock) val runner = ComputeServiceHelper( coroutineContext, clock, + telemetry, computeScheduler ) val topology = createTopology() - val metricReader = CoroutineMetricReader(this, runner.producers, exporter) + + telemetry.registerMetricReader(CoroutineMetricReader(this, exporter)) try { runner.apply(topology) runner.run(workload, 0) + + val serviceMetrics = exporter.serviceMetrics + println( + "Scheduler " + + "Success=${serviceMetrics.attemptsSuccess} " + + "Failure=${serviceMetrics.attemptsFailure} " + + "Error=${serviceMetrics.attemptsError} " + + "Pending=${serviceMetrics.serversPending} " + + "Active=${serviceMetrics.serversActive}" + ) + + // Note that these values have been verified beforehand + assertAll( + { assertEquals(50, serviceMetrics.attemptsSuccess, "The scheduler should schedule 50 VMs") }, + { assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") }, + { assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") }, + { assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") }, + { assertEquals(223388307, this@CapelinIntegrationTest.exporter.idleTime) { "Incorrect idle time" } }, + { assertEquals(66977508, this@CapelinIntegrationTest.exporter.activeTime) { "Incorrect active time" } }, + { assertEquals(3160381, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } }, + { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Incorrect lost time" } }, + { assertEquals(5.840845430827075E9, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } }, + ) } finally { runner.close() - metricReader.close() + telemetry.close() } - - val serviceMetrics = collectServiceMetrics(runner.producers[0]) - println( - "Scheduler " + - "Success=${serviceMetrics.attemptsSuccess} " + - "Failure=${serviceMetrics.attemptsFailure} " + - "Error=${serviceMetrics.attemptsError} " + - "Pending=${serviceMetrics.serversPending} " + - "Active=${serviceMetrics.serversActive}" - ) - - // Note that these values have been verified beforehand - assertAll( - { assertEquals(50, serviceMetrics.attemptsSuccess, "The scheduler should schedule 50 VMs") }, - { assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") }, - { assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") }, - { assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") }, - { assertEquals(223388307, this@CapelinIntegrationTest.exporter.idleTime) { "Incorrect idle time" } }, - { assertEquals(66977508, this@CapelinIntegrationTest.exporter.activeTime) { "Incorrect active time" } }, - { assertEquals(3160381, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } }, - { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Incorrect lost time" } }, - { assertEquals(5.840845430827075E9, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } }, - ) } /** @@ -130,33 +136,34 @@ class CapelinIntegrationTest { fun testSmall() = runBlockingSimulation { val seed = 1 val workload = createTestWorkload(0.25, seed) - - val simulator = ComputeServiceHelper( + val telemetry = SdkTelemetryManager(clock) + val runner = ComputeServiceHelper( coroutineContext, clock, + telemetry, computeScheduler ) val topology = createTopology("single") - val metricReader = CoroutineMetricReader(this, simulator.producers, exporter) + + telemetry.registerMetricReader(CoroutineMetricReader(this, exporter)) try { - simulator.apply(topology) - simulator.run(workload, seed.toLong()) + runner.apply(topology) + runner.run(workload, seed.toLong()) + + println( + "Scheduler " + + "Success=${exporter.serviceMetrics.attemptsSuccess} " + + "Failure=${exporter.serviceMetrics.attemptsFailure} " + + "Error=${exporter.serviceMetrics.attemptsError} " + + "Pending=${exporter.serviceMetrics.serversPending} " + + "Active=${exporter.serviceMetrics.serversActive}" + ) } finally { - simulator.close() - metricReader.close() + runner.close() + telemetry.close() } - val serviceMetrics = collectServiceMetrics(simulator.producers[0]) - println( - "Scheduler " + - "Success=${serviceMetrics.attemptsSuccess} " + - "Failure=${serviceMetrics.attemptsFailure} " + - "Error=${serviceMetrics.attemptsError} " + - "Pending=${serviceMetrics.serversPending} " + - "Active=${serviceMetrics.serversActive}" - ) - // Note that these values have been verified beforehand assertAll( { assertEquals(10999208, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } }, @@ -180,33 +187,35 @@ class CapelinIntegrationTest { .read(perfInterferenceInput) .withSeed(seed.toLong()) + val telemetry = SdkTelemetryManager(clock) val simulator = ComputeServiceHelper( coroutineContext, clock, + telemetry, computeScheduler, interferenceModel = performanceInterferenceModel ) val topology = createTopology("single") - val metricReader = CoroutineMetricReader(this, simulator.producers, exporter) + + telemetry.registerMetricReader(CoroutineMetricReader(this, exporter)) try { simulator.apply(topology) simulator.run(workload, seed.toLong()) + + println( + "Scheduler " + + "Success=${exporter.serviceMetrics.attemptsSuccess} " + + "Failure=${exporter.serviceMetrics.attemptsFailure} " + + "Error=${exporter.serviceMetrics.attemptsError} " + + "Pending=${exporter.serviceMetrics.serversPending} " + + "Active=${exporter.serviceMetrics.serversActive}" + ) } finally { simulator.close() - metricReader.close() + telemetry.close() } - val serviceMetrics = collectServiceMetrics(simulator.producers[0]) - println( - "Scheduler " + - "Success=${serviceMetrics.attemptsSuccess} " + - "Failure=${serviceMetrics.attemptsFailure} " + - "Error=${serviceMetrics.attemptsError} " + - "Pending=${serviceMetrics.serversPending} " + - "Active=${serviceMetrics.serversActive}" - ) - // Note that these values have been verified beforehand assertAll( { assertEquals(6027666, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } }, @@ -222,34 +231,36 @@ class CapelinIntegrationTest { @Test fun testFailures() = runBlockingSimulation { val seed = 1 + val telemetry = SdkTelemetryManager(clock) val simulator = ComputeServiceHelper( coroutineContext, clock, + telemetry, computeScheduler, grid5000(Duration.ofDays(7)) ) val topology = createTopology("single") val workload = createTestWorkload(0.25, seed) - val metricReader = CoroutineMetricReader(this, simulator.producers, exporter) + + telemetry.registerMetricReader(CoroutineMetricReader(this, exporter)) try { simulator.apply(topology) simulator.run(workload, seed.toLong()) + + println( + "Scheduler " + + "Success=${exporter.serviceMetrics.attemptsSuccess} " + + "Failure=${exporter.serviceMetrics.attemptsFailure} " + + "Error=${exporter.serviceMetrics.attemptsError} " + + "Pending=${exporter.serviceMetrics.serversPending} " + + "Active=${exporter.serviceMetrics.serversActive}" + ) } finally { simulator.close() - metricReader.close() + telemetry.close() } - val serviceMetrics = collectServiceMetrics(simulator.producers[0]) - println( - "Scheduler " + - "Success=${serviceMetrics.attemptsSuccess} " + - "Failure=${serviceMetrics.attemptsFailure} " + - "Error=${serviceMetrics.attemptsError} " + - "Pending=${serviceMetrics.serversPending} " + - "Active=${serviceMetrics.serversActive}" - ) - // Note that these values have been verified beforehand assertAll( { assertEquals(10866961, exporter.idleTime) { "Idle time incorrect" } }, @@ -277,6 +288,7 @@ class CapelinIntegrationTest { } class TestComputeMetricExporter : ComputeMetricExporter() { + var serviceMetrics: ServiceData = ServiceData(Instant.ofEpochMilli(0), 0, 0, 0, 0, 0, 0, 0) var idleTime = 0L var activeTime = 0L var stealTime = 0L @@ -284,6 +296,19 @@ class CapelinIntegrationTest { var energyUsage = 0.0 var uptime = 0L + override fun record(reader: ServiceTableReader) { + serviceMetrics = ServiceData( + reader.timestamp, + reader.hostsUp, + reader.hostsDown, + reader.serversPending, + reader.serversActive, + reader.attemptsSuccess, + reader.attemptsFailure, + reader.attemptsError + ) + } + override fun record(reader: HostTableReader) { idleTime += reader.cpuIdleTime activeTime += reader.cpuActiveTime |
