summaryrefslogtreecommitdiff
path: root/simulator/opendc-experiments/opendc-experiments-capelin/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-04-08 20:02:15 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-04-08 20:02:15 +0200
commit4f80e79b567b7d91b1086dcd74ef35616d7177f2 (patch)
tree953d095b08fad740725d24575d8da49e1da7d131 /simulator/opendc-experiments/opendc-experiments-capelin/src
parentd0f5200cf378a0d7f9397526f0db0695bdc34dd2 (diff)
compute: Migrate to new FilterScheduler
This change migrates the OpenDC codebase to use the new FilterScheduler for scheduling virtual machines. This removes the old allocation policies as well.
Diffstat (limited to 'simulator/opendc-experiments/opendc-experiments-capelin/src')
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt16
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt63
-rw-r--r--simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt21
3 files changed, 70 insertions, 30 deletions
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
index 40f50235..997eba0c 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
@@ -32,7 +32,7 @@ import org.opendc.compute.service.ComputeService
import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.driver.HostListener
import org.opendc.compute.service.driver.HostState
-import org.opendc.compute.service.scheduler.AllocationPolicy
+import org.opendc.compute.service.scheduler.ComputeScheduler
import org.opendc.compute.simulator.SimHost
import org.opendc.experiments.capelin.monitor.ExperimentMetricExporter
import org.opendc.experiments.capelin.monitor.ExperimentMonitor
@@ -135,7 +135,7 @@ public suspend fun withComputeService(
clock: Clock,
meterProvider: MeterProvider,
environmentReader: EnvironmentReader,
- allocationPolicy: AllocationPolicy,
+ scheduler: ComputeScheduler,
block: suspend CoroutineScope.(ComputeService) -> Unit
): Unit = coroutineScope {
val hosts = environmentReader
@@ -154,18 +154,18 @@ public suspend fun withComputeService(
)
}
- val schedulerMeter = meterProvider.get("opendc-compute")
- val scheduler =
- ComputeService(coroutineContext, clock, schedulerMeter, allocationPolicy)
+ val serviceMeter = meterProvider.get("opendc-compute")
+ val service =
+ ComputeService(coroutineContext, clock, serviceMeter, scheduler)
for (host in hosts) {
- scheduler.addHost(host)
+ service.addHost(host)
}
try {
- block(this, scheduler)
+ block(this, service)
} finally {
- scheduler.close()
+ service.close()
hosts.forEach(SimHost::close)
}
}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index 5fa77161..941d3c97 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -31,6 +31,9 @@ import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.test.runBlockingTest
import mu.KotlinLogging
import org.opendc.compute.service.scheduler.*
+import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter
+import org.opendc.compute.service.scheduler.filters.ComputeFilter
+import org.opendc.compute.service.scheduler.weights.*
import org.opendc.experiments.capelin.model.CompositeWorkload
import org.opendc.experiments.capelin.model.OperationalPhenomena
import org.opendc.experiments.capelin.model.Topology
@@ -45,8 +48,9 @@ import org.opendc.harness.dsl.anyOf
import org.opendc.simulator.utils.DelayControllerClockAdapter
import org.opendc.telemetry.sdk.toOtelClock
import java.io.File
+import java.util.*
import java.util.concurrent.ConcurrentHashMap
-import kotlin.random.Random
+import kotlin.random.asKotlinRandom
/**
* A portfolio represents a collection of scenarios are tested for the work.
@@ -115,11 +119,11 @@ public abstract class Portfolio(name: String) : Experiment(name) {
@OptIn(ExperimentalCoroutinesApi::class)
override fun doRun(repeat: Int): Unit = runBlockingTest {
val clock = DelayControllerClockAdapter(this)
- val seeder = Random(repeat)
+ val seeder = Random(repeat.toLong())
val environment = Sc20ClusterEnvironmentReader(File(environmentPath, "${topology.name}.txt"))
val chan = Channel<Unit>(Channel.CONFLATED)
- val allocationPolicy = createAllocationPolicy(seeder)
+ val allocationPolicy = createComputeScheduler(seeder)
val meterProvider: MeterProvider = SdkMeterProvider
.builder()
@@ -142,7 +146,7 @@ public abstract class Portfolio(name: String) : Experiment(name) {
val performanceInterferenceModel = performanceInterferenceModel
?.takeIf { operationalPhenomena.hasInterference }
- ?.construct(seeder) ?: emptyMap()
+ ?.construct(seeder.asKotlinRandom()) ?: emptyMap()
val trace = Sc20ParquetTraceReader(rawReaders, performanceInterferenceModel, workload, seeder.nextInt())
val monitor = ParquetExperimentMonitor(
@@ -184,20 +188,47 @@ public abstract class Portfolio(name: String) : Experiment(name) {
}
/**
- * Create the [AllocationPolicy] instance to use for the trial.
+ * Create the [ComputeScheduler] instance to use for the trial.
*/
- private fun createAllocationPolicy(seeder: Random): AllocationPolicy {
+ private fun createComputeScheduler(seeder: Random): ComputeScheduler {
return when (allocationPolicy) {
- "mem" -> AvailableMemoryAllocationPolicy()
- "mem-inv" -> AvailableMemoryAllocationPolicy(true)
- "core-mem" -> AvailableCoreMemoryAllocationPolicy()
- "core-mem-inv" -> AvailableCoreMemoryAllocationPolicy(true)
- "active-servers" -> NumberOfActiveServersAllocationPolicy()
- "active-servers-inv" -> NumberOfActiveServersAllocationPolicy(true)
- "provisioned-cores" -> ProvisionedCoresAllocationPolicy()
- "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true)
- "random" -> RandomAllocationPolicy(Random(seeder.nextInt()))
- "replay" -> ReplayAllocationPolicy(vmPlacements)
+ "mem" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(MemoryWeigher() to -1.0)
+ )
+ "mem-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(MemoryWeigher() to -1.0)
+ )
+ "core-mem" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(CoreMemoryWeigher() to -1.0)
+ )
+ "core-mem-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(CoreMemoryWeigher() to -1.0)
+ )
+ "active-servers" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(ProvisionedCoresWeigher() to -1.0)
+ )
+ "active-servers-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(InstanceCountWeigher() to 1.0)
+ )
+ "provisioned-cores" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(ProvisionedCoresWeigher() to -1.0)
+ )
+ "provisioned-cores-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(ProvisionedCoresWeigher() to 1.0)
+ )
+ "random" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(RandomWeigher(Random(seeder.nextLong())) to 1.0)
+ )
+ "replay" -> ReplayScheduler(vmPlacements)
else -> throw IllegalArgumentException("Unknown policy $allocationPolicy")
}
}
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index d2e7473f..4a47922d 100644
--- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -34,7 +34,10 @@ import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
import org.opendc.compute.service.driver.Host
-import org.opendc.compute.service.scheduler.AvailableCoreMemoryAllocationPolicy
+import org.opendc.compute.service.scheduler.FilterScheduler
+import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter
+import org.opendc.compute.service.scheduler.filters.ComputeFilter
+import org.opendc.compute.service.scheduler.weights.CoreMemoryWeigher
import org.opendc.experiments.capelin.model.Workload
import org.opendc.experiments.capelin.monitor.ExperimentMonitor
import org.opendc.experiments.capelin.trace.Sc20ParquetTraceReader
@@ -71,7 +74,10 @@ class CapelinIntegrationTest {
val failures = false
val seed = 0
val chan = Channel<Unit>(Channel.CONFLATED)
- val allocationPolicy = AvailableCoreMemoryAllocationPolicy()
+ val allocationPolicy = FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(CoreMemoryWeigher() to -1.0)
+ )
val traceReader = createTestTraceReader()
val environmentReader = createTestEnvironmentReader()
lateinit var monitorResults: ComputeMetrics
@@ -118,9 +124,9 @@ class CapelinIntegrationTest {
{ assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") },
{ assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") },
{ assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") },
- { assertEquals(207388095207, monitor.totalRequestedBurst) { "Incorrect requested burst" } },
- { assertEquals(204745144701, monitor.totalGrantedBurst) { "Incorrect granted burst" } },
- { assertEquals(2642950497, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } },
+ { assertEquals(207389912923, monitor.totalRequestedBurst) { "Incorrect requested burst" } },
+ { assertEquals(207122087280, monitor.totalGrantedBurst) { "Incorrect granted burst" } },
+ { assertEquals(267825640, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } },
{ assertEquals(0, monitor.totalInterferedBurst) { "Incorrect interfered burst" } }
)
}
@@ -130,7 +136,10 @@ class CapelinIntegrationTest {
val clock = DelayControllerClockAdapter(this)
val seed = 1
val chan = Channel<Unit>(Channel.CONFLATED)
- val allocationPolicy = AvailableCoreMemoryAllocationPolicy()
+ val allocationPolicy = FilterScheduler(
+ filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
+ weighers = listOf(CoreMemoryWeigher() to -1.0)
+ )
val traceReader = createTestTraceReader(0.5, seed)
val environmentReader = createTestEnvironmentReader("single")