summaryrefslogtreecommitdiff
path: root/opendc
diff options
context:
space:
mode:
authorGeorgios Andreadis <info@gandreadis.com>2020-02-28 13:59:21 +0100
committerGeorgios Andreadis <info@gandreadis.com>2020-02-28 14:39:59 +0100
commit3b31e1e0c4f7f8c7ca20a4b1a3e784029bbd0179 (patch)
tree741b5ae6aa503c4a92280c49629e29cc26f533e3 /opendc
parent7cadeb711158609eacc86e4e4c60dd825c848b99 (diff)
Change to push-based node status communication
Diffstat (limited to 'opendc')
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt12
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverMonitor.kt14
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt25
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/NodeView.kt11
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt34
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AllocationPolicy.kt3
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AvailableMemoryAllocationPolicy.kt12
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/NumHostsAllocationPolicy.kt15
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/NumberOfActiveServersAllocationPolicy.kt15
9 files changed, 91 insertions, 50 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt
index e68dd488..d889d0f9 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt
@@ -47,18 +47,18 @@ public interface VirtDriver {
public suspend fun spawn(image: Image, monitor: ServerMonitor, flavor: Flavor): Server
/**
- * Returns the number of spawned images on the server managed by this driver.
+ * Adds the given [VirtDriverMonitor] to the list of monitors to keep informed on the state of this driver.
*
- * @return The number of spawned images.
+ * @param monitor The monitor to keep informed.
*/
- public suspend fun getNumberOfSpawnedImages(): Int
+ public suspend fun addMonitor(monitor: VirtDriverMonitor)
/**
- * Returns the available memory on the server managed by this driver.
+ * Removes the given [VirtDriverMonitor] from the list of monitors.
*
- * @return The available memory, in MB.
+ * @param monitor The monitor to unsubscribe
*/
- public suspend fun getAvailableMemory(): Long
+ public suspend fun removeMonitor(monitor: VirtDriverMonitor)
companion object Key : AbstractServiceKey<VirtDriver>(UUID.randomUUID(), "virtual-driver")
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverMonitor.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverMonitor.kt
new file mode 100644
index 00000000..cf2f4619
--- /dev/null
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverMonitor.kt
@@ -0,0 +1,14 @@
+package com.atlarge.opendc.compute.virt.driver
+
+/**
+ * Monitor for entities interested in the state of a [VirtDriver].
+ */
+interface VirtDriverMonitor {
+ /**
+ * Called when the number of active servers on the server managed by this driver is updated.
+ *
+ * @param numberOfActiveServers The number of active servers.
+ * @param availableMemory The available memory, in MB.
+ */
+ public suspend fun onUpdate(numberOfActiveServers: Int, availableMemory: Long)
+}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt
index 4b23159f..e0547dcf 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt
@@ -35,6 +35,7 @@ import com.atlarge.opendc.compute.core.execution.ServerManagementContext
import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.compute.core.monitor.ServerMonitor
import com.atlarge.opendc.compute.virt.driver.VirtDriver
+import com.atlarge.opendc.compute.virt.driver.VirtDriverMonitor
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import java.util.UUID
@@ -54,21 +55,35 @@ class HypervisorVirtDriver(
/**
* Current total memory use of the images on this hypervisor.
*/
- private var memoryAvailable: Long = hostContext.server.flavor.memorySize
+ private var availableMemory: Long = hostContext.server.flavor.memorySize
+
+ /**
+ * Monitors to keep informed.
+ */
+ private val monitors: MutableSet<VirtDriverMonitor> = mutableSetOf()
override suspend fun spawn(image: Image, monitor: ServerMonitor, flavor: Flavor): Server {
val requiredMemory = flavor.memorySize
- if (memoryAvailable - requiredMemory < 0) {
+ if (availableMemory - requiredMemory < 0) {
throw InsufficientMemoryOnServerException()
}
require(flavor.cpuCount <= hostContext.server.flavor.cpuCount) { "Machine does not fit" }
val server = Server(UUID.randomUUID(), "<unnamed>", emptyMap(), flavor, image, ServerState.BUILD)
- memoryAvailable -= requiredMemory
+ availableMemory -= requiredMemory
vms.add(VmServerContext(server, monitor, simulationContext))
+ monitors.forEach { it.onUpdate(vms.size, availableMemory) }
return server
}
+ override suspend fun addMonitor(monitor: VirtDriverMonitor) {
+ monitors.add(monitor)
+ }
+
+ override suspend fun removeMonitor(monitor: VirtDriverMonitor) {
+ monitors.remove(monitor)
+ }
+
internal inner class VmServerContext(
override var server: Server,
val monitor: ServerMonitor,
@@ -103,9 +118,11 @@ class HypervisorVirtDriver(
val previousState = server.state
val state = if (cause == null) ServerState.SHUTOFF else ServerState.ERROR
server = server.copy(state = state)
- memoryAvailable += server.flavor.memorySize
+ availableMemory += server.flavor.memorySize
monitor.onUpdate(server, previousState)
initialized = false
+ vms.remove(this)
+ monitors.forEach { it.onUpdate(vms.size, availableMemory) }
}
}
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/NodeView.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/NodeView.kt
new file mode 100644
index 00000000..41e67624
--- /dev/null
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/NodeView.kt
@@ -0,0 +1,11 @@
+package com.atlarge.opendc.compute.virt.service
+
+import com.atlarge.opendc.compute.metal.Node
+import com.atlarge.opendc.compute.virt.driver.hypervisor.HypervisorImage
+
+class NodeView(
+ val node: Node,
+ val hypervisor: HypervisorImage,
+ var numberOfActiveServers: Int,
+ var availableMemory: Long
+)
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
index 309cfa83..5924c2c7 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
@@ -9,6 +9,7 @@ import com.atlarge.opendc.compute.core.monitor.ServerMonitor
import com.atlarge.opendc.compute.metal.Node
import com.atlarge.opendc.compute.metal.service.ProvisioningService
import com.atlarge.opendc.compute.virt.driver.VirtDriver
+import com.atlarge.opendc.compute.virt.driver.VirtDriverMonitor
import com.atlarge.opendc.compute.virt.driver.hypervisor.HypervisorImage
import com.atlarge.opendc.compute.virt.driver.hypervisor.InsufficientMemoryOnServerException
import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor
@@ -29,12 +30,7 @@ class SimpleVirtProvisioningService(
/**
* The available nodes.
*/
- internal val availableNodes: MutableSet<Node> = mutableSetOf()
-
- /**
- * The available hypervisors.
- */
- internal val hypervisorByNode: MutableMap<Node, HypervisorImage> = mutableMapOf()
+ internal val availableNodes: MutableSet<NodeView> = mutableSetOf()
/**
* The incoming images to be processed by the provisioner.
@@ -55,14 +51,22 @@ class SimpleVirtProvisioningService(
ctx.domain.launch {
val provisionedNodes = provisioningService.nodes().toList()
val deployedNodes = provisionedNodes.map { node ->
- val hypervisorImage =
- HypervisorImage(
- hypervisorMonitor
- )
- hypervisorByNode[node] = hypervisorImage
- provisioningService.deploy(node, hypervisorImage, this@SimpleVirtProvisioningService)
+ val hypervisorImage = HypervisorImage(hypervisorMonitor)
+ val nodeView = NodeView(
+ provisioningService.deploy(node, hypervisorImage, this@SimpleVirtProvisioningService),
+ hypervisorImage,
+ 0,
+ node.server!!.flavor.memorySize
+ )
+ node.server.serviceRegistry[VirtDriver.Key].addMonitor(object : VirtDriverMonitor {
+ override suspend fun onUpdate(numberOfActiveServers: Int, availableMemory: Long) {
+ nodeView.numberOfActiveServers = numberOfActiveServers
+ nodeView.availableMemory = availableMemory
+ }
+ })
+ nodeView
}
- nodes = deployedNodes
+ nodes = deployedNodes.map { it.node }
availableNodes.addAll(deployedNodes)
}
}
@@ -85,10 +89,10 @@ class SimpleVirtProvisioningService(
for (imageInstance in imagesToBeScheduled) {
println("Spawning $imageInstance")
- val selectedNode = availableNodes.minWith(allocationPolicy().thenBy { it.uid })
+ val selectedNode = availableNodes.minWith(allocationPolicy().thenBy { it.node.uid })
try {
- imageInstance.server = selectedNode?.server!!.serviceRegistry[VirtDriver.Key].spawn(
+ imageInstance.server = selectedNode?.node!!.server!!.serviceRegistry[VirtDriver.Key].spawn(
imageInstance.image,
imageInstance.monitor,
imageInstance.flavor
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AllocationPolicy.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AllocationPolicy.kt
index fc08aa87..a1c0ab9a 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AllocationPolicy.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AllocationPolicy.kt
@@ -1,6 +1,7 @@
package com.atlarge.opendc.compute.virt.service.allocation
import com.atlarge.opendc.compute.metal.Node
+import com.atlarge.opendc.compute.virt.service.NodeView
/**
* A policy for selecting the [Node] an image should be deployed to,
@@ -9,5 +10,5 @@ interface AllocationPolicy {
/**
* Builds the logic of the policy.
*/
- suspend operator fun invoke(): Comparator<Node>
+ operator fun invoke(): Comparator<NodeView>
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AvailableMemoryAllocationPolicy.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AvailableMemoryAllocationPolicy.kt
index e02e2cef..b3e9d77e 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AvailableMemoryAllocationPolicy.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AvailableMemoryAllocationPolicy.kt
@@ -1,18 +1,12 @@
package com.atlarge.opendc.compute.virt.service.allocation
-import com.atlarge.opendc.compute.metal.Node
-import com.atlarge.opendc.compute.virt.driver.VirtDriver
-import kotlinx.coroutines.runBlocking
+import com.atlarge.opendc.compute.virt.service.NodeView
/**
* Allocation policy that selects the node with the most available memory.
*/
class AvailableMemoryAllocationPolicy : AllocationPolicy {
- override suspend fun invoke(): Comparator<Node> = Comparator { o1, o2 ->
- runBlocking {
- compareValuesBy(o1, o2) {
- -it.server!!.serviceRegistry[VirtDriver.Key].getAvailableMemory()
- }
- }
+ override fun invoke(): Comparator<NodeView> = Comparator { o1, o2 ->
+ compareValuesBy(o1, o2) { -it.availableMemory }
}
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/NumHostsAllocationPolicy.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/NumHostsAllocationPolicy.kt
deleted file mode 100644
index c5ee29c5..00000000
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/NumHostsAllocationPolicy.kt
+++ /dev/null
@@ -1,15 +0,0 @@
-package com.atlarge.opendc.compute.virt.service.allocation
-
-import com.atlarge.opendc.compute.metal.Node
-import com.atlarge.opendc.compute.virt.driver.VirtDriver
-import kotlinx.coroutines.runBlocking
-
-class NumHostsAllocationPolicy : AllocationPolicy {
- override suspend fun invoke(): Comparator<Node> = Comparator { o1, o2 ->
- runBlocking {
- compareValuesBy(o1, o2) {
- it.server!!.serviceRegistry[VirtDriver.Key].getNumberOfSpawnedImages()
- }
- }
- }
-}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/NumberOfActiveServersAllocationPolicy.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/NumberOfActiveServersAllocationPolicy.kt
new file mode 100644
index 00000000..9d6582dd
--- /dev/null
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/NumberOfActiveServersAllocationPolicy.kt
@@ -0,0 +1,15 @@
+package com.atlarge.opendc.compute.virt.service.allocation
+
+import com.atlarge.opendc.compute.virt.service.NodeView
+import kotlinx.coroutines.runBlocking
+
+/**
+ * Allocation policy that selects the node with the least amount of active servers.
+ */
+class NumberOfActiveServersAllocationPolicy : AllocationPolicy {
+ override fun invoke(): Comparator<NodeView> = Comparator { o1, o2 ->
+ runBlocking {
+ compareValuesBy(o1, o2) { it.numberOfActiveServers }
+ }
+ }
+}