summaryrefslogtreecommitdiff
path: root/opendc-experiments/opendc-experiments-capelin/src/test
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-08-27 16:41:55 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-07 14:34:30 +0200
commitaaedd4f3eed83d0c3ebc829fec08a1749a2bfba4 (patch)
tree3b43c8da1ab285c4b965a6042215fb694f6ee909 /opendc-experiments/opendc-experiments-capelin/src/test
parentbefec2f1ddf3a6e6d15d9d1b9fd1ecbbc4f38960 (diff)
refactor(capelin): Move metric collection outside Capelin code
This change moves the metric collection outside the Capelin codebase in a separate module so other modules can also benefit from the compute metric collection code.
Diffstat (limited to 'opendc-experiments/opendc-experiments-capelin/src/test')
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt92
1 files changed, 51 insertions, 41 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index 24d8f768..abd8efeb 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -29,7 +29,6 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
-import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.scheduler.FilterScheduler
import org.opendc.compute.service.scheduler.filters.ComputeFilter
import org.opendc.compute.service.scheduler.filters.RamFilter
@@ -38,7 +37,6 @@ import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
import org.opendc.experiments.capelin.env.ClusterEnvironmentReader
import org.opendc.experiments.capelin.env.EnvironmentReader
import org.opendc.experiments.capelin.model.Workload
-import org.opendc.experiments.capelin.monitor.ExperimentMonitor
import org.opendc.experiments.capelin.trace.ParquetTraceReader
import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader
import org.opendc.experiments.capelin.trace.RawParquetTraceReader
@@ -46,6 +44,10 @@ import org.opendc.experiments.capelin.trace.TraceReader
import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.telemetry.compute.ComputeMonitor
+import org.opendc.telemetry.compute.collectServiceMetrics
+import org.opendc.telemetry.compute.table.HostData
+import org.opendc.telemetry.compute.withMonitor
import java.io.File
import java.util.*
@@ -80,7 +82,6 @@ class CapelinIntegrationTest {
)
val traceReader = createTestTraceReader()
val environmentReader = createTestEnvironmentReader()
- lateinit var monitorResults: ComputeMetrics
val meterProvider = createMeterProvider(clock)
withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler ->
@@ -98,7 +99,7 @@ class CapelinIntegrationTest {
null
}
- withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
+ withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
processTrace(
clock,
traceReader,
@@ -111,15 +112,21 @@ class CapelinIntegrationTest {
failureDomain?.cancel()
}
- monitorResults = collectMetrics(meterProvider as MetricProducer)
- println("Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}")
+ val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
+ println(
+ "Finish " +
+ "SUBMIT=${serviceMetrics.instanceCount} " +
+ "FAIL=${serviceMetrics.failedInstanceCount} " +
+ "QUEUE=${serviceMetrics.queuedInstanceCount} " +
+ "RUNNING=${serviceMetrics.runningInstanceCount}"
+ )
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(50, monitorResults.submittedVms, "The trace contains 50 VMs") },
- { assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") },
- { assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") },
- { assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") },
+ { assertEquals(50, serviceMetrics.instanceCount, "The trace contains 50 VMs") },
+ { assertEquals(0, serviceMetrics.runningInstanceCount, "All VMs should finish after a run") },
+ { assertEquals(0, serviceMetrics.failedInstanceCount, "No VM should not be unscheduled") },
+ { assertEquals(0, serviceMetrics.queuedInstanceCount, "No VM should not be in the queue") },
{ assertEquals(220346369753, monitor.totalWork) { "Incorrect requested burst" } },
{ assertEquals(206667809529, monitor.totalGrantedWork) { "Incorrect granted burst" } },
{ assertEquals(1151611104, monitor.totalOvercommittedWork) { "Incorrect overcommitted burst" } },
@@ -145,7 +152,7 @@ class CapelinIntegrationTest {
val meterProvider = createMeterProvider(clock)
withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler ->
- withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
+ withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
processTrace(
clock,
traceReader,
@@ -156,8 +163,14 @@ class CapelinIntegrationTest {
}
}
- val metrics = collectMetrics(meterProvider as MetricProducer)
- println("Finish SUBMIT=${metrics.submittedVms} FAIL=${metrics.unscheduledVms} QUEUE=${metrics.queuedVms} RUNNING=${metrics.runningVms}")
+ val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
+ println(
+ "Finish " +
+ "SUBMIT=${serviceMetrics.instanceCount} " +
+ "FAIL=${serviceMetrics.failedInstanceCount} " +
+ "QUEUE=${serviceMetrics.queuedInstanceCount} " +
+ "RUNNING=${serviceMetrics.runningInstanceCount}"
+ )
// Note that these values have been verified beforehand
assertAll(
@@ -189,7 +202,7 @@ class CapelinIntegrationTest {
val meterProvider = createMeterProvider(clock)
withComputeService(clock, meterProvider, environmentReader, allocationPolicy, performanceInterferenceModel) { scheduler ->
- withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
+ withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
processTrace(
clock,
traceReader,
@@ -200,8 +213,14 @@ class CapelinIntegrationTest {
}
}
- val metrics = collectMetrics(meterProvider as MetricProducer)
- println("Finish SUBMIT=${metrics.submittedVms} FAIL=${metrics.unscheduledVms} QUEUE=${metrics.queuedVms} RUNNING=${metrics.runningVms}")
+ val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
+ println(
+ "Finish " +
+ "SUBMIT=${serviceMetrics.instanceCount} " +
+ "FAIL=${serviceMetrics.failedInstanceCount} " +
+ "QUEUE=${serviceMetrics.queuedInstanceCount} " +
+ "RUNNING=${serviceMetrics.runningInstanceCount}"
+ )
// Note that these values have been verified beforehand
assertAll(
@@ -239,7 +258,7 @@ class CapelinIntegrationTest {
chan
)
- withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) {
+ withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
processTrace(
clock,
traceReader,
@@ -252,8 +271,14 @@ class CapelinIntegrationTest {
failureDomain.cancel()
}
- val metrics = collectMetrics(meterProvider as MetricProducer)
- println("Finish SUBMIT=${metrics.submittedVms} FAIL=${metrics.unscheduledVms} QUEUE=${metrics.queuedVms} RUNNING=${metrics.runningVms}")
+ val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
+ println(
+ "Finish " +
+ "SUBMIT=${serviceMetrics.instanceCount} " +
+ "FAIL=${serviceMetrics.failedInstanceCount} " +
+ "QUEUE=${serviceMetrics.queuedInstanceCount} " +
+ "RUNNING=${serviceMetrics.runningInstanceCount}"
+ )
// Note that these values have been verified beforehand
assertAll(
@@ -283,34 +308,19 @@ class CapelinIntegrationTest {
return ClusterEnvironmentReader(stream)
}
- class TestExperimentReporter : ExperimentMonitor {
+ class TestExperimentReporter : ComputeMonitor {
var totalWork = 0L
var totalGrantedWork = 0L
var totalOvercommittedWork = 0L
var totalInterferedWork = 0L
var totalPowerDraw = 0.0
- override fun reportHostData(
- time: Long,
- totalWork: Double,
- grantedWork: Double,
- overcommittedWork: Double,
- interferedWork: Double,
- cpuUsage: Double,
- cpuDemand: Double,
- powerDraw: Double,
- instanceCount: Int,
- uptime: Long,
- downtime: Long,
- host: Host,
- ) {
- this.totalWork += totalWork.toLong()
- totalGrantedWork += grantedWork.toLong()
- totalOvercommittedWork += overcommittedWork.toLong()
- totalInterferedWork += interferedWork.toLong()
- totalPowerDraw += powerDraw
+ override fun record(data: HostData) {
+ this.totalWork += data.totalWork.toLong()
+ totalGrantedWork += data.grantedWork.toLong()
+ totalOvercommittedWork += data.overcommittedWork.toLong()
+ totalInterferedWork += data.interferedWork.toLong()
+ totalPowerDraw += data.powerDraw
}
-
- override fun close() {}
}
}