From 76bfeb44c5a02be143c152c52bc1029cff360744 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sat, 21 Mar 2020 22:04:31 +0100 Subject: refactor: Migrate to Flow for event listeners --- .../opendc/compute/core/execution/ServerContext.kt | 2 +- .../com/atlarge/opendc/compute/metal/Node.kt | 4 +- .../opendc/compute/metal/driver/BareMetalDriver.kt | 5 + .../compute/metal/driver/SimpleBareMetalDriver.kt | 72 +++-- .../com/atlarge/opendc/compute/virt/Hypervisor.kt | 58 ++++ .../atlarge/opendc/compute/virt/HypervisorEvent.kt | 65 ++++ .../atlarge/opendc/compute/virt/HypervisorImage.kt | 57 ++++ .../driver/InsufficientMemoryOnServerException.kt | 3 + .../opendc/compute/virt/driver/SimpleVirtDriver.kt | 348 +++++++++++++++++++++ .../opendc/compute/virt/driver/VirtDriver.kt | 3 +- .../opendc/compute/virt/driver/VirtDriverEvent.kt | 59 ---- .../virt/driver/hypervisor/HypervisorImage.kt | 56 ---- .../virt/driver/hypervisor/HypervisorVirtDriver.kt | 342 -------------------- .../InsufficientMemoryOnServerException.kt | 3 - .../virt/service/SimpleVirtProvisioningService.kt | 36 ++- .../virt/service/VirtProvisioningService.kt | 3 +- .../virt/driver/hypervisor/HypervisorTest.kt | 7 +- 17 files changed, 611 insertions(+), 512 deletions(-) create mode 100644 opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt create mode 100644 opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt create mode 100644 opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt create mode 100644 opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/InsufficientMemoryOnServerException.kt create mode 100644 opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt delete mode 100644 opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverEvent.kt delete mode 100644 opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt delete mode 100644 opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt delete mode 100644 opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/InsufficientMemoryOnServerException.kt (limited to 'opendc/opendc-compute') diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt index c8caaca6..e0a491c8 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt @@ -30,7 +30,7 @@ import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.core.services.ServiceKey /** - * Represents the execution context in which an bootable [Image] runs on a [Server]. + * Represents the execution context in which a bootable [Image] runs on a [Server]. */ public interface ServerContext { /** diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt index 8b8d1596..7cb4c0c5 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt @@ -33,7 +33,7 @@ import java.util.UUID /** * A bare-metal compute node. */ -data class Node( +public data class Node( /** * The unique identifier of the node. */ @@ -45,7 +45,7 @@ data class Node( public override val name: String, /** - * Meta data of the node. + * Metadata of the node. */ public val metadata: Map, diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt index 5d1db378..41cec291 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt @@ -37,6 +37,11 @@ import java.util.UUID * A driver interface for the management interface of a bare-metal compute node. */ public interface BareMetalDriver : Powerable, FailureDomain { + /** + * The [Node] that is controlled by this driver. + */ + public val node: Flow + /** * The amount of work done by the machine in percentage with respect to the total amount of processing power * available. diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt index 49c3fa2e..67069c03 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt @@ -48,10 +48,13 @@ import com.atlarge.opendc.core.services.ServiceKey import com.atlarge.opendc.core.services.ServiceRegistry import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.Job import kotlinx.coroutines.cancel import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.scanReduce import kotlinx.coroutines.launch import java.util.UUID import kotlin.math.ceil @@ -96,33 +99,40 @@ public class SimpleBareMetalDriver( /** * The flow containing the load of the server. */ - private val usageSignal = StateFlow(0.0) + private val usageState = StateFlow(0.0) /** * The machine state. */ - private var node: Node = Node(uid, name, mapOf("driver" to this), NodeState.SHUTOFF, EmptyImage, null, events) - set(value) { + private val nodeState = StateFlow(Node(uid, name, mapOf("driver" to this), NodeState.SHUTOFF, EmptyImage, null, events)) + + override val node: Flow = nodeState + + override val usage: Flow = usageState + + override val powerDraw: Flow = powerModel(this) + + init { + @OptIn(ExperimentalCoroutinesApi::class) + nodeState.scanReduce { field, value -> if (field.state != value.state) { events.emit(NodeEvent.StateChanged(value, field.state)) } - if (field.server != null && value.server != null && field.server!!.state != value.server.state) { - serverContext!!.events.emit(ServerEvent.StateChanged(value.server, field.server!!.state)) + if (field.server != null && value.server != null && field.server.state != value.server.state) { + serverContext!!.events.emit(ServerEvent.StateChanged(value.server, field.server.state)) } - field = value - } - - override val usage: Flow = usageSignal - - override val powerDraw: Flow = powerModel(this) + value + }.launchIn(domain) + } override suspend fun init(): Node = withContext(domain.coroutineContext) { - node + nodeState.value } override suspend fun start(): Node = withContext(domain.coroutineContext) { + val node = nodeState.value if (node.state != NodeState.SHUTOFF) { return@withContext node } @@ -139,12 +149,13 @@ public class SimpleBareMetalDriver( events ) - node = node.copy(state = NodeState.BOOT, server = server) + nodeState.value = node.copy(state = NodeState.BOOT, server = server) serverContext = BareMetalServerContext(events) - return@withContext node + return@withContext nodeState.value } override suspend fun stop(): Node = withContext(domain.coroutineContext) { + val node = nodeState.value if (node.state == NodeState.SHUTOFF) { return@withContext node } @@ -153,7 +164,7 @@ public class SimpleBareMetalDriver( serverContext!!.cancel(fail = false) serverContext = null - node = node.copy(state = NodeState.SHUTOFF, server = null) + nodeState.value = node.copy(state = NodeState.SHUTOFF, server = null) return@withContext node } @@ -163,11 +174,11 @@ public class SimpleBareMetalDriver( } override suspend fun setImage(image: Image): Node = withContext(domain.coroutineContext) { - node = node.copy(image = image) - return@withContext node + nodeState.value = nodeState.value.copy(image = image) + return@withContext nodeState.value } - override suspend fun refresh(): Node = withContext(domain.coroutineContext) { node } + override suspend fun refresh(): Node = withContext(domain.coroutineContext) { nodeState.value } private inner class BareMetalServerContext(val events: EventFlow) : ServerManagementContext { private var finalized: Boolean = false @@ -175,7 +186,7 @@ public class SimpleBareMetalDriver( override val cpus: List = this@SimpleBareMetalDriver.cpus override val server: Server - get() = node.server!! + get() = nodeState.value.server!! private val job = domain.launch { delay(1) // TODO Introduce boot time @@ -193,15 +204,15 @@ public class SimpleBareMetalDriver( */ suspend fun cancel(fail: Boolean) { if (fail) - domain.cancel(ShutdownException(cause = Exception("Random failure"))) + job.cancel(ShutdownException(cause = Exception("Random failure"))) else - domain.cancel(ShutdownException()) + job.cancel(ShutdownException()) job.join() } override suspend fun publishService(key: ServiceKey, service: T) { val server = server.copy(services = server.services.put(key, service)) - node = node.copy(server = server) + nodeState.value = nodeState.value.copy(server = server) events.emit(ServerEvent.ServicePublished(server, key)) } @@ -209,24 +220,24 @@ public class SimpleBareMetalDriver( assert(!finalized) { "Machine is already finalized" } val server = server.copy(state = ServerState.ACTIVE) - node = node.copy(state = NodeState.ACTIVE, server = server) + nodeState.value = nodeState.value.copy(state = NodeState.ACTIVE, server = server) } override suspend fun exit(cause: Throwable?) { finalized = true - val serverState = + val newServerState = if (cause == null || (cause is ShutdownException && cause.cause == null)) ServerState.SHUTOFF else ServerState.ERROR - val nodeState = + val newNodeState = if (cause == null || (cause is ShutdownException && cause.cause != null)) - node.state + nodeState.value.state else NodeState.ERROR - val server = server.copy(state = serverState) - node = node.copy(state = nodeState, server = server) + val server = server.copy(state = newServerState) + nodeState.value = nodeState.value.copy(state = newNodeState, server = server) } private var flush: Job? = null @@ -256,7 +267,7 @@ public class SimpleBareMetalDriver( } } - usageSignal.value = totalUsage / cpus.size + usageState.value = totalUsage / cpus.size try { delay(duration) @@ -269,7 +280,7 @@ public class SimpleBareMetalDriver( // Flush the load if the do not receive a new run call for the same timestamp flush = domain.launch(job) { delay(1) - usageSignal.value = 0.0 + usageState.value = 0.0 } flush!!.invokeOnCompletion { flush = null @@ -289,5 +300,6 @@ public class SimpleBareMetalDriver( override suspend fun fail() { serverContext?.cancel(fail = true) + domain.cancel() } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt new file mode 100644 index 00000000..69b0124d --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/Hypervisor.kt @@ -0,0 +1,58 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.compute.virt + +import com.atlarge.opendc.core.Identity +import kotlinx.coroutines.flow.Flow +import java.util.UUID + +/** + * A hypervisor (or virtual machine monitor) is software or firmware that virtualizes the host compute environment + * into several virtual guest machines. + */ +public class Hypervisor( + /** + * The unique identifier of the hypervisor. + */ + override val uid: UUID, + + /** + * The optional name of the hypervisor. + */ + override val name: String, + + /** + * Metadata of the hypervisor. + */ + public val metadata: Map, + + /** + * The events that are emitted by the hypervisor. + */ + public val events: Flow +) : Identity { + override fun hashCode(): Int = uid.hashCode() + override fun equals(other: Any?): Boolean = other is Hypervisor && uid == other.uid +} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt new file mode 100644 index 00000000..3230c2ba --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt @@ -0,0 +1,65 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.compute.virt + +import com.atlarge.opendc.compute.virt.driver.VirtDriver + +/** + * An event that is emitted by a [VirtDriver]. + */ +public sealed class HypervisorEvent { + /** + * The driver that emitted the event. + */ + public abstract val driver: VirtDriver + + /** + * This event is emitted when the number of active servers on the server managed by this driver is updated. + * + * @property driver The driver that emitted the event. + * @property numberOfActiveServers The number of active servers. + * @property availableMemory The available memory, in MB. + */ + public data class VmsUpdated( + override val driver: VirtDriver, + public val numberOfActiveServers: Int, + public val availableMemory: Long + ) : HypervisorEvent() + + /** + * This event is emitted when a slice is finished. + * + * @property driver The driver that emitted the event. + * @property requestedBurst The total requested CPU time (can be above capacity). + * @property grantedBurst The actual total granted capacity. + * @property numberOfDeployedImages The number of images deployed on this hypervisor. + */ + public data class SliceFinished( + override val driver: VirtDriver, + public val requestedBurst: Long, + public val grantedBurst: Long, + public val numberOfDeployedImages: Int + ) : HypervisorEvent() +} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt new file mode 100644 index 00000000..c21b002d --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt @@ -0,0 +1,57 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.compute.virt + +import com.atlarge.opendc.compute.core.execution.ServerContext +import com.atlarge.opendc.compute.core.image.Image +import com.atlarge.opendc.compute.virt.driver.SimpleVirtDriver +import com.atlarge.opendc.compute.virt.driver.VirtDriver +import com.atlarge.opendc.core.resource.TagContainer +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.suspendCancellableCoroutine +import java.util.UUID + +/** + * A hypervisor managing the VMs of a node. + */ +object HypervisorImage : Image { + override val uid: UUID = UUID.randomUUID() + override val name: String = "vmm" + override val tags: TagContainer = emptyMap() + + override suspend fun invoke(ctx: ServerContext) { + coroutineScope { + val driver = SimpleVirtDriver(ctx, this) + ctx.publishService(VirtDriver.Key, driver) + + // Suspend image until it is cancelled + try { + suspendCancellableCoroutine {} + } finally { + driver.eventFlow.close() + } + } + } +} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/InsufficientMemoryOnServerException.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/InsufficientMemoryOnServerException.kt new file mode 100644 index 00000000..0586ae00 --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/InsufficientMemoryOnServerException.kt @@ -0,0 +1,3 @@ +package com.atlarge.opendc.compute.virt.driver + +public class InsufficientMemoryOnServerException : IllegalStateException("Insufficient memory left on server.") diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt new file mode 100644 index 00000000..fc4c7634 --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt @@ -0,0 +1,348 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.compute.virt.driver + +import com.atlarge.odcsim.Domain +import com.atlarge.odcsim.flow.EventFlow +import com.atlarge.odcsim.simulationContext +import com.atlarge.opendc.compute.core.Flavor +import com.atlarge.opendc.compute.core.ProcessingUnit +import com.atlarge.opendc.compute.core.Server +import com.atlarge.opendc.compute.core.ServerEvent +import com.atlarge.opendc.compute.core.ServerState +import com.atlarge.opendc.compute.core.execution.ServerContext +import com.atlarge.opendc.compute.core.execution.ServerManagementContext +import com.atlarge.opendc.compute.core.execution.ShutdownException +import com.atlarge.opendc.compute.core.execution.assertFailure +import com.atlarge.opendc.compute.core.image.Image +import com.atlarge.opendc.compute.virt.HypervisorEvent +import com.atlarge.opendc.core.services.ServiceKey +import com.atlarge.opendc.core.services.ServiceRegistry +import com.atlarge.opendc.core.workload.IMAGE_PERF_INTERFERENCE_MODEL +import com.atlarge.opendc.core.workload.PerformanceInterferenceModel +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.Job +import kotlinx.coroutines.cancelAndJoin +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.launch +import java.util.UUID +import kotlin.math.ceil +import kotlin.math.max +import kotlin.math.min + +/** + * A [VirtDriver] that is backed by a simple hypervisor implementation. + */ +@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class) +class SimpleVirtDriver( + private val hostContext: ServerContext, + private val coroutineScope: CoroutineScope +) : VirtDriver { + /** + * The [Server] on which this hypervisor runs. + */ + public val server: Server + get() = hostContext.server + + /** + * A set for tracking the VM context objects. + */ + internal val vms: MutableSet = mutableSetOf() + + /** + * Current total memory use of the images on this hypervisor. + */ + private var availableMemory: Long = hostContext.server.flavor.memorySize + + /** + * The [EventFlow] to emit the events. + */ + internal val eventFlow = EventFlow() + + override val events: Flow = eventFlow + + override suspend fun spawn( + image: Image, + flavor: Flavor + ): Server { + val requiredMemory = flavor.memorySize + if (availableMemory - requiredMemory < 0) { + throw InsufficientMemoryOnServerException() + } + require(flavor.cpuCount <= hostContext.server.flavor.cpuCount) { "Machine does not fit" } + + val events = EventFlow() + val server = Server( + UUID.randomUUID(), "", emptyMap(), flavor, image, ServerState.BUILD, + ServiceRegistry(), events + ) + availableMemory -= requiredMemory + vms.add(VmServerContext(server, events, simulationContext.domain)) + eventFlow.emit(HypervisorEvent.VmsUpdated(this, vms.size, availableMemory)) + return server + } + + /** + * A flag to indicate the driver is stopped. + */ + private var stopped: Boolean = false + + /** + * The set of [VmServerContext] instances that is being scheduled at the moment. + */ + private val activeVms = mutableSetOf() + + /** + * The deferred run call. + */ + private var call: Job? = null + + /** + * Schedule the vCPUs on the physical CPUs. + */ + private suspend fun reschedule() { + flush() + + // Do not schedule a call if there is no work to schedule or the driver stopped. + if (stopped || activeVms.isEmpty()) { + return + } + + val call = coroutineScope.launch { + val start = simulationContext.clock.millis() + val vms = activeVms.toSet() + + var duration: Double = Double.POSITIVE_INFINITY + var deadline: Long = Long.MAX_VALUE + + val maxUsage = hostContext.cpus.sumByDouble { it.frequency } + var availableUsage = maxUsage + val requests = vms.asSequence() + .flatMap { it.requests.asSequence() } + .sortedBy { it.limit } + .toList() + + // Divide the available host capacity fairly across the vCPUs using max-min fair sharing + for ((i, req) in requests.withIndex()) { + val remaining = requests.size - i + val availableShare = availableUsage / remaining + val grantedUsage = min(req.limit, availableShare) + + req.allocatedUsage = grantedUsage + availableUsage -= grantedUsage + + // The duration that we want to run is that of the shortest request from a vCPU + duration = min(duration, req.burst / req.allocatedUsage) + deadline = min(deadline, req.vm.deadline) + } + + val usage = DoubleArray(hostContext.cpus.size) + val burst = LongArray(hostContext.cpus.size) + val totalUsage = maxUsage - availableUsage + availableUsage = totalUsage + val serverLoad = totalUsage / maxUsage + + // Divide the requests over the available capacity of the pCPUs fairly + for (i in hostContext.cpus.indices.sortedBy { hostContext.cpus[it].frequency }) { + val remaining = hostContext.cpus.size - i + val availableShare = availableUsage / remaining + val grantedUsage = min(hostContext.cpus[i].frequency, availableShare) + + usage[i] = grantedUsage + burst[i] = (duration * grantedUsage).toLong() + availableUsage -= grantedUsage + } + + val remainder = burst.clone() + // We run the total burst on the host processor. Note that this call may be cancelled at any moment in + // time, so not all of the burst may be executed. + hostContext.run(remainder, usage, deadline) + val end = simulationContext.clock.millis() + + // No work was performed + if ((end - start) <= 0) { + return@launch + } + + val totalRemainder = remainder.sum() + val totalBurst = burst.sum() + val imagesRunning = vms.map { it.server.image }.toSet() + + for (vm in vms) { + // Apply performance interference model + val performanceModel = + vm.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? + val performanceScore = performanceModel?.apply(imagesRunning, serverLoad) ?: 1.0 + + for ((i, req) in vm.requests.withIndex()) { + // Compute the fraction of compute time allocated to the VM + val fraction = req.allocatedUsage / totalUsage + + // Derive the burst that was allocated to this vCPU + val allocatedBurst = ceil(duration * req.allocatedUsage).toLong() + + // Compute the burst time that the VM was actually granted + val grantedBurst = (performanceScore * (allocatedBurst - ceil(totalRemainder * fraction))).toLong() + + // Compute remaining burst time to be executed for the request + req.burst = max(0, vm.burst[i] - grantedBurst) + vm.burst[i] = req.burst + } + + if (vm.burst.any { it == 0L } || vm.deadline <= end) { + // Return vCPU `run` call: the requested burst was completed or deadline was exceeded + vm.chan.send(Unit) + } + } + + eventFlow.emit(HypervisorEvent.SliceFinished(this@SimpleVirtDriver, totalBurst, totalBurst - totalRemainder, vms.size)) + } + this.call = call + } + + /** + * Flush the progress of the current active VMs. + */ + private suspend fun flush() { + val call = call ?: return // If there is no active call, there is nothing to flush + // The progress is actually flushed in the coroutine when it notices: we cancel it and wait for its + // completion. + call.cancelAndJoin() + this.call = null + } + + /** + * A request to schedule a virtual CPU on the host cpu. + */ + internal data class CpuRequest( + val vm: VmServerContext, + val vcpu: ProcessingUnit, + var burst: Long, + val limit: Double + ) { + /** + * The usage that was actually granted. + */ + var allocatedUsage: Double = 0.0 + } + + internal inner class VmServerContext( + server: Server, + val events: EventFlow, + val domain: Domain + ) : ServerManagementContext { + private var finalized: Boolean = false + lateinit var requests: List + lateinit var burst: LongArray + var deadline: Long = 0L + var chan = Channel(Channel.RENDEZVOUS) + private var initialized: Boolean = false + + internal val job: Job = coroutineScope.launch { + delay(1) // TODO Introduce boot time + init() + try { + server.image(this@VmServerContext) + exit() + } catch (cause: Throwable) { + exit(cause) + } + } + + override var server: Server = server + set(value) { + if (field.state != value.state) { + events.emit(ServerEvent.StateChanged(value, field.state)) + } + + field = value + } + + override val cpus: List = hostContext.cpus.take(server.flavor.cpuCount) + + override suspend fun publishService(key: ServiceKey, service: T) { + server = server.copy(services = server.services.put(key, service)) + events.emit(ServerEvent.ServicePublished(server, key)) + } + + override suspend fun init() { + assert(!finalized) { "VM is already finalized" } + + server = server.copy(state = ServerState.ACTIVE) + initialized = true + } + + override suspend fun exit(cause: Throwable?) { + finalized = true + + val serverState = + if (cause == null || (cause is ShutdownException && cause.cause == null)) + ServerState.SHUTOFF + else + ServerState.ERROR + server = server.copy(state = serverState) + availableMemory += server.flavor.memorySize + vms.remove(this) + events.close() + eventFlow.emit(HypervisorEvent.VmsUpdated(this@SimpleVirtDriver, vms.size, availableMemory)) + } + + override suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) { + require(burst.size == limit.size) { "Array dimensions do not match" } + + this.deadline = deadline + this.burst = burst + requests = cpus.asSequence() + .take(burst.size) + .mapIndexed { i, cpu -> + CpuRequest( + this, + cpu, + burst[i], + limit[i] + ) + } + .toList() + + // Wait until the burst has been run or the coroutine is cancelled + try { + activeVms += this + reschedule() + chan.receive() + } catch (e: CancellationException) { + // On cancellation, we compute and return the remaining burst + e.assertFailure() + } finally { + activeVms -= this + reschedule() + } + } + } +} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt index 296f170e..d7ae0c12 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriver.kt @@ -27,6 +27,7 @@ package com.atlarge.opendc.compute.virt.driver import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.image.Image +import com.atlarge.opendc.compute.virt.HypervisorEvent import com.atlarge.opendc.core.services.AbstractServiceKey import kotlinx.coroutines.flow.Flow import java.util.UUID @@ -39,7 +40,7 @@ public interface VirtDriver { /** * The events emitted by the driver. */ - public val events: Flow + public val events: Flow /** * Spawn the given [Image] on the compute resource of this driver. diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverEvent.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverEvent.kt deleted file mode 100644 index ccbe8b3c..00000000 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/VirtDriverEvent.kt +++ /dev/null @@ -1,59 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.compute.virt.driver - -/** - * An event that is emitted by a [VirtDriver]. - */ -public sealed class VirtDriverEvent { - /** - * The driver that emitted the event. - */ - public abstract val driver: VirtDriver - - /** - * This event is emitted when the number of active servers on the server managed by this driver is updated. - * - * @property driver The driver that emitted the event. - * @property numberOfActiveServers The number of active servers. - * @property availableMemory The available memory, in MB. - */ - public data class VmsUpdated(override val driver: VirtDriver, public val numberOfActiveServers: Int, public val availableMemory: Long) : VirtDriverEvent() - - /** - * This event is emitted when a slice is finished. - * - * @property driver The driver that emitted the event. - * @property requestedBurst The total requested CPU time (can be above capacity). - * @property grantedBurst The actual total granted capacity. - * @property numberOfDeployedImages The number of images deployed on this hypervisor. - */ - public data class SliceFinished( - override val driver: VirtDriver, - public val requestedBurst: Long, - public val grantedBurst: Long, - public val numberOfDeployedImages: Int - ) : VirtDriverEvent() -} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt deleted file mode 100644 index 1eb0e0ff..00000000 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt +++ /dev/null @@ -1,56 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.compute.virt.driver.hypervisor - -import com.atlarge.opendc.compute.core.execution.ServerContext -import com.atlarge.opendc.compute.core.image.Image -import com.atlarge.opendc.compute.virt.driver.VirtDriver -import com.atlarge.opendc.core.resource.TagContainer -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.suspendCancellableCoroutine -import java.util.UUID - -/** - * A hypervisor managing the VMs of a node. - */ -object HypervisorImage : Image { - override val uid: UUID = UUID.randomUUID() - override val name: String = "vmm" - override val tags: TagContainer = emptyMap() - - override suspend fun invoke(ctx: ServerContext) { - coroutineScope { - val driver = HypervisorVirtDriver(ctx, this) - ctx.publishService(VirtDriver.Key, driver) - - // Suspend image until it is cancelled - try { - suspendCancellableCoroutine {} - } finally { - driver.eventFlow.close() - } - } - } -} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt deleted file mode 100644 index 0b4a7109..00000000 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt +++ /dev/null @@ -1,342 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.compute.virt.driver.hypervisor - -import com.atlarge.odcsim.Domain -import com.atlarge.odcsim.flow.EventFlow -import com.atlarge.odcsim.simulationContext -import com.atlarge.opendc.compute.core.Flavor -import com.atlarge.opendc.compute.core.ProcessingUnit -import com.atlarge.opendc.compute.core.Server -import com.atlarge.opendc.compute.core.ServerEvent -import com.atlarge.opendc.compute.core.ServerState -import com.atlarge.opendc.compute.core.execution.ServerContext -import com.atlarge.opendc.compute.core.execution.ServerManagementContext -import com.atlarge.opendc.compute.core.execution.ShutdownException -import com.atlarge.opendc.compute.core.execution.assertFailure -import com.atlarge.opendc.compute.core.image.Image -import com.atlarge.opendc.compute.virt.driver.VirtDriver -import com.atlarge.opendc.compute.virt.driver.VirtDriverEvent -import com.atlarge.opendc.core.services.ServiceKey -import com.atlarge.opendc.core.services.ServiceRegistry -import com.atlarge.opendc.core.workload.IMAGE_PERF_INTERFERENCE_MODEL -import com.atlarge.opendc.core.workload.PerformanceInterferenceModel -import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.FlowPreview -import kotlinx.coroutines.Job -import kotlinx.coroutines.cancelAndJoin -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.launch -import java.util.UUID -import kotlin.math.ceil -import kotlin.math.max -import kotlin.math.min - -/** - * A [VirtDriver] that is backed by a simple hypervisor implementation. - */ -@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class) -class HypervisorVirtDriver( - private val hostContext: ServerContext, - private val coroutineScope: CoroutineScope -) : VirtDriver { - /** - * The [Server] on which this hypervisor runs. - */ - public val server: Server - get() = hostContext.server - - /** - * A set for tracking the VM context objects. - */ - internal val vms: MutableSet = mutableSetOf() - - /** - * Current total memory use of the images on this hypervisor. - */ - private var availableMemory: Long = hostContext.server.flavor.memorySize - - /** - * The [EventFlow] to emit the events. - */ - internal val eventFlow = EventFlow() - - override val events: Flow = eventFlow - - override suspend fun spawn( - image: Image, - flavor: Flavor - ): Server { - val requiredMemory = flavor.memorySize - if (availableMemory - requiredMemory < 0) { - throw InsufficientMemoryOnServerException() - } - require(flavor.cpuCount <= hostContext.server.flavor.cpuCount) { "Machine does not fit" } - - val events = EventFlow() - val server = Server( - UUID.randomUUID(), "", emptyMap(), flavor, image, ServerState.BUILD, - ServiceRegistry(), events - ) - availableMemory -= requiredMemory - vms.add(VmServerContext(server, events, simulationContext.domain)) - eventFlow.emit(VirtDriverEvent.VmsUpdated(this, vms.size, availableMemory)) - return server - } - - /** - * A flag to indicate the driver is stopped. - */ - private var stopped: Boolean = false - - /** - * The set of [VmServerContext] instances that is being scheduled at the moment. - */ - private val activeVms = mutableSetOf() - - /** - * The deferred run call. - */ - private var call: Job? = null - - /** - * Schedule the vCPUs on the physical CPUs. - */ - private suspend fun reschedule() { - flush() - - // Do not schedule a call if there is no work to schedule or the driver stopped. - if (stopped || activeVms.isEmpty()) { - return - } - - val call = coroutineScope.launch { - val start = simulationContext.clock.millis() - val vms = activeVms.toSet() - - var duration: Double = Double.POSITIVE_INFINITY - var deadline: Long = Long.MAX_VALUE - - val maxUsage = hostContext.cpus.sumByDouble { it.frequency } - var availableUsage = maxUsage - val requests = vms.asSequence() - .flatMap { it.requests.asSequence() } - .sortedBy { it.limit } - .toList() - - // Divide the available host capacity fairly across the vCPUs using max-min fair sharing - for ((i, req) in requests.withIndex()) { - val remaining = requests.size - i - val availableShare = availableUsage / remaining - val grantedUsage = min(req.limit, availableShare) - - req.allocatedUsage = grantedUsage - availableUsage -= grantedUsage - - // The duration that we want to run is that of the shortest request from a vCPU - duration = min(duration, req.burst / req.allocatedUsage) - deadline = min(deadline, req.vm.deadline) - } - - val usage = DoubleArray(hostContext.cpus.size) - val burst = LongArray(hostContext.cpus.size) - val totalUsage = maxUsage - availableUsage - availableUsage = totalUsage - val serverLoad = totalUsage / maxUsage - - // Divide the requests over the available capacity of the pCPUs fairly - for (i in hostContext.cpus.indices.sortedBy { hostContext.cpus[it].frequency }) { - val remaining = hostContext.cpus.size - i - val availableShare = availableUsage / remaining - val grantedUsage = min(hostContext.cpus[i].frequency, availableShare) - - usage[i] = grantedUsage - burst[i] = (duration * grantedUsage).toLong() - availableUsage -= grantedUsage - } - - val remainder = burst.clone() - // We run the total burst on the host processor. Note that this call may be cancelled at any moment in - // time, so not all of the burst may be executed. - hostContext.run(remainder, usage, deadline) - val end = simulationContext.clock.millis() - - // No work was performed - if ((end - start) <= 0) { - return@launch - } - - val totalRemainder = remainder.sum() - val totalBurst = burst.sum() - val imagesRunning = vms.map { it.server.image }.toSet() - - for (vm in vms) { - // Apply performance interference model - val performanceModel = - vm.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? - val performanceScore = performanceModel?.apply(imagesRunning, serverLoad) ?: 1.0 - - for ((i, req) in vm.requests.withIndex()) { - // Compute the fraction of compute time allocated to the VM - val fraction = req.allocatedUsage / totalUsage - - // Derive the burst that was allocated to this vCPU - val allocatedBurst = ceil(duration * req.allocatedUsage).toLong() - - // Compute the burst time that the VM was actually granted - val grantedBurst = (performanceScore * (allocatedBurst - ceil(totalRemainder * fraction))).toLong() - - // Compute remaining burst time to be executed for the request - req.burst = max(0, vm.burst[i] - grantedBurst) - vm.burst[i] = req.burst - } - - if (vm.burst.any { it == 0L } || vm.deadline <= end) { - // Return vCPU `run` call: the requested burst was completed or deadline was exceeded - vm.chan.send(Unit) - } - } - - eventFlow.emit(VirtDriverEvent.SliceFinished(this@HypervisorVirtDriver, totalBurst, totalBurst - totalRemainder, vms.size)) - } - this.call = call - } - - /** - * Flush the progress of the current active VMs. - */ - private suspend fun flush() { - val call = call ?: return // If there is no active call, there is nothing to flush - // The progress is actually flushed in the coroutine when it notices: we cancel it and wait for its - // completion. - call.cancelAndJoin() - this.call = null - } - - /** - * A request to schedule a virtual CPU on the host cpu. - */ - internal data class CpuRequest( - val vm: VmServerContext, - val vcpu: ProcessingUnit, - var burst: Long, - val limit: Double - ) { - /** - * The usage that was actually granted. - */ - var allocatedUsage: Double = 0.0 - } - - internal inner class VmServerContext( - server: Server, - val events: EventFlow, - val domain: Domain - ) : ServerManagementContext { - private var finalized: Boolean = false - lateinit var requests: List - lateinit var burst: LongArray - var deadline: Long = 0L - var chan = Channel(Channel.RENDEZVOUS) - private var initialized: Boolean = false - - internal val job: Job = coroutineScope.launch { - delay(1) // TODO Introduce boot time - init() - try { - server.image(this@VmServerContext) - exit() - } catch (cause: Throwable) { - exit(cause) - } - } - - override var server: Server = server - set(value) { - if (field.state != value.state) { - events.emit(ServerEvent.StateChanged(value, field.state)) - } - - field = value - } - - override val cpus: List = hostContext.cpus.take(server.flavor.cpuCount) - - override suspend fun publishService(key: ServiceKey, service: T) { - server = server.copy(services = server.services.put(key, service)) - events.emit(ServerEvent.ServicePublished(server, key)) - } - - override suspend fun init() { - assert(!finalized) { "VM is already finalized" } - - server = server.copy(state = ServerState.ACTIVE) - initialized = true - } - - override suspend fun exit(cause: Throwable?) { - finalized = true - - val serverState = - if (cause == null || (cause is ShutdownException && cause.cause == null)) - ServerState.SHUTOFF - else - ServerState.ERROR - server = server.copy(state = serverState) - availableMemory += server.flavor.memorySize - vms.remove(this) - events.close() - eventFlow.emit(VirtDriverEvent.VmsUpdated(this@HypervisorVirtDriver, vms.size, availableMemory)) - } - - override suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) { - require(burst.size == limit.size) { "Array dimensions do not match" } - - this.deadline = deadline - this.burst = burst - requests = cpus.asSequence() - .take(burst.size) - .mapIndexed { i, cpu -> CpuRequest(this, cpu, burst[i], limit[i]) } - .toList() - - // Wait until the burst has been run or the coroutine is cancelled - try { - activeVms += this - reschedule() - chan.receive() - } catch (e: CancellationException) { - // On cancellation, we compute and return the remaining burst - e.assertFailure() - } finally { - activeVms -= this - reschedule() - } - } - } -} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/InsufficientMemoryOnServerException.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/InsufficientMemoryOnServerException.kt deleted file mode 100644 index 926234b5..00000000 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/InsufficientMemoryOnServerException.kt +++ /dev/null @@ -1,3 +0,0 @@ -package com.atlarge.opendc.compute.virt.driver.hypervisor - -public class InsufficientMemoryOnServerException : IllegalStateException("Insufficient memory left on server.") diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt index 8365f8c9..8393dfa9 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt @@ -8,22 +8,26 @@ import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.compute.virt.driver.VirtDriver -import com.atlarge.opendc.compute.virt.driver.hypervisor.HypervisorImage -import com.atlarge.opendc.compute.virt.driver.hypervisor.InsufficientMemoryOnServerException +import com.atlarge.opendc.compute.virt.HypervisorImage +import com.atlarge.opendc.compute.virt.driver.InsufficientMemoryOnServerException import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy import com.atlarge.opendc.core.services.ServiceKey +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.Job -import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch +import kotlinx.coroutines.suspendCancellableCoroutine +import kotlin.coroutines.Continuation +import kotlin.coroutines.resume @OptIn(ExperimentalCoroutinesApi::class) class SimpleVirtProvisioningService( public override val allocationPolicy: AllocationPolicy, private val ctx: SimulationContext, private val provisioningService: ProvisioningService -) : VirtProvisioningService { +) : VirtProvisioningService, CoroutineScope by ctx.domain { /** * The hypervisors that have been launched by the service. */ @@ -45,18 +49,17 @@ class SimpleVirtProvisioningService( private val activeImages: MutableSet = mutableSetOf() init { - ctx.domain.launch { + launch { val provisionedNodes = provisioningService.nodes() provisionedNodes.forEach { node -> val hypervisorImage = HypervisorImage val node = provisioningService.deploy(node, hypervisorImage) node.server!!.events.onEach { event -> - when (event) { - is ServerEvent.StateChanged -> stateChanged(event.server, event.previousState) - is ServerEvent.ServicePublished -> servicePublished(event.server, event.key) - } + when (event) { + is ServerEvent.StateChanged -> stateChanged(event.server) + is ServerEvent.ServicePublished -> servicePublished(event.server, event.key) } - .launchIn(ctx.domain) + }.collect() } } } @@ -64,8 +67,8 @@ class SimpleVirtProvisioningService( override suspend fun deploy( image: Image, flavor: Flavor - ) { - val vmInstance = ImageView(image, flavor) + ): Server = suspendCancellableCoroutine { cont -> + val vmInstance = ImageView(image, flavor, cont) incomingImages += vmInstance requestCycle() } @@ -77,7 +80,7 @@ class SimpleVirtProvisioningService( return } - val call = ctx.domain.launch { + val call = launch { schedule() } call.invokeOnCompletion { this.call = null } @@ -92,10 +95,12 @@ class SimpleVirtProvisioningService( try { println("Spawning ${imageInstance.image}") incomingImages -= imageInstance - imageInstance.server = selectedHv.driver.spawn( + val server = selectedHv.driver.spawn( imageInstance.image, imageInstance.flavor ) + imageInstance.server = server + imageInstance.continuation.resume(server) activeImages += imageInstance } catch (e: InsufficientMemoryOnServerException) { println("Unable to deploy image due to insufficient memory") @@ -103,7 +108,7 @@ class SimpleVirtProvisioningService( } } - private fun stateChanged(server: Server, previousState: ServerState) { + private fun stateChanged(server: Server) { when (server.state) { ServerState.ACTIVE -> { val hvView = HypervisorView( @@ -134,6 +139,7 @@ class SimpleVirtProvisioningService( data class ImageView( val image: Image, val flavor: Flavor, + val continuation: Continuation, var server: Server? = null ) } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt index da72d742..12543ce3 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt @@ -1,6 +1,7 @@ package com.atlarge.opendc.compute.virt.service import com.atlarge.opendc.compute.core.Flavor +import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy @@ -16,5 +17,5 @@ interface VirtProvisioningService { * @param image The image to be deployed. * @param flavor The flavor of the machine instance to run this [image] on. */ - public suspend fun deploy(image: Image, flavor: Flavor) + public suspend fun deploy(image: Image, flavor: Flavor): Server } diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt index d86045c0..bcaafb59 100644 --- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt +++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt @@ -30,6 +30,7 @@ import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.image.FlopsApplicationImage import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver +import com.atlarge.opendc.compute.virt.HypervisorImage import com.atlarge.opendc.compute.virt.driver.VirtDriver import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.delay @@ -75,8 +76,10 @@ internal class HypervisorTest { val flavor = Flavor(1, 0) val vmDriver = metalDriver.refresh().server!!.services[VirtDriver] - vmDriver.spawn(workloadA, flavor).events.onEach { println(it) }.launchIn(this) - vmDriver.spawn(workloadB, flavor) + val vmA = vmDriver.spawn(workloadA, flavor) + vmA.events.onEach { println(it) }.launchIn(this) + val vmB = vmDriver.spawn(workloadB, flavor) + vmB.events.onEach { println(it) }.launchIn(this) } runBlocking { -- cgit v1.2.3