diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-02-29 15:27:06 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-02-29 15:41:41 +0100 |
| commit | 6ba8fec85214feca97d6c809aa816e2807ae547b (patch) | |
| tree | d31085b00552339fc8113d268d5bc92ff9ce20ee /opendc | |
| parent | 1631157a0c6d3ee6f93d422ad39bcee0ece4de3b (diff) | |
refactor: Report CPU usage per server
This change refactors the codebase so that the CPU usage of the server
is only reported per server, instead of per CPU reducing the total
amount of messages needed and additionally simplifying synchronization
of various computations.
Diffstat (limited to 'opendc')
11 files changed, 211 insertions, 337 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ProcessorContext.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ProcessorContext.kt deleted file mode 100644 index c081acd5..00000000 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ProcessorContext.kt +++ /dev/null @@ -1,48 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.compute.core.execution - -import com.atlarge.opendc.compute.core.ProcessingUnit - -/** - * An interface for managing a single processing core (CPU) of a (virtual) machine. - */ -public interface ProcessorContext { - /** - * The information about the processing unit. - */ - public val info: ProcessingUnit - - /** - * Request the specified burst time from the processor and suspend execution until the processor finishes - * processing of the requested burst. - * - * @param burst The burst time to request from the processor. - * @param maxUsage The maximum usage in terms of MHz that the processing core may use while running the burst. - * @param deadline The instant at which this request needs to be fulfilled. - * @return The remaining burst time in case the method was cancelled or zero if the processor finished running. - */ - public suspend fun run(burst: Long, maxUsage: Double, deadline: Long): Long -} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt index 53b00aa6..3a804f51 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt @@ -24,6 +24,7 @@ package com.atlarge.opendc.compute.core.execution +import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.core.services.AbstractServiceKey @@ -38,9 +39,9 @@ public interface ServerContext { public val server: Server /** - * A list of available processor context instances to use. + * A list of processing units available to use. */ - public val cpus: List<ProcessorContext> + public val cpus: List<ProcessingUnit> /** * Publishes the given [service] with key [serviceKey] in the server's registry. @@ -48,4 +49,15 @@ public interface ServerContext { public suspend fun <T : Any> publishService(serviceKey: AbstractServiceKey<T>, service: T) { server.serviceRegistry[serviceKey] = service } + + /** + * Request the specified burst time from the processor cores and suspend execution until a processor core finishes + * processing a **non-zero** burst or until the deadline is reached. + * + * @param burst The burst time to request from each of the processor cores. + * @param maxUsage The maximum usage in terms of MHz that the processing core may use while running the burst. + * @param deadline The instant at which this request needs to be fulfilled. + * @return The remaining burst time of the processor cores. + */ + public suspend fun run(burst: LongArray, maxUsage: DoubleArray, deadline: Long): LongArray } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt index de429d41..d2d35db9 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt @@ -26,9 +26,9 @@ package com.atlarge.opendc.compute.core.image import com.atlarge.opendc.compute.core.execution.ServerContext import com.atlarge.opendc.core.resource.TagContainer -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.launch +import kotlinx.coroutines.isActive import java.util.UUID +import kotlin.coroutines.coroutineContext import kotlin.math.min /** @@ -61,12 +61,15 @@ class FlopsApplicationImage( */ override suspend fun invoke(ctx: ServerContext) { val cores = min(this.cores, ctx.server.flavor.cpuCount) - val req = flops / cores + var burst = LongArray(cores) { flops / cores } + val maxUsage = DoubleArray(cores) { i -> ctx.cpus[i].frequency * utilization } - coroutineScope { - for (cpu in ctx.cpus.take(cores)) { - launch { cpu.run(req, cpu.info.frequency * utilization, Long.MAX_VALUE) } + while (coroutineContext.isActive) { + if (burst.all { it == 0L }) { + break } + + burst = ctx.run(burst, maxUsage, Long.MAX_VALUE) } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt index 436f4653..7c4fe839 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt @@ -3,10 +3,10 @@ package com.atlarge.opendc.compute.core.image import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.execution.ServerContext import com.atlarge.opendc.core.resource.TagContainer -import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay -import kotlinx.coroutines.launch +import kotlinx.coroutines.ensureActive import java.util.UUID +import kotlin.coroutines.coroutineContext import kotlin.math.min class VmImage( @@ -19,18 +19,17 @@ class VmImage( ) : Image { override suspend fun invoke(ctx: ServerContext) { - flopsHistory.forEach { fragment -> + for (fragment in flopsHistory) { + coroutineContext.ensureActive() + if (fragment.flops == 0L) { delay(fragment.duration) } else { val cores = min(this.cores, ctx.server.flavor.cpuCount) - val req = fragment.flops / cores - coroutineScope { - for (cpu in ctx.cpus.take(cores)) { - val usage = req / (fragment.usage * 1_000_000L) - launch { cpu.run(req, usage, simulationContext.clock.millis() + fragment.duration) } - } - } + val burst = LongArray(cores) { fragment.flops / cores } + val maxUsage = DoubleArray(cores) { i -> burst[i] / (fragment.usage * 1_000_000L) } + + ctx.run(burst, maxUsage, simulationContext.clock.millis() + fragment.duration) } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt index e1b7b178..d86e04ff 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt @@ -31,7 +31,6 @@ import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.MemoryUnit import com.atlarge.opendc.compute.core.ServerState -import com.atlarge.opendc.compute.core.execution.ProcessorContext import com.atlarge.opendc.compute.core.execution.ServerManagementContext import com.atlarge.opendc.compute.core.image.EmptyImage import com.atlarge.opendc.compute.core.image.Image @@ -138,32 +137,10 @@ public class SimpleBareMetalDriver( } } - private data class ProcessorContextImpl(override val info: ProcessingUnit) : ProcessorContext { - override suspend fun run(burst: Long, maxUsage: Double, deadline: Long): Long { - val start = simulationContext.clock.millis() - val usage = min(maxUsage, info.frequency) * 1_000_000 // Usage from MHz to Hz - - try { - val duration = min( - max(0, deadline - start), // Determine duration between now and deadline - ceil(burst / usage * 1000).toLong() // Convert from seconds to milliseconds - ) - delay(duration) - } catch (_: CancellationException) { - // On cancellation, we compute and return the remaining burst - } - val end = simulationContext.clock.millis() - val granted = ceil((end - start) / 1000.0 * usage).toLong() - return max(0, burst - granted) - } - } - private val serverCtx = object : ServerManagementContext { private var initialized: Boolean = false - override val cpus: List<ProcessorContextImpl> = this@SimpleBareMetalDriver.cpus - .map { ProcessorContextImpl(it) } - .toList() + override val cpus: List<ProcessingUnit> = this@SimpleBareMetalDriver.cpus override var server: Server get() = node.server!! @@ -186,8 +163,44 @@ public class SimpleBareMetalDriver( val previousState = server.state val state = if (cause == null) ServerState.SHUTOFF else ServerState.ERROR server = server.copy(state = state) - monitor.onUpdate(server, previousState) initialized = false + domain.launch { monitor.onUpdate(server, previousState) } + } + + override suspend fun run(burst: LongArray, maxUsage: DoubleArray, deadline: Long): LongArray { + val start = simulationContext.clock.millis() + var duration = max(0, deadline - start) + + for (i in 0..cpus.size) { + if (i >= burst.size || i >= maxUsage.size) { + continue + } + + val cpu = cpus[i] + val usage = min(maxUsage[i], cpu.frequency) * 1_000_000 // Usage from MHz to Hz + val cpuDuration = ceil(burst[i] / usage * 1000).toLong() // Convert from seconds to milliseconds + + if (cpuDuration != 0L) { // We only wait for processor cores with a non-zero burst + duration = min(duration, cpuDuration) + } + } + + try { + delay(duration) + } catch (_: CancellationException) { + // On cancellation, we compute and return the remaining burst + } + + val end = simulationContext.clock.millis() + return LongArray(cpus.size) { i -> + if (i < burst.size && i < maxUsage.size) { + val usage = min(maxUsage[i], cpus[i].frequency) * 1_000_000 + val granted = ceil((end - start) / 1000.0 * usage).toLong() + max(0, burst[i] - granted) + } else { + 0 + } + } } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt index b7848cf3..8d055953 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt @@ -43,7 +43,7 @@ class HypervisorImage( override val tags: TagContainer = emptyMap() override suspend fun invoke(ctx: ServerContext) { - val driver = HypervisorVirtDriver(ctx, VmSchedulerImpl(ctx, hypervisorMonitor)) + val driver = HypervisorVirtDriver(ctx, hypervisorMonitor) ctx.publishService(VirtDriver.Key, driver) diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt index e0547dcf..87c4f073 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt @@ -28,24 +28,30 @@ import com.atlarge.odcsim.SimulationContext import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.Flavor +import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.ServerState -import com.atlarge.opendc.compute.core.execution.ProcessorContext import com.atlarge.opendc.compute.core.execution.ServerContext import com.atlarge.opendc.compute.core.execution.ServerManagementContext import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.virt.driver.VirtDriver import com.atlarge.opendc.compute.virt.driver.VirtDriverMonitor +import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor +import kotlinx.coroutines.CancellationException import kotlinx.coroutines.Job +import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.launch import java.util.UUID +import kotlin.math.ceil +import kotlin.math.max +import kotlin.math.min /** * A [VirtDriver] that is backed by a simple hypervisor implementation. */ class HypervisorVirtDriver( private val hostContext: ServerContext, - private val scheduler: VmScheduler + private val monitor: HypervisorMonitor ) : VirtDriver { /** * A set for tracking the VM context objects. @@ -84,11 +90,114 @@ class HypervisorVirtDriver( monitors.remove(monitor) } + /** + * The set of [VmServerContext] instances that is being scheduled at the moment. + */ + private val activeVms = mutableSetOf<VmServerContext>() + + /** + * The deferred run call. + */ + private var call: Job? = null + + /** + * Schedule the vCPUs on the physical CPUs. + */ + private suspend fun reschedule() { + flush() + val call = simulationContext.domain.launch { + val start = simulationContext.clock.millis() + val vms = activeVms.toSet() + + var duration: Long = Long.MAX_VALUE + var deadline: Long = Long.MAX_VALUE + val usage = DoubleArray(hostContext.cpus.size) + + for (vm in vms) { + for (i in vm.cpus.indices) { + val cpu = vm.cpus[i] + + // Limit each vCPU to at most an equal share of the host CPU + val actualUsage = min(vm.requestedUsage[i], cpu.frequency / vms.size) + + // The duration that we want to run is that of the shortest request from a vCPU + duration = min(duration, ceil(vm.requestedBurst[i] / (actualUsage * 1_000_000L)).toLong()) + deadline = min(deadline, vm.requestedDeadline) + usage[i] += actualUsage + } + } + + val burst = LongArray(hostContext.cpus.size) + + for (vm in vms) { + for (i in vm.cpus.indices) { + val cpu = vm.cpus[i] + + // Limit each vCPU to at most an equal share of the host CPU + val actualUsage = min(vm.requestedUsage[i], cpu.frequency / vms.size) + val actualBurst = (duration * actualUsage * 1_000_000L).toLong() + + burst[i] += actualBurst + } + } + + // We run the total burst on the host processor. Note that this call may be cancelled at any moment in + // time, so not all of the burst may be executed. + val remainder = hostContext.run(burst, usage, deadline) + val end = simulationContext.clock.millis() + + // No work was performed + if ((end - start) <= 0) { + return@launch + } + + for (vm in vms) { + for (i in vm.cpus.indices) { + val cpu = vm.cpus[i] + + // Limit each vCPU to at most an equal share of the host CPU + val actualUsage = min(vm.requestedUsage[i], cpu.frequency / vms.size) + + // Compute the fraction of compute time allocated to the VM + val fraction = actualUsage / usage[i] + + // Compute the burst time that the VM was actually granted + val grantedBurst = max(0, burst[i] - ceil(remainder[i] * fraction).toLong()) + + // Compute remaining burst time to be executed for the request + vm.requestedBurst[i] = max(0, vm.requestedBurst[i] - grantedBurst) + } + + if (vm.requestedBurst.any { it == 0L } || vm.requestedDeadline <= end) { + // Return vCPU `run` call: the requested burst was completed or deadline was exceeded + vm.chan.send(Unit) + } + } + } + + this.call = call + call.invokeOnCompletion { this.call = null } + } + + /** + * Flush the progress of the current active VMs. + */ + private fun flush() { + val call = call ?: return // If there is no active call, there is nothing to flush + // The progress is actually flushed in the coroutine when it notices we cancel it and wait for its + // completion. + call.cancel() + } + internal inner class VmServerContext( override var server: Server, val monitor: ServerMonitor, ctx: SimulationContext ) : ServerManagementContext { + val requestedBurst: LongArray = LongArray(server.flavor.cpuCount) + val requestedUsage: DoubleArray = DoubleArray(server.flavor.cpuCount) + var requestedDeadline: Long = 0L + var chan = Channel<Unit>(Channel.RENDEZVOUS) private var initialized: Boolean = false internal val job: Job = ctx.domain.launch { @@ -101,7 +210,7 @@ class HypervisorVirtDriver( } } - override val cpus: List<ProcessorContext> = scheduler.createVirtualCpus(server.flavor) + override val cpus: List<ProcessingUnit> = hostContext.cpus.take(server.flavor.cpuCount) override suspend fun init() { if (initialized) { @@ -124,5 +233,25 @@ class HypervisorVirtDriver( vms.remove(this) monitors.forEach { it.onUpdate(vms.size, availableMemory) } } + + override suspend fun run(burst: LongArray, maxUsage: DoubleArray, deadline: Long): LongArray { + for (i in cpus.indices) { + requestedBurst[i] = if (i < burst.size) burst[i] else 0 + requestedUsage[i] = if (i < maxUsage.size) maxUsage[i] else 0.0 + } + requestedDeadline = deadline + + // Wait until the burst has been run or the coroutine is cancelled + try { + activeVms += this + reschedule() + chan.receive() + } catch (_: CancellationException) { + // On cancellation, we compute and return the remaining burst + } + activeVms -= this + reschedule() + return requestedBurst + } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmScheduler.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmScheduler.kt deleted file mode 100644 index 7b00d99c..00000000 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmScheduler.kt +++ /dev/null @@ -1,38 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.compute.virt.driver.hypervisor - -import com.atlarge.opendc.compute.core.Flavor -import com.atlarge.opendc.compute.core.execution.ProcessorContext - -/** - * A scheduler that assigns virtual CPUs to virtual machines and maps them to physical CPUs. - */ -public interface VmScheduler { - /** - * Create the virtual CPUs for the specified [flavor]. - */ - fun createVirtualCpus(flavor: Flavor): List<ProcessorContext> -} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmSchedulerImpl.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmSchedulerImpl.kt deleted file mode 100644 index f232d695..00000000 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmSchedulerImpl.kt +++ /dev/null @@ -1,199 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.compute.virt.driver.hypervisor - -import com.atlarge.odcsim.simulationContext -import com.atlarge.opendc.compute.core.ProcessingUnit -import com.atlarge.opendc.compute.core.Flavor -import com.atlarge.opendc.compute.core.execution.ProcessorContext -import com.atlarge.opendc.compute.core.execution.ServerContext -import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor -import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.Job -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.launch -import kotlin.math.ceil -import kotlin.math.max -import kotlin.math.min - -/** - * A basic implementation of the [VmScheduler] interface. - * - * @property hostContext The [ServerContext] of the host. - * @property hypervisorMonitor The [HypervisorMonitor] to inform with hypervisor scheduling events. - */ -public class VmSchedulerImpl( - private val hostContext: ServerContext, - private val hypervisorMonitor: HypervisorMonitor -) : VmScheduler { - /** - * The available physical CPUs to schedule. - */ - private val cpus = hostContext.cpus.map { HostProcessorContext(it, hostContext, hypervisorMonitor) } - - override fun createVirtualCpus(flavor: Flavor): List<ProcessorContext> { - // TODO At the moment, the first N cores get filled the first. Distribute over all cores instead - require(flavor.cpuCount <= cpus.size) { "Flavor cannot fit on machine" } - - return cpus - .asSequence() - .take(flavor.cpuCount) - .sortedBy { it.vcpus.size } - .map { VirtualProcessorContext(it) } - .toList() - } - - /** - * A wrapper around a host [ProcessorContext] that carries additional information about the vCPUs scheduled on the - * processor. - */ - internal class HostProcessorContext( - delegate: ProcessorContext, - private val hostContext: ServerContext, - private val hypervisorMonitor: HypervisorMonitor - ) : ProcessorContext by delegate { - /** - * The set of vCPUs scheduled on this processor. - */ - var vcpus: MutableSet<VirtualProcessorContext> = mutableSetOf() - - /** - * The deferred run call. - */ - var call: Job? = null - - /** - * Schedule the vCPUs on the physical CPU. - */ - suspend fun reschedule() { - flush() - - val vcpus = HashSet(vcpus) // Create snapshot of the vCPUs that were scheduled at this moment - val call = simulationContext.domain.launch { - var duration: Long = Long.MAX_VALUE - var deadline: Long = Long.MAX_VALUE - - for (vcpu in vcpus) { - // Limit each vCPU to at most an equal share of the host CPU - vcpu.actualUsage = min(vcpu.requestedUsage, info.frequency / vcpus.size) - - // The duration that we want to run is that of the shortest request from a vCPU - duration = min(duration, ceil(vcpu.requestedBurst / (vcpu.actualUsage * 1_000_000L)).toLong()) - deadline = min(deadline, vcpu.requestedDeadline) - } - - var burst: Long = 0 - var usage: Double = 0.0 - - for (vcpu in vcpus) { - vcpu.actualBurst = (duration * vcpu.actualUsage * 1_000_000L).toLong() - burst += vcpu.actualBurst - usage += vcpu.actualUsage - } - - // Ignore time slice if no work to request - if (burst <= 0L) { - return@launch - } - - // We run the total burst on the host processor. Note that this call may be cancelled at any moment in - // time, so not all of the burst may be executed. - val remainder = run(burst, usage, deadline) - val time = simulationContext.clock.millis() - val totalGrantedBurst: Long = burst - remainder - - // Compute for each vCPU the - for (vcpu in vcpus) { - // Compute the fraction of compute time allocated to the VM - val fraction = vcpu.actualUsage / usage - // Compute the burst time that the VM was actually granted - val grantedBurst = max(0, vcpu.actualBurst - ceil(remainder * fraction).toLong()) - // Compute remaining burst time to be executed for the request - vcpu.requestedBurst = max(0, vcpu.requestedBurst - grantedBurst) - - if (vcpu.requestedBurst == 0L || vcpu.requestedDeadline <= time) { - // Return vCPU `run` call: the requested burst was completed or deadline was exceeded - vcpu.chan.send(Unit) - } - } - - hypervisorMonitor.onSliceFinish( - time, - burst, - totalGrantedBurst, - vcpus.size, - hostContext.server - ) - } - - this.call = call - call.invokeOnCompletion { this.call = null } - } - - /** - * Flush the progress of the current active VMs. - */ - fun flush() { - val call = call ?: return // If there is no active call, there is nothing to flush - // The progress is actually flushed in the coroutine when it notices we cancel it and wait for its - // completion. - call.cancel() - } - } - - /** - * An implementation of [ProcessorContext] that delegates the work to a physical CPU. - */ - internal class VirtualProcessorContext(val host: HostProcessorContext) : ProcessorContext { - var actualBurst: Long = 0 - var actualUsage: Double = 0.0 - var requestedBurst: Long = 0 - var requestedUsage: Double = 0.0 - var requestedDeadline: Long = 0 - var chan = Channel<Unit>(Channel.RENDEZVOUS) - - override val info: ProcessingUnit - get() = host.info - - override suspend fun run(burst: Long, maxUsage: Double, deadline: Long): Long { - requestedBurst = burst - requestedUsage = maxUsage - requestedDeadline = deadline - - // Wait until the burst has been run or the coroutine is cancelled - try { - host.vcpus.add(this) - host.reschedule() - chan.receive() - } catch (_: CancellationException) { - // On cancellation, we compute and return the remaining burst - } - - host.vcpus.remove(this) - host.reschedule() - return requestedBurst - } - } -} diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt index 6468a408..84b16b68 100644 --- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt +++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt @@ -25,6 +25,7 @@ package com.atlarge.opendc.compute.metal.driver import com.atlarge.odcsim.SimulationEngineProvider +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.Server @@ -53,15 +54,16 @@ internal class SimpleBareMetalDriverTest { root.launch { val dom = root.newDomain(name = "driver") val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4) - val cpus = List(5) { ProcessingUnit(cpuNode, it, 2400.0) } + val cpus = List(4) { ProcessingUnit(cpuNode, it, 2400.0) } val driver = SimpleBareMetalDriver(UUID.randomUUID(), "test", cpus, emptyList(), dom) val monitor = object : ServerMonitor { override suspend fun onUpdate(server: Server, previousState: ServerState) { + println("[${simulationContext.clock.millis()}] $server") finalState = server.state } } - val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1000, 2) + val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1_000_000_000, 2) // Batch driver commands withContext(dom.coroutineContext) { diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt index 7e5a4bbe..6cfb2317 100644 --- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt +++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt @@ -69,8 +69,8 @@ internal class HypervisorTest { println("Hello World!") } }) - val workloadA = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1_000_000, 1) - val workloadB = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 2_000_000, 1) + val workloadA = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1_000_000_000, 1) + val workloadB = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 2_000_000_000, 1) val monitor = object : ServerMonitor { override suspend fun onUpdate(server: Server, previousState: ServerState) { println("[${simulationContext.clock.millis()}]: $server") @@ -80,12 +80,13 @@ internal class HypervisorTest { val driverDom = root.newDomain("driver") val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4) - val cpus = List(5) { ProcessingUnit(cpuNode, it, 2000.0) } + val cpus = List(4) { ProcessingUnit(cpuNode, it, 2000.0) } val metalDriver = SimpleBareMetalDriver(UUID.randomUUID(), "test", cpus, emptyList(), driverDom) metalDriver.init(monitor) metalDriver.setImage(vmm) metalDriver.setPower(PowerState.POWER_ON) + delay(5) val flavor = Flavor(1, 0) |
