From 7f4faa83504abeaac86080bb8d2fc2254633c0db Mon Sep 17 00:00:00 2001
From: Fabian Mastenbroek
Date: Thu, 12 Mar 2020 23:37:40 +0100
Subject: feat: Add VM scheduler for scheduling over all cores

---
 .../virt/driver/hypervisor/HypervisorVirtDriver.kt | 101 +++++++++++++--------
 1 file changed, 64 insertions(+), 37 deletions(-)

(limited to 'opendc')

diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt
index f50d694a..10d6dd1f 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt
@@ -117,42 +117,53 @@ class HypervisorVirtDriver(
                 val start = simulationContext.clock.millis()
                 val vms = activeVms.toSet()
-                var duration: Long = Long.MAX_VALUE
+                var duration: Double = Double.POSITIVE_INFINITY
                 var deadline: Long = Long.MAX_VALUE
                 val usage = DoubleArray(hostContext.cpus.size)
-                for (vm in vms) {
-                    for (i in 0 until min(vm.cpus.size, vm.requestedBurst.size)) {
-                        val cpu = vm.cpus[i]
+                var availableUsage = hostContext.cpus.sumByDouble { it.frequency }
+                val requests = vms.asSequence()
+                    .flatMap { it.requests.asSequence() }
+                    .sortedBy { it.limit }
+                    .toList()
+                var cpuId = 0
+
+                for ((i, req) in requests.withIndex()) {
+                    val remaining = requests.size - i
+                    val availableShare = availableUsage / remaining
+                    val grantedUsage = min(req.limit, availableShare)
+
+                    req.allocatedUsage = grantedUsage
+                    availableUsage -= grantedUsage
 
-                        // Limit each vCPU to at most an equal share of the host CPU
-                        val actualUsage = min(vm.limit[i], cpu.frequency / vms.size)
+                    // The duration that we want to run is that of the shortest request from a vCPU
+                    duration = min(duration, req.burst / (req.allocatedUsage * 1_000_000L))
+                    deadline = min(deadline, req.vm.deadline)
 
-                        // The duration that we want to run is that of the shortest request from a vCPU
-                        duration = min(duration, ceil(vm.requestedBurst[i] / (actualUsage * 1_000_000L)).toLong())
-                        deadline = min(deadline, vm.deadline)
-                        usage[i] += actualUsage
+                    // Move to next CPU if the current CPU is full
+                    if (usage[cpuId] + grantedUsage > hostContext.cpus[cpuId].frequency) {
+                        cpuId++
+
+                        // TODO This might possibly break where the last request might not be satisfied
+                        assert(cpuId < hostContext.cpus.size) { "Request cannot be satisfied" }
                     }
+
+                    req.allocatedCpu = cpuId
+                    usage[cpuId] += grantedUsage
                 }
 
                 val burst = LongArray(hostContext.cpus.size)
-
                 val imagesRunning = vms.map { it.server.image }.toSet()
 
-                for (vm in vms) {
+                for (req in requests) {
                     // Apply performance interference model
-                    val performanceModel = vm.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel?
+                    val performanceModel = req.vm.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel?
                     val performanceScore = performanceModel?.apply(imagesRunning) ?: 1.0
-                    for (i in 0 until min(vm.cpus.size, vm.requestedBurst.size)) {
-                        val cpu = vm.cpus[i]
-
-                        // Limit each vCPU to at most an equal share of the host CPU
-                        val actualUsage = min(vm.limit[i], cpu.frequency / vms.size)
-                        val actualBurst = (duration * actualUsage * 1_000_000L).toLong()
-
-                        burst[i] += (performanceScore * actualBurst).toLong()
-                    }
+                    // Limit each vCPU to at most an equal share of the host CPU
+                    val actualBurst = (performanceScore * duration * req.allocatedUsage * 1_000_000L).toLong()
+                    req.allocatedBurst = actualBurst
+                    burst[req.allocatedCpu] += actualBurst
                 }
 
                 val remainder = burst.clone()
@@ -167,24 +178,19 @@ class HypervisorVirtDriver(
                 }
 
                 for (vm in vms) {
-                    for (i in 0 until min(vm.cpus.size, vm.requestedBurst.size)) {
-                        val cpu = vm.cpus[i]
-
-                        // Limit each vCPU to at most an equal share of the host CPU
-                        val actualUsage = min(vm.limit[i], cpu.frequency / vms.size)
-                        val actualBurst = (duration * actualUsage * 1_000_000L).toLong()
-
+                    for ((i, req) in vm.requests.withIndex()) {
                         // Compute the fraction of compute time allocated to the VM
-                        val fraction = actualUsage / usage[i]
+                        val fraction = req.allocatedUsage / usage[req.allocatedCpu]
 
                         // Compute the burst time that the VM was actually granted
-                        val grantedBurst = max(0, actualBurst - ceil(remainder[i] * fraction).toLong())
+                        val grantedBurst = max(0, req.allocatedBurst - ceil(remainder[req.allocatedCpu] * fraction).toLong())
 
                         // Compute remaining burst time to be executed for the request
-                        vm.requestedBurst[i] = max(0, vm.requestedBurst[i] - grantedBurst)
+                        req.burst = max(0, vm.burst[i] - grantedBurst)
+                        vm.burst[i] = req.burst
                     }
 
-                    if (vm.requestedBurst.any { it == 0L } || vm.deadline <= end) {
+                    if (vm.burst.any { it == 0L } || vm.deadline <= end) {
                         // Return vCPU `run` call: the requested burst was completed or deadline was exceeded
                         vm.chan.send(Unit)
                     }
@@ -213,13 +219,31 @@ class HypervisorVirtDriver(
             call.cancel()
         }
 
+    /**
+     * A request to schedule a virtual CPU on the host cpu.
+     */
+    internal data class CpuRequest(
+        val vm: VmServerContext,
+        val vcpu: ProcessingUnit,
+        var burst: Long,
+        val limit: Double
+    ) {
+        /**
+         * The usage that was actually granted.
+         */
+        var allocatedUsage: Double = 0.0
+        var allocatedCpu: Int = 0
+        var allocatedBurst: Long = 0
+    }
+
     internal inner class VmServerContext(
         override var server: Server,
         val monitor: ServerMonitor,
         ctx: SimulationContext
     ) : ServerManagementContext {
-        lateinit var requestedBurst: LongArray
-        lateinit var limit: DoubleArray
+        lateinit var requests: List<CpuRequest>
+        lateinit var burst: LongArray
         var deadline: Long = 0L
        var chan = Channel<Unit>(Channel.RENDEZVOUS)
        private var initialized: Boolean = false
@@ -261,9 +285,12 @@ class HypervisorVirtDriver(
 
         override suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) {
             require(burst.size == limit.size) { "Array dimensions do not match" }
-            requestedBurst = burst
-            this.limit = limit
             this.deadline = deadline
+            this.burst = burst
+            requests = cpus.asSequence()
+                .take(burst.size)
+                .mapIndexed { i, cpu -> CpuRequest(this, cpu, burst[i], limit[i]) }
+                .toList()
 
             // Wait until the burst has been run or the coroutine is cancelled
             try {
-- cgit v1.2.3
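The heart of the commit above is max-min fair sharing: the vCPU requests are sorted by their usage limit, and each request in turn is granted the minimum of its limit and an equal share of the capacity that is still available. Visiting the requests in ascending order of limit ensures that a request needing less than its fair share donates the surplus to the requests after it. Below is a minimal, self-contained Kotlin sketch of this step; the Request class, the capacity value, and the function name are illustrative stand-ins, not part of the OpenDC API.

    import kotlin.math.min

    // Illustrative stand-in for the CpuRequest class introduced in the commit above.
    data class Request(val limit: Double, var allocatedUsage: Double = 0.0)

    // Divide `capacity` (e.g. the summed host CPU frequency in MHz) over the
    // requests using max-min fair sharing and return the total granted usage.
    fun maxMinFairShare(capacity: Double, requests: List<Request>): Double {
        var available = capacity
        val sorted = requests.sortedBy { it.limit }
        for ((i, req) in sorted.withIndex()) {
            // An equal share of whatever capacity the earlier requests left over.
            val share = available / (sorted.size - i)
            req.allocatedUsage = min(req.limit, share)
            available -= req.allocatedUsage
        }
        return capacity - available
    }

    fun main() {
        // Three vCPUs contend for 4000 MHz with limits of 1000, 2500 and 3000 MHz.
        val requests = listOf(Request(1000.0), Request(2500.0), Request(3000.0))
        val granted = maxMinFairShare(4000.0, requests)
        // The first request fits under its 1333 MHz fair share and keeps 1000 MHz;
        // the remaining 3000 MHz is split equally, 1500 MHz each.
        requests.forEach { println("limit ${it.limit} -> granted ${it.allocatedUsage}") }
        println("total granted: $granted")
    }

The ascending order is what makes the result max-min fair: no request can receive more without taking capacity away from a request that was already granted less.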
From bc22a5b6b04e8a7c588948854831d0ee930aaa75 Mon Sep 17 00:00:00 2001
From: Fabian Mastenbroek
Date: Fri, 13 Mar 2020 11:34:35 +0100
Subject: feat: Divide workload equally over pCPUs

---
 .../atlarge/opendc/compute/core/image/VmImage.kt   |  4 +-
 .../virt/driver/hypervisor/HypervisorVirtDriver.kt | 58 +++++++++++-----------
 .../virt/driver/hypervisor/HypervisorTest.kt       |  4 +-
 .../opendc/experiments/sc20/TestExperiment.kt      |  2 +-
 4 files changed, 33 insertions(+), 35 deletions(-)

(limited to 'opendc')

diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
index 52f9068d..79021d6b 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
@@ -27,9 +27,9 @@ class VmImage(
         } else {
             val cores = min(this.cores, ctx.server.flavor.cpuCount)
             val burst = LongArray(cores) { fragment.flops / cores }
-            val maxUsage = DoubleArray(cores) { i -> burst[i] / (fragment.usage * 1_000_000L) }
+            val usage = DoubleArray(cores) { fragment.usage }
 
-            ctx.run(burst, maxUsage, simulationContext.clock.millis() + fragment.duration)
+            ctx.run(burst, usage, simulationContext.clock.millis() + fragment.duration)
         }
     }
 }
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt
index 10d6dd1f..1f9c8a80 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt
@@ -119,15 +119,15 @@ class HypervisorVirtDriver(
 
                 var duration: Double = Double.POSITIVE_INFINITY
                 var deadline: Long = Long.MAX_VALUE
-                val usage = DoubleArray(hostContext.cpus.size)
-                var availableUsage = hostContext.cpus.sumByDouble { it.frequency }
+                val maxUsage = hostContext.cpus.sumByDouble { it.frequency }
+                var availableUsage = maxUsage
                 val requests = vms.asSequence()
                     .flatMap { it.requests.asSequence() }
                     .sortedBy { it.limit }
                     .toList()
-                var cpuId = 0
 
+                // Divide the available host capacity fairly across the vCPUs using max-min fair sharing
                 for ((i, req) in requests.withIndex()) {
                     val remaining = requests.size - i
                     val availableShare = availableUsage / remaining
                     val grantedUsage = min(req.limit, availableShare)
@@ -139,31 +139,22 @@ class HypervisorVirtDriver(
                     // The duration that we want to run is that of the shortest request from a vCPU
                     duration = min(duration, req.burst / (req.allocatedUsage * 1_000_000L))
                     deadline = min(deadline, req.vm.deadline)
-
-                    // Move to next CPU if the current CPU is full
-                    if (usage[cpuId] + grantedUsage > hostContext.cpus[cpuId].frequency) {
-                        cpuId++
-
-                        // TODO This might possibly break where the last request might not be satisfied
-                        assert(cpuId < hostContext.cpus.size) { "Request cannot be satisfied" }
-                    }
-
-                    req.allocatedCpu = cpuId
-                    usage[cpuId] += grantedUsage
                 }
 
+                val usage = DoubleArray(hostContext.cpus.size)
                 val burst = LongArray(hostContext.cpus.size)
-                val imagesRunning = vms.map { it.server.image }.toSet()
+                val totalUsage = maxUsage - availableUsage
+                availableUsage = totalUsage
 
-                for (req in requests) {
-                    // Apply performance interference model
-                    val performanceModel = req.vm.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel?
-                    val performanceScore = performanceModel?.apply(imagesRunning) ?: 1.0
+                // Divide the requests over the available capacity of the pCPUs fairly
+                for (i in hostContext.cpus.indices.sortedBy { hostContext.cpus[it].frequency }) {
+                    val remaining = hostContext.cpus.size - i
+                    val availableShare = availableUsage / remaining
+                    val grantedUsage = min(hostContext.cpus[i].frequency, availableShare)
 
-                    // Limit each vCPU to at most an equal share of the host CPU
-                    val actualBurst = (performanceScore * duration * req.allocatedUsage * 1_000_000L).toLong()
-                    req.allocatedBurst = actualBurst
-                    burst[req.allocatedCpu] += actualBurst
+                    usage[i] = grantedUsage
+                    burst[i] = (duration * grantedUsage * 1_000_000L).toLong()
+                    availableUsage -= grantedUsage
                 }
 
                 val remainder = burst.clone()
@@ -177,13 +168,24 @@ class HypervisorVirtDriver(
                     return@launch
                 }
 
+                val totalRemainder = remainder.sum()
+                val totalBurst = burst.sum()
+                val imagesRunning = vms.map { it.server.image }.toSet()
+
                 for (vm in vms) {
+                    // Apply performance interference model
+                    val performanceModel = vm.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel?
+                    val performanceScore = performanceModel?.apply(imagesRunning) ?: 1.0
+
                     for ((i, req) in vm.requests.withIndex()) {
                         // Compute the fraction of compute time allocated to the VM
-                        val fraction = req.allocatedUsage / usage[req.allocatedCpu]
+                        val fraction = req.allocatedUsage / totalUsage
+
+                        // Derive the burst that was allocated to this vCPU
+                        val allocatedBurst = ceil(duration * req.allocatedUsage * 1_000_000L).toLong()
 
                         // Compute the burst time that the VM was actually granted
-                        val grantedBurst = max(0, req.allocatedBurst - ceil(remainder[req.allocatedCpu] * fraction).toLong())
+                        val grantedBurst = (performanceScore * (allocatedBurst - ceil(totalRemainder * fraction))).toLong()
 
                         // Compute remaining burst time to be executed for the request
                         req.burst = max(0, vm.burst[i] - grantedBurst)
@@ -196,11 +198,10 @@ class HypervisorVirtDriver(
                     }
                 }
 
-                val totalBurst = burst.sum()
                 monitor.onSliceFinish(
                     end,
                     totalBurst,
-                    totalBurst - remainder.sum(),
+                    totalBurst - totalRemainder,
                     vms.size,
                     hostContext.server
                 )
@@ -232,11 +233,8 @@ class HypervisorVirtDriver(
         /**
          * The usage that was actually granted.
         */
        var allocatedUsage: Double = 0.0
-        var allocatedCpu: Int = 0
-        var allocatedBurst: Long = 0
    }
-
    internal inner class VmServerContext(
        override var server: Server,
        val monitor: ServerMonitor,
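This commit decouples vCPUs from specific host CPUs: instead of pinning each request to a pCPU (the allocatedCpu field removed above), the aggregate granted usage is spread over the physical CPUs with the same max-min scheme, filling the slowest pCPUs first. The sketch below shows that division in isolation; the function name and inputs are illustrative, and here the equal share is recomputed from the number of pCPUs not yet served.

    import kotlin.math.min

    // Spread `totalUsage` (the aggregate usage granted to all vCPUs, in MHz)
    // over physical CPUs with the given frequencies. Capacity that a slow pCPU
    // cannot absorb is divided over the remaining, faster pCPUs.
    fun divideOverPCpus(totalUsage: Double, frequencies: DoubleArray): DoubleArray {
        val usage = DoubleArray(frequencies.size)
        var available = totalUsage
        val byFrequency = frequencies.indices.sortedBy { frequencies[it] }
        for ((pos, cpu) in byFrequency.withIndex()) {
            // An equal share of the usage left for the pCPUs not yet served.
            val share = available / (byFrequency.size - pos)
            usage[cpu] = min(frequencies[cpu], share)
            available -= usage[cpu]
        }
        return usage
    }

    fun main() {
        // 5000 MHz of granted usage over two 2000 MHz pCPUs and one 3000 MHz pCPU.
        println(divideOverPCpus(5000.0, doubleArrayOf(2000.0, 2000.0, 3000.0)).toList())
        // Prints roughly [1666.67, 1666.67, 1666.67]: no pCPU is overloaded.
    }

Because the total granted vCPU usage never exceeds the summed pCPU capacity, this loop always drains the full amount, which is why the commit can drop the per-request assert that guarded against unsatisfiable placements.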
diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt
index 2a841711..adc476a7 100644
--- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt
+++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt
@@ -79,8 +79,8 @@ internal class HypervisorTest {
 
         val driverDom = root.newDomain("driver")
 
-        val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4)
-        val cpus = List(4) { ProcessingUnit(cpuNode, it, 2000.0) }
+        val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2)
+        val cpus = List(2) { ProcessingUnit(cpuNode, it, 2000.0) }
 
         val metalDriver = SimpleBareMetalDriver(driverDom, UUID.randomUUID(), "test", cpus, emptyList())
         metalDriver.init(monitor)
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
index ffdf0529..415a7628 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
@@ -101,7 +101,7 @@ fun main(args: Array<String>) {
     val reader = Sc20TraceReader(File(traceDirectory), performanceInterferenceModel, selectedVms)
     while (reader.hasNext()) {
         val (time, workload) = reader.next()
-        delay(max(0, time * 1000 - simulationContext.clock.millis()))
+        delay(max(0, time - simulationContext.clock.millis()))
         scheduler.deploy(workload.image, monitor, Flavor(workload.image.cores, workload.image.requiredMemory))
     }
-- cgit v1.2.3
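The TestExperiment.kt hunk above fixes the trace replay pacing: the trace timestamps are evidently already in milliseconds, so the extra factor of 1000 made the reader sleep far past each intended submission time. The replay pattern in isolation, as a sketch; TraceEntry and the submit callback are hypothetical stand-ins for Sc20TraceReader and the scheduler deployment.

    import java.time.Clock
    import kotlinx.coroutines.delay

    // Hypothetical trace entry: a submission timestamp in milliseconds plus a payload.
    data class TraceEntry<T>(val time: Long, val workload: T)

    // Replay the entries against the (simulated) clock, invoking `submit` at each
    // entry's timestamp. Entries whose time has already passed are submitted
    // immediately, because the sleep is clamped to zero.
    suspend fun <T> replay(
        entries: Iterator<TraceEntry<T>>,
        clock: Clock,
        submit: suspend (T) -> Unit
    ) {
        while (entries.hasNext()) {
            val (time, workload) = entries.next()
            delay(maxOf(0L, time - clock.millis())) // timestamps already in ms
            submit(workload)
        }
    }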