diff options
Diffstat (limited to 'opendc-experiments/opendc-experiments-capelin')
3 files changed, 71 insertions, 60 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index 42d240dc..2c443678 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -38,6 +38,15 @@ import org.opendc.compute.service.driver.Host import org.opendc.compute.service.driver.HostListener import org.opendc.compute.service.driver.HostState import org.opendc.compute.service.scheduler.ComputeScheduler +import org.opendc.compute.service.scheduler.FilterScheduler +import org.opendc.compute.service.scheduler.ReplayScheduler +import org.opendc.compute.service.scheduler.filters.ComputeFilter +import org.opendc.compute.service.scheduler.filters.RamFilter +import org.opendc.compute.service.scheduler.filters.VCpuFilter +import org.opendc.compute.service.scheduler.weights.CoreRamWeigher +import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher +import org.opendc.compute.service.scheduler.weights.RamWeigher +import org.opendc.compute.service.scheduler.weights.VCpuWeigher import org.opendc.compute.simulator.SimHost import org.opendc.experiments.capelin.monitor.ExperimentMetricExporter import org.opendc.experiments.capelin.monitor.ExperimentMonitor @@ -301,3 +310,53 @@ fun createMeterProvider(clock: Clock): MeterProvider { .registerView(powerSelector, powerView) .build() } + +/** + * Create a [ComputeScheduler] for the experiment. + */ +fun createComputeScheduler(allocationPolicy: String, seeder: Random, vmPlacements: Map<String, String> = emptyMap()): ComputeScheduler { + val cpuAllocationRatio = 16.0 + val ramAllocationRatio = 1.5 + return when (allocationPolicy) { + "mem" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(RamWeigher(multiplier = 1.0)) + ) + "mem-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(RamWeigher(multiplier = -1.0)) + ) + "core-mem" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(CoreRamWeigher(multiplier = 1.0)) + ) + "core-mem-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(CoreRamWeigher(multiplier = -1.0)) + ) + "active-servers" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(InstanceCountWeigher(multiplier = -1.0)) + ) + "active-servers-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(InstanceCountWeigher(multiplier = 1.0)) + ) + "provisioned-cores" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = 1.0)) + ) + "provisioned-cores-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = -1.0)) + ) + "random" -> FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)), + weighers = emptyList(), + subsetSize = Int.MAX_VALUE, + random = java.util.Random(seeder.nextLong()) + ) + "replay" -> ReplayScheduler(vmPlacements) + else -> throw IllegalArgumentException("Unknown policy $allocationPolicy") + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index cbb5bfd9..ee832af8 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -28,10 +28,6 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel import mu.KotlinLogging -import org.opendc.compute.service.scheduler.* -import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter -import org.opendc.compute.service.scheduler.filters.ComputeFilter -import org.opendc.compute.service.scheduler.weights.* import org.opendc.experiments.capelin.env.ClusterEnvironmentReader import org.opendc.experiments.capelin.model.CompositeWorkload import org.opendc.experiments.capelin.model.OperationalPhenomena @@ -49,6 +45,7 @@ import java.io.File import java.io.FileInputStream import java.util.* import java.util.concurrent.ConcurrentHashMap +import kotlin.random.asKotlinRandom /** * A portfolio represents a collection of scenarios are tested for the work. @@ -105,7 +102,7 @@ abstract class Portfolio(name: String) : Experiment(name) { val environment = ClusterEnvironmentReader(File(config.getString("env-path"), "${topology.name}.txt")) val chan = Channel<Unit>(Channel.CONFLATED) - val allocationPolicy = createComputeScheduler(seeder) + val allocationPolicy = createComputeScheduler(allocationPolicy, seeder.asKotlinRandom(), vmPlacements) val meterProvider = createMeterProvider(clock) val workload = workload @@ -167,50 +164,4 @@ abstract class Portfolio(name: String) : Experiment(name) { val monitorResults = collectMetrics(meterProvider as MetricProducer) logger.debug { "Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}" } } - - /** - * Create the [ComputeScheduler] instance to use for the trial. - */ - private fun createComputeScheduler(seeder: Random): ComputeScheduler { - return when (allocationPolicy) { - "mem" -> FilterScheduler( - filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), - weighers = listOf(MemoryWeigher() to -1.0) - ) - "mem-inv" -> FilterScheduler( - filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), - weighers = listOf(MemoryWeigher() to -1.0) - ) - "core-mem" -> FilterScheduler( - filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), - weighers = listOf(CoreMemoryWeigher() to -1.0) - ) - "core-mem-inv" -> FilterScheduler( - filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), - weighers = listOf(CoreMemoryWeigher() to -1.0) - ) - "active-servers" -> FilterScheduler( - filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), - weighers = listOf(ProvisionedCoresWeigher() to -1.0) - ) - "active-servers-inv" -> FilterScheduler( - filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), - weighers = listOf(InstanceCountWeigher() to 1.0) - ) - "provisioned-cores" -> FilterScheduler( - filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), - weighers = listOf(ProvisionedCoresWeigher() to -1.0) - ) - "provisioned-cores-inv" -> FilterScheduler( - filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), - weighers = listOf(ProvisionedCoresWeigher() to 1.0) - ) - "random" -> FilterScheduler( - filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), - weighers = listOf(RandomWeigher(Random(seeder.nextLong())) to 1.0) - ) - "replay" -> ReplayScheduler(vmPlacements) - else -> throw IllegalArgumentException("Unknown policy $allocationPolicy") - } - } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 08e04ddf..75428011 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -31,9 +31,10 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll import org.opendc.compute.service.driver.Host import org.opendc.compute.service.scheduler.FilterScheduler -import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter import org.opendc.compute.service.scheduler.filters.ComputeFilter -import org.opendc.compute.service.scheduler.weights.CoreMemoryWeigher +import org.opendc.compute.service.scheduler.filters.RamFilter +import org.opendc.compute.service.scheduler.filters.VCpuFilter +import org.opendc.compute.service.scheduler.weights.CoreRamWeigher import org.opendc.experiments.capelin.env.ClusterEnvironmentReader import org.opendc.experiments.capelin.model.Workload import org.opendc.experiments.capelin.monitor.ExperimentMonitor @@ -68,8 +69,8 @@ class CapelinIntegrationTest { val seed = 0 val chan = Channel<Unit>(Channel.CONFLATED) val allocationPolicy = FilterScheduler( - filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), - weighers = listOf(CoreMemoryWeigher() to -1.0) + filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), + weighers = listOf(CoreRamWeigher(multiplier = 1.0)) ) val traceReader = createTestTraceReader() val environmentReader = createTestEnvironmentReader() @@ -113,9 +114,9 @@ class CapelinIntegrationTest { { assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") }, { assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") }, { assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") }, - { assertEquals(207380244590, monitor.totalRequestedBurst) { "Incorrect requested burst" } }, - { assertEquals(207112418950, monitor.totalGrantedBurst) { "Incorrect granted burst" } }, - { assertEquals(267825640, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } }, + { assertEquals(207380204679, monitor.totalRequestedBurst) { "Incorrect requested burst" } }, + { assertEquals(207371815929, monitor.totalGrantedBurst) { "Incorrect granted burst" } }, + { assertEquals(8388750, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } }, { assertEquals(0, monitor.totalInterferedBurst) { "Incorrect interfered burst" } } ) } @@ -125,8 +126,8 @@ class CapelinIntegrationTest { val seed = 1 val chan = Channel<Unit>(Channel.CONFLATED) val allocationPolicy = FilterScheduler( - filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), - weighers = listOf(CoreMemoryWeigher() to -1.0) + filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), + weighers = listOf(CoreRamWeigher(multiplier = 1.0)) ) val traceReader = createTestTraceReader(0.5, seed) val environmentReader = createTestEnvironmentReader("single") |
