diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-04-12 14:10:03 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-04-12 14:30:42 +0200 |
| commit | acfc5edaec2e3ee1f92551bcf3878e7dc8496b7e (patch) | |
| tree | 731d6a7916b69407940ab573362f6f92a35fba4e /opendc/opendc-compute/src | |
| parent | 8d4d552e706ad5c5adebc774920337b4f201ac1f (diff) | |
perf: Address bottlenecks in VirtDriver
Diffstat (limited to 'opendc/opendc-compute/src')
| -rw-r--r-- | opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt | 51 |
1 files changed, 28 insertions, 23 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt index cec9ce53..9b741ce1 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt @@ -42,6 +42,7 @@ import com.atlarge.opendc.core.services.ServiceKey import com.atlarge.opendc.core.services.ServiceRegistry import com.atlarge.opendc.core.workload.IMAGE_PERF_INTERFERENCE_MODEL import com.atlarge.opendc.core.workload.PerformanceInterferenceModel +import kotlinx.coroutines.CancellableContinuation import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi @@ -58,11 +59,12 @@ import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.coroutines.selects.SelectClause0 import kotlinx.coroutines.selects.select +import kotlinx.coroutines.suspendCancellableCoroutine import kotlinx.coroutines.withContext -import java.lang.Exception import java.util.Objects import java.util.TreeSet import java.util.UUID +import kotlin.coroutines.resume import kotlin.math.ceil import kotlin.math.max import kotlin.math.min @@ -180,7 +182,7 @@ class SimpleVirtDriver( val pCPUs = hostContext.cpus.indices.sortedBy { hostContext.cpus[it].frequency } val vms = mutableMapOf<VmServerContext, Collection<CpuRequest>>() - val requests = TreeSet<CpuRequest>() + val requests = TreeSet<CpuRequest>(cpuRequestComparator) val usage = DoubleArray(hostContext.cpus.size) val burst = LongArray(hostContext.cpus.size) @@ -321,7 +323,7 @@ class SimpleVirtDriver( requests.removeAll(vmRequests) // Return vCPU `run` call: the requested burst was completed or deadline was exceeded - vm.chan.send(Unit) + vm.cont?.resume(Unit) } } @@ -344,6 +346,25 @@ class SimpleVirtDriver( } /** + * The [Comparator] for [CpuRequest]. + */ + private val cpuRequestComparator: Comparator<CpuRequest> = Comparator { lhs, rhs -> + var cmp = lhs.limit.compareTo(rhs.limit) + + if (cmp != 0) { + return@Comparator cmp + } + + cmp = lhs.vm.server.uid.compareTo(rhs.vm.server.uid) + + if (cmp != 0) { + return@Comparator cmp + } + + lhs.vcpu.id.compareTo(rhs.vcpu.id) + } + + /** * A request to schedule a virtual CPU on the host cpu. */ internal data class CpuRequest( @@ -351,7 +372,7 @@ class SimpleVirtDriver( val vcpu: ProcessingUnit, var burst: Long, val limit: Double - ) : Comparable<CpuRequest> { + ) { /** * The usage that was actually granted. */ @@ -364,22 +385,6 @@ class SimpleVirtDriver( override fun equals(other: Any?): Boolean = other is CpuRequest && vm == other.vm && vcpu == other.vcpu override fun hashCode(): Int = Objects.hash(vm, vcpu) - - override fun compareTo(other: CpuRequest): Int { - var cmp = limit.compareTo(other.limit) - - if (cmp != 0) { - return cmp - } - - cmp = vm.server.uid.compareTo(other.vm.server.uid) - - if (cmp != 0) { - return cmp - } - - return vcpu.id.compareTo(other.vcpu.id) - } } internal inner class VmServerContext( @@ -390,7 +395,7 @@ class SimpleVirtDriver( private var finalized: Boolean = false lateinit var burst: LongArray var deadline: Long = 0L - var chan = Channel<Unit>(Channel.RENDEZVOUS) + var cont: CancellableContinuation<Unit>? = null private var initialized: Boolean = false internal val job: Job = launch { @@ -462,13 +467,13 @@ class SimpleVirtDriver( // Wait until the burst has been run or the coroutine is cancelled try { schedulingQueue.send(SchedulerCommand.Schedule(this, requests)) - chan.receive() + suspendCancellableCoroutine<Unit> { cont = it } } catch (e: CancellationException) { // Deschedule the VM withContext(NonCancellable) { requests.forEach { it.isCancelled = true } schedulingQueue.send(SchedulerCommand.Interrupt) - chan.receive() + suspendCancellableCoroutine<Unit> { cont = it } } e.assertFailure() |
