diff options
| author | Georgios Andreadis <g.andreadis@student.tudelft.nl> | 2020-03-26 13:55:32 +0100 |
|---|---|---|
| committer | Georgios Andreadis <g.andreadis@student.tudelft.nl> | 2020-03-26 13:55:32 +0100 |
| commit | 620f194c53d950a37f78577f4aacfd7c0c06bb9a (patch) | |
| tree | f5f7ffdce8efdcffb92e158ebbb643ba1a797b23 /opendc/opendc-compute | |
| parent | f4ee29bb97aed68329e72710dd3049c23f592f25 (diff) | |
| parent | 7eb8177e2278bde2c0f4fad00af6fdd2d632cb5b (diff) | |
Merge branch 'feat/2.x-failures' into '2.x'
Implement basic hardware-level failures
See merge request opendc/opendc-simulator!35
Diffstat (limited to 'opendc/opendc-compute')
31 files changed, 781 insertions, 393 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt index 86ec9a5b..01968cd8 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/Server.kt @@ -28,7 +28,7 @@ import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.core.resource.Resource import com.atlarge.opendc.core.resource.TagContainer import com.atlarge.opendc.core.services.ServiceRegistry -import com.atlarge.opendc.core.services.ServiceRegistryImpl +import kotlinx.coroutines.flow.Flow import java.util.UUID /** @@ -68,7 +68,12 @@ public data class Server( /** * The services published by this server. */ - public val serviceRegistry: ServiceRegistry = ServiceRegistryImpl() + public val services: ServiceRegistry, + + /** + * The events that are emitted by the server. + */ + public val events: Flow<ServerEvent> ) : Resource { override fun hashCode(): Int = uid.hashCode() override fun equals(other: Any?): Boolean = other is Server && uid == other.uid diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/ServerEvent.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/ServerEvent.kt new file mode 100644 index 00000000..1595937c --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/ServerEvent.kt @@ -0,0 +1,53 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.compute.core + +import com.atlarge.opendc.core.services.ServiceKey + +/** + * An event that is emitted by a [Server]. + */ +public sealed class ServerEvent { + /** + * The server that emitted the event. + */ + public abstract val server: Server + + /** + * This event is emitted when the state of [server] changes. + * + * @property server The server of which the state changed. + * @property previousState The previous state of the server. + */ + public data class StateChanged(override val server: Server, val previousState: ServerState) : ServerEvent() + + /** + * This event is emitted when a server publishes a service. + * + * @property server The server that published the service. + * @property key The service key of the service that was published. + */ + public data class ServicePublished(override val server: Server, val key: ServiceKey<*>) : ServerEvent() +} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt index b09a5a7d..e0a491c8 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt @@ -27,10 +27,10 @@ package com.atlarge.opendc.compute.core.execution import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.image.Image -import com.atlarge.opendc.core.services.AbstractServiceKey +import com.atlarge.opendc.core.services.ServiceKey /** - * Represents the execution context in which an bootable [Image] runs on a [Server]. + * Represents the execution context in which a bootable [Image] runs on a [Server]. */ public interface ServerContext { /** @@ -44,11 +44,9 @@ public interface ServerContext { public val cpus: List<ProcessingUnit> /** - * Publishes the given [service] with key [serviceKey] in the server's registry. + * Publish the specified [service] at the given [ServiceKey]. */ - public suspend fun <T : Any> publishService(serviceKey: AbstractServiceKey<T>, service: T) { - server.serviceRegistry[serviceKey] = service - } + public suspend fun <T : Any> publishService(key: ServiceKey<T>, service: T) /** * Request the specified burst time from the processor cores and suspend execution until a processor core finishes diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ShutdownException.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ShutdownException.kt new file mode 100644 index 00000000..e4da557b --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ShutdownException.kt @@ -0,0 +1,53 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.compute.core.execution + +import kotlinx.coroutines.CancellationException + +/** + * This exception is thrown by the underlying [ServerContext] to indicate that a shutdown flow + * has been sent to the server. + */ +public class ShutdownException(message: String? = null, override val cause: Throwable? = null) : CancellationException(message) + +/** + * This method terminates the current active coroutine if the specified [CancellationException] is caused + * by a shutdown. + */ +public fun CancellationException.assertShutdown() { + if (this is ShutdownException) { + throw this + } +} + +/** + * This method terminates the current active coroutine if the specified [CancellationException] is caused + * by a failure. + */ +public fun CancellationException.assertFailure() { + if (this is ShutdownException && cause != null) { + throw this + } +} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt index 107237ea..e77b55a6 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt @@ -26,7 +26,7 @@ package com.atlarge.opendc.compute.core.image import com.atlarge.opendc.compute.core.execution.ServerContext import com.atlarge.opendc.core.resource.TagContainer -import kotlinx.coroutines.isActive +import kotlinx.coroutines.ensureActive import java.util.UUID import kotlin.coroutines.coroutineContext import kotlin.math.min @@ -64,11 +64,8 @@ data class FlopsApplicationImage( val burst = LongArray(cores) { flops / cores } val maxUsage = DoubleArray(cores) { i -> ctx.cpus[i].frequency * utilization } - while (coroutineContext.isActive) { - if (burst.all { it == 0L }) { - break - } - + while (burst.any { it != 0L }) { + coroutineContext.ensureActive() ctx.run(burst, maxUsage, Long.MAX_VALUE) } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/PowerState.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Metadata.kt index 5fce3f48..a3a851fe 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/PowerState.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Metadata.kt @@ -24,17 +24,11 @@ package com.atlarge.opendc.compute.metal -/** - * The power state of a compute node. +/* + * Common metadata keys for bare-metal nodes. */ -public enum class PowerState { - /** - * Node is powered on. - */ - POWER_ON, - /** - * Node is powered off. - */ - POWER_OFF, -} +/** + * The cluster to which the node belongs. + */ +const val NODE_CLUSTER = "bare-metal:cluster" diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt index a43abfe9..7cb4c0c5 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt @@ -27,12 +27,13 @@ package com.atlarge.opendc.compute.metal import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.core.Identity +import kotlinx.coroutines.flow.Flow import java.util.UUID /** * A bare-metal compute node. */ -data class Node( +public data class Node( /** * The unique identifier of the node. */ @@ -44,9 +45,14 @@ data class Node( public override val name: String, /** - * The power state of the node. + * Metadata of the node. */ - public val powerState: PowerState, + public val metadata: Map<String, Any>, + + /** + * The last known state of the compute node. + */ + public val state: NodeState, /** * The boot image of the node. @@ -56,7 +62,12 @@ data class Node( /** * The server instance that is running on the node or `null` if no server is running. */ - public val server: Server? + public val server: Server?, + + /** + * The events that are emitted by the node. + */ + public val events: Flow<NodeEvent> ) : Identity { override fun hashCode(): Int = uid.hashCode() override fun equals(other: Any?): Boolean = other is Node && uid == other.uid diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/monitor/ServerMonitor.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/NodeEvent.kt index fbfd0ad6..7719db24 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/monitor/ServerMonitor.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/NodeEvent.kt @@ -22,20 +22,22 @@ * SOFTWARE. */ -package com.atlarge.opendc.compute.core.monitor - -import com.atlarge.opendc.compute.core.Server -import com.atlarge.opendc.compute.core.ServerState +package com.atlarge.opendc.compute.metal /** - * An interface for monitoring the state of a machine. + * An event that is emitted by a [Node]. */ -public interface ServerMonitor { +public sealed class NodeEvent { + /** + * The node that emitted the event. + */ + public abstract val node: Node + /** - * This method is invoked when the state of a machine updates. + * This event is emitted when the state of [node] changes. * - * @param server The server which state was updated. - * @param previousState The previous state of the server. + * @property node The node of which the state changed. + * @property previousState The previous state of the node. */ - public suspend fun onUpdate(server: Server, previousState: ServerState) + public data class StateChanged(override val node: Node, val previousState: NodeState) : NodeEvent() } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/NodeState.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/NodeState.kt new file mode 100644 index 00000000..ca9cf509 --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/NodeState.kt @@ -0,0 +1,55 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.compute.metal + +/** + * An enumeration describing the possible states of a bare-metal compute node. + */ +public enum class NodeState { + /** + * The node is booting. + */ + BOOT, + + /** + * The node is powered off. + */ + SHUTOFF, + + /** + * The node is active and running. + */ + ACTIVE, + + /** + * The node is in error. + */ + ERROR, + + /** + * The state of the node is unknown. + */ + UNKNOWN, +} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt index 1214dd36..41cec291 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt @@ -26,9 +26,8 @@ package com.atlarge.opendc.compute.metal.driver import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.image.Image -import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.metal.Node -import com.atlarge.opendc.compute.metal.PowerState +import com.atlarge.opendc.core.failure.FailureDomain import com.atlarge.opendc.core.power.Powerable import com.atlarge.opendc.core.services.AbstractServiceKey import kotlinx.coroutines.flow.Flow @@ -37,7 +36,12 @@ import java.util.UUID /** * A driver interface for the management interface of a bare-metal compute node. */ -public interface BareMetalDriver : Powerable { +public interface BareMetalDriver : Powerable, FailureDomain { + /** + * The [Node] that is controlled by this driver. + */ + public val node: Flow<Node> + /** * The amount of work done by the machine in percentage with respect to the total amount of processing power * available. @@ -47,12 +51,22 @@ public interface BareMetalDriver : Powerable { /** * Initialize the driver. */ - public suspend fun init(monitor: ServerMonitor): Node + public suspend fun init(): Node + + /** + * Start the bare metal node with the specified boot disk image. + */ + public suspend fun start(): Node + + /** + * Stop the bare metal node if it is running. + */ + public suspend fun stop(): Node /** - * Update the power state of the compute node. + * Reboot the bare metal node. */ - public suspend fun setPower(powerState: PowerState): Node + public suspend fun reboot(): Node /** * Update the boot disk image of the compute node. diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt index c7dc74cf..4a40dc9f 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt @@ -25,23 +25,31 @@ package com.atlarge.opendc.compute.metal.driver import com.atlarge.odcsim.Domain -import com.atlarge.odcsim.signal.Signal +import com.atlarge.odcsim.flow.EventFlow +import com.atlarge.odcsim.flow.StateFlow import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.MemoryUnit +import com.atlarge.opendc.compute.core.ServerEvent import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.execution.ServerManagementContext +import com.atlarge.opendc.compute.core.execution.ShutdownException +import com.atlarge.opendc.compute.core.execution.assertFailure import com.atlarge.opendc.compute.core.image.EmptyImage import com.atlarge.opendc.compute.core.image.Image -import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.metal.Node -import com.atlarge.opendc.compute.metal.PowerState +import com.atlarge.opendc.compute.metal.NodeEvent +import com.atlarge.opendc.compute.metal.NodeState import com.atlarge.opendc.compute.metal.power.ConstantPowerModel import com.atlarge.opendc.core.power.PowerModel +import com.atlarge.opendc.core.services.ServiceKey +import com.atlarge.opendc.core.services.ServiceRegistry import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job +import kotlinx.coroutines.cancel import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.launch @@ -50,6 +58,7 @@ import kotlin.math.ceil import kotlin.math.max import kotlin.math.min import kotlinx.coroutines.withContext +import java.lang.Exception /** * A basic implementation of the [BareMetalDriver] that simulates an [Image] running on a bare-metal machine. @@ -57,6 +66,7 @@ import kotlinx.coroutines.withContext * @param domain The simulation domain the driver runs in. * @param uid The unique identifier of the machine. * @param name An optional name of the machine. + * @param metadata The initial metadata of the node. * @param cpus The CPUs available to the bare metal machine. * @param memoryUnits The memory units in this machine. * @param powerModel The power model of this machine. @@ -65,128 +75,175 @@ public class SimpleBareMetalDriver( private val domain: Domain, uid: UUID, name: String, + metadata: Map<String, Any>, val cpus: List<ProcessingUnit>, val memoryUnits: List<MemoryUnit>, - powerModel: PowerModel<SimpleBareMetalDriver> = ConstantPowerModel(0.0) + powerModel: PowerModel<SimpleBareMetalDriver> = ConstantPowerModel( + 0.0 + ) ) : BareMetalDriver { /** - * The monitor to use. + * The flavor that corresponds to this machine. */ - private lateinit var monitor: ServerMonitor + private val flavor = Flavor(cpus.size, memoryUnits.map { it.size }.sum()) /** - * The machine state. + * The current active server context. */ - private var node: Node = Node(uid, name, PowerState.POWER_OFF, EmptyImage, null) + private var serverContext: BareMetalServerContext? = null /** - * The flavor that corresponds to this machine. + * The events of the machine. */ - private val flavor = Flavor(cpus.size, memoryUnits.map { it.size }.sum()) + private val events = EventFlow<NodeEvent>() /** - * The job that is running the image. + * The flow containing the load of the server. */ - private var job: Job? = null + private val usageState = StateFlow(0.0) /** - * The signal containing the load of the server. + * The machine state. */ - private val usageSignal = Signal(0.0) + private val nodeState = StateFlow(Node(uid, name, metadata + ("driver" to this), NodeState.SHUTOFF, EmptyImage, null, events)) + + override val node: Flow<Node> = nodeState - override val usage: Flow<Double> = usageSignal + override val usage: Flow<Double> = usageState override val powerDraw: Flow<Double> = powerModel(this) - override suspend fun init(monitor: ServerMonitor): Node = withContext(domain.coroutineContext) { - this@SimpleBareMetalDriver.monitor = monitor - return@withContext node + override suspend fun init(): Node = withContext(domain.coroutineContext) { + nodeState.value } - override suspend fun setPower(powerState: PowerState): Node = withContext(domain.coroutineContext) { - val previousPowerState = node.powerState - val server = when (node.powerState to powerState) { - PowerState.POWER_OFF to PowerState.POWER_OFF -> null - PowerState.POWER_OFF to PowerState.POWER_ON -> Server( - UUID.randomUUID(), - node.name, - emptyMap(), - flavor, - node.image, - ServerState.BUILD - ) - PowerState.POWER_ON to PowerState.POWER_OFF -> null // TODO Terminate existing image - PowerState.POWER_ON to PowerState.POWER_ON -> node.server - else -> throw IllegalStateException() + override suspend fun start(): Node = withContext(domain.coroutineContext) { + val node = nodeState.value + if (node.state != NodeState.SHUTOFF) { + return@withContext node } - server?.serviceRegistry?.set(BareMetalDriver.Key, this@SimpleBareMetalDriver) - node = node.copy(powerState = powerState, server = server) - if (powerState != previousPowerState && server != null) { - launch() + val events = EventFlow<ServerEvent>() + val server = Server( + UUID.randomUUID(), + node.name, + emptyMap(), + flavor, + node.image, + ServerState.BUILD, + ServiceRegistry().put(BareMetalDriver, this@SimpleBareMetalDriver), + events + ) + + setNode(node.copy(state = NodeState.BOOT, server = server)) + serverContext = BareMetalServerContext(events) + return@withContext nodeState.value + } + + override suspend fun stop(): Node = withContext(domain.coroutineContext) { + val node = nodeState.value + if (node.state == NodeState.SHUTOFF) { + return@withContext node } + // We terminate the image running on the machine + serverContext!!.cancel(fail = false) + serverContext = null + + setNode(node.copy(state = NodeState.SHUTOFF, server = null)) return@withContext node } + override suspend fun reboot(): Node = withContext(domain.coroutineContext) { + stop() + start() + } + override suspend fun setImage(image: Image): Node = withContext(domain.coroutineContext) { - node = node.copy(image = image) - return@withContext node + setNode(nodeState.value.copy(image = image)) + return@withContext nodeState.value } - override suspend fun refresh(): Node = withContext(domain.coroutineContext) { node } + override suspend fun refresh(): Node = withContext(domain.coroutineContext) { nodeState.value } - /** - * Launch the server image on the machine. - */ - private suspend fun launch() { - val serverContext = serverCtx + private fun setNode(value: Node) { + val field = nodeState.value + if (field.state != value.state) { + events.emit(NodeEvent.StateChanged(value, field.state)) + } - job = domain.launch { - serverContext.init() - try { - node.server!!.image(serverContext) - serverContext.exit() - } catch (cause: Throwable) { - serverContext.exit(cause) - } + if (field.server != null && value.server != null && field.server.state != value.server.state) { + serverContext!!.events.emit(ServerEvent.StateChanged(value.server, field.server.state)) } + + nodeState.value = value } - private val serverCtx = object : ServerManagementContext { - private var initialized: Boolean = false + private inner class BareMetalServerContext(val events: EventFlow<ServerEvent>) : ServerManagementContext { + private var finalized: Boolean = false override val cpus: List<ProcessingUnit> = this@SimpleBareMetalDriver.cpus - override var server: Server - get() = node.server!! - set(value) { - node = node.copy(server = value) + override val server: Server + get() = nodeState.value.server!! + + private val job = domain.launch { + delay(1) // TODO Introduce boot time + init() + try { + server.image(this@BareMetalServerContext) + exit() + } catch (cause: Throwable) { + exit(cause) } + } + + /** + * Cancel the image running on the machine. + */ + suspend fun cancel(fail: Boolean) { + if (fail) + job.cancel(ShutdownException(cause = Exception("Random failure"))) + else + job.cancel(ShutdownException()) + job.join() + } + + override suspend fun <T : Any> publishService(key: ServiceKey<T>, service: T) { + val server = server.copy(services = server.services.put(key, service)) + setNode(nodeState.value.copy(server = server)) + events.emit(ServerEvent.ServicePublished(server, key)) + } override suspend fun init() { - if (initialized) { - throw IllegalStateException() - } + assert(!finalized) { "Machine is already finalized" } - val previousState = server.state - server = server.copy(state = ServerState.ACTIVE) - monitor.onUpdate(server, previousState) - initialized = true + val server = server.copy(state = ServerState.ACTIVE) + setNode(nodeState.value.copy(state = NodeState.ACTIVE, server = server)) } override suspend fun exit(cause: Throwable?) { - val previousState = server.state - val state = if (cause == null) ServerState.SHUTOFF else ServerState.ERROR - server = server.copy(state = state) - initialized = false - domain.launch { monitor.onUpdate(server, previousState) } + finalized = true + + val newServerState = + if (cause == null || (cause is ShutdownException && cause.cause == null)) + ServerState.SHUTOFF + else + ServerState.ERROR + val newNodeState = + if (cause == null || (cause is ShutdownException && cause.cause != null)) + nodeState.value.state + else + NodeState.ERROR + val server = server.copy(state = newServerState) + setNode(nodeState.value.copy(state = newNodeState, server = server)) } private var flush: Job? = null override suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) { require(burst.size == limit.size) { "Array dimensions do not match" } + assert(!finalized) { "Server instance is already finalized" } // If run is called in at the same timestamp as the previous call, cancel the load flush flush?.cancel() @@ -209,19 +266,20 @@ public class SimpleBareMetalDriver( } } - usageSignal.value = totalUsage / cpus.size + usageState.value = totalUsage / cpus.size try { delay(duration) - } catch (_: CancellationException) { - // On cancellation, we compute and return the remaining burst + } catch (e: CancellationException) { + // On non-failure cancellation, we compute and return the remaining burst + e.assertFailure() } val end = simulationContext.clock.millis() // Flush the load if the do not receive a new run call for the same timestamp - flush = domain.launch { + flush = domain.launch(job) { delay(1) - usageSignal.value = 0.0 + usageState.value = 0.0 } flush!!.invokeOnCompletion { flush = null @@ -235,4 +293,14 @@ public class SimpleBareMetalDriver( } } } + + override val scope: CoroutineScope + get() = domain + + override suspend fun fail() { + serverContext?.cancel(fail = true) + domain.cancel() + } + + override fun toString(): String = "SimpleBareMetalDriver(node = ${nodeState.value.uid})" } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/ProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/ProvisioningService.kt index 24ade799..105505f2 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/ProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/ProvisioningService.kt @@ -25,7 +25,6 @@ package com.atlarge.opendc.compute.metal.service import com.atlarge.opendc.compute.core.image.Image -import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.metal.Node import com.atlarge.opendc.compute.metal.driver.BareMetalDriver import com.atlarge.opendc.core.services.AbstractServiceKey @@ -53,7 +52,7 @@ public interface ProvisioningService { /** * Deploy the specified [Image] on a compute node. */ - public suspend fun deploy(node: Node, image: Image, monitor: ServerMonitor): Node + public suspend fun deploy(node: Node, image: Image): Node /** * The service key of this service. diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt index b18a4006..a7e143aa 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningService.kt @@ -25,31 +25,22 @@ package com.atlarge.opendc.compute.metal.service import com.atlarge.odcsim.Domain -import com.atlarge.opendc.compute.core.Server -import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.image.Image -import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.metal.Node -import com.atlarge.opendc.compute.metal.PowerState import com.atlarge.opendc.compute.metal.driver.BareMetalDriver import kotlinx.coroutines.withContext /** * A very basic implementation of the [ProvisioningService]. */ -public class SimpleProvisioningService(val domain: Domain) : ProvisioningService, ServerMonitor { +public class SimpleProvisioningService(val domain: Domain) : ProvisioningService { /** * The active nodes in this service. */ private val nodes: MutableMap<Node, BareMetalDriver> = mutableMapOf() - /** - * The installed monitors. - */ - private val monitors: MutableMap<Server, ServerMonitor> = mutableMapOf() - override suspend fun create(driver: BareMetalDriver): Node = withContext(domain.coroutineContext) { - val node = driver.init(this@SimpleProvisioningService) + val node = driver.init() nodes[node] = driver return@withContext node } @@ -60,19 +51,10 @@ public class SimpleProvisioningService(val domain: Domain) : ProvisioningService return@withContext nodes[node]!!.refresh() } - override suspend fun deploy(node: Node, image: Image, monitor: ServerMonitor): Node = withContext(domain.coroutineContext) { + override suspend fun deploy(node: Node, image: Image): Node = withContext(domain.coroutineContext) { val driver = nodes[node]!! - driver.setImage(image) - driver.setPower(PowerState.POWER_OFF) - val newNode = driver.setPower(PowerState.POWER_ON) - monitors[newNode.server!!] = monitor + val newNode = driver.reboot() return@withContext newNode } - - override suspend fun onUpdate(server: Server, previousState: ServerState) { - withContext(domain.coroutineContext) { - monitors[server]?.onUpdate(server, previousState) - } - } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt new file mode 100644 index 00000000..69b0124d --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt @@ -0,0 +1,58 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.compute.virt + +import com.atlarge.opendc.core.Identity +import kotlinx.coroutines.flow.Flow +import java.util.UUID + +/** + * A hypervisor (or virtual machine monitor) is software or firmware that virtualizes the host compute environment + * into several virtual guest machines. + */ +public class Hypervisor( + /** + * The unique identifier of the hypervisor. + */ + override val uid: UUID, + + /** + * The optional name of the hypervisor. + */ + override val name: String, + + /** + * Metadata of the hypervisor. + */ + public val metadata: Map<String, Any>, + + /** + * The events that are emitted by the hypervisor. + */ + public val events: Flow<HypervisorEvent> +) : Identity { + override fun hashCode(): Int = uid.hashCode() + override fun equals(other: Any?): Boolean = other is Hypervisor && uid == other.uid +} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt new file mode 100644 index 00000000..5c19b00d --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt @@ -0,0 +1,67 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.compute.virt + +import com.atlarge.opendc.compute.core.Server +import com.atlarge.opendc.compute.virt.driver.VirtDriver + +/** + * An event that is emitted by a [VirtDriver]. + */ +public sealed class HypervisorEvent { + /** + * The driver that emitted the event. + */ + public abstract val driver: VirtDriver + + /** + * This event is emitted when the number of active servers on the server managed by this driver is updated. + * + * @property driver The driver that emitted the event. + * @property numberOfActiveServers The number of active servers. + * @property availableMemory The available memory, in MB. + */ + public data class VmsUpdated( + override val driver: VirtDriver, + public val numberOfActiveServers: Int, + public val availableMemory: Long + ) : HypervisorEvent() + + /** + * This event is emitted when a slice is finished. + * + * @property driver The driver that emitted the event. + * @property requestedBurst The total requested CPU time (can be above capacity). + * @property grantedBurst The actual total granted capacity. + * @property numberOfDeployedImages The number of images deployed on this hypervisor. + */ + public data class SliceFinished( + override val driver: VirtDriver, + public val requestedBurst: Long, + public val grantedBurst: Long, + public val numberOfDeployedImages: Int, + public val hostServer: Server + ) : HypervisorEvent() +} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt index 8d055953..c21b002d 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt @@ -22,32 +22,36 @@ * SOFTWARE. */ -package com.atlarge.opendc.compute.virt.driver.hypervisor +package com.atlarge.opendc.compute.virt import com.atlarge.opendc.compute.core.execution.ServerContext import com.atlarge.opendc.compute.core.image.Image +import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver import com.atlarge.opendc.compute.virt.driver.VirtDriver -import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor import com.atlarge.opendc.core.resource.TagContainer +import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.suspendCancellableCoroutine import java.util.UUID /** * A hypervisor managing the VMs of a node. */ -class HypervisorImage( - private val hypervisorMonitor: HypervisorMonitor -) : Image { +object HypervisorImage : Image { override val uid: UUID = UUID.randomUUID() override val name: String = "vmm" override val tags: TagContainer = emptyMap() override suspend fun invoke(ctx: ServerContext) { - val driver = HypervisorVirtDriver(ctx, hypervisorMonitor) + coroutineScope { + val driver = SimpleVirtDriver(ctx, this) + ctx.publishService(VirtDriver.Key, driver) - ctx.publishService(VirtDriver.Key, driver) - - // Suspend image until it is cancelled - suspendCancellableCoroutine<Unit> {} + // Suspend image until it is cancelled + try { + suspendCancellableCoroutine<Unit> {} + } finally { + driver.eventFlow.close() + } + } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/InsufficientMemoryOnServerException.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/InsufficientMemoryOnServerException.kt index 926234b5..0586ae00 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/InsufficientMemoryOnServerException.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/InsufficientMemoryOnServerException.kt @@ -1,3 +1,3 @@ -package com.atlarge.opendc.compute.virt.driver.hypervisor +package com.atlarge.opendc.compute.virt.driver public class InsufficientMemoryOnServerException : IllegalStateException("Insufficient memory left on server.") diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt index 430e5a37..76368080 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt @@ -22,26 +22,34 @@ * SOFTWARE. */ -package com.atlarge.opendc.compute.virt.driver.hypervisor +package com.atlarge.opendc.compute.virt.driver -import com.atlarge.odcsim.SimulationContext +import com.atlarge.odcsim.Domain +import com.atlarge.odcsim.flow.EventFlow import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.Server +import com.atlarge.opendc.compute.core.ServerEvent import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.execution.ServerContext import com.atlarge.opendc.compute.core.execution.ServerManagementContext +import com.atlarge.opendc.compute.core.execution.ShutdownException +import com.atlarge.opendc.compute.core.execution.assertFailure import com.atlarge.opendc.compute.core.image.Image -import com.atlarge.opendc.compute.core.monitor.ServerMonitor -import com.atlarge.opendc.compute.virt.driver.VirtDriver -import com.atlarge.opendc.compute.virt.driver.VirtDriverMonitor -import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor +import com.atlarge.opendc.compute.virt.HypervisorEvent +import com.atlarge.opendc.core.services.ServiceKey +import com.atlarge.opendc.core.services.ServiceRegistry import com.atlarge.opendc.core.workload.IMAGE_PERF_INTERFERENCE_MODEL import com.atlarge.opendc.core.workload.PerformanceInterferenceModel import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.Job import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.launch import java.util.UUID import kotlin.math.ceil @@ -51,11 +59,18 @@ import kotlin.math.min /** * A [VirtDriver] that is backed by a simple hypervisor implementation. */ -class HypervisorVirtDriver( +@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class) +class SimpleVirtDriver( private val hostContext: ServerContext, - private val monitor: HypervisorMonitor + private val coroutineScope: CoroutineScope ) : VirtDriver { /** + * The [Server] on which this hypervisor runs. + */ + private val server: Server + get() = hostContext.server + + /** * A set for tracking the VM context objects. */ internal val vms: MutableSet<VmServerContext> = mutableSetOf() @@ -66,31 +81,38 @@ class HypervisorVirtDriver( private var availableMemory: Long = hostContext.server.flavor.memorySize /** - * Monitors to keep informed. + * The [EventFlow] to emit the events. */ - private val monitors: MutableSet<VirtDriverMonitor> = mutableSetOf() + internal val eventFlow = EventFlow<HypervisorEvent>() - override suspend fun spawn(image: Image, monitor: ServerMonitor, flavor: Flavor): Server { + override val events: Flow<HypervisorEvent> = eventFlow + + override suspend fun spawn( + name: String, + image: Image, + flavor: Flavor + ): Server { val requiredMemory = flavor.memorySize if (availableMemory - requiredMemory < 0) { throw InsufficientMemoryOnServerException() } require(flavor.cpuCount <= hostContext.server.flavor.cpuCount) { "Machine does not fit" } - val server = Server(UUID.randomUUID(), "<unnamed>", emptyMap(), flavor, image, ServerState.BUILD) + val events = EventFlow<ServerEvent>() + val server = Server( + UUID.randomUUID(), name, emptyMap(), flavor, image, ServerState.BUILD, + ServiceRegistry(), events + ) availableMemory -= requiredMemory - vms.add(VmServerContext(server, monitor, simulationContext)) - monitors.forEach { it.onUpdate(vms.size, availableMemory) } + vms.add(VmServerContext(server, events, simulationContext.domain)) + eventFlow.emit(HypervisorEvent.VmsUpdated(this, vms.size, availableMemory)) return server } - override suspend fun addMonitor(monitor: VirtDriverMonitor) { - monitors.add(monitor) - } - - override suspend fun removeMonitor(monitor: VirtDriverMonitor) { - monitors.remove(monitor) - } + /** + * A flag to indicate the driver is stopped. + */ + private var stopped: Boolean = false /** * The set of [VmServerContext] instances that is being scheduled at the moment. @@ -108,12 +130,12 @@ class HypervisorVirtDriver( private suspend fun reschedule() { flush() - // Do not schedule a call if there is no work to schedule - if (activeVms.isEmpty()) { + // Do not schedule a call if there is no work to schedule or the driver stopped. + if (stopped || activeVms.isEmpty()) { return } - val call = simulationContext.domain.launch { + val call = coroutineScope.launch { val start = simulationContext.clock.millis() val vms = activeVms.toSet() @@ -200,16 +222,9 @@ class HypervisorVirtDriver( } } - monitor.onSliceFinish( - end, - totalBurst, - totalBurst - totalRemainder, - vms.size, - hostContext.server - ) + eventFlow.emit(HypervisorEvent.SliceFinished(this@SimpleVirtDriver, totalBurst, totalBurst - totalRemainder, vms.size, server)) } this.call = call - call.invokeOnCompletion { this.call = null } } /** @@ -217,9 +232,10 @@ class HypervisorVirtDriver( */ private fun flush() { val call = call ?: return // If there is no active call, there is nothing to flush - // The progress is actually flushed in the coroutine when it notices we cancel it and wait for its + // The progress is actually flushed in the coroutine when it notices: we cancel it and wait for its // completion. call.cancel() + this.call = null } /** @@ -238,17 +254,19 @@ class HypervisorVirtDriver( } internal inner class VmServerContext( - override var server: Server, - val monitor: ServerMonitor, - ctx: SimulationContext + server: Server, + val events: EventFlow<ServerEvent>, + val domain: Domain ) : ServerManagementContext { + private var finalized: Boolean = false lateinit var requests: List<CpuRequest> lateinit var burst: LongArray var deadline: Long = 0L var chan = Channel<Unit>(Channel.RENDEZVOUS) private var initialized: Boolean = false - internal val job: Job = ctx.domain.launch { + internal val job: Job = coroutineScope.launch { + delay(1) // TODO Introduce boot time init() try { server.image(this@VmServerContext) @@ -258,28 +276,42 @@ class HypervisorVirtDriver( } } + override var server: Server = server + set(value) { + if (field.state != value.state) { + events.emit(ServerEvent.StateChanged(value, field.state)) + } + + field = value + } + override val cpus: List<ProcessingUnit> = hostContext.cpus.take(server.flavor.cpuCount) + override suspend fun <T : Any> publishService(key: ServiceKey<T>, service: T) { + server = server.copy(services = server.services.put(key, service)) + events.emit(ServerEvent.ServicePublished(server, key)) + } + override suspend fun init() { - if (initialized) { - throw IllegalStateException() - } + assert(!finalized) { "VM is already finalized" } - val previousState = server.state server = server.copy(state = ServerState.ACTIVE) - monitor.onUpdate(server, previousState) initialized = true } override suspend fun exit(cause: Throwable?) { - val previousState = server.state - val state = if (cause == null) ServerState.SHUTOFF else ServerState.ERROR - server = server.copy(state = state) + finalized = true + + val serverState = + if (cause == null || (cause is ShutdownException && cause.cause == null)) + ServerState.SHUTOFF + else + ServerState.ERROR + server = server.copy(state = serverState) availableMemory += server.flavor.memorySize - monitor.onUpdate(server, previousState) - initialized = false vms.remove(this) - monitors.forEach { it.onUpdate(vms.size, availableMemory) } + events.close() + eventFlow.emit(HypervisorEvent.VmsUpdated(this@SimpleVirtDriver, vms.size, availableMemory)) } override suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) { @@ -289,7 +321,14 @@ class HypervisorVirtDriver( this.burst = burst requests = cpus.asSequence() .take(burst.size) - .mapIndexed { i, cpu -> CpuRequest(this, cpu, burst[i], limit[i]) } + .mapIndexed { i, cpu -> + CpuRequest( + this, + cpu, + burst[i], + limit[i] + ) + } .toList() // Wait until the burst has been run or the coroutine is cancelled @@ -297,11 +336,13 @@ class HypervisorVirtDriver( activeVms += this reschedule() chan.receive() - } catch (_: CancellationException) { + } catch (e: CancellationException) { // On cancellation, we compute and return the remaining burst + e.assertFailure() + } finally { + activeVms -= this + reschedule() } - activeVms -= this - reschedule() } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt index d889d0f9..1002d382 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt @@ -27,8 +27,9 @@ package com.atlarge.opendc.compute.virt.driver import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.image.Image -import com.atlarge.opendc.compute.core.monitor.ServerMonitor +import com.atlarge.opendc.compute.virt.HypervisorEvent import com.atlarge.opendc.core.services.AbstractServiceKey +import kotlinx.coroutines.flow.Flow import java.util.UUID /** @@ -37,28 +38,19 @@ import java.util.UUID */ public interface VirtDriver { /** + * The events emitted by the driver. + */ + public val events: Flow<HypervisorEvent> + + /** * Spawn the given [Image] on the compute resource of this driver. * + * @param name The name of the server to spawn. * @param image The image to deploy. - * @param monitor The monitor to use for the deployment of this particular image. * @param flavor The flavor of the server which this driver is controlling. * @return The virtual server spawned by this method. */ - public suspend fun spawn(image: Image, monitor: ServerMonitor, flavor: Flavor): Server - - /** - * Adds the given [VirtDriverMonitor] to the list of monitors to keep informed on the state of this driver. - * - * @param monitor The monitor to keep informed. - */ - public suspend fun addMonitor(monitor: VirtDriverMonitor) - - /** - * Removes the given [VirtDriverMonitor] from the list of monitors. - * - * @param monitor The monitor to unsubscribe - */ - public suspend fun removeMonitor(monitor: VirtDriverMonitor) + public suspend fun spawn(name: String, image: Image, flavor: Flavor): Server companion object Key : AbstractServiceKey<VirtDriver>(UUID.randomUUID(), "virtual-driver") } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverMonitor.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverMonitor.kt deleted file mode 100644 index cf2f4619..00000000 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverMonitor.kt +++ /dev/null @@ -1,14 +0,0 @@ -package com.atlarge.opendc.compute.virt.driver - -/** - * Monitor for entities interested in the state of a [VirtDriver]. - */ -interface VirtDriverMonitor { - /** - * Called when the number of active servers on the server managed by this driver is updated. - * - * @param numberOfActiveServers The number of active servers. - * @param availableMemory The available memory, in MB. - */ - public suspend fun onUpdate(numberOfActiveServers: Int, availableMemory: Long) -} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/monitor/HypervisorMonitor.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/monitor/HypervisorMonitor.kt deleted file mode 100644 index 1e3981f6..00000000 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/monitor/HypervisorMonitor.kt +++ /dev/null @@ -1,25 +0,0 @@ -package com.atlarge.opendc.compute.virt.monitor - -import com.atlarge.opendc.compute.core.Server - -/** - * Monitoring interface for hypervisor-specific events. - */ -interface HypervisorMonitor { - /** - * Invoked after a scheduling slice has finished processed. - * - * @param time The current time (in ms). - * @param requestedBurst The total requested CPU time (can be above capacity). - * @param grantedBurst The actual total granted capacity. - * @param numberOfDeployedImages The number of images deployed on this hypervisor. - * @param hostServer The server hosting this hypervisor. - */ - suspend fun onSliceFinish( - time: Long, - requestedBurst: Long, - grantedBurst: Long, - numberOfDeployedImages: Int, - hostServer: Server - ) -} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/HypervisorView.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/HypervisorView.kt new file mode 100644 index 00000000..97842f18 --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/HypervisorView.kt @@ -0,0 +1,12 @@ +package com.atlarge.opendc.compute.virt.service + +import com.atlarge.opendc.compute.core.Server +import com.atlarge.opendc.compute.virt.driver.VirtDriver + +class HypervisorView( + var server: Server, + var numberOfActiveServers: Int, + var availableMemory: Long +) { + lateinit var driver: VirtDriver +} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/NodeView.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/NodeView.kt deleted file mode 100644 index 41e67624..00000000 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/NodeView.kt +++ /dev/null @@ -1,11 +0,0 @@ -package com.atlarge.opendc.compute.virt.service - -import com.atlarge.opendc.compute.metal.Node -import com.atlarge.opendc.compute.virt.driver.hypervisor.HypervisorImage - -class NodeView( - val node: Node, - val hypervisor: HypervisorImage, - var numberOfActiveServers: Int, - var availableMemory: Long -) diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt index 17960186..156521db 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt @@ -3,123 +3,175 @@ package com.atlarge.opendc.compute.virt.service import com.atlarge.odcsim.SimulationContext import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.Server +import com.atlarge.opendc.compute.core.ServerEvent import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.image.Image -import com.atlarge.opendc.compute.core.monitor.ServerMonitor -import com.atlarge.opendc.compute.metal.Node +import com.atlarge.opendc.compute.core.image.VmImage import com.atlarge.opendc.compute.metal.service.ProvisioningService +import com.atlarge.opendc.compute.virt.HypervisorEvent import com.atlarge.opendc.compute.virt.driver.VirtDriver -import com.atlarge.opendc.compute.virt.driver.VirtDriverMonitor -import com.atlarge.opendc.compute.virt.driver.hypervisor.HypervisorImage -import com.atlarge.opendc.compute.virt.driver.hypervisor.InsufficientMemoryOnServerException -import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor +import com.atlarge.opendc.compute.virt.HypervisorImage +import com.atlarge.opendc.compute.virt.driver.InsufficientMemoryOnServerException import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy +import com.atlarge.opendc.core.services.ServiceKey +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.Job +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch -import kotlinx.coroutines.yield +import kotlinx.coroutines.suspendCancellableCoroutine +import kotlinx.coroutines.withContext +import kotlin.coroutines.Continuation +import kotlin.coroutines.resume +@OptIn(ExperimentalCoroutinesApi::class) class SimpleVirtProvisioningService( public override val allocationPolicy: AllocationPolicy, private val ctx: SimulationContext, - private val provisioningService: ProvisioningService, - private val hypervisorMonitor: HypervisorMonitor -) : VirtProvisioningService, ServerMonitor { + private val provisioningService: ProvisioningService +) : VirtProvisioningService, CoroutineScope by ctx.domain { /** - * The nodes that are controlled by the service. + * The hypervisors that have been launched by the service. */ - internal lateinit var nodes: List<Node> + private val hypervisors: MutableMap<Server, HypervisorView> = mutableMapOf() /** - * The available nodes. + * The available hypervisors. */ - internal val availableNodes: MutableSet<NodeView> = mutableSetOf() + private val availableHypervisors: MutableSet<HypervisorView> = mutableSetOf() /** * The incoming images to be processed by the provisioner. */ - internal val incomingImages: MutableSet<ImageView> = mutableSetOf() + private val incomingImages: MutableSet<ImageView> = mutableSetOf() /** * The active images in the system. */ - internal val activeImages: MutableSet<ImageView> = mutableSetOf() + private val activeImages: MutableSet<ImageView> = mutableSetOf() init { - ctx.domain.launch { - val provisionedNodes = provisioningService.nodes().toList() - val deployedNodes = provisionedNodes.map { node -> - val hypervisorImage = HypervisorImage(hypervisorMonitor) - val deployedNode = provisioningService.deploy(node, hypervisorImage, this@SimpleVirtProvisioningService) - val nodeView = NodeView( - deployedNode, - hypervisorImage, - 0, - deployedNode.server!!.flavor.memorySize - ) - yield() - deployedNode.server.serviceRegistry[VirtDriver.Key].addMonitor(object : VirtDriverMonitor { - override suspend fun onUpdate(numberOfActiveServers: Int, availableMemory: Long) { - nodeView.numberOfActiveServers = numberOfActiveServers - nodeView.availableMemory = availableMemory + launch { + val provisionedNodes = provisioningService.nodes() + provisionedNodes.forEach { node -> + val hypervisorImage = HypervisorImage + val node = provisioningService.deploy(node, hypervisorImage) + node.server!!.events.onEach { event -> + when (event) { + is ServerEvent.StateChanged -> stateChanged(event.server) + is ServerEvent.ServicePublished -> servicePublished(event.server, event.key) } - }) - nodeView + }.launchIn(this) } - nodes = deployedNodes.map { it.node } - availableNodes.addAll(deployedNodes) } } - override suspend fun deploy(image: Image, monitor: ServerMonitor, flavor: Flavor) { - val vmInstance = ImageView(image, monitor, flavor) - incomingImages += vmInstance - requestCycle() + override suspend fun drivers(): Set<VirtDriver> = withContext(coroutineContext) { + availableHypervisors.map { it.driver }.toSet() + } + + override suspend fun deploy( + name: String, + image: Image, + flavor: Flavor + ): Server = withContext(coroutineContext) { + suspendCancellableCoroutine<Server> { cont -> + val vmInstance = ImageView(name, image, flavor, cont) + incomingImages += vmInstance + requestCycle() + } } + private var call: Job? = null + private fun requestCycle() { - ctx.domain.launch { + if (call != null) { + return + } + + val call = launch { + delay(1) + this@SimpleVirtProvisioningService.call = null schedule() } + this.call = call } private suspend fun schedule() { val imagesToBeScheduled = incomingImages.toSet() for (imageInstance in imagesToBeScheduled) { - println("Spawning $imageInstance") + val selectedHv = availableHypervisors.minWith(allocationPolicy().thenBy { it.server.uid }) ?: break + try { + println("Spawning ${imageInstance.image}") + incomingImages -= imageInstance - val selectedNode = availableNodes.minWith(allocationPolicy().thenBy { it.node.uid }) + // Speculatively update the hypervisor view information to prevent other images in the queue from + // deciding on stale values. + selectedHv.numberOfActiveServers++ + selectedHv.availableMemory -= (imageInstance.image as VmImage).requiredMemory // XXX Temporary hack - try { - imageInstance.server = selectedNode?.node!!.server!!.serviceRegistry[VirtDriver.Key].spawn( + val server = selectedHv.driver.spawn( + imageInstance.name, imageInstance.image, - imageInstance.monitor, imageInstance.flavor ) + imageInstance.server = server + imageInstance.continuation.resume(server) activeImages += imageInstance } catch (e: InsufficientMemoryOnServerException) { println("Unable to deploy image due to insufficient memory") - } - incomingImages -= imageInstance + selectedHv.numberOfActiveServers-- + selectedHv.availableMemory += (imageInstance.image as VmImage).requiredMemory + } } } - override suspend fun onUpdate(server: Server, previousState: ServerState) { + private fun stateChanged(server: Server) { when (server.state) { ServerState.ACTIVE -> { - // TODO handle hypervisor server becoming active + val hvView = HypervisorView( + server, + 0, + server.flavor.memorySize + ) + hypervisors[server] = hvView } ServerState.SHUTOFF, ServerState.ERROR -> { - // TODO handle hypervisor server shutting down or failing + val hv = hypervisors[server] ?: return + availableHypervisors -= hv + requestCycle() } else -> throw IllegalStateException() } } - class ImageView( + private fun servicePublished(server: Server, key: ServiceKey<*>) { + if (key == VirtDriver.Key) { + val hv = hypervisors[server] ?: return + hv.driver = server.services[VirtDriver] + availableHypervisors += hv + + hv.driver.events + .onEach { event -> + if (event is HypervisorEvent.VmsUpdated) { + hv.numberOfActiveServers = event.numberOfActiveServers + hv.availableMemory = event.availableMemory + } + }.launchIn(this) + + requestCycle() + } + } + + data class ImageView( + val name: String, val image: Image, - val monitor: ServerMonitor, val flavor: Flavor, + val continuation: Continuation<Server>, var server: Server? = null ) } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt index 7770ec50..a3ade2fb 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt @@ -1,8 +1,9 @@ package com.atlarge.opendc.compute.virt.service import com.atlarge.opendc.compute.core.Flavor +import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.image.Image -import com.atlarge.opendc.compute.core.monitor.ServerMonitor +import com.atlarge.opendc.compute.virt.driver.VirtDriver import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy /** @@ -12,11 +13,16 @@ interface VirtProvisioningService { val allocationPolicy: AllocationPolicy /** + * Obtain the active hypervisors for this provisioner. + */ + public suspend fun drivers(): Set<VirtDriver> + + /** * Submit the specified [Image] to the provisioning service. * + * @param name The name of the server to deploy. * @param image The image to be deployed. - * @param monitor The monitor to inform on events. * @param flavor The flavor of the machine instance to run this [image] on. */ - public suspend fun deploy(image: Image, monitor: ServerMonitor, flavor: Flavor) + public suspend fun deploy(name: String, image: Image, flavor: Flavor): Server } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AllocationPolicy.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AllocationPolicy.kt index a1c0ab9a..e2871cca 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AllocationPolicy.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AllocationPolicy.kt @@ -1,7 +1,7 @@ package com.atlarge.opendc.compute.virt.service.allocation import com.atlarge.opendc.compute.metal.Node -import com.atlarge.opendc.compute.virt.service.NodeView +import com.atlarge.opendc.compute.virt.service.HypervisorView /** * A policy for selecting the [Node] an image should be deployed to, @@ -10,5 +10,5 @@ interface AllocationPolicy { /** * Builds the logic of the policy. */ - operator fun invoke(): Comparator<NodeView> + operator fun invoke(): Comparator<HypervisorView> } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AvailableMemoryAllocationPolicy.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AvailableMemoryAllocationPolicy.kt index b3e9d77e..f095849b 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AvailableMemoryAllocationPolicy.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/AvailableMemoryAllocationPolicy.kt @@ -1,12 +1,12 @@ package com.atlarge.opendc.compute.virt.service.allocation -import com.atlarge.opendc.compute.virt.service.NodeView +import com.atlarge.opendc.compute.virt.service.HypervisorView /** * Allocation policy that selects the node with the most available memory. */ class AvailableMemoryAllocationPolicy : AllocationPolicy { - override fun invoke(): Comparator<NodeView> = Comparator { o1, o2 -> + override fun invoke(): Comparator<HypervisorView> = Comparator { o1, o2 -> compareValuesBy(o1, o2) { -it.availableMemory } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/NumberOfActiveServersAllocationPolicy.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/NumberOfActiveServersAllocationPolicy.kt index 9d6582dd..59e48465 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/NumberOfActiveServersAllocationPolicy.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/NumberOfActiveServersAllocationPolicy.kt @@ -1,13 +1,13 @@ package com.atlarge.opendc.compute.virt.service.allocation -import com.atlarge.opendc.compute.virt.service.NodeView +import com.atlarge.opendc.compute.virt.service.HypervisorView import kotlinx.coroutines.runBlocking /** * Allocation policy that selects the node with the least amount of active servers. */ class NumberOfActiveServersAllocationPolicy : AllocationPolicy { - override fun invoke(): Comparator<NodeView> = Comparator { o1, o2 -> + override fun invoke(): Comparator<HypervisorView> = Comparator { o1, o2 -> runBlocking { compareValuesBy(o1, o2) { it.numberOfActiveServers } } diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt index b8882eda..0fc64373 100644 --- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt +++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt @@ -25,14 +25,12 @@ package com.atlarge.opendc.compute.metal.driver import com.atlarge.odcsim.SimulationEngineProvider -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.ProcessingUnit -import com.atlarge.opendc.compute.core.Server +import com.atlarge.opendc.compute.core.ServerEvent import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.image.FlopsApplicationImage -import com.atlarge.opendc.compute.core.monitor.ServerMonitor -import com.atlarge.opendc.compute.metal.PowerState +import kotlinx.coroutines.flow.collect import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import kotlinx.coroutines.withContext @@ -55,21 +53,19 @@ internal class SimpleBareMetalDriverTest { val dom = root.newDomain(name = "driver") val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4) val cpus = List(4) { ProcessingUnit(cpuNode, it, 2400.0) } - val driver = SimpleBareMetalDriver(dom, UUID.randomUUID(), "test", cpus, emptyList()) - - val monitor = object : ServerMonitor { - override suspend fun onUpdate(server: Server, previousState: ServerState) { - println("[${simulationContext.clock.millis()}] $server") - finalState = server.state - } - } + val driver = SimpleBareMetalDriver(dom, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1_000, 2) // Batch driver commands withContext(dom.coroutineContext) { - driver.init(monitor) + driver.init() driver.setImage(image) - driver.setPower(PowerState.POWER_ON) + val server = driver.start().server!! + server.events.collect { event -> + when (event) { + is ServerEvent.StateChanged -> { println(event); finalState = event.server.state } + } + } } } @@ -78,6 +74,6 @@ internal class SimpleBareMetalDriverTest { system.terminate() } - assertEquals(finalState, ServerState.SHUTOFF) + assertEquals(ServerState.SHUTOFF, finalState) } } diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt index a837130d..f8bd786e 100644 --- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt +++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt @@ -27,12 +27,10 @@ package com.atlarge.opendc.compute.metal.service import com.atlarge.odcsim.SimulationEngineProvider import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.ProcessingUnit -import com.atlarge.opendc.compute.core.Server -import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.image.FlopsApplicationImage -import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.collect import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.Test @@ -53,23 +51,18 @@ internal class SimpleProvisioningServiceTest { val root = system.newDomain(name = "root") root.launch { val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1000, 2) - val monitor = object : ServerMonitor { - override suspend fun onUpdate(server: Server, previousState: ServerState) { - println(server) - } - } - val dom = root.newDomain("provisioner") val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4) val cpus = List(4) { ProcessingUnit(cpuNode, it, 2400.0) } - val driver = SimpleBareMetalDriver(dom, UUID.randomUUID(), "test", cpus, emptyList()) + val driver = SimpleBareMetalDriver(dom.newDomain(), UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) val provisioner = SimpleProvisioningService(dom) provisioner.create(driver) delay(5) val nodes = provisioner.nodes() - provisioner.deploy(nodes.first(), image, monitor) + val node = provisioner.deploy(nodes.first(), image) + node.server!!.events.collect { println(it) } } runBlocking { diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt index 254ad5fe..58d784b0 100644 --- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt +++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt @@ -22,22 +22,19 @@ * SOFTWARE. */ -package com.atlarge.opendc.compute.virt.driver.hypervisor +package com.atlarge.opendc.compute.virt import com.atlarge.odcsim.SimulationEngineProvider -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.ProcessingUnit -import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.ProcessingNode -import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.image.FlopsApplicationImage -import com.atlarge.opendc.compute.core.monitor.ServerMonitor -import com.atlarge.opendc.compute.metal.PowerState import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver import com.atlarge.opendc.compute.virt.driver.VirtDriver -import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.Test @@ -51,6 +48,7 @@ internal class HypervisorTest { /** * A smoke test for the bare-metal driver. */ + @OptIn(ExperimentalCoroutinesApi::class) @Test fun smoke() { val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() @@ -58,41 +56,29 @@ internal class HypervisorTest { val root = system.newDomain("root") root.launch { - val vmm = HypervisorImage(object : HypervisorMonitor { - override suspend fun onSliceFinish( - time: Long, - requestedBurst: Long, - grantedBurst: Long, - numberOfDeployedImages: Int, - hostServer: Server - ) { - println("Hello World!") - } - }) + val vmm = HypervisorImage val workloadA = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1_000, 1) val workloadB = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 2_000, 1) - val monitor = object : ServerMonitor { - override suspend fun onUpdate(server: Server, previousState: ServerState) { - println("[${simulationContext.clock.millis()}]: $server") - } - } val driverDom = root.newDomain("driver") val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) val cpus = List(2) { ProcessingUnit(cpuNode, it, 2000.0) } - val metalDriver = SimpleBareMetalDriver(driverDom, UUID.randomUUID(), "test", cpus, emptyList()) + val metalDriver = SimpleBareMetalDriver(driverDom, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) - metalDriver.init(monitor) + metalDriver.init() metalDriver.setImage(vmm) - metalDriver.setPower(PowerState.POWER_ON) + val node = metalDriver.start() + node.server?.events?.onEach { println(it) }?.launchIn(this) delay(5) val flavor = Flavor(1, 0) - val vmDriver = metalDriver.refresh().server!!.serviceRegistry[VirtDriver] - vmDriver.spawn(workloadA, monitor, flavor) - vmDriver.spawn(workloadB, monitor, flavor) + val vmDriver = metalDriver.refresh().server!!.services[VirtDriver] + val vmA = vmDriver.spawn("a", workloadA, flavor) + vmA.events.onEach { println(it) }.launchIn(this) + val vmB = vmDriver.spawn("b", workloadB, flavor) + vmB.events.onEach { println(it) }.launchIn(this) } runBlocking { |
