diff options
| author | Georgios Andreadis <g.andreadis@student.tudelft.nl> | 2020-04-12 15:44:53 +0200 |
|---|---|---|
| committer | Georgios Andreadis <g.andreadis@student.tudelft.nl> | 2020-04-12 15:44:53 +0200 |
| commit | 310daf42af741dee2f11d98eb929d2b6c0db141c (patch) | |
| tree | a0570137b64651bfecd0d3a1d0e0383c2c327cea /opendc/opendc-compute/src | |
| parent | 5f141c8b6aa6cfe96333f0cc02015e490b90fca6 (diff) | |
| parent | 4a5ef5a41c8e008d5c09261de550d3f55eaa3348 (diff) | |
Merge branch 'bug/virt-driver-behavior' into '2.x'
Address multiple (performance) issues
See merge request opendc/opendc-simulator!58
Diffstat (limited to 'opendc/opendc-compute/src')
4 files changed, 58 insertions, 38 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt index 9ad88c17..b0688f99 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt @@ -3,6 +3,7 @@ package com.atlarge.opendc.compute.core.image import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.execution.ServerContext import com.atlarge.opendc.core.resource.TagContainer +import kotlinx.coroutines.Job import kotlinx.coroutines.delay import kotlinx.coroutines.ensureActive import java.util.UUID @@ -19,17 +20,20 @@ class VmImage( ) : Image { override suspend fun invoke(ctx: ServerContext) { + val clock = simulationContext.clock + val job = coroutineContext[Job]!! + for (fragment in flopsHistory) { - coroutineContext.ensureActive() + job.ensureActive() if (fragment.flops == 0L) { delay(fragment.duration) } else { val cores = min(fragment.cores, ctx.server.flavor.cpuCount) val burst = LongArray(cores) { fragment.flops / cores } - val usage = DoubleArray(cores) { fragment.usage } + val usage = DoubleArray(cores) { fragment.usage / cores } - ctx.run(burst, usage, simulationContext.clock.millis() + fragment.duration) + ctx.run(burst, usage, clock.millis() + fragment.duration) } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt index 8e15584a..844938db 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt @@ -62,6 +62,7 @@ import kotlin.math.min import kotlinx.coroutines.withContext import java.lang.Exception import kotlin.coroutines.ContinuationInterceptor +import kotlin.random.Random /** * A basic implementation of the [BareMetalDriver] that simulates an [Image] running on a bare-metal machine. @@ -116,6 +117,11 @@ public class SimpleBareMetalDriver( override val powerDraw: Flow<Double> = powerModel(this) + /** + * The internal random instance. + */ + private val random = Random(0) + override suspend fun init(): Node = withContext(domain.coroutineContext) { nodeState.value } @@ -128,7 +134,7 @@ public class SimpleBareMetalDriver( val events = EventFlow<ServerEvent>() val server = Server( - UUID.randomUUID(), + UUID(node.uid.leastSignificantBits xor node.uid.mostSignificantBits, random.nextLong()), node.name, emptyMap(), flavor, diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt index 9ceb8bfc..7c088bc8 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt @@ -60,6 +60,8 @@ public sealed class HypervisorEvent { * it did not have the capacity. * @property interferedBurst The sum of CPU time that virtual machines could not utilize due to performance * interference. + * @property cpuUsage CPU use in megahertz. + * @property cpuDemand CPU demand in megahertz. * @property numberOfDeployedImages The number of images deployed on this hypervisor. */ public data class SliceFinished( @@ -68,6 +70,8 @@ public sealed class HypervisorEvent { public val grantedBurst: Long, public val overcommissionedBurst: Long, public val interferedBurst: Long, + public val cpuUsage: Double, + public val cpuDemand: Double, public val numberOfDeployedImages: Int, public val hostServer: Server ) : HypervisorEvent() diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt index 5f15084d..d81b8825 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt @@ -42,6 +42,7 @@ import com.atlarge.opendc.core.services.ServiceKey import com.atlarge.opendc.core.services.ServiceRegistry import com.atlarge.opendc.core.workload.IMAGE_PERF_INTERFERENCE_MODEL import com.atlarge.opendc.core.workload.PerformanceInterferenceModel +import kotlinx.coroutines.CancellableContinuation import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi @@ -58,11 +59,12 @@ import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.coroutines.selects.SelectClause0 import kotlinx.coroutines.selects.select +import kotlinx.coroutines.suspendCancellableCoroutine import kotlinx.coroutines.withContext -import java.lang.Exception import java.util.Objects import java.util.TreeSet import java.util.UUID +import kotlin.coroutines.resume import kotlin.math.ceil import kotlin.math.max import kotlin.math.min @@ -180,7 +182,7 @@ class SimpleVirtDriver( val pCPUs = hostContext.cpus.indices.sortedBy { hostContext.cpus[it].frequency } val vms = mutableMapOf<VmServerContext, Collection<CpuRequest>>() - val requests = TreeSet<CpuRequest>() + val requests = TreeSet(cpuRequestComparator) val usage = DoubleArray(hostContext.cpus.size) val burst = LongArray(hostContext.cpus.size) @@ -237,7 +239,8 @@ class SimpleVirtDriver( deadline = min(deadline, req.vm.deadline) } - duration = ceil(duration) + // XXX We set the minimum duration to 5 minutes here to prevent the rounding issues that are occurring with the FLOPs. + duration = max(300.0, ceil(duration)) val totalAllocatedUsage = maxUsage - availableUsage var totalAllocatedBurst = 0L @@ -246,14 +249,14 @@ class SimpleVirtDriver( // Divide the requests over the available capacity of the pCPUs fairly for (i in pCPUs) { - val remaining = hostContext.cpus.size - i - val availableShare = availableUsage / remaining - val grantedUsage = min(hostContext.cpus[i].frequency, availableShare) - val pBurst = ceil(duration * grantedUsage).toLong() + val maxCpuUsage = hostContext.cpus[i].frequency + val fraction = maxCpuUsage / maxUsage + val grantedUsage = min(maxCpuUsage, totalAllocatedUsage * fraction) + val grantedBurst = ceil(duration * grantedUsage).toLong() usage[i] = grantedUsage - burst[i] = pBurst - totalAllocatedBurst += pBurst + burst[i] = grantedBurst + totalAllocatedBurst += grantedBurst availableUsage -= grantedUsage } @@ -308,9 +311,7 @@ class SimpleVirtDriver( if (req.burst <= 0L || req.isCancelled) { hasFinished = true - } - - if (vm.deadline <= end && hostContext.server.state != ServerState.ERROR) { + } else if (vm.deadline <= end && hostContext.server.state != ServerState.ERROR) { // Request must have its entire burst consumed or otherwise we have overcommission // Note that we count the overcommissioned burst if the hypervisor has failed. totalOvercommissionedBurst += req.burst @@ -323,7 +324,7 @@ class SimpleVirtDriver( requests.removeAll(vmRequests) // Return vCPU `run` call: the requested burst was completed or deadline was exceeded - vm.chan.send(Unit) + vm.cont?.resume(Unit) } } @@ -335,7 +336,9 @@ class SimpleVirtDriver( min(totalRequestedBurst, totalAllocatedBurst), min(totalRequestedBurst, totalGrantedBurst), // We can run more than requested due to timing totalOvercommissionedBurst, - totalInterferedBurst, // Might be smaller than zero due to FP rounding errors + totalInterferedBurst, // Might be smaller than zero due to FP rounding errors, + totalAllocatedUsage, + totalRequestedUsage, vmCount, // Some VMs might already have finished, so keep initial VM count server ) @@ -344,6 +347,25 @@ class SimpleVirtDriver( } /** + * The [Comparator] for [CpuRequest]. + */ + private val cpuRequestComparator: Comparator<CpuRequest> = Comparator { lhs, rhs -> + var cmp = lhs.limit.compareTo(rhs.limit) + + if (cmp != 0) { + return@Comparator cmp + } + + cmp = lhs.vm.server.uid.compareTo(rhs.vm.server.uid) + + if (cmp != 0) { + return@Comparator cmp + } + + lhs.vcpu.id.compareTo(rhs.vcpu.id) + } + + /** * A request to schedule a virtual CPU on the host cpu. */ internal data class CpuRequest( @@ -351,7 +373,7 @@ class SimpleVirtDriver( val vcpu: ProcessingUnit, var burst: Long, val limit: Double - ) : Comparable<CpuRequest> { + ) { /** * The usage that was actually granted. */ @@ -364,22 +386,6 @@ class SimpleVirtDriver( override fun equals(other: Any?): Boolean = other is CpuRequest && vm == other.vm && vcpu == other.vcpu override fun hashCode(): Int = Objects.hash(vm, vcpu) - - override fun compareTo(other: CpuRequest): Int { - var cmp = limit.compareTo(other.limit) - - if (cmp != 0) { - return cmp - } - - cmp = vm.server.uid.compareTo(other.vm.server.uid) - - if (cmp != 0) { - return cmp - } - - return vcpu.id.compareTo(other.vcpu.id) - } } internal inner class VmServerContext( @@ -390,7 +396,7 @@ class SimpleVirtDriver( private var finalized: Boolean = false lateinit var burst: LongArray var deadline: Long = 0L - var chan = Channel<Unit>(Channel.RENDEZVOUS) + var cont: CancellableContinuation<Unit>? = null private var initialized: Boolean = false internal val job: Job = launch { @@ -462,13 +468,13 @@ class SimpleVirtDriver( // Wait until the burst has been run or the coroutine is cancelled try { schedulingQueue.send(SchedulerCommand.Schedule(this, requests)) - chan.receive() + suspendCancellableCoroutine<Unit> { cont = it } } catch (e: CancellationException) { // Deschedule the VM withContext(NonCancellable) { requests.forEach { it.isCancelled = true } schedulingQueue.send(SchedulerCommand.Interrupt) - chan.receive() + suspendCancellableCoroutine<Unit> { cont = it } } e.assertFailure() |
