diff options
Diffstat (limited to 'opendc-experiments')
4 files changed, 99 insertions, 83 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt b/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt index 4b35de95..bb9cb201 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt @@ -29,6 +29,7 @@ import org.opendc.compute.service.scheduler.filters.RamFilter import org.opendc.compute.service.scheduler.filters.VCpuFilter import org.opendc.compute.service.scheduler.weights.CoreRamWeigher import org.opendc.compute.workload.* +import org.opendc.compute.workload.telemetry.NoopTelemetryManager import org.opendc.compute.workload.topology.Topology import org.opendc.compute.workload.topology.apply import org.opendc.experiments.capelin.topology.clusterTopology @@ -70,6 +71,7 @@ class CapelinBenchmarks { val runner = ComputeServiceHelper( coroutineContext, clock, + NoopTelemetryManager(), computeScheduler ) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index b548ae58..6604a190 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -29,6 +29,7 @@ import org.opendc.compute.workload.ComputeWorkloadLoader import org.opendc.compute.workload.createComputeScheduler import org.opendc.compute.workload.export.parquet.ParquetComputeMetricExporter import org.opendc.compute.workload.grid5000 +import org.opendc.compute.workload.telemetry.SdkTelemetryManager import org.opendc.compute.workload.topology.apply import org.opendc.compute.workload.util.VmInterferenceModelReader import org.opendc.experiments.capelin.model.OperationalPhenomena @@ -38,7 +39,6 @@ import org.opendc.experiments.capelin.topology.clusterTopology import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.telemetry.compute.collectServiceMetrics import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import java.io.File import java.time.Duration @@ -109,9 +109,11 @@ abstract class Portfolio(name: String) : Experiment(name) { grid5000(Duration.ofSeconds((operationalPhenomena.failureFrequency * 60).roundToLong())) else null + val telemetry = SdkTelemetryManager(clock) val runner = ComputeServiceHelper( coroutineContext, clock, + telemetry, computeScheduler, failureModel, performanceInterferenceModel?.withSeed(repeat.toLong()) @@ -122,7 +124,8 @@ abstract class Portfolio(name: String) : Experiment(name) { "portfolio_id=$name/scenario_id=$id/run_id=$repeat", 4096 ) - val metricReader = CoroutineMetricReader(this, runner.producers, exporter) + telemetry.registerMetricReader(CoroutineMetricReader(this, exporter)) + val topology = clusterTopology(File(config.getString("env-path"), "${topology.name}.txt")) try { @@ -133,17 +136,6 @@ abstract class Portfolio(name: String) : Experiment(name) { runner.run(workload.source.resolve(workloadLoader, seeder), seeder.nextLong()) } finally { runner.close() - metricReader.close() - } - - val monitorResults = collectServiceMetrics(runner.producers[0]) - logger.debug { - "Scheduler " + - "Success=${monitorResults.attemptsSuccess} " + - "Failure=${monitorResults.attemptsFailure} " + - "Error=${monitorResults.attemptsError} " + - "Pending=${monitorResults.serversPending} " + - "Active=${monitorResults.serversActive}" } } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index eedc3131..aefd8304 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -32,17 +32,20 @@ import org.opendc.compute.service.scheduler.filters.RamFilter import org.opendc.compute.service.scheduler.filters.VCpuFilter import org.opendc.compute.service.scheduler.weights.CoreRamWeigher import org.opendc.compute.workload.* +import org.opendc.compute.workload.telemetry.SdkTelemetryManager import org.opendc.compute.workload.topology.Topology import org.opendc.compute.workload.topology.apply import org.opendc.compute.workload.util.VmInterferenceModelReader import org.opendc.experiments.capelin.topology.clusterTopology import org.opendc.simulator.core.runBlockingSimulation import org.opendc.telemetry.compute.ComputeMetricExporter -import org.opendc.telemetry.compute.collectServiceMetrics import org.opendc.telemetry.compute.table.HostTableReader +import org.opendc.telemetry.compute.table.ServiceData +import org.opendc.telemetry.compute.table.ServiceTableReader import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import java.io.File import java.time.Duration +import java.time.Instant import java.util.* /** @@ -83,44 +86,47 @@ class CapelinIntegrationTest { @Test fun testLarge() = runBlockingSimulation { val workload = createTestWorkload(1.0) + val telemetry = SdkTelemetryManager(clock) val runner = ComputeServiceHelper( coroutineContext, clock, + telemetry, computeScheduler ) val topology = createTopology() - val metricReader = CoroutineMetricReader(this, runner.producers, exporter) + + telemetry.registerMetricReader(CoroutineMetricReader(this, exporter)) try { runner.apply(topology) runner.run(workload, 0) + + val serviceMetrics = exporter.serviceMetrics + println( + "Scheduler " + + "Success=${serviceMetrics.attemptsSuccess} " + + "Failure=${serviceMetrics.attemptsFailure} " + + "Error=${serviceMetrics.attemptsError} " + + "Pending=${serviceMetrics.serversPending} " + + "Active=${serviceMetrics.serversActive}" + ) + + // Note that these values have been verified beforehand + assertAll( + { assertEquals(50, serviceMetrics.attemptsSuccess, "The scheduler should schedule 50 VMs") }, + { assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") }, + { assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") }, + { assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") }, + { assertEquals(223388307, this@CapelinIntegrationTest.exporter.idleTime) { "Incorrect idle time" } }, + { assertEquals(66977508, this@CapelinIntegrationTest.exporter.activeTime) { "Incorrect active time" } }, + { assertEquals(3160381, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } }, + { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Incorrect lost time" } }, + { assertEquals(5.840845430827075E9, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } }, + ) } finally { runner.close() - metricReader.close() + telemetry.close() } - - val serviceMetrics = collectServiceMetrics(runner.producers[0]) - println( - "Scheduler " + - "Success=${serviceMetrics.attemptsSuccess} " + - "Failure=${serviceMetrics.attemptsFailure} " + - "Error=${serviceMetrics.attemptsError} " + - "Pending=${serviceMetrics.serversPending} " + - "Active=${serviceMetrics.serversActive}" - ) - - // Note that these values have been verified beforehand - assertAll( - { assertEquals(50, serviceMetrics.attemptsSuccess, "The scheduler should schedule 50 VMs") }, - { assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") }, - { assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") }, - { assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") }, - { assertEquals(223388307, this@CapelinIntegrationTest.exporter.idleTime) { "Incorrect idle time" } }, - { assertEquals(66977508, this@CapelinIntegrationTest.exporter.activeTime) { "Incorrect active time" } }, - { assertEquals(3160381, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } }, - { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Incorrect lost time" } }, - { assertEquals(5.840845430827075E9, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } }, - ) } /** @@ -130,33 +136,34 @@ class CapelinIntegrationTest { fun testSmall() = runBlockingSimulation { val seed = 1 val workload = createTestWorkload(0.25, seed) - - val simulator = ComputeServiceHelper( + val telemetry = SdkTelemetryManager(clock) + val runner = ComputeServiceHelper( coroutineContext, clock, + telemetry, computeScheduler ) val topology = createTopology("single") - val metricReader = CoroutineMetricReader(this, simulator.producers, exporter) + + telemetry.registerMetricReader(CoroutineMetricReader(this, exporter)) try { - simulator.apply(topology) - simulator.run(workload, seed.toLong()) + runner.apply(topology) + runner.run(workload, seed.toLong()) + + println( + "Scheduler " + + "Success=${exporter.serviceMetrics.attemptsSuccess} " + + "Failure=${exporter.serviceMetrics.attemptsFailure} " + + "Error=${exporter.serviceMetrics.attemptsError} " + + "Pending=${exporter.serviceMetrics.serversPending} " + + "Active=${exporter.serviceMetrics.serversActive}" + ) } finally { - simulator.close() - metricReader.close() + runner.close() + telemetry.close() } - val serviceMetrics = collectServiceMetrics(simulator.producers[0]) - println( - "Scheduler " + - "Success=${serviceMetrics.attemptsSuccess} " + - "Failure=${serviceMetrics.attemptsFailure} " + - "Error=${serviceMetrics.attemptsError} " + - "Pending=${serviceMetrics.serversPending} " + - "Active=${serviceMetrics.serversActive}" - ) - // Note that these values have been verified beforehand assertAll( { assertEquals(10999208, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } }, @@ -180,33 +187,35 @@ class CapelinIntegrationTest { .read(perfInterferenceInput) .withSeed(seed.toLong()) + val telemetry = SdkTelemetryManager(clock) val simulator = ComputeServiceHelper( coroutineContext, clock, + telemetry, computeScheduler, interferenceModel = performanceInterferenceModel ) val topology = createTopology("single") - val metricReader = CoroutineMetricReader(this, simulator.producers, exporter) + + telemetry.registerMetricReader(CoroutineMetricReader(this, exporter)) try { simulator.apply(topology) simulator.run(workload, seed.toLong()) + + println( + "Scheduler " + + "Success=${exporter.serviceMetrics.attemptsSuccess} " + + "Failure=${exporter.serviceMetrics.attemptsFailure} " + + "Error=${exporter.serviceMetrics.attemptsError} " + + "Pending=${exporter.serviceMetrics.serversPending} " + + "Active=${exporter.serviceMetrics.serversActive}" + ) } finally { simulator.close() - metricReader.close() + telemetry.close() } - val serviceMetrics = collectServiceMetrics(simulator.producers[0]) - println( - "Scheduler " + - "Success=${serviceMetrics.attemptsSuccess} " + - "Failure=${serviceMetrics.attemptsFailure} " + - "Error=${serviceMetrics.attemptsError} " + - "Pending=${serviceMetrics.serversPending} " + - "Active=${serviceMetrics.serversActive}" - ) - // Note that these values have been verified beforehand assertAll( { assertEquals(6027666, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } }, @@ -222,34 +231,36 @@ class CapelinIntegrationTest { @Test fun testFailures() = runBlockingSimulation { val seed = 1 + val telemetry = SdkTelemetryManager(clock) val simulator = ComputeServiceHelper( coroutineContext, clock, + telemetry, computeScheduler, grid5000(Duration.ofDays(7)) ) val topology = createTopology("single") val workload = createTestWorkload(0.25, seed) - val metricReader = CoroutineMetricReader(this, simulator.producers, exporter) + + telemetry.registerMetricReader(CoroutineMetricReader(this, exporter)) try { simulator.apply(topology) simulator.run(workload, seed.toLong()) + + println( + "Scheduler " + + "Success=${exporter.serviceMetrics.attemptsSuccess} " + + "Failure=${exporter.serviceMetrics.attemptsFailure} " + + "Error=${exporter.serviceMetrics.attemptsError} " + + "Pending=${exporter.serviceMetrics.serversPending} " + + "Active=${exporter.serviceMetrics.serversActive}" + ) } finally { simulator.close() - metricReader.close() + telemetry.close() } - val serviceMetrics = collectServiceMetrics(simulator.producers[0]) - println( - "Scheduler " + - "Success=${serviceMetrics.attemptsSuccess} " + - "Failure=${serviceMetrics.attemptsFailure} " + - "Error=${serviceMetrics.attemptsError} " + - "Pending=${serviceMetrics.serversPending} " + - "Active=${serviceMetrics.serversActive}" - ) - // Note that these values have been verified beforehand assertAll( { assertEquals(10866961, exporter.idleTime) { "Idle time incorrect" } }, @@ -277,6 +288,7 @@ class CapelinIntegrationTest { } class TestComputeMetricExporter : ComputeMetricExporter() { + var serviceMetrics: ServiceData = ServiceData(Instant.ofEpochMilli(0), 0, 0, 0, 0, 0, 0, 0) var idleTime = 0L var activeTime = 0L var stealTime = 0L @@ -284,6 +296,19 @@ class CapelinIntegrationTest { var energyUsage = 0.0 var uptime = 0L + override fun record(reader: ServiceTableReader) { + serviceMetrics = ServiceData( + reader.timestamp, + reader.hostsUp, + reader.hostsDown, + reader.serversPending, + reader.serversActive, + reader.attemptsSuccess, + reader.attemptsFailure, + reader.attemptsError + ) + } + override fun record(reader: HostTableReader) { idleTime += reader.cpuIdleTime activeTime += reader.cpuActiveTime diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt index 1752802f..c751463d 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt @@ -23,7 +23,6 @@ package org.opendc.experiments.tf20.core import io.opentelemetry.api.common.AttributeKey -import io.opentelemetry.api.common.Attributes import io.opentelemetry.api.metrics.Meter import kotlinx.coroutines.* import org.opendc.simulator.compute.SimBareMetalMachine @@ -82,7 +81,6 @@ public class SimTFDevice( .setDescription("The amount of device resources used") .setUnit("MHz") .build() - .bind(Attributes.of(deviceId, uid.toString())) /** * The power draw of the device. @@ -91,7 +89,6 @@ public class SimTFDevice( .setDescription("The power draw of the device") .setUnit("W") .build() - .bind(Attributes.of(deviceId, uid.toString())) /** * The workload that will be run by the device. |
