diff options
Diffstat (limited to 'opendc-experiments/opendc-experiments-capelin/src')
4 files changed, 241 insertions, 177 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt b/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt index c09ce96a..378c3833 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt @@ -22,6 +22,7 @@ package org.opendc.experiments.capelin +import org.opendc.compute.service.ComputeService import org.opendc.compute.service.scheduler.FilterScheduler import org.opendc.compute.service.scheduler.filters.ComputeFilter import org.opendc.compute.service.scheduler.filters.RamFilter @@ -29,8 +30,10 @@ import org.opendc.compute.service.scheduler.filters.VCpuFilter import org.opendc.compute.service.scheduler.weights.CoreRamWeigher import org.opendc.compute.workload.* import org.opendc.compute.workload.topology.Topology -import org.opendc.compute.workload.topology.apply import org.opendc.experiments.capelin.topology.clusterTopology +import org.opendc.experiments.compute.setupComputeService +import org.opendc.experiments.compute.setupHosts +import org.opendc.experiments.provisioner.Provisioner import org.opendc.simulator.core.runBlockingSimulation import org.openjdk.jmh.annotations.* import java.io.File @@ -54,29 +57,27 @@ class CapelinBenchmarks { @Setup fun setUp() { val loader = ComputeWorkloadLoader(File("src/test/resources/trace")) - val source = trace("bitbrains-small") vms = trace("bitbrains-small").resolve(loader, Random(1L)) topology = checkNotNull(object {}.javaClass.getResourceAsStream("/topology.txt")).use { clusterTopology(it) } } @Benchmark fun benchmarkCapelin() = runBlockingSimulation { - val computeScheduler = FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), - weighers = listOf(CoreRamWeigher(multiplier = 1.0)) - ) - val runner = ComputeServiceHelper( - coroutineContext, - clock, - computeScheduler, - seed = 0L - ) + val serviceDomain = "compute.opendc.org" - try { - runner.apply(topology, isOptimized) - runner.run(vms, interference = true) - } finally { - runner.close() + Provisioner(coroutineContext, clock, seed = 0).use { provisioner -> + val computeScheduler = FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), + weighers = listOf(CoreRamWeigher(multiplier = 1.0)) + ) + + provisioner.runSteps( + setupComputeService(serviceDomain, { computeScheduler }), + setupHosts(serviceDomain, topology.resolve(), optimize = isOptimized) + ) + + val service = provisioner.registry.resolve(serviceDomain, ComputeService::class.java)!! + service.replay(clock, vms, 0L, interference = true) } } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt index dbb5ced3..6bd470f3 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/CapelinRunner.kt @@ -22,15 +22,16 @@ package org.opendc.experiments.capelin -import org.opendc.compute.workload.ComputeServiceHelper +import org.opendc.compute.service.ComputeService import org.opendc.compute.workload.ComputeWorkloadLoader import org.opendc.compute.workload.createComputeScheduler import org.opendc.compute.workload.export.parquet.ParquetComputeMonitor import org.opendc.compute.workload.grid5000 -import org.opendc.compute.workload.telemetry.ComputeMetricReader -import org.opendc.compute.workload.topology.apply +import org.opendc.compute.workload.replay import org.opendc.experiments.capelin.model.Scenario import org.opendc.experiments.capelin.topology.clusterTopology +import org.opendc.experiments.compute.* +import org.opendc.experiments.provisioner.Provisioner import org.opendc.simulator.core.runBlockingSimulation import java.io.File import java.time.Duration @@ -58,54 +59,41 @@ public class CapelinRunner( * Run a single [scenario] with the specified seed. */ fun runScenario(scenario: Scenario, seed: Long) = runBlockingSimulation { - val seeder = Random(seed) - - val operationalPhenomena = scenario.operationalPhenomena - val computeScheduler = createComputeScheduler(scenario.allocationPolicy, seeder) - val failureModel = - if (operationalPhenomena.failureFrequency > 0) - grid5000(Duration.ofSeconds((operationalPhenomena.failureFrequency * 60).roundToLong())) - else - null - val vms = scenario.workload.source.resolve(workloadLoader, seeder) - val runner = ComputeServiceHelper( - coroutineContext, - clock, - computeScheduler, - seed, - ) - + val serviceDomain = "compute.opendc.org" val topology = clusterTopology(File(envPath, "${scenario.topology.name}.txt")) - val partitions = scenario.partitions + ("seed" to seed.toString()) - val partition = partitions.map { (k, v) -> "$k=$v" }.joinToString("/") - val exporter = if (outputPath != null) { - ComputeMetricReader( - this, - clock, - runner.service, - ParquetComputeMonitor( - outputPath, - partition, - bufferSize = 4096 - ), - exportInterval = Duration.ofMinutes(5) + Provisioner(coroutineContext, clock, seed).use { provisioner -> + provisioner.runSteps( + setupComputeService(serviceDomain, { createComputeScheduler(scenario.allocationPolicy, Random(it.seeder.nextLong())) }), + setupHosts(serviceDomain, topology.resolve(), optimize = true) ) - } else { - null - } - try { - // Instantiate the desired topology - runner.apply(topology, optimize = true) + if (outputPath != null) { + val partitions = scenario.partitions + ("seed" to seed.toString()) + val partition = partitions.map { (k, v) -> "$k=$v" }.joinToString("/") + + provisioner.runStep( + registerComputeMonitor( + serviceDomain, + ParquetComputeMonitor( + outputPath, + partition, + bufferSize = 4096 + ) + ) + ) + } - // Run the workload trace - runner.run(vms, failureModel = failureModel, interference = operationalPhenomena.hasInterference) + val service = provisioner.registry.resolve(serviceDomain, ComputeService::class.java)!! + val vms = scenario.workload.source.resolve(workloadLoader, Random(seed)) + val operationalPhenomena = scenario.operationalPhenomena + val failureModel = + if (operationalPhenomena.failureFrequency > 0) + grid5000(Duration.ofSeconds((operationalPhenomena.failureFrequency * 60).roundToLong())) + else + null - // Stop the metric collection - exporter?.close() - } finally { - runner.close() + service.replay(clock, vms, seed, failureModel = failureModel, interference = operationalPhenomena.hasInterference) } } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index eae3c993..3407e30f 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -26,18 +26,20 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll +import org.opendc.compute.service.ComputeService import org.opendc.compute.service.scheduler.FilterScheduler import org.opendc.compute.service.scheduler.filters.ComputeFilter import org.opendc.compute.service.scheduler.filters.RamFilter import org.opendc.compute.service.scheduler.filters.VCpuFilter import org.opendc.compute.service.scheduler.weights.CoreRamWeigher import org.opendc.compute.workload.* -import org.opendc.compute.workload.telemetry.ComputeMetricReader import org.opendc.compute.workload.telemetry.ComputeMonitor import org.opendc.compute.workload.telemetry.table.HostTableReader +import org.opendc.compute.workload.telemetry.table.ServiceTableReader import org.opendc.compute.workload.topology.Topology -import org.opendc.compute.workload.topology.apply import org.opendc.experiments.capelin.topology.clusterTopology +import org.opendc.experiments.compute.* +import org.opendc.experiments.provisioner.Provisioner import org.opendc.simulator.core.runBlockingSimulation import java.io.File import java.time.Duration @@ -82,45 +84,41 @@ class CapelinIntegrationTest { fun testLarge() = runBlockingSimulation { val seed = 0L val workload = createTestWorkload(1.0, seed) - val runner = ComputeServiceHelper( - coroutineContext, - clock, - computeScheduler, - seed, - ) val topology = createTopology() - val reader = ComputeMetricReader(this, clock, runner.service, monitor) - - try { - runner.apply(topology) - runner.run(workload) + val monitor = monitor - val serviceMetrics = runner.service.getSchedulerStats() - println( - "Scheduler " + - "Success=${serviceMetrics.attemptsSuccess} " + - "Failure=${serviceMetrics.attemptsFailure} " + - "Error=${serviceMetrics.attemptsError} " + - "Pending=${serviceMetrics.serversPending} " + - "Active=${serviceMetrics.serversActive}" + Provisioner(coroutineContext, clock, seed).use { provisioner -> + provisioner.runSteps( + setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), + registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), + setupHosts(serviceDomain = "compute.opendc.org", topology.resolve()), ) - // Note that these values have been verified beforehand - assertAll( - { assertEquals(50, serviceMetrics.attemptsSuccess, "The scheduler should schedule 50 VMs") }, - { assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") }, - { assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") }, - { assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") }, - { assertEquals(223393683, this@CapelinIntegrationTest.monitor.idleTime) { "Incorrect idle time" } }, - { assertEquals(66977508, this@CapelinIntegrationTest.monitor.activeTime) { "Incorrect active time" } }, - { assertEquals(3160381, this@CapelinIntegrationTest.monitor.stealTime) { "Incorrect steal time" } }, - { assertEquals(0, this@CapelinIntegrationTest.monitor.lostTime) { "Incorrect lost time" } }, - { assertEquals(5.840939264814157E9, this@CapelinIntegrationTest.monitor.energyUsage, 0.01) { "Incorrect power draw" } }, - ) - } finally { - runner.close() - reader.close() + val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! + service.replay(clock, workload, seed) } + + println( + "Scheduler " + + "Success=${monitor.attemptsSuccess} " + + "Failure=${monitor.attemptsFailure} " + + "Error=${monitor.attemptsError} " + + "Pending=${monitor.serversPending} " + + "Active=${monitor.serversActive}" + ) + + // Note that these values have been verified beforehand + assertAll( + { assertEquals(50, monitor.attemptsSuccess, "The scheduler should schedule 50 VMs") }, + { assertEquals(0, monitor.serversActive, "All VMs should finish after a run") }, + { assertEquals(0, monitor.attemptsFailure, "No VM should be unscheduled") }, + { assertEquals(0, monitor.serversPending, "No VM should not be in the queue") }, + { assertEquals(223393683, monitor.idleTime) { "Incorrect idle time" } }, + { assertEquals(66977508, monitor.activeTime) { "Incorrect active time" } }, + { assertEquals(3160381, monitor.stealTime) { "Incorrect steal time" } }, + { assertEquals(0, monitor.lostTime) { "Incorrect lost time" } }, + { assertEquals(5.840939264814157E9, monitor.energyUsage, 0.01) { "Incorrect power draw" } }, + ) } /** @@ -130,40 +128,36 @@ class CapelinIntegrationTest { fun testSmall() = runBlockingSimulation { val seed = 1L val workload = createTestWorkload(0.25, seed) - val runner = ComputeServiceHelper( - coroutineContext, - clock, - computeScheduler, - seed, - ) val topology = createTopology("single") - val reader = ComputeMetricReader(this, clock, runner.service, monitor) - - try { - runner.apply(topology) - runner.run(workload) + val monitor = monitor - val serviceMetrics = runner.service.getSchedulerStats() - println( - "Scheduler " + - "Success=${serviceMetrics.attemptsSuccess} " + - "Failure=${serviceMetrics.attemptsFailure} " + - "Error=${serviceMetrics.attemptsError} " + - "Pending=${serviceMetrics.serversPending} " + - "Active=${serviceMetrics.serversActive}" + Provisioner(coroutineContext, clock, seed).use { provisioner -> + provisioner.runSteps( + setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), + registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), + setupHosts(serviceDomain = "compute.opendc.org", topology.resolve()), ) - } finally { - runner.close() - reader.close() + + val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! + service.replay(clock, workload, seed) } + println( + "Scheduler " + + "Success=${monitor.attemptsSuccess} " + + "Failure=${monitor.attemptsFailure} " + + "Error=${monitor.attemptsError} " + + "Pending=${monitor.serversPending} " + + "Active=${monitor.serversActive}" + ) + // Note that these values have been verified beforehand assertAll( - { assertEquals(10999592, this@CapelinIntegrationTest.monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(9741207, this@CapelinIntegrationTest.monitor.activeTime) { "Active time incorrect" } }, - { assertEquals(0, this@CapelinIntegrationTest.monitor.stealTime) { "Steal time incorrect" } }, - { assertEquals(0, this@CapelinIntegrationTest.monitor.lostTime) { "Lost time incorrect" } }, - { assertEquals(7.011676470304312E8, this@CapelinIntegrationTest.monitor.energyUsage, 0.01) { "Incorrect power draw" } } + { assertEquals(10999592, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(9741207, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, + { assertEquals(7.011676470304312E8, monitor.energyUsage, 0.01) { "Incorrect power draw" } } ) } @@ -174,40 +168,34 @@ class CapelinIntegrationTest { fun testInterference() = runBlockingSimulation { val seed = 0L val workload = createTestWorkload(1.0, seed) - - val simulator = ComputeServiceHelper( - coroutineContext, - clock, - computeScheduler, - seed - ) val topology = createTopology("single") - val reader = ComputeMetricReader(this, clock, simulator.service, monitor) - try { - simulator.apply(topology) - simulator.run(workload, interference = true) - - val serviceMetrics = simulator.service.getSchedulerStats() - println( - "Scheduler " + - "Success=${serviceMetrics.attemptsSuccess} " + - "Failure=${serviceMetrics.attemptsFailure} " + - "Error=${serviceMetrics.attemptsError} " + - "Pending=${serviceMetrics.serversPending} " + - "Active=${serviceMetrics.serversActive}" + Provisioner(coroutineContext, clock, seed).use { provisioner -> + provisioner.runSteps( + setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), + registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), + setupHosts(serviceDomain = "compute.opendc.org", topology.resolve()), ) - } finally { - simulator.close() - reader.close() + + val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! + service.replay(clock, workload, seed, interference = true) } + println( + "Scheduler " + + "Success=${monitor.attemptsSuccess} " + + "Failure=${monitor.attemptsFailure} " + + "Error=${monitor.attemptsError} " + + "Pending=${monitor.serversPending} " + + "Active=${monitor.serversActive}" + ) + // Note that these values have been verified beforehand assertAll( - { assertEquals(6028050, this@CapelinIntegrationTest.monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(14712749, this@CapelinIntegrationTest.monitor.activeTime) { "Active time incorrect" } }, - { assertEquals(12532907, this@CapelinIntegrationTest.monitor.stealTime) { "Steal time incorrect" } }, - { assertEquals(485510, this@CapelinIntegrationTest.monitor.lostTime) { "Lost time incorrect" } } + { assertEquals(6028050, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(14712749, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(12532907, monitor.stealTime) { "Steal time incorrect" } }, + { assertEquals(470593, monitor.lostTime) { "Lost time incorrect" } } ) } @@ -217,41 +205,28 @@ class CapelinIntegrationTest { @Test fun testFailures() = runBlockingSimulation { val seed = 0L - val simulator = ComputeServiceHelper( - coroutineContext, - clock, - computeScheduler, - seed - ) val topology = createTopology("single") val workload = createTestWorkload(0.25, seed) - val reader = ComputeMetricReader(this, clock, simulator.service, monitor) + val monitor = monitor - try { - simulator.apply(topology) - simulator.run(workload, failureModel = grid5000(Duration.ofDays(7))) - - val serviceMetrics = simulator.service.getSchedulerStats() - println( - "Scheduler " + - "Success=${serviceMetrics.attemptsSuccess} " + - "Failure=${serviceMetrics.attemptsFailure} " + - "Error=${serviceMetrics.attemptsError} " + - "Pending=${serviceMetrics.serversPending} " + - "Active=${serviceMetrics.serversActive}" + Provisioner(coroutineContext, clock, seed).use { provisioner -> + provisioner.runSteps( + setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), + registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor), + setupHosts(serviceDomain = "compute.opendc.org", topology.resolve()), ) - } finally { - simulator.close() - reader.close() + + val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! + service.replay(clock, workload, seed, failureModel = grid5000(Duration.ofDays(7))) } // Note that these values have been verified beforehand assertAll( - { assertEquals(10982026, monitor.idleTime) { "Idle time incorrect" } }, - { assertEquals(9740058, monitor.activeTime) { "Active time incorrect" } }, + { assertEquals(10085158, monitor.idleTime) { "Idle time incorrect" } }, + { assertEquals(8539158, monitor.activeTime) { "Active time incorrect" } }, { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } }, { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }, - { assertEquals(2590260605, monitor.uptime) { "Uptime incorrect" } }, + { assertEquals(2328039558, monitor.uptime) { "Uptime incorrect" } }, ) } @@ -272,6 +247,20 @@ class CapelinIntegrationTest { } class TestComputeMonitor : ComputeMonitor { + var attemptsSuccess = 0 + var attemptsFailure = 0 + var attemptsError = 0 + var serversPending = 0 + var serversActive = 0 + + override fun record(reader: ServiceTableReader) { + attemptsSuccess = reader.attemptsSuccess + attemptsFailure = reader.attemptsFailure + attemptsError = reader.attemptsError + serversPending = reader.serversPending + serversActive = reader.serversActive + } + var idleTime = 0L var activeTime = 0L var stealTime = 0L diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinRunnerTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinRunnerTest.kt new file mode 100644 index 00000000..2aeb9ff9 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinRunnerTest.kt @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin + +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertDoesNotThrow +import org.opendc.experiments.capelin.model.OperationalPhenomena +import org.opendc.experiments.capelin.model.Scenario +import org.opendc.experiments.capelin.model.Topology +import org.opendc.experiments.capelin.model.Workload +import org.opendc.experiments.compute.trace +import java.io.File +import java.nio.file.Files + +/** + * Test suite for [CapelinRunner]. + */ +class CapelinRunnerTest { + /** + * The path to the environments. + */ + private val envPath = File("src/test/resources/env") + + /** + * The path to the traces. + */ + private val tracePath = File("src/test/resources/trace") + + /** + * Smoke test with output. + */ + @Test + fun testSmoke() { + val outputPath = Files.createTempDirectory("output").toFile() + + try { + val runner = CapelinRunner(envPath, tracePath, outputPath) + val scenario = Scenario( + Topology("topology"), + Workload("bitbrains-small", trace("bitbrains-small")), + OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true), + "active-servers" + ) + + assertDoesNotThrow { runner.runScenario(scenario, seed = 0L) } + } finally { + outputPath.delete() + } + } + + /** + * Smoke test without output. + */ + @Test + fun testSmokeNoOutput() { + val runner = CapelinRunner(envPath, tracePath, null) + val scenario = Scenario( + Topology("topology"), + Workload("bitbrains-small", trace("bitbrains-small")), + OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true), + "active-servers" + ) + + assertDoesNotThrow { runner.runScenario(scenario, seed = 0L) } + } +} |
