diff options
Diffstat (limited to 'opendc-experiments/opendc-experiments-capelin/src/test')
| -rw-r--r-- | opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt | 262 |
1 files changed, 133 insertions, 129 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 44cf92a8..727530e3 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -22,7 +22,6 @@ package org.opendc.experiments.capelin -import io.opentelemetry.sdk.metrics.export.MetricProducer import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test @@ -32,7 +31,6 @@ import org.opendc.compute.service.scheduler.filters.ComputeFilter import org.opendc.compute.service.scheduler.filters.RamFilter import org.opendc.compute.service.scheduler.filters.VCpuFilter import org.opendc.compute.service.scheduler.weights.CoreRamWeigher -import org.opendc.compute.simulator.SimHost import org.opendc.experiments.capelin.env.ClusterEnvironmentReader import org.opendc.experiments.capelin.env.EnvironmentReader import org.opendc.experiments.capelin.model.Workload @@ -40,14 +38,17 @@ import org.opendc.experiments.capelin.trace.ParquetTraceReader import org.opendc.experiments.capelin.trace.PerformanceInterferenceReader import org.opendc.experiments.capelin.trace.RawParquetTraceReader import org.opendc.experiments.capelin.trace.TraceReader +import org.opendc.experiments.capelin.util.ComputeServiceSimulator import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.ComputeMonitor import org.opendc.telemetry.compute.collectServiceMetrics import org.opendc.telemetry.compute.table.HostData -import org.opendc.telemetry.compute.withMonitor +import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import java.io.File +import java.time.Duration import java.util.* /** @@ -60,11 +61,20 @@ class CapelinIntegrationTest { private lateinit var monitor: TestExperimentReporter /** + * The [FilterScheduler] to use for all experiments. + */ + private lateinit var computeScheduler: FilterScheduler + + /** * Setup the experimental environment. */ @BeforeEach fun setUp() { monitor = TestExperimentReporter() + computeScheduler = FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), + weighers = listOf(CoreRamWeigher(multiplier = 1.0)) + ) } /** @@ -72,45 +82,46 @@ class CapelinIntegrationTest { */ @Test fun testLarge() = runBlockingSimulation { - val allocationPolicy = FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), - weighers = listOf(CoreRamWeigher(multiplier = 1.0)) - ) val traceReader = createTestTraceReader() val environmentReader = createTestEnvironmentReader() - val meterProvider = createMeterProvider(clock) - withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler -> - withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { - processTrace( - clock, - traceReader, - scheduler, - monitor - ) - } + val simulator = ComputeServiceSimulator( + coroutineContext, + clock, + computeScheduler, + environmentReader.read(), + ) + + val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) + + try { + simulator.run(traceReader) + } finally { + simulator.close() + metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0]) println( - "Finish " + - "SUBMIT=${serviceMetrics.instanceCount} " + - "FAIL=${serviceMetrics.failedInstanceCount} " + - "QUEUE=${serviceMetrics.queuedInstanceCount} " + - "RUNNING=${serviceMetrics.runningInstanceCount}" + "Scheduler " + + "Success=${serviceMetrics.attemptsSuccess} " + + "Failure=${serviceMetrics.attemptsFailure} " + + "Error=${serviceMetrics.attemptsError} " + + "Pending=${serviceMetrics.serversPending} " + + "Active=${serviceMetrics.serversActive}" ) // Note that these values have been verified beforehand assertAll( - { assertEquals(50, serviceMetrics.instanceCount, "The trace contains 50 VMs") }, - { assertEquals(0, serviceMetrics.runningInstanceCount, "All VMs should finish after a run") }, - { assertEquals(0, serviceMetrics.failedInstanceCount, "No VM should not be unscheduled") }, - { assertEquals(0, serviceMetrics.queuedInstanceCount, "No VM should not be in the queue") }, - { assertEquals(220346369753, monitor.totalWork) { "Incorrect requested burst" } }, - { assertEquals(206667809529, monitor.totalGrantedWork) { "Incorrect granted burst" } }, - { assertEquals(1151611104, monitor.totalOvercommittedWork) { "Incorrect overcommitted burst" } }, - { assertEquals(0, monitor.totalInterferedWork) { "Incorrect interfered burst" } }, - { assertEquals(1.7671768767192196E7, monitor.totalPowerDraw, 0.01) { "Incorrect power draw" } }, + { assertEquals(50, serviceMetrics.attemptsSuccess, "The scheduler should schedule 50 VMs") }, + { assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") }, + { assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") }, + { assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") }, + { assertEquals(223856043, monitor.idleTime) { "Incorrect idle time" } }, + { assertEquals(66481557, monitor.activeTime) { "Incorrect active time" } }, + { assertEquals(360441, monitor.stealTime) { "Incorrect steal time" } }, + { assertEquals(0, monitor.lostTime) { "Incorrect lost time" } }, + { assertEquals(5.418336360461193E9, monitor.energyUsage, 0.01) { "Incorrect power draw" } }, ) } @@ -120,41 +131,41 @@ class CapelinIntegrationTest { @Test fun testSmall() = runBlockingSimulation { val seed = 1 - val allocationPolicy = FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), - weighers = listOf(CoreRamWeigher(multiplier = 1.0)) - ) val traceReader = createTestTraceReader(0.25, seed) val environmentReader = createTestEnvironmentReader("single") - val meterProvider = createMeterProvider(clock) - - withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler -> - withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { - processTrace( - clock, - traceReader, - scheduler, - monitor - ) - } + val simulator = ComputeServiceSimulator( + coroutineContext, + clock, + computeScheduler, + environmentReader.read(), + ) + + val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) + + try { + simulator.run(traceReader) + } finally { + simulator.close() + metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0]) println( - "Finish " + - "SUBMIT=${serviceMetrics.instanceCount} " + - "FAIL=${serviceMetrics.failedInstanceCount} " + - "QUEUE=${serviceMetrics.queuedInstanceCount} " + - "RUNNING=${serviceMetrics.runningInstanceCount}" + "Scheduler " + + "Success=${serviceMetrics.attemptsSuccess} " + + "Failure=${serviceMetrics.attemptsFailure} " + + "Error=${serviceMetrics.attemptsError} " + + "Pending=${serviceMetrics.serversPending} " + + "Active=${serviceMetrics.serversActive}" ) // Note that these values have been verified beforehand assertAll( - { assertEquals(39183961335, monitor.totalWork) { "Total requested work incorrect" } }, - { assertEquals(35649903197, monitor.totalGrantedWork) { "Total granted work incorrect" } }, - { assertEquals(1043641877, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, - { assertEquals(0, monitor.totalInterferedWork) { "Total interfered work incorrect" } } + { assertEquals(9597804, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(11140596, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(326138, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } } ) } @@ -164,10 +175,6 @@ class CapelinIntegrationTest { @Test fun testInterference() = runBlockingSimulation { val seed = 1 - val allocationPolicy = FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), - weighers = listOf(CoreRamWeigher(multiplier = 1.0)) - ) val traceReader = createTestTraceReader(0.25, seed) val environmentReader = createTestEnvironmentReader("single") @@ -177,34 +184,39 @@ class CapelinIntegrationTest { .read(perfInterferenceInput) .let { VmInterferenceModel(it, Random(seed.toLong())) } - val meterProvider = createMeterProvider(clock) - - withComputeService(clock, meterProvider, environmentReader, allocationPolicy, performanceInterferenceModel) { scheduler -> - withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { - processTrace( - clock, - traceReader, - scheduler, - monitor - ) - } + val simulator = ComputeServiceSimulator( + coroutineContext, + clock, + computeScheduler, + environmentReader.read(), + interferenceModel = performanceInterferenceModel + ) + + val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) + + try { + simulator.run(traceReader) + } finally { + simulator.close() + metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0]) println( - "Finish " + - "SUBMIT=${serviceMetrics.instanceCount} " + - "FAIL=${serviceMetrics.failedInstanceCount} " + - "QUEUE=${serviceMetrics.queuedInstanceCount} " + - "RUNNING=${serviceMetrics.runningInstanceCount}" + "Scheduler " + + "Success=${serviceMetrics.attemptsSuccess} " + + "Failure=${serviceMetrics.attemptsFailure} " + + "Error=${serviceMetrics.attemptsError} " + + "Pending=${serviceMetrics.serversPending} " + + "Active=${serviceMetrics.serversActive}" ) // Note that these values have been verified beforehand assertAll( - { assertEquals(39183961335, monitor.totalWork) { "Total requested work incorrect" } }, - { assertEquals(35649903197, monitor.totalGrantedWork) { "Total granted work incorrect" } }, - { assertEquals(1043641877, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, - { assertEquals(2960970230, monitor.totalInterferedWork) { "Total interfered work incorrect" } } + { assertEquals(9597804, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(11140596, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(326138, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(925305, monitor.lostTime) { "Lost time incorrect" } } ) } @@ -214,53 +226,43 @@ class CapelinIntegrationTest { @Test fun testFailures() = runBlockingSimulation { val seed = 1 - val allocationPolicy = FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), - weighers = listOf(CoreRamWeigher(multiplier = 1.0)) - ) val traceReader = createTestTraceReader(0.25, seed) val environmentReader = createTestEnvironmentReader("single") - val meterProvider = createMeterProvider(clock) - - withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler -> - val faultInjector = - createFaultInjector( - coroutineContext, - clock, - scheduler.hosts.map { it as SimHost }.toSet(), - seed, - 24.0 * 7, - ) - - withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { - faultInjector.start() - processTrace( - clock, - traceReader, - scheduler, - monitor - ) - } - - faultInjector.close() + val simulator = ComputeServiceSimulator( + coroutineContext, + clock, + computeScheduler, + environmentReader.read(), + grid5000(Duration.ofDays(7), seed) + ) + + val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) + + try { + simulator.run(traceReader) + } finally { + simulator.close() + metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) + val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0]) println( - "Finish " + - "SUBMIT=${serviceMetrics.instanceCount} " + - "FAIL=${serviceMetrics.failedInstanceCount} " + - "QUEUE=${serviceMetrics.queuedInstanceCount} " + - "RUNNING=${serviceMetrics.runningInstanceCount}" + "Scheduler " + + "Success=${serviceMetrics.attemptsSuccess} " + + "Failure=${serviceMetrics.attemptsFailure} " + + "Error=${serviceMetrics.attemptsError} " + + "Pending=${serviceMetrics.serversPending} " + + "Active=${serviceMetrics.serversActive}" ) // Note that these values have been verified beforehand assertAll( - { assertEquals(38385852453, monitor.totalWork) { "Total requested work incorrect" } }, - { assertEquals(34886665781, monitor.totalGrantedWork) { "Total granted work incorrect" } }, - { assertEquals(979997253, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, - { assertEquals(0, monitor.totalInterferedWork) { "Total interfered work incorrect" } } + { assertEquals(9836315, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(10902085, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(306249, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, + { assertEquals(2540877457, monitor.uptime) { "Uptime incorrect" } } ) } @@ -284,18 +286,20 @@ class CapelinIntegrationTest { } class TestExperimentReporter : ComputeMonitor { - var totalWork = 0L - var totalGrantedWork = 0L - var totalOvercommittedWork = 0L - var totalInterferedWork = 0L - var totalPowerDraw = 0.0 + var idleTime = 0L + var activeTime = 0L + var stealTime = 0L + var lostTime = 0L + var energyUsage = 0.0 + var uptime = 0L override fun record(data: HostData) { - this.totalWork += data.totalWork.toLong() - totalGrantedWork += data.grantedWork.toLong() - totalOvercommittedWork += data.overcommittedWork.toLong() - totalInterferedWork += data.interferedWork.toLong() - totalPowerDraw += data.powerDraw + idleTime += data.cpuIdleTime + activeTime += data.cpuActiveTime + stealTime += data.cpuStealTime + lostTime += data.cpuLostTime + energyUsage += data.powerTotal + uptime += data.uptime } } } |
