diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-04-08 20:02:15 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-04-08 20:02:15 +0200 |
| commit | 4f80e79b567b7d91b1086dcd74ef35616d7177f2 (patch) | |
| tree | 953d095b08fad740725d24575d8da49e1da7d131 /simulator/opendc-experiments/opendc-experiments-capelin/src | |
| parent | d0f5200cf378a0d7f9397526f0db0695bdc34dd2 (diff) | |
compute: Migrate to new FilterScheduler
This change migrates the OpenDC codebase to use the new FilterScheduler
for scheduling virtual machines. This removes the old allocation
policies as well.
Diffstat (limited to 'simulator/opendc-experiments/opendc-experiments-capelin/src')
3 files changed, 70 insertions, 30 deletions
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index 40f50235..997eba0c 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -32,7 +32,7 @@ import org.opendc.compute.service.ComputeService import org.opendc.compute.service.driver.Host import org.opendc.compute.service.driver.HostListener import org.opendc.compute.service.driver.HostState -import org.opendc.compute.service.scheduler.AllocationPolicy +import org.opendc.compute.service.scheduler.ComputeScheduler import org.opendc.compute.simulator.SimHost import org.opendc.experiments.capelin.monitor.ExperimentMetricExporter import org.opendc.experiments.capelin.monitor.ExperimentMonitor @@ -135,7 +135,7 @@ public suspend fun withComputeService( clock: Clock, meterProvider: MeterProvider, environmentReader: EnvironmentReader, - allocationPolicy: AllocationPolicy, + scheduler: ComputeScheduler, block: suspend CoroutineScope.(ComputeService) -> Unit ): Unit = coroutineScope { val hosts = environmentReader @@ -154,18 +154,18 @@ public suspend fun withComputeService( ) } - val schedulerMeter = meterProvider.get("opendc-compute") - val scheduler = - ComputeService(coroutineContext, clock, schedulerMeter, allocationPolicy) + val serviceMeter = meterProvider.get("opendc-compute") + val service = + ComputeService(coroutineContext, clock, serviceMeter, scheduler) for (host in hosts) { - scheduler.addHost(host) + service.addHost(host) } try { - block(this, scheduler) + block(this, service) } finally { - scheduler.close() + service.close() hosts.forEach(SimHost::close) } } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index 5fa77161..941d3c97 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -31,6 +31,9 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.test.runBlockingTest import mu.KotlinLogging import org.opendc.compute.service.scheduler.* +import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter +import org.opendc.compute.service.scheduler.filters.ComputeFilter +import org.opendc.compute.service.scheduler.weights.* import org.opendc.experiments.capelin.model.CompositeWorkload import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Topology @@ -45,8 +48,9 @@ import org.opendc.harness.dsl.anyOf import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.telemetry.sdk.toOtelClock import java.io.File +import java.util.* import java.util.concurrent.ConcurrentHashMap -import kotlin.random.Random +import kotlin.random.asKotlinRandom /** * A portfolio represents a collection of scenarios are tested for the work. @@ -115,11 +119,11 @@ public abstract class Portfolio(name: String) : Experiment(name) { @OptIn(ExperimentalCoroutinesApi::class) override fun doRun(repeat: Int): Unit = runBlockingTest { val clock = DelayControllerClockAdapter(this) - val seeder = Random(repeat) + val seeder = Random(repeat.toLong()) val environment = Sc20ClusterEnvironmentReader(File(environmentPath, "${topology.name}.txt")) val chan = Channel<Unit>(Channel.CONFLATED) - val allocationPolicy = createAllocationPolicy(seeder) + val allocationPolicy = createComputeScheduler(seeder) val meterProvider: MeterProvider = SdkMeterProvider .builder() @@ -142,7 +146,7 @@ public abstract class Portfolio(name: String) : Experiment(name) { val performanceInterferenceModel = performanceInterferenceModel ?.takeIf { operationalPhenomena.hasInterference } - ?.construct(seeder) ?: emptyMap() + ?.construct(seeder.asKotlinRandom()) ?: emptyMap() val trace = Sc20ParquetTraceReader(rawReaders, performanceInterferenceModel, workload, seeder.nextInt()) val monitor = ParquetExperimentMonitor( @@ -184,20 +188,47 @@ public abstract class Portfolio(name: String) : Experiment(name) { } /** - * Create the [AllocationPolicy] instance to use for the trial. + * Create the [ComputeScheduler] instance to use for the trial. */ - private fun createAllocationPolicy(seeder: Random): AllocationPolicy { + private fun createComputeScheduler(seeder: Random): ComputeScheduler { return when (allocationPolicy) { - "mem" -> AvailableMemoryAllocationPolicy() - "mem-inv" -> AvailableMemoryAllocationPolicy(true) - "core-mem" -> AvailableCoreMemoryAllocationPolicy() - "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true) - "active-servers" -> NumberOfActiveServersAllocationPolicy() - "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true) - "provisioned-cores" -> ProvisionedCoresAllocationPolicy() - "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true) - "random" -> RandomAllocationPolicy(Random(seeder.nextInt())) - "replay" -> ReplayAllocationPolicy(vmPlacements) + "mem" -> FilterScheduler( + filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), + weighers = listOf(MemoryWeigher() to -1.0) + ) + "mem-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), + weighers = listOf(MemoryWeigher() to -1.0) + ) + "core-mem" -> FilterScheduler( + filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), + weighers = listOf(CoreMemoryWeigher() to -1.0) + ) + "core-mem-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), + weighers = listOf(CoreMemoryWeigher() to -1.0) + ) + "active-servers" -> FilterScheduler( + filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), + weighers = listOf(ProvisionedCoresWeigher() to -1.0) + ) + "active-servers-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), + weighers = listOf(InstanceCountWeigher() to 1.0) + ) + "provisioned-cores" -> FilterScheduler( + filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), + weighers = listOf(ProvisionedCoresWeigher() to -1.0) + ) + "provisioned-cores-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), + weighers = listOf(ProvisionedCoresWeigher() to 1.0) + ) + "random" -> FilterScheduler( + filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), + weighers = listOf(RandomWeigher(Random(seeder.nextLong())) to 1.0) + ) + "replay" -> ReplayScheduler(vmPlacements) else -> throw IllegalArgumentException("Unknown policy $allocationPolicy") } } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index d2e7473f..4a47922d 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -34,7 +34,10 @@ import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll import org.opendc.compute.service.driver.Host -import org.opendc.compute.service.scheduler.AvailableCoreMemoryAllocationPolicy +import org.opendc.compute.service.scheduler.FilterScheduler +import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter +import org.opendc.compute.service.scheduler.filters.ComputeFilter +import org.opendc.compute.service.scheduler.weights.CoreMemoryWeigher import org.opendc.experiments.capelin.model.Workload import org.opendc.experiments.capelin.monitor.ExperimentMonitor import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader @@ -71,7 +74,10 @@ class CapelinIntegrationTest { val failures = false val seed = 0 val chan = Channel<Unit>(Channel.CONFLATED) - val allocationPolicy = AvailableCoreMemoryAllocationPolicy() + val allocationPolicy = FilterScheduler( + filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), + weighers = listOf(CoreMemoryWeigher() to -1.0) + ) val traceReader = createTestTraceReader() val environmentReader = createTestEnvironmentReader() lateinit var monitorResults: ComputeMetrics @@ -118,9 +124,9 @@ class CapelinIntegrationTest { { assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") }, { assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") }, { assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") }, - { assertEquals(207388095207, monitor.totalRequestedBurst) { "Incorrect requested burst" } }, - { assertEquals(204745144701, monitor.totalGrantedBurst) { "Incorrect granted burst" } }, - { assertEquals(2642950497, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } }, + { assertEquals(207389912923, monitor.totalRequestedBurst) { "Incorrect requested burst" } }, + { assertEquals(207122087280, monitor.totalGrantedBurst) { "Incorrect granted burst" } }, + { assertEquals(267825640, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } }, { assertEquals(0, monitor.totalInterferedBurst) { "Incorrect interfered burst" } } ) } @@ -130,7 +136,10 @@ class CapelinIntegrationTest { val clock = DelayControllerClockAdapter(this) val seed = 1 val chan = Channel<Unit>(Channel.CONFLATED) - val allocationPolicy = AvailableCoreMemoryAllocationPolicy() + val allocationPolicy = FilterScheduler( + filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), + weighers = listOf(CoreMemoryWeigher() to -1.0) + ) val traceReader = createTestTraceReader(0.5, seed) val environmentReader = createTestEnvironmentReader("single") |
