diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-03-17 22:26:15 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-03-25 10:48:58 +0100 |
| commit | b1cf9b2bd9559328c3c9d26e73123e67d2bfea05 (patch) | |
| tree | 62de7a5a2b386e1467171578742dc983bd9f7c19 /opendc/opendc-compute | |
| parent | 6b10881f123f5e6a8e7bce1045d02eba5e48c3a2 (diff) | |
refactor: Rework monitor interfaces
Diffstat (limited to 'opendc/opendc-compute')
12 files changed, 109 insertions, 81 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt index 86ec9a5b..31b070a4 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt @@ -28,7 +28,6 @@ import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.core.resource.Resource import com.atlarge.opendc.core.resource.TagContainer import com.atlarge.opendc.core.services.ServiceRegistry -import com.atlarge.opendc.core.services.ServiceRegistryImpl import java.util.UUID /** @@ -68,7 +67,7 @@ public data class Server( /** * The services published by this server. */ - public val serviceRegistry: ServiceRegistry = ServiceRegistryImpl() + public val services: ServiceRegistry = ServiceRegistry() ) : Resource { override fun hashCode(): Int = uid.hashCode() override fun equals(other: Any?): Boolean = other is Server && uid == other.uid diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt index b09a5a7d..c8caaca6 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt @@ -27,7 +27,7 @@ package com.atlarge.opendc.compute.core.execution import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.image.Image -import com.atlarge.opendc.core.services.AbstractServiceKey +import com.atlarge.opendc.core.services.ServiceKey /** * Represents the execution context in which an bootable [Image] runs on a [Server]. @@ -44,11 +44,9 @@ public interface ServerContext { public val cpus: List<ProcessingUnit> /** - * Publishes the given [service] with key [serviceKey] in the server's registry. + * Publish the specified [service] at the given [ServiceKey]. */ - public suspend fun <T : Any> publishService(serviceKey: AbstractServiceKey<T>, service: T) { - server.serviceRegistry[serviceKey] = service - } + public suspend fun <T : Any> publishService(key: ServiceKey<T>, service: T) /** * Request the specified burst time from the processor cores and suspend execution until a processor core finishes diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/monitor/ServerMonitor.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/monitor/ServerMonitor.kt index 26b94ba5..c2b30b9d 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/monitor/ServerMonitor.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/monitor/ServerMonitor.kt @@ -26,16 +26,25 @@ package com.atlarge.opendc.compute.core.monitor import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerState +import com.atlarge.opendc.core.services.ServiceKey /** * An interface for monitoring the state of a machine. */ public interface ServerMonitor { /** - * This method is invoked when the state of a machine updates. + * This method is synchronously invoked when the state of a machine updates. * * @param server The server which state was updated. * @param previousState The previous state of the server. */ - public suspend fun onUpdate(server: Server, previousState: ServerState) {} + public fun stateChanged(server: Server, previousState: ServerState) {} + + /** + * This method is synchronously invoked when the server publishes a service. + * + * @param server The server that published the service. + * @param key The key of the service that was published. + */ + public fun servicePublished(server: Server, key: ServiceKey<*>) {} } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt index a8f3d781..46b4c30c 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt @@ -42,6 +42,8 @@ import com.atlarge.opendc.compute.metal.NodeState import com.atlarge.opendc.compute.metal.monitor.NodeMonitor import com.atlarge.opendc.compute.metal.power.ConstantPowerModel import com.atlarge.opendc.core.power.PowerModel +import com.atlarge.opendc.core.services.ServiceKey +import com.atlarge.opendc.core.services.ServiceRegistry import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job @@ -83,20 +85,18 @@ public class SimpleBareMetalDriver( * The machine state. */ private var node: Node = Node(uid, name, mapOf("driver" to this), NodeState.SHUTOFF, EmptyImage, null) + set(value) { + if (field.state != value.state) { + monitor.stateChanged(value, field.state) + } - private suspend fun setNode(value: Node) { - val field = node - if (field.state != value.state) { - monitor.onUpdate(value, field.state) - } + if (field.server != null && value.server != null && field.server!!.state != value.server.state) { + monitor.stateChanged(value.server, field.server!!.state) + } - if (field.server != null && value.server != null && field.server.state != value.server.state) { - monitor.onUpdate(value.server, field.server.state) + field = value } - node = value - } - /** * The flavor that corresponds to this machine. */ @@ -132,11 +132,11 @@ public class SimpleBareMetalDriver( emptyMap(), flavor, node.image, - ServerState.BUILD + ServerState.BUILD, + ServiceRegistry().put(BareMetalDriver, this@SimpleBareMetalDriver) ) - server.serviceRegistry[BareMetalDriver.Key] = this@SimpleBareMetalDriver - setNode(node.copy(state = NodeState.BOOT, server = server)) + node = node.copy(state = NodeState.BOOT, server = server) serverContext = BareMetalServerContext() return@withContext node } @@ -150,7 +150,7 @@ public class SimpleBareMetalDriver( serverContext!!.cancel(fail = false) serverContext = null - setNode(node.copy(state = NodeState.SHUTOFF, server = null)) + node = node.copy(state = NodeState.SHUTOFF, server = null) return@withContext node } @@ -160,7 +160,7 @@ public class SimpleBareMetalDriver( } override suspend fun setImage(image: Image): Node = withContext(domain.coroutineContext) { - setNode(node.copy(image = image)) + node = node.copy(image = image) return@withContext node } @@ -195,11 +195,17 @@ public class SimpleBareMetalDriver( job.join() } + override suspend fun <T : Any> publishService(key: ServiceKey<T>, service: T) { + val server = server.copy(services = server.services.put(key, service)) + node = node.copy(server = server) + monitor.servicePublished(server, key) + } + override suspend fun init() { assert(!finalized) { "Machine is already finalized" } val server = server.copy(state = ServerState.ACTIVE) - setNode(node.copy(state = NodeState.ACTIVE, server = server)) + node = node.copy(state = NodeState.ACTIVE, server = server) } override suspend fun exit(cause: Throwable?) { @@ -216,7 +222,7 @@ public class SimpleBareMetalDriver( else NodeState.ERROR val server = server.copy(state = serverState) - setNode(node.copy(state = nodeState, server = server)) + node = node.copy(state = nodeState, server = server) } private var flush: Job? = null @@ -278,8 +284,6 @@ public class SimpleBareMetalDriver( get() = domain override suspend fun fail() { - withContext(domain.coroutineContext) { - serverContext?.cancel(fail = true) - } + serverContext?.cancel(fail = true) } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/monitor/NodeMonitor.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/monitor/NodeMonitor.kt index f35cf57b..bd4b40d8 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/monitor/NodeMonitor.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/monitor/NodeMonitor.kt @@ -33,10 +33,10 @@ import com.atlarge.opendc.compute.metal.NodeState */ public interface NodeMonitor : ServerMonitor { /** - * This method is invoked when the state of a bare metal machine updates. + * This method is synchronously invoked when the state of a bare metal machine updates. * * @param node The node for which state was updated. * @param previousState The previous state of the node. */ - public suspend fun onUpdate(node: Node, previousState: NodeState) {} + public fun stateChanged(node: Node, previousState: NodeState) {} } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt index d8fe0dd9..e5cd0a77 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt @@ -32,6 +32,8 @@ import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.metal.Node import com.atlarge.opendc.compute.metal.driver.BareMetalDriver import com.atlarge.opendc.compute.metal.monitor.NodeMonitor +import com.atlarge.opendc.core.services.ServiceKey +import kotlinx.coroutines.launch import kotlinx.coroutines.withContext /** @@ -68,9 +70,15 @@ public class SimpleProvisioningService(val domain: Domain) : ProvisioningService return@withContext newNode } - override suspend fun onUpdate(server: Server, previousState: ServerState) { - withContext(domain.coroutineContext) { - monitors[server]?.onUpdate(server, previousState) + override fun stateChanged(server: Server, previousState: ServerState) { + domain.launch { + monitors[server]?.stateChanged(server, previousState) + } + } + + override fun servicePublished(server: Server, key: ServiceKey<*>) { + domain.launch { + monitors[server]?.servicePublished(server, key) } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt index 1ff33c0c..98d8092c 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt @@ -39,6 +39,7 @@ import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.virt.driver.VirtDriver import com.atlarge.opendc.compute.virt.driver.VirtDriverMonitor import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor +import com.atlarge.opendc.core.services.ServiceKey import com.atlarge.opendc.core.workload.IMAGE_PERF_INTERFERENCE_MODEL import com.atlarge.opendc.core.workload.PerformanceInterferenceModel import kotlinx.coroutines.CancellationException @@ -248,7 +249,7 @@ class HypervisorVirtDriver( } internal inner class VmServerContext( - override var server: Server, + server: Server, val monitor: ServerMonitor, val domain: Domain ) : ServerManagementContext { @@ -269,21 +270,26 @@ class HypervisorVirtDriver( } } - private suspend fun setServer(value: Server) { - val field = server - if (field.state != value.state) { - monitor.onUpdate(value, field.state) - } + override var server: Server = server + set(value) { + if (field.state != value.state) { + monitor.stateChanged(value, field.state) + } - server = value - } + field = value + } override val cpus: List<ProcessingUnit> = hostContext.cpus.take(server.flavor.cpuCount) + override suspend fun <T : Any> publishService(key: ServiceKey<T>, service: T) { + server = server.copy(services = server.services.put(key, service)) + monitor.servicePublished(server, key) + } + override suspend fun init() { assert(!finalized) { "VM is already finalized" } - setServer(server.copy(state = ServerState.ACTIVE)) + server = server.copy(state = ServerState.ACTIVE) initialized = true } @@ -295,7 +301,7 @@ class HypervisorVirtDriver( ServerState.SHUTOFF else ServerState.ERROR - setServer(server.copy(state = serverState)) + server = server.copy(state = serverState) availableMemory += server.flavor.memorySize vms.remove(this) diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/HypervisorView.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/HypervisorView.kt index 996bd8eb..97842f18 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/HypervisorView.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/HypervisorView.kt @@ -1,11 +1,12 @@ package com.atlarge.opendc.compute.virt.service import com.atlarge.opendc.compute.core.Server -import com.atlarge.opendc.compute.virt.driver.hypervisor.HypervisorImage +import com.atlarge.opendc.compute.virt.driver.VirtDriver class HypervisorView( var server: Server, - val hypervisor: HypervisorImage, var numberOfActiveServers: Int, var availableMemory: Long -) +) { + lateinit var driver: VirtDriver +} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt index a50292a7..6fb821d7 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt @@ -8,11 +8,12 @@ import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.compute.virt.driver.VirtDriver -import com.atlarge.opendc.compute.virt.driver.VirtDriverMonitor import com.atlarge.opendc.compute.virt.driver.hypervisor.HypervisorImage import com.atlarge.opendc.compute.virt.driver.hypervisor.InsufficientMemoryOnServerException import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy +import com.atlarge.opendc.core.services.ServiceKey +import kotlinx.coroutines.Job import kotlinx.coroutines.launch class SimpleVirtProvisioningService( @@ -46,15 +47,7 @@ class SimpleVirtProvisioningService( val provisionedNodes = provisioningService.nodes() provisionedNodes.forEach { node -> val hypervisorImage = HypervisorImage(hypervisorMonitor) - val deployedNode = provisioningService.deploy(node, hypervisorImage, this@SimpleVirtProvisioningService) - val server = deployedNode.server!! - val hvView = HypervisorView( - server, - hypervisorImage, - 0, - server.flavor.memorySize - ) - hypervisors[server] = hvView + provisioningService.deploy(node, hypervisorImage, this@SimpleVirtProvisioningService) } } } @@ -65,21 +58,29 @@ class SimpleVirtProvisioningService( requestCycle() } + private var call: Job? = null + private fun requestCycle() { - ctx.domain.launch { + if (call != null) { + return + } + + val call = ctx.domain.launch { schedule() } + call.invokeOnCompletion { this.call = null } + this.call = call } private suspend fun schedule() { val imagesToBeScheduled = incomingImages.toSet() for (imageInstance in imagesToBeScheduled) { - val selectedNode = availableHypervisors.minWith(allocationPolicy().thenBy { it.server.uid }) ?: break + val selectedHv = availableHypervisors.minWith(allocationPolicy().thenBy { it.server.uid }) ?: break try { println("Spawning ${imageInstance.image}") incomingImages -= imageInstance - imageInstance.server = selectedNode.server.serviceRegistry[VirtDriver.Key].spawn( + imageInstance.server = selectedHv.driver.spawn( imageInstance.image, imageInstance.monitor, imageInstance.flavor @@ -91,21 +92,15 @@ class SimpleVirtProvisioningService( } } - override suspend fun onUpdate(server: Server, previousState: ServerState) { - println("${server.uid} ${server.state} ${hypervisors[server]}") + override fun stateChanged(server: Server, previousState: ServerState) { when (server.state) { ServerState.ACTIVE -> { - val hv = hypervisors[server] ?: return - availableHypervisors += hv - - server.serviceRegistry[VirtDriver.Key].addMonitor(object : VirtDriverMonitor { - override suspend fun onUpdate(numberOfActiveServers: Int, availableMemory: Long) { - hv.numberOfActiveServers = numberOfActiveServers - hv.availableMemory = availableMemory - } - }) - - requestCycle() + val hvView = HypervisorView( + server, + 0, + server.flavor.memorySize + ) + hypervisors[server] = hvView } ServerState.SHUTOFF, ServerState.ERROR -> { val hv = hypervisors[server] ?: return @@ -116,6 +111,15 @@ class SimpleVirtProvisioningService( } } + override fun servicePublished(server: Server, key: ServiceKey<*>) { + if (key == VirtDriver.Key) { + val hv = hypervisors[server] ?: return + hv.driver = server.services[VirtDriver] + availableHypervisors += hv + requestCycle() + } + } + data class ImageView( val image: Image, val monitor: ServerMonitor, diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt index 166e93b8..c5c0441c 100644 --- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt +++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt @@ -59,12 +59,12 @@ internal class SimpleBareMetalDriverTest { val driver = SimpleBareMetalDriver(dom, UUID.randomUUID(), "test", cpus, emptyList()) val monitor = object : NodeMonitor { - override suspend fun onUpdate(node: Node, previousState: NodeState) { + override fun stateChanged(node: Node, previousState: NodeState) { println(node) } - override suspend fun onUpdate(server: Server, previousState: ServerState) { - println("[${simulationContext.clock.millis()}] $server") + override fun stateChanged(server: Server, previousState: ServerState) { + println("$server") finalState = server.state } } diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt index ef19427e..9cbb9baa 100644 --- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt +++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt @@ -54,7 +54,7 @@ internal class SimpleProvisioningServiceTest { root.launch { val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1000, 2) val monitor = object : ServerMonitor { - override suspend fun onUpdate(server: Server, previousState: ServerState) { + override fun stateChanged(server: Server, previousState: ServerState) { println(server) } } diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt index 57a7150e..9ceaf704 100644 --- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt +++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt @@ -25,7 +25,6 @@ package com.atlarge.opendc.compute.virt.driver.hypervisor import com.atlarge.odcsim.SimulationEngineProvider -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.Flavor @@ -71,8 +70,8 @@ internal class HypervisorTest { val workloadA = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1_000, 1) val workloadB = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 2_000, 1) val monitor = object : NodeMonitor { - override suspend fun onUpdate(server: Server, previousState: ServerState) { - println("[${simulationContext.clock.millis()}]: $server") + override fun stateChanged(server: Server, previousState: ServerState) { + println("$server") } } @@ -89,7 +88,7 @@ internal class HypervisorTest { delay(5) val flavor = Flavor(1, 0) - val vmDriver = metalDriver.refresh().server!!.serviceRegistry[VirtDriver] + val vmDriver = metalDriver.refresh().server!!.services[VirtDriver] vmDriver.spawn(workloadA, monitor, flavor) vmDriver.spawn(workloadB, monitor, flavor) } |
