diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-03-05 14:44:30 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-03-07 16:13:15 +0100 |
| commit | a0c0657dc867db61951edff24ddc944bed132ac0 (patch) | |
| tree | f28f0aa5ef644b99365984de92ac1b2bccb1ff94 /simulator | |
| parent | 2ba5fc1247472d026f10ad5cf738dcb7e078a9ee (diff) | |
compute: Make VirtProvisoningService responsible for Server lifecycle
This change refactors the OpenDC Compute module so that the
VirtProvisioningService is now responsible for managing the lifecycle of
Server objects as opposed to the VirtDriver and BareMetalDriver
previously.
Diffstat (limited to 'simulator')
21 files changed, 623 insertions, 480 deletions
diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/Host.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/Host.kt new file mode 100644 index 00000000..60a31b69 --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/Host.kt @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.core.virt + +import kotlinx.coroutines.flow.Flow +import org.opendc.compute.core.Server +import java.util.* + +/** + * Base interface for representing compute resources that host virtualized [Server] instances. + */ +public interface Host { + /** + * A unique identifier representing the host. + */ + public val uid: UUID + + /** + * The state of the host. + */ + public val state: HostState + + /** + * The events emitted by the driver. + */ + public val events: Flow<HostEvent> + + /** + * Determine whether the specified [instance][server] can still fit on this host. + */ + public fun canFit(server: Server): Boolean + + /** + * Register the specified [instance][server] on the host. + * + * Once the method returns, the instance should be running if [start] is true or else the instance should be + * stopped. + */ + public suspend fun spawn(server: Server, start: Boolean = true) + + /** + * Determine whether the specified [instance][server] exists on the host. + */ + public operator fun contains(server: Server): Boolean + + /** + * Stat the server [instance][server] if it is currently not running on this host. + * + * @throws IllegalArgumentException if the server is not present on the host. + */ + public suspend fun start(server: Server) + + /** + * Stop the server [instance][server] if it is currently running on this host. + * + * @throws IllegalArgumentException if the server is not present on the host. + */ + public suspend fun stop(server: Server) + + /** + * Terminate the specified [instance][server] on this host and cleanup all resources associated with it. + */ + public suspend fun terminate(server: Server) + + /** + * Add a [HostListener] to this host. + */ + public fun addListener(listener: HostListener) + + /** + * Remove a [HostListener] from this host. + */ + public fun removeListener(listener: HostListener) +} diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HypervisorEvent.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HostEvent.kt index d1c8d790..a07523e8 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HypervisorEvent.kt +++ b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HostEvent.kt @@ -23,16 +23,15 @@ package org.opendc.compute.core.virt import org.opendc.compute.core.metal.Node -import org.opendc.compute.core.virt.driver.VirtDriver /** - * An event that is emitted by a [VirtDriver]. + * An event that is emitted by a [Host]. */ -public sealed class HypervisorEvent { +public sealed class HostEvent { /** * The driver that emitted the event. */ - public abstract val driver: VirtDriver + public abstract val driver: Host /** * This event is emitted when the number of active servers on the server managed by this driver is updated. @@ -42,10 +41,10 @@ public sealed class HypervisorEvent { * @property availableMemory The available memory, in MB. */ public data class VmsUpdated( - override val driver: VirtDriver, + override val driver: Host, public val numberOfActiveServers: Int, public val availableMemory: Long - ) : HypervisorEvent() + ) : HostEvent() /** * This event is emitted when a slice is finished. @@ -63,7 +62,7 @@ public sealed class HypervisorEvent { * @property numberOfDeployedImages The number of images deployed on this hypervisor. */ public data class SliceFinished( - override val driver: VirtDriver, + override val driver: Host, public val requestedBurst: Long, public val grantedBurst: Long, public val overcommissionedBurst: Long, @@ -72,5 +71,5 @@ public sealed class HypervisorEvent { public val cpuDemand: Double, public val numberOfDeployedImages: Int, public val host: Node - ) : HypervisorEvent() + ) : HostEvent() } diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/Hypervisor.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HostListener.kt index 1ae52baa..b14d9bb5 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/Hypervisor.kt +++ b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HostListener.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,35 +22,20 @@ package org.opendc.compute.core.virt -import kotlinx.coroutines.flow.Flow -import org.opendc.core.Identity -import java.util.UUID +import org.opendc.compute.core.Server +import org.opendc.compute.core.ServerState /** - * A hypervisor (or virtual machine monitor) is software or firmware that virtualizes the host compute environment - * into several virtual guest machines. + * Listener interface for events originating from a [Host]. */ -public class Hypervisor( +public interface HostListener { /** - * The unique identifier of the hypervisor. + * This method is invoked when the state of an [instance][server] on [host] changes. */ - override val uid: UUID, + public fun onStateChange(host: Host, server: Server, newState: ServerState) {} /** - * The optional name of the hypervisor. + * This method is invoked when the state of a [Host] has changed. */ - override val name: String, - - /** - * Metadata of the hypervisor. - */ - public val metadata: Map<String, Any>, - - /** - * The events that are emitted by the hypervisor. - */ - public val events: Flow<HypervisorEvent> -) : Identity { - override fun hashCode(): Int = uid.hashCode() - override fun equals(other: Any?): Boolean = other is Hypervisor && uid == other.uid + public fun onStateChange(host: Host, newState: HostState) {} } diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HostState.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HostState.kt new file mode 100644 index 00000000..7f87f5f7 --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HostState.kt @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.core.virt + +/** + * The state of a host. + */ +public enum class HostState { + /** + * The host is up. + */ + UP, + + /** + * The host is down. + */ + DOWN +} diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/driver/InsufficientMemoryOnServerException.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/InsufficientMemoryOnServerException.kt index 6fe84ea6..0f7b5826 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/driver/InsufficientMemoryOnServerException.kt +++ b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/InsufficientMemoryOnServerException.kt @@ -1,3 +1,3 @@ -package org.opendc.compute.core.virt.driver +package org.opendc.compute.core.virt public class InsufficientMemoryOnServerException : IllegalStateException("Insufficient memory left on server.") diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/driver/VirtDriver.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/driver/VirtDriver.kt deleted file mode 100644 index 68cc7b50..00000000 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/driver/VirtDriver.kt +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.core.virt.driver - -import kotlinx.coroutines.flow.Flow -import org.opendc.compute.core.Flavor -import org.opendc.compute.core.Server -import org.opendc.compute.core.image.Image -import org.opendc.compute.core.virt.HypervisorEvent -import org.opendc.core.services.AbstractServiceKey -import java.util.UUID - -/** - * A driver interface for a hypervisor running on some host server and communicating with the central compute service to - * provide virtualization for that particular resource. - */ -public interface VirtDriver { - /** - * The events emitted by the driver. - */ - public val events: Flow<HypervisorEvent> - - /** - * Determine whether the specified [flavor] can still fit on this driver. - */ - public fun canFit(flavor: Flavor): Boolean - - /** - * Spawn the given [Image] on the compute resource of this driver. - * - * @param name The name of the server to spawn. - * @param image The image to deploy. - * @param flavor The flavor of the server which this driver is controlling. - * @return The virtual server spawned by this method. - */ - public suspend fun spawn( - name: String, - image: Image, - flavor: Flavor - ): Server - - public companion object Key : AbstractServiceKey<VirtDriver>(UUID.randomUUID(), "virtual-driver") -} diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/VirtProvisioningService.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/VirtProvisioningService.kt index b967044c..8da849af 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/VirtProvisioningService.kt +++ b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/VirtProvisioningService.kt @@ -26,7 +26,7 @@ import kotlinx.coroutines.flow.Flow import org.opendc.compute.core.Flavor import org.opendc.compute.core.Server import org.opendc.compute.core.image.Image -import org.opendc.compute.core.virt.driver.VirtDriver +import org.opendc.compute.core.virt.Host /** * A service for VM provisioning on a cloud. @@ -40,7 +40,7 @@ public interface VirtProvisioningService { /** * Obtain the active hypervisors for this provisioner. */ - public suspend fun drivers(): Set<VirtDriver> + public suspend fun drivers(): Set<Host> /** * The number of hosts available in the system. diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/HypervisorView.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/HypervisorView.kt index cf2747cd..a650144b 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/HypervisorView.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/HypervisorView.kt @@ -23,7 +23,7 @@ package org.opendc.compute.simulator import org.opendc.compute.core.metal.Node -import org.opendc.compute.core.virt.driver.VirtDriver +import org.opendc.compute.core.virt.Host import java.util.UUID public class HypervisorView( @@ -33,5 +33,5 @@ public class HypervisorView( public var availableMemory: Long, public var provisionedCores: Int ) { - public lateinit var driver: VirtDriver + public lateinit var driver: Host } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt new file mode 100644 index 00000000..85fa2cb6 --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -0,0 +1,300 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.Flow +import mu.KotlinLogging +import org.opendc.compute.core.* +import org.opendc.compute.core.Flavor +import org.opendc.compute.core.metal.Node +import org.opendc.compute.core.virt.* +import org.opendc.simulator.compute.* +import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL +import org.opendc.simulator.compute.interference.PerformanceInterferenceModel +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.workload.SimResourceCommand +import org.opendc.simulator.compute.workload.SimWorkload +import org.opendc.utils.flow.EventFlow +import java.util.* +import kotlin.coroutines.resume + +/** + * A [Host] that is simulates virtual machines on a physical machine using [SimHypervisor]. + */ +public class SimHost( + override val uid: UUID, + private val coroutineScope: CoroutineScope, + hypervisor: SimHypervisorProvider +) : Host, SimWorkload { + /** + * The logger instance of this server. + */ + private val logger = KotlinLogging.logger {} + + /** + * The execution context in which the [Host] runs. + */ + private lateinit var ctx: SimExecutionContext + + override val events: Flow<HostEvent> + get() = _events + internal val _events = EventFlow<HostEvent>() + + /** + * The event listeners registered with this host. + */ + private val listeners = mutableListOf<HostListener>() + + /** + * Current total memory use of the images on this hypervisor. + */ + private var availableMemory: Long = 0 + + /** + * The hypervisor to run multiple workloads. + */ + private val hypervisor = hypervisor.create( + object : SimHypervisor.Listener { + override fun onSliceFinish( + hypervisor: SimHypervisor, + requestedWork: Long, + grantedWork: Long, + overcommittedWork: Long, + interferedWork: Long, + cpuUsage: Double, + cpuDemand: Double + ) { + _events.emit( + HostEvent.SliceFinished( + this@SimHost, + requestedWork, + grantedWork, + overcommittedWork, + interferedWork, + cpuUsage, + cpuDemand, + guests.size, + node + ) + ) + } + } + ) + + /** + * The virtual machines running on the hypervisor. + */ + private val guests = HashMap<Server, SimGuest>() + + /** + * The node on which the hypervisor runs. + */ + public val node: Node + get() = ctx.meta["node"] as Node + + override val state: HostState + get() = _state + private var _state: HostState = HostState.UP + set(value) { + listeners.forEach { it.onStateChange(this, value) } + field = value + } + + override fun canFit(server: Server): Boolean { + val sufficientMemory = availableMemory > server.flavor.memorySize + val enoughCpus = ctx.machine.cpus.size >= server.flavor.cpuCount + val canFit = hypervisor.canFit(server.flavor.toMachineModel()) + + return sufficientMemory && enoughCpus && canFit + } + + override suspend fun spawn(server: Server, start: Boolean) { + // Return if the server already exists on this host + if (server in this) { + return + } + + require(canFit(server)) { "Server does not fit" } + val guest = SimGuest(server, hypervisor.createMachine(server.flavor.toMachineModel())) + guests[server] = guest + + if (start) { + guest.start() + } + + _events.emit(HostEvent.VmsUpdated(this, guests.count { it.value.state == ServerState.ACTIVE }, availableMemory)) + } + + override fun contains(server: Server): Boolean { + return server in guests + } + + override suspend fun start(server: Server) { + val guest = requireNotNull(guests[server]) { "Unknown server ${server.uid} at host $uid" } + guest.start() + } + + override suspend fun stop(server: Server) { + val guest = requireNotNull(guests[server]) { "Unknown server ${server.uid} at host $uid" } + guest.stop() + } + + override suspend fun terminate(server: Server) { + val guest = guests.remove(server) ?: return + guest.terminate() + } + + override fun addListener(listener: HostListener) { + listeners.add(listener) + } + + override fun removeListener(listener: HostListener) { + listeners.remove(listener) + } + + /** + * Convert flavor to machine model. + */ + private fun Flavor.toMachineModel(): SimMachineModel { + val originalCpu = ctx.machine.cpus[0] + val processingNode = originalCpu.node.copy(coreCount = cpuCount) + val processingUnits = (0 until cpuCount).map { originalCpu.copy(id = it, node = processingNode) } + val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize)) + + return SimMachineModel(processingUnits, memoryUnits) + } + + private fun onGuestStart(vm: SimGuest) { + guests.forEach { _, guest -> + if (guest.state == ServerState.ACTIVE) { + vm.performanceInterferenceModel?.onStart(vm.server.image.name) + } + } + + listeners.forEach { it.onStateChange(this, vm.server, vm.state) } + } + + private fun onGuestStop(vm: SimGuest) { + guests.forEach { _, guest -> + if (guest.state == ServerState.ACTIVE) { + vm.performanceInterferenceModel?.onStop(vm.server.image.name) + } + } + + listeners.forEach { it.onStateChange(this, vm.server, vm.state) } + + _events.emit(HostEvent.VmsUpdated(this@SimHost, guests.count { it.value.state == ServerState.ACTIVE }, availableMemory)) + } + + /** + * A virtual machine instance that the driver manages. + */ + private inner class SimGuest(val server: Server, val machine: SimMachine) { + val performanceInterferenceModel: PerformanceInterferenceModel? = server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? + + var state: ServerState = ServerState.SHUTOFF + + suspend fun start() { + when (state) { + ServerState.SHUTOFF -> { + logger.info { "User requested to start server ${server.uid}" } + launch() + } + ServerState.ACTIVE -> return + else -> assert(false) { "Invalid state transition" } + } + } + + suspend fun stop() { + when (state) { + ServerState.ACTIVE, ServerState.ERROR -> { + val job = job ?: throw IllegalStateException("Server should be active") + job.cancel() + job.join() + } + ServerState.SHUTOFF -> return + else -> assert(false) { "Invalid state transition" } + } + } + + suspend fun terminate() { + stop() + } + + private var job: Job? = null + + private suspend fun launch() = suspendCancellableCoroutine<Unit> { cont -> + assert(job == null) { "Concurrent job running" } + val workload = server.image.tags["workload"] as SimWorkload + + val job = coroutineScope.launch { + delay(1) // TODO Introduce boot time + init() + cont.resume(Unit) + try { + machine.run(workload, mapOf("driver" to this@SimHost, "server" to server)) + exit(null) + } catch (cause: Throwable) { + exit(cause) + } finally { + machine.close() + } + } + this.job = job + job.invokeOnCompletion { + this.job = null + } + } + + private fun init() { + state = ServerState.ACTIVE + onGuestStart(this) + } + + private fun exit(cause: Throwable?) { + state = + if (cause == null) + ServerState.SHUTOFF + else + ServerState.ERROR + + availableMemory += server.flavor.memorySize + onGuestStop(this) + } + } + + override fun onStart(ctx: SimExecutionContext) { + this.ctx = ctx + this.availableMemory = ctx.machine.memory.map { it.size }.sum() + this.hypervisor.onStart(ctx) + } + + override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand { + return hypervisor.onStart(ctx, cpu) + } + + override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand { + return hypervisor.onNext(ctx, cpu, remainingWork) + } +} diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt deleted file mode 100644 index 35d82211..00000000 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt +++ /dev/null @@ -1,231 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.simulator - -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.launch -import org.opendc.compute.core.* -import org.opendc.compute.core.Flavor -import org.opendc.compute.core.image.Image -import org.opendc.compute.core.metal.Node -import org.opendc.compute.core.virt.HypervisorEvent -import org.opendc.compute.core.virt.driver.InsufficientMemoryOnServerException -import org.opendc.compute.core.virt.driver.VirtDriver -import org.opendc.simulator.compute.* -import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel -import org.opendc.simulator.compute.model.MemoryUnit -import org.opendc.simulator.compute.workload.SimResourceCommand -import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.utils.flow.EventFlow -import java.util.* - -/** - * A [VirtDriver] that is simulates virtual machines on a physical machine using [SimHypervisor]. - */ -public class SimVirtDriver( - private val coroutineScope: CoroutineScope, - hypervisor: SimHypervisorProvider -) : VirtDriver, SimWorkload { - /** - * The execution context in which the [VirtDriver] runs. - */ - private lateinit var ctx: SimExecutionContext - - /** - * The node on which the hypervisor runs. - */ - public val node: Node - get() = ctx.meta["node"] as Node - - /** - * The [EventFlow] to emit the events. - */ - internal val eventFlow = EventFlow<HypervisorEvent>() - - override val events: Flow<HypervisorEvent> = eventFlow - - /** - * Current total memory use of the images on this hypervisor. - */ - private var availableMemory: Long = 0 - - /** - * The hypervisor to run multiple workloads. - */ - private val hypervisor = hypervisor.create( - object : SimHypervisor.Listener { - override fun onSliceFinish( - hypervisor: SimHypervisor, - requestedWork: Long, - grantedWork: Long, - overcommittedWork: Long, - interferedWork: Long, - cpuUsage: Double, - cpuDemand: Double - ) { - eventFlow.emit( - HypervisorEvent.SliceFinished( - this@SimVirtDriver, - requestedWork, - grantedWork, - overcommittedWork, - interferedWork, - cpuUsage, - cpuDemand, - vms.size, - node - ) - ) - } - } - ) - - /** - * The virtual machines running on the hypervisor. - */ - private val vms = HashSet<VirtualMachine>() - - override fun canFit(flavor: Flavor): Boolean { - val sufficientMemory = availableMemory > flavor.memorySize - val enoughCpus = ctx.machine.cpus.size >= flavor.cpuCount - val canFit = hypervisor.canFit(flavor.toMachineModel()) - - return sufficientMemory && enoughCpus && canFit - } - - override suspend fun spawn(name: String, image: Image, flavor: Flavor): Server { - val requiredMemory = flavor.memorySize - if (availableMemory - requiredMemory < 0) { - throw InsufficientMemoryOnServerException() - } - require(flavor.cpuCount <= ctx.machine.cpus.size) { "Machine does not fit" } - - val events = EventFlow<ServerEvent>() - val server = Server( - UUID.randomUUID(), - name, - emptyMap(), - flavor, - image, - ServerState.BUILD, - events - ) - availableMemory -= requiredMemory - - val vm = VirtualMachine(server, events, hypervisor.createMachine(flavor.toMachineModel())) - vms.add(vm) - vmStarted(vm) - eventFlow.emit(HypervisorEvent.VmsUpdated(this, vms.size, availableMemory)) - return server - } - - /** - * Convert flavor to machine model. - */ - private fun Flavor.toMachineModel(): SimMachineModel { - val originalCpu = ctx.machine.cpus[0] - val processingNode = originalCpu.node.copy(coreCount = cpuCount) - val processingUnits = (0 until cpuCount).map { originalCpu.copy(id = it, node = processingNode) } - val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize)) - - return SimMachineModel(processingUnits, memoryUnits) - } - - private fun vmStarted(vm: VirtualMachine) { - vms.forEach { - vm.performanceInterferenceModel?.onStart(it.server.image.name) - } - } - - private fun vmStopped(vm: VirtualMachine) { - vms.forEach { - vm.performanceInterferenceModel?.onStop(it.server.image.name) - } - } - - /** - * A virtual machine instance that the driver manages. - */ - private inner class VirtualMachine(server: Server, val events: EventFlow<ServerEvent>, val machine: SimMachine) { - val performanceInterferenceModel: PerformanceInterferenceModel? = server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? - - val job = coroutineScope.launch { - val workload = server.image.tags["workload"] as SimWorkload - - delay(1) // TODO Introduce boot time - init() - try { - machine.run(workload, mapOf("driver" to this@SimVirtDriver, "server" to server)) - exit(null) - } catch (cause: Throwable) { - exit(cause) - } finally { - machine.close() - } - } - - var server: Server = server - set(value) { - if (field.state != value.state) { - events.emit(ServerEvent.StateChanged(value, field.state)) - } - - field = value - } - - private fun init() { - server = server.copy(state = ServerState.ACTIVE) - } - - private fun exit(cause: Throwable?) { - val serverState = - if (cause == null) - ServerState.SHUTOFF - else - ServerState.ERROR - server = server.copy(state = serverState) - availableMemory += server.flavor.memorySize - vms.remove(this) - vmStopped(this) - eventFlow.emit(HypervisorEvent.VmsUpdated(this@SimVirtDriver, vms.size, availableMemory)) - events.close() - } - } - - override fun onStart(ctx: SimExecutionContext) { - this.ctx = ctx - this.availableMemory = ctx.machine.memory.map { it.size }.sum() - this.hypervisor.onStart(ctx) - } - - override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand { - return hypervisor.onStart(ctx, cpu) - } - - override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand { - return hypervisor.onNext(ctx, cpu, remainingWork) - } -} diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt index 18afd0c2..ee747a9a 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt @@ -36,9 +36,7 @@ import org.opendc.compute.core.metal.Node import org.opendc.compute.core.metal.NodeEvent import org.opendc.compute.core.metal.NodeState import org.opendc.compute.core.metal.service.ProvisioningService -import org.opendc.compute.core.virt.HypervisorEvent -import org.opendc.compute.core.virt.driver.InsufficientMemoryOnServerException -import org.opendc.compute.core.virt.driver.VirtDriver +import org.opendc.compute.core.virt.* import org.opendc.compute.core.virt.service.VirtProvisioningEvent import org.opendc.compute.core.virt.service.VirtProvisioningService import org.opendc.compute.core.virt.service.events.* @@ -62,13 +60,18 @@ public class SimVirtProvisioningService( private val tracer: EventTracer, private val hypervisor: SimHypervisorProvider, private val schedulingQuantum: Long = 300000, // 5 minutes in milliseconds -) : VirtProvisioningService { +) : VirtProvisioningService, HostListener { /** * The logger instance to use. */ private val logger = KotlinLogging.logger {} /** + * A mapping from host to hypervisor view. + */ + private val hostToHv = mutableMapOf<Host, HypervisorView>() + + /** * The hypervisors that have been launched by the service. */ private val hypervisors: MutableMap<Node, HypervisorView> = mutableMapOf() @@ -79,14 +82,19 @@ public class SimVirtProvisioningService( private val availableHypervisors: MutableSet<HypervisorView> = mutableSetOf() /** - * The incoming images to be processed by the provisioner. + * The servers that should be launched by the service. + */ + private val queue: Deque<LaunchRequest> = ArrayDeque() + + /** + * The active servers in the system. */ - private val incomingImages: Deque<ImageView> = ArrayDeque() + private val activeServers: MutableSet<Server> = mutableSetOf() /** - * The active images in the system. + * The [Random] instance used to generate unique identifiers for the objects. */ - private val activeImages: MutableSet<ImageView> = mutableSetOf() + private val random = Random(0) public var submittedVms: Int = 0 public var queuedVms: Int = 0 @@ -102,12 +110,9 @@ public class SimVirtProvisioningService( */ private val allocationLogic = allocationPolicy() - /** - * The [EventFlow] to emit the events. - */ - internal val eventFlow = EventFlow<VirtProvisioningEvent>() - - override val events: Flow<VirtProvisioningEvent> = eventFlow + override val events: Flow<VirtProvisioningEvent> + get() = _events + private val _events = EventFlow<VirtProvisioningEvent>() /** * The [TimerScheduler] to use for scheduling the scheduler cycles. @@ -118,7 +123,8 @@ public class SimVirtProvisioningService( coroutineScope.launch { val provisionedNodes = provisioningService.nodes() provisionedNodes.forEach { node -> - val workload = SimVirtDriver(coroutineScope, hypervisor) + val workload = SimHost(UUID(random.nextLong(), random.nextLong()), coroutineScope, hypervisor) + workload.addListener(this@SimVirtProvisioningService) val hypervisorImage = Image(UUID.randomUUID(), "vmm", mapOf("workload" to workload)) launch { val deployedNode = provisioningService.deploy(node, hypervisorImage) @@ -132,7 +138,7 @@ public class SimVirtProvisioningService( } } - override suspend fun drivers(): Set<VirtDriver> { + override suspend fun drivers(): Set<Host> { return availableHypervisors.map { it.driver }.toSet() } @@ -145,7 +151,7 @@ public class SimVirtProvisioningService( ): Server { tracer.commit(VmSubmissionEvent(name, image, flavor)) - eventFlow.emit( + _events.emit( VirtProvisioningEvent.MetricsAvailable( this@SimVirtProvisioningService, hypervisors.size, @@ -158,9 +164,9 @@ public class SimVirtProvisioningService( ) ) - return suspendCancellableCoroutine<Server> { cont -> - val vmInstance = ImageView(name, image, flavor, cont) - incomingImages += vmInstance + return suspendCancellableCoroutine { cont -> + val request = LaunchRequest(createServer(name, image, flavor), cont) + queue += request requestCycle() } } @@ -170,6 +176,22 @@ public class SimVirtProvisioningService( provisionedNodes.forEach { node -> provisioningService.stop(node) } } + private fun createServer( + name: String, + image: Image, + flavor: Flavor + ): Server { + return Server( + uid = UUID(random.nextLong(), random.nextLong()), + name = name, + tags = emptyMap(), + flavor = flavor, + image = image, + state = ServerState.BUILD, + events = EventFlow() + ) + } + private fun requestCycle() { // Bail out in case we have already requested a new cycle. if (scheduler.isTimerActive(Unit)) { @@ -182,23 +204,23 @@ public class SimVirtProvisioningService( val delay = schedulingQuantum - (clock.millis() % schedulingQuantum) scheduler.startSingleTimer(Unit, delay) { - coroutineScope.launch { schedule() } + schedule() } } - private suspend fun schedule() { - while (incomingImages.isNotEmpty()) { - val imageInstance = incomingImages.peekFirst() - val requiredMemory = imageInstance.flavor.memorySize - val selectedHv = allocationLogic.select(availableHypervisors, imageInstance) + private fun schedule() { + while (queue.isNotEmpty()) { + val (server, cont) = queue.peekFirst() + val requiredMemory = server.flavor.memorySize + val selectedHv = allocationLogic.select(availableHypervisors, server) - if (selectedHv == null || !selectedHv.driver.canFit(imageInstance.flavor)) { - logger.trace { "Image ${imageInstance.image} selected for scheduling but no capacity available for it." } + if (selectedHv == null || !selectedHv.driver.canFit(server)) { + logger.trace { "Server $server selected for scheduling but no capacity available for it." } - if (requiredMemory > maxMemory || imageInstance.flavor.cpuCount > maxCores) { - tracer.commit(VmSubmissionInvalidEvent(imageInstance.name)) + if (requiredMemory > maxMemory || server.flavor.cpuCount > maxCores) { + tracer.commit(VmSubmissionInvalidEvent(server.name)) - eventFlow.emit( + _events.emit( VirtProvisioningEvent.MetricsAvailable( this@SimVirtProvisioningService, hypervisors.size, @@ -212,9 +234,9 @@ public class SimVirtProvisioningService( ) // Remove the incoming image - incomingImages.poll() + queue.poll() - logger.warn("Failed to spawn ${imageInstance.image}: does not fit [${clock.millis()}]") + logger.warn("Failed to spawn $server: does not fit [${clock.millis()}]") continue } else { break @@ -222,86 +244,49 @@ public class SimVirtProvisioningService( } try { - logger.info { "[${clock.millis()}] Spawning ${imageInstance.image} on ${selectedHv.node.uid} ${selectedHv.node.name} ${selectedHv.node.flavor}" } - incomingImages.poll() + logger.info { "[${clock.millis()}] Spawning $server on ${selectedHv.node.uid} ${selectedHv.node.name} ${selectedHv.node.flavor}" } + queue.poll() // Speculatively update the hypervisor view information to prevent other images in the queue from // deciding on stale values. selectedHv.numberOfActiveServers++ - selectedHv.provisionedCores += imageInstance.flavor.cpuCount + selectedHv.provisionedCores += server.flavor.cpuCount selectedHv.availableMemory -= requiredMemory // XXX Temporary hack - val server = selectedHv.driver.spawn( - imageInstance.name, - imageInstance.image, - imageInstance.flavor - ) - imageInstance.server = server - imageInstance.continuation.resume(server) - - tracer.commit(VmScheduledEvent(imageInstance.name)) - - eventFlow.emit( - VirtProvisioningEvent.MetricsAvailable( - this@SimVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, - submittedVms, - ++runningVms, - finishedVms, - --queuedVms, - unscheduledVms - ) - ) - activeImages += imageInstance + coroutineScope.launch { + try { + cont.resume(server) + selectedHv.driver.spawn(server) + activeServers += server + + tracer.commit(VmScheduledEvent(server.name)) + _events.emit( + VirtProvisioningEvent.MetricsAvailable( + this@SimVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + ++runningVms, + finishedVms, + --queuedVms, + unscheduledVms + ) + ) + } catch (e: InsufficientMemoryOnServerException) { + logger.error("Failed to deploy VM", e) - server.events - .onEach { event -> - when (event) { - is ServerEvent.StateChanged -> { - if (event.server.state == ServerState.SHUTOFF) { - logger.info { "[${clock.millis()}] Server ${event.server.uid} ${event.server.name} ${event.server.flavor} finished." } - - tracer.commit(VmStoppedEvent(event.server.name)) - - eventFlow.emit( - VirtProvisioningEvent.MetricsAvailable( - this@SimVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, - submittedVms, - --runningVms, - ++finishedVms, - queuedVms, - unscheduledVms - ) - ) - - activeImages -= imageInstance - selectedHv.provisionedCores -= server.flavor.cpuCount - - // Try to reschedule if needed - if (incomingImages.isNotEmpty()) { - requestCycle() - } - } - } - } + selectedHv.numberOfActiveServers-- + selectedHv.provisionedCores -= server.flavor.cpuCount + selectedHv.availableMemory += requiredMemory } - .launchIn(coroutineScope) - } catch (e: InsufficientMemoryOnServerException) { - logger.error("Failed to deploy VM", e) - - selectedHv.numberOfActiveServers-- - selectedHv.provisionedCores -= imageInstance.flavor.cpuCount - selectedHv.availableMemory += requiredMemory + } } catch (e: Throwable) { logger.error("Failed to deploy VM", e) } } } - private fun stateChanged(node: Node, hypervisor: SimVirtDriver) { + private fun stateChanged(node: Node, hypervisor: SimHost) { when (node.state) { NodeState.ACTIVE -> { logger.debug { "[${clock.millis()}] Server ${node.uid} available: ${node.state}" } @@ -320,7 +305,7 @@ public class SimVirtProvisioningService( hv.driver = hypervisor hv.driver.events .onEach { event -> - if (event is HypervisorEvent.VmsUpdated) { + if (event is HostEvent.VmsUpdated) { hv.numberOfActiveServers = event.numberOfActiveServers hv.availableMemory = event.availableMemory } @@ -329,12 +314,13 @@ public class SimVirtProvisioningService( maxCores = max(maxCores, node.flavor.cpuCount) maxMemory = max(maxMemory, node.flavor.memorySize) hypervisors[node] = hv + hostToHv[hypervisor] = hv availableHypervisors += hv } tracer.commit(HypervisorAvailableEvent(node.uid)) - eventFlow.emit( + _events.emit( VirtProvisioningEvent.MetricsAvailable( this@SimVirtProvisioningService, hypervisors.size, @@ -348,7 +334,7 @@ public class SimVirtProvisioningService( ) // Re-schedule on the new machine - if (incomingImages.isNotEmpty()) { + if (queue.isNotEmpty()) { requestCycle() } } @@ -359,7 +345,7 @@ public class SimVirtProvisioningService( tracer.commit(HypervisorUnavailableEvent(hv.uid)) - eventFlow.emit( + _events.emit( VirtProvisioningEvent.MetricsAvailable( this@SimVirtProvisioningService, hypervisors.size, @@ -372,7 +358,7 @@ public class SimVirtProvisioningService( ) ) - if (incomingImages.isNotEmpty()) { + if (queue.isNotEmpty()) { requestCycle() } } @@ -380,11 +366,43 @@ public class SimVirtProvisioningService( } } - public data class ImageView( - public val name: String, - public val image: Image, - public val flavor: Flavor, - public val continuation: Continuation<Server>, - public var server: Server? = null - ) + override fun onStateChange(host: Host, server: Server, newState: ServerState) { + val eventFlow = server.events as EventFlow<ServerEvent> + val newServer = server.copy(state = newState) + eventFlow.emit(ServerEvent.StateChanged(newServer, server.state)) + + if (newState == ServerState.SHUTOFF) { + logger.info { "[${clock.millis()}] Server ${server.uid} ${server.name} ${server.flavor} finished." } + + tracer.commit(VmStoppedEvent(server.name)) + + _events.emit( + VirtProvisioningEvent.MetricsAvailable( + this@SimVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + --runningVms, + ++finishedVms, + queuedVms, + unscheduledVms + ) + ) + + activeServers -= server + val hv = hostToHv[host] + if (hv != null) { + hv.provisionedCores -= server.flavor.cpuCount + } else { + logger.error { "Unknown host $host" } + } + + // Try to reschedule if needed + if (queue.isNotEmpty()) { + requestCycle() + } + } + } + + public data class LaunchRequest(val server: Server, val cont: Continuation<Server>) } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AllocationPolicy.kt index 2018b9f2..3099f1ad 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AllocationPolicy.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AllocationPolicy.kt @@ -22,9 +22,9 @@ package org.opendc.compute.simulator.allocation +import org.opendc.compute.core.Server import org.opendc.compute.core.metal.Node import org.opendc.compute.simulator.HypervisorView -import org.opendc.compute.simulator.SimVirtProvisioningService /** * A policy for selecting the [Node] an image should be deployed to, @@ -39,7 +39,7 @@ public interface AllocationPolicy { */ public fun select( hypervisors: Set<HypervisorView>, - image: SimVirtProvisioningService.ImageView + server: Server ): HypervisorView? } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ComparableAllocationPolicyLogic.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ComparableAllocationPolicyLogic.kt index 04a181a6..df48f405 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ComparableAllocationPolicyLogic.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ComparableAllocationPolicyLogic.kt @@ -22,8 +22,8 @@ package org.opendc.compute.simulator.allocation +import org.opendc.compute.core.Server import org.opendc.compute.simulator.HypervisorView -import org.opendc.compute.simulator.SimVirtProvisioningService /** * The logic for an [AllocationPolicy] that uses a [Comparator] to select the appropriate node. @@ -36,12 +36,12 @@ public interface ComparableAllocationPolicyLogic : AllocationPolicy.Logic { override fun select( hypervisors: Set<HypervisorView>, - image: SimVirtProvisioningService.ImageView + server: Server ): HypervisorView? { return hypervisors.asSequence() .filter { hv -> - val fitsMemory = hv.availableMemory >= (image.flavor.memorySize) - val fitsCpu = hv.node.flavor.cpuCount >= image.flavor.cpuCount + val fitsMemory = hv.availableMemory >= (server.flavor.memorySize) + val fitsCpu = hv.node.flavor.cpuCount >= server.flavor.cpuCount fitsMemory && fitsCpu } .minWithOrNull(comparator.thenBy { it.node.uid }) diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/RandomAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/RandomAllocationPolicy.kt index 9a89fccd..03706b42 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/RandomAllocationPolicy.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/RandomAllocationPolicy.kt @@ -22,8 +22,8 @@ package org.opendc.compute.simulator.allocation +import org.opendc.compute.core.Server import org.opendc.compute.simulator.HypervisorView -import org.opendc.compute.simulator.SimVirtProvisioningService import kotlin.random.Random /** @@ -34,12 +34,12 @@ public class RandomAllocationPolicy(private val random: Random = Random(0)) : Al override fun invoke(): AllocationPolicy.Logic = object : AllocationPolicy.Logic { override fun select( hypervisors: Set<HypervisorView>, - image: SimVirtProvisioningService.ImageView + server: Server ): HypervisorView? { return hypervisors.asIterable() .filter { hv -> - val fitsMemory = hv.availableMemory >= (image.image.tags["required-memory"] as Long) - val fitsCpu = hv.node.flavor.cpuCount >= image.flavor.cpuCount + val fitsMemory = hv.availableMemory >= (server.image.tags["required-memory"] as Long) + val fitsCpu = hv.node.flavor.cpuCount >= server.flavor.cpuCount fitsMemory && fitsCpu } .randomOrNull(random) diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ReplayAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ReplayAllocationPolicy.kt index 582fe817..30065621 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ReplayAllocationPolicy.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ReplayAllocationPolicy.kt @@ -23,8 +23,8 @@ package org.opendc.compute.simulator.allocation import mu.KotlinLogging +import org.opendc.compute.core.Server import org.opendc.compute.simulator.HypervisorView -import org.opendc.compute.simulator.SimVirtProvisioningService private val logger = KotlinLogging.logger {} @@ -38,14 +38,14 @@ public class ReplayAllocationPolicy(private val vmPlacements: Map<String, String override fun invoke(): AllocationPolicy.Logic = object : AllocationPolicy.Logic { override fun select( hypervisors: Set<HypervisorView>, - image: SimVirtProvisioningService.ImageView + server: Server ): HypervisorView? { - val clusterName = vmPlacements[image.name] - ?: throw IllegalStateException("Could not find placement data in VM placement file for VM ${image.name}") + val clusterName = vmPlacements[server.name] + ?: throw IllegalStateException("Could not find placement data in VM placement file for VM ${server.name}") val machinesInCluster = hypervisors.filter { it.node.name.contains(clusterName) } if (machinesInCluster.isEmpty()) { - logger.info { "Could not find any machines belonging to cluster $clusterName for image ${image.name}, assigning randomly." } + logger.info { "Could not find any machines belonging to cluster $clusterName for image ${server.name}, assigning randomly." } return hypervisors.maxByOrNull { it.availableMemory } } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimVirtDriverTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index 6b754572..83e891cb 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimVirtDriverTest.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -24,6 +24,7 @@ package org.opendc.compute.simulator import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.emptyFlow import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch @@ -33,8 +34,10 @@ import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll import org.opendc.compute.core.Flavor +import org.opendc.compute.core.Server +import org.opendc.compute.core.ServerState import org.opendc.compute.core.image.Image -import org.opendc.compute.core.virt.HypervisorEvent +import org.opendc.compute.core.virt.HostEvent import org.opendc.simulator.compute.SimFairShareHypervisorProvider import org.opendc.simulator.compute.SimMachineModel import org.opendc.simulator.compute.model.MemoryUnit @@ -49,7 +52,7 @@ import java.util.UUID * Basic test-suite for the hypervisor. */ @OptIn(ExperimentalCoroutinesApi::class) -internal class SimVirtDriverTest { +internal class SimHostTest { private lateinit var scope: TestCoroutineScope private lateinit var clock: Clock private lateinit var machineModel: SimMachineModel @@ -77,7 +80,7 @@ internal class SimVirtDriverTest { var overcommittedWork = 0L scope.launch { - val virtDriver = SimVirtDriver(this, SimFairShareHypervisorProvider()) + val virtDriver = SimHost(UUID.randomUUID(), this, SimFairShareHypervisorProvider()) val vmm = Image(UUID.randomUUID(), "vmm", mapOf("workload" to virtDriver)) val duration = 5 * 60L val vmImageA = Image( @@ -122,7 +125,7 @@ internal class SimVirtDriverTest { virtDriver.events .onEach { event -> when (event) { - is HypervisorEvent.SliceFinished -> { + is HostEvent.SliceFinished -> { requestedWork += event.requestedBurst grantedWork += event.grantedBurst overcommittedWork += event.overcommissionedBurst @@ -131,8 +134,8 @@ internal class SimVirtDriverTest { } .launchIn(this) - virtDriver.spawn("a", vmImageA, flavor) - virtDriver.spawn("b", vmImageB, flavor) + launch { virtDriver.spawn(Server(UUID.randomUUID(), "a", emptyMap(), flavor, vmImageA, ServerState.BUILD, emptyFlow())) } + launch { virtDriver.spawn(Server(UUID.randomUUID(), "b", emptyMap(), flavor, vmImageB, ServerState.BUILD, emptyFlow())) } } scope.advanceUntilIdle() diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index b941d135..728d6c11 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -37,11 +37,11 @@ import org.opendc.compute.core.ServerEvent import org.opendc.compute.core.metal.NODE_CLUSTER import org.opendc.compute.core.metal.NodeEvent import org.opendc.compute.core.metal.service.ProvisioningService -import org.opendc.compute.core.virt.HypervisorEvent +import org.opendc.compute.core.virt.HostEvent import org.opendc.compute.core.virt.service.VirtProvisioningEvent import org.opendc.compute.core.workload.VmWorkload import org.opendc.compute.simulator.SimBareMetalDriver -import org.opendc.compute.simulator.SimVirtDriver +import org.opendc.compute.simulator.SimHost import org.opendc.compute.simulator.SimVirtProvisioningService import org.opendc.compute.simulator.allocation.AllocationPolicy import org.opendc.experiments.capelin.monitor.ExperimentMonitor @@ -174,8 +174,8 @@ public suspend fun attachMonitor( // Monitor hypervisor events for (hypervisor in hypervisors) { - // TODO Do not expose VirtDriver directly but use Hypervisor class. - val server = (hypervisor as SimVirtDriver).node + // TODO Do not expose Host directly but use Hypervisor class. + val server = (hypervisor as SimHost).node monitor.reportHostStateChange(clock.millis(), hypervisor, server) server.events .onEach { event -> @@ -190,7 +190,7 @@ public suspend fun attachMonitor( hypervisor.events .onEach { event -> when (event) { - is HypervisorEvent.SliceFinished -> monitor.reportHostSlice( + is HostEvent.SliceFinished -> monitor.reportHostSlice( clock.millis(), event.requestedBurst, event.grantedBurst, diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt index 04ffd148..8432025b 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt @@ -24,7 +24,7 @@ package org.opendc.experiments.capelin.monitor import org.opendc.compute.core.Server import org.opendc.compute.core.metal.Node -import org.opendc.compute.core.virt.driver.VirtDriver +import org.opendc.compute.core.virt.Host import org.opendc.compute.core.virt.service.VirtProvisioningEvent import java.io.Closeable @@ -42,7 +42,7 @@ public interface ExperimentMonitor : Closeable { */ public fun reportHostStateChange( time: Long, - driver: VirtDriver, + driver: Host, host: Node ) { } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt index e8aa5915..2af43701 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt @@ -25,7 +25,7 @@ package org.opendc.experiments.capelin.monitor import mu.KotlinLogging import org.opendc.compute.core.Server import org.opendc.compute.core.metal.Node -import org.opendc.compute.core.virt.driver.VirtDriver +import org.opendc.compute.core.virt.Host import org.opendc.compute.core.virt.service.VirtProvisioningEvent import org.opendc.experiments.capelin.telemetry.HostEvent import org.opendc.experiments.capelin.telemetry.ProvisionerEvent @@ -64,7 +64,7 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: override fun reportHostStateChange( time: Long, - driver: VirtDriver, + driver: Host, host: Node ) { logger.debug { "Host ${host.uid} changed state ${host.state} [$time]" } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 0d6c057f..fca523cd 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -148,9 +148,9 @@ class CapelinIntegrationTest { assertAll( { assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") }, { assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") }, - { assertEquals(1678587333640, monitor.totalRequestedBurst) }, - { assertEquals(438118200924, monitor.totalGrantedBurst) }, - { assertEquals(1220323969993, monitor.totalOvercommissionedBurst) }, + { assertEquals(1679510908774, monitor.totalRequestedBurst) }, + { assertEquals(384100282091, monitor.totalGrantedBurst) }, + { assertEquals(1282152242721, monitor.totalOvercommissionedBurst) }, { assertEquals(0, monitor.totalInterferedBurst) } ) } @@ -195,9 +195,9 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(705128393966, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, - { assertEquals(173489747029, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, - { assertEquals(526858997740, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, + { assertEquals(710487768664, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, + { assertEquals(118846235815, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, + { assertEquals(584211294239, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, { assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } } ) } diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt index b7a26d34..2a41be65 100644 --- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt @@ -26,7 +26,7 @@ import mu.KotlinLogging import org.opendc.compute.core.Server import org.opendc.compute.core.metal.Node import org.opendc.compute.core.metal.NodeState -import org.opendc.compute.core.virt.driver.VirtDriver +import org.opendc.compute.core.virt.Host import org.opendc.compute.core.virt.service.VirtProvisioningEvent import org.opendc.experiments.capelin.monitor.ExperimentMonitor import org.opendc.experiments.capelin.telemetry.HostEvent @@ -51,7 +51,7 @@ public class WebExperimentMonitor : ExperimentMonitor { override fun reportHostStateChange( time: Long, - driver: VirtDriver, + driver: Host, host: Node ) { logger.debug { "Host ${host.uid} changed state ${host.state} [$time]" } |
