diff options
Diffstat (limited to 'simulator')
21 files changed, 623 insertions, 480 deletions
diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/Host.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/Host.kt new file mode 100644 index 00000000..60a31b69 --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/Host.kt @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.core.virt + +import kotlinx.coroutines.flow.Flow +import org.opendc.compute.core.Server +import java.util.* + +/** + * Base interface for representing compute resources that host virtualized [Server] instances. + */ +public interface Host { + /** + * A unique identifier representing the host. + */ + public val uid: UUID + + /** + * The state of the host. + */ + public val state: HostState + + /** + * The events emitted by the driver. + */ + public val events: Flow<HostEvent> + + /** + * Determine whether the specified [instance][server] can still fit on this host. + */ + public fun canFit(server: Server): Boolean + + /** + * Register the specified [instance][server] on the host. + * + * Once the method returns, the instance should be running if [start] is true or else the instance should be + * stopped. + */ + public suspend fun spawn(server: Server, start: Boolean = true) + + /** + * Determine whether the specified [instance][server] exists on the host. + */ + public operator fun contains(server: Server): Boolean + + /** + * Stat the server [instance][server] if it is currently not running on this host. + * + * @throws IllegalArgumentException if the server is not present on the host. + */ + public suspend fun start(server: Server) + + /** + * Stop the server [instance][server] if it is currently running on this host. + * + * @throws IllegalArgumentException if the server is not present on the host. + */ + public suspend fun stop(server: Server) + + /** + * Terminate the specified [instance][server] on this host and cleanup all resources associated with it. + */ + public suspend fun terminate(server: Server) + + /** + * Add a [HostListener] to this host. + */ + public fun addListener(listener: HostListener) + + /** + * Remove a [HostListener] from this host. + */ + public fun removeListener(listener: HostListener) +} diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HypervisorEvent.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HostEvent.kt index d1c8d790..a07523e8 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HypervisorEvent.kt +++ b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HostEvent.kt @@ -23,16 +23,15 @@ package org.opendc.compute.core.virt import org.opendc.compute.core.metal.Node -import org.opendc.compute.core.virt.driver.VirtDriver /** - * An event that is emitted by a [VirtDriver]. + * An event that is emitted by a [Host]. */ -public sealed class HypervisorEvent { +public sealed class HostEvent { /** * The driver that emitted the event. */ - public abstract val driver: VirtDriver + public abstract val driver: Host /** * This event is emitted when the number of active servers on the server managed by this driver is updated. @@ -42,10 +41,10 @@ public sealed class HypervisorEvent { * @property availableMemory The available memory, in MB. */ public data class VmsUpdated( - override val driver: VirtDriver, + override val driver: Host, public val numberOfActiveServers: Int, public val availableMemory: Long - ) : HypervisorEvent() + ) : HostEvent() /** * This event is emitted when a slice is finished. @@ -63,7 +62,7 @@ public sealed class HypervisorEvent { * @property numberOfDeployedImages The number of images deployed on this hypervisor. */ public data class SliceFinished( - override val driver: VirtDriver, + override val driver: Host, public val requestedBurst: Long, public val grantedBurst: Long, public val overcommissionedBurst: Long, @@ -72,5 +71,5 @@ public sealed class HypervisorEvent { public val cpuDemand: Double, public val numberOfDeployedImages: Int, public val host: Node - ) : HypervisorEvent() + ) : HostEvent() } diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/Hypervisor.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HostListener.kt index 1ae52baa..b14d9bb5 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/Hypervisor.kt +++ b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HostListener.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 AtLarge Research + * Copyright (c) 2021 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,35 +22,20 @@ package org.opendc.compute.core.virt -import kotlinx.coroutines.flow.Flow -import org.opendc.core.Identity -import java.util.UUID +import org.opendc.compute.core.Server +import org.opendc.compute.core.ServerState /** - * A hypervisor (or virtual machine monitor) is software or firmware that virtualizes the host compute environment - * into several virtual guest machines. + * Listener interface for events originating from a [Host]. */ -public class Hypervisor( +public interface HostListener { /** - * The unique identifier of the hypervisor. + * This method is invoked when the state of an [instance][server] on [host] changes. */ - override val uid: UUID, + public fun onStateChange(host: Host, server: Server, newState: ServerState) {} /** - * The optional name of the hypervisor. + * This method is invoked when the state of a [Host] has changed. */ - override val name: String, - - /** - * Metadata of the hypervisor. - */ - public val metadata: Map<String, Any>, - - /** - * The events that are emitted by the hypervisor. - */ - public val events: Flow<HypervisorEvent> -) : Identity { - override fun hashCode(): Int = uid.hashCode() - override fun equals(other: Any?): Boolean = other is Hypervisor && uid == other.uid + public fun onStateChange(host: Host, newState: HostState) {} } diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HostState.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HostState.kt new file mode 100644 index 00000000..7f87f5f7 --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/HostState.kt @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.core.virt + +/** + * The state of a host. + */ +public enum class HostState { + /** + * The host is up. + */ + UP, + + /** + * The host is down. + */ + DOWN +} diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/driver/InsufficientMemoryOnServerException.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/InsufficientMemoryOnServerException.kt index 6fe84ea6..0f7b5826 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/driver/InsufficientMemoryOnServerException.kt +++ b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/InsufficientMemoryOnServerException.kt @@ -1,3 +1,3 @@ -package org.opendc.compute.core.virt.driver +package org.opendc.compute.core.virt public class InsufficientMemoryOnServerException : IllegalStateException("Insufficient memory left on server.") diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/driver/VirtDriver.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/driver/VirtDriver.kt deleted file mode 100644 index 68cc7b50..00000000 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/driver/VirtDriver.kt +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.core.virt.driver - -import kotlinx.coroutines.flow.Flow -import org.opendc.compute.core.Flavor -import org.opendc.compute.core.Server -import org.opendc.compute.core.image.Image -import org.opendc.compute.core.virt.HypervisorEvent -import org.opendc.core.services.AbstractServiceKey -import java.util.UUID - -/** - * A driver interface for a hypervisor running on some host server and communicating with the central compute service to - * provide virtualization for that particular resource. - */ -public interface VirtDriver { - /** - * The events emitted by the driver. - */ - public val events: Flow<HypervisorEvent> - - /** - * Determine whether the specified [flavor] can still fit on this driver. - */ - public fun canFit(flavor: Flavor): Boolean - - /** - * Spawn the given [Image] on the compute resource of this driver. - * - * @param name The name of the server to spawn. - * @param image The image to deploy. - * @param flavor The flavor of the server which this driver is controlling. - * @return The virtual server spawned by this method. - */ - public suspend fun spawn( - name: String, - image: Image, - flavor: Flavor - ): Server - - public companion object Key : AbstractServiceKey<VirtDriver>(UUID.randomUUID(), "virtual-driver") -} diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/VirtProvisioningService.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/VirtProvisioningService.kt index b967044c..8da849af 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/VirtProvisioningService.kt +++ b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/service/VirtProvisioningService.kt @@ -26,7 +26,7 @@ import kotlinx.coroutines.flow.Flow import org.opendc.compute.core.Flavor import org.opendc.compute.core.Server import org.opendc.compute.core.image.Image -import org.opendc.compute.core.virt.driver.VirtDriver +import org.opendc.compute.core.virt.Host /** * A service for VM provisioning on a cloud. @@ -40,7 +40,7 @@ public interface VirtProvisioningService { /** * Obtain the active hypervisors for this provisioner. */ - public suspend fun drivers(): Set<VirtDriver> + public suspend fun drivers(): Set<Host> /** * The number of hosts available in the system. diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/HypervisorView.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/HypervisorView.kt index cf2747cd..a650144b 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/HypervisorView.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/HypervisorView.kt @@ -23,7 +23,7 @@ package org.opendc.compute.simulator import org.opendc.compute.core.metal.Node -import org.opendc.compute.core.virt.driver.VirtDriver +import org.opendc.compute.core.virt.Host import java.util.UUID public class HypervisorView( @@ -33,5 +33,5 @@ public class HypervisorView( public var availableMemory: Long, public var provisionedCores: Int ) { - public lateinit var driver: VirtDriver + public lateinit var driver: Host } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt new file mode 100644 index 00000000..85fa2cb6 --- /dev/null +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -0,0 +1,300 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.Flow +import mu.KotlinLogging +import org.opendc.compute.core.* +import org.opendc.compute.core.Flavor +import org.opendc.compute.core.metal.Node +import org.opendc.compute.core.virt.* +import org.opendc.simulator.compute.* +import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL +import org.opendc.simulator.compute.interference.PerformanceInterferenceModel +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.workload.SimResourceCommand +import org.opendc.simulator.compute.workload.SimWorkload +import org.opendc.utils.flow.EventFlow +import java.util.* +import kotlin.coroutines.resume + +/** + * A [Host] that is simulates virtual machines on a physical machine using [SimHypervisor]. + */ +public class SimHost( + override val uid: UUID, + private val coroutineScope: CoroutineScope, + hypervisor: SimHypervisorProvider +) : Host, SimWorkload { + /** + * The logger instance of this server. + */ + private val logger = KotlinLogging.logger {} + + /** + * The execution context in which the [Host] runs. + */ + private lateinit var ctx: SimExecutionContext + + override val events: Flow<HostEvent> + get() = _events + internal val _events = EventFlow<HostEvent>() + + /** + * The event listeners registered with this host. + */ + private val listeners = mutableListOf<HostListener>() + + /** + * Current total memory use of the images on this hypervisor. + */ + private var availableMemory: Long = 0 + + /** + * The hypervisor to run multiple workloads. + */ + private val hypervisor = hypervisor.create( + object : SimHypervisor.Listener { + override fun onSliceFinish( + hypervisor: SimHypervisor, + requestedWork: Long, + grantedWork: Long, + overcommittedWork: Long, + interferedWork: Long, + cpuUsage: Double, + cpuDemand: Double + ) { + _events.emit( + HostEvent.SliceFinished( + this@SimHost, + requestedWork, + grantedWork, + overcommittedWork, + interferedWork, + cpuUsage, + cpuDemand, + guests.size, + node + ) + ) + } + } + ) + + /** + * The virtual machines running on the hypervisor. + */ + private val guests = HashMap<Server, SimGuest>() + + /** + * The node on which the hypervisor runs. + */ + public val node: Node + get() = ctx.meta["node"] as Node + + override val state: HostState + get() = _state + private var _state: HostState = HostState.UP + set(value) { + listeners.forEach { it.onStateChange(this, value) } + field = value + } + + override fun canFit(server: Server): Boolean { + val sufficientMemory = availableMemory > server.flavor.memorySize + val enoughCpus = ctx.machine.cpus.size >= server.flavor.cpuCount + val canFit = hypervisor.canFit(server.flavor.toMachineModel()) + + return sufficientMemory && enoughCpus && canFit + } + + override suspend fun spawn(server: Server, start: Boolean) { + // Return if the server already exists on this host + if (server in this) { + return + } + + require(canFit(server)) { "Server does not fit" } + val guest = SimGuest(server, hypervisor.createMachine(server.flavor.toMachineModel())) + guests[server] = guest + + if (start) { + guest.start() + } + + _events.emit(HostEvent.VmsUpdated(this, guests.count { it.value.state == ServerState.ACTIVE }, availableMemory)) + } + + override fun contains(server: Server): Boolean { + return server in guests + } + + override suspend fun start(server: Server) { + val guest = requireNotNull(guests[server]) { "Unknown server ${server.uid} at host $uid" } + guest.start() + } + + override suspend fun stop(server: Server) { + val guest = requireNotNull(guests[server]) { "Unknown server ${server.uid} at host $uid" } + guest.stop() + } + + override suspend fun terminate(server: Server) { + val guest = guests.remove(server) ?: return + guest.terminate() + } + + override fun addListener(listener: HostListener) { + listeners.add(listener) + } + + override fun removeListener(listener: HostListener) { + listeners.remove(listener) + } + + /** + * Convert flavor to machine model. + */ + private fun Flavor.toMachineModel(): SimMachineModel { + val originalCpu = ctx.machine.cpus[0] + val processingNode = originalCpu.node.copy(coreCount = cpuCount) + val processingUnits = (0 until cpuCount).map { originalCpu.copy(id = it, node = processingNode) } + val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize)) + + return SimMachineModel(processingUnits, memoryUnits) + } + + private fun onGuestStart(vm: SimGuest) { + guests.forEach { _, guest -> + if (guest.state == ServerState.ACTIVE) { + vm.performanceInterferenceModel?.onStart(vm.server.image.name) + } + } + + listeners.forEach { it.onStateChange(this, vm.server, vm.state) } + } + + private fun onGuestStop(vm: SimGuest) { + guests.forEach { _, guest -> + if (guest.state == ServerState.ACTIVE) { + vm.performanceInterferenceModel?.onStop(vm.server.image.name) + } + } + + listeners.forEach { it.onStateChange(this, vm.server, vm.state) } + + _events.emit(HostEvent.VmsUpdated(this@SimHost, guests.count { it.value.state == ServerState.ACTIVE }, availableMemory)) + } + + /** + * A virtual machine instance that the driver manages. + */ + private inner class SimGuest(val server: Server, val machine: SimMachine) { + val performanceInterferenceModel: PerformanceInterferenceModel? = server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? + + var state: ServerState = ServerState.SHUTOFF + + suspend fun start() { + when (state) { + ServerState.SHUTOFF -> { + logger.info { "User requested to start server ${server.uid}" } + launch() + } + ServerState.ACTIVE -> return + else -> assert(false) { "Invalid state transition" } + } + } + + suspend fun stop() { + when (state) { + ServerState.ACTIVE, ServerState.ERROR -> { + val job = job ?: throw IllegalStateException("Server should be active") + job.cancel() + job.join() + } + ServerState.SHUTOFF -> return + else -> assert(false) { "Invalid state transition" } + } + } + + suspend fun terminate() { + stop() + } + + private var job: Job? = null + + private suspend fun launch() = suspendCancellableCoroutine<Unit> { cont -> + assert(job == null) { "Concurrent job running" } + val workload = server.image.tags["workload"] as SimWorkload + + val job = coroutineScope.launch { + delay(1) // TODO Introduce boot time + init() + cont.resume(Unit) + try { + machine.run(workload, mapOf("driver" to this@SimHost, "server" to server)) + exit(null) + } catch (cause: Throwable) { + exit(cause) + } finally { + machine.close() + } + } + this.job = job + job.invokeOnCompletion { + this.job = null + } + } + + private fun init() { + state = ServerState.ACTIVE + onGuestStart(this) + } + + private fun exit(cause: Throwable?) { + state = + if (cause == null) + ServerState.SHUTOFF + else + ServerState.ERROR + + availableMemory += server.flavor.memorySize + onGuestStop(this) + } + } + + override fun onStart(ctx: SimExecutionContext) { + this.ctx = ctx + this.availableMemory = ctx.machine.memory.map { it.size }.sum() + this.hypervisor.onStart(ctx) + } + + override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand { + return hypervisor.onStart(ctx, cpu) + } + + override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand { + return hypervisor.onNext(ctx, cpu, remainingWork) + } +} diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt deleted file mode 100644 index 35d82211..00000000 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt +++ /dev/null @@ -1,231 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.simulator - -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.launch -import org.opendc.compute.core.* -import org.opendc.compute.core.Flavor -import org.opendc.compute.core.image.Image -import org.opendc.compute.core.metal.Node -import org.opendc.compute.core.virt.HypervisorEvent -import org.opendc.compute.core.virt.driver.InsufficientMemoryOnServerException -import org.opendc.compute.core.virt.driver.VirtDriver -import org.opendc.simulator.compute.* -import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel -import org.opendc.simulator.compute.model.MemoryUnit -import org.opendc.simulator.compute.workload.SimResourceCommand -import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.utils.flow.EventFlow -import java.util.* - -/** - * A [VirtDriver] that is simulates virtual machines on a physical machine using [SimHypervisor]. - */ -public class SimVirtDriver( - private val coroutineScope: CoroutineScope, - hypervisor: SimHypervisorProvider -) : VirtDriver, SimWorkload { - /** - * The execution context in which the [VirtDriver] runs. - */ - private lateinit var ctx: SimExecutionContext - - /** - * The node on which the hypervisor runs. - */ - public val node: Node - get() = ctx.meta["node"] as Node - - /** - * The [EventFlow] to emit the events. - */ - internal val eventFlow = EventFlow<HypervisorEvent>() - - override val events: Flow<HypervisorEvent> = eventFlow - - /** - * Current total memory use of the images on this hypervisor. - */ - private var availableMemory: Long = 0 - - /** - * The hypervisor to run multiple workloads. - */ - private val hypervisor = hypervisor.create( - object : SimHypervisor.Listener { - override fun onSliceFinish( - hypervisor: SimHypervisor, - requestedWork: Long, - grantedWork: Long, - overcommittedWork: Long, - interferedWork: Long, - cpuUsage: Double, - cpuDemand: Double - ) { - eventFlow.emit( - HypervisorEvent.SliceFinished( - this@SimVirtDriver, - requestedWork, - grantedWork, - overcommittedWork, - interferedWork, - cpuUsage, - cpuDemand, - vms.size, - node - ) - ) - } - } - ) - - /** - * The virtual machines running on the hypervisor. - */ - private val vms = HashSet<VirtualMachine>() - - override fun canFit(flavor: Flavor): Boolean { - val sufficientMemory = availableMemory > flavor.memorySize - val enoughCpus = ctx.machine.cpus.size >= flavor.cpuCount - val canFit = hypervisor.canFit(flavor.toMachineModel()) - - return sufficientMemory && enoughCpus && canFit - } - - override suspend fun spawn(name: String, image: Image, flavor: Flavor): Server { - val requiredMemory = flavor.memorySize - if (availableMemory - requiredMemory < 0) { - throw InsufficientMemoryOnServerException() - } - require(flavor.cpuCount <= ctx.machine.cpus.size) { "Machine does not fit" } - - val events = EventFlow<ServerEvent>() - val server = Server( - UUID.randomUUID(), - name, - emptyMap(), - flavor, - image, - ServerState.BUILD, - events - ) - availableMemory -= requiredMemory - - val vm = VirtualMachine(server, events, hypervisor.createMachine(flavor.toMachineModel())) - vms.add(vm) - vmStarted(vm) - eventFlow.emit(HypervisorEvent.VmsUpdated(this, vms.size, availableMemory)) - return server - } - - /** - * Convert flavor to machine model. - */ - private fun Flavor.toMachineModel(): SimMachineModel { - val originalCpu = ctx.machine.cpus[0] - val processingNode = originalCpu.node.copy(coreCount = cpuCount) - val processingUnits = (0 until cpuCount).map { originalCpu.copy(id = it, node = processingNode) } - val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize)) - - return SimMachineModel(processingUnits, memoryUnits) - } - - private fun vmStarted(vm: VirtualMachine) { - vms.forEach { - vm.performanceInterferenceModel?.onStart(it.server.image.name) - } - } - - private fun vmStopped(vm: VirtualMachine) { - vms.forEach { - vm.performanceInterferenceModel?.onStop(it.server.image.name) - } - } - - /** - * A virtual machine instance that the driver manages. - */ - private inner class VirtualMachine(server: Server, val events: EventFlow<ServerEvent>, val machine: SimMachine) { - val performanceInterferenceModel: PerformanceInterferenceModel? = server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? - - val job = coroutineScope.launch { - val workload = server.image.tags["workload"] as SimWorkload - - delay(1) // TODO Introduce boot time - init() - try { - machine.run(workload, mapOf("driver" to this@SimVirtDriver, "server" to server)) - exit(null) - } catch (cause: Throwable) { - exit(cause) - } finally { - machine.close() - } - } - - var server: Server = server - set(value) { - if (field.state != value.state) { - events.emit(ServerEvent.StateChanged(value, field.state)) - } - - field = value - } - - private fun init() { - server = server.copy(state = ServerState.ACTIVE) - } - - private fun exit(cause: Throwable?) { - val serverState = - if (cause == null) - ServerState.SHUTOFF - else - ServerState.ERROR - server = server.copy(state = serverState) - availableMemory += server.flavor.memorySize - vms.remove(this) - vmStopped(this) - eventFlow.emit(HypervisorEvent.VmsUpdated(this@SimVirtDriver, vms.size, availableMemory)) - events.close() - } - } - - override fun onStart(ctx: SimExecutionContext) { - this.ctx = ctx - this.availableMemory = ctx.machine.memory.map { it.size }.sum() - this.hypervisor.onStart(ctx) - } - - override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand { - return hypervisor.onStart(ctx, cpu) - } - - override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand { - return hypervisor.onNext(ctx, cpu, remainingWork) - } -} diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt index 18afd0c2..ee747a9a 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt @@ -36,9 +36,7 @@ import org.opendc.compute.core.metal.Node import org.opendc.compute.core.metal.NodeEvent import org.opendc.compute.core.metal.NodeState import org.opendc.compute.core.metal.service.ProvisioningService -import org.opendc.compute.core.virt.HypervisorEvent -import org.opendc.compute.core.virt.driver.InsufficientMemoryOnServerException -import org.opendc.compute.core.virt.driver.VirtDriver +import org.opendc.compute.core.virt.* import org.opendc.compute.core.virt.service.VirtProvisioningEvent import org.opendc.compute.core.virt.service.VirtProvisioningService import org.opendc.compute.core.virt.service.events.* @@ -62,13 +60,18 @@ public class SimVirtProvisioningService( private val tracer: EventTracer, private val hypervisor: SimHypervisorProvider, private val schedulingQuantum: Long = 300000, // 5 minutes in milliseconds -) : VirtProvisioningService { +) : VirtProvisioningService, HostListener { /** * The logger instance to use. */ private val logger = KotlinLogging.logger {} /** + * A mapping from host to hypervisor view. + */ + private val hostToHv = mutableMapOf<Host, HypervisorView>() + + /** * The hypervisors that have been launched by the service. */ private val hypervisors: MutableMap<Node, HypervisorView> = mutableMapOf() @@ -79,14 +82,19 @@ public class SimVirtProvisioningService( private val availableHypervisors: MutableSet<HypervisorView> = mutableSetOf() /** - * The incoming images to be processed by the provisioner. + * The servers that should be launched by the service. + */ + private val queue: Deque<LaunchRequest> = ArrayDeque() + + /** + * The active servers in the system. */ - private val incomingImages: Deque<ImageView> = ArrayDeque() + private val activeServers: MutableSet<Server> = mutableSetOf() /** - * The active images in the system. + * The [Random] instance used to generate unique identifiers for the objects. */ - private val activeImages: MutableSet<ImageView> = mutableSetOf() + private val random = Random(0) public var submittedVms: Int = 0 public var queuedVms: Int = 0 @@ -102,12 +110,9 @@ public class SimVirtProvisioningService( */ private val allocationLogic = allocationPolicy() - /** - * The [EventFlow] to emit the events. - */ - internal val eventFlow = EventFlow<VirtProvisioningEvent>() - - override val events: Flow<VirtProvisioningEvent> = eventFlow + override val events: Flow<VirtProvisioningEvent> + get() = _events + private val _events = EventFlow<VirtProvisioningEvent>() /** * The [TimerScheduler] to use for scheduling the scheduler cycles. @@ -118,7 +123,8 @@ public class SimVirtProvisioningService( coroutineScope.launch { val provisionedNodes = provisioningService.nodes() provisionedNodes.forEach { node -> - val workload = SimVirtDriver(coroutineScope, hypervisor) + val workload = SimHost(UUID(random.nextLong(), random.nextLong()), coroutineScope, hypervisor) + workload.addListener(this@SimVirtProvisioningService) val hypervisorImage = Image(UUID.randomUUID(), "vmm", mapOf("workload" to workload)) launch { val deployedNode = provisioningService.deploy(node, hypervisorImage) @@ -132,7 +138,7 @@ public class SimVirtProvisioningService( } } - override suspend fun drivers(): Set<VirtDriver> { + override suspend fun drivers(): Set<Host> { return availableHypervisors.map { it.driver }.toSet() } @@ -145,7 +151,7 @@ public class SimVirtProvisioningService( ): Server { tracer.commit(VmSubmissionEvent(name, image, flavor)) - eventFlow.emit( + _events.emit( VirtProvisioningEvent.MetricsAvailable( this@SimVirtProvisioningService, hypervisors.size, @@ -158,9 +164,9 @@ public class SimVirtProvisioningService( ) ) - return suspendCancellableCoroutine<Server> { cont -> - val vmInstance = ImageView(name, image, flavor, cont) - incomingImages += vmInstance + return suspendCancellableCoroutine { cont -> + val request = LaunchRequest(createServer(name, image, flavor), cont) + queue += request requestCycle() } } @@ -170,6 +176,22 @@ public class SimVirtProvisioningService( provisionedNodes.forEach { node -> provisioningService.stop(node) } } + private fun createServer( + name: String, + image: Image, + flavor: Flavor + ): Server { + return Server( + uid = UUID(random.nextLong(), random.nextLong()), + name = name, + tags = emptyMap(), + flavor = flavor, + image = image, + state = ServerState.BUILD, + events = EventFlow() + ) + } + private fun requestCycle() { // Bail out in case we have already requested a new cycle. if (scheduler.isTimerActive(Unit)) { @@ -182,23 +204,23 @@ public class SimVirtProvisioningService( val delay = schedulingQuantum - (clock.millis() % schedulingQuantum) scheduler.startSingleTimer(Unit, delay) { - coroutineScope.launch { schedule() } + schedule() } } - private suspend fun schedule() { - while (incomingImages.isNotEmpty()) { - val imageInstance = incomingImages.peekFirst() - val requiredMemory = imageInstance.flavor.memorySize - val selectedHv = allocationLogic.select(availableHypervisors, imageInstance) + private fun schedule() { + while (queue.isNotEmpty()) { + val (server, cont) = queue.peekFirst() + val requiredMemory = server.flavor.memorySize + val selectedHv = allocationLogic.select(availableHypervisors, server) - if (selectedHv == null || !selectedHv.driver.canFit(imageInstance.flavor)) { - logger.trace { "Image ${imageInstance.image} selected for scheduling but no capacity available for it." } + if (selectedHv == null || !selectedHv.driver.canFit(server)) { + logger.trace { "Server $server selected for scheduling but no capacity available for it." } - if (requiredMemory > maxMemory || imageInstance.flavor.cpuCount > maxCores) { - tracer.commit(VmSubmissionInvalidEvent(imageInstance.name)) + if (requiredMemory > maxMemory || server.flavor.cpuCount > maxCores) { + tracer.commit(VmSubmissionInvalidEvent(server.name)) - eventFlow.emit( + _events.emit( VirtProvisioningEvent.MetricsAvailable( this@SimVirtProvisioningService, hypervisors.size, @@ -212,9 +234,9 @@ public class SimVirtProvisioningService( ) // Remove the incoming image - incomingImages.poll() + queue.poll() - logger.warn("Failed to spawn ${imageInstance.image}: does not fit [${clock.millis()}]") + logger.warn("Failed to spawn $server: does not fit [${clock.millis()}]") continue } else { break @@ -222,86 +244,49 @@ public class SimVirtProvisioningService( } try { - logger.info { "[${clock.millis()}] Spawning ${imageInstance.image} on ${selectedHv.node.uid} ${selectedHv.node.name} ${selectedHv.node.flavor}" } - incomingImages.poll() + logger.info { "[${clock.millis()}] Spawning $server on ${selectedHv.node.uid} ${selectedHv.node.name} ${selectedHv.node.flavor}" } + queue.poll() // Speculatively update the hypervisor view information to prevent other images in the queue from // deciding on stale values. selectedHv.numberOfActiveServers++ - selectedHv.provisionedCores += imageInstance.flavor.cpuCount + selectedHv.provisionedCores += server.flavor.cpuCount selectedHv.availableMemory -= requiredMemory // XXX Temporary hack - val server = selectedHv.driver.spawn( - imageInstance.name, - imageInstance.image, - imageInstance.flavor - ) - imageInstance.server = server - imageInstance.continuation.resume(server) - - tracer.commit(VmScheduledEvent(imageInstance.name)) - - eventFlow.emit( - VirtProvisioningEvent.MetricsAvailable( - this@SimVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, - submittedVms, - ++runningVms, - finishedVms, - --queuedVms, - unscheduledVms - ) - ) - activeImages += imageInstance + coroutineScope.launch { + try { + cont.resume(server) + selectedHv.driver.spawn(server) + activeServers += server + + tracer.commit(VmScheduledEvent(server.name)) + _events.emit( + VirtProvisioningEvent.MetricsAvailable( + this@SimVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + ++runningVms, + finishedVms, + --queuedVms, + unscheduledVms + ) + ) + } catch (e: InsufficientMemoryOnServerException) { + logger.error("Failed to deploy VM", e) - server.events - .onEach { event -> - when (event) { - is ServerEvent.StateChanged -> { - if (event.server.state == ServerState.SHUTOFF) { - logger.info { "[${clock.millis()}] Server ${event.server.uid} ${event.server.name} ${event.server.flavor} finished." } - - tracer.commit(VmStoppedEvent(event.server.name)) - - eventFlow.emit( - VirtProvisioningEvent.MetricsAvailable( - this@SimVirtProvisioningService, - hypervisors.size, - availableHypervisors.size, - submittedVms, - --runningVms, - ++finishedVms, - queuedVms, - unscheduledVms - ) - ) - - activeImages -= imageInstance - selectedHv.provisionedCores -= server.flavor.cpuCount - - // Try to reschedule if needed - if (incomingImages.isNotEmpty()) { - requestCycle() - } - } - } - } + selectedHv.numberOfActiveServers-- + selectedHv.provisionedCores -= server.flavor.cpuCount + selectedHv.availableMemory += requiredMemory } - .launchIn(coroutineScope) - } catch (e: InsufficientMemoryOnServerException) { - logger.error("Failed to deploy VM", e) - - selectedHv.numberOfActiveServers-- - selectedHv.provisionedCores -= imageInstance.flavor.cpuCount - selectedHv.availableMemory += requiredMemory + } } catch (e: Throwable) { logger.error("Failed to deploy VM", e) } } } - private fun stateChanged(node: Node, hypervisor: SimVirtDriver) { + private fun stateChanged(node: Node, hypervisor: SimHost) { when (node.state) { NodeState.ACTIVE -> { logger.debug { "[${clock.millis()}] Server ${node.uid} available: ${node.state}" } @@ -320,7 +305,7 @@ public class SimVirtProvisioningService( hv.driver = hypervisor hv.driver.events .onEach { event -> - if (event is HypervisorEvent.VmsUpdated) { + if (event is HostEvent.VmsUpdated) { hv.numberOfActiveServers = event.numberOfActiveServers hv.availableMemory = event.availableMemory } @@ -329,12 +314,13 @@ public class SimVirtProvisioningService( maxCores = max(maxCores, node.flavor.cpuCount) maxMemory = max(maxMemory, node.flavor.memorySize) hypervisors[node] = hv + hostToHv[hypervisor] = hv availableHypervisors += hv } tracer.commit(HypervisorAvailableEvent(node.uid)) - eventFlow.emit( + _events.emit( VirtProvisioningEvent.MetricsAvailable( this@SimVirtProvisioningService, hypervisors.size, @@ -348,7 +334,7 @@ public class SimVirtProvisioningService( ) // Re-schedule on the new machine - if (incomingImages.isNotEmpty()) { + if (queue.isNotEmpty()) { requestCycle() } } @@ -359,7 +345,7 @@ public class SimVirtProvisioningService( tracer.commit(HypervisorUnavailableEvent(hv.uid)) - eventFlow.emit( + _events.emit( VirtProvisioningEvent.MetricsAvailable( this@SimVirtProvisioningService, hypervisors.size, @@ -372,7 +358,7 @@ public class SimVirtProvisioningService( ) ) - if (incomingImages.isNotEmpty()) { + if (queue.isNotEmpty()) { requestCycle() } } @@ -380,11 +366,43 @@ public class SimVirtProvisioningService( } } - public data class ImageView( - public val name: String, - public val image: Image, - public val flavor: Flavor, - public val continuation: Continuation<Server>, - public var server: Server? = null - ) + override fun onStateChange(host: Host, server: Server, newState: ServerState) { + val eventFlow = server.events as EventFlow<ServerEvent> + val newServer = server.copy(state = newState) + eventFlow.emit(ServerEvent.StateChanged(newServer, server.state)) + + if (newState == ServerState.SHUTOFF) { + logger.info { "[${clock.millis()}] Server ${server.uid} ${server.name} ${server.flavor} finished." } + + tracer.commit(VmStoppedEvent(server.name)) + + _events.emit( + VirtProvisioningEvent.MetricsAvailable( + this@SimVirtProvisioningService, + hypervisors.size, + availableHypervisors.size, + submittedVms, + --runningVms, + ++finishedVms, + queuedVms, + unscheduledVms + ) + ) + + activeServers -= server + val hv = hostToHv[host] + if (hv != null) { + hv.provisionedCores -= server.flavor.cpuCount + } else { + logger.error { "Unknown host $host" } + } + + // Try to reschedule if needed + if (queue.isNotEmpty()) { + requestCycle() + } + } + } + + public data class LaunchRequest(val server: Server, val cont: Continuation<Server>) } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AllocationPolicy.kt index 2018b9f2..3099f1ad 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AllocationPolicy.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/AllocationPolicy.kt @@ -22,9 +22,9 @@ package org.opendc.compute.simulator.allocation +import org.opendc.compute.core.Server import org.opendc.compute.core.metal.Node import org.opendc.compute.simulator.HypervisorView -import org.opendc.compute.simulator.SimVirtProvisioningService /** * A policy for selecting the [Node] an image should be deployed to, @@ -39,7 +39,7 @@ public interface AllocationPolicy { */ public fun select( hypervisors: Set<HypervisorView>, - image: SimVirtProvisioningService.ImageView + server: Server ): HypervisorView? } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ComparableAllocationPolicyLogic.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ComparableAllocationPolicyLogic.kt index 04a181a6..df48f405 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ComparableAllocationPolicyLogic.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ComparableAllocationPolicyLogic.kt @@ -22,8 +22,8 @@ package org.opendc.compute.simulator.allocation +import org.opendc.compute.core.Server import org.opendc.compute.simulator.HypervisorView -import org.opendc.compute.simulator.SimVirtProvisioningService /** * The logic for an [AllocationPolicy] that uses a [Comparator] to select the appropriate node. @@ -36,12 +36,12 @@ public interface ComparableAllocationPolicyLogic : AllocationPolicy.Logic { override fun select( hypervisors: Set<HypervisorView>, - image: SimVirtProvisioningService.ImageView + server: Server ): HypervisorView? { return hypervisors.asSequence() .filter { hv -> - val fitsMemory = hv.availableMemory >= (image.flavor.memorySize) - val fitsCpu = hv.node.flavor.cpuCount >= image.flavor.cpuCount + val fitsMemory = hv.availableMemory >= (server.flavor.memorySize) + val fitsCpu = hv.node.flavor.cpuCount >= server.flavor.cpuCount fitsMemory && fitsCpu } .minWithOrNull(comparator.thenBy { it.node.uid }) diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/RandomAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/RandomAllocationPolicy.kt index 9a89fccd..03706b42 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/RandomAllocationPolicy.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/RandomAllocationPolicy.kt @@ -22,8 +22,8 @@ package org.opendc.compute.simulator.allocation +import org.opendc.compute.core.Server import org.opendc.compute.simulator.HypervisorView -import org.opendc.compute.simulator.SimVirtProvisioningService import kotlin.random.Random /** @@ -34,12 +34,12 @@ public class RandomAllocationPolicy(private val random: Random = Random(0)) : Al override fun invoke(): AllocationPolicy.Logic = object : AllocationPolicy.Logic { override fun select( hypervisors: Set<HypervisorView>, - image: SimVirtProvisioningService.ImageView + server: Server ): HypervisorView? { return hypervisors.asIterable() .filter { hv -> - val fitsMemory = hv.availableMemory >= (image.image.tags["required-memory"] as Long) - val fitsCpu = hv.node.flavor.cpuCount >= image.flavor.cpuCount + val fitsMemory = hv.availableMemory >= (server.image.tags["required-memory"] as Long) + val fitsCpu = hv.node.flavor.cpuCount >= server.flavor.cpuCount fitsMemory && fitsCpu } .randomOrNull(random) diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ReplayAllocationPolicy.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ReplayAllocationPolicy.kt index 582fe817..30065621 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ReplayAllocationPolicy.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/allocation/ReplayAllocationPolicy.kt @@ -23,8 +23,8 @@ package org.opendc.compute.simulator.allocation import mu.KotlinLogging +import org.opendc.compute.core.Server import org.opendc.compute.simulator.HypervisorView -import org.opendc.compute.simulator.SimVirtProvisioningService private val logger = KotlinLogging.logger {} @@ -38,14 +38,14 @@ public class ReplayAllocationPolicy(private val vmPlacements: Map<String, String override fun invoke(): AllocationPolicy.Logic = object : AllocationPolicy.Logic { override fun select( hypervisors: Set<HypervisorView>, - image: SimVirtProvisioningService.ImageView + server: Server ): HypervisorView? { - val clusterName = vmPlacements[image.name] - ?: throw IllegalStateException("Could not find placement data in VM placement file for VM ${image.name}") + val clusterName = vmPlacements[server.name] + ?: throw IllegalStateException("Could not find placement data in VM placement file for VM ${server.name}") val machinesInCluster = hypervisors.filter { it.node.name.contains(clusterName) } if (machinesInCluster.isEmpty()) { - logger.info { "Could not find any machines belonging to cluster $clusterName for image ${image.name}, assigning randomly." } + logger.info { "Could not find any machines belonging to cluster $clusterName for image ${server.name}, assigning randomly." } return hypervisors.maxByOrNull { it.availableMemory } } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimVirtDriverTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index 6b754572..83e891cb 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimVirtDriverTest.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -24,6 +24,7 @@ package org.opendc.compute.simulator import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.emptyFlow import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch @@ -33,8 +34,10 @@ import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll import org.opendc.compute.core.Flavor +import org.opendc.compute.core.Server +import org.opendc.compute.core.ServerState import org.opendc.compute.core.image.Image -import org.opendc.compute.core.virt.HypervisorEvent +import org.opendc.compute.core.virt.HostEvent import org.opendc.simulator.compute.SimFairShareHypervisorProvider import org.opendc.simulator.compute.SimMachineModel import org.opendc.simulator.compute.model.MemoryUnit @@ -49,7 +52,7 @@ import java.util.UUID * Basic test-suite for the hypervisor. */ @OptIn(ExperimentalCoroutinesApi::class) -internal class SimVirtDriverTest { +internal class SimHostTest { private lateinit var scope: TestCoroutineScope private lateinit var clock: Clock private lateinit var machineModel: SimMachineModel @@ -77,7 +80,7 @@ internal class SimVirtDriverTest { var overcommittedWork = 0L scope.launch { - val virtDriver = SimVirtDriver(this, SimFairShareHypervisorProvider()) + val virtDriver = SimHost(UUID.randomUUID(), this, SimFairShareHypervisorProvider()) val vmm = Image(UUID.randomUUID(), "vmm", mapOf("workload" to virtDriver)) val duration = 5 * 60L val vmImageA = Image( @@ -122,7 +125,7 @@ internal class SimVirtDriverTest { virtDriver.events .onEach { event -> when (event) { - is HypervisorEvent.SliceFinished -> { + is HostEvent.SliceFinished -> { requestedWork += event.requestedBurst grantedWork += event.grantedBurst overcommittedWork += event.overcommissionedBurst @@ -131,8 +134,8 @@ internal class SimVirtDriverTest { } .launchIn(this) - virtDriver.spawn("a", vmImageA, flavor) - virtDriver.spawn("b", vmImageB, flavor) + launch { virtDriver.spawn(Server(UUID.randomUUID(), "a", emptyMap(), flavor, vmImageA, ServerState.BUILD, emptyFlow())) } + launch { virtDriver.spawn(Server(UUID.randomUUID(), "b", emptyMap(), flavor, vmImageB, ServerState.BUILD, emptyFlow())) } } scope.advanceUntilIdle() diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index b941d135..728d6c11 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -37,11 +37,11 @@ import org.opendc.compute.core.ServerEvent import org.opendc.compute.core.metal.NODE_CLUSTER import org.opendc.compute.core.metal.NodeEvent import org.opendc.compute.core.metal.service.ProvisioningService -import org.opendc.compute.core.virt.HypervisorEvent +import org.opendc.compute.core.virt.HostEvent import org.opendc.compute.core.virt.service.VirtProvisioningEvent import org.opendc.compute.core.workload.VmWorkload import org.opendc.compute.simulator.SimBareMetalDriver -import org.opendc.compute.simulator.SimVirtDriver +import org.opendc.compute.simulator.SimHost import org.opendc.compute.simulator.SimVirtProvisioningService import org.opendc.compute.simulator.allocation.AllocationPolicy import org.opendc.experiments.capelin.monitor.ExperimentMonitor @@ -174,8 +174,8 @@ public suspend fun attachMonitor( // Monitor hypervisor events for (hypervisor in hypervisors) { - // TODO Do not expose VirtDriver directly but use Hypervisor class. - val server = (hypervisor as SimVirtDriver).node + // TODO Do not expose Host directly but use Hypervisor class. + val server = (hypervisor as SimHost).node monitor.reportHostStateChange(clock.millis(), hypervisor, server) server.events .onEach { event -> @@ -190,7 +190,7 @@ public suspend fun attachMonitor( hypervisor.events .onEach { event -> when (event) { - is HypervisorEvent.SliceFinished -> monitor.reportHostSlice( + is HostEvent.SliceFinished -> monitor.reportHostSlice( clock.millis(), event.requestedBurst, event.grantedBurst, diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt index 04ffd148..8432025b 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt @@ -24,7 +24,7 @@ package org.opendc.experiments.capelin.monitor import org.opendc.compute.core.Server import org.opendc.compute.core.metal.Node -import org.opendc.compute.core.virt.driver.VirtDriver +import org.opendc.compute.core.virt.Host import org.opendc.compute.core.virt.service.VirtProvisioningEvent import java.io.Closeable @@ -42,7 +42,7 @@ public interface ExperimentMonitor : Closeable { */ public fun reportHostStateChange( time: Long, - driver: VirtDriver, + driver: Host, host: Node ) { } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt index e8aa5915..2af43701 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt @@ -25,7 +25,7 @@ package org.opendc.experiments.capelin.monitor import mu.KotlinLogging import org.opendc.compute.core.Server import org.opendc.compute.core.metal.Node -import org.opendc.compute.core.virt.driver.VirtDriver +import org.opendc.compute.core.virt.Host import org.opendc.compute.core.virt.service.VirtProvisioningEvent import org.opendc.experiments.capelin.telemetry.HostEvent import org.opendc.experiments.capelin.telemetry.ProvisionerEvent @@ -64,7 +64,7 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: override fun reportHostStateChange( time: Long, - driver: VirtDriver, + driver: Host, host: Node ) { logger.debug { "Host ${host.uid} changed state ${host.state} [$time]" } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 0d6c057f..fca523cd 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -148,9 +148,9 @@ class CapelinIntegrationTest { assertAll( { assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") }, { assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") }, - { assertEquals(1678587333640, monitor.totalRequestedBurst) }, - { assertEquals(438118200924, monitor.totalGrantedBurst) }, - { assertEquals(1220323969993, monitor.totalOvercommissionedBurst) }, + { assertEquals(1679510908774, monitor.totalRequestedBurst) }, + { assertEquals(384100282091, monitor.totalGrantedBurst) }, + { assertEquals(1282152242721, monitor.totalOvercommissionedBurst) }, { assertEquals(0, monitor.totalInterferedBurst) } ) } @@ -195,9 +195,9 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(705128393966, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, - { assertEquals(173489747029, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, - { assertEquals(526858997740, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, + { assertEquals(710487768664, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, + { assertEquals(118846235815, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, + { assertEquals(584211294239, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, { assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } } ) } diff --git a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt index b7a26d34..2a41be65 100644 --- a/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt +++ b/simulator/opendc-runner-web/src/main/kotlin/org/opendc/runner/web/WebExperimentMonitor.kt @@ -26,7 +26,7 @@ import mu.KotlinLogging import org.opendc.compute.core.Server import org.opendc.compute.core.metal.Node import org.opendc.compute.core.metal.NodeState -import org.opendc.compute.core.virt.driver.VirtDriver +import org.opendc.compute.core.virt.Host import org.opendc.compute.core.virt.service.VirtProvisioningEvent import org.opendc.experiments.capelin.monitor.ExperimentMonitor import org.opendc.experiments.capelin.telemetry.HostEvent @@ -51,7 +51,7 @@ public class WebExperimentMonitor : ExperimentMonitor { override fun reportHostStateChange( time: Long, - driver: VirtDriver, + driver: Host, host: Node ) { logger.debug { "Host ${host.uid} changed state ${host.state} [$time]" } |
