diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-03-18 00:50:05 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-03-25 10:51:05 +0100 |
| commit | bc64182612ad06f15bff5b48637ed7d241e293b2 (patch) | |
| tree | 7b2a5fb78cc1c5f0f2a8a3f850a4652f0a433044 /opendc/opendc-compute/src/main | |
| parent | b1cf9b2bd9559328c3c9d26e73123e67d2bfea05 (diff) | |
[ci skip] refactor: Refactor monitors into EventFlow
Diffstat (limited to 'opendc/opendc-compute/src/main')
17 files changed, 216 insertions, 196 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt index 31b070a4..01968cd8 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt @@ -28,6 +28,7 @@ import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.core.resource.Resource import com.atlarge.opendc.core.resource.TagContainer import com.atlarge.opendc.core.services.ServiceRegistry +import kotlinx.coroutines.flow.Flow import java.util.UUID /** @@ -67,7 +68,12 @@ public data class Server( /** * The services published by this server. */ - public val services: ServiceRegistry = ServiceRegistry() + public val services: ServiceRegistry, + + /** + * The events that are emitted by the server. + */ + public val events: Flow<ServerEvent> ) : Resource { override fun hashCode(): Int = uid.hashCode() override fun equals(other: Any?): Boolean = other is Server && uid == other.uid diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/monitor/ServerMonitor.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/ServerEvent.kt index c2b30b9d..1595937c 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/monitor/ServerMonitor.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/ServerEvent.kt @@ -22,29 +22,32 @@ * SOFTWARE. */ -package com.atlarge.opendc.compute.core.monitor +package com.atlarge.opendc.compute.core -import com.atlarge.opendc.compute.core.Server -import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.core.services.ServiceKey /** - * An interface for monitoring the state of a machine. + * An event that is emitted by a [Server]. */ -public interface ServerMonitor { +public sealed class ServerEvent { /** - * This method is synchronously invoked when the state of a machine updates. + * The server that emitted the event. + */ + public abstract val server: Server + + /** + * This event is emitted when the state of [server] changes. * - * @param server The server which state was updated. - * @param previousState The previous state of the server. + * @property server The server of which the state changed. + * @property previousState The previous state of the server. */ - public fun stateChanged(server: Server, previousState: ServerState) {} + public data class StateChanged(override val server: Server, val previousState: ServerState) : ServerEvent() /** - * This method is synchronously invoked when the server publishes a service. + * This event is emitted when a server publishes a service. * - * @param server The server that published the service. - * @param key The key of the service that was published. + * @property server The server that published the service. + * @property key The service key of the service that was published. */ - public fun servicePublished(server: Server, key: ServiceKey<*>) {} + public data class ServicePublished(override val server: Server, val key: ServiceKey<*>) : ServerEvent() } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ShutdownException.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ShutdownException.kt index abf6f8db..e4da557b 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ShutdownException.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ShutdownException.kt @@ -27,7 +27,7 @@ package com.atlarge.opendc.compute.core.execution import kotlinx.coroutines.CancellationException /** - * This exception is thrown by the underlying [ServerContext] to indicate that a shutdown signal + * This exception is thrown by the underlying [ServerContext] to indicate that a shutdown flow * has been sent to the server. */ public class ShutdownException(message: String? = null, override val cause: Throwable? = null) : CancellationException(message) diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt index 55948d3c..8b8d1596 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt @@ -27,6 +27,7 @@ package com.atlarge.opendc.compute.metal import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.core.Identity +import kotlinx.coroutines.flow.Flow import java.util.UUID /** @@ -46,7 +47,7 @@ data class Node( /** * Meta data of the node. */ - public val metadata: Map<String, Any> = emptyMap(), + public val metadata: Map<String, Any>, /** * The last known state of the compute node. @@ -61,7 +62,12 @@ data class Node( /** * The server instance that is running on the node or `null` if no server is running. */ - public val server: Server? + public val server: Server?, + + /** + * The events that are emitted by the node. + */ + public val events: Flow<NodeEvent> ) : Identity { override fun hashCode(): Int = uid.hashCode() override fun equals(other: Any?): Boolean = other is Node && uid == other.uid diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/monitor/NodeMonitor.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/NodeEvent.kt index bd4b40d8..7719db24 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/monitor/NodeMonitor.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/NodeEvent.kt @@ -22,21 +22,22 @@ * SOFTWARE. */ -package com.atlarge.opendc.compute.metal.monitor - -import com.atlarge.opendc.compute.core.monitor.ServerMonitor -import com.atlarge.opendc.compute.metal.Node -import com.atlarge.opendc.compute.metal.NodeState +package com.atlarge.opendc.compute.metal /** - * An interface for monitoring bare-metal nodes. + * An event that is emitted by a [Node]. */ -public interface NodeMonitor : ServerMonitor { +public sealed class NodeEvent { + /** + * The node that emitted the event. + */ + public abstract val node: Node + /** - * This method is synchronously invoked when the state of a bare metal machine updates. + * This event is emitted when the state of [node] changes. * - * @param node The node for which state was updated. - * @param previousState The previous state of the node. + * @property node The node of which the state changed. + * @property previousState The previous state of the node. */ - public fun stateChanged(node: Node, previousState: NodeState) {} + public data class StateChanged(override val node: Node, val previousState: NodeState) : NodeEvent() } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt index 3956338b..5d1db378 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt @@ -27,7 +27,6 @@ package com.atlarge.opendc.compute.metal.driver import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.metal.Node -import com.atlarge.opendc.compute.metal.monitor.NodeMonitor import com.atlarge.opendc.core.failure.FailureDomain import com.atlarge.opendc.core.power.Powerable import com.atlarge.opendc.core.services.AbstractServiceKey @@ -47,7 +46,7 @@ public interface BareMetalDriver : Powerable, FailureDomain { /** * Initialize the driver. */ - public suspend fun init(monitor: NodeMonitor): Node + public suspend fun init(): Node /** * Start the bare metal node with the specified boot disk image. diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt index 46b4c30c..49c3fa2e 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt @@ -25,12 +25,14 @@ package com.atlarge.opendc.compute.metal.driver import com.atlarge.odcsim.Domain -import com.atlarge.odcsim.signal.Signal +import com.atlarge.odcsim.flow.EventFlow +import com.atlarge.odcsim.flow.StateFlow import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.MemoryUnit +import com.atlarge.opendc.compute.core.ServerEvent import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.execution.ServerManagementContext import com.atlarge.opendc.compute.core.execution.ShutdownException @@ -38,8 +40,8 @@ import com.atlarge.opendc.compute.core.execution.assertFailure import com.atlarge.opendc.compute.core.image.EmptyImage import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.metal.Node +import com.atlarge.opendc.compute.metal.NodeEvent import com.atlarge.opendc.compute.metal.NodeState -import com.atlarge.opendc.compute.metal.monitor.NodeMonitor import com.atlarge.opendc.compute.metal.power.ConstantPowerModel import com.atlarge.opendc.core.power.PowerModel import com.atlarge.opendc.core.services.ServiceKey @@ -77,48 +79,47 @@ public class SimpleBareMetalDriver( powerModel: PowerModel<SimpleBareMetalDriver> = ConstantPowerModel(0.0) ) : BareMetalDriver { /** - * The monitor to use. + * The flavor that corresponds to this machine. + */ + private val flavor = Flavor(cpus.size, memoryUnits.map { it.size }.sum()) + + /** + * The current active server context. + */ + private var serverContext: BareMetalServerContext? = null + + /** + * The events of the machine. */ - private lateinit var monitor: NodeMonitor + private val events = EventFlow<NodeEvent>() + + /** + * The flow containing the load of the server. + */ + private val usageSignal = StateFlow(0.0) /** * The machine state. */ - private var node: Node = Node(uid, name, mapOf("driver" to this), NodeState.SHUTOFF, EmptyImage, null) + private var node: Node = Node(uid, name, mapOf("driver" to this), NodeState.SHUTOFF, EmptyImage, null, events) set(value) { if (field.state != value.state) { - monitor.stateChanged(value, field.state) + events.emit(NodeEvent.StateChanged(value, field.state)) } if (field.server != null && value.server != null && field.server!!.state != value.server.state) { - monitor.stateChanged(value.server, field.server!!.state) + serverContext!!.events.emit(ServerEvent.StateChanged(value.server, field.server!!.state)) } field = value } - /** - * The flavor that corresponds to this machine. - */ - private val flavor = Flavor(cpus.size, memoryUnits.map { it.size }.sum()) - - /** - * The current active server context. - */ - private var serverContext: BareMetalServerContext? = null - - /** - * The signal containing the load of the server. - */ - private val usageSignal = Signal(0.0) - override val usage: Flow<Double> = usageSignal override val powerDraw: Flow<Double> = powerModel(this) - override suspend fun init(monitor: NodeMonitor): Node = withContext(domain.coroutineContext) { - this@SimpleBareMetalDriver.monitor = monitor - return@withContext node + override suspend fun init(): Node = withContext(domain.coroutineContext) { + node } override suspend fun start(): Node = withContext(domain.coroutineContext) { @@ -126,6 +127,7 @@ public class SimpleBareMetalDriver( return@withContext node } + val events = EventFlow<ServerEvent>() val server = Server( UUID.randomUUID(), node.name, @@ -133,11 +135,12 @@ public class SimpleBareMetalDriver( flavor, node.image, ServerState.BUILD, - ServiceRegistry().put(BareMetalDriver, this@SimpleBareMetalDriver) + ServiceRegistry().put(BareMetalDriver, this@SimpleBareMetalDriver), + events ) node = node.copy(state = NodeState.BOOT, server = server) - serverContext = BareMetalServerContext() + serverContext = BareMetalServerContext(events) return@withContext node } @@ -166,7 +169,7 @@ public class SimpleBareMetalDriver( override suspend fun refresh(): Node = withContext(domain.coroutineContext) { node } - private inner class BareMetalServerContext : ServerManagementContext { + private inner class BareMetalServerContext(val events: EventFlow<ServerEvent>) : ServerManagementContext { private var finalized: Boolean = false override val cpus: List<ProcessingUnit> = this@SimpleBareMetalDriver.cpus @@ -175,6 +178,7 @@ public class SimpleBareMetalDriver( get() = node.server!! private val job = domain.launch { + delay(1) // TODO Introduce boot time init() try { server.image(this@BareMetalServerContext) @@ -198,7 +202,7 @@ public class SimpleBareMetalDriver( override suspend fun <T : Any> publishService(key: ServiceKey<T>, service: T) { val server = server.copy(services = server.services.put(key, service)) node = node.copy(server = server) - monitor.servicePublished(server, key) + events.emit(ServerEvent.ServicePublished(server, key)) } override suspend fun init() { diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/ProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/ProvisioningService.kt index 24ade799..105505f2 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/ProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/ProvisioningService.kt @@ -25,7 +25,6 @@ package com.atlarge.opendc.compute.metal.service import com.atlarge.opendc.compute.core.image.Image -import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.metal.Node import com.atlarge.opendc.compute.metal.driver.BareMetalDriver import com.atlarge.opendc.core.services.AbstractServiceKey @@ -53,7 +52,7 @@ public interface ProvisioningService { /** * Deploy the specified [Image] on a compute node. */ - public suspend fun deploy(node: Node, image: Image, monitor: ServerMonitor): Node + public suspend fun deploy(node: Node, image: Image): Node /** * The service key of this service. diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt index e5cd0a77..a7e143aa 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt @@ -25,33 +25,22 @@ package com.atlarge.opendc.compute.metal.service import com.atlarge.odcsim.Domain -import com.atlarge.opendc.compute.core.Server -import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.image.Image -import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.metal.Node import com.atlarge.opendc.compute.metal.driver.BareMetalDriver -import com.atlarge.opendc.compute.metal.monitor.NodeMonitor -import com.atlarge.opendc.core.services.ServiceKey -import kotlinx.coroutines.launch import kotlinx.coroutines.withContext /** * A very basic implementation of the [ProvisioningService]. */ -public class SimpleProvisioningService(val domain: Domain) : ProvisioningService, NodeMonitor { +public class SimpleProvisioningService(val domain: Domain) : ProvisioningService { /** * The active nodes in this service. */ private val nodes: MutableMap<Node, BareMetalDriver> = mutableMapOf() - /** - * The installed monitors. - */ - private val monitors: MutableMap<Server, ServerMonitor> = mutableMapOf() - override suspend fun create(driver: BareMetalDriver): Node = withContext(domain.coroutineContext) { - val node = driver.init(this@SimpleProvisioningService) + val node = driver.init() nodes[node] = driver return@withContext node } @@ -62,23 +51,10 @@ public class SimpleProvisioningService(val domain: Domain) : ProvisioningService return@withContext nodes[node]!!.refresh() } - override suspend fun deploy(node: Node, image: Image, monitor: ServerMonitor): Node = withContext(domain.coroutineContext) { + override suspend fun deploy(node: Node, image: Image): Node = withContext(domain.coroutineContext) { val driver = nodes[node]!! driver.setImage(image) val newNode = driver.reboot() - monitors[newNode.server!!] = monitor return@withContext newNode } - - override fun stateChanged(server: Server, previousState: ServerState) { - domain.launch { - monitors[server]?.stateChanged(server, previousState) - } - } - - override fun servicePublished(server: Server, key: ServiceKey<*>) { - domain.launch { - monitors[server]?.servicePublished(server, key) - } - } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt index d889d0f9..296f170e 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt @@ -27,8 +27,8 @@ package com.atlarge.opendc.compute.virt.driver import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.image.Image -import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.core.services.AbstractServiceKey +import kotlinx.coroutines.flow.Flow import java.util.UUID /** @@ -37,28 +37,18 @@ import java.util.UUID */ public interface VirtDriver { /** + * The events emitted by the driver. + */ + public val events: Flow<VirtDriverEvent> + + /** * Spawn the given [Image] on the compute resource of this driver. * * @param image The image to deploy. - * @param monitor The monitor to use for the deployment of this particular image. * @param flavor The flavor of the server which this driver is controlling. * @return The virtual server spawned by this method. */ - public suspend fun spawn(image: Image, monitor: ServerMonitor, flavor: Flavor): Server - - /** - * Adds the given [VirtDriverMonitor] to the list of monitors to keep informed on the state of this driver. - * - * @param monitor The monitor to keep informed. - */ - public suspend fun addMonitor(monitor: VirtDriverMonitor) - - /** - * Removes the given [VirtDriverMonitor] from the list of monitors. - * - * @param monitor The monitor to unsubscribe - */ - public suspend fun removeMonitor(monitor: VirtDriverMonitor) + public suspend fun spawn(image: Image, flavor: Flavor): Server companion object Key : AbstractServiceKey<VirtDriver>(UUID.randomUUID(), "virtual-driver") } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverEvent.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverEvent.kt new file mode 100644 index 00000000..ccbe8b3c --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverEvent.kt @@ -0,0 +1,59 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.compute.virt.driver + +/** + * An event that is emitted by a [VirtDriver]. + */ +public sealed class VirtDriverEvent { + /** + * The driver that emitted the event. + */ + public abstract val driver: VirtDriver + + /** + * This event is emitted when the number of active servers on the server managed by this driver is updated. + * + * @property driver The driver that emitted the event. + * @property numberOfActiveServers The number of active servers. + * @property availableMemory The available memory, in MB. + */ + public data class VmsUpdated(override val driver: VirtDriver, public val numberOfActiveServers: Int, public val availableMemory: Long) : VirtDriverEvent() + + /** + * This event is emitted when a slice is finished. + * + * @property driver The driver that emitted the event. + * @property requestedBurst The total requested CPU time (can be above capacity). + * @property grantedBurst The actual total granted capacity. + * @property numberOfDeployedImages The number of images deployed on this hypervisor. + */ + public data class SliceFinished( + override val driver: VirtDriver, + public val requestedBurst: Long, + public val grantedBurst: Long, + public val numberOfDeployedImages: Int + ) : VirtDriverEvent() +} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverMonitor.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverMonitor.kt deleted file mode 100644 index cf2f4619..00000000 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverMonitor.kt +++ /dev/null @@ -1,14 +0,0 @@ -package com.atlarge.opendc.compute.virt.driver - -/** - * Monitor for entities interested in the state of a [VirtDriver]. - */ -interface VirtDriverMonitor { - /** - * Called when the number of active servers on the server managed by this driver is updated. - * - * @param numberOfActiveServers The number of active servers. - * @param availableMemory The available memory, in MB. - */ - public suspend fun onUpdate(numberOfActiveServers: Int, availableMemory: Long) -} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt index 0f4d3c15..1eb0e0ff 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt @@ -27,7 +27,6 @@ package com.atlarge.opendc.compute.virt.driver.hypervisor import com.atlarge.opendc.compute.core.execution.ServerContext import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.virt.driver.VirtDriver -import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor import com.atlarge.opendc.core.resource.TagContainer import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.suspendCancellableCoroutine @@ -36,20 +35,22 @@ import java.util.UUID /** * A hypervisor managing the VMs of a node. */ -class HypervisorImage( - private val hypervisorMonitor: HypervisorMonitor -) : Image { +object HypervisorImage : Image { override val uid: UUID = UUID.randomUUID() override val name: String = "vmm" override val tags: TagContainer = emptyMap() override suspend fun invoke(ctx: ServerContext) { coroutineScope { - val driver = HypervisorVirtDriver(ctx, hypervisorMonitor, this) + val driver = HypervisorVirtDriver(ctx, this) ctx.publishService(VirtDriver.Key, driver) // Suspend image until it is cancelled - suspendCancellableCoroutine<Unit> {} + try { + suspendCancellableCoroutine<Unit> {} + } finally { + driver.eventFlow.close() + } } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt index 98d8092c..0b4a7109 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt @@ -25,28 +25,33 @@ package com.atlarge.opendc.compute.virt.driver.hypervisor import com.atlarge.odcsim.Domain +import com.atlarge.odcsim.flow.EventFlow import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.Server +import com.atlarge.opendc.compute.core.ServerEvent import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.execution.ServerContext import com.atlarge.opendc.compute.core.execution.ServerManagementContext import com.atlarge.opendc.compute.core.execution.ShutdownException import com.atlarge.opendc.compute.core.execution.assertFailure import com.atlarge.opendc.compute.core.image.Image -import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.virt.driver.VirtDriver -import com.atlarge.opendc.compute.virt.driver.VirtDriverMonitor -import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor +import com.atlarge.opendc.compute.virt.driver.VirtDriverEvent import com.atlarge.opendc.core.services.ServiceKey +import com.atlarge.opendc.core.services.ServiceRegistry import com.atlarge.opendc.core.workload.IMAGE_PERF_INTERFERENCE_MODEL import com.atlarge.opendc.core.workload.PerformanceInterferenceModel import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.Job import kotlinx.coroutines.cancelAndJoin import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.launch import java.util.UUID import kotlin.math.ceil @@ -56,12 +61,18 @@ import kotlin.math.min /** * A [VirtDriver] that is backed by a simple hypervisor implementation. */ +@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class) class HypervisorVirtDriver( private val hostContext: ServerContext, - private val monitor: HypervisorMonitor, private val coroutineScope: CoroutineScope ) : VirtDriver { /** + * The [Server] on which this hypervisor runs. + */ + public val server: Server + get() = hostContext.server + + /** * A set for tracking the VM context objects. */ internal val vms: MutableSet<VmServerContext> = mutableSetOf() @@ -72,32 +83,33 @@ class HypervisorVirtDriver( private var availableMemory: Long = hostContext.server.flavor.memorySize /** - * Monitors to keep informed. + * The [EventFlow] to emit the events. */ - private val monitors: MutableSet<VirtDriverMonitor> = mutableSetOf() + internal val eventFlow = EventFlow<VirtDriverEvent>() - override suspend fun spawn(image: Image, monitor: ServerMonitor, flavor: Flavor): Server { + override val events: Flow<VirtDriverEvent> = eventFlow + + override suspend fun spawn( + image: Image, + flavor: Flavor + ): Server { val requiredMemory = flavor.memorySize if (availableMemory - requiredMemory < 0) { throw InsufficientMemoryOnServerException() } require(flavor.cpuCount <= hostContext.server.flavor.cpuCount) { "Machine does not fit" } - val server = Server(UUID.randomUUID(), "<unnamed>", emptyMap(), flavor, image, ServerState.BUILD) + val events = EventFlow<ServerEvent>() + val server = Server( + UUID.randomUUID(), "<unnamed>", emptyMap(), flavor, image, ServerState.BUILD, + ServiceRegistry(), events + ) availableMemory -= requiredMemory - vms.add(VmServerContext(server, monitor, simulationContext.domain)) - monitors.forEach { it.onUpdate(vms.size, availableMemory) } + vms.add(VmServerContext(server, events, simulationContext.domain)) + eventFlow.emit(VirtDriverEvent.VmsUpdated(this, vms.size, availableMemory)) return server } - override suspend fun addMonitor(monitor: VirtDriverMonitor) { - monitors.add(monitor) - } - - override suspend fun removeMonitor(monitor: VirtDriverMonitor) { - monitors.remove(monitor) - } - /** * A flag to indicate the driver is stopped. */ @@ -211,13 +223,7 @@ class HypervisorVirtDriver( } } - monitor.onSliceFinish( - end, - totalBurst, - totalBurst - totalRemainder, - vms.size, - hostContext.server - ) + eventFlow.emit(VirtDriverEvent.SliceFinished(this@HypervisorVirtDriver, totalBurst, totalBurst - totalRemainder, vms.size)) } this.call = call } @@ -250,7 +256,7 @@ class HypervisorVirtDriver( internal inner class VmServerContext( server: Server, - val monitor: ServerMonitor, + val events: EventFlow<ServerEvent>, val domain: Domain ) : ServerManagementContext { private var finalized: Boolean = false @@ -261,6 +267,7 @@ class HypervisorVirtDriver( private var initialized: Boolean = false internal val job: Job = coroutineScope.launch { + delay(1) // TODO Introduce boot time init() try { server.image(this@VmServerContext) @@ -273,7 +280,7 @@ class HypervisorVirtDriver( override var server: Server = server set(value) { if (field.state != value.state) { - monitor.stateChanged(value, field.state) + events.emit(ServerEvent.StateChanged(value, field.state)) } field = value @@ -283,7 +290,7 @@ class HypervisorVirtDriver( override suspend fun <T : Any> publishService(key: ServiceKey<T>, service: T) { server = server.copy(services = server.services.put(key, service)) - monitor.servicePublished(server, key) + events.emit(ServerEvent.ServicePublished(server, key)) } override suspend fun init() { @@ -304,8 +311,8 @@ class HypervisorVirtDriver( server = server.copy(state = serverState) availableMemory += server.flavor.memorySize vms.remove(this) - - monitors.forEach { it.onUpdate(vms.size, availableMemory) } + events.close() + eventFlow.emit(VirtDriverEvent.VmsUpdated(this@HypervisorVirtDriver, vms.size, availableMemory)) } override suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) { diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/monitor/HypervisorMonitor.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/monitor/HypervisorMonitor.kt deleted file mode 100644 index 1e3981f6..00000000 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/monitor/HypervisorMonitor.kt +++ /dev/null @@ -1,25 +0,0 @@ -package com.atlarge.opendc.compute.virt.monitor - -import com.atlarge.opendc.compute.core.Server - -/** - * Monitoring interface for hypervisor-specific events. - */ -interface HypervisorMonitor { - /** - * Invoked after a scheduling slice has finished processed. - * - * @param time The current time (in ms). - * @param requestedBurst The total requested CPU time (can be above capacity). - * @param grantedBurst The actual total granted capacity. - * @param numberOfDeployedImages The number of images deployed on this hypervisor. - * @param hostServer The server hosting this hypervisor. - */ - suspend fun onSliceFinish( - time: Long, - requestedBurst: Long, - grantedBurst: Long, - numberOfDeployedImages: Int, - hostServer: Server - ) -} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt index 6fb821d7..8365f8c9 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt @@ -3,25 +3,27 @@ package com.atlarge.opendc.compute.virt.service import com.atlarge.odcsim.SimulationContext import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.Server +import com.atlarge.opendc.compute.core.ServerEvent import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.image.Image -import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.compute.virt.driver.VirtDriver import com.atlarge.opendc.compute.virt.driver.hypervisor.HypervisorImage import com.atlarge.opendc.compute.virt.driver.hypervisor.InsufficientMemoryOnServerException -import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy import com.atlarge.opendc.core.services.ServiceKey +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.Job +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch +@OptIn(ExperimentalCoroutinesApi::class) class SimpleVirtProvisioningService( public override val allocationPolicy: AllocationPolicy, private val ctx: SimulationContext, - private val provisioningService: ProvisioningService, - private val hypervisorMonitor: HypervisorMonitor -) : VirtProvisioningService, ServerMonitor { + private val provisioningService: ProvisioningService +) : VirtProvisioningService { /** * The hypervisors that have been launched by the service. */ @@ -46,14 +48,24 @@ class SimpleVirtProvisioningService( ctx.domain.launch { val provisionedNodes = provisioningService.nodes() provisionedNodes.forEach { node -> - val hypervisorImage = HypervisorImage(hypervisorMonitor) - provisioningService.deploy(node, hypervisorImage, this@SimpleVirtProvisioningService) + val hypervisorImage = HypervisorImage + val node = provisioningService.deploy(node, hypervisorImage) + node.server!!.events.onEach { event -> + when (event) { + is ServerEvent.StateChanged -> stateChanged(event.server, event.previousState) + is ServerEvent.ServicePublished -> servicePublished(event.server, event.key) + } + } + .launchIn(ctx.domain) } } } - override suspend fun deploy(image: Image, monitor: ServerMonitor, flavor: Flavor) { - val vmInstance = ImageView(image, monitor, flavor) + override suspend fun deploy( + image: Image, + flavor: Flavor + ) { + val vmInstance = ImageView(image, flavor) incomingImages += vmInstance requestCycle() } @@ -82,7 +94,6 @@ class SimpleVirtProvisioningService( incomingImages -= imageInstance imageInstance.server = selectedHv.driver.spawn( imageInstance.image, - imageInstance.monitor, imageInstance.flavor ) activeImages += imageInstance @@ -92,7 +103,7 @@ class SimpleVirtProvisioningService( } } - override fun stateChanged(server: Server, previousState: ServerState) { + private fun stateChanged(server: Server, previousState: ServerState) { when (server.state) { ServerState.ACTIVE -> { val hvView = HypervisorView( @@ -111,7 +122,7 @@ class SimpleVirtProvisioningService( } } - override fun servicePublished(server: Server, key: ServiceKey<*>) { + private fun servicePublished(server: Server, key: ServiceKey<*>) { if (key == VirtDriver.Key) { val hv = hypervisors[server] ?: return hv.driver = server.services[VirtDriver] @@ -122,7 +133,6 @@ class SimpleVirtProvisioningService( data class ImageView( val image: Image, - val monitor: ServerMonitor, val flavor: Flavor, var server: Server? = null ) diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt index 7770ec50..da72d742 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt @@ -2,7 +2,6 @@ package com.atlarge.opendc.compute.virt.service import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.image.Image -import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy /** @@ -15,8 +14,7 @@ interface VirtProvisioningService { * Submit the specified [Image] to the provisioning service. * * @param image The image to be deployed. - * @param monitor The monitor to inform on events. * @param flavor The flavor of the machine instance to run this [image] on. */ - public suspend fun deploy(image: Image, monitor: ServerMonitor, flavor: Flavor) + public suspend fun deploy(image: Image, flavor: Flavor) } |
