From 4f80e79b567b7d91b1086dcd74ef35616d7177f2 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 8 Apr 2021 20:02:15 +0200 Subject: compute: Migrate to new FilterScheduler This change migrates the OpenDC codebase to use the new FilterScheduler for scheduling virtual machines. This removes the old allocation policies as well. --- .../experiments/capelin/ExperimentHelpers.kt | 16 +++--- .../org/opendc/experiments/capelin/Portfolio.kt | 63 ++++++++++++++++------ .../experiments/capelin/CapelinIntegrationTest.kt | 21 +++++--- .../experiments/energy21/EnergyExperiment.kt | 26 +++++---- 4 files changed, 86 insertions(+), 40 deletions(-) (limited to 'simulator/opendc-experiments') diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index 40f50235..997eba0c 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -32,7 +32,7 @@ import org.opendc.compute.service.ComputeService import org.opendc.compute.service.driver.Host import org.opendc.compute.service.driver.HostListener import org.opendc.compute.service.driver.HostState -import org.opendc.compute.service.scheduler.AllocationPolicy +import org.opendc.compute.service.scheduler.ComputeScheduler import org.opendc.compute.simulator.SimHost import org.opendc.experiments.capelin.monitor.ExperimentMetricExporter import org.opendc.experiments.capelin.monitor.ExperimentMonitor @@ -135,7 +135,7 @@ public suspend fun withComputeService( clock: Clock, meterProvider: MeterProvider, environmentReader: EnvironmentReader, - allocationPolicy: AllocationPolicy, + scheduler: ComputeScheduler, block: suspend CoroutineScope.(ComputeService) -> Unit ): Unit = coroutineScope { val hosts = environmentReader @@ -154,18 +154,18 @@ public suspend fun withComputeService( ) } - val schedulerMeter = meterProvider.get("opendc-compute") - val scheduler = - ComputeService(coroutineContext, clock, schedulerMeter, allocationPolicy) + val serviceMeter = meterProvider.get("opendc-compute") + val service = + ComputeService(coroutineContext, clock, serviceMeter, scheduler) for (host in hosts) { - scheduler.addHost(host) + service.addHost(host) } try { - block(this, scheduler) + block(this, service) } finally { - scheduler.close() + service.close() hosts.forEach(SimHost::close) } } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index 5fa77161..941d3c97 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -31,6 +31,9 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.test.runBlockingTest import mu.KotlinLogging import org.opendc.compute.service.scheduler.* +import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter +import org.opendc.compute.service.scheduler.filters.ComputeFilter +import org.opendc.compute.service.scheduler.weights.* import org.opendc.experiments.capelin.model.CompositeWorkload import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Topology @@ -45,8 +48,9 @@ import org.opendc.harness.dsl.anyOf import org.opendc.simulator.utils.DelayControllerClockAdapter import org.opendc.telemetry.sdk.toOtelClock import java.io.File +import java.util.* import java.util.concurrent.ConcurrentHashMap -import kotlin.random.Random +import kotlin.random.asKotlinRandom /** * A portfolio represents a collection of scenarios are tested for the work. @@ -115,11 +119,11 @@ public abstract class Portfolio(name: String) : Experiment(name) { @OptIn(ExperimentalCoroutinesApi::class) override fun doRun(repeat: Int): Unit = runBlockingTest { val clock = DelayControllerClockAdapter(this) - val seeder = Random(repeat) + val seeder = Random(repeat.toLong()) val environment = Sc20ClusterEnvironmentReader(File(environmentPath, "${topology.name}.txt")) val chan = Channel(Channel.CONFLATED) - val allocationPolicy = createAllocationPolicy(seeder) + val allocationPolicy = createComputeScheduler(seeder) val meterProvider: MeterProvider = SdkMeterProvider .builder() @@ -142,7 +146,7 @@ public abstract class Portfolio(name: String) : Experiment(name) { val performanceInterferenceModel = performanceInterferenceModel ?.takeIf { operationalPhenomena.hasInterference } - ?.construct(seeder) ?: emptyMap() + ?.construct(seeder.asKotlinRandom()) ?: emptyMap() val trace = Sc20ParquetTraceReader(rawReaders, performanceInterferenceModel, workload, seeder.nextInt()) val monitor = ParquetExperimentMonitor( @@ -184,20 +188,47 @@ public abstract class Portfolio(name: String) : Experiment(name) { } /** - * Create the [AllocationPolicy] instance to use for the trial. + * Create the [ComputeScheduler] instance to use for the trial. */ - private fun createAllocationPolicy(seeder: Random): AllocationPolicy { + private fun createComputeScheduler(seeder: Random): ComputeScheduler { return when (allocationPolicy) { - "mem" -> AvailableMemoryAllocationPolicy() - "mem-inv" -> AvailableMemoryAllocationPolicy(true) - "core-mem" -> AvailableCoreMemoryAllocationPolicy() - "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true) - "active-servers" -> NumberOfActiveServersAllocationPolicy() - "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true) - "provisioned-cores" -> ProvisionedCoresAllocationPolicy() - "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true) - "random" -> RandomAllocationPolicy(Random(seeder.nextInt())) - "replay" -> ReplayAllocationPolicy(vmPlacements) + "mem" -> FilterScheduler( + filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), + weighers = listOf(MemoryWeigher() to -1.0) + ) + "mem-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), + weighers = listOf(MemoryWeigher() to -1.0) + ) + "core-mem" -> FilterScheduler( + filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), + weighers = listOf(CoreMemoryWeigher() to -1.0) + ) + "core-mem-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), + weighers = listOf(CoreMemoryWeigher() to -1.0) + ) + "active-servers" -> FilterScheduler( + filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), + weighers = listOf(ProvisionedCoresWeigher() to -1.0) + ) + "active-servers-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), + weighers = listOf(InstanceCountWeigher() to 1.0) + ) + "provisioned-cores" -> FilterScheduler( + filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), + weighers = listOf(ProvisionedCoresWeigher() to -1.0) + ) + "provisioned-cores-inv" -> FilterScheduler( + filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), + weighers = listOf(ProvisionedCoresWeigher() to 1.0) + ) + "random" -> FilterScheduler( + filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), + weighers = listOf(RandomWeigher(Random(seeder.nextLong())) to 1.0) + ) + "replay" -> ReplayScheduler(vmPlacements) else -> throw IllegalArgumentException("Unknown policy $allocationPolicy") } } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index d2e7473f..4a47922d 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -34,7 +34,10 @@ import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll import org.opendc.compute.service.driver.Host -import org.opendc.compute.service.scheduler.AvailableCoreMemoryAllocationPolicy +import org.opendc.compute.service.scheduler.FilterScheduler +import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter +import org.opendc.compute.service.scheduler.filters.ComputeFilter +import org.opendc.compute.service.scheduler.weights.CoreMemoryWeigher import org.opendc.experiments.capelin.model.Workload import org.opendc.experiments.capelin.monitor.ExperimentMonitor import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader @@ -71,7 +74,10 @@ class CapelinIntegrationTest { val failures = false val seed = 0 val chan = Channel(Channel.CONFLATED) - val allocationPolicy = AvailableCoreMemoryAllocationPolicy() + val allocationPolicy = FilterScheduler( + filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), + weighers = listOf(CoreMemoryWeigher() to -1.0) + ) val traceReader = createTestTraceReader() val environmentReader = createTestEnvironmentReader() lateinit var monitorResults: ComputeMetrics @@ -118,9 +124,9 @@ class CapelinIntegrationTest { { assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") }, { assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") }, { assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") }, - { assertEquals(207388095207, monitor.totalRequestedBurst) { "Incorrect requested burst" } }, - { assertEquals(204745144701, monitor.totalGrantedBurst) { "Incorrect granted burst" } }, - { assertEquals(2642950497, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } }, + { assertEquals(207389912923, monitor.totalRequestedBurst) { "Incorrect requested burst" } }, + { assertEquals(207122087280, monitor.totalGrantedBurst) { "Incorrect granted burst" } }, + { assertEquals(267825640, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } }, { assertEquals(0, monitor.totalInterferedBurst) { "Incorrect interfered burst" } } ) } @@ -130,7 +136,10 @@ class CapelinIntegrationTest { val clock = DelayControllerClockAdapter(this) val seed = 1 val chan = Channel(Channel.CONFLATED) - val allocationPolicy = AvailableCoreMemoryAllocationPolicy() + val allocationPolicy = FilterScheduler( + filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), + weighers = listOf(CoreMemoryWeigher() to -1.0) + ) val traceReader = createTestTraceReader(0.5, seed) val environmentReader = createTestEnvironmentReader("single") diff --git a/simulator/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt b/simulator/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt index aa0f5ab4..c5982d8c 100644 --- a/simulator/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt +++ b/simulator/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt @@ -32,8 +32,11 @@ import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.test.runBlockingTest import mu.KotlinLogging import org.opendc.compute.service.ComputeService -import org.opendc.compute.service.scheduler.AllocationPolicy -import org.opendc.compute.service.scheduler.RandomAllocationPolicy +import org.opendc.compute.service.scheduler.ComputeScheduler +import org.opendc.compute.service.scheduler.FilterScheduler +import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter +import org.opendc.compute.service.scheduler.filters.ComputeFilter +import org.opendc.compute.service.scheduler.weights.RandomWeigher import org.opendc.compute.simulator.SimHost import org.opendc.experiments.capelin.* import org.opendc.experiments.capelin.monitor.ParquetExperimentMonitor @@ -88,7 +91,10 @@ public class EnergyExperiment : Experiment("Energy Modeling 2021") { val clock = DelayControllerClockAdapter(this) val chan = Channel(Channel.CONFLATED) - val allocationPolicy = RandomAllocationPolicy() + val allocationPolicy = FilterScheduler( + filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()), + weighers = listOf(RandomWeigher(Random(0)) to 1.0) + ) val meterProvider: MeterProvider = SdkMeterProvider .builder() @@ -125,7 +131,7 @@ public class EnergyExperiment : Experiment("Energy Modeling 2021") { public suspend fun withComputeService( clock: Clock, meterProvider: MeterProvider, - allocationPolicy: AllocationPolicy, + scheduler: ComputeScheduler, block: suspend CoroutineScope.(ComputeService) -> Unit ): Unit = coroutineScope { val model = createMachineModel() @@ -144,18 +150,18 @@ public class EnergyExperiment : Experiment("Energy Modeling 2021") { ) } - val schedulerMeter = meterProvider.get("opendc-compute") - val scheduler = - ComputeService(coroutineContext, clock, schedulerMeter, allocationPolicy) + val serviceMeter = meterProvider.get("opendc-compute") + val service = + ComputeService(coroutineContext, clock, serviceMeter, scheduler) for (host in hosts) { - scheduler.addHost(host) + service.addHost(host) } try { - block(this, scheduler) + block(this, service) } finally { - scheduler.close() + service.close() hosts.forEach(SimHost::close) } } -- cgit v1.2.3