From 189001983350cbcc7f3524ea5983df48c873709b Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 14 May 2020 02:28:33 +0200 Subject: feat: Persist provisioner events --- .../virt/service/SimpleVirtProvisioningService.kt | 104 ++++++++++++++++++--- .../compute/virt/service/VirtProvisioningEvent.kt | 52 +++++++++++ .../virt/service/VirtProvisioningService.kt | 6 ++ 3 files changed, 150 insertions(+), 12 deletions(-) create mode 100644 opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningEvent.kt (limited to 'opendc/opendc-compute/src') diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt index 3603ae69..c25834a7 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt @@ -1,6 +1,7 @@ package com.atlarge.opendc.compute.virt.service import com.atlarge.odcsim.SimulationContext +import com.atlarge.odcsim.flow.EventFlow import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.Server @@ -19,6 +20,7 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.Job import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch @@ -57,11 +59,11 @@ class SimpleVirtProvisioningService( */ private val activeImages: MutableSet = mutableSetOf() - public var submittedVms = 0L - public var queuedVms = 0L - public var runningVms = 0L - public var finishedVms = 0L - public var unscheduledVms = 0L + public var submittedVms = 0 + public var queuedVms = 0 + public var runningVms = 0 + public var finishedVms = 0 + public var unscheduledVms = 0 private var maxCores = 0 private var maxMemory = 0L @@ -71,6 +73,13 @@ class SimpleVirtProvisioningService( */ private val allocationLogic = allocationPolicy() + /** + * The [EventFlow] to emit the events. + */ + internal val eventFlow = EventFlow() + + override val events: Flow = eventFlow + init { launch { val provisionedNodes = provisioningService.nodes() @@ -96,8 +105,17 @@ class SimpleVirtProvisioningService( image: Image, flavor: Flavor ): Server = withContext(coroutineContext) { - submittedVms++ - queuedVms++ + eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + ++submittedVms, + runningVms, + finishedVms, + ++queuedVms, + unscheduledVms + )) + suspendCancellableCoroutine { cont -> val vmInstance = ImageView(name, image, flavor, cont) incomingImages += vmInstance @@ -141,7 +159,17 @@ class SimpleVirtProvisioningService( if (selectedHv == null) { if (requiredMemory > maxMemory || imageInstance.flavor.cpuCount > maxCores) { - unscheduledVms++ + eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + runningVms, + finishedVms, + queuedVms, + ++unscheduledVms + )) + incomingImages -= imageInstance logger.warn("Failed to spawn ${imageInstance.image}: does not fit [${clock.millis()}]") @@ -168,8 +196,17 @@ class SimpleVirtProvisioningService( ) imageInstance.server = server imageInstance.continuation.resume(server) - queuedVms-- - runningVms++ + + eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + ++runningVms, + finishedVms, + --queuedVms, + unscheduledVms + )) activeImages += imageInstance server.events @@ -178,8 +215,17 @@ class SimpleVirtProvisioningService( is ServerEvent.StateChanged -> { if (event.server.state == ServerState.SHUTOFF) { logger.info { "Server ${event.server.uid} ${event.server.name} ${event.server.flavor} finished." } - runningVms-- - finishedVms++ + + eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + --runningVms, + ++finishedVms, + queuedVms, + unscheduledVms + )) activeImages -= imageInstance selectedHv.provisionedCores -= server.flavor.cpuCount @@ -211,6 +257,7 @@ class SimpleVirtProvisioningService( if (server in hypervisors) { // Corner case for when the hypervisor already exists availableHypervisors += hypervisors.getValue(server) + } else { val hv = HypervisorView( server.uid, @@ -223,11 +270,33 @@ class SimpleVirtProvisioningService( maxMemory = max(maxMemory, server.flavor.memorySize) hypervisors[server] = hv } + + eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + runningVms, + finishedVms, + queuedVms, + unscheduledVms + )) } ServerState.SHUTOFF, ServerState.ERROR -> { val hv = hypervisors[server] ?: return availableHypervisors -= hv + eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + runningVms, + finishedVms, + queuedVms, + unscheduledVms + )) + if (incomingImages.isNotEmpty()) { requestCycle() } @@ -242,6 +311,17 @@ class SimpleVirtProvisioningService( hv.driver = server.services[VirtDriver] availableHypervisors += hv + eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + runningVms, + finishedVms, + queuedVms, + unscheduledVms + )) + hv.driver.events .onEach { event -> if (event is HypervisorEvent.VmsUpdated) { diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningEvent.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningEvent.kt new file mode 100644 index 00000000..39f75913 --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningEvent.kt @@ -0,0 +1,52 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.compute.virt.service + +import com.atlarge.opendc.compute.virt.driver.VirtDriver + + +/** + * An event that is emitted by the [VirtProvisioningService]. + */ +public sealed class VirtProvisioningEvent { + /** + * The service that has emitted the event. + */ + public abstract val provisioner: VirtProvisioningService + + /** + * An event emitted for writing metrics. + */ + data class MetricsAvailable( + override val provisioner: VirtProvisioningService, + public val totalHostCount: Int, + public val availableHostCount: Int, + public val totalVmCount: Int, + public val activeVmCount: Int, + public val inactiveVmCount: Int, + public val waitingVmCount: Int, + public val failedVmCount: Int + ) : VirtProvisioningEvent() +} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt index 2ad7df84..c4cbd711 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt @@ -5,6 +5,7 @@ import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.virt.driver.VirtDriver import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy +import kotlinx.coroutines.flow.Flow /** * A service for VM provisioning on a cloud. @@ -15,6 +16,11 @@ interface VirtProvisioningService { */ val allocationPolicy: AllocationPolicy + /** + * The events emitted by the service. + */ + public val events: Flow + /** * Obtain the active hypervisors for this provisioner. */ -- cgit v1.2.3 From 6c51f02c38053a8aa395ebeb5b29e2b0a4f30c84 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 14 May 2020 14:00:33 +0200 Subject: perf: Use PostgreSQL bulk data inserter --- .../opendc/compute/virt/service/SimpleVirtProvisioningService.kt | 1 - .../com/atlarge/opendc/compute/virt/service/VirtProvisioningEvent.kt | 3 --- 2 files changed, 4 deletions(-) (limited to 'opendc/opendc-compute/src') diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt index c25834a7..c3d9c745 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt @@ -257,7 +257,6 @@ class SimpleVirtProvisioningService( if (server in hypervisors) { // Corner case for when the hypervisor already exists availableHypervisors += hypervisors.getValue(server) - } else { val hv = HypervisorView( server.uid, diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningEvent.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningEvent.kt index 39f75913..c3fb99f9 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningEvent.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningEvent.kt @@ -24,9 +24,6 @@ package com.atlarge.opendc.compute.virt.service -import com.atlarge.opendc.compute.virt.driver.VirtDriver - - /** * An event that is emitted by the [VirtProvisioningService]. */ -- cgit v1.2.3 From 94e34d41eb384731333819a1dbe10539f9a5a14b Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 14 May 2020 18:30:26 +0200 Subject: perf: Improve performance interference model construction performance --- .../compute/core/workload/PerformanceInterferenceModel.kt | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) (limited to 'opendc/opendc-compute/src') diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt index 45024a49..fab4ae9d 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt @@ -58,10 +58,20 @@ class PerformanceInterferenceModel( lhs.hashCode().compareTo(rhs.hashCode()) } val items = TreeSet(comparator) + val workloadToItem: Map> private val colocatedWorkloads = TreeSet() init { - this.items.addAll(items) + val workloadToItem = mutableMapOf>() + + for (item in items) { + for (workload in item.workloadNames) { + workloadToItem.getOrPut(workload) { mutableSetOf() }.add(item) + } + this.items.add(item) + } + + this.workloadToItem = workloadToItem } fun vmStarted(server: Server) { -- cgit v1.2.3 From 269860ba2616c32ca8a81ac66b6fbf95c2f1c77d Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 15 May 2020 13:29:25 +0200 Subject: perf: Reduce memory consumption of perf interference model --- .../core/workload/PerformanceInterferenceModel.kt | 46 ++++++++-------------- 1 file changed, 16 insertions(+), 30 deletions(-) (limited to 'opendc/opendc-compute/src') diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt index fab4ae9d..f458877b 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt @@ -40,40 +40,12 @@ const val IMAGE_PERF_INTERFERENCE_MODEL = "image:performance-interference" * @param items The [PerformanceInterferenceModelItem]s that make up this model. */ class PerformanceInterferenceModel( - items: Set, + val items: SortedSet, val random: Random = Random(0) ) { private var intersectingItems: List = emptyList() - private val comparator = Comparator { lhs, rhs -> - var cmp = lhs.performanceScore.compareTo(rhs.performanceScore) - if (cmp != 0) { - return@Comparator cmp - } - - cmp = lhs.minServerLoad.compareTo(rhs.minServerLoad) - if (cmp != 0) { - return@Comparator cmp - } - - lhs.hashCode().compareTo(rhs.hashCode()) - } - val items = TreeSet(comparator) - val workloadToItem: Map> private val colocatedWorkloads = TreeSet() - init { - val workloadToItem = mutableMapOf>() - - for (item in items) { - for (workload in item.workloadNames) { - workloadToItem.getOrPut(workload) { mutableSetOf() }.add(item) - } - this.items.add(item) - } - - this.workloadToItem = workloadToItem - } - fun vmStarted(server: Server) { colocatedWorkloads.add(server.image.name) intersectingItems = items.filter { item -> doesMatch(item) } @@ -123,7 +95,7 @@ data class PerformanceInterferenceModelItem( val workloadNames: SortedSet, val minServerLoad: Double, val performanceScore: Double -) { +) : Comparable { override fun equals(other: Any?): Boolean { if (this === other) return true if (javaClass != other?.javaClass) return false @@ -136,4 +108,18 @@ data class PerformanceInterferenceModelItem( } override fun hashCode(): Int = workloadNames.hashCode() + + override fun compareTo(other: PerformanceInterferenceModelItem): Int { + var cmp = performanceScore.compareTo(other.performanceScore) + if (cmp != 0) { + return cmp + } + + cmp = minServerLoad.compareTo(other.minServerLoad) + if (cmp != 0) { + return cmp + } + + return hashCode().compareTo(other.hashCode()) + } } -- cgit v1.2.3 From 3228ba5694876a3ac9c0712d2b9330be9eb2f05c Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sun, 17 May 2020 19:27:52 +0200 Subject: bug: Limit CPU usage to demand --- .../kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'opendc/opendc-compute/src') diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt index 53fa463b..ce814dd8 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt @@ -339,7 +339,7 @@ class SimpleVirtDriver( min(totalRequestedSubBurst, totalGrantedBurst), // We can run more than requested due to timing totalOvercommissionedBurst, totalInterferedBurst, // Might be smaller than zero due to FP rounding errors, - totalAllocatedUsage, + min(totalAllocatedUsage, totalRequestedUsage), // The allocated usage might be slightly higher due to FP rounding totalRequestedUsage, vmCount, // Some VMs might already have finished, so keep initial VM count server -- cgit v1.2.3