| | | |
|---|---|---|
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-03-13 16:12:38 +0100 |
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-03-13 16:12:38 +0100 |
| commit | daa82a52928adfa79394ffe89ec3758d9f2257ee | |
| tree | 6cc37d05da20ac8afc8a898db7d5b15440f2ca95 | |
| parent | a84c548300ad5e58eb3d5067e6ded4bea26828ff | |
| parent | bc22a5b6b04e8a7c588948854831d0ee930aaa75 | |
Merge branch 'feat/2.x-overcommitting' into '2.x'
Implement proper VM scheduler
See merge request opendc/opendc-simulator!41
4 files changed, 77 insertions, 52 deletions
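At the heart of this merge is a switch from a fixed equal split of the host CPUs to max-min fair sharing of the total host capacity across all vCPU requests, processed in ascending order of their limit. The sketch below is a minimal, self-contained illustration of that allocation step; the `Request` class and `maxMinFairShare` function are illustrative stand-ins, not the `CpuRequest` type the diff introduces.

```kotlin
import kotlin.math.min

// Illustrative stand-in for the CpuRequest type introduced in this merge.
class Request(val limit: Double, var allocated: Double = 0.0)

// Max-min fair sharing: walk the requests in ascending order of limit and
// offer each an equal share of the capacity that is still available. Small
// requests are satisfied in full; their surplus flows to the larger ones.
fun maxMinFairShare(capacity: Double, requests: List<Request>) {
    var available = capacity
    val sorted = requests.sortedBy { it.limit }
    for ((i, req) in sorted.withIndex()) {
        val share = available / (sorted.size - i)
        req.allocated = min(req.limit, share)
        available -= req.allocated
    }
}

fun main() {
    // Three vCPUs limited to 1000, 2000 and 3000 MHz on a 4000 MHz host:
    // the first request fits under its equal share and is granted in full,
    // the leftover is split evenly over the remaining two.
    val reqs = listOf(Request(1000.0), Request(2000.0), Request(3000.0))
    maxMinFairShare(4000.0, reqs)
    println(reqs.map { it.allocated }) // [1000.0, 1500.0, 1500.0]
}
```

The same pass appears twice in the driver below: once to divide host capacity over the vCPU requests and once to divide the granted usage over the physical CPUs.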
```diff
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
index 52f9068d..79021d6b 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
@@ -27,9 +27,9 @@ class VmImage(
         } else {
             val cores = min(this.cores, ctx.server.flavor.cpuCount)
             val burst = LongArray(cores) { fragment.flops / cores }
-            val maxUsage = DoubleArray(cores) { i -> burst[i] / (fragment.usage * 1_000_000L) }
+            val usage = DoubleArray(cores) { fragment.usage }
 
-            ctx.run(burst, maxUsage, simulationContext.clock.millis() + fragment.duration)
+            ctx.run(burst, usage, simulationContext.clock.millis() + fragment.duration)
         }
     }
 }
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt
index f50d694a..1f9c8a80 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt
@@ -117,42 +117,44 @@ class HypervisorVirtDriver(
         val start = simulationContext.clock.millis()
         val vms = activeVms.toSet()
-        var duration: Long = Long.MAX_VALUE
+        var duration: Double = Double.POSITIVE_INFINITY
         var deadline: Long = Long.MAX_VALUE
-        val usage = DoubleArray(hostContext.cpus.size)
-
-        for (vm in vms) {
-            for (i in 0 until min(vm.cpus.size, vm.requestedBurst.size)) {
-                val cpu = vm.cpus[i]
-
-                // Limit each vCPU to at most an equal share of the host CPU
-                val actualUsage = min(vm.limit[i], cpu.frequency / vms.size)
-                // The duration that we want to run is that of the shortest request from a vCPU
-                duration = min(duration, ceil(vm.requestedBurst[i] / (actualUsage * 1_000_000L)).toLong())
-                deadline = min(deadline, vm.deadline)
-                usage[i] += actualUsage
-            }
+        val maxUsage = hostContext.cpus.sumByDouble { it.frequency }
+        var availableUsage = maxUsage
+        val requests = vms.asSequence()
+            .flatMap { it.requests.asSequence() }
+            .sortedBy { it.limit }
+            .toList()
+
+        // Divide the available host capacity fairly across the vCPUs using max-min fair sharing
+        for ((i, req) in requests.withIndex()) {
+            val remaining = requests.size - i
+            val availableShare = availableUsage / remaining
+            val grantedUsage = min(req.limit, availableShare)
+
+            req.allocatedUsage = grantedUsage
+            availableUsage -= grantedUsage
+
+            // The duration that we want to run is that of the shortest request from a vCPU
+            duration = min(duration, req.burst / (req.allocatedUsage * 1_000_000L))
+            deadline = min(deadline, req.vm.deadline)
         }
 
+        val usage = DoubleArray(hostContext.cpus.size)
         val burst = LongArray(hostContext.cpus.size)
-
-        val imagesRunning = vms.map { it.server.image }.toSet()
-
-        for (vm in vms) {
-            // Apply performance interference model
-            val performanceModel = vm.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel?
-            val performanceScore = performanceModel?.apply(imagesRunning) ?: 1.0
-
-            for (i in 0 until min(vm.cpus.size, vm.requestedBurst.size)) {
-                val cpu = vm.cpus[i]
-
-                // Limit each vCPU to at most an equal share of the host CPU
-                val actualUsage = min(vm.limit[i], cpu.frequency / vms.size)
-                val actualBurst = (duration * actualUsage * 1_000_000L).toLong()
-
-                burst[i] += (performanceScore * actualBurst).toLong()
-            }
+        val totalUsage = maxUsage - availableUsage
+        availableUsage = totalUsage
+
+        // Divide the requests over the available capacity of the pCPUs fairly
+        for (i in hostContext.cpus.indices.sortedBy { hostContext.cpus[it].frequency }) {
+            val remaining = hostContext.cpus.size - i
+            val availableShare = availableUsage / remaining
+            val grantedUsage = min(hostContext.cpus[i].frequency, availableShare)
+
+            usage[i] = grantedUsage
+            burst[i] = (duration * grantedUsage * 1_000_000L).toLong()
+            availableUsage -= grantedUsage
         }
 
         val remainder = burst.clone()
@@ -166,35 +168,40 @@ class HypervisorVirtDriver(
             return@launch
         }
 
-        for (vm in vms) {
-            for (i in 0 until min(vm.cpus.size, vm.requestedBurst.size)) {
-                val cpu = vm.cpus[i]
+        val totalRemainder = remainder.sum()
+        val totalBurst = burst.sum()
+        val imagesRunning = vms.map { it.server.image }.toSet()
 
-                // Limit each vCPU to at most an equal share of the host CPU
-                val actualUsage = min(vm.limit[i], cpu.frequency / vms.size)
-                val actualBurst = (duration * actualUsage * 1_000_000L).toLong()
+        for (vm in vms) {
+            // Apply performance interference model
+            val performanceModel = vm.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel?
+            val performanceScore = performanceModel?.apply(imagesRunning) ?: 1.0
 
+            for ((i, req) in vm.requests.withIndex()) {
                 // Compute the fraction of compute time allocated to the VM
-                val fraction = actualUsage / usage[i]
+                val fraction = req.allocatedUsage / totalUsage
+
+                // Derive the burst that was allocated to this vCPU
+                val allocatedBurst = ceil(duration * req.allocatedUsage * 1_000_000L).toLong()
 
                 // Compute the burst time that the VM was actually granted
-                val grantedBurst = max(0, actualBurst - ceil(remainder[i] * fraction).toLong())
+                val grantedBurst = (performanceScore * (allocatedBurst - ceil(totalRemainder * fraction))).toLong()
 
                 // Compute remaining burst time to be executed for the request
-                vm.requestedBurst[i] = max(0, vm.requestedBurst[i] - grantedBurst)
+                req.burst = max(0, vm.burst[i] - grantedBurst)
+                vm.burst[i] = req.burst
            }
 
-            if (vm.requestedBurst.any { it == 0L } || vm.deadline <= end) {
+            if (vm.burst.any { it == 0L } || vm.deadline <= end) {
                 // Return vCPU `run` call: the requested burst was completed or deadline was exceeded
                 vm.chan.send(Unit)
             }
         }
 
-        val totalBurst = burst.sum()
         monitor.onSliceFinish(
             end,
             totalBurst,
-            totalBurst - remainder.sum(),
+            totalBurst - totalRemainder,
            vms.size,
            hostContext.server
        )
@@ -213,13 +220,28 @@ class HypervisorVirtDriver(
         call.cancel()
     }
 
+    /**
+     * A request to schedule a virtual CPU on the host cpu.
+     */
+    internal data class CpuRequest(
+        val vm: VmServerContext,
+        val vcpu: ProcessingUnit,
+        var burst: Long,
+        val limit: Double
+    ) {
+        /**
+         * The usage that was actually granted.
+         */
+        var allocatedUsage: Double = 0.0
+    }
+
     internal inner class VmServerContext(
         override var server: Server,
         val monitor: ServerMonitor,
         ctx: SimulationContext
     ) : ServerManagementContext {
-        lateinit var requestedBurst: LongArray
-        lateinit var limit: DoubleArray
+        lateinit var requests: List<CpuRequest>
+        lateinit var burst: LongArray
         var deadline: Long = 0L
         var chan = Channel<Unit>(Channel.RENDEZVOUS)
         private var initialized: Boolean = false
@@ -261,9 +283,12 @@ class HypervisorVirtDriver(
         override suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) {
             require(burst.size == limit.size) { "Array dimensions do not match" }
 
-            requestedBurst = burst
-            this.limit = limit
             this.deadline = deadline
+            this.burst = burst
+            requests = cpus.asSequence()
+                .take(burst.size)
+                .mapIndexed { i, cpu -> CpuRequest(this, cpu, burst[i], limit[i]) }
+                .toList()
 
             // Wait until the burst has been run or the coroutine is cancelled
             try {
diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt
index 2a841711..adc476a7 100644
--- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt
+++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt
@@ -79,8 +79,8 @@ internal class HypervisorTest {
         val driverDom = root.newDomain("driver")
 
-        val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4)
-        val cpus = List(4) { ProcessingUnit(cpuNode, it, 2000.0) }
+        val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2)
+        val cpus = List(2) { ProcessingUnit(cpuNode, it, 2000.0) }
 
         val metalDriver = SimpleBareMetalDriver(driverDom, UUID.randomUUID(), "test", cpus, emptyList())
         metalDriver.init(monitor)
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
index ffdf0529..415a7628 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
@@ -101,7 +101,7 @@ fun main(args: Array<String>) {
     val reader = Sc20TraceReader(File(traceDirectory), performanceInterferenceModel, selectedVms)
     while (reader.hasNext()) {
         val (time, workload) = reader.next()
-        delay(max(0, time * 1000 - simulationContext.clock.millis()))
+        delay(max(0, time - simulationContext.clock.millis()))
         scheduler.deploy(workload.image, monitor, Flavor(workload.image.cores, workload.image.requiredMemory))
     }
```
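When a slice finishes, the driver charges each vCPU for the burst it actually received: the burst allocated for the slice, minus a usage-proportional share of the host-wide unfinished remainder, scaled by the performance-interference score. Below is a hedged sketch of that accounting step with the driver's variables passed in as parameters; the `chargeRequest` helper itself is hypothetical and not part of the codebase.

```kotlin
import kotlin.math.ceil
import kotlin.math.max

// Hypothetical helper mirroring the per-request accounting in the
// HypervisorVirtDriver hunk above; parameter names follow the diff.
fun chargeRequest(
    burstRemaining: Long,    // req.burst before the slice
    allocatedUsage: Double,  // usage granted by the fair-sharing pass
    duration: Double,        // slice duration as computed by the driver
    totalUsage: Double,      // total granted usage on the host
    totalRemainder: Long,    // host-wide burst that did not run this slice
    performanceScore: Double // interference score, 1.0 = no interference
): Long {
    // Burst this vCPU was allocated for the duration of the slice
    val allocatedBurst = ceil(duration * allocatedUsage * 1_000_000L).toLong()
    // Its proportional share of the unfinished remainder
    val fraction = allocatedUsage / totalUsage
    // Burst actually granted once interference and remainder are applied
    val grantedBurst = (performanceScore * (allocatedBurst - ceil(totalRemainder * fraction))).toLong()
    // What is left to execute; 0 signals completion, which is what makes
    // the driver resume the VM's pending `run` call via vm.chan.send(Unit)
    return max(0L, burstRemaining - grantedBurst)
}
```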
