diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-01-07 17:25:40 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-01-07 23:33:57 +0100 |
| commit | 9cf24c9a8d3e96a29d9b111081bc3369aadd490d (patch) | |
| tree | 4f378ee9f77d8623a67a403135a4010afd5f9000 /simulator/opendc-simulator | |
| parent | 74a4bff83bfb6366cc193d1fc9c4a07e49649649 (diff) | |
Refactor workflow service to schedule tasks onto VMs
This change updates the workflow service to delegate the resource
scheduling logic to the virtualized resource provisioner.
Diffstat (limited to 'simulator/opendc-simulator')
5 files changed, 530 insertions, 486 deletions
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt index 4340708f..5e50a676 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt @@ -251,7 +251,7 @@ public class SimBareMetalMachine( this.isEmpty = !nonEmpty this.totalUsage = totalUsage - this.minExit = minExit + this.minExit = if (isEmpty) 0 else minExit this.maxExit = maxExit } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairSharedHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairSharedHypervisor.kt new file mode 100644 index 00000000..b88871a5 --- /dev/null +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimFairSharedHypervisor.kt @@ -0,0 +1,517 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.intrinsics.startCoroutineCancellable +import kotlinx.coroutines.selects.SelectClause0 +import kotlinx.coroutines.selects.SelectInstance +import kotlinx.coroutines.selects.select +import org.opendc.simulator.compute.interference.PerformanceInterferenceModel +import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.workload.SimWorkload +import java.time.Clock +import kotlin.math.ceil +import kotlin.math.max +import kotlin.math.min + +/** + * A [SimHypervisor] that distributes the computing requirements of multiple [SimWorkload] on a single + * [SimBareMetalMachine] concurrently using weighted fair sharing. + * + * @param coroutineScope The [CoroutineScope] to run the simulated workloads in. + * @param clock The virtual clock to track the simulation time. + */ +@OptIn(ExperimentalCoroutinesApi::class, InternalCoroutinesApi::class) +public class SimFairSharedHypervisor( + private val coroutineScope: CoroutineScope, + private val clock: Clock, + private val listener: SimHypervisor.Listener? = null +) : SimHypervisor { + /** + * A flag to indicate the driver is stopped. + */ + private var stopped: Boolean = false + + /** + * The channel for scheduling new CPU requests. + */ + private val schedulingQueue = Channel<SchedulerCommand>(Channel.UNLIMITED) + + /** + * Create a [SimMachine] instance on which users may run a [SimWorkload]. + * + * @param model The machine to create. + */ + override fun createMachine(model: SimMachineModel, performanceInterferenceModel: PerformanceInterferenceModel?): SimMachine { + val vm = VmSession(model, performanceInterferenceModel) + val vmCtx = VmExecutionContext(vm) + + return object : SimMachine { + override val model: SimMachineModel + get() = vmCtx.machine + + override val usage: StateFlow<Double> + get() = vm.usage + + /** + * The current active workload. + */ + private var activeWorkload: SimWorkload? = null + + override suspend fun run(workload: SimWorkload) { + require(activeWorkload == null) { "Run should not be called concurrently" } + + try { + activeWorkload = workload + workload.run(vmCtx) + } finally { + activeWorkload = null + } + } + + override fun toString(): String = "SimVirtualMachine" + } + } + + /** + * Run the scheduling process of the hypervisor. + */ + override suspend fun run(ctx: SimExecutionContext) { + val model = ctx.machine + val maxUsage = model.cpus.sumByDouble { it.frequency } + val pCPUs = model.cpus.indices.sortedBy { model.cpus[it].frequency } + + val vms = mutableSetOf<VmSession>() + val vcpus = mutableListOf<VCpu>() + + val usage = DoubleArray(model.cpus.size) + val burst = LongArray(model.cpus.size) + + fun process(command: SchedulerCommand) { + when (command) { + is SchedulerCommand.Schedule -> { + vms += command.vm + vcpus.addAll(command.vm.vcpus) + } + is SchedulerCommand.Deschedule -> { + vms -= command.vm + vcpus.removeAll(command.vm.vcpus) + } + is SchedulerCommand.Interrupt -> { + } + } + } + + fun processRemaining() { + var command = schedulingQueue.poll() + while (command != null) { + process(command) + command = schedulingQueue.poll() + } + } + + while (!stopped) { + // Wait for a request to be submitted if we have no work yet. + if (vcpus.isEmpty()) { + process(schedulingQueue.receive()) + } + + processRemaining() + + val start = clock.millis() + + var duration: Double = Double.POSITIVE_INFINITY + var deadline: Long = Long.MAX_VALUE + var availableUsage = maxUsage + var totalRequestedUsage = 0.0 + var totalRequestedBurst = 0L + + // Sort the vCPUs based on their requested usage + // Profiling shows that it is faster to sort every slice instead of maintaining some kind of sorted set + vcpus.sort() + + // Divide the available host capacity fairly across the vCPUs using max-min fair sharing + for ((i, req) in vcpus.withIndex()) { + val remaining = vcpus.size - i + val availableShare = availableUsage / remaining + val grantedUsage = min(req.limit, availableShare) + + // Take into account the minimum deadline of this slice before we possible continue + deadline = min(deadline, req.vm.deadline) + + // Ignore empty CPUs + if (grantedUsage <= 0 || req.burst <= 0) { + req.allocatedLimit = 0.0 + continue + } + + totalRequestedUsage += req.limit + totalRequestedBurst += req.burst + + req.allocatedLimit = grantedUsage + availableUsage -= grantedUsage + + // The duration that we want to run is that of the shortest request from a vCPU + duration = min(duration, req.burst / grantedUsage) + } + + val totalAllocatedUsage = maxUsage - availableUsage + var totalAllocatedBurst = 0L + availableUsage = totalAllocatedUsage + val serverLoad = totalAllocatedUsage / maxUsage + + // / XXX Ceil duration to eliminate rounding issues + duration = ceil(duration) + + // Divide the requests over the available capacity of the pCPUs fairly + for (i in pCPUs) { + val maxCpuUsage = model.cpus[i].frequency + val fraction = maxCpuUsage / maxUsage + val grantedUsage = min(maxCpuUsage, totalAllocatedUsage * fraction) + val grantedBurst = ceil(duration * grantedUsage).toLong() + + usage[i] = grantedUsage + burst[i] = grantedBurst + totalAllocatedBurst += grantedBurst + availableUsage -= grantedUsage + } + + // XXX If none of the VMs require any computation, wait until their deadline, otherwise trigger on the + // first vCPU finished. + val triggerMode = + if (totalAllocatedBurst > 0 && totalAllocatedUsage > 0.0) + SimExecutionContext.TriggerMode.FIRST + else + SimExecutionContext.TriggerMode.DEADLINE + + // We run the total burst on the host processor. Note that this call may be cancelled at any moment in + // time, so not all of the burst may be executed. + val isInterrupted = select<Boolean> { + schedulingQueue.onReceive { schedulingQueue.offer(it); true } + ctx.onRun(SimExecutionContext.Slice(burst, usage, deadline), triggerMode) + .invoke { false } + } + + val end = clock.millis() + + // The total requested burst that the VMs wanted to run in the time-frame that we ran. + val totalRequestedSubBurst = + vcpus.map { ceil((duration * 1000) / (it.vm.deadline - start) * it.burst).toLong() }.sum() + val totalRemainder = burst.sum() + val totalGrantedBurst = totalAllocatedBurst - totalRemainder + + // The burst that was lost due to overcommissioning of CPU resources + var totalOvercommissionedBurst = 0L + // The burst that was lost due to interference. + var totalInterferedBurst = 0L + + val vmIterator = vms.iterator() + while (vmIterator.hasNext()) { + val vm = vmIterator.next() + + // Apply performance interference model + val performanceScore = vm.performanceInterferenceModel?.apply(serverLoad) ?: 1.0 + var hasFinished = false + + for (vcpu in vm.vcpus) { + // Compute the fraction of compute time allocated to the VM + val fraction = vcpu.allocatedLimit / totalAllocatedUsage + + // Compute the burst time that the VM was actually granted + val grantedBurst = ceil(totalGrantedBurst * fraction).toLong() + + // The burst that was actually used by the VM + val usedBurst = ceil(grantedBurst * performanceScore).toLong() + + totalInterferedBurst += grantedBurst - usedBurst + + // Compute remaining burst time to be executed for the request + if (vcpu.consume(usedBurst)) { + hasFinished = true + } else if (vm.deadline <= end) { + // Request must have its entire burst consumed or otherwise we have overcommission + // Note that we count the overcommissioned burst if the hypervisor has failed. + totalOvercommissionedBurst += vcpu.burst + } + } + + if (hasFinished || vm.deadline <= end) { + // Mark the VM as finished and deschedule the VMs if needed + if (vm.finish()) { + vmIterator.remove() + vcpus.removeAll(vm.vcpus) + } + } + } + + listener?.onSliceFinish( + this, + totalRequestedBurst, + min(totalRequestedSubBurst, totalGrantedBurst), // We can run more than requested due to timing + totalOvercommissionedBurst, + totalInterferedBurst, // Might be smaller than zero due to FP rounding errors, + min( + totalAllocatedUsage, + totalRequestedUsage + ), // The allocated usage might be slightly higher due to FP rounding + totalRequestedUsage + ) + } + } + + /** + * A scheduling command processed by the scheduler. + */ + private sealed class SchedulerCommand { + /** + * Schedule the specified VM on the hypervisor. + */ + data class Schedule(val vm: VmSession) : SchedulerCommand() + + /** + * De-schedule the specified VM on the hypervisor. + */ + data class Deschedule(val vm: VmSession) : SchedulerCommand() + + /** + * Interrupt the scheduler. + */ + object Interrupt : SchedulerCommand() + } + + /** + * A virtual machine running on the hypervisor. + * + * @param ctx The execution context the vCPU runs in. + * @param triggerMode The mode when to trigger the VM exit. + * @param merge The function to merge consecutive slices on spillover. + * @param select The function to select on finish. + */ + @OptIn(InternalCoroutinesApi::class) + private data class VmSession( + val model: SimMachineModel, + val performanceInterferenceModel: PerformanceInterferenceModel? = null, + var triggerMode: SimExecutionContext.TriggerMode = SimExecutionContext.TriggerMode.FIRST, + var merge: (SimExecutionContext.Slice, SimExecutionContext.Slice) -> SimExecutionContext.Slice = { _, r -> r }, + var select: () -> Unit = {} + ) { + /** + * The vCPUs of this virtual machine. + */ + val vcpus: List<VCpu> + + /** + * The slices that the VM wants to run. + */ + var queue: Iterator<SimExecutionContext.Slice> = emptyList<SimExecutionContext.Slice>().iterator() + + /** + * The current active slice. + */ + var activeSlice: SimExecutionContext.Slice? = null + + /** + * The current deadline of the VM. + */ + val deadline: Long + get() = activeSlice?.deadline ?: Long.MAX_VALUE + + /** + * A flag to indicate that the VM is idle. + */ + val isIdle: Boolean + get() = activeSlice == null + + /** + * The usage of the virtual machine. + */ + val usage: MutableStateFlow<Double> = MutableStateFlow(0.0) + + init { + vcpus = model.cpus.mapIndexed { i, model -> VCpu(this, model, i) } + } + + /** + * Schedule the given slices on this vCPU, replacing the existing slices. + */ + fun schedule(slices: Sequence<SimExecutionContext.Slice>) { + queue = slices.iterator() + + if (queue.hasNext()) { + activeSlice = queue.next() + refresh() + } + } + + /** + * Cancel the existing workload on the VM. + */ + fun cancel() { + queue = emptyList<SimExecutionContext.Slice>().iterator() + activeSlice = null + refresh() + } + + /** + * Finish the current slice of the VM. + * + * @return `true` if the vCPUs may be descheduled, `false` otherwise. + */ + fun finish(): Boolean { + val activeSlice = activeSlice ?: return true + + return if (queue.hasNext()) { + val needsMerge = activeSlice.burst.any { it > 0 } + val candidateSlice = queue.next() + val slice = if (needsMerge) merge(activeSlice, candidateSlice) else candidateSlice + + this.activeSlice = slice + + // Update the vCPU cache + refresh() + + false + } else { + this.activeSlice = null + select() + true + } + } + + /** + * Refresh the vCPU cache. + */ + fun refresh() { + vcpus.forEach { it.refresh() } + usage.value = vcpus.sumByDouble { it.burst / it.limit } / vcpus.size + } + } + + /** + * A virtual CPU that can be scheduled on a physical CPU. + * + * @param vm The VM of which this vCPU is part. + * @param model The model of CPU that this vCPU models. + * @param id The id of the vCPU with respect to the VM. + */ + private data class VCpu( + val vm: VmSession, + val model: ProcessingUnit, + val id: Int + ) : Comparable<VCpu> { + /** + * The current limit on the vCPU. + */ + var limit: Double = 0.0 + + /** + * The limit allocated by the hypervisor. + */ + var allocatedLimit: Double = 0.0 + + /** + * The current burst running on the vCPU. + */ + var burst: Long = 0L + + /** + * Consume the specified burst on this vCPU. + */ + fun consume(burst: Long): Boolean { + this.burst = max(0, this.burst - burst) + + // Flush the result to the slice if it exists + vm.activeSlice?.burst?.takeIf { id < it.size }?.set(id, this.burst) + + val actuallyExists = vm.activeSlice?.burst?.let { id < it.size } ?: false + return actuallyExists && this.burst == 0L + } + + /** + * Refresh the information of this vCPU based on the current slice. + */ + fun refresh() { + limit = vm.activeSlice?.limit?.takeIf { id < it.size }?.get(id) ?: 0.0 + burst = vm.activeSlice?.burst?.takeIf { id < it.size }?.get(id) ?: 0 + } + + /** + * Compare to another vCPU based on the current load of the vCPU. + */ + override fun compareTo(other: VCpu): Int { + return limit.compareTo(other.limit) + } + + /** + * Create a string representation of the vCPU. + */ + override fun toString(): String = + "vCPU(id=$id,burst=$burst,limit=$limit,allocatedLimit=$allocatedLimit)" + } + + /** + * The execution context in which a VM runs. + * + */ + private inner class VmExecutionContext(val session: VmSession) : + SimExecutionContext, DisposableHandle { + override val machine: SimMachineModel + get() = session.model + + override val clock: Clock + get() = this@SimFairSharedHypervisor.clock + + @OptIn(InternalCoroutinesApi::class) + override fun onRun( + batch: Sequence<SimExecutionContext.Slice>, + triggerMode: SimExecutionContext.TriggerMode, + merge: (SimExecutionContext.Slice, SimExecutionContext.Slice) -> SimExecutionContext.Slice + ): SelectClause0 = object : SelectClause0 { + @InternalCoroutinesApi + override fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R) { + session.triggerMode = triggerMode + session.merge = merge + session.select = { + if (select.trySelect()) { + block.startCoroutineCancellable(select.completion) + } + } + session.schedule(batch) + // Indicate to the hypervisor that the VM should be re-scheduled + schedulingQueue.offer(SchedulerCommand.Schedule(session)) + select.disposeOnSelect(this@VmExecutionContext) + } + } + + override fun dispose() { + if (!session.isIdle) { + session.cancel() + schedulingQueue.offer(SchedulerCommand.Deschedule(session)) + } + } + } +} diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt index 6087227b..fb4cd137 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimHypervisor.kt @@ -22,267 +22,23 @@ package org.opendc.simulator.compute -import kotlinx.coroutines.* -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.flow.MutableStateFlow -import kotlinx.coroutines.flow.StateFlow -import kotlinx.coroutines.intrinsics.startCoroutineCancellable -import kotlinx.coroutines.selects.SelectClause0 -import kotlinx.coroutines.selects.SelectInstance -import kotlinx.coroutines.selects.select import org.opendc.simulator.compute.interference.PerformanceInterferenceModel -import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimWorkload -import java.time.Clock -import kotlin.math.ceil -import kotlin.math.max -import kotlin.math.min /** - * SimHypervisor distributes the computing requirements of multiple [SimWorkload] on a single [SimBareMetalMachine] concurrently. - * - * @param coroutineScope The [CoroutineScope] to run the simulated workloads in. - * @param clock The virtual clock to track the simulation time. + * SimHypervisor distributes the computing requirements of multiple [SimWorkload] on a single [SimBareMetalMachine] i + * concurrently. */ -@OptIn(ExperimentalCoroutinesApi::class, InternalCoroutinesApi::class) -public class SimHypervisor( - private val coroutineScope: CoroutineScope, - private val clock: Clock, - private val listener: Listener? = null -) : SimWorkload { - /** - * A set for tracking the VM context objects. - */ - private val vms: MutableSet<VmExecutionContext> = mutableSetOf() - - /** - * A flag to indicate the driver is stopped. - */ - private var stopped: Boolean = false - - /** - * The channel for scheduling new CPU requests. - */ - private val schedulingQueue = Channel<SchedulerCommand>(Channel.UNLIMITED) - +public interface SimHypervisor : SimWorkload { /** * Create a [SimMachine] instance on which users may run a [SimWorkload]. * * @param model The machine to create. */ - public fun createMachine(model: SimMachineModel, performanceInterferenceModel: PerformanceInterferenceModel? = null): SimMachine { - val vm = VmSession(model, performanceInterferenceModel) - val vmCtx = VmExecutionContext(vm) - - return object : SimMachine { - override val model: SimMachineModel - get() = vmCtx.machine - - override val usage: StateFlow<Double> - get() = vm.usage - - /** - * The current active workload. - */ - private var activeWorkload: SimWorkload? = null - - override suspend fun run(workload: SimWorkload) { - require(activeWorkload == null) { "Run should not be called concurrently" } - - try { - activeWorkload = workload - workload.run(vmCtx) - } finally { - activeWorkload = null - } - } - - override fun toString(): String = "SimVirtualMachine" - } - } - - /** - * Run the scheduling process of the hypervisor. - */ - override suspend fun run(ctx: SimExecutionContext) { - val model = ctx.machine - val maxUsage = model.cpus.sumByDouble { it.frequency } - val pCPUs = model.cpus.indices.sortedBy { model.cpus[it].frequency } - - val vms = mutableSetOf<VmSession>() - val vcpus = mutableListOf<VCpu>() - - val usage = DoubleArray(model.cpus.size) - val burst = LongArray(model.cpus.size) - - fun process(command: SchedulerCommand) { - when (command) { - is SchedulerCommand.Schedule -> { - vms += command.vm - vcpus.addAll(command.vm.vcpus) - } - is SchedulerCommand.Deschedule -> { - vms -= command.vm - vcpus.removeAll(command.vm.vcpus) - } - is SchedulerCommand.Interrupt -> { - } - } - } - - fun processRemaining() { - var command = schedulingQueue.poll() - while (command != null) { - process(command) - command = schedulingQueue.poll() - } - } - - while (!stopped) { - // Wait for a request to be submitted if we have no work yet. - if (vcpus.isEmpty()) { - process(schedulingQueue.receive()) - } - - processRemaining() - - val start = clock.millis() - - var duration: Double = Double.POSITIVE_INFINITY - var deadline: Long = Long.MAX_VALUE - var availableUsage = maxUsage - var totalRequestedUsage = 0.0 - var totalRequestedBurst = 0L - - // Sort the vCPUs based on their requested usage - // Profiling shows that it is faster to sort every slice instead of maintaining some kind of sorted set - vcpus.sort() - - // Divide the available host capacity fairly across the vCPUs using max-min fair sharing - for ((i, req) in vcpus.withIndex()) { - val remaining = vcpus.size - i - val availableShare = availableUsage / remaining - val grantedUsage = min(req.limit, availableShare) - - // Take into account the minimum deadline of this slice before we possible continue - deadline = min(deadline, req.vm.deadline) - - // Ignore empty CPUs - if (grantedUsage <= 0 || req.burst <= 0) { - req.allocatedLimit = 0.0 - continue - } - - totalRequestedUsage += req.limit - totalRequestedBurst += req.burst - - req.allocatedLimit = grantedUsage - availableUsage -= grantedUsage - - // The duration that we want to run is that of the shortest request from a vCPU - duration = min(duration, req.burst / grantedUsage) - } - - // XXX We set the minimum duration to 5 minutes here to prevent the rounding issues that are occurring with the FLOPs. - duration = 300.0 - - val totalAllocatedUsage = maxUsage - availableUsage - var totalAllocatedBurst = 0L - availableUsage = totalAllocatedUsage - val serverLoad = totalAllocatedUsage / maxUsage - - // Divide the requests over the available capacity of the pCPUs fairly - for (i in pCPUs) { - val maxCpuUsage = model.cpus[i].frequency - val fraction = maxCpuUsage / maxUsage - val grantedUsage = min(maxCpuUsage, totalAllocatedUsage * fraction) - val grantedBurst = ceil(duration * grantedUsage).toLong() - - usage[i] = grantedUsage - burst[i] = grantedBurst - totalAllocatedBurst += grantedBurst - availableUsage -= grantedUsage - } - - // We run the total burst on the host processor. Note that this call may be cancelled at any moment in - // time, so not all of the burst may be executed. - select<Boolean> { - schedulingQueue.onReceive { schedulingQueue.offer(it); true } - ctx.onRun(SimExecutionContext.Slice(burst, usage, deadline), SimExecutionContext.TriggerMode.DEADLINE) - .invoke { false } - } - - val end = clock.millis() - - // No work was performed - if ((end - start) <= 0) { - continue - } - - // The total requested burst that the VMs wanted to run in the time-frame that we ran. - val totalRequestedSubBurst = - vcpus.map { ceil((duration * 1000) / (it.vm.deadline - start) * it.burst).toLong() }.sum() - val totalRemainder = burst.sum() - val totalGrantedBurst = totalAllocatedBurst - totalRemainder - - // The burst that was lost due to overcommissioning of CPU resources - var totalOvercommissionedBurst = 0L - // The burst that was lost due to interference. - var totalInterferedBurst = 0L - - val vmIterator = vms.iterator() - while (vmIterator.hasNext()) { - val vm = vmIterator.next() - - // Apply performance interference model - val performanceScore = vm.performanceInterferenceModel?.apply(serverLoad) ?: 1.0 - var hasFinished = false - - for (vcpu in vm.vcpus) { - // Compute the fraction of compute time allocated to the VM - val fraction = vcpu.allocatedLimit / totalAllocatedUsage - - // Compute the burst time that the VM was actually granted - val grantedBurst = ceil(totalGrantedBurst * fraction).toLong() - - // The burst that was actually used by the VM - val usedBurst = ceil(grantedBurst * performanceScore).toLong() - - totalInterferedBurst += grantedBurst - usedBurst - - // Compute remaining burst time to be executed for the request - if (vcpu.consume(usedBurst)) { - hasFinished = true - } else if (vm.deadline <= end) { - // Request must have its entire burst consumed or otherwise we have overcommission - // Note that we count the overcommissioned burst if the hypervisor has failed. - totalOvercommissionedBurst += vcpu.burst - } - } - - if (hasFinished || vm.deadline <= end) { - // Mark the VM as finished and deschedule the VMs if needed - if (vm.finish()) { - vmIterator.remove() - vcpus.removeAll(vm.vcpus) - } - } - } - - listener?.onSliceFinish( - this, - totalRequestedBurst, - min(totalRequestedSubBurst, totalGrantedBurst), // We can run more than requested due to timing - totalOvercommissionedBurst, - totalInterferedBurst, // Might be smaller than zero due to FP rounding errors, - min( - totalAllocatedUsage, - totalRequestedUsage - ), // The allocated usage might be slightly higher due to FP rounding - totalRequestedUsage - ) - } - } + public fun createMachine( + model: SimMachineModel, + performanceInterferenceModel: PerformanceInterferenceModel? = null + ): SimMachine /** * Event listener for hypervisor events. @@ -301,235 +57,4 @@ public class SimHypervisor( cpuDemand: Double ) } - - /** - * A scheduling command processed by the scheduler. - */ - private sealed class SchedulerCommand { - /** - * Schedule the specified VM on the hypervisor. - */ - data class Schedule(val vm: VmSession) : SchedulerCommand() - - /** - * De-schedule the specified VM on the hypervisor. - */ - data class Deschedule(val vm: VmSession) : SchedulerCommand() - - /** - * Interrupt the scheduler. - */ - object Interrupt : SchedulerCommand() - } - - /** - * A virtual machine running on the hypervisor. - * - * @param ctx The execution context the vCPU runs in. - * @param triggerMode The mode when to trigger the VM exit. - * @param merge The function to merge consecutive slices on spillover. - * @param select The function to select on finish. - */ - @OptIn(InternalCoroutinesApi::class) - private data class VmSession( - val model: SimMachineModel, - val performanceInterferenceModel: PerformanceInterferenceModel? = null, - var triggerMode: SimExecutionContext.TriggerMode = SimExecutionContext.TriggerMode.FIRST, - var merge: (SimExecutionContext.Slice, SimExecutionContext.Slice) -> SimExecutionContext.Slice = { _, r -> r }, - var select: () -> Unit = {} - ) { - /** - * The vCPUs of this virtual machine. - */ - val vcpus: List<VCpu> - - /** - * The slices that the VM wants to run. - */ - var queue: Iterator<SimExecutionContext.Slice> = emptyList<SimExecutionContext.Slice>().iterator() - - /** - * The current active slice. - */ - var activeSlice: SimExecutionContext.Slice? = null - - /** - * The current deadline of the VM. - */ - val deadline: Long - get() = activeSlice?.deadline ?: Long.MAX_VALUE - - /** - * A flag to indicate that the VM is idle. - */ - val isIdle: Boolean - get() = activeSlice == null - - /** - * The usage of the virtual machine. - */ - val usage: MutableStateFlow<Double> = MutableStateFlow(0.0) - - init { - vcpus = model.cpus.mapIndexed { i, model -> VCpu(this, model, i) } - } - - /** - * Schedule the given slices on this vCPU, replacing the existing slices. - */ - fun schedule(slices: Sequence<SimExecutionContext.Slice>) { - queue = slices.iterator() - - if (queue.hasNext()) { - activeSlice = queue.next() - refresh() - } - } - - /** - * Cancel the existing workload on the VM. - */ - fun cancel() { - queue = emptyList<SimExecutionContext.Slice>().iterator() - activeSlice = null - refresh() - } - - /** - * Finish the current slice of the VM. - * - * @return `true` if the vCPUs may be descheduled, `false` otherwise. - */ - fun finish(): Boolean { - val activeSlice = activeSlice ?: return true - - return if (queue.hasNext()) { - val needsMerge = activeSlice.burst.any { it > 0 } - val candidateSlice = queue.next() - val slice = if (needsMerge) merge(activeSlice, candidateSlice) else candidateSlice - - this.activeSlice = slice - - // Update the vCPU cache - refresh() - - false - } else { - this.activeSlice = null - select() - true - } - } - - /** - * Refresh the vCPU cache. - */ - fun refresh() { - vcpus.forEach { it.refresh() } - usage.value = vcpus.sumByDouble { it.burst / it.limit } / vcpus.size - } - } - - /** - * A virtual CPU that can be scheduled on a physical CPU. - * - * @param vm The VM of which this vCPU is part. - * @param model The model of CPU that this vCPU models. - * @param id The id of the vCPU with respect to the VM. - */ - private data class VCpu( - val vm: VmSession, - val model: ProcessingUnit, - val id: Int - ) : Comparable<VCpu> { - /** - * The current limit on the vCPU. - */ - var limit: Double = 0.0 - - /** - * The limit allocated by the hypervisor. - */ - var allocatedLimit: Double = 0.0 - - /** - * The current burst running on the vCPU. - */ - var burst: Long = 0L - - /** - * Consume the specified burst on this vCPU. - */ - fun consume(burst: Long): Boolean { - this.burst = max(0, this.burst - burst) - - // Flush the result to the slice if it exists - vm.activeSlice?.burst?.takeIf { id < it.size }?.set(id, this.burst) - - return allocatedLimit > 0.0 && this.burst == 0L - } - - /** - * Refresh the information of this vCPU based on the current slice. - */ - fun refresh() { - limit = vm.activeSlice?.limit?.takeIf { id < it.size }?.get(id) ?: 0.0 - burst = vm.activeSlice?.burst?.takeIf { id < it.size }?.get(id) ?: 0 - } - - /** - * Compare to another vCPU based on the current load of the vCPU. - */ - override fun compareTo(other: VCpu): Int { - return limit.compareTo(other.limit) - } - - /** - * Create a string representation of the vCPU. - */ - override fun toString(): String = - "vCPU(id=$id,burst=$burst,limit=$limit,allocatedLimit=$allocatedLimit)" - } - - /** - * The execution context in which a VM runs. - * - */ - private inner class VmExecutionContext(val session: VmSession) : - SimExecutionContext, DisposableHandle { - override val machine: SimMachineModel - get() = session.model - - override val clock: Clock - get() = this@SimHypervisor.clock - - @OptIn(InternalCoroutinesApi::class) - override fun onRun( - batch: Sequence<SimExecutionContext.Slice>, - triggerMode: SimExecutionContext.TriggerMode, - merge: (SimExecutionContext.Slice, SimExecutionContext.Slice) -> SimExecutionContext.Slice - ): SelectClause0 = object : SelectClause0 { - @InternalCoroutinesApi - override fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R) { - session.triggerMode = triggerMode - session.merge = merge - session.select = { - if (select.trySelect()) { - block.startCoroutineCancellable(select.completion) - } - } - session.schedule(batch) - // Indicate to the hypervisor that the VM should be re-scheduled - schedulingQueue.offer(SchedulerCommand.Schedule(session)) - select.disposeOnSelect(this@VmExecutionContext) - } - } - - override fun dispose() { - if (!session.isIdle) { - session.cancel() - schedulingQueue.offer(SchedulerCommand.Deschedule(session)) - } - } - } } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt index 918a78bd..0d2c9374 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt @@ -39,7 +39,7 @@ public class SimFlopsWorkload( public val utilization: Double = 0.8 ) : SimWorkload { init { - require(flops >= 0) { "Negative number of flops" } + require(flops >= 0) { "Negative number of FLOPs" } require(cores > 0) { "Negative number of cores or no cores" } require(utilization > 0.0 && utilization <= 1.0) { "Utilization must be in (0, 1]" } } @@ -54,4 +54,6 @@ public class SimFlopsWorkload( ctx.run(SimExecutionContext.Slice(burst, maxUsage, Long.MAX_VALUE), triggerMode = SimExecutionContext.TriggerMode.LAST) } + + override fun toString(): String = "SimFlopsWorkload(FLOPs=$flops,cores=$cores,utilization=$utilization)" } diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt index 78bd2940..e7fdd4b2 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimHypervisorTest.kt @@ -105,7 +105,7 @@ internal class SimHypervisorTest { ) val machine = SimBareMetalMachine(scope, clock, machineModel) - val hypervisor = SimHypervisor(scope, clock, listener) + val hypervisor = SimFairSharedHypervisor(scope, clock, listener) launch { machine.run(hypervisor) @@ -120,7 +120,7 @@ internal class SimHypervisorTest { assertAll( { Assertions.assertEquals(emptyList<Throwable>(), scope.uncaughtExceptions, "No errors") }, - { Assertions.assertEquals(2073600, listener.totalRequestedBurst, "Requested Burst does not match") }, + { Assertions.assertEquals(2082000, listener.totalRequestedBurst, "Requested Burst does not match") }, { Assertions.assertEquals(2013600, listener.totalGrantedBurst, "Granted Burst does not match") }, { Assertions.assertEquals(60000, listener.totalOvercommissionedBurst, "Overcommissioned Burst does not match") }, { Assertions.assertEquals(1200001, scope.currentTime) } |
