diff options
Diffstat (limited to 'opendc')
11 files changed, 211 insertions, 337 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ProcessorContext.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ProcessorContext.kt deleted file mode 100644 index c081acd5..00000000 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ProcessorContext.kt +++ /dev/null @@ -1,48 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.compute.core.execution - -import com.atlarge.opendc.compute.core.ProcessingUnit - -/** - * An interface for managing a single processing core (CPU) of a (virtual) machine. - */ -public interface ProcessorContext { - /** - * The information about the processing unit. - */ - public val info: ProcessingUnit - - /** - * Request the specified burst time from the processor and suspend execution until the processor finishes - * processing of the requested burst. - * - * @param burst The burst time to request from the processor. - * @param maxUsage The maximum usage in terms of MHz that the processing core may use while running the burst. - * @param deadline The instant at which this request needs to be fulfilled. - * @return The remaining burst time in case the method was cancelled or zero if the processor finished running. - */ - public suspend fun run(burst: Long, maxUsage: Double, deadline: Long): Long -} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt index 53b00aa6..3a804f51 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt @@ -24,6 +24,7 @@ package com.atlarge.opendc.compute.core.execution +import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.core.services.AbstractServiceKey @@ -38,9 +39,9 @@ public interface ServerContext { public val server: Server /** - * A list of available processor context instances to use. + * A list of processing units available to use. */ - public val cpus: List<ProcessorContext> + public val cpus: List<ProcessingUnit> /** * Publishes the given [service] with key [serviceKey] in the server's registry. @@ -48,4 +49,15 @@ public interface ServerContext { public suspend fun <T : Any> publishService(serviceKey: AbstractServiceKey<T>, service: T) { server.serviceRegistry[serviceKey] = service } + + /** + * Request the specified burst time from the processor cores and suspend execution until a processor core finishes + * processing a **non-zero** burst or until the deadline is reached. + * + * @param burst The burst time to request from each of the processor cores. + * @param maxUsage The maximum usage in terms of MHz that the processing core may use while running the burst. + * @param deadline The instant at which this request needs to be fulfilled. + * @return The remaining burst time of the processor cores. + */ + public suspend fun run(burst: LongArray, maxUsage: DoubleArray, deadline: Long): LongArray } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt index de429d41..d2d35db9 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt @@ -26,9 +26,9 @@ package com.atlarge.opendc.compute.core.image import com.atlarge.opendc.compute.core.execution.ServerContext import com.atlarge.opendc.core.resource.TagContainer -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.launch +import kotlinx.coroutines.isActive import java.util.UUID +import kotlin.coroutines.coroutineContext import kotlin.math.min /** @@ -61,12 +61,15 @@ class FlopsApplicationImage( */ override suspend fun invoke(ctx: ServerContext) { val cores = min(this.cores, ctx.server.flavor.cpuCount) - val req = flops / cores + var burst = LongArray(cores) { flops / cores } + val maxUsage = DoubleArray(cores) { i -> ctx.cpus[i].frequency * utilization } - coroutineScope { - for (cpu in ctx.cpus.take(cores)) { - launch { cpu.run(req, cpu.info.frequency * utilization, Long.MAX_VALUE) } + while (coroutineContext.isActive) { + if (burst.all { it == 0L }) { + break } + + burst = ctx.run(burst, maxUsage, Long.MAX_VALUE) } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt index 436f4653..7c4fe839 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt @@ -3,10 +3,10 @@ package com.atlarge.opendc.compute.core.image import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.execution.ServerContext import com.atlarge.opendc.core.resource.TagContainer -import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay -import kotlinx.coroutines.launch +import kotlinx.coroutines.ensureActive import java.util.UUID +import kotlin.coroutines.coroutineContext import kotlin.math.min class VmImage( @@ -19,18 +19,17 @@ class VmImage( ) : Image { override suspend fun invoke(ctx: ServerContext) { - flopsHistory.forEach { fragment -> + for (fragment in flopsHistory) { + coroutineContext.ensureActive() + if (fragment.flops == 0L) { delay(fragment.duration) } else { val cores = min(this.cores, ctx.server.flavor.cpuCount) - val req = fragment.flops / cores - coroutineScope { - for (cpu in ctx.cpus.take(cores)) { - val usage = req / (fragment.usage * 1_000_000L) - launch { cpu.run(req, usage, simulationContext.clock.millis() + fragment.duration) } - } - } + val burst = LongArray(cores) { fragment.flops / cores } + val maxUsage = DoubleArray(cores) { i -> burst[i] / (fragment.usage * 1_000_000L) } + + ctx.run(burst, maxUsage, simulationContext.clock.millis() + fragment.duration) } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt index e1b7b178..d86e04ff 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt @@ -31,7 +31,6 @@ import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.MemoryUnit import com.atlarge.opendc.compute.core.ServerState -import com.atlarge.opendc.compute.core.execution.ProcessorContext import com.atlarge.opendc.compute.core.execution.ServerManagementContext import com.atlarge.opendc.compute.core.image.EmptyImage import com.atlarge.opendc.compute.core.image.Image @@ -138,32 +137,10 @@ public class SimpleBareMetalDriver( } } - private data class ProcessorContextImpl(override val info: ProcessingUnit) : ProcessorContext { - override suspend fun run(burst: Long, maxUsage: Double, deadline: Long): Long { - val start = simulationContext.clock.millis() - val usage = min(maxUsage, info.frequency) * 1_000_000 // Usage from MHz to Hz - - try { - val duration = min( - max(0, deadline - start), // Determine duration between now and deadline - ceil(burst / usage * 1000).toLong() // Convert from seconds to milliseconds - ) - delay(duration) - } catch (_: CancellationException) { - // On cancellation, we compute and return the remaining burst - } - val end = simulationContext.clock.millis() - val granted = ceil((end - start) / 1000.0 * usage).toLong() - return max(0, burst - granted) - } - } - private val serverCtx = object : ServerManagementContext { private var initialized: Boolean = false - override val cpus: List<ProcessorContextImpl> = this@SimpleBareMetalDriver.cpus - .map { ProcessorContextImpl(it) } - .toList() + override val cpus: List<ProcessingUnit> = this@SimpleBareMetalDriver.cpus override var server: Server get() = node.server!! @@ -186,8 +163,44 @@ public class SimpleBareMetalDriver( val previousState = server.state val state = if (cause == null) ServerState.SHUTOFF else ServerState.ERROR server = server.copy(state = state) - monitor.onUpdate(server, previousState) initialized = false + domain.launch { monitor.onUpdate(server, previousState) } + } + + override suspend fun run(burst: LongArray, maxUsage: DoubleArray, deadline: Long): LongArray { + val start = simulationContext.clock.millis() + var duration = max(0, deadline - start) + + for (i in 0..cpus.size) { + if (i >= burst.size || i >= maxUsage.size) { + continue + } + + val cpu = cpus[i] + val usage = min(maxUsage[i], cpu.frequency) * 1_000_000 // Usage from MHz to Hz + val cpuDuration = ceil(burst[i] / usage * 1000).toLong() // Convert from seconds to milliseconds + + if (cpuDuration != 0L) { // We only wait for processor cores with a non-zero burst + duration = min(duration, cpuDuration) + } + } + + try { + delay(duration) + } catch (_: CancellationException) { + // On cancellation, we compute and return the remaining burst + } + + val end = simulationContext.clock.millis() + return LongArray(cpus.size) { i -> + if (i < burst.size && i < maxUsage.size) { + val usage = min(maxUsage[i], cpus[i].frequency) * 1_000_000 + val granted = ceil((end - start) / 1000.0 * usage).toLong() + max(0, burst[i] - granted) + } else { + 0 + } + } } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt index b7848cf3..8d055953 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt @@ -43,7 +43,7 @@ class HypervisorImage( override val tags: TagContainer = emptyMap() override suspend fun invoke(ctx: ServerContext) { - val driver = HypervisorVirtDriver(ctx, VmSchedulerImpl(ctx, hypervisorMonitor)) + val driver = HypervisorVirtDriver(ctx, hypervisorMonitor) ctx.publishService(VirtDriver.Key, driver) diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt index e0547dcf..87c4f073 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt @@ -28,24 +28,30 @@ import com.atlarge.odcsim.SimulationContext import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.Flavor +import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.ServerState -import com.atlarge.opendc.compute.core.execution.ProcessorContext import com.atlarge.opendc.compute.core.execution.ServerContext import com.atlarge.opendc.compute.core.execution.ServerManagementContext import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.virt.driver.VirtDriver import com.atlarge.opendc.compute.virt.driver.VirtDriverMonitor +import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor +import kotlinx.coroutines.CancellationException import kotlinx.coroutines.Job +import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.launch import java.util.UUID +import kotlin.math.ceil +import kotlin.math.max +import kotlin.math.min /** * A [VirtDriver] that is backed by a simple hypervisor implementation. */ class HypervisorVirtDriver( private val hostContext: ServerContext, - private val scheduler: VmScheduler + private val monitor: HypervisorMonitor ) : VirtDriver { /** * A set for tracking the VM context objects. @@ -84,11 +90,114 @@ class HypervisorVirtDriver( monitors.remove(monitor) } + /** + * The set of [VmServerContext] instances that is being scheduled at the moment. + */ + private val activeVms = mutableSetOf<VmServerContext>() + + /** + * The deferred run call. + */ + private var call: Job? = null + + /** + * Schedule the vCPUs on the physical CPUs. + */ + private suspend fun reschedule() { + flush() + val call = simulationContext.domain.launch { + val start = simulationContext.clock.millis() + val vms = activeVms.toSet() + + var duration: Long = Long.MAX_VALUE + var deadline: Long = Long.MAX_VALUE + val usage = DoubleArray(hostContext.cpus.size) + + for (vm in vms) { + for (i in vm.cpus.indices) { + val cpu = vm.cpus[i] + + // Limit each vCPU to at most an equal share of the host CPU + val actualUsage = min(vm.requestedUsage[i], cpu.frequency / vms.size) + + // The duration that we want to run is that of the shortest request from a vCPU + duration = min(duration, ceil(vm.requestedBurst[i] / (actualUsage * 1_000_000L)).toLong()) + deadline = min(deadline, vm.requestedDeadline) + usage[i] += actualUsage + } + } + + val burst = LongArray(hostContext.cpus.size) + + for (vm in vms) { + for (i in vm.cpus.indices) { + val cpu = vm.cpus[i] + + // Limit each vCPU to at most an equal share of the host CPU + val actualUsage = min(vm.requestedUsage[i], cpu.frequency / vms.size) + val actualBurst = (duration * actualUsage * 1_000_000L).toLong() + + burst[i] += actualBurst + } + } + + // We run the total burst on the host processor. Note that this call may be cancelled at any moment in + // time, so not all of the burst may be executed. + val remainder = hostContext.run(burst, usage, deadline) + val end = simulationContext.clock.millis() + + // No work was performed + if ((end - start) <= 0) { + return@launch + } + + for (vm in vms) { + for (i in vm.cpus.indices) { + val cpu = vm.cpus[i] + + // Limit each vCPU to at most an equal share of the host CPU + val actualUsage = min(vm.requestedUsage[i], cpu.frequency / vms.size) + + // Compute the fraction of compute time allocated to the VM + val fraction = actualUsage / usage[i] + + // Compute the burst time that the VM was actually granted + val grantedBurst = max(0, burst[i] - ceil(remainder[i] * fraction).toLong()) + + // Compute remaining burst time to be executed for the request + vm.requestedBurst[i] = max(0, vm.requestedBurst[i] - grantedBurst) + } + + if (vm.requestedBurst.any { it == 0L } || vm.requestedDeadline <= end) { + // Return vCPU `run` call: the requested burst was completed or deadline was exceeded + vm.chan.send(Unit) + } + } + } + + this.call = call + call.invokeOnCompletion { this.call = null } + } + + /** + * Flush the progress of the current active VMs. + */ + private fun flush() { + val call = call ?: return // If there is no active call, there is nothing to flush + // The progress is actually flushed in the coroutine when it notices we cancel it and wait for its + // completion. + call.cancel() + } + internal inner class VmServerContext( override var server: Server, val monitor: ServerMonitor, ctx: SimulationContext ) : ServerManagementContext { + val requestedBurst: LongArray = LongArray(server.flavor.cpuCount) + val requestedUsage: DoubleArray = DoubleArray(server.flavor.cpuCount) + var requestedDeadline: Long = 0L + var chan = Channel<Unit>(Channel.RENDEZVOUS) private var initialized: Boolean = false internal val job: Job = ctx.domain.launch { @@ -101,7 +210,7 @@ class HypervisorVirtDriver( } } - override val cpus: List<ProcessorContext> = scheduler.createVirtualCpus(server.flavor) + override val cpus: List<ProcessingUnit> = hostContext.cpus.take(server.flavor.cpuCount) override suspend fun init() { if (initialized) { @@ -124,5 +233,25 @@ class HypervisorVirtDriver( vms.remove(this) monitors.forEach { it.onUpdate(vms.size, availableMemory) } } + + override suspend fun run(burst: LongArray, maxUsage: DoubleArray, deadline: Long): LongArray { + for (i in cpus.indices) { + requestedBurst[i] = if (i < burst.size) burst[i] else 0 + requestedUsage[i] = if (i < maxUsage.size) maxUsage[i] else 0.0 + } + requestedDeadline = deadline + + // Wait until the burst has been run or the coroutine is cancelled + try { + activeVms += this + reschedule() + chan.receive() + } catch (_: CancellationException) { + // On cancellation, we compute and return the remaining burst + } + activeVms -= this + reschedule() + return requestedBurst + } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmScheduler.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmScheduler.kt deleted file mode 100644 index 7b00d99c..00000000 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmScheduler.kt +++ /dev/null @@ -1,38 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.compute.virt.driver.hypervisor - -import com.atlarge.opendc.compute.core.Flavor -import com.atlarge.opendc.compute.core.execution.ProcessorContext - -/** - * A scheduler that assigns virtual CPUs to virtual machines and maps them to physical CPUs. - */ -public interface VmScheduler { - /** - * Create the virtual CPUs for the specified [flavor]. - */ - fun createVirtualCpus(flavor: Flavor): List<ProcessorContext> -} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmSchedulerImpl.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmSchedulerImpl.kt deleted file mode 100644 index f232d695..00000000 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/VmSchedulerImpl.kt +++ /dev/null @@ -1,199 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.compute.virt.driver.hypervisor - -import com.atlarge.odcsim.simulationContext -import com.atlarge.opendc.compute.core.ProcessingUnit -import com.atlarge.opendc.compute.core.Flavor -import com.atlarge.opendc.compute.core.execution.ProcessorContext -import com.atlarge.opendc.compute.core.execution.ServerContext -import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor -import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.Job -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.launch -import kotlin.math.ceil -import kotlin.math.max -import kotlin.math.min - -/** - * A basic implementation of the [VmScheduler] interface. - * - * @property hostContext The [ServerContext] of the host. - * @property hypervisorMonitor The [HypervisorMonitor] to inform with hypervisor scheduling events. - */ -public class VmSchedulerImpl( - private val hostContext: ServerContext, - private val hypervisorMonitor: HypervisorMonitor -) : VmScheduler { - /** - * The available physical CPUs to schedule. - */ - private val cpus = hostContext.cpus.map { HostProcessorContext(it, hostContext, hypervisorMonitor) } - - override fun createVirtualCpus(flavor: Flavor): List<ProcessorContext> { - // TODO At the moment, the first N cores get filled the first. Distribute over all cores instead - require(flavor.cpuCount <= cpus.size) { "Flavor cannot fit on machine" } - - return cpus - .asSequence() - .take(flavor.cpuCount) - .sortedBy { it.vcpus.size } - .map { VirtualProcessorContext(it) } - .toList() - } - - /** - * A wrapper around a host [ProcessorContext] that carries additional information about the vCPUs scheduled on the - * processor. - */ - internal class HostProcessorContext( - delegate: ProcessorContext, - private val hostContext: ServerContext, - private val hypervisorMonitor: HypervisorMonitor - ) : ProcessorContext by delegate { - /** - * The set of vCPUs scheduled on this processor. - */ - var vcpus: MutableSet<VirtualProcessorContext> = mutableSetOf() - - /** - * The deferred run call. - */ - var call: Job? = null - - /** - * Schedule the vCPUs on the physical CPU. - */ - suspend fun reschedule() { - flush() - - val vcpus = HashSet(vcpus) // Create snapshot of the vCPUs that were scheduled at this moment - val call = simulationContext.domain.launch { - var duration: Long = Long.MAX_VALUE - var deadline: Long = Long.MAX_VALUE - - for (vcpu in vcpus) { - // Limit each vCPU to at most an equal share of the host CPU - vcpu.actualUsage = min(vcpu.requestedUsage, info.frequency / vcpus.size) - - // The duration that we want to run is that of the shortest request from a vCPU - duration = min(duration, ceil(vcpu.requestedBurst / (vcpu.actualUsage * 1_000_000L)).toLong()) - deadline = min(deadline, vcpu.requestedDeadline) - } - - var burst: Long = 0 - var usage: Double = 0.0 - - for (vcpu in vcpus) { - vcpu.actualBurst = (duration * vcpu.actualUsage * 1_000_000L).toLong() - burst += vcpu.actualBurst - usage += vcpu.actualUsage - } - - // Ignore time slice if no work to request - if (burst <= 0L) { - return@launch - } - - // We run the total burst on the host processor. Note that this call may be cancelled at any moment in - // time, so not all of the burst may be executed. - val remainder = run(burst, usage, deadline) - val time = simulationContext.clock.millis() - val totalGrantedBurst: Long = burst - remainder - - // Compute for each vCPU the - for (vcpu in vcpus) { - // Compute the fraction of compute time allocated to the VM - val fraction = vcpu.actualUsage / usage - // Compute the burst time that the VM was actually granted - val grantedBurst = max(0, vcpu.actualBurst - ceil(remainder * fraction).toLong()) - // Compute remaining burst time to be executed for the request - vcpu.requestedBurst = max(0, vcpu.requestedBurst - grantedBurst) - - if (vcpu.requestedBurst == 0L || vcpu.requestedDeadline <= time) { - // Return vCPU `run` call: the requested burst was completed or deadline was exceeded - vcpu.chan.send(Unit) - } - } - - hypervisorMonitor.onSliceFinish( - time, - burst, - totalGrantedBurst, - vcpus.size, - hostContext.server - ) - } - - this.call = call - call.invokeOnCompletion { this.call = null } - } - - /** - * Flush the progress of the current active VMs. - */ - fun flush() { - val call = call ?: return // If there is no active call, there is nothing to flush - // The progress is actually flushed in the coroutine when it notices we cancel it and wait for its - // completion. - call.cancel() - } - } - - /** - * An implementation of [ProcessorContext] that delegates the work to a physical CPU. - */ - internal class VirtualProcessorContext(val host: HostProcessorContext) : ProcessorContext { - var actualBurst: Long = 0 - var actualUsage: Double = 0.0 - var requestedBurst: Long = 0 - var requestedUsage: Double = 0.0 - var requestedDeadline: Long = 0 - var chan = Channel<Unit>(Channel.RENDEZVOUS) - - override val info: ProcessingUnit - get() = host.info - - override suspend fun run(burst: Long, maxUsage: Double, deadline: Long): Long { - requestedBurst = burst - requestedUsage = maxUsage - requestedDeadline = deadline - - // Wait until the burst has been run or the coroutine is cancelled - try { - host.vcpus.add(this) - host.reschedule() - chan.receive() - } catch (_: CancellationException) { - // On cancellation, we compute and return the remaining burst - } - - host.vcpus.remove(this) - host.reschedule() - return requestedBurst - } - } -} diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt index 6468a408..84b16b68 100644 --- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt +++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt @@ -25,6 +25,7 @@ package com.atlarge.opendc.compute.metal.driver import com.atlarge.odcsim.SimulationEngineProvider +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.Server @@ -53,15 +54,16 @@ internal class SimpleBareMetalDriverTest { root.launch { val dom = root.newDomain(name = "driver") val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4) - val cpus = List(5) { ProcessingUnit(cpuNode, it, 2400.0) } + val cpus = List(4) { ProcessingUnit(cpuNode, it, 2400.0) } val driver = SimpleBareMetalDriver(UUID.randomUUID(), "test", cpus, emptyList(), dom) val monitor = object : ServerMonitor { override suspend fun onUpdate(server: Server, previousState: ServerState) { + println("[${simulationContext.clock.millis()}] $server") finalState = server.state } } - val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1000, 2) + val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1_000_000_000, 2) // Batch driver commands withContext(dom.coroutineContext) { diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt index 7e5a4bbe..6cfb2317 100644 --- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt +++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorTest.kt @@ -69,8 +69,8 @@ internal class HypervisorTest { println("Hello World!") } }) - val workloadA = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1_000_000, 1) - val workloadB = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 2_000_000, 1) + val workloadA = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1_000_000_000, 1) + val workloadB = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 2_000_000_000, 1) val monitor = object : ServerMonitor { override suspend fun onUpdate(server: Server, previousState: ServerState) { println("[${simulationContext.clock.millis()}]: $server") @@ -80,12 +80,13 @@ internal class HypervisorTest { val driverDom = root.newDomain("driver") val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4) - val cpus = List(5) { ProcessingUnit(cpuNode, it, 2000.0) } + val cpus = List(4) { ProcessingUnit(cpuNode, it, 2000.0) } val metalDriver = SimpleBareMetalDriver(UUID.randomUUID(), "test", cpus, emptyList(), driverDom) metalDriver.init(monitor) metalDriver.setImage(vmm) metalDriver.setPower(PowerState.POWER_ON) + delay(5) val flavor = Flavor(1, 0) |
