summaryrefslogtreecommitdiff
path: root/opendc-experiments/opendc-experiments-capelin/src/test
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-05-06 17:45:23 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2022-05-06 17:45:23 +0200
commit7981e9aa3e6854ad593a5af85f8eb56874299d7e (patch)
tree42cd170822424087ed7ba9f3920bd5f0c7065caa /opendc-experiments/opendc-experiments-capelin/src/test
parent564911a2458b3c54834d5cbfed91f502e9856566 (diff)
refactor(telemetry/compute): Support direct metric access
This change introduces a `ComputeMetricReader` class that can be used as a replacement for the `CoroutineMetricReader` class when reading metrics from the Compute service. This implementation operates directly on a `ComputeService` instance, providing better performance.
Diffstat (limited to 'opendc-experiments/opendc-experiments-capelin/src/test')
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt148
1 files changed, 66 insertions, 82 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index 01b2a8fe..62cdf123 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -26,25 +26,23 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
+import org.opendc.compute.api.Server
import org.opendc.compute.service.scheduler.FilterScheduler
import org.opendc.compute.service.scheduler.filters.ComputeFilter
import org.opendc.compute.service.scheduler.filters.RamFilter
import org.opendc.compute.service.scheduler.filters.VCpuFilter
import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
import org.opendc.compute.workload.*
-import org.opendc.compute.workload.telemetry.SdkTelemetryManager
+import org.opendc.compute.workload.telemetry.NoopTelemetryManager
import org.opendc.compute.workload.topology.Topology
import org.opendc.compute.workload.topology.apply
import org.opendc.experiments.capelin.topology.clusterTopology
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.telemetry.compute.ComputeMetricExporter
+import org.opendc.telemetry.compute.ComputeMetricReader
+import org.opendc.telemetry.compute.ComputeMonitor
import org.opendc.telemetry.compute.table.HostTableReader
-import org.opendc.telemetry.compute.table.ServiceData
-import org.opendc.telemetry.compute.table.ServiceTableReader
-import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import java.io.File
import java.time.Duration
-import java.time.Instant
import java.util.*
/**
@@ -54,7 +52,7 @@ class CapelinIntegrationTest {
/**
* The monitor used to keep track of the metrics.
*/
- private lateinit var exporter: TestComputeMetricExporter
+ private lateinit var monitor: TestComputeMonitor
/**
* The [FilterScheduler] to use for all experiments.
@@ -67,11 +65,11 @@ class CapelinIntegrationTest {
private lateinit var workloadLoader: ComputeWorkloadLoader
/**
- * Setup the experimental environment.
+ * Set up the experimental environment.
*/
@BeforeEach
fun setUp() {
- exporter = TestComputeMetricExporter()
+ monitor = TestComputeMonitor()
computeScheduler = FilterScheduler(
filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
weighers = listOf(CoreRamWeigher(multiplier = 1.0))
@@ -85,22 +83,22 @@ class CapelinIntegrationTest {
@Test
fun testLarge() = runBlockingSimulation {
val (workload, _) = createTestWorkload(1.0)
- val telemetry = SdkTelemetryManager(clock)
val runner = ComputeServiceHelper(
coroutineContext,
clock,
- telemetry,
+ NoopTelemetryManager(),
computeScheduler
)
val topology = createTopology()
- telemetry.registerMetricReader(CoroutineMetricReader(this, exporter))
+ val servers = mutableListOf<Server>()
+ val reader = ComputeMetricReader(this, clock, runner.service, servers, monitor)
try {
runner.apply(topology)
- runner.run(workload, 0)
+ runner.run(workload, 0, servers)
- val serviceMetrics = exporter.serviceMetrics
+ val serviceMetrics = runner.service.getSchedulerStats()
println(
"Scheduler " +
"Success=${serviceMetrics.attemptsSuccess} " +
@@ -116,15 +114,15 @@ class CapelinIntegrationTest {
{ assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") },
{ assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") },
{ assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") },
- { assertEquals(223393683, this@CapelinIntegrationTest.exporter.idleTime) { "Incorrect idle time" } },
- { assertEquals(66977508, this@CapelinIntegrationTest.exporter.activeTime) { "Incorrect active time" } },
- { assertEquals(3160381, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } },
- { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Incorrect lost time" } },
- { assertEquals(5.840845430827075E9, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } },
+ { assertEquals(223393683, this@CapelinIntegrationTest.monitor.idleTime) { "Incorrect idle time" } },
+ { assertEquals(66977508, this@CapelinIntegrationTest.monitor.activeTime) { "Incorrect active time" } },
+ { assertEquals(3160381, this@CapelinIntegrationTest.monitor.stealTime) { "Incorrect steal time" } },
+ { assertEquals(0, this@CapelinIntegrationTest.monitor.lostTime) { "Incorrect lost time" } },
+ { assertEquals(5.840845430827075E9, this@CapelinIntegrationTest.monitor.energyUsage, 0.01) { "Incorrect power draw" } },
)
} finally {
runner.close()
- telemetry.close()
+ reader.close()
}
}
@@ -135,41 +133,41 @@ class CapelinIntegrationTest {
fun testSmall() = runBlockingSimulation {
val seed = 1
val (workload, _) = createTestWorkload(0.25, seed)
- val telemetry = SdkTelemetryManager(clock)
val runner = ComputeServiceHelper(
coroutineContext,
clock,
- telemetry,
+ NoopTelemetryManager(),
computeScheduler
)
val topology = createTopology("single")
-
- telemetry.registerMetricReader(CoroutineMetricReader(this, exporter))
+ val servers = mutableListOf<Server>()
+ val reader = ComputeMetricReader(this, clock, runner.service, servers, monitor)
try {
runner.apply(topology)
- runner.run(workload, seed.toLong())
+ runner.run(workload, seed.toLong(), servers)
+ val serviceMetrics = runner.service.getSchedulerStats()
println(
"Scheduler " +
- "Success=${exporter.serviceMetrics.attemptsSuccess} " +
- "Failure=${exporter.serviceMetrics.attemptsFailure} " +
- "Error=${exporter.serviceMetrics.attemptsError} " +
- "Pending=${exporter.serviceMetrics.serversPending} " +
- "Active=${exporter.serviceMetrics.serversActive}"
+ "Success=${serviceMetrics.attemptsSuccess} " +
+ "Failure=${serviceMetrics.attemptsFailure} " +
+ "Error=${serviceMetrics.attemptsError} " +
+ "Pending=${serviceMetrics.serversPending} " +
+ "Active=${serviceMetrics.serversActive}"
)
} finally {
runner.close()
- telemetry.close()
+ reader.close()
}
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(10999592, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } },
- { assertEquals(9741207, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } },
- { assertEquals(0, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } },
- { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } },
- { assertEquals(7.011413569311495E8, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } }
+ { assertEquals(10999592, this@CapelinIntegrationTest.monitor.idleTime) { "Idle time incorrect" } },
+ { assertEquals(9741207, this@CapelinIntegrationTest.monitor.activeTime) { "Active time incorrect" } },
+ { assertEquals(0, this@CapelinIntegrationTest.monitor.stealTime) { "Steal time incorrect" } },
+ { assertEquals(0, this@CapelinIntegrationTest.monitor.lostTime) { "Lost time incorrect" } },
+ { assertEquals(7.011413569311495E8, this@CapelinIntegrationTest.monitor.energyUsage, 0.01) { "Incorrect power draw" } }
)
}
@@ -181,41 +179,41 @@ class CapelinIntegrationTest {
val seed = 0
val (workload, interferenceModel) = createTestWorkload(1.0, seed)
- val telemetry = SdkTelemetryManager(clock)
val simulator = ComputeServiceHelper(
coroutineContext,
clock,
- telemetry,
+ NoopTelemetryManager(),
computeScheduler,
interferenceModel = interferenceModel?.withSeed(seed.toLong())
)
val topology = createTopology("single")
-
- telemetry.registerMetricReader(CoroutineMetricReader(this, exporter))
+ val servers = mutableListOf<Server>()
+ val reader = ComputeMetricReader(this, clock, simulator.service, servers, monitor)
try {
simulator.apply(topology)
- simulator.run(workload, seed.toLong())
+ simulator.run(workload, seed.toLong(), servers)
+ val serviceMetrics = simulator.service.getSchedulerStats()
println(
"Scheduler " +
- "Success=${exporter.serviceMetrics.attemptsSuccess} " +
- "Failure=${exporter.serviceMetrics.attemptsFailure} " +
- "Error=${exporter.serviceMetrics.attemptsError} " +
- "Pending=${exporter.serviceMetrics.serversPending} " +
- "Active=${exporter.serviceMetrics.serversActive}"
+ "Success=${serviceMetrics.attemptsSuccess} " +
+ "Failure=${serviceMetrics.attemptsFailure} " +
+ "Error=${serviceMetrics.attemptsError} " +
+ "Pending=${serviceMetrics.serversPending} " +
+ "Active=${serviceMetrics.serversActive}"
)
} finally {
simulator.close()
- telemetry.close()
+ reader.close()
}
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(6028050, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } },
- { assertEquals(14712749, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } },
- { assertEquals(12532907, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } },
- { assertEquals(467963, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } }
+ { assertEquals(6028050, this@CapelinIntegrationTest.monitor.idleTime) { "Idle time incorrect" } },
+ { assertEquals(14712749, this@CapelinIntegrationTest.monitor.activeTime) { "Active time incorrect" } },
+ { assertEquals(12532907, this@CapelinIntegrationTest.monitor.stealTime) { "Steal time incorrect" } },
+ { assertEquals(467963, this@CapelinIntegrationTest.monitor.lostTime) { "Lost time incorrect" } }
)
}
@@ -225,43 +223,43 @@ class CapelinIntegrationTest {
@Test
fun testFailures() = runBlockingSimulation {
val seed = 1
- val telemetry = SdkTelemetryManager(clock)
val simulator = ComputeServiceHelper(
coroutineContext,
clock,
- telemetry,
+ NoopTelemetryManager(),
computeScheduler,
grid5000(Duration.ofDays(7))
)
val topology = createTopology("single")
val (workload, _) = createTestWorkload(0.25, seed)
-
- telemetry.registerMetricReader(CoroutineMetricReader(this, exporter))
+ val servers = mutableListOf<Server>()
+ val reader = ComputeMetricReader(this, clock, simulator.service, servers, monitor)
try {
simulator.apply(topology)
- simulator.run(workload, seed.toLong())
+ simulator.run(workload, seed.toLong(), servers)
+ val serviceMetrics = simulator.service.getSchedulerStats()
println(
"Scheduler " +
- "Success=${exporter.serviceMetrics.attemptsSuccess} " +
- "Failure=${exporter.serviceMetrics.attemptsFailure} " +
- "Error=${exporter.serviceMetrics.attemptsError} " +
- "Pending=${exporter.serviceMetrics.serversPending} " +
- "Active=${exporter.serviceMetrics.serversActive}"
+ "Success=${serviceMetrics.attemptsSuccess} " +
+ "Failure=${serviceMetrics.attemptsFailure} " +
+ "Error=${serviceMetrics.attemptsError} " +
+ "Pending=${serviceMetrics.serversPending} " +
+ "Active=${serviceMetrics.serversActive}"
)
} finally {
simulator.close()
- telemetry.close()
+ reader.close()
}
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(10867345, exporter.idleTime) { "Idle time incorrect" } },
- { assertEquals(9607095, exporter.activeTime) { "Active time incorrect" } },
- { assertEquals(0, exporter.stealTime) { "Steal time incorrect" } },
- { assertEquals(0, exporter.lostTime) { "Lost time incorrect" } },
- { assertEquals(2559305056, exporter.uptime) { "Uptime incorrect" } }
+ { assertEquals(10867345, monitor.idleTime) { "Idle time incorrect" } },
+ { assertEquals(9607095, monitor.activeTime) { "Active time incorrect" } },
+ { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } },
+ { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } },
+ { assertEquals(2559305056, monitor.uptime) { "Uptime incorrect" } }
)
}
@@ -281,8 +279,7 @@ class CapelinIntegrationTest {
return stream.use { clusterTopology(stream) }
}
- class TestComputeMetricExporter : ComputeMetricExporter() {
- var serviceMetrics: ServiceData = ServiceData(Instant.ofEpochMilli(0), 0, 0, 0, 0, 0, 0, 0)
+ class TestComputeMonitor : ComputeMonitor {
var idleTime = 0L
var activeTime = 0L
var stealTime = 0L
@@ -290,19 +287,6 @@ class CapelinIntegrationTest {
var energyUsage = 0.0
var uptime = 0L
- override fun record(reader: ServiceTableReader) {
- serviceMetrics = ServiceData(
- reader.timestamp,
- reader.hostsUp,
- reader.hostsDown,
- reader.serversPending,
- reader.serversActive,
- reader.attemptsSuccess,
- reader.attemptsFailure,
- reader.attemptsError
- )
- }
-
override fun record(reader: HostTableReader) {
idleTime += reader.cpuIdleTime
activeTime += reader.cpuActiveTime