diff options
| author | Georgios Andreadis <info@gandreadis.com> | 2020-02-28 13:59:21 +0100 |
|---|---|---|
| committer | Georgios Andreadis <info@gandreadis.com> | 2020-02-28 14:39:59 +0100 |
| commit | 3b31e1e0c4f7f8c7ca20a4b1a3e784029bbd0179 (patch) | |
| tree | 741b5ae6aa503c4a92280c49629e29cc26f533e3 /opendc | |
| parent | 7cadeb711158609eacc86e4e4c60dd825c848b99 (diff) | |
Change to push-based node status communication
Diffstat (limited to 'opendc')
9 files changed, 91 insertions, 50 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt index e68dd488..d889d0f9 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt @@ -47,18 +47,18 @@ public interface VirtDriver { public suspend fun spawn(image: Image, monitor: ServerMonitor, flavor: Flavor): Server /** - * Returns the number of spawned images on the server managed by this driver. + * Adds the given [VirtDriverMonitor] to the list of monitors to keep informed on the state of this driver. * - * @return The number of spawned images. + * @param monitor The monitor to keep informed. */ - public suspend fun getNumberOfSpawnedImages(): Int + public suspend fun addMonitor(monitor: VirtDriverMonitor) /** - * Returns the available memory on the server managed by this driver. + * Removes the given [VirtDriverMonitor] from the list of monitors. * - * @return The available memory, in MB. + * @param monitor The monitor to unsubscribe */ - public suspend fun getAvailableMemory(): Long + public suspend fun removeMonitor(monitor: VirtDriverMonitor) companion object Key : AbstractServiceKey<VirtDriver>(UUID.randomUUID(), "virtual-driver") } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverMonitor.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverMonitor.kt new file mode 100644 index 00000000..cf2f4619 --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverMonitor.kt @@ -0,0 +1,14 @@ +package com.atlarge.opendc.compute.virt.driver + +/** + * Monitor for entities interested in the state of a [VirtDriver]. + */ +interface VirtDriverMonitor { + /** + * Called when the number of active servers on the server managed by this driver is updated. + * + * @param numberOfActiveServers The number of active servers. + * @param availableMemory The available memory, in MB. + */ + public suspend fun onUpdate(numberOfActiveServers: Int, availableMemory: Long) +} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt index 4b23159f..e0547dcf 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt @@ -35,6 +35,7 @@ import com.atlarge.opendc.compute.core.execution.ServerManagementContext import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.virt.driver.VirtDriver +import com.atlarge.opendc.compute.virt.driver.VirtDriverMonitor import kotlinx.coroutines.Job import kotlinx.coroutines.launch import java.util.UUID @@ -54,21 +55,35 @@ class HypervisorVirtDriver( /** * Current total memory use of the images on this hypervisor. */ - private var memoryAvailable: Long = hostContext.server.flavor.memorySize + private var availableMemory: Long = hostContext.server.flavor.memorySize + + /** + * Monitors to keep informed. + */ + private val monitors: MutableSet<VirtDriverMonitor> = mutableSetOf() override suspend fun spawn(image: Image, monitor: ServerMonitor, flavor: Flavor): Server { val requiredMemory = flavor.memorySize - if (memoryAvailable - requiredMemory < 0) { + if (availableMemory - requiredMemory < 0) { throw InsufficientMemoryOnServerException() } require(flavor.cpuCount <= hostContext.server.flavor.cpuCount) { "Machine does not fit" } val server = Server(UUID.randomUUID(), "<unnamed>", emptyMap(), flavor, image, ServerState.BUILD) - memoryAvailable -= requiredMemory + availableMemory -= requiredMemory vms.add(VmServerContext(server, monitor, simulationContext)) + monitors.forEach { it.onUpdate(vms.size, availableMemory) } return server } + override suspend fun addMonitor(monitor: VirtDriverMonitor) { + monitors.add(monitor) + } + + override suspend fun removeMonitor(monitor: VirtDriverMonitor) { + monitors.remove(monitor) + } + internal inner class VmServerContext( override var server: Server, val monitor: ServerMonitor, @@ -103,9 +118,11 @@ class HypervisorVirtDriver( val previousState = server.state val state = if (cause == null) ServerState.SHUTOFF else ServerState.ERROR server = server.copy(state = state) - memoryAvailable += server.flavor.memorySize + availableMemory += server.flavor.memorySize monitor.onUpdate(server, previousState) initialized = false + vms.remove(this) + monitors.forEach { it.onUpdate(vms.size, availableMemory) } } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/NodeView.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/NodeView.kt new file mode 100644 index 00000000..41e67624 --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/NodeView.kt @@ -0,0 +1,11 @@ +package com.atlarge.opendc.compute.virt.service + +import com.atlarge.opendc.compute.metal.Node +import com.atlarge.opendc.compute.virt.driver.hypervisor.HypervisorImage + +class NodeView( + val node: Node, + val hypervisor: HypervisorImage, + var numberOfActiveServers: Int, + var availableMemory: Long +) diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt index 309cfa83..5924c2c7 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt @@ -9,6 +9,7 @@ import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.metal.Node import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.compute.virt.driver.VirtDriver +import com.atlarge.opendc.compute.virt.driver.VirtDriverMonitor import com.atlarge.opendc.compute.virt.driver.hypervisor.HypervisorImage import com.atlarge.opendc.compute.virt.driver.hypervisor.InsufficientMemoryOnServerException import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor @@ -29,12 +30,7 @@ class SimpleVirtProvisioningService( /** * The available nodes. */ - internal val availableNodes: MutableSet<Node> = mutableSetOf() - - /** - * The available hypervisors. - */ - internal val hypervisorByNode: MutableMap<Node, HypervisorImage> = mutableMapOf() + internal val availableNodes: MutableSet<NodeView> = mutableSetOf() /** * The incoming images to be processed by the provisioner. @@ -55,14 +51,22 @@ class SimpleVirtProvisioningService( ctx.domain.launch { val provisionedNodes = provisioningService.nodes().toList() val deployedNodes = provisionedNodes.map { node -> - val hypervisorImage = - HypervisorImage( - hypervisorMonitor - ) - hypervisorByNode[node] = hypervisorImage - provisioningService.deploy(node, hypervisorImage, this@SimpleVirtProvisioningService) + val hypervisorImage = HypervisorImage(hypervisorMonitor) + val nodeView = NodeView( + provisioningService.deploy(node, hypervisorImage, this@SimpleVirtProvisioningService), + hypervisorImage, + 0, + node.server!!.flavor.memorySize + ) + node.server.serviceRegistry[VirtDriver.Key].addMonitor(object : VirtDriverMonitor { + override suspend fun onUpdate(numberOfActiveServers: Int, availableMemory: Long) { + nodeView.numberOfActiveServers = numberOfActiveServers + nodeView.availableMemory = availableMemory + } + }) + nodeView } - nodes = deployedNodes + nodes = deployedNodes.map { it.node } availableNodes.addAll(deployedNodes) } } @@ -85,10 +89,10 @@ class SimpleVirtProvisioningService( for (imageInstance in imagesToBeScheduled) { println("Spawning $imageInstance") - val selectedNode = availableNodes.minWith(allocationPolicy().thenBy { it.uid }) + val selectedNode = availableNodes.minWith(allocationPolicy().thenBy { it.node.uid }) try { - imageInstance.server = selectedNode?.server!!.serviceRegistry[VirtDriver.Key].spawn( + imageInstance.server = selectedNode?.node!!.server!!.serviceRegistry[VirtDriver.Key].spawn( imageInstance.image, imageInstance.monitor, imageInstance.flavor diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AllocationPolicy.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AllocationPolicy.kt index fc08aa87..a1c0ab9a 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AllocationPolicy.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AllocationPolicy.kt @@ -1,6 +1,7 @@ package com.atlarge.opendc.compute.virt.service.allocation import com.atlarge.opendc.compute.metal.Node +import com.atlarge.opendc.compute.virt.service.NodeView /** * A policy for selecting the [Node] an image should be deployed to, @@ -9,5 +10,5 @@ interface AllocationPolicy { /** * Builds the logic of the policy. */ - suspend operator fun invoke(): Comparator<Node> + operator fun invoke(): Comparator<NodeView> } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AvailableMemoryAllocationPolicy.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AvailableMemoryAllocationPolicy.kt index e02e2cef..b3e9d77e 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AvailableMemoryAllocationPolicy.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AvailableMemoryAllocationPolicy.kt @@ -1,18 +1,12 @@ package com.atlarge.opendc.compute.virt.service.allocation -import com.atlarge.opendc.compute.metal.Node -import com.atlarge.opendc.compute.virt.driver.VirtDriver -import kotlinx.coroutines.runBlocking +import com.atlarge.opendc.compute.virt.service.NodeView /** * Allocation policy that selects the node with the most available memory. */ class AvailableMemoryAllocationPolicy : AllocationPolicy { - override suspend fun invoke(): Comparator<Node> = Comparator { o1, o2 -> - runBlocking { - compareValuesBy(o1, o2) { - -it.server!!.serviceRegistry[VirtDriver.Key].getAvailableMemory() - } - } + override fun invoke(): Comparator<NodeView> = Comparator { o1, o2 -> + compareValuesBy(o1, o2) { -it.availableMemory } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/NumHostsAllocationPolicy.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/NumHostsAllocationPolicy.kt deleted file mode 100644 index c5ee29c5..00000000 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/NumHostsAllocationPolicy.kt +++ /dev/null @@ -1,15 +0,0 @@ -package com.atlarge.opendc.compute.virt.service.allocation - -import com.atlarge.opendc.compute.metal.Node -import com.atlarge.opendc.compute.virt.driver.VirtDriver -import kotlinx.coroutines.runBlocking - -class NumHostsAllocationPolicy : AllocationPolicy { - override suspend fun invoke(): Comparator<Node> = Comparator { o1, o2 -> - runBlocking { - compareValuesBy(o1, o2) { - it.server!!.serviceRegistry[VirtDriver.Key].getNumberOfSpawnedImages() - } - } - } -} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/NumberOfActiveServersAllocationPolicy.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/NumberOfActiveServersAllocationPolicy.kt new file mode 100644 index 00000000..9d6582dd --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/NumberOfActiveServersAllocationPolicy.kt @@ -0,0 +1,15 @@ +package com.atlarge.opendc.compute.virt.service.allocation + +import com.atlarge.opendc.compute.virt.service.NodeView +import kotlinx.coroutines.runBlocking + +/** + * Allocation policy that selects the node with the least amount of active servers. + */ +class NumberOfActiveServersAllocationPolicy : AllocationPolicy { + override fun invoke(): Comparator<NodeView> = Comparator { o1, o2 -> + runBlocking { + compareValuesBy(o1, o2) { it.numberOfActiveServers } + } + } +} |
