diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-05-14 02:28:33 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-05-14 02:28:33 +0200 |
| commit | 189001983350cbcc7f3524ea5983df48c873709b (patch) | |
| tree | 4eb033f13a599e5ebd77705ba9fc1732bad1f13f /opendc/opendc-compute/src | |
| parent | 7958af9966cb3ea3237aabca9e9409041c6dbfbb (diff) | |
feat: Persist provisioner events
Diffstat (limited to 'opendc/opendc-compute/src')
3 files changed, 150 insertions, 12 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt index 3603ae69..c25834a7 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt @@ -1,6 +1,7 @@ package com.atlarge.opendc.compute.virt.service import com.atlarge.odcsim.SimulationContext +import com.atlarge.odcsim.flow.EventFlow import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.Server @@ -19,6 +20,7 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.Job import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch @@ -57,11 +59,11 @@ class SimpleVirtProvisioningService( */ private val activeImages: MutableSet<ImageView> = mutableSetOf() - public var submittedVms = 0L - public var queuedVms = 0L - public var runningVms = 0L - public var finishedVms = 0L - public var unscheduledVms = 0L + public var submittedVms = 0 + public var queuedVms = 0 + public var runningVms = 0 + public var finishedVms = 0 + public var unscheduledVms = 0 private var maxCores = 0 private var maxMemory = 0L @@ -71,6 +73,13 @@ class SimpleVirtProvisioningService( */ private val allocationLogic = allocationPolicy() + /** + * The [EventFlow] to emit the events. + */ + internal val eventFlow = EventFlow<VirtProvisioningEvent>() + + override val events: Flow<VirtProvisioningEvent> = eventFlow + init { launch { val provisionedNodes = provisioningService.nodes() @@ -96,8 +105,17 @@ class SimpleVirtProvisioningService( image: Image, flavor: Flavor ): Server = withContext(coroutineContext) { - submittedVms++ - queuedVms++ + eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + ++submittedVms, + runningVms, + finishedVms, + ++queuedVms, + unscheduledVms + )) + suspendCancellableCoroutine<Server> { cont -> val vmInstance = ImageView(name, image, flavor, cont) incomingImages += vmInstance @@ -141,7 +159,17 @@ class SimpleVirtProvisioningService( if (selectedHv == null) { if (requiredMemory > maxMemory || imageInstance.flavor.cpuCount > maxCores) { - unscheduledVms++ + eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + runningVms, + finishedVms, + queuedVms, + ++unscheduledVms + )) + incomingImages -= imageInstance logger.warn("Failed to spawn ${imageInstance.image}: does not fit [${clock.millis()}]") @@ -168,8 +196,17 @@ class SimpleVirtProvisioningService( ) imageInstance.server = server imageInstance.continuation.resume(server) - queuedVms-- - runningVms++ + + eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + ++runningVms, + finishedVms, + --queuedVms, + unscheduledVms + )) activeImages += imageInstance server.events @@ -178,8 +215,17 @@ class SimpleVirtProvisioningService( is ServerEvent.StateChanged -> { if (event.server.state == ServerState.SHUTOFF) { logger.info { "Server ${event.server.uid} ${event.server.name} ${event.server.flavor} finished." } - runningVms-- - finishedVms++ + + eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + --runningVms, + ++finishedVms, + queuedVms, + unscheduledVms + )) activeImages -= imageInstance selectedHv.provisionedCores -= server.flavor.cpuCount @@ -211,6 +257,7 @@ class SimpleVirtProvisioningService( if (server in hypervisors) { // Corner case for when the hypervisor already exists availableHypervisors += hypervisors.getValue(server) + } else { val hv = HypervisorView( server.uid, @@ -223,11 +270,33 @@ class SimpleVirtProvisioningService( maxMemory = max(maxMemory, server.flavor.memorySize) hypervisors[server] = hv } + + eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + runningVms, + finishedVms, + queuedVms, + unscheduledVms + )) } ServerState.SHUTOFF, ServerState.ERROR -> { val hv = hypervisors[server] ?: return availableHypervisors -= hv + eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + runningVms, + finishedVms, + queuedVms, + unscheduledVms + )) + if (incomingImages.isNotEmpty()) { requestCycle() } @@ -242,6 +311,17 @@ class SimpleVirtProvisioningService( hv.driver = server.services[VirtDriver] availableHypervisors += hv + eventFlow.emit(VirtProvisioningEvent.MetricsAvailable( + this@SimpleVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + runningVms, + finishedVms, + queuedVms, + unscheduledVms + )) + hv.driver.events .onEach { event -> if (event is HypervisorEvent.VmsUpdated) { diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningEvent.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningEvent.kt new file mode 100644 index 00000000..39f75913 --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningEvent.kt @@ -0,0 +1,52 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.compute.virt.service + +import com.atlarge.opendc.compute.virt.driver.VirtDriver + + +/** + * An event that is emitted by the [VirtProvisioningService]. + */ +public sealed class VirtProvisioningEvent { + /** + * The service that has emitted the event. + */ + public abstract val provisioner: VirtProvisioningService + + /** + * An event emitted for writing metrics. + */ + data class MetricsAvailable( + override val provisioner: VirtProvisioningService, + public val totalHostCount: Int, + public val availableHostCount: Int, + public val totalVmCount: Int, + public val activeVmCount: Int, + public val inactiveVmCount: Int, + public val waitingVmCount: Int, + public val failedVmCount: Int + ) : VirtProvisioningEvent() +} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt index 2ad7df84..c4cbd711 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt @@ -5,6 +5,7 @@ import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.virt.driver.VirtDriver import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy +import kotlinx.coroutines.flow.Flow /** * A service for VM provisioning on a cloud. @@ -16,6 +17,11 @@ interface VirtProvisioningService { val allocationPolicy: AllocationPolicy /** + * The events emitted by the service. + */ + public val events: Flow<VirtProvisioningEvent> + + /** * Obtain the active hypervisors for this provisioner. */ public suspend fun drivers(): Set<VirtDriver> |
