summaryrefslogtreecommitdiff
path: root/opendc/opendc-compute/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2020-04-12 14:10:03 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2020-04-12 14:30:42 +0200
commitacfc5edaec2e3ee1f92551bcf3878e7dc8496b7e (patch)
tree731d6a7916b69407940ab573362f6f92a35fba4e /opendc/opendc-compute/src
parent8d4d552e706ad5c5adebc774920337b4f201ac1f (diff)
perf: Address bottlenecks in VirtDriver
Diffstat (limited to 'opendc/opendc-compute/src')
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt51
1 files changed, 28 insertions, 23 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
index cec9ce53..9b741ce1 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
@@ -42,6 +42,7 @@ import com.atlarge.opendc.core.services.ServiceKey
import com.atlarge.opendc.core.services.ServiceRegistry
import com.atlarge.opendc.core.workload.IMAGE_PERF_INTERFERENCE_MODEL
import com.atlarge.opendc.core.workload.PerformanceInterferenceModel
+import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
@@ -58,11 +59,12 @@ import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.selects.SelectClause0
import kotlinx.coroutines.selects.select
+import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.withContext
-import java.lang.Exception
import java.util.Objects
import java.util.TreeSet
import java.util.UUID
+import kotlin.coroutines.resume
import kotlin.math.ceil
import kotlin.math.max
import kotlin.math.min
@@ -180,7 +182,7 @@ class SimpleVirtDriver(
val pCPUs = hostContext.cpus.indices.sortedBy { hostContext.cpus[it].frequency }
val vms = mutableMapOf<VmServerContext, Collection<CpuRequest>>()
- val requests = TreeSet<CpuRequest>()
+ val requests = TreeSet<CpuRequest>(cpuRequestComparator)
val usage = DoubleArray(hostContext.cpus.size)
val burst = LongArray(hostContext.cpus.size)
@@ -321,7 +323,7 @@ class SimpleVirtDriver(
requests.removeAll(vmRequests)
// Return vCPU `run` call: the requested burst was completed or deadline was exceeded
- vm.chan.send(Unit)
+ vm.cont?.resume(Unit)
}
}
@@ -344,6 +346,25 @@ class SimpleVirtDriver(
}
/**
+ * The [Comparator] for [CpuRequest].
+ */
+ private val cpuRequestComparator: Comparator<CpuRequest> = Comparator { lhs, rhs ->
+ var cmp = lhs.limit.compareTo(rhs.limit)
+
+ if (cmp != 0) {
+ return@Comparator cmp
+ }
+
+ cmp = lhs.vm.server.uid.compareTo(rhs.vm.server.uid)
+
+ if (cmp != 0) {
+ return@Comparator cmp
+ }
+
+ lhs.vcpu.id.compareTo(rhs.vcpu.id)
+ }
+
+ /**
* A request to schedule a virtual CPU on the host cpu.
*/
internal data class CpuRequest(
@@ -351,7 +372,7 @@ class SimpleVirtDriver(
val vcpu: ProcessingUnit,
var burst: Long,
val limit: Double
- ) : Comparable<CpuRequest> {
+ ) {
/**
* The usage that was actually granted.
*/
@@ -364,22 +385,6 @@ class SimpleVirtDriver(
override fun equals(other: Any?): Boolean = other is CpuRequest && vm == other.vm && vcpu == other.vcpu
override fun hashCode(): Int = Objects.hash(vm, vcpu)
-
- override fun compareTo(other: CpuRequest): Int {
- var cmp = limit.compareTo(other.limit)
-
- if (cmp != 0) {
- return cmp
- }
-
- cmp = vm.server.uid.compareTo(other.vm.server.uid)
-
- if (cmp != 0) {
- return cmp
- }
-
- return vcpu.id.compareTo(other.vcpu.id)
- }
}
internal inner class VmServerContext(
@@ -390,7 +395,7 @@ class SimpleVirtDriver(
private var finalized: Boolean = false
lateinit var burst: LongArray
var deadline: Long = 0L
- var chan = Channel<Unit>(Channel.RENDEZVOUS)
+ var cont: CancellableContinuation<Unit>? = null
private var initialized: Boolean = false
internal val job: Job = launch {
@@ -462,13 +467,13 @@ class SimpleVirtDriver(
// Wait until the burst has been run or the coroutine is cancelled
try {
schedulingQueue.send(SchedulerCommand.Schedule(this, requests))
- chan.receive()
+ suspendCancellableCoroutine<Unit> { cont = it }
} catch (e: CancellationException) {
// Deschedule the VM
withContext(NonCancellable) {
requests.forEach { it.isCancelled = true }
schedulingQueue.send(SchedulerCommand.Interrupt)
- chan.receive()
+ suspendCancellableCoroutine<Unit> { cont = it }
}
e.assertFailure()