diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-04-08 20:02:15 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-04-08 20:02:15 +0200 |
| commit | 4f80e79b567b7d91b1086dcd74ef35616d7177f2 (patch) | |
| tree | 953d095b08fad740725d24575d8da49e1da7d131 /simulator/opendc-experiments/opendc-experiments-capelin/src/main | |
| parent | d0f5200cf378a0d7f9397526f0db0695bdc34dd2 (diff) | |
compute: Migrate to new FilterScheduler
This change migrates the OpenDC codebase to use the new FilterScheduler
for scheduling virtual machines. This removes the old allocation
policies as well.
Diffstat (limited to 'simulator/opendc-experiments/opendc-experiments-capelin/src/main')
2 files changed, 55 insertions, 24 deletions
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index 40f50235..997eba0c 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -32,7 +32,7 @@ import org.opendc.compute.service.ComputeService import org.opendc.compute.service.driver.Host import org.opendc.compute.service.driver.HostListener import org.opendc.compute.service.driver.HostState -import org.opendc.compute.service.scheduler.AllocationPolicy +import org.opendc.compute.service.scheduler.ComputeScheduler import org.opendc.compute.simulator.SimHost import org.opendc.experiments.capelin.monitor.ExperimentMetricExporter import org.opendc.experiments.capelin.monitor.ExperimentMonitor @@ -135,7 +135,7 @@ public suspend fun withComputeService( clock: Clock, meterProvider: MeterProvider, environmentReader: EnvironmentReader, - allocationPolicy: AllocationPolicy, + scheduler: ComputeScheduler, block: suspend CoroutineScope.(ComputeService) -> Unit ): Unit = coroutineScope { val hosts = environmentReader @@ -154,18 +154,18 @@ public suspend fun withComputeService( ) } - val schedulerMeter = meterProvider.get("opendc-compute") - val scheduler = - ComputeService(coroutineContext, clock, schedulerMeter, allocationPolicy) + val serviceMeter = meterProvider.get("opendc-compute") + val service = + ComputeService(coroutineContext, clock, serviceMeter, scheduler) for (host in hosts) { - scheduler.addHost(host) + service.addHost(host) } try { - block(this, scheduler) + block(this, service) } finally { - scheduler.close() + service.close() hosts.forEach(SimHost::close) } } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index 5fa77161..941d3c97 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -31,6 +31,9 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.test.runBlockingTest import mu.KotlinLogging import org.opendc.compute.service.scheduler.* +import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter +import org.opendc.compute.service.scheduler.filters.ComputeFilter +import org.opendc.compute.service.scheduler.weights.* import org.opendc.experiments.capelin.model.CompositeWorkload import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Topology @@ -45,8 +48,9 @@ import org.opendc.harness.dsl.anyOf import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.telemetry.sdk.toOtelClock import java.io.File +import java.util.* import java.util.concurrent.ConcurrentHashMap -import kotlin.random.Random +import kotlin.random.asKotlinRandom /** * A portfolio represents a collection of scenarios are tested for the work. @@ -115,11 +119,11 @@ public abstract class Portfolio(name: String) : Experiment(name) { @OptIn(ExperimentalCoroutinesApi::class) override fun doRun(repeat: Int): Unit = runBlockingTest { val clock = DelayControllerClockAdapter(this) - val seeder = Random(repeat) + val seeder = Random(repeat.toLong()) val environment = Sc20ClusterEnvironmentReader(File(environmentPath, "${topology.name}.txt")) val chan = Channel<Unit>(Channel.CONFLATED) - val allocationPolicy = createAllocationPolicy(seeder) + val allocationPolicy = createComputeScheduler(seeder) val meterProvider: MeterProvider = SdkMeterProvider .builder() @@ -142,7 +146,7 @@ public abstract class Portfolio(name: String) : Experiment(name) { val performanceInterferenceModel = performanceInterferenceModel ?.takeIf { operationalPhenomena.hasInterference } - ?.construct(seeder) ?: emptyMap() + ?.construct(seeder.asKotlinRandom()) ?: emptyMap() val trace = Sc20ParquetTraceReader(rawReaders, performanceInterferenceModel, workload, seeder.nextInt()) val monitor = ParquetExperimentMonitor( @@ -184,20 +188,47 @@ public abstract class Portfolio(name: String) : Experiment(name) { } /** - * Create the [AllocationPolicy] instance to use for the trial. + * Create the [ComputeScheduler] instance to use for the trial. */ - private fun createAllocationPolicy(seeder: Random): AllocationPolicy { + private fun createComputeScheduler(seeder: Random): ComputeScheduler { return when (allocationPolicy) { - "mem" -> AvailableMemoryAllocationPolicy() - "mem-inv" -> AvailableMemoryAllocationPolicy(true) - "core-mem" -> AvailableCoreMemoryAllocationPolicy() - "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true) - "active-servers" -> NumberOfActiveServersAllocationPolicy() - "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true) - "provisioned-cores" -> ProvisionedCoresAllocationPolicy() - "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true) - "random" -> RandomAllocationPolicy(Random(seeder.nextInt())) - "replay" -> ReplayAllocationPolicy(vmPlacements) + "mem" -> FilterScheduler( + filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), + weighers = listOf(MemoryWeigher() to -1.0) + ) + "mem-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), + weighers = listOf(MemoryWeigher() to -1.0) + ) + "core-mem" -> FilterScheduler( + filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), + weighers = listOf(CoreMemoryWeigher() to -1.0) + ) + "core-mem-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), + weighers = listOf(CoreMemoryWeigher() to -1.0) + ) + "active-servers" -> FilterScheduler( + filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), + weighers = listOf(ProvisionedCoresWeigher() to -1.0) + ) + "active-servers-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), + weighers = listOf(InstanceCountWeigher() to 1.0) + ) + "provisioned-cores" -> FilterScheduler( + filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), + weighers = listOf(ProvisionedCoresWeigher() to -1.0) + ) + "provisioned-cores-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), + weighers = listOf(ProvisionedCoresWeigher() to 1.0) + ) + "random" -> FilterScheduler( + filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), + weighers = listOf(RandomWeigher(Random(seeder.nextLong())) to 1.0) + ) + "replay" -> ReplayScheduler(vmPlacements) else -> throw IllegalArgumentException("Unknown policy $allocationPolicy") } } |
