From acfc5edaec2e3ee1f92551bcf3878e7dc8496b7e Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sun, 12 Apr 2020 14:10:03 +0200 Subject: perf: Address bottlenecks in VirtDriver --- .../opendc/compute/virt/driver/SimpleVirtDriver.kt | 51 ++++++++++++---------- 1 file changed, 28 insertions(+), 23 deletions(-) (limited to 'opendc') diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt index cec9ce53..9b741ce1 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt @@ -42,6 +42,7 @@ import com.atlarge.opendc.core.services.ServiceKey import com.atlarge.opendc.core.services.ServiceRegistry import com.atlarge.opendc.core.workload.IMAGE_PERF_INTERFERENCE_MODEL import com.atlarge.opendc.core.workload.PerformanceInterferenceModel +import kotlinx.coroutines.CancellableContinuation import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi @@ -58,11 +59,12 @@ import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.coroutines.selects.SelectClause0 import kotlinx.coroutines.selects.select +import kotlinx.coroutines.suspendCancellableCoroutine import kotlinx.coroutines.withContext -import java.lang.Exception import java.util.Objects import java.util.TreeSet import java.util.UUID +import kotlin.coroutines.resume import kotlin.math.ceil import kotlin.math.max import kotlin.math.min @@ -180,7 +182,7 @@ class SimpleVirtDriver( val pCPUs = hostContext.cpus.indices.sortedBy { hostContext.cpus[it].frequency } val vms = mutableMapOf>() - val requests = TreeSet() + val requests = TreeSet(cpuRequestComparator) val usage = DoubleArray(hostContext.cpus.size) val burst = LongArray(hostContext.cpus.size) @@ -321,7 +323,7 @@ class SimpleVirtDriver( requests.removeAll(vmRequests) // Return vCPU `run` call: the requested burst was completed or deadline was exceeded - vm.chan.send(Unit) + vm.cont?.resume(Unit) } } @@ -343,6 +345,25 @@ class SimpleVirtDriver( } } + /** + * The [Comparator] for [CpuRequest]. + */ + private val cpuRequestComparator: Comparator = Comparator { lhs, rhs -> + var cmp = lhs.limit.compareTo(rhs.limit) + + if (cmp != 0) { + return@Comparator cmp + } + + cmp = lhs.vm.server.uid.compareTo(rhs.vm.server.uid) + + if (cmp != 0) { + return@Comparator cmp + } + + lhs.vcpu.id.compareTo(rhs.vcpu.id) + } + /** * A request to schedule a virtual CPU on the host cpu. */ @@ -351,7 +372,7 @@ class SimpleVirtDriver( val vcpu: ProcessingUnit, var burst: Long, val limit: Double - ) : Comparable { + ) { /** * The usage that was actually granted. */ @@ -364,22 +385,6 @@ class SimpleVirtDriver( override fun equals(other: Any?): Boolean = other is CpuRequest && vm == other.vm && vcpu == other.vcpu override fun hashCode(): Int = Objects.hash(vm, vcpu) - - override fun compareTo(other: CpuRequest): Int { - var cmp = limit.compareTo(other.limit) - - if (cmp != 0) { - return cmp - } - - cmp = vm.server.uid.compareTo(other.vm.server.uid) - - if (cmp != 0) { - return cmp - } - - return vcpu.id.compareTo(other.vcpu.id) - } } internal inner class VmServerContext( @@ -390,7 +395,7 @@ class SimpleVirtDriver( private var finalized: Boolean = false lateinit var burst: LongArray var deadline: Long = 0L - var chan = Channel(Channel.RENDEZVOUS) + var cont: CancellableContinuation? = null private var initialized: Boolean = false internal val job: Job = launch { @@ -462,13 +467,13 @@ class SimpleVirtDriver( // Wait until the burst has been run or the coroutine is cancelled try { schedulingQueue.send(SchedulerCommand.Schedule(this, requests)) - chan.receive() + suspendCancellableCoroutine { cont = it } } catch (e: CancellationException) { // Deschedule the VM withContext(NonCancellable) { requests.forEach { it.isCancelled = true } schedulingQueue.send(SchedulerCommand.Interrupt) - chan.receive() + suspendCancellableCoroutine { cont = it } } e.assertFailure() -- cgit v1.2.3