diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-02-28 14:47:35 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-02-28 14:47:35 +0100 |
| commit | ac6e6f7c611fa7d10fff5467c4a61af932e4c171 (patch) | |
| tree | 1f5af9dd9fea38aae0007591826bfc54e34b8f29 | |
| parent | 3a5eac673fb67a6cff7fc79f16312db78d706322 (diff) | |
| parent | d394224b76e98d6092e6fb78279ec8944f974aa2 (diff) | |
Merge branch 'feat/2.x-allocation-policy' into 'feat/2.x'
Factor out VM allocation policy and add memory-aware policy
Closes #48
See merge request opendc/opendc-simulator!31
10 files changed, 125 insertions, 27 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt index 68b8e541..d889d0f9 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt @@ -24,8 +24,8 @@ package com.atlarge.opendc.compute.virt.driver -import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.Flavor +import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.core.services.AbstractServiceKey @@ -47,11 +47,18 @@ public interface VirtDriver { public suspend fun spawn(image: Image, monitor: ServerMonitor, flavor: Flavor): Server /** - * Returns the number of spawned images on the server managed by this driver. + * Adds the given [VirtDriverMonitor] to the list of monitors to keep informed on the state of this driver. + * + * @param monitor The monitor to keep informed. + */ + public suspend fun addMonitor(monitor: VirtDriverMonitor) + + /** + * Removes the given [VirtDriverMonitor] from the list of monitors. * - * @return The number of spawned images. + * @param monitor The monitor to unsubscribe */ - public suspend fun getNumberOfSpawnedImages(): Int + public suspend fun removeMonitor(monitor: VirtDriverMonitor) companion object Key : AbstractServiceKey<VirtDriver>(UUID.randomUUID(), "virtual-driver") } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverMonitor.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverMonitor.kt new file mode 100644 index 00000000..cf2f4619 --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverMonitor.kt @@ -0,0 +1,14 @@ +package com.atlarge.opendc.compute.virt.driver + +/** + * Monitor for entities interested in the state of a [VirtDriver]. + */ +interface VirtDriverMonitor { + /** + * Called when the number of active servers on the server managed by this driver is updated. + * + * @param numberOfActiveServers The number of active servers. + * @param availableMemory The available memory, in MB. + */ + public suspend fun onUpdate(numberOfActiveServers: Int, availableMemory: Long) +} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt index 9745b56c..e0547dcf 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt @@ -35,6 +35,7 @@ import com.atlarge.opendc.compute.core.execution.ServerManagementContext import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.virt.driver.VirtDriver +import com.atlarge.opendc.compute.virt.driver.VirtDriverMonitor import kotlinx.coroutines.Job import kotlinx.coroutines.launch import java.util.UUID @@ -54,23 +55,33 @@ class HypervisorVirtDriver( /** * Current total memory use of the images on this hypervisor. */ - private var memoryAvailable: Long = hostContext.server.flavor.memorySize + private var availableMemory: Long = hostContext.server.flavor.memorySize + + /** + * Monitors to keep informed. + */ + private val monitors: MutableSet<VirtDriverMonitor> = mutableSetOf() override suspend fun spawn(image: Image, monitor: ServerMonitor, flavor: Flavor): Server { val requiredMemory = flavor.memorySize - if (memoryAvailable - requiredMemory < 0) { + if (availableMemory - requiredMemory < 0) { throw InsufficientMemoryOnServerException() } require(flavor.cpuCount <= hostContext.server.flavor.cpuCount) { "Machine does not fit" } val server = Server(UUID.randomUUID(), "<unnamed>", emptyMap(), flavor, image, ServerState.BUILD) - memoryAvailable -= requiredMemory + availableMemory -= requiredMemory vms.add(VmServerContext(server, monitor, simulationContext)) + monitors.forEach { it.onUpdate(vms.size, availableMemory) } return server } - override suspend fun getNumberOfSpawnedImages(): Int { - return vms.size + override suspend fun addMonitor(monitor: VirtDriverMonitor) { + monitors.add(monitor) + } + + override suspend fun removeMonitor(monitor: VirtDriverMonitor) { + monitors.remove(monitor) } internal inner class VmServerContext( @@ -107,9 +118,11 @@ class HypervisorVirtDriver( val previousState = server.state val state = if (cause == null) ServerState.SHUTOFF else ServerState.ERROR server = server.copy(state = state) - memoryAvailable += server.flavor.memorySize + availableMemory += server.flavor.memorySize monitor.onUpdate(server, previousState) initialized = false + vms.remove(this) + monitors.forEach { it.onUpdate(vms.size, availableMemory) } } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/NodeView.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/NodeView.kt new file mode 100644 index 00000000..41e67624 --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/NodeView.kt @@ -0,0 +1,11 @@ +package com.atlarge.opendc.compute.virt.service + +import com.atlarge.opendc.compute.metal.Node +import com.atlarge.opendc.compute.virt.driver.hypervisor.HypervisorImage + +class NodeView( + val node: Node, + val hypervisor: HypervisorImage, + var numberOfActiveServers: Int, + var availableMemory: Long +) diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt index 888364e2..f036e370 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt @@ -9,12 +9,16 @@ import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.metal.Node import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.compute.virt.driver.VirtDriver +import com.atlarge.opendc.compute.virt.driver.VirtDriverMonitor import com.atlarge.opendc.compute.virt.driver.hypervisor.HypervisorImage import com.atlarge.opendc.compute.virt.driver.hypervisor.InsufficientMemoryOnServerException import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor +import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy import kotlinx.coroutines.launch +import kotlinx.coroutines.yield class SimpleVirtProvisioningService( + public override val allocationPolicy: AllocationPolicy, private val ctx: SimulationContext, private val provisioningService: ProvisioningService, private val hypervisorMonitor: HypervisorMonitor @@ -27,12 +31,7 @@ class SimpleVirtProvisioningService( /** * The available nodes. */ - internal val availableNodes: MutableSet<Node> = mutableSetOf() - - /** - * The available hypervisors. - */ - internal val hypervisorByNode: MutableMap<Node, HypervisorImage> = mutableMapOf() + internal val availableNodes: MutableSet<NodeView> = mutableSetOf() /** * The incoming images to be processed by the provisioner. @@ -53,14 +52,24 @@ class SimpleVirtProvisioningService( ctx.domain.launch { val provisionedNodes = provisioningService.nodes().toList() val deployedNodes = provisionedNodes.map { node -> - val hypervisorImage = - HypervisorImage( - hypervisorMonitor - ) - hypervisorByNode[node] = hypervisorImage - provisioningService.deploy(node, hypervisorImage, this@SimpleVirtProvisioningService) + val hypervisorImage = HypervisorImage(hypervisorMonitor) + val deployedNode = provisioningService.deploy(node, hypervisorImage, this@SimpleVirtProvisioningService) + val nodeView = NodeView( + deployedNode, + hypervisorImage, + 0, + deployedNode.server!!.flavor.memorySize + ) + yield() + deployedNode.server.serviceRegistry[VirtDriver.Key].addMonitor(object : VirtDriverMonitor { + override suspend fun onUpdate(numberOfActiveServers: Int, availableMemory: Long) { + nodeView.numberOfActiveServers = numberOfActiveServers + nodeView.availableMemory = availableMemory + } + }) + nodeView } - nodes = deployedNodes + nodes = deployedNodes.map { it.node } availableNodes.addAll(deployedNodes) } } @@ -83,12 +92,10 @@ class SimpleVirtProvisioningService( for (imageInstance in imagesToBeScheduled) { println("Spawning $imageInstance") - val selectedNode = availableNodes.minBy { - it.server!!.serviceRegistry[VirtDriver.Key].getNumberOfSpawnedImages() - } + val selectedNode = availableNodes.minWith(allocationPolicy().thenBy { it.node.uid }) try { - imageInstance.server = selectedNode?.server!!.serviceRegistry[VirtDriver.Key].spawn( + imageInstance.server = selectedNode?.node!!.server!!.serviceRegistry[VirtDriver.Key].spawn( imageInstance.image, imageInstance.monitor, imageInstance.flavor diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt index fb087f9d..7770ec50 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt @@ -3,11 +3,14 @@ package com.atlarge.opendc.compute.virt.service import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.core.monitor.ServerMonitor +import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy /** * A service for VM provisioning on a cloud. */ interface VirtProvisioningService { + val allocationPolicy: AllocationPolicy + /** * Submit the specified [Image] to the provisioning service. * diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AllocationPolicy.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AllocationPolicy.kt new file mode 100644 index 00000000..a1c0ab9a --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AllocationPolicy.kt @@ -0,0 +1,14 @@ +package com.atlarge.opendc.compute.virt.service.allocation + +import com.atlarge.opendc.compute.metal.Node +import com.atlarge.opendc.compute.virt.service.NodeView + +/** + * A policy for selecting the [Node] an image should be deployed to, + */ +interface AllocationPolicy { + /** + * Builds the logic of the policy. + */ + operator fun invoke(): Comparator<NodeView> +} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AvailableMemoryAllocationPolicy.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AvailableMemoryAllocationPolicy.kt new file mode 100644 index 00000000..b3e9d77e --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AvailableMemoryAllocationPolicy.kt @@ -0,0 +1,12 @@ +package com.atlarge.opendc.compute.virt.service.allocation + +import com.atlarge.opendc.compute.virt.service.NodeView + +/** + * Allocation policy that selects the node with the most available memory. + */ +class AvailableMemoryAllocationPolicy : AllocationPolicy { + override fun invoke(): Comparator<NodeView> = Comparator { o1, o2 -> + compareValuesBy(o1, o2) { -it.availableMemory } + } +} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/NumberOfActiveServersAllocationPolicy.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/NumberOfActiveServersAllocationPolicy.kt new file mode 100644 index 00000000..9d6582dd --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/NumberOfActiveServersAllocationPolicy.kt @@ -0,0 +1,15 @@ +package com.atlarge.opendc.compute.virt.service.allocation + +import com.atlarge.opendc.compute.virt.service.NodeView +import kotlinx.coroutines.runBlocking + +/** + * Allocation policy that selects the node with the least amount of active servers. + */ +class NumberOfActiveServersAllocationPolicy : AllocationPolicy { + override fun invoke(): Comparator<NodeView> = Comparator { o1, o2 -> + runBlocking { + compareValuesBy(o1, o2) { it.numberOfActiveServers } + } + } +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt index 48aca303..daa40193 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt @@ -32,6 +32,7 @@ import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService +import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy import com.atlarge.opendc.format.environment.sc20.Sc20EnvironmentReader import com.atlarge.opendc.format.trace.vm.VmTraceReader import kotlinx.coroutines.channels.Channel @@ -69,6 +70,7 @@ fun main(args: Array<String>) { println(simulationContext.clock.instant()) val scheduler = SimpleVirtProvisioningService( + AvailableMemoryAllocationPolicy(), simulationContext, environment.platforms[0].zones[0].services[ProvisioningService.Key], Sc20HypervisorMonitor() |
