summaryrefslogtreecommitdiff
path: root/opendc
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2020-03-12 23:37:40 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2020-03-12 23:38:26 +0100
commit7f4faa83504abeaac86080bb8d2fc2254633c0db (patch)
tree6fbd22cd4ba59ee9848bab0cb6e08ebeccd14c85 /opendc
parenta84c548300ad5e58eb3d5067e6ded4bea26828ff (diff)
feat: Add VM scheduler for scheduling over all cores
Diffstat (limited to 'opendc')
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt101
1 files changed, 64 insertions, 37 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt
index f50d694a..10d6dd1f 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt
@@ -117,42 +117,53 @@ class HypervisorVirtDriver(
val start = simulationContext.clock.millis()
val vms = activeVms.toSet()
- var duration: Long = Long.MAX_VALUE
+ var duration: Double = Double.POSITIVE_INFINITY
var deadline: Long = Long.MAX_VALUE
val usage = DoubleArray(hostContext.cpus.size)
- for (vm in vms) {
- for (i in 0 until min(vm.cpus.size, vm.requestedBurst.size)) {
- val cpu = vm.cpus[i]
+ var availableUsage = hostContext.cpus.sumByDouble { it.frequency }
+ val requests = vms.asSequence()
+ .flatMap { it.requests.asSequence() }
+ .sortedBy { it.limit }
+ .toList()
+ var cpuId = 0
+
+ for ((i, req) in requests.withIndex()) {
+ val remaining = requests.size - i
+ val availableShare = availableUsage / remaining
+ val grantedUsage = min(req.limit, availableShare)
+
+ req.allocatedUsage = grantedUsage
+ availableUsage -= grantedUsage
- // Limit each vCPU to at most an equal share of the host CPU
- val actualUsage = min(vm.limit[i], cpu.frequency / vms.size)
+ // The duration that we want to run is that of the shortest request from a vCPU
+ duration = min(duration, req.burst / (req.allocatedUsage * 1_000_000L))
+ deadline = min(deadline, req.vm.deadline)
- // The duration that we want to run is that of the shortest request from a vCPU
- duration = min(duration, ceil(vm.requestedBurst[i] / (actualUsage * 1_000_000L)).toLong())
- deadline = min(deadline, vm.deadline)
- usage[i] += actualUsage
+ // Move to next CPU if the current CPU is full
+ if (usage[cpuId] + grantedUsage > hostContext.cpus[cpuId].frequency) {
+ cpuId++
+
+ // TODO This might possibly break where the last request might not be satisfied
+ assert(cpuId < hostContext.cpus.size) { "Request cannot be satisfied" }
}
+
+ req.allocatedCpu = cpuId
+ usage[cpuId] += grantedUsage
}
val burst = LongArray(hostContext.cpus.size)
-
val imagesRunning = vms.map { it.server.image }.toSet()
- for (vm in vms) {
+ for (req in requests) {
// Apply performance interference model
- val performanceModel = vm.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel?
+ val performanceModel = req.vm.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel?
val performanceScore = performanceModel?.apply(imagesRunning) ?: 1.0
- for (i in 0 until min(vm.cpus.size, vm.requestedBurst.size)) {
- val cpu = vm.cpus[i]
-
- // Limit each vCPU to at most an equal share of the host CPU
- val actualUsage = min(vm.limit[i], cpu.frequency / vms.size)
- val actualBurst = (duration * actualUsage * 1_000_000L).toLong()
-
- burst[i] += (performanceScore * actualBurst).toLong()
- }
+ // Limit each vCPU to at most an equal share of the host CPU
+ val actualBurst = (performanceScore * duration * req.allocatedUsage * 1_000_000L).toLong()
+ req.allocatedBurst = actualBurst
+ burst[req.allocatedCpu] += actualBurst
}
val remainder = burst.clone()
@@ -167,24 +178,19 @@ class HypervisorVirtDriver(
}
for (vm in vms) {
- for (i in 0 until min(vm.cpus.size, vm.requestedBurst.size)) {
- val cpu = vm.cpus[i]
-
- // Limit each vCPU to at most an equal share of the host CPU
- val actualUsage = min(vm.limit[i], cpu.frequency / vms.size)
- val actualBurst = (duration * actualUsage * 1_000_000L).toLong()
-
+ for ((i, req) in vm.requests.withIndex()) {
// Compute the fraction of compute time allocated to the VM
- val fraction = actualUsage / usage[i]
+ val fraction = req.allocatedUsage / usage[req.allocatedCpu]
// Compute the burst time that the VM was actually granted
- val grantedBurst = max(0, actualBurst - ceil(remainder[i] * fraction).toLong())
+ val grantedBurst = max(0, req.allocatedBurst - ceil(remainder[req.allocatedCpu] * fraction).toLong())
// Compute remaining burst time to be executed for the request
- vm.requestedBurst[i] = max(0, vm.requestedBurst[i] - grantedBurst)
+ req.burst = max(0, vm.burst[i] - grantedBurst)
+ vm.burst[i] = req.burst
}
- if (vm.requestedBurst.any { it == 0L } || vm.deadline <= end) {
+ if (vm.burst.any { it == 0L } || vm.deadline <= end) {
// Return vCPU `run` call: the requested burst was completed or deadline was exceeded
vm.chan.send(Unit)
}
@@ -213,13 +219,31 @@ class HypervisorVirtDriver(
call.cancel()
}
+ /**
+ * A request to schedule a virtual CPU on the host cpu.
+ */
+ internal data class CpuRequest(
+ val vm: VmServerContext,
+ val vcpu: ProcessingUnit,
+ var burst: Long,
+ val limit: Double
+ ) {
+ /**
+ * The usage that was actually granted.
+ */
+ var allocatedUsage: Double = 0.0
+ var allocatedCpu: Int = 0
+ var allocatedBurst: Long = 0
+ }
+
+
internal inner class VmServerContext(
override var server: Server,
val monitor: ServerMonitor,
ctx: SimulationContext
) : ServerManagementContext {
- lateinit var requestedBurst: LongArray
- lateinit var limit: DoubleArray
+ lateinit var requests: List<CpuRequest>
+ lateinit var burst: LongArray
var deadline: Long = 0L
var chan = Channel<Unit>(Channel.RENDEZVOUS)
private var initialized: Boolean = false
@@ -261,9 +285,12 @@ class HypervisorVirtDriver(
override suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) {
require(burst.size == limit.size) { "Array dimensions do not match" }
- requestedBurst = burst
- this.limit = limit
this.deadline = deadline
+ this.burst = burst
+ requests = cpus.asSequence()
+ .take(burst.size)
+ .mapIndexed { i, cpu -> CpuRequest(this, cpu, burst[i], limit[i]) }
+ .toList()
// Wait until the burst has been run or the coroutine is cancelled
try {