diff options
| author | Georgios Andreadis <g.andreadis@student.tudelft.nl> | 2020-03-04 13:08:26 +0100 |
|---|---|---|
| committer | Georgios Andreadis <g.andreadis@student.tudelft.nl> | 2020-03-04 13:08:26 +0100 |
| commit | fa7455ac6aaa1e0c34a4218c32423d544373e795 (patch) | |
| tree | fd3a2b12bf5b3841ded39930ad2d3b0c1336448b /opendc/opendc-compute | |
| parent | ac6e6f7c611fa7d10fff5467c4a61af932e4c171 (diff) | |
| parent | 5f5d54b6f1a96bc595f99f367bea54f1d852ec63 (diff) | |
Merge branch 'refactor/2.x-vm-improvements' into 'feat/2.x'
Report CPU usage per server instance
Closes #51
See merge request opendc/opendc-simulator!34
Diffstat (limited to 'opendc/opendc-compute')
13 files changed, 267 insertions, 331 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmScheduler.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/ProcessingNode.kt index 7b00d99c..91f5dde9 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmScheduler.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/ProcessingNode.kt @@ -22,17 +22,19 @@ * SOFTWARE. */ -package com.atlarge.opendc.compute.virt.driver.hypervisor - -import com.atlarge.opendc.compute.core.Flavor -import com.atlarge.opendc.compute.core.execution.ProcessorContext +package com.atlarge.opendc.compute.core /** - * A scheduler that assigns virtual CPUs to virtual machines and maps them to physical CPUs. + * A processing node/package/socket containing possibly several CPU cores. + * + * @property vendor The vendor string of the processor node. + * @property modelName The name of the processor node. + * @property arch The micro-architecture of the processor node. + * @property coreCount The number of logical CPUs in the processor node. */ -public interface VmScheduler { - /** - * Create the virtual CPUs for the specified [flavor]. - */ - fun createVirtualCpus(flavor: Flavor): List<ProcessorContext> -} +data class ProcessingNode( + val vendor: String, + val arch: String, + val modelName: String, + val coreCount: Int +) diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/ProcessingUnit.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/ProcessingUnit.kt index 945b7061..dbf6d824 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/ProcessingUnit.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/ProcessingUnit.kt @@ -25,18 +25,14 @@ package com.atlarge.opendc.compute.core /** - * A processing unit of a compute resource, either virtual or physical. + * A single logical compute unit of processor node, either virtual or physical. * - * @property vendor The vendor string of the cpu. - * @property modelName The name of the cpu model. - * @property arch The architecture of the CPU. - * @property clockRate The clock speed of the cpu in MHz. - * @property cores The number of logical cores in the cpu. + * @property node The processing node containing the CPU core. + * @property id The identifier of the CPU core within the processing node. + * @property frequency The clock rate of the CPU. */ public data class ProcessingUnit( - public val vendor: String, - public val modelName: String, - public val arch: String, - public val clockRate: Double, - public val cores: Int + public val node: ProcessingNode, + public val id: Int, + public val frequency: Double ) diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ProcessorContext.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ProcessorContext.kt deleted file mode 100644 index c081acd5..00000000 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ProcessorContext.kt +++ /dev/null @@ -1,48 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.compute.core.execution - -import com.atlarge.opendc.compute.core.ProcessingUnit - -/** - * An interface for managing a single processing core (CPU) of a (virtual) machine. - */ -public interface ProcessorContext { - /** - * The information about the processing unit. - */ - public val info: ProcessingUnit - - /** - * Request the specified burst time from the processor and suspend execution until the processor finishes - * processing of the requested burst. - * - * @param burst The burst time to request from the processor. - * @param maxUsage The maximum usage in terms of MHz that the processing core may use while running the burst. - * @param deadline The instant at which this request needs to be fulfilled. - * @return The remaining burst time in case the method was cancelled or zero if the processor finished running. - */ - public suspend fun run(burst: Long, maxUsage: Double, deadline: Long): Long -} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt index 53b00aa6..b09a5a7d 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt @@ -24,6 +24,7 @@ package com.atlarge.opendc.compute.core.execution +import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.core.services.AbstractServiceKey @@ -38,9 +39,9 @@ public interface ServerContext { public val server: Server /** - * A list of available processor context instances to use. + * A list of processing units available to use. */ - public val cpus: List<ProcessorContext> + public val cpus: List<ProcessingUnit> /** * Publishes the given [service] with key [serviceKey] in the server's registry. @@ -48,4 +49,20 @@ public interface ServerContext { public suspend fun <T : Any> publishService(serviceKey: AbstractServiceKey<T>, service: T) { server.serviceRegistry[serviceKey] = service } + + /** + * Request the specified burst time from the processor cores and suspend execution until a processor core finishes + * processing a **non-zero** burst or until the deadline is reached. + * + * After the method returns, [burst] will contain the remaining burst length for each of the cores (which may be + * zero). + * + * Both [burst] and [limit] must be of the same size and in any other case the method will throw an + * [IllegalArgumentException]. + * + * @param burst The burst time to request from each of the processor cores. + * @param limit The maximum usage in terms of MHz that the processing core may use while running the burst. + * @param deadline The instant at which this request needs to be fulfilled. + */ + public suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt index 3576b488..27d8091a 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt @@ -26,9 +26,9 @@ package com.atlarge.opendc.compute.core.image import com.atlarge.opendc.compute.core.execution.ServerContext import com.atlarge.opendc.core.resource.TagContainer -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.launch +import kotlinx.coroutines.isActive import java.util.UUID +import kotlin.coroutines.coroutineContext import kotlin.math.min /** @@ -42,7 +42,7 @@ import kotlin.math.min * @property cores The number of cores that the image is able to utilize. * @property utilization A model of the CPU utilization of the application. */ -class FlopsApplicationImage( +data class FlopsApplicationImage( public override val uid: UUID, public override val name: String, public override val tags: TagContainer, @@ -61,12 +61,15 @@ class FlopsApplicationImage( */ override suspend fun invoke(ctx: ServerContext) { val cores = min(this.cores, ctx.server.flavor.cpuCount) - val req = flops / cores + val burst = LongArray(cores) { flops / cores } + val maxUsage = DoubleArray(cores) { i -> ctx.cpus[i].frequency * utilization } - coroutineScope { - for (cpu in ctx.cpus.take(cores)) { - launch { cpu.run(req, cpu.info.clockRate * utilization, Long.MAX_VALUE) } + while (coroutineContext.isActive) { + if (burst.all { it == 0L }) { + break } + + ctx.run(burst, maxUsage, Long.MAX_VALUE) } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt index 436f4653..52f9068d 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt @@ -3,10 +3,10 @@ package com.atlarge.opendc.compute.core.image import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.execution.ServerContext import com.atlarge.opendc.core.resource.TagContainer -import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay -import kotlinx.coroutines.launch +import kotlinx.coroutines.ensureActive import java.util.UUID +import kotlin.coroutines.coroutineContext import kotlin.math.min class VmImage( @@ -19,19 +19,20 @@ class VmImage( ) : Image { override suspend fun invoke(ctx: ServerContext) { - flopsHistory.forEach { fragment -> + for (fragment in flopsHistory) { + coroutineContext.ensureActive() + if (fragment.flops == 0L) { delay(fragment.duration) } else { val cores = min(this.cores, ctx.server.flavor.cpuCount) - val req = fragment.flops / cores - coroutineScope { - for (cpu in ctx.cpus.take(cores)) { - val usage = req / (fragment.usage * 1_000_000L) - launch { cpu.run(req, usage, simulationContext.clock.millis() + fragment.duration) } - } - } + val burst = LongArray(cores) { fragment.flops / cores } + val maxUsage = DoubleArray(cores) { i -> burst[i] / (fragment.usage * 1_000_000L) } + + ctx.run(burst, maxUsage, simulationContext.clock.millis() + fragment.duration) } } } + + override fun toString(): String = "VmImage(uid=$uid, name=$name, cores=$cores, requiredMemory=$requiredMemory)" } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt index 1adc8652..fcdc9363 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt @@ -31,7 +31,6 @@ import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.MemoryUnit import com.atlarge.opendc.compute.core.ServerState -import com.atlarge.opendc.compute.core.execution.ProcessorContext import com.atlarge.opendc.compute.core.execution.ServerManagementContext import com.atlarge.opendc.compute.core.image.EmptyImage import com.atlarge.opendc.compute.core.image.Image @@ -53,14 +52,14 @@ import kotlinx.coroutines.withContext * * @param uid The unique identifier of the machine. * @param name An optional name of the machine. - * @param cpuNodes The CPU nodes/packages available to the bare metal machine. + * @param cpus The CPUs available to the bare metal machine. * @param memoryUnits The memory units in this machine. * @param domain The simulation domain the driver runs in. */ public class SimpleBareMetalDriver( uid: UUID, name: String, - val cpuNodes: List<ProcessingUnit>, + val cpus: List<ProcessingUnit>, val memoryUnits: List<MemoryUnit>, private val domain: Domain ) : BareMetalDriver { @@ -77,7 +76,7 @@ public class SimpleBareMetalDriver( /** * The flavor that corresponds to this machine. */ - private val flavor = Flavor(cpuNodes.sumBy { it.cores }, memoryUnits.map { it.size }.sum()) + private val flavor = Flavor(cpus.size, memoryUnits.map { it.size }.sum()) /** * The job that is running the image. @@ -138,35 +137,10 @@ public class SimpleBareMetalDriver( } } - private data class ProcessorContextImpl(override val info: ProcessingUnit) : ProcessorContext { - override suspend fun run(burst: Long, maxUsage: Double, deadline: Long): Long { - val start = simulationContext.clock.millis() - val usage = min(maxUsage, info.clockRate) * 1_000_000 // Usage from MHz to Hz - - try { - val duration = min( - max(0, deadline - start), // Determine duration between now and deadline - ceil(burst / usage * 1000).toLong() // Convert from seconds to milliseconds - ) - delay(duration) - } catch (_: CancellationException) { - // On cancellation, we compute and return the remaining burst - } - val end = simulationContext.clock.millis() - val granted = ceil((end - start) / 1000.0 * usage).toLong() - return max(0, burst - granted) - } - } - private val serverCtx = object : ServerManagementContext { private var initialized: Boolean = false - override val cpus: List<ProcessorContextImpl> = cpuNodes - .asSequence() - .flatMap { cpu -> - generateSequence { ProcessorContextImpl(cpu) }.take(cpu.cores) - } - .toList() + override val cpus: List<ProcessingUnit> = this@SimpleBareMetalDriver.cpus override var server: Server get() = node.server!! @@ -189,8 +163,41 @@ public class SimpleBareMetalDriver( val previousState = server.state val state = if (cause == null) ServerState.SHUTOFF else ServerState.ERROR server = server.copy(state = state) - monitor.onUpdate(server, previousState) initialized = false + domain.launch { monitor.onUpdate(server, previousState) } + } + + override suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) { + require(burst.size == limit.size) { "Array dimensions do not match" } + + val start = simulationContext.clock.millis() + var duration = max(0, deadline - start) + + // Determine the duration of the first CPU to finish + for (i in 0 until min(cpus.size, burst.size)) { + val cpu = cpus[i] + val usage = min(limit[i], cpu.frequency) * 1_000_000 // Usage from MHz to Hz + val cpuDuration = ceil(burst[i] / usage * 1000).toLong() // Convert from seconds to milliseconds + + if (cpuDuration != 0L) { // We only wait for processor cores with a non-zero burst + duration = min(duration, cpuDuration) + } + } + + try { + delay(duration) + } catch (_: CancellationException) { + // On cancellation, we compute and return the remaining burst + } + + val end = simulationContext.clock.millis() + + // Write back the remaining burst time + for (i in 0 until min(cpus.size, burst.size)) { + val usage = min(limit[i], cpus[i].frequency) * 1_000_000 + val granted = ceil((end - start) / 1000.0 * usage).toLong() + burst[i] = max(0, burst[i] - granted) + } } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt index b7848cf3..8d055953 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt @@ -43,7 +43,7 @@ class HypervisorImage( override val tags: TagContainer = emptyMap() override suspend fun invoke(ctx: ServerContext) { - val driver = HypervisorVirtDriver(ctx, VmSchedulerImpl(ctx, hypervisorMonitor)) + val driver = HypervisorVirtDriver(ctx, hypervisorMonitor) ctx.publishService(VirtDriver.Key, driver) diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt index e0547dcf..3f358516 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt @@ -28,24 +28,30 @@ import com.atlarge.odcsim.SimulationContext import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.Flavor +import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.ServerState -import com.atlarge.opendc.compute.core.execution.ProcessorContext import com.atlarge.opendc.compute.core.execution.ServerContext import com.atlarge.opendc.compute.core.execution.ServerManagementContext import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.virt.driver.VirtDriver import com.atlarge.opendc.compute.virt.driver.VirtDriverMonitor +import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor +import kotlinx.coroutines.CancellationException import kotlinx.coroutines.Job +import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.launch import java.util.UUID +import kotlin.math.ceil +import kotlin.math.max +import kotlin.math.min /** * A [VirtDriver] that is backed by a simple hypervisor implementation. */ class HypervisorVirtDriver( private val hostContext: ServerContext, - private val scheduler: VmScheduler + private val monitor: HypervisorMonitor ) : VirtDriver { /** * A set for tracking the VM context objects. @@ -84,11 +90,131 @@ class HypervisorVirtDriver( monitors.remove(monitor) } + /** + * The set of [VmServerContext] instances that is being scheduled at the moment. + */ + private val activeVms = mutableSetOf<VmServerContext>() + + /** + * The deferred run call. + */ + private var call: Job? = null + + /** + * Schedule the vCPUs on the physical CPUs. + */ + private suspend fun reschedule() { + flush() + + // Do not schedule a call if there is no work to schedule + if (activeVms.isEmpty()) { + return + } + + val call = simulationContext.domain.launch { + val start = simulationContext.clock.millis() + val vms = activeVms.toSet() + + var duration: Long = Long.MAX_VALUE + var deadline: Long = Long.MAX_VALUE + val usage = DoubleArray(hostContext.cpus.size) + + for (vm in vms) { + for (i in 0 until min(vm.cpus.size, vm.requestedBurst.size)) { + val cpu = vm.cpus[i] + + // Limit each vCPU to at most an equal share of the host CPU + val actualUsage = min(vm.limit[i], cpu.frequency / vms.size) + + // The duration that we want to run is that of the shortest request from a vCPU + duration = min(duration, ceil(vm.requestedBurst[i] / (actualUsage * 1_000_000L)).toLong()) + deadline = min(deadline, vm.deadline) + usage[i] += actualUsage + } + } + + val burst = LongArray(hostContext.cpus.size) + + for (vm in vms) { + for (i in 0 until min(vm.cpus.size, vm.requestedBurst.size)) { + val cpu = vm.cpus[i] + + // Limit each vCPU to at most an equal share of the host CPU + val actualUsage = min(vm.limit[i], cpu.frequency / vms.size) + val actualBurst = (duration * actualUsage * 1_000_000L).toLong() + + burst[i] += actualBurst + } + } + + val granted = burst.clone() + // We run the total burst on the host processor. Note that this call may be cancelled at any moment in + // time, so not all of the burst may be executed. + hostContext.run(granted, usage, deadline) + val end = simulationContext.clock.millis() + + // No work was performed + if ((end - start) <= 0) { + return@launch + } + + for (vm in vms) { + for (i in 0 until min(vm.cpus.size, vm.requestedBurst.size)) { + val cpu = vm.cpus[i] + + // Limit each vCPU to at most an equal share of the host CPU + val actualUsage = min(vm.limit[i], cpu.frequency / vms.size) + val actualBurst = (duration * actualUsage * 1_000_000L).toLong() + + // Compute the fraction of compute time allocated to the VM + val fraction = actualUsage / usage[i] + + // Compute the burst time that the VM was actually granted + val grantedBurst = max(0, actualBurst - ceil(burst[i] * fraction).toLong()) + + // Compute remaining burst time to be executed for the request + vm.requestedBurst[i] = max(0, vm.requestedBurst[i] - grantedBurst) + } + + if (vm.requestedBurst.any { it == 0L } || vm.deadline <= end) { + // Return vCPU `run` call: the requested burst was completed or deadline was exceeded + vm.chan.send(Unit) + } + } + + for (i in burst.indices) { + monitor.onSliceFinish( + end, + burst[i], + granted[i], + vms.size, + hostContext.server + ) + } + } + this.call = call + call.invokeOnCompletion { this.call = null } + } + + /** + * Flush the progress of the current active VMs. + */ + private fun flush() { + val call = call ?: return // If there is no active call, there is nothing to flush + // The progress is actually flushed in the coroutine when it notices we cancel it and wait for its + // completion. + call.cancel() + } + internal inner class VmServerContext( override var server: Server, val monitor: ServerMonitor, ctx: SimulationContext ) : ServerManagementContext { + lateinit var requestedBurst: LongArray + lateinit var limit: DoubleArray + var deadline: Long = 0L + var chan = Channel<Unit>(Channel.RENDEZVOUS) private var initialized: Boolean = false internal val job: Job = ctx.domain.launch { @@ -101,7 +227,7 @@ class HypervisorVirtDriver( } } - override val cpus: List<ProcessorContext> = scheduler.createVirtualCpus(server.flavor) + override val cpus: List<ProcessingUnit> = hostContext.cpus.take(server.flavor.cpuCount) override suspend fun init() { if (initialized) { @@ -124,5 +250,24 @@ class HypervisorVirtDriver( vms.remove(this) monitors.forEach { it.onUpdate(vms.size, availableMemory) } } + + override suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) { + require(burst.size == limit.size) { "Array dimensions do not match" } + + requestedBurst = burst + this.limit = limit + this.deadline = deadline + + // Wait until the burst has been run or the coroutine is cancelled + try { + activeVms += this + reschedule() + chan.receive() + } catch (_: CancellationException) { + // On cancellation, we compute and return the remaining burst + } + activeVms -= this + reschedule() + } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmSchedulerImpl.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmSchedulerImpl.kt deleted file mode 100644 index 4cc5ac9e..00000000 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmSchedulerImpl.kt +++ /dev/null @@ -1,199 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.compute.virt.driver.hypervisor - -import com.atlarge.odcsim.simulationContext -import com.atlarge.opendc.compute.core.ProcessingUnit -import com.atlarge.opendc.compute.core.Flavor -import com.atlarge.opendc.compute.core.execution.ProcessorContext -import com.atlarge.opendc.compute.core.execution.ServerContext -import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor -import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.Job -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.launch -import kotlin.math.ceil -import kotlin.math.max -import kotlin.math.min - -/** - * A basic implementation of the [VmScheduler] interface. - * - * @property hostContext The [ServerContext] of the host. - * @property hypervisorMonitor The [HypervisorMonitor] to inform with hypervisor scheduling events. - */ -public class VmSchedulerImpl( - private val hostContext: ServerContext, - private val hypervisorMonitor: HypervisorMonitor -) : VmScheduler { - /** - * The available physical CPUs to schedule. - */ - private val cpus = hostContext.cpus.map { HostProcessorContext(it, hostContext, hypervisorMonitor) } - - override fun createVirtualCpus(flavor: Flavor): List<ProcessorContext> { - // TODO At the moment, the first N cores get filled the first. Distribute over all cores instead - require(flavor.cpuCount <= cpus.size) { "Flavor cannot fit on machine" } - - return cpus - .asSequence() - .take(flavor.cpuCount) - .sortedBy { it.vcpus.size } - .map { VirtualProcessorContext(it) } - .toList() - } - - /** - * A wrapper around a host [ProcessorContext] that carries additional information about the vCPUs scheduled on the - * processor. - */ - internal class HostProcessorContext( - delegate: ProcessorContext, - private val hostContext: ServerContext, - private val hypervisorMonitor: HypervisorMonitor - ) : ProcessorContext by delegate { - /** - * The set of vCPUs scheduled on this processor. - */ - var vcpus: MutableSet<VirtualProcessorContext> = mutableSetOf() - - /** - * The deferred run call. - */ - var call: Job? = null - - /** - * Schedule the vCPUs on the physical CPU. - */ - suspend fun reschedule() { - flush() - - val vcpus = HashSet(vcpus) // Create snapshot of the vCPUs that were scheduled at this moment - val call = simulationContext.domain.launch { - var duration: Long = Long.MAX_VALUE - var deadline: Long = Long.MAX_VALUE - - for (vcpu in vcpus) { - // Limit each vCPU to at most an equal share of the host CPU - vcpu.actualUsage = min(vcpu.requestedUsage, info.clockRate / vcpus.size) - - // The duration that we want to run is that of the shortest request from a vCPU - duration = min(duration, ceil(vcpu.requestedBurst / (vcpu.actualUsage * 1_000_000L)).toLong()) - deadline = min(deadline, vcpu.requestedDeadline) - } - - var burst: Long = 0 - var usage: Double = 0.0 - - for (vcpu in vcpus) { - vcpu.actualBurst = (duration * vcpu.actualUsage * 1_000_000L).toLong() - burst += vcpu.actualBurst - usage += vcpu.actualUsage - } - - // Ignore time slice if no work to request - if (burst <= 0L) { - return@launch - } - - // We run the total burst on the host processor. Note that this call may be cancelled at any moment in - // time, so not all of the burst may be executed. - val remainder = run(burst, usage, deadline) - val time = simulationContext.clock.millis() - val totalGrantedBurst: Long = burst - remainder - - // Compute for each vCPU the - for (vcpu in vcpus) { - // Compute the fraction of compute time allocated to the VM - val fraction = vcpu.actualUsage / usage - // Compute the burst time that the VM was actually granted - val grantedBurst = max(0, vcpu.actualBurst - ceil(remainder * fraction).toLong()) - // Compute remaining burst time to be executed for the request - vcpu.requestedBurst = max(0, vcpu.requestedBurst - grantedBurst) - - if (vcpu.requestedBurst == 0L || vcpu.requestedDeadline <= time) { - // Return vCPU `run` call: the requested burst was completed or deadline was exceeded - vcpu.chan.send(Unit) - } - } - - hypervisorMonitor.onSliceFinish( - time, - burst, - totalGrantedBurst, - vcpus.size, - hostContext.server - ) - } - - this.call = call - call.invokeOnCompletion { this.call = null } - } - - /** - * Flush the progress of the current active VMs. - */ - fun flush() { - val call = call ?: return // If there is no active call, there is nothing to flush - // The progress is actually flushed in the coroutine when it notices we cancel it and wait for its - // completion. - call.cancel() - } - } - - /** - * An implementation of [ProcessorContext] that delegates the work to a physical CPU. - */ - internal class VirtualProcessorContext(val host: HostProcessorContext) : ProcessorContext { - var actualBurst: Long = 0 - var actualUsage: Double = 0.0 - var requestedBurst: Long = 0 - var requestedUsage: Double = 0.0 - var requestedDeadline: Long = 0 - var chan = Channel<Unit>(Channel.RENDEZVOUS) - - override val info: ProcessingUnit - get() = host.info - - override suspend fun run(burst: Long, maxUsage: Double, deadline: Long): Long { - requestedBurst = burst - requestedUsage = maxUsage - requestedDeadline = deadline - - // Wait until the burst has been run or the coroutine is cancelled - try { - host.vcpus.add(this) - host.reschedule() - chan.receive() - } catch (_: CancellationException) { - // On cancellation, we compute and return the remaining burst - } - - host.vcpus.remove(this) - host.reschedule() - return requestedBurst - } - } -} diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt index 6b234b73..84b16b68 100644 --- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt +++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt @@ -25,7 +25,8 @@ package com.atlarge.opendc.compute.metal.driver import com.atlarge.odcsim.SimulationEngineProvider -import com.atlarge.opendc.compute.core.Flavor +import com.atlarge.odcsim.simulationContext +import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerState @@ -52,15 +53,17 @@ internal class SimpleBareMetalDriverTest { val root = system.newDomain(name = "root") root.launch { val dom = root.newDomain(name = "driver") - val flavor = Flavor(4, 0) - val driver = SimpleBareMetalDriver(UUID.randomUUID(), "test", listOf(ProcessingUnit("Intel", "Xeon", "amd64", 2300.0, 4)), emptyList(), dom) + val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4) + val cpus = List(4) { ProcessingUnit(cpuNode, it, 2400.0) } + val driver = SimpleBareMetalDriver(UUID.randomUUID(), "test", cpus, emptyList(), dom) val monitor = object : ServerMonitor { override suspend fun onUpdate(server: Server, previousState: ServerState) { + println("[${simulationContext.clock.millis()}] $server") finalState = server.state } } - val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1000, 2) + val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1_000_000_000, 2) // Batch driver commands withContext(dom.coroutineContext) { diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt index 3b32b3b8..d5366552 100644 --- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt +++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt @@ -25,6 +25,7 @@ package com.atlarge.opendc.compute.metal.service import com.atlarge.odcsim.SimulationEngineProvider +import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerState @@ -59,7 +60,10 @@ internal class SimpleProvisioningServiceTest { } val dom = root.newDomain("provisioner") - val driver = SimpleBareMetalDriver(UUID.randomUUID(), "test", listOf(ProcessingUnit("Intel", "Xeon", "amd64", 2300.0, 4)), emptyList(), dom) + + val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4) + val cpus = List(5) { ProcessingUnit(cpuNode, it, 2400.0) } + val driver = SimpleBareMetalDriver(UUID.randomUUID(), "test", cpus, emptyList(), dom) val provisioner = SimpleProvisioningService(dom) provisioner.create(driver) diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt index 002fa175..6cfb2317 100644 --- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt +++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt @@ -29,6 +29,7 @@ import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.Flavor +import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.image.FlopsApplicationImage import com.atlarge.opendc.compute.core.monitor.ServerMonitor @@ -68,8 +69,8 @@ internal class HypervisorTest { println("Hello World!") } }) - val workloadA = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1_000_000, 1) - val workloadB = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 2_000_000, 1) + val workloadA = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1_000_000_000, 1) + val workloadB = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 2_000_000_000, 1) val monitor = object : ServerMonitor { override suspend fun onUpdate(server: Server, previousState: ServerState) { println("[${simulationContext.clock.millis()}]: $server") @@ -77,11 +78,15 @@ internal class HypervisorTest { } val driverDom = root.newDomain("driver") - val metalDriver = SimpleBareMetalDriver(UUID.randomUUID(), "test", listOf(ProcessingUnit("Intel", "Xeon", "amd64", 2000.0, 1)), emptyList(), driverDom) + + val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4) + val cpus = List(4) { ProcessingUnit(cpuNode, it, 2000.0) } + val metalDriver = SimpleBareMetalDriver(UUID.randomUUID(), "test", cpus, emptyList(), driverDom) metalDriver.init(monitor) metalDriver.setImage(vmm) metalDriver.setPower(PowerState.POWER_ON) + delay(5) val flavor = Flavor(1, 0) |
