diff options
Diffstat (limited to 'opendc-experiments/opendc-experiments-capelin/src/test')
2 files changed, 55 insertions, 43 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 2934bbe6..aed9a4bb 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -29,7 +29,6 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll -import org.opendc.compute.service.driver.Host import org.opendc.compute.service.scheduler.FilterScheduler import org.opendc.compute.service.scheduler.filters.ComputeFilter import org.opendc.compute.service.scheduler.filters.RamFilter @@ -38,7 +37,6 @@ import org.opendc.compute.service.scheduler.weights.CoreRamWeigher import org.opendc.experiments.capelin.env.ClusterEnvironmentReader import org.opendc.experiments.capelin.env.EnvironmentReader import org.opendc.experiments.capelin.model.Workload -import org.opendc.experiments.capelin.monitor.ExperimentMonitor import org.opendc.experiments.capelin.trace.ParquetTraceReader import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader import org.opendc.experiments.capelin.trace.RawParquetTraceReader @@ -46,6 +44,10 @@ import org.opendc.experiments.capelin.trace.TraceReader import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.telemetry.compute.ComputeMonitor +import org.opendc.telemetry.compute.collectServiceMetrics +import org.opendc.telemetry.compute.table.HostData +import org.opendc.telemetry.compute.withMonitor import java.io.File import java.util.* @@ -80,7 +82,6 @@ class CapelinIntegrationTest { ) val traceReader = createTestTraceReader() val environmentReader = createTestEnvironmentReader() - lateinit var monitorResults: ComputeMetrics val meterProvider = createMeterProvider(clock) withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler -> @@ -98,7 +99,7 @@ class CapelinIntegrationTest { null } - withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { + withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { processTrace( clock, traceReader, @@ -111,15 +112,21 @@ class CapelinIntegrationTest { failureDomain?.cancel() } - monitorResults = collectMetrics(meterProvider as MetricProducer) - println("Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}") + val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + println( + "Finish " + + "SUBMIT=${serviceMetrics.instanceCount} " + + "FAIL=${serviceMetrics.failedInstanceCount} " + + "QUEUE=${serviceMetrics.queuedInstanceCount} " + + "RUNNING=${serviceMetrics.runningInstanceCount}" + ) // Note that these values have been verified beforehand assertAll( - { assertEquals(50, monitorResults.submittedVms, "The trace contains 50 VMs") }, - { assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") }, - { assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") }, - { assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") }, + { assertEquals(50, serviceMetrics.instanceCount, "The trace contains 50 VMs") }, + { assertEquals(0, serviceMetrics.runningInstanceCount, "All VMs should finish after a run") }, + { assertEquals(0, serviceMetrics.failedInstanceCount, "No VM should not be unscheduled") }, + { assertEquals(0, serviceMetrics.queuedInstanceCount, "No VM should not be in the queue") }, { assertEquals(220346369753, monitor.totalWork) { "Incorrect requested burst" } }, { assertEquals(206667809529, monitor.totalGrantedWork) { "Incorrect granted burst" } }, { assertEquals(1151611104, monitor.totalOvercommittedWork) { "Incorrect overcommitted burst" } }, @@ -145,7 +152,7 @@ class CapelinIntegrationTest { val meterProvider = createMeterProvider(clock) withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler -> - withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { + withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { processTrace( clock, traceReader, @@ -156,8 +163,14 @@ class CapelinIntegrationTest { } } - val metrics = collectMetrics(meterProvider as MetricProducer) - println("Finish SUBMIT=${metrics.submittedVms} FAIL=${metrics.unscheduledVms} QUEUE=${metrics.queuedVms} RUNNING=${metrics.runningVms}") + val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + println( + "Finish " + + "SUBMIT=${serviceMetrics.instanceCount} " + + "FAIL=${serviceMetrics.failedInstanceCount} " + + "QUEUE=${serviceMetrics.queuedInstanceCount} " + + "RUNNING=${serviceMetrics.runningInstanceCount}" + ) // Note that these values have been verified beforehand assertAll( @@ -184,12 +197,14 @@ class CapelinIntegrationTest { val perfInterferenceInput = checkNotNull(CapelinIntegrationTest::class.java.getResourceAsStream("/bitbrains-perf-interference.json")) val performanceInterferenceModel = - PerformanceInterferenceReader(perfInterferenceInput).use { VmInterferenceModel(it.read(), Random(seed.toLong())) } + PerformanceInterferenceReader() + .read(perfInterferenceInput) + .let { VmInterferenceModel(it, Random(seed.toLong())) } val meterProvider = createMeterProvider(clock) withComputeService(clock, meterProvider, environmentReader, allocationPolicy, performanceInterferenceModel) { scheduler -> - withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { + withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { processTrace( clock, traceReader, @@ -200,8 +215,14 @@ class CapelinIntegrationTest { } } - val metrics = collectMetrics(meterProvider as MetricProducer) - println("Finish SUBMIT=${metrics.submittedVms} FAIL=${metrics.unscheduledVms} QUEUE=${metrics.queuedVms} RUNNING=${metrics.runningVms}") + val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + println( + "Finish " + + "SUBMIT=${serviceMetrics.instanceCount} " + + "FAIL=${serviceMetrics.failedInstanceCount} " + + "QUEUE=${serviceMetrics.queuedInstanceCount} " + + "RUNNING=${serviceMetrics.runningInstanceCount}" + ) // Note that these values have been verified beforehand assertAll( @@ -239,7 +260,7 @@ class CapelinIntegrationTest { chan ) - withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { + withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { processTrace( clock, traceReader, @@ -252,8 +273,14 @@ class CapelinIntegrationTest { failureDomain.cancel() } - val metrics = collectMetrics(meterProvider as MetricProducer) - println("Finish SUBMIT=${metrics.submittedVms} FAIL=${metrics.unscheduledVms} QUEUE=${metrics.queuedVms} RUNNING=${metrics.runningVms}") + val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + println( + "Finish " + + "SUBMIT=${serviceMetrics.instanceCount} " + + "FAIL=${serviceMetrics.failedInstanceCount} " + + "QUEUE=${serviceMetrics.queuedInstanceCount} " + + "RUNNING=${serviceMetrics.runningInstanceCount}" + ) // Note that these values have been verified beforehand assertAll( @@ -283,32 +310,19 @@ class CapelinIntegrationTest { return ClusterEnvironmentReader(stream) } - class TestExperimentReporter : ExperimentMonitor { + class TestExperimentReporter : ComputeMonitor { var totalWork = 0L var totalGrantedWork = 0L var totalOvercommittedWork = 0L var totalInterferedWork = 0L var totalPowerDraw = 0.0 - override fun reportHostData( - time: Long, - totalWork: Double, - grantedWork: Double, - overcommittedWork: Double, - interferedWork: Double, - cpuUsage: Double, - cpuDemand: Double, - powerDraw: Double, - instanceCount: Int, - host: Host, - ) { - this.totalWork += totalWork.toLong() - totalGrantedWork += grantedWork.toLong() - totalOvercommittedWork += overcommittedWork.toLong() - totalInterferedWork += interferedWork.toLong() - totalPowerDraw += powerDraw + override fun record(data: HostData) { + this.totalWork += data.totalWork.toLong() + totalGrantedWork += data.grantedWork.toLong() + totalOvercommittedWork += data.overcommittedWork.toLong() + totalInterferedWork += data.interferedWork.toLong() + totalPowerDraw += data.powerDraw } - - override fun close() {} } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt index 9b1513dc..fbc39b87 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/trace/PerformanceInterferenceReaderTest.kt @@ -33,9 +33,7 @@ class PerformanceInterferenceReaderTest { @Test fun testSmoke() { val input = checkNotNull(PerformanceInterferenceReader::class.java.getResourceAsStream("/perf-interference.json")) - val reader = PerformanceInterferenceReader(input) - - val result = reader.use { reader.read() } + val result = PerformanceInterferenceReader().read(input) assertAll( { assertEquals(2, result.size) }, |
