From b14df2a0924774c5aed15cedeb1027abf8ee5361 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 16 Sep 2021 16:52:00 +0200 Subject: refactor(capelin): Make workload sampling model extensible This change updates the workload sampling implementation to be more flexible in the way the workload is constructed. Users can now sample multiple workloads at the same time using multiple samplers and use them as a single workload to simulate. --- .../experiments/capelin/CapelinIntegrationTest.kt | 84 ++++++++++----------- .../resources/trace/bitbrains-small/meta.parquet | Bin 0 -> 2081 bytes .../resources/trace/bitbrains-small/trace.parquet | Bin 0 -> 1647189 bytes .../src/test/resources/trace/meta.parquet | Bin 2081 -> 0 bytes .../src/test/resources/trace/trace.parquet | Bin 1647189 -> 0 bytes 5 files changed, 40 insertions(+), 44 deletions(-) create mode 100644 opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/meta.parquet create mode 100644 opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/trace.parquet delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet delete mode 100644 opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet (limited to 'opendc-experiments/opendc-experiments-capelin/src/test') diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index c1386bfe..140a84db 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -31,18 +31,12 @@ import org.opendc.compute.service.scheduler.filters.ComputeFilter import org.opendc.compute.service.scheduler.filters.RamFilter import org.opendc.compute.service.scheduler.filters.VCpuFilter import org.opendc.compute.service.scheduler.weights.CoreRamWeigher -import org.opendc.compute.workload.ComputeWorkloadRunner -import org.opendc.compute.workload.grid5000 +import org.opendc.compute.workload.* import org.opendc.compute.workload.topology.Topology import org.opendc.compute.workload.topology.apply -import org.opendc.compute.workload.trace.RawParquetTraceReader -import org.opendc.compute.workload.trace.TraceReader import org.opendc.compute.workload.util.PerformanceInterferenceReader -import org.opendc.experiments.capelin.model.Workload import org.opendc.experiments.capelin.topology.clusterTopology -import org.opendc.experiments.capelin.trace.ParquetTraceReader import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel -import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.core.runBlockingSimulation import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.ComputeMonitor @@ -67,6 +61,11 @@ class CapelinIntegrationTest { */ private lateinit var computeScheduler: FilterScheduler + /** + * The [ComputeWorkloadLoader] responsible for loading the traces. + */ + private lateinit var workloadLoader: ComputeWorkloadLoader + /** * Setup the experimental environment. */ @@ -77,6 +76,7 @@ class CapelinIntegrationTest { filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), weighers = listOf(CoreRamWeigher(multiplier = 1.0)) ) + workloadLoader = ComputeWorkloadLoader(File("src/test/resources/trace")) } /** @@ -84,24 +84,24 @@ class CapelinIntegrationTest { */ @Test fun testLarge() = runBlockingSimulation { - val traceReader = createTestTraceReader() - val simulator = ComputeWorkloadRunner( + val workload = createTestWorkload(1.0) + val runner = ComputeWorkloadRunner( coroutineContext, clock, computeScheduler ) val topology = createTopology() - val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) + val metricReader = CoroutineMetricReader(this, runner.producers, ComputeMetricExporter(clock, monitor)) try { - simulator.apply(topology) - simulator.run(traceReader) + runner.apply(topology) + runner.run(workload, 0) } finally { - simulator.close() + runner.close() metricReader.close() } - val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0]) + val serviceMetrics = collectServiceMetrics(clock.instant(), runner.producers[0]) println( "Scheduler " + "Success=${serviceMetrics.attemptsSuccess} " + @@ -117,11 +117,11 @@ class CapelinIntegrationTest { { assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") }, { assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") }, { assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") }, - { assertEquals(223856043, monitor.idleTime) { "Incorrect idle time" } }, - { assertEquals(66481557, monitor.activeTime) { "Incorrect active time" } }, - { assertEquals(360441, monitor.stealTime) { "Incorrect steal time" } }, + { assertEquals(221949826, monitor.idleTime) { "Incorrect idle time" } }, + { assertEquals(68421374, monitor.activeTime) { "Incorrect active time" } }, + { assertEquals(947010, monitor.stealTime) { "Incorrect steal time" } }, { assertEquals(0, monitor.lostTime) { "Incorrect lost time" } }, - { assertEquals(5.418336360461193E9, monitor.energyUsage, 0.01) { "Incorrect power draw" } }, + { assertEquals(5.783711298639437E9, monitor.energyUsage, 0.01) { "Incorrect power draw" } }, ) } @@ -131,7 +131,7 @@ class CapelinIntegrationTest { @Test fun testSmall() = runBlockingSimulation { val seed = 1 - val traceReader = createTestTraceReader(0.25, seed) + val workload = createTestWorkload(0.25, seed) val simulator = ComputeWorkloadRunner( coroutineContext, @@ -143,7 +143,7 @@ class CapelinIntegrationTest { try { simulator.apply(topology) - simulator.run(traceReader) + simulator.run(workload, seed.toLong()) } finally { simulator.close() metricReader.close() @@ -161,9 +161,9 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(9597804, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(11140596, monitor.activeTime) { "Active time incorrect" } }, - { assertEquals(326138, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(8545158, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(12195642, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(941038, monitor.stealTime) { "Steal time incorrect" } }, { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } } ) } @@ -173,9 +173,8 @@ class CapelinIntegrationTest { */ @Test fun testInterference() = runBlockingSimulation { - val seed = 1 - val traceReader = createTestTraceReader(0.25, seed) - + val seed = 0 + val workload = createTestWorkload(1.0, seed) val perfInterferenceInput = checkNotNull(CapelinIntegrationTest::class.java.getResourceAsStream("/bitbrains-perf-interference.json")) val performanceInterferenceModel = PerformanceInterferenceReader() @@ -193,7 +192,7 @@ class CapelinIntegrationTest { try { simulator.apply(topology) - simulator.run(traceReader) + simulator.run(workload, seed.toLong()) } finally { simulator.close() metricReader.close() @@ -211,10 +210,10 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(9597804, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(11140596, monitor.activeTime) { "Active time incorrect" } }, - { assertEquals(326138, monitor.stealTime) { "Steal time incorrect" } }, - { assertEquals(925305, monitor.lostTime) { "Lost time incorrect" } } + { assertEquals(8545158, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(12195642, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(941038, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(3378, monitor.lostTime) { "Lost time incorrect" } } ) } @@ -228,15 +227,15 @@ class CapelinIntegrationTest { coroutineContext, clock, computeScheduler, - grid5000(Duration.ofDays(7), seed) + grid5000(Duration.ofDays(7)) ) val topology = createTopology("single") - val traceReader = createTestTraceReader(0.25, seed) + val workload = createTestWorkload(0.25, seed) val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor)) try { simulator.apply(topology) - simulator.run(traceReader) + simulator.run(workload, seed.toLong()) } finally { simulator.close() metricReader.close() @@ -254,23 +253,20 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(9836315, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(10902085, monitor.activeTime) { "Active time incorrect" } }, - { assertEquals(306249, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(8640140, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(12100660, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(939456, monitor.stealTime) { "Steal time incorrect" } }, { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, - { assertEquals(2540877457, monitor.uptime) { "Uptime incorrect" } } + { assertEquals(2559305056, monitor.uptime) { "Uptime incorrect" } } ) } /** * Obtain the trace reader for the test. */ - private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader { - return ParquetTraceReader( - listOf(RawParquetTraceReader(File("src/test/resources/trace"))), - Workload("test", fraction), - seed - ) + private fun createTestWorkload(fraction: Double, seed: Int = 0): List { + val source = trace("bitbrains-small").sampleByLoad(fraction) + return source.resolve(workloadLoader, Random(seed.toLong())) } /** diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/meta.parquet b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/meta.parquet new file mode 100644 index 00000000..ee76d38f Binary files /dev/null and b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/meta.parquet differ diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/trace.parquet b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/trace.parquet new file mode 100644 index 00000000..9b1cde13 Binary files /dev/null and b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small/trace.parquet differ diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet deleted file mode 100644 index ee76d38f..00000000 Binary files a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/meta.parquet and /dev/null differ diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet b/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet deleted file mode 100644 index 9b1cde13..00000000 Binary files a/opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/trace.parquet and /dev/null differ -- cgit v1.2.3