From 02864ba50fafffd19bb1b635eea06004d9fd78aa Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 6 Apr 2020 16:36:49 +0200 Subject: perf: Optimize SimpleVirtDriver This change optimizes the SimpleVirtDriver by removing unnecessary cancellations (which take a lot of time due to exception construction). Moreover, we now try to keep intermediate state sorted so that we do not have to re-sort every scheduling cycle (which was rather heavy as profiling showed). --- .../opendc/compute/core/execution/ServerContext.kt | 22 ++- .../compute/metal/driver/SimpleBareMetalDriver.kt | 130 ++++++++------ .../opendc/compute/virt/driver/SimpleVirtDriver.kt | 196 ++++++++++++++------- .../opendc/experiments/sc20/TestExperiment.kt | 2 + 4 files changed, 234 insertions(+), 116 deletions(-) (limited to 'opendc') diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt index e0a491c8..663fa5e4 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ServerContext.kt @@ -28,6 +28,8 @@ import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.core.services.ServiceKey +import kotlinx.coroutines.selects.SelectClause0 +import kotlinx.coroutines.selects.select /** * Represents the execution context in which a bootable [Image] runs on a [Server]. @@ -62,5 +64,23 @@ public interface ServerContext { * @param limit The maximum usage in terms of MHz that the processing core may use while running the burst. * @param deadline The instant at which this request needs to be fulfilled. */ - public suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) + public suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) { + select { onRun(burst, limit, deadline).invoke {} } + } + + /** + * Request the specified burst time from the processor cores and suspend execution until a processor core finishes + * processing a **non-zero** burst or until the deadline is reached. + * + * After the method returns, [burst] will contain the remaining burst length for each of the cores (which may be + * zero). + * + * Both [burst] and [limit] must be of the same size and in any other case the method will throw an + * [IllegalArgumentException]. + * + * @param burst The burst time to request from each of the processor cores. + * @param limit The maximum usage in terms of MHz that the processing core may use while running the burst. + * @param deadline The instant at which this request needs to be fulfilled. 
+ */ + public fun onRun(burst: LongArray, limit: DoubleArray, deadline: Long): SelectClause0 } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt index 37ae9eb5..e3cb6e35 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt @@ -25,9 +25,9 @@ package com.atlarge.opendc.compute.metal.driver import com.atlarge.odcsim.Domain +import com.atlarge.odcsim.SimulationContext import com.atlarge.odcsim.flow.EventFlow import com.atlarge.odcsim.flow.StateFlow -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.Flavor @@ -36,7 +36,6 @@ import com.atlarge.opendc.compute.core.ServerEvent import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.execution.ServerManagementContext import com.atlarge.opendc.compute.core.execution.ShutdownException -import com.atlarge.opendc.compute.core.execution.assertFailure import com.atlarge.opendc.compute.core.image.EmptyImage import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.metal.Node @@ -46,18 +45,23 @@ import com.atlarge.opendc.compute.metal.power.ConstantPowerModel import com.atlarge.opendc.core.power.PowerModel import com.atlarge.opendc.core.services.ServiceKey import com.atlarge.opendc.core.services.ServiceRegistry -import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Job +import kotlinx.coroutines.Delay +import kotlinx.coroutines.DisposableHandle +import kotlinx.coroutines.InternalCoroutinesApi import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.intrinsics.startCoroutineCancellable import kotlinx.coroutines.launch +import kotlinx.coroutines.selects.SelectClause0 +import kotlinx.coroutines.selects.SelectInstance import java.util.UUID import kotlin.math.ceil import kotlin.math.max import kotlin.math.min import kotlinx.coroutines.withContext import java.lang.Exception +import kotlin.coroutines.ContinuationInterceptor /** * A basic implementation of the [BareMetalDriver] that simulates an [Image] running on a bare-metal machine. @@ -242,64 +246,78 @@ public class SimpleBareMetalDriver( setNode(nodeState.value.copy(state = newNodeState, server = server)) } - private var flush: Job? = null + private var flush: DisposableHandle? 
= null - override suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) { + @OptIn(InternalCoroutinesApi::class) + override fun onRun(burst: LongArray, limit: DoubleArray, deadline: Long): SelectClause0 { require(burst.size == limit.size) { "Array dimensions do not match" } assert(!finalized) { "Server instance is already finalized" } - // If run is called in at the same timestamp as the previous call, cancel the load flush - flush?.cancel() - flush = null - - val start = simulationContext.clock.millis() - var duration = max(0, deadline - start) - var totalUsage = 0.0 - - // Determine the duration of the first CPU to finish - for (i in 0 until min(cpus.size, burst.size)) { - val cpu = cpus[i] - val usage = min(limit[i], cpu.frequency) - val cpuDuration = ceil(burst[i] / usage * 1000).toLong() // Convert from seconds to milliseconds - - totalUsage += usage / cpu.frequency - - if (cpuDuration != 0L) { // We only wait for processor cores with a non-zero burst - duration = min(duration, cpuDuration) + return object : SelectClause0 { + @InternalCoroutinesApi + override fun registerSelectClause0(select: SelectInstance, block: suspend () -> R) { + // If run is called in at the same timestamp as the previous call, cancel the load flush + flush?.dispose() + flush = null + + val context = select.completion.context + val simulationContext = context[SimulationContext]!! + val delay = context[ContinuationInterceptor] as Delay + + val start = simulationContext.clock.millis() + var duration = max(0, deadline - start) + var totalUsage = 0.0 + + // Determine the duration of the first CPU to finish + for (i in 0 until min(cpus.size, burst.size)) { + val cpu = cpus[i] + val usage = min(limit[i], cpu.frequency) + val cpuDuration = ceil(burst[i] / usage * 1000).toLong() // Convert from seconds to milliseconds + + totalUsage += usage / cpu.frequency + + if (cpuDuration != 0L) { // We only wait for processor cores with a non-zero burst + duration = min(duration, cpuDuration) + } + } + + if (!unavailable) { + usageState.value = totalUsage / cpus.size + } + + val action = Runnable { + // todo: we could have replaced startCoroutine with startCoroutineUndispatched + // But we need a way to know that Delay.invokeOnTimeout had used the right thread + if (select.trySelect()) { + block.startCoroutineCancellable(select.completion) // shall be cancellable while waits for dispatch + } + } + + val disposable = delay.invokeOnTimeout(duration, action) + val flush = DisposableHandle { + val end = simulationContext.clock.millis() + + // Flush the load if they do not receive a new run call for the same timestamp + flush = delay.invokeOnTimeout(1, Runnable { + usageState.value = 0.0 + flush = null + }) + + if (!unavailable) { + // Write back the remaining burst time + for (i in 0 until min(cpus.size, burst.size)) { + val usage = min(limit[i], cpus[i].frequency) + val granted = ceil((end - start) / 1000.0 * usage).toLong() + burst[i] = max(0, burst[i] - granted) + } + } + + disposable.dispose() + } + + select.disposeOnSelect(flush) } } - - if (!unavailable) { - usageState.value = totalUsage / cpus.size - } - - try { - delay(duration) - } catch (e: CancellationException) { - // On non-failure cancellation, we compute and return the remaining burst - e.assertFailure() - } - val end = simulationContext.clock.millis() - - // Flush the load if they do not receive a new run call for the same timestamp - flush = domain.launch(job) { - delay(1) - usageState.value = 0.0 - } - flush!!.invokeOnCompletion { - flush = null - } 
- - if (unavailable) { - return - } - - // Write back the remaining burst time - for (i in 0 until min(cpus.size, burst.size)) { - val usage = min(limit[i], cpus[i].frequency) - val granted = ceil((end - start) / 1000.0 * usage).toLong() - burst[i] = max(0, burst[i] - granted) - } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt index c21a9fc0..4939a624 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt @@ -46,7 +46,9 @@ import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.InternalCoroutinesApi import kotlinx.coroutines.Job +import kotlinx.coroutines.NonCancellable import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow @@ -54,6 +56,12 @@ import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch +import kotlinx.coroutines.selects.SelectClause0 +import kotlinx.coroutines.selects.select +import kotlinx.coroutines.withContext +import java.lang.Exception +import java.util.Objects +import java.util.TreeSet import java.util.UUID import kotlin.math.ceil import kotlin.math.max @@ -99,6 +107,17 @@ class SimpleVirtDriver( performanceModel?.computeIntersectingItems(imagesRunning) } }.launchIn(this) + + launch { + try { + scheduler() + } catch (e: Exception) { + if (e !is CancellationException) { + simulationContext.log.error("Hypervisor scheduler failed", e) + } + throw e + } + } } override suspend fun spawn( @@ -128,44 +147,75 @@ class SimpleVirtDriver( } /** - * A flag to indicate the driver is stopped. + * A scheduling command processed by the scheduler. */ - private var stopped: Boolean = false + private sealed class SchedulerCommand { + /** + * Schedule the specified vCPUs of a single VM. + */ + data class Schedule(val vm: VmServerContext, val requests: Collection) : SchedulerCommand() + + /** + * Interrupt the scheduler. + */ + object Interrupt : SchedulerCommand() + } /** - * The set of [VmServerContext] instances that is being scheduled at the moment. + * A flag to indicate the driver is stopped. */ - private val activeVms = mutableSetOf() + private var stopped: Boolean = false /** - * The deferred run call. + * The channel for scheduling new CPU requests. */ - private var call: Job? = null + private val schedulingQueue = Channel(Channel.UNLIMITED) /** - * Schedule the vCPUs on the physical CPUs. + * The scheduling process of the hypervisor. 
*/ - private fun reschedule() { - flush() + private suspend fun scheduler() { + val clock = simulationContext.clock + val maxUsage = hostContext.cpus.sumByDouble { it.frequency } + val pCPUs = hostContext.cpus.indices.sortedBy { hostContext.cpus[it].frequency } + + val vms = mutableMapOf>() + val requests = TreeSet() + + val usage = DoubleArray(hostContext.cpus.size) + val burst = LongArray(hostContext.cpus.size) + + fun process(command: SchedulerCommand) { + when (command) { + is SchedulerCommand.Schedule -> { + vms[command.vm] = command.requests + requests.removeAll(command.requests) + requests.addAll(command.requests) + } + } + } - // Do not schedule a call if there is no work to schedule or the driver stopped. - if (stopped || activeVms.isEmpty()) { - return + fun processRemaining() { + var command = schedulingQueue.poll() + while (command != null) { + process(command) + command = schedulingQueue.poll() + } } - val call = launch { - val start = simulationContext.clock.millis() - val vms = activeVms.toSet() + while (!stopped) { + // Wait for a request to be submitted if we have no work yet. + if (requests.isEmpty()) { + process(schedulingQueue.receive()) + } + + processRemaining() + + val start = clock.millis() var duration: Double = Double.POSITIVE_INFINITY var deadline: Long = Long.MAX_VALUE - - val maxUsage = hostContext.cpus.sumByDouble { it.frequency } var availableUsage = maxUsage - val requests = vms.asSequence() - .flatMap { it.requests.asSequence() } - .sortedBy { it.limit } - .toList() // Divide the available host capacity fairly across the vCPUs using max-min fair sharing for ((i, req) in requests.withIndex()) { @@ -177,48 +227,55 @@ class SimpleVirtDriver( availableUsage -= grantedUsage // The duration that we want to run is that of the shortest request from a vCPU - duration = min(duration, req.burst / req.allocatedUsage) + duration = min(duration, req.burst / grantedUsage) deadline = min(deadline, req.vm.deadline) } - val usage = DoubleArray(hostContext.cpus.size) - val burst = LongArray(hostContext.cpus.size) val totalUsage = maxUsage - availableUsage + var totalBurst = 0L availableUsage = totalUsage val serverLoad = totalUsage / maxUsage // Divide the requests over the available capacity of the pCPUs fairly - for (i in hostContext.cpus.indices.sortedBy { hostContext.cpus[it].frequency }) { + for (i in pCPUs) { val remaining = hostContext.cpus.size - i val availableShare = availableUsage / remaining val grantedUsage = min(hostContext.cpus[i].frequency, availableShare) + val pBurst = (duration * grantedUsage).toLong() usage[i] = grantedUsage - burst[i] = (duration * grantedUsage).toLong() + burst[i] = pBurst + totalBurst += pBurst availableUsage -= grantedUsage } - val remainder = burst.clone() // We run the total burst on the host processor. Note that this call may be cancelled at any moment in // time, so not all of the burst may be executed. 
- hostContext.run(remainder, usage, deadline) - val end = simulationContext.clock.millis() + val interrupted = select { + schedulingQueue.onReceive { schedulingQueue.offer(it); true } + hostContext.onRun(burst, usage, deadline).invoke { false } + } + + val end = clock.millis() // No work was performed if ((end - start) <= 0) { - return@launch + continue } - val totalRemainder = remainder.sum() - val totalBurst = burst.sum() + val totalRemainder = burst.sum() + + val entryIterator = vms.entries.iterator() + while (entryIterator.hasNext()) { + val (vm, vmRequests) = entryIterator.next() - for (vm in vms) { // Apply performance interference model val performanceModel = vm.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? val performanceScore = performanceModel?.apply(serverLoad) ?: 1.0 + var hasFinished = false - for ((i, req) in vm.requests.withIndex()) { + for ((i, req) in vmRequests.withIndex()) { // Compute the fraction of compute time allocated to the VM val fraction = req.allocatedUsage / totalUsage @@ -231,9 +288,17 @@ class SimpleVirtDriver( // Compute remaining burst time to be executed for the request req.burst = max(0, vm.burst[i] - grantedBurst) vm.burst[i] = req.burst + + if (req.burst <= 0L || req.isCancelled) { + hasFinished = true + } } - if (vm.burst.any { it == 0L } || vm.deadline <= end) { + if (hasFinished || vm.deadline <= end) { + // Deschedule all requests from this VM + entryIterator.remove() + requests.removeAll(vmRequests) + // Return vCPU `run` call: the requested burst was completed or deadline was exceeded vm.chan.send(Unit) } @@ -248,22 +313,7 @@ class SimpleVirtDriver( server ) ) - - // Make sure we reschedule the remaining amount of work (if we did not obtain the entire request) - reschedule() } - this.call = call - } - - /** - * Flush the progress of the current active VMs. - */ - private fun flush() { - val call = call ?: return // If there is no active call, there is nothing to flush - // The progress is actually flushed in the coroutine when it notices: we cancel it and wait for its - // completion. - call.cancel() - this.call = null } /** @@ -274,11 +324,35 @@ class SimpleVirtDriver( val vcpu: ProcessingUnit, var burst: Long, val limit: Double - ) { + ) : Comparable { /** * The usage that was actually granted. */ var allocatedUsage: Double = 0.0 + + /** + * A flag to indicate the request was cancelled. 
+ */ + var isCancelled: Boolean = false + + override fun equals(other: Any?): Boolean = other is CpuRequest && vm == other.vm && vcpu == other.vcpu + override fun hashCode(): Int = Objects.hash(vm, vcpu) + + override fun compareTo(other: CpuRequest): Int { + var cmp = limit.compareTo(other.limit) + + if (cmp != 0) { + return cmp + } + + cmp = vm.server.uid.compareTo(other.vm.server.uid) + + if (cmp != 0) { + return cmp + } + + return vcpu.id.compareTo(other.vcpu.id) + } } internal inner class VmServerContext( @@ -287,7 +361,6 @@ class SimpleVirtDriver( val domain: Domain ) : ServerManagementContext { private var finalized: Boolean = false - lateinit var requests: List lateinit var burst: LongArray var deadline: Long = 0L var chan = Channel(Channel.RENDEZVOUS) @@ -347,7 +420,7 @@ class SimpleVirtDriver( this.deadline = deadline this.burst = burst - requests = cpus.asSequence() + val requests = cpus.asSequence() .take(burst.size) .mapIndexed { i, cpu -> CpuRequest( @@ -361,16 +434,21 @@ class SimpleVirtDriver( // Wait until the burst has been run or the coroutine is cancelled try { - activeVms += this - reschedule() + schedulingQueue.send(SchedulerCommand.Schedule(this, requests)) chan.receive() } catch (e: CancellationException) { - // On cancellation, we compute and return the remaining burst + // Deschedule the VM + withContext(NonCancellable) { + requests.forEach { it.isCancelled = true } + schedulingQueue.send(SchedulerCommand.Interrupt) + chan.receive() + } + e.assertFailure() - } finally { - activeVms -= this - reschedule() } } + + @OptIn(InternalCoroutinesApi::class) + override fun onRun(burst: LongArray, limit: DoubleArray, deadline: Long): SelectClause0 = TODO() } } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt index ca7e31ea..3392bd02 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt @@ -119,6 +119,7 @@ fun createFaultInjector(domain: Domain, random: Random): FaultInjector { @OptIn(ExperimentalCoroutinesApi::class) fun main(args: Array) { ArgParser(args).parseInto(::ExperimentParameters).run { + val start = System.currentTimeMillis() val monitor = Sc20Monitor(outputFile) val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() @@ -262,6 +263,7 @@ fun main(args: Array) { scheduler.terminate() failureDomain?.cancel() println(simulationContext.clock.instant()) + println("${System.currentTimeMillis() - start} milliseconds") } runBlocking { -- cgit v1.2.3