diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-09-07 16:04:46 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-09-07 16:04:46 +0200 |
| commit | 3eda751b725448139217dc1929dca1fc354e2a4e (patch) | |
| tree | 11d933753c515140a6ae846fe96448ad64b165aa /opendc-experiments/opendc-experiments-capelin/src/test | |
| parent | eb4de7f832c6d26725e0d7c29644c704ea82604e (diff) | |
| parent | 18ff316a6b6ab984ebf8283ea48ed98ec69d8295 (diff) | |
merge: Prepare for risk analysis experiments
This pull request adds the necessary code in preparation for the risk analysis experiments:
- Track provisioning time
- Track host up/down time
- Track guest up/down time
- Support overcommitted memory
- Do not fail inactive guests
- Mark unschedulable server as terminated
- Make ExperimentMonitor optional for trace processing
- Report up/downtime metrics in experiment monitor
- Move metric collection outside Capelin code
- Resolve kotlin-reflect incompatibility
- Restructure input reading classes
**Breaking API Changes**
- `ExperimentMonitor` replaced in favour of `ComputeMonitor`
Diffstat (limited to 'opendc-experiments/opendc-experiments-capelin/src/test')
2 files changed, 55 insertions, 43 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 2934bbe6..aed9a4bb 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -29,7 +29,6 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll -import org.opendc.compute.service.driver.Host import org.opendc.compute.service.scheduler.FilterScheduler import org.opendc.compute.service.scheduler.filters.ComputeFilter import org.opendc.compute.service.scheduler.filters.RamFilter @@ -38,7 +37,6 @@ import org.opendc.compute.service.scheduler.weights.CoreRamWeigher import org.opendc.experiments.capelin.env.ClusterEnvironmentReader import org.opendc.experiments.capelin.env.EnvironmentReader import org.opendc.experiments.capelin.model.Workload -import org.opendc.experiments.capelin.monitor.ExperimentMonitor import org.opendc.experiments.capelin.trace.ParquetTraceReader import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader import org.opendc.experiments.capelin.trace.RawParquetTraceReader @@ -46,6 +44,10 @@ import org.opendc.experiments.capelin.trace.TraceReader import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.telemetry.compute.ComputeMonitor +import org.opendc.telemetry.compute.collectServiceMetrics +import org.opendc.telemetry.compute.table.HostData +import org.opendc.telemetry.compute.withMonitor import java.io.File import java.util.* @@ -80,7 +82,6 @@ class CapelinIntegrationTest { ) val traceReader = createTestTraceReader() val environmentReader = createTestEnvironmentReader() - lateinit var monitorResults: ComputeMetrics val meterProvider = createMeterProvider(clock) withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler -> @@ -98,7 +99,7 @@ class CapelinIntegrationTest { null } - withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { + withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { processTrace( clock, traceReader, @@ -111,15 +112,21 @@ class CapelinIntegrationTest { failureDomain?.cancel() } - monitorResults = collectMetrics(meterProvider as MetricProducer) - println("Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}") + val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + println( + "Finish " + + "SUBMIT=${serviceMetrics.instanceCount} " + + "FAIL=${serviceMetrics.failedInstanceCount} " + + "QUEUE=${serviceMetrics.queuedInstanceCount} " + + "RUNNING=${serviceMetrics.runningInstanceCount}" + ) // Note that these values have been verified beforehand assertAll( - { assertEquals(50, monitorResults.submittedVms, "The trace contains 50 VMs") }, - { assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") }, - { assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") }, - { assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") }, + { assertEquals(50, serviceMetrics.instanceCount, "The trace contains 50 VMs") }, + { assertEquals(0, serviceMetrics.runningInstanceCount, "All VMs should finish after a run") }, + { assertEquals(0, serviceMetrics.failedInstanceCount, "No VM should not be unscheduled") }, + { assertEquals(0, serviceMetrics.queuedInstanceCount, "No VM should not be in the queue") }, { assertEquals(220346369753, monitor.totalWork) { "Incorrect requested burst" } }, { assertEquals(206667809529, monitor.totalGrantedWork) { "Incorrect granted burst" } }, { assertEquals(1151611104, monitor.totalOvercommittedWork) { "Incorrect overcommitted burst" } }, @@ -145,7 +152,7 @@ class CapelinIntegrationTest { val meterProvider = createMeterProvider(clock) withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler -> - withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { + withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { processTrace( clock, traceReader, @@ -156,8 +163,14 @@ class CapelinIntegrationTest { } } - val metrics = collectMetrics(meterProvider as MetricProducer) - println("Finish SUBMIT=${metrics.submittedVms} FAIL=${metrics.unscheduledVms} QUEUE=${metrics.queuedVms} RUNNING=${metrics.runningVms}") + val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + println( + "Finish " + + "SUBMIT=${serviceMetrics.instanceCount} " + + "FAIL=${serviceMetrics.failedInstanceCount} " + + "QUEUE=${serviceMetrics.queuedInstanceCount} " + + "RUNNING=${serviceMetrics.runningInstanceCount}" + ) // Note that these values have been verified beforehand assertAll( @@ -184,12 +197,14 @@ class CapelinIntegrationTest { val perfInterferenceInput = checkNotNull(CapelinIntegrationTest::class.java.getResourceAsStream("/bitbrains-perf-interference.json")) val performanceInterferenceModel = - PerformanceInterferenceReader(perfInterferenceInput).use { VmInterferenceModel(it.read(), Random(seed.toLong())) } + PerformanceInterferenceReader() + .read(perfInterferenceInput) + .let { VmInterferenceModel(it, Random(seed.toLong())) } val meterProvider = createMeterProvider(clock) withComputeService(clock, meterProvider, environmentReader, allocationPolicy, performanceInterferenceModel) { scheduler -> - withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { + withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { processTrace( clock, traceReader, @@ -200,8 +215,14 @@ class CapelinIntegrationTest { } } - val metrics = collectMetrics(meterProvider as MetricProducer) - println("Finish SUBMIT=${metrics.submittedVms} FAIL=${metrics.unscheduledVms} QUEUE=${metrics.queuedVms} RUNNING=${metrics.runningVms}") + val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + println( + "Finish " + + "SUBMIT=${serviceMetrics.instanceCount} " + + "FAIL=${serviceMetrics.failedInstanceCount} " + + "QUEUE=${serviceMetrics.queuedInstanceCount} " + + "RUNNING=${serviceMetrics.runningInstanceCount}" + ) // Note that these values have been verified beforehand assertAll( @@ -239,7 +260,7 @@ class CapelinIntegrationTest { chan ) - withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { + withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { processTrace( clock, traceReader, @@ -252,8 +273,14 @@ class CapelinIntegrationTest { failureDomain.cancel() } - val metrics = collectMetrics(meterProvider as MetricProducer) - println("Finish SUBMIT=${metrics.submittedVms} FAIL=${metrics.unscheduledVms} QUEUE=${metrics.queuedVms} RUNNING=${metrics.runningVms}") + val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + println( + "Finish " + + "SUBMIT=${serviceMetrics.instanceCount} " + + "FAIL=${serviceMetrics.failedInstanceCount} " + + "QUEUE=${serviceMetrics.queuedInstanceCount} " + + "RUNNING=${serviceMetrics.runningInstanceCount}" + ) // Note that these values have been verified beforehand assertAll( @@ -283,32 +310,19 @@ class CapelinIntegrationTest { return ClusterEnvironmentReader(stream) } - class TestExperimentReporter : ExperimentMonitor { + class TestExperimentReporter : ComputeMonitor { var totalWork = 0L var totalGrantedWork = 0L var totalOvercommittedWork = 0L var totalInterferedWork = 0L var totalPowerDraw = 0.0 - override fun reportHostData( - time: Long, - totalWork: Double, - grantedWork: Double, - overcommittedWork: Double, - interferedWork: Double, - cpuUsage: Double, - cpuDemand: Double, - powerDraw: Double, - instanceCount: Int, - host: Host, - ) { - this.totalWork += totalWork.toLong() - totalGrantedWork += grantedWork.toLong() - totalOvercommittedWork += overcommittedWork.toLong() - totalInterferedWork += interferedWork.toLong() - totalPowerDraw += powerDraw + override fun record(data: HostData) { + this.totalWork += data.totalWork.toLong() + totalGrantedWork += data.grantedWork.toLong() + totalOvercommittedWork += data.overcommittedWork.toLong() + totalInterferedWork += data.interferedWork.toLong() + totalPowerDraw += data.powerDraw } - - override fun close() {} } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt index 9b1513dc..fbc39b87 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt @@ -33,9 +33,7 @@ class PerformanceInterferenceReaderTest { @Test fun testSmoke() { val input = checkNotNull(PerformanceInterferenceReader::class.java.getResourceAsStream("/perf-interference.json")) - val reader = PerformanceInterferenceReader(input) - - val result = reader.use { reader.read() } + val result = PerformanceInterferenceReader().read(input) assertAll( { assertEquals(2, result.size) }, |
