summaryrefslogtreecommitdiff
path: root/opendc-experiments/opendc-experiments-capelin/src/test
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-14 14:41:05 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-17 16:48:53 +0200
commit3ca64e0110adab65526a0ccfd5b252e9f047ab10 (patch)
tree9cad172501154d01b00a1e5c45d94d7e2f1e5698 /opendc-experiments/opendc-experiments-capelin/src/test
parent5f0b6b372487d79594cf59010822e160f351e0be (diff)
refactor(telemetry): Create separate MeterProvider per service/host
This change refactors the telemetry implementation by creating a separate MeterProvider per service or host. This means we have to keep track of multiple metric producers, but that we can attach resource information to each of the MeterProviders like we would in a real world scenario.
Diffstat (limited to 'opendc-experiments/opendc-experiments-capelin/src/test')
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt190
1 files changed, 94 insertions, 96 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index cf88535d..f4cf3e5e 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -22,7 +22,6 @@
package org.opendc.experiments.capelin
-import io.opentelemetry.sdk.metrics.export.MetricProducer
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
@@ -32,7 +31,6 @@ import org.opendc.compute.service.scheduler.filters.ComputeFilter
import org.opendc.compute.service.scheduler.filters.RamFilter
import org.opendc.compute.service.scheduler.filters.VCpuFilter
import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
-import org.opendc.compute.simulator.SimHost
import org.opendc.experiments.capelin.env.ClusterEnvironmentReader
import org.opendc.experiments.capelin.env.EnvironmentReader
import org.opendc.experiments.capelin.model.Workload
@@ -40,15 +38,19 @@ import org.opendc.experiments.capelin.trace.ParquetTraceReader
import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader
import org.opendc.experiments.capelin.trace.RawParquetTraceReader
import org.opendc.experiments.capelin.trace.TraceReader
+import org.opendc.experiments.capelin.util.ComputeServiceSimulator
import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.telemetry.compute.ComputeMetricExporter
import org.opendc.telemetry.compute.ComputeMonitor
import org.opendc.telemetry.compute.collectServiceMetrics
import org.opendc.telemetry.compute.table.HostData
-import org.opendc.telemetry.compute.withMonitor
+import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import java.io.File
+import java.time.Duration
import java.util.*
+import kotlin.math.roundToLong
/**
* An integration test suite for the Capelin experiments.
@@ -60,11 +62,20 @@ class CapelinIntegrationTest {
private lateinit var monitor: TestExperimentReporter
/**
+ * The [FilterScheduler] to use for all experiments.
+ */
+ private lateinit var computeScheduler: FilterScheduler
+
+ /**
* Setup the experimental environment.
*/
@BeforeEach
fun setUp() {
monitor = TestExperimentReporter()
+ computeScheduler = FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
+ weighers = listOf(CoreRamWeigher(multiplier = 1.0))
+ )
}
/**
@@ -72,26 +83,26 @@ class CapelinIntegrationTest {
*/
@Test
fun testLarge() = runBlockingSimulation {
- val allocationPolicy = FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
- weighers = listOf(CoreRamWeigher(multiplier = 1.0))
- )
val traceReader = createTestTraceReader()
val environmentReader = createTestEnvironmentReader()
- val meterProvider = createMeterProvider(clock)
- withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler ->
- withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
- processTrace(
- clock,
- traceReader,
- scheduler,
- monitor
- )
- }
+ val simulator = ComputeServiceSimulator(
+ coroutineContext,
+ clock,
+ computeScheduler,
+ environmentReader.read(),
+ )
+
+ val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
+
+ try {
+ simulator.run(traceReader)
+ } finally {
+ simulator.close()
+ metricReader.close()
}
- val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
+ val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0])
println(
"Finish " +
"SUBMIT=${serviceMetrics.instanceCount} " +
@@ -106,11 +117,11 @@ class CapelinIntegrationTest {
{ assertEquals(0, serviceMetrics.runningInstanceCount, "All VMs should finish after a run") },
{ assertEquals(0, serviceMetrics.failedInstanceCount, "No VM should not be unscheduled") },
{ assertEquals(0, serviceMetrics.queuedInstanceCount, "No VM should not be in the queue") },
- { assertEquals(220346369753, monitor.totalWork) { "Incorrect requested burst" } },
- { assertEquals(206667809529, monitor.totalGrantedWork) { "Incorrect granted burst" } },
- { assertEquals(1151611104, monitor.totalOvercommittedWork) { "Incorrect overcommitted burst" } },
+ { assertEquals(220346412191, monitor.totalWork) { "Incorrect requested burst" } },
+ { assertEquals(206667852689, monitor.totalGrantedWork) { "Incorrect granted burst" } },
+ { assertEquals(1151612221, monitor.totalOvercommittedWork) { "Incorrect overcommitted burst" } },
{ assertEquals(0, monitor.totalInterferedWork) { "Incorrect interfered burst" } },
- { assertEquals(1.8175860403178412E7, monitor.totalPowerDraw, 0.01) { "Incorrect power draw" } },
+ { assertEquals(9.088769763540529E7, monitor.totalPowerDraw, 0.01) { "Incorrect power draw" } },
)
}
@@ -120,27 +131,26 @@ class CapelinIntegrationTest {
@Test
fun testSmall() = runBlockingSimulation {
val seed = 1
- val allocationPolicy = FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
- weighers = listOf(CoreRamWeigher(multiplier = 1.0))
- )
val traceReader = createTestTraceReader(0.25, seed)
val environmentReader = createTestEnvironmentReader("single")
- val meterProvider = createMeterProvider(clock)
-
- withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler ->
- withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
- processTrace(
- clock,
- traceReader,
- scheduler,
- monitor
- )
- }
+ val simulator = ComputeServiceSimulator(
+ coroutineContext,
+ clock,
+ computeScheduler,
+ environmentReader.read(),
+ )
+
+ val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
+
+ try {
+ simulator.run(traceReader)
+ } finally {
+ simulator.close()
+ metricReader.close()
}
- val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
+ val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0])
println(
"Finish " +
"SUBMIT=${serviceMetrics.instanceCount} " +
@@ -151,9 +161,9 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(39183961335, monitor.totalWork) { "Total requested work incorrect" } },
- { assertEquals(35649903197, monitor.totalGrantedWork) { "Total granted work incorrect" } },
- { assertEquals(1043641877, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } },
+ { assertEquals(39183965664, monitor.totalWork) { "Total work incorrect" } },
+ { assertEquals(35649907631, monitor.totalGrantedWork) { "Total granted work incorrect" } },
+ { assertEquals(1043642275, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } },
{ assertEquals(0, monitor.totalInterferedWork) { "Total interfered work incorrect" } }
)
}
@@ -164,10 +174,6 @@ class CapelinIntegrationTest {
@Test
fun testInterference() = runBlockingSimulation {
val seed = 1
- val allocationPolicy = FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
- weighers = listOf(CoreRamWeigher(multiplier = 1.0))
- )
val traceReader = createTestTraceReader(0.25, seed)
val environmentReader = createTestEnvironmentReader("single")
@@ -177,20 +183,24 @@ class CapelinIntegrationTest {
.read(perfInterferenceInput)
.let { VmInterferenceModel(it, Random(seed.toLong())) }
- val meterProvider = createMeterProvider(clock)
-
- withComputeService(clock, meterProvider, environmentReader, allocationPolicy, performanceInterferenceModel) { scheduler ->
- withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
- processTrace(
- clock,
- traceReader,
- scheduler,
- monitor
- )
- }
+ val simulator = ComputeServiceSimulator(
+ coroutineContext,
+ clock,
+ computeScheduler,
+ environmentReader.read(),
+ interferenceModel = performanceInterferenceModel
+ )
+
+ val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
+
+ try {
+ simulator.run(traceReader)
+ } finally {
+ simulator.close()
+ metricReader.close()
}
- val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
+ val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0])
println(
"Finish " +
"SUBMIT=${serviceMetrics.instanceCount} " +
@@ -201,10 +211,10 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(39183961335, monitor.totalWork) { "Total requested work incorrect" } },
- { assertEquals(35649903197, monitor.totalGrantedWork) { "Total granted work incorrect" } },
- { assertEquals(1043641877, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } },
- { assertEquals(2960970230, monitor.totalInterferedWork) { "Total interfered work incorrect" } }
+ { assertEquals(39183965664, monitor.totalWork) { "Total work incorrect" } },
+ { assertEquals(35649907631, monitor.totalGrantedWork) { "Total granted work incorrect" } },
+ { assertEquals(1043642275, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } },
+ { assertEquals(2960974524, monitor.totalInterferedWork) { "Total interfered work incorrect" } }
)
}
@@ -214,39 +224,27 @@ class CapelinIntegrationTest {
@Test
fun testFailures() = runBlockingSimulation {
val seed = 1
- val allocationPolicy = FilterScheduler(
- filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
- weighers = listOf(CoreRamWeigher(multiplier = 1.0))
- )
val traceReader = createTestTraceReader(0.25, seed)
val environmentReader = createTestEnvironmentReader("single")
- val meterProvider = createMeterProvider(clock)
-
- withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler ->
- val faultInjector =
- createFaultInjector(
- coroutineContext,
- clock,
- scheduler.hosts.map { it as SimHost }.toSet(),
- seed,
- 24.0 * 7,
- )
-
- withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
- faultInjector.start()
- processTrace(
- clock,
- traceReader,
- scheduler,
- monitor
- )
- }
-
- faultInjector.close()
+ val simulator = ComputeServiceSimulator(
+ coroutineContext,
+ clock,
+ computeScheduler,
+ environmentReader.read(),
+ grid5000(Duration.ofDays(7), seed)
+ )
+
+ val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
+
+ try {
+ simulator.run(traceReader)
+ } finally {
+ simulator.close()
+ metricReader.close()
}
- val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
+ val serviceMetrics = collectServiceMetrics(clock.millis(), simulator.producers[0])
println(
"Finish " +
"SUBMIT=${serviceMetrics.instanceCount} " +
@@ -257,9 +255,9 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(38385852453, monitor.totalWork) { "Total requested work incorrect" } },
- { assertEquals(34886665781, monitor.totalGrantedWork) { "Total granted work incorrect" } },
- { assertEquals(979997253, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } },
+ { assertEquals(38385856700, monitor.totalWork) { "Total requested work incorrect" } },
+ { assertEquals(34886670127, monitor.totalGrantedWork) { "Total granted work incorrect" } },
+ { assertEquals(979997628, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } },
{ assertEquals(0, monitor.totalInterferedWork) { "Total interfered work incorrect" } }
)
}
@@ -291,10 +289,10 @@ class CapelinIntegrationTest {
var totalPowerDraw = 0.0
override fun record(data: HostData) {
- this.totalWork += data.totalWork.toLong()
- totalGrantedWork += data.grantedWork.toLong()
- totalOvercommittedWork += data.overcommittedWork.toLong()
- totalInterferedWork += data.interferedWork.toLong()
+ this.totalWork += data.totalWork.roundToLong()
+ totalGrantedWork += data.grantedWork.roundToLong()
+ totalOvercommittedWork += data.overcommittedWork.roundToLong()
+ totalInterferedWork += data.interferedWork.roundToLong()
totalPowerDraw += data.powerDraw
}
}