diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-04-15 12:16:42 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-04-15 12:16:42 +0200 |
| commit | a77829c7a8cf300a99a695423d85f905e0209286 (patch) | |
| tree | f31200cdc3f108ea9b6d622df44a972ed5eed46d /opendc/opendc-compute/src | |
| parent | ae23970faa77c89408a4e98cb9259fb53e222bd3 (diff) | |
perf: Optimize performance interference
Diffstat (limited to 'opendc/opendc-compute/src')
5 files changed, 168 insertions, 29 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt new file mode 100644 index 00000000..ddf9bb33 --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt @@ -0,0 +1,129 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.compute.core.workload + +import com.atlarge.opendc.compute.core.Server +import java.util.SortedSet +import java.util.TreeSet +import kotlin.random.Random + +/** + * Meta-data key for the [PerformanceInterferenceModel] of an image. + */ +const val IMAGE_PERF_INTERFERENCE_MODEL = "image:performance-interference" + +/** + * Performance Interference Model describing the variability incurred by different sets of workloads if colocated. + * + * @param items The [PerformanceInterferenceModelItem]s that make up this model. + */ +class PerformanceInterferenceModel( + items: Set<PerformanceInterferenceModelItem>, + val random: Random = Random(0) +) { + private var intersectingItems: List<PerformanceInterferenceModelItem> = emptyList() + private val comparator = Comparator<PerformanceInterferenceModelItem> { lhs, rhs -> + var cmp = lhs.performanceScore.compareTo(rhs.performanceScore) + if (cmp != 0) { + return@Comparator cmp + } + + cmp = lhs.minServerLoad.compareTo(rhs.minServerLoad) + if (cmp != 0) { + return@Comparator cmp + } + + lhs.hashCode().compareTo(rhs.hashCode()) + } + val items = TreeSet(comparator) + private val colocatedWorkloads = TreeSet<String>() + + init { + this.items.addAll(items) + } + + fun vmStarted(server: Server) { + colocatedWorkloads.add(server.image.name) + intersectingItems = items.filter { item -> doesMatch(item) } + } + + fun vmStopped(server: Server) { + colocatedWorkloads.remove(server.image.name) + intersectingItems = items.filter { item -> doesMatch(item) } + } + + private fun doesMatch(item: PerformanceInterferenceModelItem): Boolean { + var count = 0 + for (name in item.workloadNames.subSet(colocatedWorkloads.first(), colocatedWorkloads.last() + "\u0000")) { + if (name in colocatedWorkloads) + count++ + if (count > 1) + return true + } + return false + } + + fun apply(currentServerLoad: Double): Double { + if (intersectingItems.isEmpty()) { + return 1.0 + } + val score = intersectingItems + .firstOrNull { it.minServerLoad <= currentServerLoad } + + // Apply performance penalty to (on average) only one of the VMs + return if (score != null && random.nextInt(score.workloadNames.size) == 0) { + score.performanceScore + } else { + 1.0 + } + } +} + +/** + * Model describing how a specific set of workloads causes performance variability for each workload. + * + * @param workloadNames The names of the workloads that together cause performance variability for each workload in the set. + * @param minServerLoad The minimum total server load at which this interference is activated and noticeable. + * @param performanceScore The performance score that should be applied to each workload's performance. 1 means no + * influence, <1 means that performance degrades, and >1 means that performance improves. + */ +data class PerformanceInterferenceModelItem( + val workloadNames: SortedSet<String>, + val minServerLoad: Double, + val performanceScore: Double +) { + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (javaClass != other?.javaClass) return false + + other as PerformanceInterferenceModelItem + + if (workloadNames != other.workloadNames) return false + + return true + } + + override fun hashCode(): Int = workloadNames.hashCode() +} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt index 7c088bc8..e7344fa6 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt @@ -26,6 +26,7 @@ package com.atlarge.opendc.compute.virt import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.virt.driver.VirtDriver +import com.atlarge.opendc.compute.virt.service.VirtProvisioningService /** * An event that is emitted by a [VirtDriver]. diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt index 73f9dd5c..f32f407c 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt @@ -40,8 +40,8 @@ import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.virt.HypervisorEvent import com.atlarge.opendc.core.services.ServiceKey import com.atlarge.opendc.core.services.ServiceRegistry -import com.atlarge.opendc.core.workload.IMAGE_PERF_INTERFERENCE_MODEL -import com.atlarge.opendc.core.workload.PerformanceInterferenceModel +import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel import kotlinx.coroutines.CancellableContinuation import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope @@ -53,9 +53,6 @@ import kotlinx.coroutines.NonCancellable import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.filter -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.coroutines.selects.SelectClause0 import kotlinx.coroutines.selects.select @@ -65,6 +62,7 @@ import java.util.Objects import java.util.TreeSet import java.util.UUID import kotlin.coroutines.resume +import kotlin.coroutines.suspendCoroutine import kotlin.math.ceil import kotlin.math.max import kotlin.math.min @@ -101,15 +99,6 @@ class SimpleVirtDriver( override val events: Flow<HypervisorEvent> = eventFlow init { - events.filter { it is HypervisorEvent.VmsUpdated }.onEach { - val imagesRunning = vms.map { it.server.image }.toSet() - vms.forEach { - val performanceModel = - it.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? - performanceModel?.computeIntersectingItems(imagesRunning) - } - }.launchIn(this) - launch { try { scheduler() @@ -140,6 +129,7 @@ class SimpleVirtDriver( ) availableMemory -= requiredMemory vms.add(VmServerContext(server, events, simulationContext.domain)) + vmStarted(server) eventFlow.emit(HypervisorEvent.VmsUpdated(this, vms.size, availableMemory)) return server } @@ -148,6 +138,22 @@ class SimpleVirtDriver( eventFlow.close() } + private fun vmStarted(server: Server) { + vms.forEach { + val performanceModel = + it.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? + performanceModel?.vmStarted(server) + } + } + + private fun vmStopped(server: Server) { + vms.forEach { + val performanceModel = + it.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? + performanceModel?.vmStopped(server) + } + } + /** * A scheduling command processed by the scheduler. */ @@ -325,7 +331,7 @@ class SimpleVirtDriver( requests.removeAll(vmRequests) // Return vCPU `run` call: the requested burst was completed or deadline was exceeded - vm.cont?.resume(Unit) + vm.chan.send(Unit) } } @@ -395,7 +401,7 @@ class SimpleVirtDriver( private var finalized: Boolean = false lateinit var burst: LongArray var deadline: Long = 0L - var cont: CancellableContinuation<Unit>? = null + val chan: Channel<Unit> = Channel(Channel.CONFLATED) private var initialized: Boolean = false internal val job: Job = launch { @@ -443,6 +449,7 @@ class SimpleVirtDriver( server = server.copy(state = serverState) availableMemory += server.flavor.memorySize vms.remove(this) + vmStopped(server) eventFlow.emit(HypervisorEvent.VmsUpdated(this@SimpleVirtDriver, vms.size, availableMemory)) events.close() } @@ -466,15 +473,12 @@ class SimpleVirtDriver( // Wait until the burst has been run or the coroutine is cancelled try { schedulingQueue.send(SchedulerCommand.Schedule(this, requests)) - suspendCancellableCoroutine<Unit> { cont = it } + chan.receive() } catch (e: CancellationException) { // Deschedule the VM - withContext(NonCancellable) { - requests.forEach { it.isCancelled = true } - schedulingQueue.send(SchedulerCommand.Interrupt) - suspendCancellableCoroutine<Unit> { cont = it } - } - + requests.forEach { it.isCancelled = true } + schedulingQueue.send(SchedulerCommand.Interrupt) + chan.receive() e.assertFailure() } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt index 6200ad7c..5ed58fee 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt @@ -55,7 +55,10 @@ class SimpleVirtProvisioningService( */ private val activeImages: MutableSet<ImageView> = mutableSetOf() - override val hypervisorEvents: Flow<HypervisorEvent> = EventFlow() + public var submittedVms = 0L + public var queuedVms = 0L + public var runningVms = 0L + public var finishedVms = 0L /** * The allocation logic to use. @@ -87,6 +90,8 @@ class SimpleVirtProvisioningService( image: Image, flavor: Flavor ): Server = withContext(coroutineContext) { + submittedVms++ + queuedVms++ suspendCancellableCoroutine<Server> { cont -> val vmInstance = ImageView(name, image, flavor, cont) incomingImages += vmInstance @@ -145,6 +150,8 @@ class SimpleVirtProvisioningService( ) imageInstance.server = server imageInstance.continuation.resume(server) + queuedVms-- + runningVms++ activeImages += imageInstance server.events @@ -152,6 +159,9 @@ class SimpleVirtProvisioningService( when (event) { is ServerEvent.StateChanged -> { if (event.server.state == ServerState.SHUTOFF) { + runningVms-- + finishedVms++ + activeImages -= imageInstance selectedHv.provisionedCores -= server.flavor.cpuCount diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt index 550048a4..695f7274 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt @@ -18,11 +18,6 @@ interface VirtProvisioningService { val allocationPolicy: AllocationPolicy /** - * The events emitted by the hypervisors. - */ - public val hypervisorEvents: Flow<HypervisorEvent> - - /** * Obtain the active hypervisors for this provisioner. */ public suspend fun drivers(): Set<VirtDriver> |
