diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-01-11 15:37:34 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-01-11 16:12:12 +0100 |
| commit | f2028b23e25c8520f25a53771a1b261c4e991bb8 (patch) | |
| tree | 1b03ee8a72e498f107c19788553fb24892ff24cd | |
| parent | a71d4885efcf01850bc236d3e9f77ab3f44b48aa (diff) | |
Add hypervisor supporting space-shared VMs
This change adds a new hypervisor implementation that supports virtual
machine that have exclusive access to resources (e.g., CPU).
5 files changed, 494 insertions, 9 deletions
diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/driver/VirtDriver.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/driver/VirtDriver.kt index 5ecfd357..68cc7b50 100644 --- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/driver/VirtDriver.kt +++ b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/driver/VirtDriver.kt @@ -23,6 +23,7 @@ package org.opendc.compute.core.virt.driver import kotlinx.coroutines.flow.Flow +import org.opendc.compute.core.Flavor import org.opendc.compute.core.Server import org.opendc.compute.core.image.Image import org.opendc.compute.core.virt.HypervisorEvent @@ -40,6 +41,11 @@ public interface VirtDriver { public val events: Flow<HypervisorEvent> /** + * Determine whether the specified [flavor] can still fit on this driver. + */ + public fun canFit(flavor: Flavor): Boolean + + /** * Spawn the given [Image] on the compute resource of this driver. * * @param name The name of the server to spawn. @@ -50,7 +56,7 @@ public interface VirtDriver { public suspend fun spawn( name: String, image: Image, - flavor: org.opendc.compute.core.Flavor + flavor: Flavor ): Server public companion object Key : AbstractServiceKey<VirtDriver>(UUID.randomUUID(), "virtual-driver") diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt index f7eb9248..508720e2 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt @@ -104,6 +104,14 @@ public class SimVirtDriver(private val coroutineScope: CoroutineScope) : VirtDri */ private val vms = HashSet<VirtualMachine>() + override fun canFit(flavor: Flavor): Boolean { + val sufficientMemory = availableMemory > flavor.memorySize + val enoughCpus = ctx.machine.cpus.size >= flavor.cpuCount + val canFit = hypervisor.canFit(flavor.toMachineModel()) + + return sufficientMemory && enoughCpus && canFit + } + override suspend fun spawn(name: String, image: Image, flavor: Flavor): Server { val requiredMemory = flavor.memorySize if (availableMemory - requiredMemory < 0) { @@ -124,19 +132,26 @@ public class SimVirtDriver(private val coroutineScope: CoroutineScope) : VirtDri ) availableMemory -= requiredMemory - val originalCpu = ctx.machine.cpus[0] - val processingNode = originalCpu.node.copy(coreCount = flavor.cpuCount) - val processingUnits = (0 until flavor.cpuCount).map { originalCpu.copy(id = it, node = processingNode) } - val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, flavor.memorySize)) - val machine = SimMachineModel(processingUnits, memoryUnits) - val vm = VirtualMachine(server, events, hypervisor.createMachine(machine)) + val vm = VirtualMachine(server, events, hypervisor.createMachine(flavor.toMachineModel())) vms.add(vm) vmStarted(vm) eventFlow.emit(HypervisorEvent.VmsUpdated(this, vms.size, availableMemory)) return server } + /** + * Convert flavor to machine model. + */ + private fun Flavor.toMachineModel(): SimMachineModel { + val originalCpu = ctx.machine.cpus[0] + val processingNode = originalCpu.node.copy(coreCount = cpuCount) + val processingUnits = (0 until cpuCount).map { originalCpu.copy(id = it, node = processingNode) } + val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize)) + + return SimMachineModel(processingUnits, memoryUnits) + } + private fun vmStarted(vm: VirtualMachine) { vms.forEach { it -> vm.performanceInterferenceModel?.onStart(it.server.image.name) @@ -152,7 +167,7 @@ public class SimVirtDriver(private val coroutineScope: CoroutineScope) : VirtDri /** * A virtual machine instance that the driver manages. */ - private inner class VirtualMachine(server: Server, val events: EventFlow<ServerEvent>, machine: SimMachine) { + private inner class VirtualMachine(server: Server, val events: EventFlow<ServerEvent>, val machine: SimMachine) { val performanceInterferenceModel: PerformanceInterferenceModel? = server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? val job = coroutineScope.launch { @@ -190,6 +205,8 @@ public class SimVirtDriver(private val coroutineScope: CoroutineScope) : VirtDri exit(null) } catch (cause: Throwable) { exit(cause) + } finally { + machine.close() } } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt index f3a69f48..6b0021e1 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt @@ -196,7 +196,9 @@ public class SimVirtProvisioningService( val requiredMemory = imageInstance.flavor.memorySize val selectedHv = allocationLogic.select(availableHypervisors, imageInstance) - if (selectedHv == null) { + if (selectedHv == null || !selectedHv.driver.canFit(imageInstance.flavor)) { + logger.debug { "Server ${imageInstance.server} selected for scheduling but no capacity available for it." } + if (requiredMemory > maxMemory || imageInstance.flavor.cpuCount > maxCores) { tracer.commit(VmSubmissionInvalidEvent(imageInstance.name)) diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt new file mode 100644 index 00000000..66d3eda7 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt @@ -0,0 +1,284 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute + +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.suspendCancellableCoroutine +import org.opendc.simulator.compute.interference.PerformanceInterferenceModel +import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.workload.SimResourceCommand +import org.opendc.simulator.compute.workload.SimWorkload +import java.time.Clock +import java.util.ArrayDeque +import kotlin.coroutines.Continuation +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException +import kotlin.math.min + +/** + * A [SimHypervisor] that allocates its sub-resources exclusively for the virtual machine that it hosts. + * + * @param listener The hypervisor listener to use. + */ +public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listener? = null) : SimHypervisor { + /** + * The execution context in which the hypervisor runs. + */ + private lateinit var ctx: SimExecutionContext + + /** + * The mapping from pCPU to vCPU. + */ + private lateinit var vcpus: Array<VCpu?> + + /** + * The available physical CPUs to schedule on. + */ + private val availableCpus = ArrayDeque<Int>() + + override fun canFit(model: SimMachineModel): Boolean = availableCpus.size >= model.cpus.size + + override fun createMachine( + model: SimMachineModel, + performanceInterferenceModel: PerformanceInterferenceModel? + ): SimMachine { + require(canFit(model)) { "Cannot fit machine" } + return SimVm(model, performanceInterferenceModel) + } + + override fun onStart(ctx: SimExecutionContext) { + this.ctx = ctx + this.vcpus = arrayOfNulls(ctx.machine.cpus.size) + this.availableCpus.addAll(ctx.machine.cpus.indices) + } + + override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand { + return onNext(ctx, cpu, 0.0) + } + + override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand { + return vcpus[cpu]?.next(0.0) ?: SimResourceCommand.Idle() + } + + /** + * A virtual machine running on the hypervisor. + * + * @property model The machine model of the virtual machine. + * @property performanceInterferenceModel The performance interference model to utilize. + */ + private inner class SimVm( + override val model: SimMachineModel, + val performanceInterferenceModel: PerformanceInterferenceModel? = null, + ) : SimMachine { + /** + * A flag to indicate that the machine is terminated. + */ + private var isTerminated = false + + /** + * A [StateFlow] representing the CPU usage of the simulated machine. + */ + override val usage: MutableStateFlow<Double> = MutableStateFlow(0.0) + + /** + * The current active workload. + */ + private var cont: Continuation<Unit>? = null + + /** + * The physical CPUs that have been allocated. + */ + private val pCPUs = model.cpus.map { availableCpus.poll() }.toIntArray() + + /** + * The active CPUs of this virtual machine. + */ + private var cpus: List<VCpu> = emptyList() + + /** + * The execution context in which the workload runs. + */ + val ctx = object : SimExecutionContext { + override val machine: SimMachineModel + get() = model + + override val clock: Clock + get() = this@SimSpaceSharedHypervisor.ctx.clock + + override fun interrupt(cpu: Int) { + require(cpu < cpus.size) { "Invalid CPU identifier" } + cpus[cpu].interrupt() + } + } + + /** + * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished. + */ + override suspend fun run(workload: SimWorkload) { + require(!isTerminated) { "Machine is terminated" } + require(cont == null) { "Run should not be called concurrently" } + + workload.onStart(ctx) + + return suspendCancellableCoroutine { cont -> + this.cont = cont + this.cpus = model.cpus.mapIndexed { index, model -> VCpu(this, model, workload, pCPUs[index]) } + + for (cpu in cpus) { + cpu.start() + } + } + } + + override fun close() { + isTerminated = true + for (pCPU in pCPUs) { + vcpus[pCPU] = null + availableCpus.add(pCPU) + } + } + + /** + * Update the usage of the VM. + */ + fun updateUsage() { + usage.value = cpus.sumByDouble { it.speed } / cpus.sumByDouble { it.model.frequency } + } + + /** + * This method is invoked when one of the CPUs has exited. + */ + fun onCpuExit(cpu: Int) { + // Check whether all other CPUs have finished + if (cpus.all { it.hasExited }) { + val cont = cont + this.cont = null + cont?.resume(Unit) + } + } + + /** + * This method is invoked when one of the CPUs failed. + */ + fun onCpuFailure(e: Throwable) { + // In case the flush fails with an exception, immediately propagate to caller, cancelling all other + // tasks. + val cont = cont + this.cont = null + cont?.resumeWithException(e) + } + } + + /** + * A CPU of the virtual machine. + */ + private inner class VCpu(val vm: SimVm, val model: ProcessingUnit, val workload: SimWorkload, val pCPU: Int) { + /** + * The processing speed of the vCPU. + */ + var speed: Double = 0.0 + set(value) { + field = value + vm.updateUsage() + } + + /** + * A flag to indicate that the CPU has exited. + */ + var hasExited: Boolean = false + + /** + * A flag to indicate that the CPU was started. + */ + var hasStarted: Boolean = false + + /** + * Process the specified [SimResourceCommand] for this CPU. + */ + fun process(command: SimResourceCommand): SimResourceCommand { + return when (command) { + is SimResourceCommand.Idle -> { + speed = 0.0 + command + } + is SimResourceCommand.Consume -> { + speed = min(model.frequency, command.limit) + command + } + is SimResourceCommand.Exit -> { + speed = 0.0 + hasExited = true + + vm.onCpuExit(model.id) + + SimResourceCommand.Idle() + } + } + } + + /** + * Start the CPU. + */ + fun start() { + vcpus[pCPU] = this + interrupt() + } + + /** + * Request the workload for more work. + */ + fun next(remainingWork: Double): SimResourceCommand { + return try { + val command = + if (hasStarted) { + workload.onNext(ctx, model.id, remainingWork) + } else { + hasStarted = true + workload.onStart(ctx, model.id) + } + process(command) + } catch (e: Throwable) { + fail(e) + } + } + + /** + * Interrupt the CPU. + */ + fun interrupt() { + ctx.interrupt(pCPU) + } + + /** + * Fail the CPU. + */ + fun fail(e: Throwable): SimResourceCommand { + hasExited = true + + vm.onCpuFailure(e) + + return SimResourceCommand.Idle() + } + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt new file mode 100644 index 00000000..7356a1b9 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt @@ -0,0 +1,176 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute + +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.launch +import kotlinx.coroutines.test.TestCoroutineScope +import kotlinx.coroutines.yield +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingNode +import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.workload.SimFlopsWorkload +import org.opendc.simulator.compute.workload.SimRuntimeWorkload +import org.opendc.simulator.compute.workload.SimTraceWorkload +import org.opendc.simulator.utils.DelayControllerClockAdapter +import java.time.Clock + +/** + * A test suite for the [SimSpaceSharedHypervisor]. + */ +@OptIn(ExperimentalCoroutinesApi::class) +internal class SimSpaceSharedHypervisorTest { + private lateinit var scope: TestCoroutineScope + private lateinit var clock: Clock + private lateinit var machineModel: SimMachineModel + + @BeforeEach + fun setUp() { + scope = TestCoroutineScope() + clock = DelayControllerClockAdapter(scope) + + val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 1) + machineModel = SimMachineModel( + cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 3200.0) }, + memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } + ) + } + + /** + * Test a trace workload. + */ + @Test + fun testTrace() { + val usagePm = mutableListOf<Double>() + val usageVm = mutableListOf<Double>() + + scope.launch { + val duration = 5 * 60L + val workloadA = + SimTraceWorkload( + sequenceOf( + SimTraceWorkload.Fragment(duration * 1000, 28.0, 1), + SimTraceWorkload.Fragment(duration * 1000, 3500.0, 1), + SimTraceWorkload.Fragment(duration * 1000, 0.0, 1), + SimTraceWorkload.Fragment(duration * 1000, 183.0, 1) + ), + ) + + val machine = SimBareMetalMachine(scope, clock, machineModel) + val hypervisor = SimSpaceSharedHypervisor() + + launch { machine.usage.toList(usagePm) } + launch { machine.run(hypervisor) } + + yield() + launch { + val vm = hypervisor.createMachine(machineModel) + launch { vm.usage.toList(usageVm) } + vm.run(workloadA) + } + } + + scope.advanceUntilIdle() + + assertAll( + { assertEquals(listOf(0.0, 0.00875, 1.0, 0.0, 0.0571875, 0.0), usagePm) { "Correct PM usage" } }, + { assertEquals(listOf(0.0, 0.00875, 1.0, 0.0, 0.0571875, 0.0), usageVm) { "Correct VM usage" } }, + { assertEquals(5 * 60L * 4000, scope.currentTime) { "Took enough time" } } + ) + } + + /** + * Test runtime workload on hypervisor. + */ + @Test + fun testRuntimeWorkload() { + val duration = 5 * 60L * 1000 + val workload = SimRuntimeWorkload(duration) + val machine = SimBareMetalMachine(scope, clock, machineModel) + val hypervisor = SimSpaceSharedHypervisor() + + scope.launch { + launch { machine.run(hypervisor) } + + yield() + launch { hypervisor.createMachine(machineModel).run(workload) } + } + + scope.advanceUntilIdle() + + assertEquals(duration, scope.currentTime) { "Took enough time" } + } + + /** + * Test concurrent workloads on the machine. + */ + @Test + fun testConcurrentWorkloadFails() { + val machine = SimBareMetalMachine(scope, clock, machineModel) + val hypervisor = SimSpaceSharedHypervisor() + + scope.launch { + launch { machine.run(hypervisor) } + + yield() + + hypervisor.createMachine(machineModel) + + assertAll( + { assertFalse(hypervisor.canFit(machineModel)) }, + { assertThrows<IllegalStateException> { hypervisor.createMachine(machineModel) } } + ) + } + + scope.advanceUntilIdle() + } + + /** + * Test concurrent workloads on the machine. + */ + @Test + fun testConcurrentWorkloadSucceeds() { + val machine = SimBareMetalMachine(scope, clock, machineModel) + val hypervisor = SimSpaceSharedHypervisor() + + scope.launch { + launch { machine.run(hypervisor) } + + yield() + + hypervisor.createMachine(machineModel).close() + + assertAll( + { assertTrue(hypervisor.canFit(machineModel)) }, + { assertDoesNotThrow { hypervisor.createMachine(machineModel) } } + ) + } + + scope.advanceUntilIdle() + } +} |
