summaryrefslogtreecommitdiff
path: root/opendc-experiments
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-08-22 13:23:53 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-08-22 13:40:21 +0200
commitb8f64c1d3df2c990df8941cd036222fab2def9fa (patch)
tree34dfc78f84c5f463fda7f97957cc403411b4f166 /opendc-experiments
parentc1988fa1b08011f716194f48da10386a236ffd7f (diff)
refactor(compute): Update FilterScheduler to follow OpenStack's Nova
This change updates the FilterScheduler implementation to follow more closely the scheduler implementation in OpenStack's Nova. We now normalize the weights, support many of the filters and weights in OpenStack and support overcommitting resources.
Diffstat (limited to 'opendc-experiments')
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt59
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt53
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt19
-rw-r--r--opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt9
4 files changed, 76 insertions, 64 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
index 42d240dc..2c443678 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
@@ -38,6 +38,15 @@ import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.driver.HostListener
import org.opendc.compute.service.driver.HostState
import org.opendc.compute.service.scheduler.ComputeScheduler
+import org.opendc.compute.service.scheduler.FilterScheduler
+import org.opendc.compute.service.scheduler.ReplayScheduler
+import org.opendc.compute.service.scheduler.filters.ComputeFilter
+import org.opendc.compute.service.scheduler.filters.RamFilter
+import org.opendc.compute.service.scheduler.filters.VCpuFilter
+import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
+import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher
+import org.opendc.compute.service.scheduler.weights.RamWeigher
+import org.opendc.compute.service.scheduler.weights.VCpuWeigher
import org.opendc.compute.simulator.SimHost
import org.opendc.experiments.capelin.monitor.ExperimentMetricExporter
import org.opendc.experiments.capelin.monitor.ExperimentMonitor
@@ -301,3 +310,53 @@ fun createMeterProvider(clock: Clock): MeterProvider {
.registerView(powerSelector, powerView)
.build()
}
+
+/**
+ * Create a [ComputeScheduler] for the experiment.
+ */
+fun createComputeScheduler(allocationPolicy: String, seeder: Random, vmPlacements: Map<String, String> = emptyMap()): ComputeScheduler {
+ val cpuAllocationRatio = 16.0
+ val ramAllocationRatio = 1.5
+ return when (allocationPolicy) {
+ "mem" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(RamWeigher(multiplier = 1.0))
+ )
+ "mem-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(RamWeigher(multiplier = -1.0))
+ )
+ "core-mem" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(CoreRamWeigher(multiplier = 1.0))
+ )
+ "core-mem-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(CoreRamWeigher(multiplier = -1.0))
+ )
+ "active-servers" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(InstanceCountWeigher(multiplier = -1.0))
+ )
+ "active-servers-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(InstanceCountWeigher(multiplier = 1.0))
+ )
+ "provisioned-cores" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = 1.0))
+ )
+ "provisioned-cores-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = -1.0))
+ )
+ "random" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = emptyList(),
+ subsetSize = Int.MAX_VALUE,
+ random = java.util.Random(seeder.nextLong())
+ )
+ "replay" -> ReplayScheduler(vmPlacements)
+ else -> throw IllegalArgumentException("Unknown policy $allocationPolicy")
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index cbb5bfd9..ee832af8 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -28,10 +28,6 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import mu.KotlinLogging
-import org.opendc.compute.service.scheduler.*
-import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter
-import org.opendc.compute.service.scheduler.filters.ComputeFilter
-import org.opendc.compute.service.scheduler.weights.*
import org.opendc.experiments.capelin.env.ClusterEnvironmentReader
import org.opendc.experiments.capelin.model.CompositeWorkload
import org.opendc.experiments.capelin.model.OperationalPhenomena
@@ -49,6 +45,7 @@ import java.io.File
import java.io.FileInputStream
import java.util.*
import java.util.concurrent.ConcurrentHashMap
+import kotlin.random.asKotlinRandom
/**
* A portfolio represents a collection of scenarios are tested for the work.
@@ -105,7 +102,7 @@ abstract class Portfolio(name: String) : Experiment(name) {
val environment = ClusterEnvironmentReader(File(config.getString("env-path"), "${topology.name}.txt"))
val chan = Channel<Unit>(Channel.CONFLATED)
- val allocationPolicy = createComputeScheduler(seeder)
+ val allocationPolicy = createComputeScheduler(allocationPolicy, seeder.asKotlinRandom(), vmPlacements)
val meterProvider = createMeterProvider(clock)
val workload = workload
@@ -167,50 +164,4 @@ abstract class Portfolio(name: String) : Experiment(name) {
val monitorResults = collectMetrics(meterProvider as MetricProducer)
logger.debug { "Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}" }
}
-
- /**
- * Create the [ComputeScheduler] instance to use for the trial.
- */
- private fun createComputeScheduler(seeder: Random): ComputeScheduler {
- return when (allocationPolicy) {
- "mem" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(MemoryWeigher() to -1.0)
- )
- "mem-inv" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(MemoryWeigher() to -1.0)
- )
- "core-mem" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(CoreMemoryWeigher() to -1.0)
- )
- "core-mem-inv" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(CoreMemoryWeigher() to -1.0)
- )
- "active-servers" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(ProvisionedCoresWeigher() to -1.0)
- )
- "active-servers-inv" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(InstanceCountWeigher() to 1.0)
- )
- "provisioned-cores" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(ProvisionedCoresWeigher() to -1.0)
- )
- "provisioned-cores-inv" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(ProvisionedCoresWeigher() to 1.0)
- )
- "random" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(RandomWeigher(Random(seeder.nextLong())) to 1.0)
- )
- "replay" -> ReplayScheduler(vmPlacements)
- else -> throw IllegalArgumentException("Unknown policy $allocationPolicy")
- }
- }
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index 08e04ddf..75428011 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -31,9 +31,10 @@ import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.scheduler.FilterScheduler
-import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter
import org.opendc.compute.service.scheduler.filters.ComputeFilter
-import org.opendc.compute.service.scheduler.weights.CoreMemoryWeigher
+import org.opendc.compute.service.scheduler.filters.RamFilter
+import org.opendc.compute.service.scheduler.filters.VCpuFilter
+import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
import org.opendc.experiments.capelin.env.ClusterEnvironmentReader
import org.opendc.experiments.capelin.model.Workload
import org.opendc.experiments.capelin.monitor.ExperimentMonitor
@@ -68,8 +69,8 @@ class CapelinIntegrationTest {
val seed = 0
val chan = Channel<Unit>(Channel.CONFLATED)
val allocationPolicy = FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(CoreMemoryWeigher() to -1.0)
+ filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
+ weighers = listOf(CoreRamWeigher(multiplier = 1.0))
)
val traceReader = createTestTraceReader()
val environmentReader = createTestEnvironmentReader()
@@ -113,9 +114,9 @@ class CapelinIntegrationTest {
{ assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") },
{ assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") },
{ assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") },
- { assertEquals(207380244590, monitor.totalRequestedBurst) { "Incorrect requested burst" } },
- { assertEquals(207112418950, monitor.totalGrantedBurst) { "Incorrect granted burst" } },
- { assertEquals(267825640, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } },
+ { assertEquals(207380204679, monitor.totalRequestedBurst) { "Incorrect requested burst" } },
+ { assertEquals(207371815929, monitor.totalGrantedBurst) { "Incorrect granted burst" } },
+ { assertEquals(8388750, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } },
{ assertEquals(0, monitor.totalInterferedBurst) { "Incorrect interfered burst" } }
)
}
@@ -125,8 +126,8 @@ class CapelinIntegrationTest {
val seed = 1
val chan = Channel<Unit>(Channel.CONFLATED)
val allocationPolicy = FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(CoreMemoryWeigher() to -1.0)
+ filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
+ weighers = listOf(CoreRamWeigher(multiplier = 1.0))
)
val traceReader = createTestTraceReader(0.5, seed)
val environmentReader = createTestEnvironmentReader("single")
diff --git a/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt b/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt
index 8fc4f6b8..e64e20a2 100644
--- a/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt
+++ b/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt
@@ -32,9 +32,9 @@ import mu.KotlinLogging
import org.opendc.compute.service.ComputeService
import org.opendc.compute.service.scheduler.ComputeScheduler
import org.opendc.compute.service.scheduler.FilterScheduler
-import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter
import org.opendc.compute.service.scheduler.filters.ComputeFilter
-import org.opendc.compute.service.scheduler.weights.RandomWeigher
+import org.opendc.compute.service.scheduler.filters.RamFilter
+import org.opendc.compute.service.scheduler.filters.VCpuFilter
import org.opendc.compute.simulator.SimHost
import org.opendc.experiments.capelin.*
import org.opendc.experiments.capelin.monitor.ParquetExperimentMonitor
@@ -81,8 +81,9 @@ public class EnergyExperiment : Experiment("Energy Modeling 2021") {
override fun doRun(repeat: Int): Unit = runBlockingSimulation {
val chan = Channel<Unit>(Channel.CONFLATED)
val allocationPolicy = FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(RandomWeigher(Random(0)) to 1.0)
+ filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)),
+ weighers = listOf(),
+ subsetSize = Int.MAX_VALUE
)
val meterProvider: MeterProvider = createMeterProvider(clock)