From 896c6e2fa2cb84be2f383b39bf81c40101647694 Mon Sep 17 00:00:00 2001 From: Georgios Andreadis Date: Wed, 1 Apr 2020 18:32:04 +0200 Subject: Optimizes the performance interference model logic Credits to Fabian for the idea! --- .../opendc/compute/virt/driver/SimpleVirtDriver.kt | 15 +++++++++++++-- .../opendc/core/workload/PerformanceInterferenceModel.kt | 8 ++++++-- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt index 3086f4e6..00368c43 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt @@ -50,6 +50,7 @@ import kotlinx.coroutines.Job import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import java.util.UUID import kotlin.math.ceil @@ -87,6 +88,17 @@ class SimpleVirtDriver( override val events: Flow = eventFlow + init { + events.onEach { + val imagesRunning = vms.map { it.server.image }.toSet() + vms.forEach { + val performanceModel = + it.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? + performanceModel?.computeIntersectingItems(imagesRunning) + } + } + } + override suspend fun spawn( name: String, image: Image, @@ -193,13 +205,12 @@ class SimpleVirtDriver( val totalRemainder = remainder.sum() val totalBurst = burst.sum() - val imagesRunning = vms.map { it.server.image }.toSet() for (vm in vms) { // Apply performance interference model val performanceModel = vm.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? - val performanceScore = performanceModel?.apply(imagesRunning, serverLoad) ?: 1.0 + val performanceScore = performanceModel?.apply(serverLoad) ?: 1.0 for ((i, req) in vm.requests.withIndex()) { // Compute the fraction of compute time allocated to the VM diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/PerformanceInterferenceModel.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/PerformanceInterferenceModel.kt index e2032ff1..ea110934 100644 --- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/PerformanceInterferenceModel.kt +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/PerformanceInterferenceModel.kt @@ -16,12 +16,16 @@ const val IMAGE_PERF_INTERFERENCE_MODEL = "image:performance-interference" data class PerformanceInterferenceModel( val items: Set ) { - fun apply(colocatedWorkloads: Set, currentServerLoad: Double): Double { + private var intersectingItems: List = emptyList() + + fun computeIntersectingItems(colocatedWorkloads: Set) { val colocatedWorkloadIds = colocatedWorkloads.map { it.name } - val intersectingItems = items.filter { item -> + intersectingItems = items.filter { item -> colocatedWorkloadIds.intersect(item.workloadNames).size > 1 } + } + fun apply(currentServerLoad: Double): Double { if (intersectingItems.isEmpty()) { return 1.0 } -- cgit v1.2.3 From 3fa3ad3af752b72ef792c2a4314e363f2586e59e Mon Sep 17 00:00:00 2001 From: Georgios Andreadis Date: Wed, 1 Apr 2020 19:51:43 +0200 Subject: Launch in coroutine scope --- .../atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt index 00368c43..cc03d5f5 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt @@ -50,6 +50,7 @@ import kotlinx.coroutines.Job import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import java.util.UUID @@ -96,7 +97,7 @@ class SimpleVirtDriver( it.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? performanceModel?.computeIntersectingItems(imagesRunning) } - } + }.launchIn(coroutineScope) } override suspend fun spawn( @@ -233,7 +234,15 @@ class SimpleVirtDriver( } } - eventFlow.emit(HypervisorEvent.SliceFinished(this@SimpleVirtDriver, totalBurst, totalBurst - totalRemainder, vms.size, server)) + eventFlow.emit( + HypervisorEvent.SliceFinished( + this@SimpleVirtDriver, + totalBurst, + totalBurst - totalRemainder, + vms.size, + server + ) + ) } this.call = call } -- cgit v1.2.3 From 2da0b5bfc37febca2183374f74eece0b230f054e Mon Sep 17 00:00:00 2001 From: Georgios Andreadis Date: Fri, 3 Apr 2020 09:15:11 +0200 Subject: Filter on vms updated events --- .../kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt index cc03d5f5..b4626def 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt @@ -50,6 +50,7 @@ import kotlinx.coroutines.Job import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch @@ -90,7 +91,7 @@ class SimpleVirtDriver( override val events: Flow = eventFlow init { - events.onEach { + events.filter { it is HypervisorEvent.VmsUpdated }.onEach { val imagesRunning = vms.map { it.server.image }.toSet() vms.forEach { val performanceModel = -- cgit v1.2.3