summaryrefslogtreecommitdiff
path: root/opendc-experiments
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-experiments')
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt59
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt53
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt19
-rw-r--r--opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt9
4 files changed, 76 insertions, 64 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
index 42d240dc..2c443678 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
@@ -38,6 +38,15 @@ import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.driver.HostListener
import org.opendc.compute.service.driver.HostState
import org.opendc.compute.service.scheduler.ComputeScheduler
+import org.opendc.compute.service.scheduler.FilterScheduler
+import org.opendc.compute.service.scheduler.ReplayScheduler
+import org.opendc.compute.service.scheduler.filters.ComputeFilter
+import org.opendc.compute.service.scheduler.filters.RamFilter
+import org.opendc.compute.service.scheduler.filters.VCpuFilter
+import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
+import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher
+import org.opendc.compute.service.scheduler.weights.RamWeigher
+import org.opendc.compute.service.scheduler.weights.VCpuWeigher
import org.opendc.compute.simulator.SimHost
import org.opendc.experiments.capelin.monitor.ExperimentMetricExporter
import org.opendc.experiments.capelin.monitor.ExperimentMonitor
@@ -301,3 +310,53 @@ fun createMeterProvider(clock: Clock): MeterProvider {
.registerView(powerSelector, powerView)
.build()
}
+
+/**
+ * Create a [ComputeScheduler] for the experiment.
+ */
+fun createComputeScheduler(allocationPolicy: String, seeder: Random, vmPlacements: Map<String, String> = emptyMap()): ComputeScheduler {
+ val cpuAllocationRatio = 16.0
+ val ramAllocationRatio = 1.5
+ return when (allocationPolicy) {
+ "mem" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(RamWeigher(multiplier = 1.0))
+ )
+ "mem-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(RamWeigher(multiplier = -1.0))
+ )
+ "core-mem" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(CoreRamWeigher(multiplier = 1.0))
+ )
+ "core-mem-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(CoreRamWeigher(multiplier = -1.0))
+ )
+ "active-servers" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(InstanceCountWeigher(multiplier = -1.0))
+ )
+ "active-servers-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(InstanceCountWeigher(multiplier = 1.0))
+ )
+ "provisioned-cores" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = 1.0))
+ )
+ "provisioned-cores-inv" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = listOf(VCpuWeigher(cpuAllocationRatio, multiplier = -1.0))
+ )
+ "random" -> FilterScheduler(
+ filters = listOf(ComputeFilter(), VCpuFilter(cpuAllocationRatio), RamFilter(ramAllocationRatio)),
+ weighers = emptyList(),
+ subsetSize = Int.MAX_VALUE,
+ random = java.util.Random(seeder.nextLong())
+ )
+ "replay" -> ReplayScheduler(vmPlacements)
+ else -> throw IllegalArgumentException("Unknown policy $allocationPolicy")
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index cbb5bfd9..ee832af8 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -28,10 +28,6 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import mu.KotlinLogging
-import org.opendc.compute.service.scheduler.*
-import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter
-import org.opendc.compute.service.scheduler.filters.ComputeFilter
-import org.opendc.compute.service.scheduler.weights.*
import org.opendc.experiments.capelin.env.ClusterEnvironmentReader
import org.opendc.experiments.capelin.model.CompositeWorkload
import org.opendc.experiments.capelin.model.OperationalPhenomena
@@ -49,6 +45,7 @@ import java.io.File
import java.io.FileInputStream
import java.util.*
import java.util.concurrent.ConcurrentHashMap
+import kotlin.random.asKotlinRandom
/**
* A portfolio represents a collection of scenarios are tested for the work.
@@ -105,7 +102,7 @@ abstract class Portfolio(name: String) : Experiment(name) {
val environment = ClusterEnvironmentReader(File(config.getString("env-path"), "${topology.name}.txt"))
val chan = Channel<Unit>(Channel.CONFLATED)
- val allocationPolicy = createComputeScheduler(seeder)
+ val allocationPolicy = createComputeScheduler(allocationPolicy, seeder.asKotlinRandom(), vmPlacements)
val meterProvider = createMeterProvider(clock)
val workload = workload
@@ -167,50 +164,4 @@ abstract class Portfolio(name: String) : Experiment(name) {
val monitorResults = collectMetrics(meterProvider as MetricProducer)
logger.debug { "Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}" }
}
-
- /**
- * Create the [ComputeScheduler] instance to use for the trial.
- */
- private fun createComputeScheduler(seeder: Random): ComputeScheduler {
- return when (allocationPolicy) {
- "mem" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(MemoryWeigher() to -1.0)
- )
- "mem-inv" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(MemoryWeigher() to -1.0)
- )
- "core-mem" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(CoreMemoryWeigher() to -1.0)
- )
- "core-mem-inv" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(CoreMemoryWeigher() to -1.0)
- )
- "active-servers" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(ProvisionedCoresWeigher() to -1.0)
- )
- "active-servers-inv" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(InstanceCountWeigher() to 1.0)
- )
- "provisioned-cores" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(ProvisionedCoresWeigher() to -1.0)
- )
- "provisioned-cores-inv" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(ProvisionedCoresWeigher() to 1.0)
- )
- "random" -> FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(RandomWeigher(Random(seeder.nextLong())) to 1.0)
- )
- "replay" -> ReplayScheduler(vmPlacements)
- else -> throw IllegalArgumentException("Unknown policy $allocationPolicy")
- }
- }
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index 08e04ddf..75428011 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -31,9 +31,10 @@ import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
import org.opendc.compute.service.driver.Host
import org.opendc.compute.service.scheduler.FilterScheduler
-import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter
import org.opendc.compute.service.scheduler.filters.ComputeFilter
-import org.opendc.compute.service.scheduler.weights.CoreMemoryWeigher
+import org.opendc.compute.service.scheduler.filters.RamFilter
+import org.opendc.compute.service.scheduler.filters.VCpuFilter
+import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
import org.opendc.experiments.capelin.env.ClusterEnvironmentReader
import org.opendc.experiments.capelin.model.Workload
import org.opendc.experiments.capelin.monitor.ExperimentMonitor
@@ -68,8 +69,8 @@ class CapelinIntegrationTest {
val seed = 0
val chan = Channel<Unit>(Channel.CONFLATED)
val allocationPolicy = FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(CoreMemoryWeigher() to -1.0)
+ filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
+ weighers = listOf(CoreRamWeigher(multiplier = 1.0))
)
val traceReader = createTestTraceReader()
val environmentReader = createTestEnvironmentReader()
@@ -113,9 +114,9 @@ class CapelinIntegrationTest {
{ assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") },
{ assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") },
{ assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") },
- { assertEquals(207380244590, monitor.totalRequestedBurst) { "Incorrect requested burst" } },
- { assertEquals(207112418950, monitor.totalGrantedBurst) { "Incorrect granted burst" } },
- { assertEquals(267825640, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } },
+ { assertEquals(207380204679, monitor.totalRequestedBurst) { "Incorrect requested burst" } },
+ { assertEquals(207371815929, monitor.totalGrantedBurst) { "Incorrect granted burst" } },
+ { assertEquals(8388750, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } },
{ assertEquals(0, monitor.totalInterferedBurst) { "Incorrect interfered burst" } }
)
}
@@ -125,8 +126,8 @@ class CapelinIntegrationTest {
val seed = 1
val chan = Channel<Unit>(Channel.CONFLATED)
val allocationPolicy = FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(CoreMemoryWeigher() to -1.0)
+ filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
+ weighers = listOf(CoreRamWeigher(multiplier = 1.0))
)
val traceReader = createTestTraceReader(0.5, seed)
val environmentReader = createTestEnvironmentReader("single")
diff --git a/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt b/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt
index 8fc4f6b8..e64e20a2 100644
--- a/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt
+++ b/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt
@@ -32,9 +32,9 @@ import mu.KotlinLogging
import org.opendc.compute.service.ComputeService
import org.opendc.compute.service.scheduler.ComputeScheduler
import org.opendc.compute.service.scheduler.FilterScheduler
-import org.opendc.compute.service.scheduler.filters.ComputeCapabilitiesFilter
import org.opendc.compute.service.scheduler.filters.ComputeFilter
-import org.opendc.compute.service.scheduler.weights.RandomWeigher
+import org.opendc.compute.service.scheduler.filters.RamFilter
+import org.opendc.compute.service.scheduler.filters.VCpuFilter
import org.opendc.compute.simulator.SimHost
import org.opendc.experiments.capelin.*
import org.opendc.experiments.capelin.monitor.ParquetExperimentMonitor
@@ -81,8 +81,9 @@ public class EnergyExperiment : Experiment("Energy Modeling 2021") {
override fun doRun(repeat: Int): Unit = runBlockingSimulation {
val chan = Channel<Unit>(Channel.CONFLATED)
val allocationPolicy = FilterScheduler(
- filters = listOf(ComputeFilter(), ComputeCapabilitiesFilter()),
- weighers = listOf(RandomWeigher(Random(0)) to 1.0)
+ filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)),
+ weighers = listOf(),
+ subsetSize = Int.MAX_VALUE
)
val meterProvider: MeterProvider = createMeterProvider(clock)