diff options
Diffstat (limited to 'opendc-experiments/opendc-experiments-capelin/src/test')
7 files changed, 237 insertions, 125 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 2d5cc68c..e34c5bdc 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -22,185 +22,276 @@ package org.opendc.experiments.capelin -import io.opentelemetry.sdk.metrics.export.MetricProducer -import kotlinx.coroutines.cancel -import kotlinx.coroutines.channels.Channel import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll -import org.opendc.compute.service.driver.Host import org.opendc.compute.service.scheduler.FilterScheduler -import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter import org.opendc.compute.service.scheduler.filters.ComputeFilter -import org.opendc.compute.service.scheduler.weights.CoreMemoryWeigher -import org.opendc.experiments.capelin.model.Workload -import org.opendc.experiments.capelin.monitor.ExperimentMonitor -import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader -import org.opendc.experiments.capelin.trace.Sc20RawParquetTraceReader -import org.opendc.format.environment.EnvironmentReader -import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader -import org.opendc.format.trace.TraceReader -import org.opendc.simulator.compute.workload.SimWorkload +import org.opendc.compute.service.scheduler.filters.RamFilter +import org.opendc.compute.service.scheduler.filters.VCpuFilter +import org.opendc.compute.service.scheduler.weights.CoreRamWeigher +import org.opendc.compute.workload.* +import org.opendc.compute.workload.topology.Topology +import org.opendc.compute.workload.topology.apply +import org.opendc.compute.workload.util.PerformanceInterferenceReader +import org.opendc.experiments.capelin.topology.clusterTopology +import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.telemetry.compute.ComputeMetricExporter +import org.opendc.telemetry.compute.collectServiceMetrics +import org.opendc.telemetry.compute.table.HostData +import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import java.io.File +import java.time.Duration +import java.util.* /** - * An integration test suite for the SC20 experiments. + * An integration test suite for the Capelin experiments. */ class CapelinIntegrationTest { /** * The monitor used to keep track of the metrics. */ - private lateinit var monitor: TestExperimentReporter + private lateinit var exporter: TestComputeMetricExporter + + /** + * The [FilterScheduler] to use for all experiments. + */ + private lateinit var computeScheduler: FilterScheduler + + /** + * The [ComputeWorkloadLoader] responsible for loading the traces. + */ + private lateinit var workloadLoader: ComputeWorkloadLoader /** * Setup the experimental environment. */ @BeforeEach fun setUp() { - monitor = TestExperimentReporter() + exporter = TestComputeMetricExporter() + computeScheduler = FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), + weighers = listOf(CoreRamWeigher(multiplier = 1.0)) + ) + workloadLoader = ComputeWorkloadLoader(File("src/test/resources/trace")) } + /** + * Test a large simulation setup. + */ @Test fun testLarge() = runBlockingSimulation { - val failures = false - val seed = 0 - val chan = Channel<Unit>(Channel.CONFLATED) - val allocationPolicy = FilterScheduler( - filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), - weighers = listOf(CoreMemoryWeigher() to -1.0) + val workload = createTestWorkload(1.0) + val runner = ComputeWorkloadRunner( + coroutineContext, + clock, + computeScheduler ) - val traceReader = createTestTraceReader() - val environmentReader = createTestEnvironmentReader() - lateinit var monitorResults: ComputeMetrics - - val meterProvider = createMeterProvider(clock) - withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler -> - val failureDomain = if (failures) { - println("ENABLING failures") - createFailureDomain( - this, - clock, - seed, - 24.0 * 7, - scheduler, - chan - ) - } else { - null - } - - withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { - processTrace( - clock, - traceReader, - scheduler, - chan, - monitor - ) - } - - failureDomain?.cancel() + val topology = createTopology() + val metricReader = CoroutineMetricReader(this, runner.producers, exporter) + + try { + runner.apply(topology) + runner.run(workload, 0) + } finally { + runner.close() + metricReader.close() } - monitorResults = collectMetrics(meterProvider as MetricProducer) - println("Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}") + val serviceMetrics = collectServiceMetrics(runner.producers[0]) + println( + "Scheduler " + + "Success=${serviceMetrics.attemptsSuccess} " + + "Failure=${serviceMetrics.attemptsFailure} " + + "Error=${serviceMetrics.attemptsError} " + + "Pending=${serviceMetrics.serversPending} " + + "Active=${serviceMetrics.serversActive}" + ) // Note that these values have been verified beforehand assertAll( - { assertEquals(50, monitorResults.submittedVms, "The trace contains 50 VMs") }, - { assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") }, - { assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") }, - { assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") }, - { assertEquals(207389912923, monitor.totalRequestedBurst) { "Incorrect requested burst" } }, - { assertEquals(207122087280, monitor.totalGrantedBurst) { "Incorrect granted burst" } }, - { assertEquals(267825640, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } }, - { assertEquals(0, monitor.totalInterferedBurst) { "Incorrect interfered burst" } } + { assertEquals(50, serviceMetrics.attemptsSuccess, "The scheduler should schedule 50 VMs") }, + { assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") }, + { assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") }, + { assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") }, + { assertEquals(223325655, this@CapelinIntegrationTest.exporter.idleTime) { "Incorrect idle time" } }, + { assertEquals(67006560, this@CapelinIntegrationTest.exporter.activeTime) { "Incorrect active time" } }, + { assertEquals(3159377, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } }, + { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Incorrect lost time" } }, + { assertEquals(5.840212485920686E9, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } }, ) } + /** + * Test a small simulation setup. + */ @Test fun testSmall() = runBlockingSimulation { val seed = 1 - val chan = Channel<Unit>(Channel.CONFLATED) - val allocationPolicy = FilterScheduler( - filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), - weighers = listOf(CoreMemoryWeigher() to -1.0) + val workload = createTestWorkload(0.25, seed) + + val simulator = ComputeWorkloadRunner( + coroutineContext, + clock, + computeScheduler ) - val traceReader = createTestTraceReader(0.5, seed) - val environmentReader = createTestEnvironmentReader("single") - - val meterProvider = createMeterProvider(clock) - - withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler -> - withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { - processTrace( - clock, - traceReader, - scheduler, - chan, - monitor - ) - } + val topology = createTopology("single") + val metricReader = CoroutineMetricReader(this, simulator.producers, exporter) + + try { + simulator.apply(topology) + simulator.run(workload, seed.toLong()) + } finally { + simulator.close() + metricReader.close() } - val metrics = collectMetrics(meterProvider as MetricProducer) - println("Finish SUBMIT=${metrics.submittedVms} FAIL=${metrics.unscheduledVms} QUEUE=${metrics.queuedVms} RUNNING=${metrics.runningVms}") + val serviceMetrics = collectServiceMetrics(simulator.producers[0]) + println( + "Scheduler " + + "Success=${serviceMetrics.attemptsSuccess} " + + "Failure=${serviceMetrics.attemptsFailure} " + + "Error=${serviceMetrics.attemptsError} " + + "Pending=${serviceMetrics.serversPending} " + + "Active=${serviceMetrics.serversActive}" + ) // Note that these values have been verified beforehand assertAll( - { assertEquals(96350072517, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, - { assertEquals(96330335057, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, - { assertEquals(19737460, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, - { assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } } + { assertEquals(10997726, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } }, + { assertEquals(9740289, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } }, + { assertEquals(0, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } }, + { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } }, + { assertEquals(7.0099453912813E8, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } } ) } /** - * Obtain the trace reader for the test. + * Test a small simulation setup with interference. */ - private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<SimWorkload> { - return Sc20ParquetTraceReader( - listOf(Sc20RawParquetTraceReader(File("src/test/resources/trace"))), - emptyMap(), - Workload("test", fraction), - seed + @Test + fun testInterference() = runBlockingSimulation { + val seed = 0 + val workload = createTestWorkload(1.0, seed) + val perfInterferenceInput = checkNotNull(CapelinIntegrationTest::class.java.getResourceAsStream("/bitbrains-perf-interference.json")) + val performanceInterferenceModel = + PerformanceInterferenceReader() + .read(perfInterferenceInput) + .let { VmInterferenceModel(it, Random(seed.toLong())) } + + val simulator = ComputeWorkloadRunner( + coroutineContext, + clock, + computeScheduler, + interferenceModel = performanceInterferenceModel + ) + val topology = createTopology("single") + val metricReader = CoroutineMetricReader(this, simulator.producers, exporter) + + try { + simulator.apply(topology) + simulator.run(workload, seed.toLong()) + } finally { + simulator.close() + metricReader.close() + } + + val serviceMetrics = collectServiceMetrics(simulator.producers[0]) + println( + "Scheduler " + + "Success=${serviceMetrics.attemptsSuccess} " + + "Failure=${serviceMetrics.attemptsFailure} " + + "Error=${serviceMetrics.attemptsError} " + + "Pending=${serviceMetrics.serversPending} " + + "Active=${serviceMetrics.serversActive}" + ) + + // Note that these values have been verified beforehand + assertAll( + { assertEquals(6013515, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } }, + { assertEquals(14724500, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } }, + { assertEquals(12530742, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } }, + { assertEquals(481251, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } } ) } /** - * Obtain the environment reader for the test. + * Test a small simulation setup with failures. */ - private fun createTestEnvironmentReader(name: String = "topology"): EnvironmentReader { - val stream = object {}.javaClass.getResourceAsStream("/env/$name.txt") - return Sc20ClusterEnvironmentReader(stream) - } + @Test + fun testFailures() = runBlockingSimulation { + val seed = 1 + val simulator = ComputeWorkloadRunner( + coroutineContext, + clock, + computeScheduler, + grid5000(Duration.ofDays(7)) + ) + val topology = createTopology("single") + val workload = createTestWorkload(0.25, seed) + val metricReader = CoroutineMetricReader(this, simulator.producers, exporter) - class TestExperimentReporter : ExperimentMonitor { - var totalRequestedBurst = 0L - var totalGrantedBurst = 0L - var totalOvercommissionedBurst = 0L - var totalInterferedBurst = 0L - - override fun reportHostSlice( - time: Long, - requestedBurst: Long, - grantedBurst: Long, - overcommissionedBurst: Long, - interferedBurst: Long, - cpuUsage: Double, - cpuDemand: Double, - powerDraw: Double, - numberOfDeployedImages: Int, - host: Host, - ) { - totalRequestedBurst += requestedBurst - totalGrantedBurst += grantedBurst - totalOvercommissionedBurst += overcommissionedBurst - totalInterferedBurst += interferedBurst + try { + simulator.apply(topology) + simulator.run(workload, seed.toLong()) + } finally { + simulator.close() + metricReader.close() } - override fun close() {} + val serviceMetrics = collectServiceMetrics(simulator.producers[0]) + println( + "Scheduler " + + "Success=${serviceMetrics.attemptsSuccess} " + + "Failure=${serviceMetrics.attemptsFailure} " + + "Error=${serviceMetrics.attemptsError} " + + "Pending=${serviceMetrics.serversPending} " + + "Active=${serviceMetrics.serversActive}" + ) + + // Note that these values have been verified beforehand + assertAll( + { assertEquals(10865478, exporter.idleTime) { "Idle time incorrect" } }, + { assertEquals(9606177, exporter.activeTime) { "Active time incorrect" } }, + { assertEquals(0, exporter.stealTime) { "Steal time incorrect" } }, + { assertEquals(0, exporter.lostTime) { "Lost time incorrect" } }, + { assertEquals(2559005056, exporter.uptime) { "Uptime incorrect" } } + ) + } + + /** + * Obtain the trace reader for the test. + */ + private fun createTestWorkload(fraction: Double, seed: Int = 0): List<VirtualMachine> { + val source = trace("bitbrains-small").sampleByLoad(fraction) + return source.resolve(workloadLoader, Random(seed.toLong())) + } + + /** + * Obtain the topology factory for the test. + */ + private fun createTopology(name: String = "topology"): Topology { + val stream = checkNotNull(object {}.javaClass.getResourceAsStream("/env/$name.txt")) + return stream.use { clusterTopology(stream) } + } + + class TestComputeMetricExporter : ComputeMetricExporter() { + var idleTime = 0L + var activeTime = 0L + var stealTime = 0L + var lostTime = 0L + var energyUsage = 0.0 + var uptime = 0L + + override fun record(data: HostData) { + idleTime += data.cpuIdleTime + activeTime += data.cpuActiveTime + stealTime += data.cpuStealTime + lostTime += data.cpuLostTime + energyUsage += data.powerTotal + uptime += data.uptime + } } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/bitbrains-perf-interference.json b/opendc-experiments/opendc-experiments-capelin/src/test/resources/bitbrains-perf-interference.json new file mode 100644 index 00000000..51fc6366 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/bitbrains-perf-interference.json @@ -0,0 +1,21 @@ +[ + { + "vms": [ + "141", + "379", + "851", + "116" + ], + "minServerLoad": 0.0, + "performanceScore": 0.8830158730158756 + }, + { + "vms": [ + "205", + "116", + "463" + ], + "minServerLoad": 0.0, + "performanceScore": 0.7133055555552751 + } +] diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt b/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt index 53b3c2d7..5642003d 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt @@ -1,3 +1,3 @@ ClusterID;ClusterName;Cores;Speed;Memory;numberOfHosts;memoryCapacityPerHost;coreCountPerHost -A01;A01;8;3.2;64;1;64;8 +A01;A01;8;3.2;128;1;128;8 diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/meta.parquet b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/meta.parquet Binary files differnew file mode 100644 index 00000000..da6e5330 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/meta.parquet diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/trace.parquet b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/trace.parquet Binary files differnew file mode 100644 index 00000000..fe0a254c --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/trace.parquet diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet Binary files differdeleted file mode 100644 index ce7a812c..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet +++ /dev/null diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet Binary files differdeleted file mode 100644 index 1d7ce882..00000000 --- a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet +++ /dev/null |
