summaryrefslogtreecommitdiff
path: root/opendc/opendc-compute/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2020-04-15 12:16:42 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2020-04-15 12:16:42 +0200
commita77829c7a8cf300a99a695423d85f905e0209286 (patch)
treef31200cdc3f108ea9b6d622df44a972ed5eed46d /opendc/opendc-compute/src
parentae23970faa77c89408a4e98cb9259fb53e222bd3 (diff)
perf: Optimize performance interference
Diffstat (limited to 'opendc/opendc-compute/src')
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt129
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt1
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt50
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt12
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt5
5 files changed, 168 insertions, 29 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt
new file mode 100644
index 00000000..ddf9bb33
--- /dev/null
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt
@@ -0,0 +1,129 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.compute.core.workload
+
+import com.atlarge.opendc.compute.core.Server
+import java.util.SortedSet
+import java.util.TreeSet
+import kotlin.random.Random
+
+/**
+ * Meta-data key for the [PerformanceInterferenceModel] of an image.
+ */
+const val IMAGE_PERF_INTERFERENCE_MODEL = "image:performance-interference"
+
+/**
+ * Performance Interference Model describing the variability incurred by different sets of workloads if colocated.
+ *
+ * @param items The [PerformanceInterferenceModelItem]s that make up this model.
+ */
+class PerformanceInterferenceModel(
+ items: Set<PerformanceInterferenceModelItem>,
+ val random: Random = Random(0)
+) {
+ private var intersectingItems: List<PerformanceInterferenceModelItem> = emptyList()
+ private val comparator = Comparator<PerformanceInterferenceModelItem> { lhs, rhs ->
+ var cmp = lhs.performanceScore.compareTo(rhs.performanceScore)
+ if (cmp != 0) {
+ return@Comparator cmp
+ }
+
+ cmp = lhs.minServerLoad.compareTo(rhs.minServerLoad)
+ if (cmp != 0) {
+ return@Comparator cmp
+ }
+
+ lhs.hashCode().compareTo(rhs.hashCode())
+ }
+ val items = TreeSet(comparator)
+ private val colocatedWorkloads = TreeSet<String>()
+
+ init {
+ this.items.addAll(items)
+ }
+
+ fun vmStarted(server: Server) {
+ colocatedWorkloads.add(server.image.name)
+ intersectingItems = items.filter { item -> doesMatch(item) }
+ }
+
+ fun vmStopped(server: Server) {
+ colocatedWorkloads.remove(server.image.name)
+ intersectingItems = items.filter { item -> doesMatch(item) }
+ }
+
+ private fun doesMatch(item: PerformanceInterferenceModelItem): Boolean {
+ var count = 0
+ for (name in item.workloadNames.subSet(colocatedWorkloads.first(), colocatedWorkloads.last() + "\u0000")) {
+ if (name in colocatedWorkloads)
+ count++
+ if (count > 1)
+ return true
+ }
+ return false
+ }
+
+ fun apply(currentServerLoad: Double): Double {
+ if (intersectingItems.isEmpty()) {
+ return 1.0
+ }
+ val score = intersectingItems
+ .firstOrNull { it.minServerLoad <= currentServerLoad }
+
+ // Apply performance penalty to (on average) only one of the VMs
+ return if (score != null && random.nextInt(score.workloadNames.size) == 0) {
+ score.performanceScore
+ } else {
+ 1.0
+ }
+ }
+}
+
+/**
+ * Model describing how a specific set of workloads causes performance variability for each workload.
+ *
+ * @param workloadNames The names of the workloads that together cause performance variability for each workload in the set.
+ * @param minServerLoad The minimum total server load at which this interference is activated and noticeable.
+ * @param performanceScore The performance score that should be applied to each workload's performance. 1 means no
+ * influence, <1 means that performance degrades, and >1 means that performance improves.
+ */
+data class PerformanceInterferenceModelItem(
+ val workloadNames: SortedSet<String>,
+ val minServerLoad: Double,
+ val performanceScore: Double
+) {
+ override fun equals(other: Any?): Boolean {
+ if (this === other) return true
+ if (javaClass != other?.javaClass) return false
+
+ other as PerformanceInterferenceModelItem
+
+ if (workloadNames != other.workloadNames) return false
+
+ return true
+ }
+
+ override fun hashCode(): Int = workloadNames.hashCode()
+}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt
index 7c088bc8..e7344fa6 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt
@@ -26,6 +26,7 @@ package com.atlarge.opendc.compute.virt
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.virt.driver.VirtDriver
+import com.atlarge.opendc.compute.virt.service.VirtProvisioningService
/**
* An event that is emitted by a [VirtDriver].
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
index 73f9dd5c..f32f407c 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
@@ -40,8 +40,8 @@ import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.compute.virt.HypervisorEvent
import com.atlarge.opendc.core.services.ServiceKey
import com.atlarge.opendc.core.services.ServiceRegistry
-import com.atlarge.opendc.core.workload.IMAGE_PERF_INTERFERENCE_MODEL
-import com.atlarge.opendc.core.workload.PerformanceInterferenceModel
+import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL
+import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
@@ -53,9 +53,6 @@ import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
-import kotlinx.coroutines.flow.filter
-import kotlinx.coroutines.flow.launchIn
-import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.selects.SelectClause0
import kotlinx.coroutines.selects.select
@@ -65,6 +62,7 @@ import java.util.Objects
import java.util.TreeSet
import java.util.UUID
import kotlin.coroutines.resume
+import kotlin.coroutines.suspendCoroutine
import kotlin.math.ceil
import kotlin.math.max
import kotlin.math.min
@@ -101,15 +99,6 @@ class SimpleVirtDriver(
override val events: Flow<HypervisorEvent> = eventFlow
init {
- events.filter { it is HypervisorEvent.VmsUpdated }.onEach {
- val imagesRunning = vms.map { it.server.image }.toSet()
- vms.forEach {
- val performanceModel =
- it.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel?
- performanceModel?.computeIntersectingItems(imagesRunning)
- }
- }.launchIn(this)
-
launch {
try {
scheduler()
@@ -140,6 +129,7 @@ class SimpleVirtDriver(
)
availableMemory -= requiredMemory
vms.add(VmServerContext(server, events, simulationContext.domain))
+ vmStarted(server)
eventFlow.emit(HypervisorEvent.VmsUpdated(this, vms.size, availableMemory))
return server
}
@@ -148,6 +138,22 @@ class SimpleVirtDriver(
eventFlow.close()
}
+ private fun vmStarted(server: Server) {
+ vms.forEach {
+ val performanceModel =
+ it.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel?
+ performanceModel?.vmStarted(server)
+ }
+ }
+
+ private fun vmStopped(server: Server) {
+ vms.forEach {
+ val performanceModel =
+ it.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel?
+ performanceModel?.vmStopped(server)
+ }
+ }
+
/**
* A scheduling command processed by the scheduler.
*/
@@ -325,7 +331,7 @@ class SimpleVirtDriver(
requests.removeAll(vmRequests)
// Return vCPU `run` call: the requested burst was completed or deadline was exceeded
- vm.cont?.resume(Unit)
+ vm.chan.send(Unit)
}
}
@@ -395,7 +401,7 @@ class SimpleVirtDriver(
private var finalized: Boolean = false
lateinit var burst: LongArray
var deadline: Long = 0L
- var cont: CancellableContinuation<Unit>? = null
+ val chan: Channel<Unit> = Channel(Channel.CONFLATED)
private var initialized: Boolean = false
internal val job: Job = launch {
@@ -443,6 +449,7 @@ class SimpleVirtDriver(
server = server.copy(state = serverState)
availableMemory += server.flavor.memorySize
vms.remove(this)
+ vmStopped(server)
eventFlow.emit(HypervisorEvent.VmsUpdated(this@SimpleVirtDriver, vms.size, availableMemory))
events.close()
}
@@ -466,15 +473,12 @@ class SimpleVirtDriver(
// Wait until the burst has been run or the coroutine is cancelled
try {
schedulingQueue.send(SchedulerCommand.Schedule(this, requests))
- suspendCancellableCoroutine<Unit> { cont = it }
+ chan.receive()
} catch (e: CancellationException) {
// Deschedule the VM
- withContext(NonCancellable) {
- requests.forEach { it.isCancelled = true }
- schedulingQueue.send(SchedulerCommand.Interrupt)
- suspendCancellableCoroutine<Unit> { cont = it }
- }
-
+ requests.forEach { it.isCancelled = true }
+ schedulingQueue.send(SchedulerCommand.Interrupt)
+ chan.receive()
e.assertFailure()
}
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
index 6200ad7c..5ed58fee 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
@@ -55,7 +55,10 @@ class SimpleVirtProvisioningService(
*/
private val activeImages: MutableSet<ImageView> = mutableSetOf()
- override val hypervisorEvents: Flow<HypervisorEvent> = EventFlow()
+ public var submittedVms = 0L
+ public var queuedVms = 0L
+ public var runningVms = 0L
+ public var finishedVms = 0L
/**
* The allocation logic to use.
@@ -87,6 +90,8 @@ class SimpleVirtProvisioningService(
image: Image,
flavor: Flavor
): Server = withContext(coroutineContext) {
+ submittedVms++
+ queuedVms++
suspendCancellableCoroutine<Server> { cont ->
val vmInstance = ImageView(name, image, flavor, cont)
incomingImages += vmInstance
@@ -145,6 +150,8 @@ class SimpleVirtProvisioningService(
)
imageInstance.server = server
imageInstance.continuation.resume(server)
+ queuedVms--
+ runningVms++
activeImages += imageInstance
server.events
@@ -152,6 +159,9 @@ class SimpleVirtProvisioningService(
when (event) {
is ServerEvent.StateChanged -> {
if (event.server.state == ServerState.SHUTOFF) {
+ runningVms--
+ finishedVms++
+
activeImages -= imageInstance
selectedHv.provisionedCores -= server.flavor.cpuCount
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt
index 550048a4..695f7274 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt
@@ -18,11 +18,6 @@ interface VirtProvisioningService {
val allocationPolicy: AllocationPolicy
/**
- * The events emitted by the hypervisors.
- */
- public val hypervisorEvents: Flow<HypervisorEvent>
-
- /**
* Obtain the active hypervisors for this provisioner.
*/
public suspend fun drivers(): Set<VirtDriver>