summaryrefslogtreecommitdiff
path: root/simulator
diff options
context:
space:
mode:
Diffstat (limited to 'simulator')
-rw-r--r--simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/driver/VirtDriver.kt8
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt31
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt4
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt284
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt176
5 files changed, 494 insertions, 9 deletions
diff --git a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/driver/VirtDriver.kt b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/driver/VirtDriver.kt
index 5ecfd357..68cc7b50 100644
--- a/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/driver/VirtDriver.kt
+++ b/simulator/opendc-compute/opendc-compute-core/src/main/kotlin/org/opendc/compute/core/virt/driver/VirtDriver.kt
@@ -23,6 +23,7 @@
package org.opendc.compute.core.virt.driver
import kotlinx.coroutines.flow.Flow
+import org.opendc.compute.core.Flavor
import org.opendc.compute.core.Server
import org.opendc.compute.core.image.Image
import org.opendc.compute.core.virt.HypervisorEvent
@@ -40,6 +41,11 @@ public interface VirtDriver {
public val events: Flow<HypervisorEvent>
/**
+ * Determine whether the specified [flavor] can still fit on this driver.
+ */
+ public fun canFit(flavor: Flavor): Boolean
+
+ /**
* Spawn the given [Image] on the compute resource of this driver.
*
* @param name The name of the server to spawn.
@@ -50,7 +56,7 @@ public interface VirtDriver {
public suspend fun spawn(
name: String,
image: Image,
- flavor: org.opendc.compute.core.Flavor
+ flavor: Flavor
): Server
public companion object Key : AbstractServiceKey<VirtDriver>(UUID.randomUUID(), "virtual-driver")
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt
index f7eb9248..508720e2 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtDriver.kt
@@ -104,6 +104,14 @@ public class SimVirtDriver(private val coroutineScope: CoroutineScope) : VirtDri
*/
private val vms = HashSet<VirtualMachine>()
+ override fun canFit(flavor: Flavor): Boolean {
+ val sufficientMemory = availableMemory > flavor.memorySize
+ val enoughCpus = ctx.machine.cpus.size >= flavor.cpuCount
+ val canFit = hypervisor.canFit(flavor.toMachineModel())
+
+ return sufficientMemory && enoughCpus && canFit
+ }
+
override suspend fun spawn(name: String, image: Image, flavor: Flavor): Server {
val requiredMemory = flavor.memorySize
if (availableMemory - requiredMemory < 0) {
@@ -124,19 +132,26 @@ public class SimVirtDriver(private val coroutineScope: CoroutineScope) : VirtDri
)
availableMemory -= requiredMemory
- val originalCpu = ctx.machine.cpus[0]
- val processingNode = originalCpu.node.copy(coreCount = flavor.cpuCount)
- val processingUnits = (0 until flavor.cpuCount).map { originalCpu.copy(id = it, node = processingNode) }
- val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, flavor.memorySize))
- val machine = SimMachineModel(processingUnits, memoryUnits)
- val vm = VirtualMachine(server, events, hypervisor.createMachine(machine))
+ val vm = VirtualMachine(server, events, hypervisor.createMachine(flavor.toMachineModel()))
vms.add(vm)
vmStarted(vm)
eventFlow.emit(HypervisorEvent.VmsUpdated(this, vms.size, availableMemory))
return server
}
+ /**
+ * Convert flavor to machine model.
+ */
+ private fun Flavor.toMachineModel(): SimMachineModel {
+ val originalCpu = ctx.machine.cpus[0]
+ val processingNode = originalCpu.node.copy(coreCount = cpuCount)
+ val processingUnits = (0 until cpuCount).map { originalCpu.copy(id = it, node = processingNode) }
+ val memoryUnits = listOf(MemoryUnit("Generic", "Generic", 3200.0, memorySize))
+
+ return SimMachineModel(processingUnits, memoryUnits)
+ }
+
private fun vmStarted(vm: VirtualMachine) {
vms.forEach { it ->
vm.performanceInterferenceModel?.onStart(it.server.image.name)
@@ -152,7 +167,7 @@ public class SimVirtDriver(private val coroutineScope: CoroutineScope) : VirtDri
/**
* A virtual machine instance that the driver manages.
*/
- private inner class VirtualMachine(server: Server, val events: EventFlow<ServerEvent>, machine: SimMachine) {
+ private inner class VirtualMachine(server: Server, val events: EventFlow<ServerEvent>, val machine: SimMachine) {
val performanceInterferenceModel: PerformanceInterferenceModel? = server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel?
val job = coroutineScope.launch {
@@ -190,6 +205,8 @@ public class SimVirtDriver(private val coroutineScope: CoroutineScope) : VirtDri
exit(null)
} catch (cause: Throwable) {
exit(cause)
+ } finally {
+ machine.close()
}
}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt
index f3a69f48..6b0021e1 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimVirtProvisioningService.kt
@@ -196,7 +196,9 @@ public class SimVirtProvisioningService(
val requiredMemory = imageInstance.flavor.memorySize
val selectedHv = allocationLogic.select(availableHypervisors, imageInstance)
- if (selectedHv == null) {
+ if (selectedHv == null || !selectedHv.driver.canFit(imageInstance.flavor)) {
+ logger.debug { "Server ${imageInstance.server} selected for scheduling but no capacity available for it." }
+
if (requiredMemory > maxMemory || imageInstance.flavor.cpuCount > maxCores) {
tracer.commit(VmSubmissionInvalidEvent(imageInstance.name))
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt
new file mode 100644
index 00000000..66d3eda7
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisor.kt
@@ -0,0 +1,284 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.compute
+
+import kotlinx.coroutines.flow.MutableStateFlow
+import kotlinx.coroutines.flow.StateFlow
+import kotlinx.coroutines.suspendCancellableCoroutine
+import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
+import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.workload.SimResourceCommand
+import org.opendc.simulator.compute.workload.SimWorkload
+import java.time.Clock
+import java.util.ArrayDeque
+import kotlin.coroutines.Continuation
+import kotlin.coroutines.resume
+import kotlin.coroutines.resumeWithException
+import kotlin.math.min
+
+/**
+ * A [SimHypervisor] that allocates its sub-resources exclusively for the virtual machine that it hosts.
+ *
+ * @param listener The hypervisor listener to use.
+ */
+public class SimSpaceSharedHypervisor(private val listener: SimHypervisor.Listener? = null) : SimHypervisor {
+ /**
+ * The execution context in which the hypervisor runs.
+ */
+ private lateinit var ctx: SimExecutionContext
+
+ /**
+ * The mapping from pCPU to vCPU.
+ */
+ private lateinit var vcpus: Array<VCpu?>
+
+ /**
+ * The available physical CPUs to schedule on.
+ */
+ private val availableCpus = ArrayDeque<Int>()
+
+ override fun canFit(model: SimMachineModel): Boolean = availableCpus.size >= model.cpus.size
+
+ override fun createMachine(
+ model: SimMachineModel,
+ performanceInterferenceModel: PerformanceInterferenceModel?
+ ): SimMachine {
+ require(canFit(model)) { "Cannot fit machine" }
+ return SimVm(model, performanceInterferenceModel)
+ }
+
+ override fun onStart(ctx: SimExecutionContext) {
+ this.ctx = ctx
+ this.vcpus = arrayOfNulls(ctx.machine.cpus.size)
+ this.availableCpus.addAll(ctx.machine.cpus.indices)
+ }
+
+ override fun onStart(ctx: SimExecutionContext, cpu: Int): SimResourceCommand {
+ return onNext(ctx, cpu, 0.0)
+ }
+
+ override fun onNext(ctx: SimExecutionContext, cpu: Int, remainingWork: Double): SimResourceCommand {
+ return vcpus[cpu]?.next(0.0) ?: SimResourceCommand.Idle()
+ }
+
+ /**
+ * A virtual machine running on the hypervisor.
+ *
+ * @property model The machine model of the virtual machine.
+ * @property performanceInterferenceModel The performance interference model to utilize.
+ */
+ private inner class SimVm(
+ override val model: SimMachineModel,
+ val performanceInterferenceModel: PerformanceInterferenceModel? = null,
+ ) : SimMachine {
+ /**
+ * A flag to indicate that the machine is terminated.
+ */
+ private var isTerminated = false
+
+ /**
+ * A [StateFlow] representing the CPU usage of the simulated machine.
+ */
+ override val usage: MutableStateFlow<Double> = MutableStateFlow(0.0)
+
+ /**
+ * The current active workload.
+ */
+ private var cont: Continuation<Unit>? = null
+
+ /**
+ * The physical CPUs that have been allocated.
+ */
+ private val pCPUs = model.cpus.map { availableCpus.poll() }.toIntArray()
+
+ /**
+ * The active CPUs of this virtual machine.
+ */
+ private var cpus: List<VCpu> = emptyList()
+
+ /**
+ * The execution context in which the workload runs.
+ */
+ val ctx = object : SimExecutionContext {
+ override val machine: SimMachineModel
+ get() = model
+
+ override val clock: Clock
+ get() = this@SimSpaceSharedHypervisor.ctx.clock
+
+ override fun interrupt(cpu: Int) {
+ require(cpu < cpus.size) { "Invalid CPU identifier" }
+ cpus[cpu].interrupt()
+ }
+ }
+
+ /**
+ * Run the specified [SimWorkload] on this machine and suspend execution util the workload has finished.
+ */
+ override suspend fun run(workload: SimWorkload) {
+ require(!isTerminated) { "Machine is terminated" }
+ require(cont == null) { "Run should not be called concurrently" }
+
+ workload.onStart(ctx)
+
+ return suspendCancellableCoroutine { cont ->
+ this.cont = cont
+ this.cpus = model.cpus.mapIndexed { index, model -> VCpu(this, model, workload, pCPUs[index]) }
+
+ for (cpu in cpus) {
+ cpu.start()
+ }
+ }
+ }
+
+ override fun close() {
+ isTerminated = true
+ for (pCPU in pCPUs) {
+ vcpus[pCPU] = null
+ availableCpus.add(pCPU)
+ }
+ }
+
+ /**
+ * Update the usage of the VM.
+ */
+ fun updateUsage() {
+ usage.value = cpus.sumByDouble { it.speed } / cpus.sumByDouble { it.model.frequency }
+ }
+
+ /**
+ * This method is invoked when one of the CPUs has exited.
+ */
+ fun onCpuExit(cpu: Int) {
+ // Check whether all other CPUs have finished
+ if (cpus.all { it.hasExited }) {
+ val cont = cont
+ this.cont = null
+ cont?.resume(Unit)
+ }
+ }
+
+ /**
+ * This method is invoked when one of the CPUs failed.
+ */
+ fun onCpuFailure(e: Throwable) {
+ // In case the flush fails with an exception, immediately propagate to caller, cancelling all other
+ // tasks.
+ val cont = cont
+ this.cont = null
+ cont?.resumeWithException(e)
+ }
+ }
+
+ /**
+ * A CPU of the virtual machine.
+ */
+ private inner class VCpu(val vm: SimVm, val model: ProcessingUnit, val workload: SimWorkload, val pCPU: Int) {
+ /**
+ * The processing speed of the vCPU.
+ */
+ var speed: Double = 0.0
+ set(value) {
+ field = value
+ vm.updateUsage()
+ }
+
+ /**
+ * A flag to indicate that the CPU has exited.
+ */
+ var hasExited: Boolean = false
+
+ /**
+ * A flag to indicate that the CPU was started.
+ */
+ var hasStarted: Boolean = false
+
+ /**
+ * Process the specified [SimResourceCommand] for this CPU.
+ */
+ fun process(command: SimResourceCommand): SimResourceCommand {
+ return when (command) {
+ is SimResourceCommand.Idle -> {
+ speed = 0.0
+ command
+ }
+ is SimResourceCommand.Consume -> {
+ speed = min(model.frequency, command.limit)
+ command
+ }
+ is SimResourceCommand.Exit -> {
+ speed = 0.0
+ hasExited = true
+
+ vm.onCpuExit(model.id)
+
+ SimResourceCommand.Idle()
+ }
+ }
+ }
+
+ /**
+ * Start the CPU.
+ */
+ fun start() {
+ vcpus[pCPU] = this
+ interrupt()
+ }
+
+ /**
+ * Request the workload for more work.
+ */
+ fun next(remainingWork: Double): SimResourceCommand {
+ return try {
+ val command =
+ if (hasStarted) {
+ workload.onNext(ctx, model.id, remainingWork)
+ } else {
+ hasStarted = true
+ workload.onStart(ctx, model.id)
+ }
+ process(command)
+ } catch (e: Throwable) {
+ fail(e)
+ }
+ }
+
+ /**
+ * Interrupt the CPU.
+ */
+ fun interrupt() {
+ ctx.interrupt(pCPU)
+ }
+
+ /**
+ * Fail the CPU.
+ */
+ fun fail(e: Throwable): SimResourceCommand {
+ hasExited = true
+
+ vm.onCpuFailure(e)
+
+ return SimResourceCommand.Idle()
+ }
+ }
+}
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt
new file mode 100644
index 00000000..7356a1b9
--- /dev/null
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimSpaceSharedHypervisorTest.kt
@@ -0,0 +1,176 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.compute
+
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.flow.toList
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.test.TestCoroutineScope
+import kotlinx.coroutines.yield
+import org.junit.jupiter.api.Assertions.*
+import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertThrows
+import org.opendc.simulator.compute.model.MemoryUnit
+import org.opendc.simulator.compute.model.ProcessingNode
+import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.compute.workload.SimFlopsWorkload
+import org.opendc.simulator.compute.workload.SimRuntimeWorkload
+import org.opendc.simulator.compute.workload.SimTraceWorkload
+import org.opendc.simulator.utils.DelayControllerClockAdapter
+import java.time.Clock
+
+/**
+ * A test suite for the [SimSpaceSharedHypervisor].
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+internal class SimSpaceSharedHypervisorTest {
+ private lateinit var scope: TestCoroutineScope
+ private lateinit var clock: Clock
+ private lateinit var machineModel: SimMachineModel
+
+ @BeforeEach
+ fun setUp() {
+ scope = TestCoroutineScope()
+ clock = DelayControllerClockAdapter(scope)
+
+ val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 1)
+ machineModel = SimMachineModel(
+ cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 3200.0) },
+ memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
+ )
+ }
+
+ /**
+ * Test a trace workload.
+ */
+ @Test
+ fun testTrace() {
+ val usagePm = mutableListOf<Double>()
+ val usageVm = mutableListOf<Double>()
+
+ scope.launch {
+ val duration = 5 * 60L
+ val workloadA =
+ SimTraceWorkload(
+ sequenceOf(
+ SimTraceWorkload.Fragment(duration * 1000, 28.0, 1),
+ SimTraceWorkload.Fragment(duration * 1000, 3500.0, 1),
+ SimTraceWorkload.Fragment(duration * 1000, 0.0, 1),
+ SimTraceWorkload.Fragment(duration * 1000, 183.0, 1)
+ ),
+ )
+
+ val machine = SimBareMetalMachine(scope, clock, machineModel)
+ val hypervisor = SimSpaceSharedHypervisor()
+
+ launch { machine.usage.toList(usagePm) }
+ launch { machine.run(hypervisor) }
+
+ yield()
+ launch {
+ val vm = hypervisor.createMachine(machineModel)
+ launch { vm.usage.toList(usageVm) }
+ vm.run(workloadA)
+ }
+ }
+
+ scope.advanceUntilIdle()
+
+ assertAll(
+ { assertEquals(listOf(0.0, 0.00875, 1.0, 0.0, 0.0571875, 0.0), usagePm) { "Correct PM usage" } },
+ { assertEquals(listOf(0.0, 0.00875, 1.0, 0.0, 0.0571875, 0.0), usageVm) { "Correct VM usage" } },
+ { assertEquals(5 * 60L * 4000, scope.currentTime) { "Took enough time" } }
+ )
+ }
+
+ /**
+ * Test runtime workload on hypervisor.
+ */
+ @Test
+ fun testRuntimeWorkload() {
+ val duration = 5 * 60L * 1000
+ val workload = SimRuntimeWorkload(duration)
+ val machine = SimBareMetalMachine(scope, clock, machineModel)
+ val hypervisor = SimSpaceSharedHypervisor()
+
+ scope.launch {
+ launch { machine.run(hypervisor) }
+
+ yield()
+ launch { hypervisor.createMachine(machineModel).run(workload) }
+ }
+
+ scope.advanceUntilIdle()
+
+ assertEquals(duration, scope.currentTime) { "Took enough time" }
+ }
+
+ /**
+ * Test concurrent workloads on the machine.
+ */
+ @Test
+ fun testConcurrentWorkloadFails() {
+ val machine = SimBareMetalMachine(scope, clock, machineModel)
+ val hypervisor = SimSpaceSharedHypervisor()
+
+ scope.launch {
+ launch { machine.run(hypervisor) }
+
+ yield()
+
+ hypervisor.createMachine(machineModel)
+
+ assertAll(
+ { assertFalse(hypervisor.canFit(machineModel)) },
+ { assertThrows<IllegalStateException> { hypervisor.createMachine(machineModel) } }
+ )
+ }
+
+ scope.advanceUntilIdle()
+ }
+
+ /**
+ * Test concurrent workloads on the machine.
+ */
+ @Test
+ fun testConcurrentWorkloadSucceeds() {
+ val machine = SimBareMetalMachine(scope, clock, machineModel)
+ val hypervisor = SimSpaceSharedHypervisor()
+
+ scope.launch {
+ launch { machine.run(hypervisor) }
+
+ yield()
+
+ hypervisor.createMachine(machineModel).close()
+
+ assertAll(
+ { assertTrue(hypervisor.canFit(machineModel)) },
+ { assertDoesNotThrow { hypervisor.createMachine(machineModel) } }
+ )
+ }
+
+ scope.advanceUntilIdle()
+ }
+}