summaryrefslogtreecommitdiff
path: root/opendc-experiments/opendc-experiments-capelin/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-10-09 16:10:00 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2022-02-15 18:08:37 +0100
commit02c215ad57e1e4d56c54d22be58e1845bdeebf25 (patch)
tree7794b53ca3bb6fa197a118cee92114135be15def /opendc-experiments/opendc-experiments-capelin/src
parent48c04fb74ee170f58f292b077c62b4da237f507e (diff)
refactor: Update OpenTelemetry to version 1.11
This change updates the OpenDC codebase to use OpenTelemetry v1.11, which stabilizes the metrics API. This stabilization brings quite a few breaking changes, so significant changes are necessary inside the OpenDC codebase.
Diffstat (limited to 'opendc-experiments/opendc-experiments-capelin/src')
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt2
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt18
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt159
3 files changed, 99 insertions, 80 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt b/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt
index 4b35de95..bb9cb201 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt
@@ -29,6 +29,7 @@ import org.opendc.compute.service.scheduler.filters.RamFilter
import org.opendc.compute.service.scheduler.filters.VCpuFilter
import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
import org.opendc.compute.workload.*
+import org.opendc.compute.workload.telemetry.NoopTelemetryManager
import org.opendc.compute.workload.topology.Topology
import org.opendc.compute.workload.topology.apply
import org.opendc.experiments.capelin.topology.clusterTopology
@@ -70,6 +71,7 @@ class CapelinBenchmarks {
val runner = ComputeServiceHelper(
coroutineContext,
clock,
+ NoopTelemetryManager(),
computeScheduler
)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index b548ae58..6604a190 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -29,6 +29,7 @@ import org.opendc.compute.workload.ComputeWorkloadLoader
import org.opendc.compute.workload.createComputeScheduler
import org.opendc.compute.workload.export.parquet.ParquetComputeMetricExporter
import org.opendc.compute.workload.grid5000
+import org.opendc.compute.workload.telemetry.SdkTelemetryManager
import org.opendc.compute.workload.topology.apply
import org.opendc.compute.workload.util.VmInterferenceModelReader
import org.opendc.experiments.capelin.model.OperationalPhenomena
@@ -38,7 +39,6 @@ import org.opendc.experiments.capelin.topology.clusterTopology
import org.opendc.harness.dsl.Experiment
import org.opendc.harness.dsl.anyOf
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.telemetry.compute.collectServiceMetrics
import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import java.io.File
import java.time.Duration
@@ -109,9 +109,11 @@ abstract class Portfolio(name: String) : Experiment(name) {
grid5000(Duration.ofSeconds((operationalPhenomena.failureFrequency * 60).roundToLong()))
else
null
+ val telemetry = SdkTelemetryManager(clock)
val runner = ComputeServiceHelper(
coroutineContext,
clock,
+ telemetry,
computeScheduler,
failureModel,
performanceInterferenceModel?.withSeed(repeat.toLong())
@@ -122,7 +124,8 @@ abstract class Portfolio(name: String) : Experiment(name) {
"portfolio_id=$name/scenario_id=$id/run_id=$repeat",
4096
)
- val metricReader = CoroutineMetricReader(this, runner.producers, exporter)
+ telemetry.registerMetricReader(CoroutineMetricReader(this, exporter))
+
val topology = clusterTopology(File(config.getString("env-path"), "${topology.name}.txt"))
try {
@@ -133,17 +136,6 @@ abstract class Portfolio(name: String) : Experiment(name) {
runner.run(workload.source.resolve(workloadLoader, seeder), seeder.nextLong())
} finally {
runner.close()
- metricReader.close()
- }
-
- val monitorResults = collectServiceMetrics(runner.producers[0])
- logger.debug {
- "Scheduler " +
- "Success=${monitorResults.attemptsSuccess} " +
- "Failure=${monitorResults.attemptsFailure} " +
- "Error=${monitorResults.attemptsError} " +
- "Pending=${monitorResults.serversPending} " +
- "Active=${monitorResults.serversActive}"
}
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index eedc3131..aefd8304 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -32,17 +32,20 @@ import org.opendc.compute.service.scheduler.filters.RamFilter
import org.opendc.compute.service.scheduler.filters.VCpuFilter
import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
import org.opendc.compute.workload.*
+import org.opendc.compute.workload.telemetry.SdkTelemetryManager
import org.opendc.compute.workload.topology.Topology
import org.opendc.compute.workload.topology.apply
import org.opendc.compute.workload.util.VmInterferenceModelReader
import org.opendc.experiments.capelin.topology.clusterTopology
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.telemetry.compute.ComputeMetricExporter
-import org.opendc.telemetry.compute.collectServiceMetrics
import org.opendc.telemetry.compute.table.HostTableReader
+import org.opendc.telemetry.compute.table.ServiceData
+import org.opendc.telemetry.compute.table.ServiceTableReader
import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import java.io.File
import java.time.Duration
+import java.time.Instant
import java.util.*
/**
@@ -83,44 +86,47 @@ class CapelinIntegrationTest {
@Test
fun testLarge() = runBlockingSimulation {
val workload = createTestWorkload(1.0)
+ val telemetry = SdkTelemetryManager(clock)
val runner = ComputeServiceHelper(
coroutineContext,
clock,
+ telemetry,
computeScheduler
)
val topology = createTopology()
- val metricReader = CoroutineMetricReader(this, runner.producers, exporter)
+
+ telemetry.registerMetricReader(CoroutineMetricReader(this, exporter))
try {
runner.apply(topology)
runner.run(workload, 0)
+
+ val serviceMetrics = exporter.serviceMetrics
+ println(
+ "Scheduler " +
+ "Success=${serviceMetrics.attemptsSuccess} " +
+ "Failure=${serviceMetrics.attemptsFailure} " +
+ "Error=${serviceMetrics.attemptsError} " +
+ "Pending=${serviceMetrics.serversPending} " +
+ "Active=${serviceMetrics.serversActive}"
+ )
+
+ // Note that these values have been verified beforehand
+ assertAll(
+ { assertEquals(50, serviceMetrics.attemptsSuccess, "The scheduler should schedule 50 VMs") },
+ { assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") },
+ { assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") },
+ { assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") },
+ { assertEquals(223388307, this@CapelinIntegrationTest.exporter.idleTime) { "Incorrect idle time" } },
+ { assertEquals(66977508, this@CapelinIntegrationTest.exporter.activeTime) { "Incorrect active time" } },
+ { assertEquals(3160381, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } },
+ { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Incorrect lost time" } },
+ { assertEquals(5.840845430827075E9, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } },
+ )
} finally {
runner.close()
- metricReader.close()
+ telemetry.close()
}
-
- val serviceMetrics = collectServiceMetrics(runner.producers[0])
- println(
- "Scheduler " +
- "Success=${serviceMetrics.attemptsSuccess} " +
- "Failure=${serviceMetrics.attemptsFailure} " +
- "Error=${serviceMetrics.attemptsError} " +
- "Pending=${serviceMetrics.serversPending} " +
- "Active=${serviceMetrics.serversActive}"
- )
-
- // Note that these values have been verified beforehand
- assertAll(
- { assertEquals(50, serviceMetrics.attemptsSuccess, "The scheduler should schedule 50 VMs") },
- { assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") },
- { assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") },
- { assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") },
- { assertEquals(223388307, this@CapelinIntegrationTest.exporter.idleTime) { "Incorrect idle time" } },
- { assertEquals(66977508, this@CapelinIntegrationTest.exporter.activeTime) { "Incorrect active time" } },
- { assertEquals(3160381, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } },
- { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Incorrect lost time" } },
- { assertEquals(5.840845430827075E9, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } },
- )
}
/**
@@ -130,33 +136,34 @@ class CapelinIntegrationTest {
fun testSmall() = runBlockingSimulation {
val seed = 1
val workload = createTestWorkload(0.25, seed)
-
- val simulator = ComputeServiceHelper(
+ val telemetry = SdkTelemetryManager(clock)
+ val runner = ComputeServiceHelper(
coroutineContext,
clock,
+ telemetry,
computeScheduler
)
val topology = createTopology("single")
- val metricReader = CoroutineMetricReader(this, simulator.producers, exporter)
+
+ telemetry.registerMetricReader(CoroutineMetricReader(this, exporter))
try {
- simulator.apply(topology)
- simulator.run(workload, seed.toLong())
+ runner.apply(topology)
+ runner.run(workload, seed.toLong())
+
+ println(
+ "Scheduler " +
+ "Success=${exporter.serviceMetrics.attemptsSuccess} " +
+ "Failure=${exporter.serviceMetrics.attemptsFailure} " +
+ "Error=${exporter.serviceMetrics.attemptsError} " +
+ "Pending=${exporter.serviceMetrics.serversPending} " +
+ "Active=${exporter.serviceMetrics.serversActive}"
+ )
} finally {
- simulator.close()
- metricReader.close()
+ runner.close()
+ telemetry.close()
}
- val serviceMetrics = collectServiceMetrics(simulator.producers[0])
- println(
- "Scheduler " +
- "Success=${serviceMetrics.attemptsSuccess} " +
- "Failure=${serviceMetrics.attemptsFailure} " +
- "Error=${serviceMetrics.attemptsError} " +
- "Pending=${serviceMetrics.serversPending} " +
- "Active=${serviceMetrics.serversActive}"
- )
-
// Note that these values have been verified beforehand
assertAll(
{ assertEquals(10999208, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } },
@@ -180,33 +187,35 @@ class CapelinIntegrationTest {
.read(perfInterferenceInput)
.withSeed(seed.toLong())
+ val telemetry = SdkTelemetryManager(clock)
val simulator = ComputeServiceHelper(
coroutineContext,
clock,
+ telemetry,
computeScheduler,
interferenceModel = performanceInterferenceModel
)
val topology = createTopology("single")
- val metricReader = CoroutineMetricReader(this, simulator.producers, exporter)
+
+ telemetry.registerMetricReader(CoroutineMetricReader(this, exporter))
try {
simulator.apply(topology)
simulator.run(workload, seed.toLong())
+
+ println(
+ "Scheduler " +
+ "Success=${exporter.serviceMetrics.attemptsSuccess} " +
+ "Failure=${exporter.serviceMetrics.attemptsFailure} " +
+ "Error=${exporter.serviceMetrics.attemptsError} " +
+ "Pending=${exporter.serviceMetrics.serversPending} " +
+ "Active=${exporter.serviceMetrics.serversActive}"
+ )
} finally {
simulator.close()
- metricReader.close()
+ telemetry.close()
}
- val serviceMetrics = collectServiceMetrics(simulator.producers[0])
- println(
- "Scheduler " +
- "Success=${serviceMetrics.attemptsSuccess} " +
- "Failure=${serviceMetrics.attemptsFailure} " +
- "Error=${serviceMetrics.attemptsError} " +
- "Pending=${serviceMetrics.serversPending} " +
- "Active=${serviceMetrics.serversActive}"
- )
-
// Note that these values have been verified beforehand
assertAll(
{ assertEquals(6027666, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } },
@@ -222,34 +231,36 @@ class CapelinIntegrationTest {
@Test
fun testFailures() = runBlockingSimulation {
val seed = 1
+ val telemetry = SdkTelemetryManager(clock)
val simulator = ComputeServiceHelper(
coroutineContext,
clock,
+ telemetry,
computeScheduler,
grid5000(Duration.ofDays(7))
)
val topology = createTopology("single")
val workload = createTestWorkload(0.25, seed)
- val metricReader = CoroutineMetricReader(this, simulator.producers, exporter)
+
+ telemetry.registerMetricReader(CoroutineMetricReader(this, exporter))
try {
simulator.apply(topology)
simulator.run(workload, seed.toLong())
+
+ println(
+ "Scheduler " +
+ "Success=${exporter.serviceMetrics.attemptsSuccess} " +
+ "Failure=${exporter.serviceMetrics.attemptsFailure} " +
+ "Error=${exporter.serviceMetrics.attemptsError} " +
+ "Pending=${exporter.serviceMetrics.serversPending} " +
+ "Active=${exporter.serviceMetrics.serversActive}"
+ )
} finally {
simulator.close()
- metricReader.close()
+ telemetry.close()
}
- val serviceMetrics = collectServiceMetrics(simulator.producers[0])
- println(
- "Scheduler " +
- "Success=${serviceMetrics.attemptsSuccess} " +
- "Failure=${serviceMetrics.attemptsFailure} " +
- "Error=${serviceMetrics.attemptsError} " +
- "Pending=${serviceMetrics.serversPending} " +
- "Active=${serviceMetrics.serversActive}"
- )
-
// Note that these values have been verified beforehand
assertAll(
{ assertEquals(10866961, exporter.idleTime) { "Idle time incorrect" } },
@@ -277,6 +288,7 @@ class CapelinIntegrationTest {
}
class TestComputeMetricExporter : ComputeMetricExporter() {
+ var serviceMetrics: ServiceData = ServiceData(Instant.ofEpochMilli(0), 0, 0, 0, 0, 0, 0, 0)
var idleTime = 0L
var activeTime = 0L
var stealTime = 0L
@@ -284,6 +296,19 @@ class CapelinIntegrationTest {
var energyUsage = 0.0
var uptime = 0L
+ override fun record(reader: ServiceTableReader) {
+ serviceMetrics = ServiceData(
+ reader.timestamp,
+ reader.hostsUp,
+ reader.hostsDown,
+ reader.serversPending,
+ reader.serversActive,
+ reader.attemptsSuccess,
+ reader.attemptsFailure,
+ reader.attemptsError
+ )
+ }
+
override fun record(reader: HostTableReader) {
idleTime += reader.cpuIdleTime
activeTime += reader.cpuActiveTime