From 7f4faa83504abeaac86080bb8d2fc2254633c0db Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 12 Mar 2020 23:37:40 +0100 Subject: feat: Add VM scheduler for scheduling over all cores --- .../virt/driver/hypervisor/HypervisorVirtDriver.kt | 101 +++++++++++++-------- 1 file changed, 64 insertions(+), 37 deletions(-) (limited to 'opendc') diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt index f50d694a..10d6dd1f 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt @@ -117,42 +117,53 @@ class HypervisorVirtDriver( val start = simulationContext.clock.millis() val vms = activeVms.toSet() - var duration: Long = Long.MAX_VALUE + var duration: Double = Double.POSITIVE_INFINITY var deadline: Long = Long.MAX_VALUE val usage = DoubleArray(hostContext.cpus.size) - for (vm in vms) { - for (i in 0 until min(vm.cpus.size, vm.requestedBurst.size)) { - val cpu = vm.cpus[i] + var availableUsage = hostContext.cpus.sumByDouble { it.frequency } + val requests = vms.asSequence() + .flatMap { it.requests.asSequence() } + .sortedBy { it.limit } + .toList() + var cpuId = 0 + + for ((i, req) in requests.withIndex()) { + val remaining = requests.size - i + val availableShare = availableUsage / remaining + val grantedUsage = min(req.limit, availableShare) + + req.allocatedUsage = grantedUsage + availableUsage -= grantedUsage - // Limit each vCPU to at most an equal share of the host CPU - val actualUsage = min(vm.limit[i], cpu.frequency / vms.size) + // The duration that we want to run is that of the shortest request from a vCPU + duration = min(duration, req.burst / (req.allocatedUsage * 1_000_000L)) + deadline = min(deadline, req.vm.deadline) - // The duration that we want to run is that of the shortest request from a vCPU - duration = min(duration, ceil(vm.requestedBurst[i] / (actualUsage * 1_000_000L)).toLong()) - deadline = min(deadline, vm.deadline) - usage[i] += actualUsage + // Move to next CPU if the current CPU is full + if (usage[cpuId] + grantedUsage > hostContext.cpus[cpuId].frequency) { + cpuId++ + + // TODO This might possibly break where the last request might not be satisfied + assert(cpuId < hostContext.cpus.size) { "Request cannot be satisfied" } } + + req.allocatedCpu = cpuId + usage[cpuId] += grantedUsage } val burst = LongArray(hostContext.cpus.size) - val imagesRunning = vms.map { it.server.image }.toSet() - for (vm in vms) { + for (req in requests) { // Apply performance interference model - val performanceModel = vm.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? + val performanceModel = req.vm.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? val performanceScore = performanceModel?.apply(imagesRunning) ?: 1.0 - for (i in 0 until min(vm.cpus.size, vm.requestedBurst.size)) { - val cpu = vm.cpus[i] - - // Limit each vCPU to at most an equal share of the host CPU - val actualUsage = min(vm.limit[i], cpu.frequency / vms.size) - val actualBurst = (duration * actualUsage * 1_000_000L).toLong() - - burst[i] += (performanceScore * actualBurst).toLong() - } + // Limit each vCPU to at most an equal share of the host CPU + val actualBurst = (performanceScore * duration * req.allocatedUsage * 1_000_000L).toLong() + req.allocatedBurst = actualBurst + burst[req.allocatedCpu] += actualBurst } val remainder = burst.clone() @@ -167,24 +178,19 @@ class HypervisorVirtDriver( } for (vm in vms) { - for (i in 0 until min(vm.cpus.size, vm.requestedBurst.size)) { - val cpu = vm.cpus[i] - - // Limit each vCPU to at most an equal share of the host CPU - val actualUsage = min(vm.limit[i], cpu.frequency / vms.size) - val actualBurst = (duration * actualUsage * 1_000_000L).toLong() - + for ((i, req) in vm.requests.withIndex()) { // Compute the fraction of compute time allocated to the VM - val fraction = actualUsage / usage[i] + val fraction = req.allocatedUsage / usage[req.allocatedCpu] // Compute the burst time that the VM was actually granted - val grantedBurst = max(0, actualBurst - ceil(remainder[i] * fraction).toLong()) + val grantedBurst = max(0, req.allocatedBurst - ceil(remainder[req.allocatedCpu] * fraction).toLong()) // Compute remaining burst time to be executed for the request - vm.requestedBurst[i] = max(0, vm.requestedBurst[i] - grantedBurst) + req.burst = max(0, vm.burst[i] - grantedBurst) + vm.burst[i] = req.burst } - if (vm.requestedBurst.any { it == 0L } || vm.deadline <= end) { + if (vm.burst.any { it == 0L } || vm.deadline <= end) { // Return vCPU `run` call: the requested burst was completed or deadline was exceeded vm.chan.send(Unit) } @@ -213,13 +219,31 @@ class HypervisorVirtDriver( call.cancel() } + /** + * A request to schedule a virtual CPU on the host cpu. + */ + internal data class CpuRequest( + val vm: VmServerContext, + val vcpu: ProcessingUnit, + var burst: Long, + val limit: Double + ) { + /** + * The usage that was actually granted. + */ + var allocatedUsage: Double = 0.0 + var allocatedCpu: Int = 0 + var allocatedBurst: Long = 0 + } + + internal inner class VmServerContext( override var server: Server, val monitor: ServerMonitor, ctx: SimulationContext ) : ServerManagementContext { - lateinit var requestedBurst: LongArray - lateinit var limit: DoubleArray + lateinit var requests: List + lateinit var burst: LongArray var deadline: Long = 0L var chan = Channel(Channel.RENDEZVOUS) private var initialized: Boolean = false @@ -261,9 +285,12 @@ class HypervisorVirtDriver( override suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) { require(burst.size == limit.size) { "Array dimensions do not match" } - requestedBurst = burst - this.limit = limit this.deadline = deadline + this.burst = burst + requests = cpus.asSequence() + .take(burst.size) + .mapIndexed { i, cpu -> CpuRequest(this, cpu, burst[i], limit[i]) } + .toList() // Wait until the burst has been run or the coroutine is cancelled try { -- cgit v1.2.3