| author | Georgios Andreadis <g.andreadis@student.tudelft.nl> | 2020-04-12 15:44:53 +0200 |
|---|---|---|
| committer | Georgios Andreadis <g.andreadis@student.tudelft.nl> | 2020-04-12 15:44:53 +0200 |
| commit | 310daf42af741dee2f11d98eb929d2b6c0db141c (patch) | |
| tree | a0570137b64651bfecd0d3a1d0e0383c2c327cea | |
| parent | 5f141c8b6aa6cfe96333f0cc02015e490b90fca6 (diff) | |
| parent | 4a5ef5a41c8e008d5c09261de550d3f55eaa3348 (diff) | |
Merge branch 'bug/virt-driver-behavior' into '2.x'
Address multiple (performance) issues
See merge request opendc/opendc-simulator!58
9 files changed, 117 insertions, 81 deletions
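The main engine change below swaps the `Comparable`-based ordering of `Event` for an explicit `Comparator` handed to the `PriorityQueue`, ordering events by timestamp and breaking ties on the insertion id (the inline comment in the diff notes that this profiled faster than `Comparable`). The following is a minimal, standalone sketch of that queue ordering, using a simplified stand-in `Event` type rather than the engine's actual sealed class:

```kotlin
import java.util.PriorityQueue

// Simplified stand-in for the engine's Event type (illustration only).
data class Event(val time: Long, val id: Long)

// Order by timestamp, break ties on insertion id, mirroring the change in OmegaSimulationEngine.
val eventComparator = Comparator<Event> { lhs, rhs ->
    val cmp = lhs.time.compareTo(rhs.time)
    if (cmp == 0) lhs.id.compareTo(rhs.id) else cmp
}

fun main() {
    val queue = PriorityQueue(eventComparator)
    queue.add(Event(time = 10, id = 2))
    queue.add(Event(time = 5, id = 1))
    queue.add(Event(time = 10, id = 0))

    // Polls in timestamp order; equal timestamps come out in id order.
    while (queue.isNotEmpty()) {
        println(queue.poll())
    }
}
```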
diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt
index 934af293..0133832c 100644
--- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt
+++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt
@@ -46,7 +46,6 @@
 import kotlinx.coroutines.InternalCoroutinesApi
 import kotlinx.coroutines.Job
 import kotlinx.coroutines.Runnable
 import kotlinx.coroutines.SupervisorJob
-import kotlinx.coroutines.isActive
 import org.jetbrains.annotations.Async
 import org.slf4j.Logger
@@ -72,7 +71,12 @@ public class OmegaSimulationEngine(override val name: String) : SimulationEngine
     /**
      * The event queue to process
      */
-    private val queue: PriorityQueue<Event> = PriorityQueue()
+    private val queue: PriorityQueue<Event> = PriorityQueue(Comparator<Event> { lhs, rhs ->
+        // Note that Comparator gives better performance than Comparable according to
+        // profiling
+        val cmp = lhs.time.compareTo(rhs.time)
+        if (cmp == 0) lhs.id.compareTo(rhs.id) else cmp
+    })

     /**
      * The active processes in the simulation engine.
@@ -97,7 +101,9 @@ public class OmegaSimulationEngine(override val name: String) : SimulationEngine
             state = SimulationEngineState.STARTED
         }

-        while (coroutineContext.isActive) {
+        val job = coroutineContext[Job]
+
+        while (job?.isActive == true) {
             val event = queue.peek() ?: break
             val delivery = event.time

@@ -126,8 +132,18 @@ public class OmegaSimulationEngine(override val name: String) : SimulationEngine
     /**
      * Process the delivery of an event.
      */
+    @OptIn(ExperimentalCoroutinesApi::class)
     private fun process(@Async.Execute event: Event) {
-        event.run()
+        // This has been inlined into this method for performance
+        when (event) {
+            is Event.Dispatch ->
+                event.block.run()
+            is Event.Resume ->
+                with(event.continuation) { event.dispatcher.resumeUndispatched(Unit) }
+            is Event.Timeout ->
+                if (!event.isCancelled)
+                    event.block.run()
+        }
     }

     /**
@@ -142,7 +158,7 @@ public class OmegaSimulationEngine(override val name: String) : SimulationEngine

     private fun newDomain(parent: DomainImpl?): Domain {
         val name = "$" + UUID.randomUUID()
-        return newDomainImpl(name, null)
+        return newDomainImpl(name, parent)
     }

     private fun newDomain(name: String, parent: DomainImpl?): Domain {
@@ -214,36 +230,18 @@ public class OmegaSimulationEngine(override val name: String) : SimulationEngine
      *
      * @property time The point in time to deliver the message.
      */
-    private sealed class Event(val time: Long, val id: Long) : Comparable<Event>, Runnable {
-        override fun compareTo(other: Event): Int {
-            val cmp = time.compareTo(other.time)
-            return if (cmp == 0) id.compareTo(other.id) else cmp
-        }
-
+    private sealed class Event(val time: Long, val id: Long) {
         class Dispatch(time: Long, id: Long, val block: Runnable) : Event(time, id) {
-            override fun run() = block.run()
-
             override fun toString(): String = "Dispatch[$time]"
         }

         class Resume(time: Long, id: Long, val dispatcher: CoroutineDispatcher, val continuation: CancellableContinuation<Unit>) : Event(time, id) {
-            @ExperimentalCoroutinesApi
-            override fun run() {
-                with(continuation) { dispatcher.resumeUndispatched(Unit) }
-            }
-
             override fun toString(): String = "Resume[$time]"
         }

-        class Timeout(time: Long, id: Long, val block: Runnable, var cancelled: Boolean = false) : Event(time, id), DisposableHandle {
-            override fun run() {
-                if (!cancelled) {
-                    block.run()
-                }
-            }
-
+        class Timeout(time: Long, id: Long, val block: Runnable, var isCancelled: Boolean = false) : Event(time, id), DisposableHandle {
             override fun dispose() {
-                cancelled = true
+                isCancelled = true
             }

             override fun toString(): String = "Timeout[$time]"
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
index 9ad88c17..b0688f99 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt
@@ -3,6 +3,7 @@ package com.atlarge.opendc.compute.core.image
 import com.atlarge.odcsim.simulationContext
 import com.atlarge.opendc.compute.core.execution.ServerContext
 import com.atlarge.opendc.core.resource.TagContainer
+import kotlinx.coroutines.Job
 import kotlinx.coroutines.delay
 import kotlinx.coroutines.ensureActive
 import java.util.UUID
@@ -19,17 +20,20 @@ class VmImage(
 ) : Image {

     override suspend fun invoke(ctx: ServerContext) {
+        val clock = simulationContext.clock
+        val job = coroutineContext[Job]!!
+
         for (fragment in flopsHistory) {
-            coroutineContext.ensureActive()
+            job.ensureActive()

             if (fragment.flops == 0L) {
                 delay(fragment.duration)
             } else {
                 val cores = min(fragment.cores, ctx.server.flavor.cpuCount)
                 val burst = LongArray(cores) { fragment.flops / cores }
-                val usage = DoubleArray(cores) { fragment.usage }
+                val usage = DoubleArray(cores) { fragment.usage / cores }

-                ctx.run(burst, usage, simulationContext.clock.millis() + fragment.duration)
+                ctx.run(burst, usage, clock.millis() + fragment.duration)
             }
         }
     }
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
index 8e15584a..844938db 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
@@ -62,6 +62,7 @@ import kotlin.math.min
 import kotlinx.coroutines.withContext
 import java.lang.Exception
 import kotlin.coroutines.ContinuationInterceptor
+import kotlin.random.Random

 /**
  * A basic implementation of the [BareMetalDriver] that simulates an [Image] running on a bare-metal machine.
@@ -116,6 +117,11 @@ public class SimpleBareMetalDriver(

     override val powerDraw: Flow<Double> = powerModel(this)

+    /**
+     * The internal random instance.
+     */
+    private val random = Random(0)
+
     override suspend fun init(): Node = withContext(domain.coroutineContext) {
         nodeState.value
     }
@@ -128,7 +134,7 @@ public class SimpleBareMetalDriver(
         val events = EventFlow<ServerEvent>()

         val server = Server(
-            UUID.randomUUID(),
+            UUID(node.uid.leastSignificantBits xor node.uid.mostSignificantBits, random.nextLong()),
             node.name,
             emptyMap(),
             flavor,
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt
index 9ceb8bfc..7c088bc8 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt
@@ -60,6 +60,8 @@ public sealed class HypervisorEvent {
      * it did not have the capacity.
      * @property interferedBurst The sum of CPU time that virtual machines could not utilize due to performance
      * interference.
+     * @property cpuUsage CPU use in megahertz.
+     * @property cpuDemand CPU demand in megahertz.
      * @property numberOfDeployedImages The number of images deployed on this hypervisor.
      */
     public data class SliceFinished(
@@ -68,6 +70,8 @@ public sealed class HypervisorEvent {
         public val grantedBurst: Long,
         public val overcommissionedBurst: Long,
         public val interferedBurst: Long,
+        public val cpuUsage: Double,
+        public val cpuDemand: Double,
         public val numberOfDeployedImages: Int,
         public val hostServer: Server
     ) : HypervisorEvent()
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
index 5f15084d..d81b8825 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
@@ -42,6 +42,7 @@ import com.atlarge.opendc.core.services.ServiceKey
 import com.atlarge.opendc.core.services.ServiceRegistry
 import com.atlarge.opendc.core.workload.IMAGE_PERF_INTERFERENCE_MODEL
 import com.atlarge.opendc.core.workload.PerformanceInterferenceModel
+import kotlinx.coroutines.CancellableContinuation
 import kotlinx.coroutines.CancellationException
 import kotlinx.coroutines.CoroutineScope
 import kotlinx.coroutines.ExperimentalCoroutinesApi
@@ -58,11 +59,12 @@ import kotlinx.coroutines.flow.onEach
 import kotlinx.coroutines.launch
 import kotlinx.coroutines.selects.SelectClause0
 import kotlinx.coroutines.selects.select
+import kotlinx.coroutines.suspendCancellableCoroutine
 import kotlinx.coroutines.withContext
-import java.lang.Exception
 import java.util.Objects
 import java.util.TreeSet
 import java.util.UUID
+import kotlin.coroutines.resume
 import kotlin.math.ceil
 import kotlin.math.max
 import kotlin.math.min
@@ -180,7 +182,7 @@ class SimpleVirtDriver(
         val pCPUs = hostContext.cpus.indices.sortedBy { hostContext.cpus[it].frequency }

         val vms = mutableMapOf<VmServerContext, Collection<CpuRequest>>()
-        val requests = TreeSet<CpuRequest>()
+        val requests = TreeSet(cpuRequestComparator)

         val usage = DoubleArray(hostContext.cpus.size)
         val burst = LongArray(hostContext.cpus.size)
@@ -237,7 +239,8 @@ class SimpleVirtDriver(
             deadline = min(deadline, req.vm.deadline)
         }

-        duration = ceil(duration)
+        // XXX We set the minimum duration to 5 minutes here to prevent the rounding issues that are occurring with the FLOPs.
+        duration = max(300.0, ceil(duration))
         val totalAllocatedUsage = maxUsage - availableUsage
         var totalAllocatedBurst = 0L

@@ -246,14 +249,14 @@ class SimpleVirtDriver(

         // Divide the requests over the available capacity of the pCPUs fairly
         for (i in pCPUs) {
-            val remaining = hostContext.cpus.size - i
-            val availableShare = availableUsage / remaining
-            val grantedUsage = min(hostContext.cpus[i].frequency, availableShare)
-            val pBurst = ceil(duration * grantedUsage).toLong()
+            val maxCpuUsage = hostContext.cpus[i].frequency
+            val fraction = maxCpuUsage / maxUsage
+            val grantedUsage = min(maxCpuUsage, totalAllocatedUsage * fraction)
+            val grantedBurst = ceil(duration * grantedUsage).toLong()

             usage[i] = grantedUsage
-            burst[i] = pBurst
-            totalAllocatedBurst += pBurst
+            burst[i] = grantedBurst
+            totalAllocatedBurst += grantedBurst
             availableUsage -= grantedUsage
         }

@@ -308,9 +311,7 @@ class SimpleVirtDriver(

                 if (req.burst <= 0L || req.isCancelled) {
                     hasFinished = true
-                }
-
-                if (vm.deadline <= end && hostContext.server.state != ServerState.ERROR) {
+                } else if (vm.deadline <= end && hostContext.server.state != ServerState.ERROR) {
                     // Request must have its entire burst consumed or otherwise we have overcommission
                     // Note that we count the overcommissioned burst if the hypervisor has failed.
                     totalOvercommissionedBurst += req.burst
@@ -323,7 +324,7 @@ class SimpleVirtDriver(
                 requests.removeAll(vmRequests)

                 // Return vCPU `run` call: the requested burst was completed or deadline was exceeded
-                vm.chan.send(Unit)
+                vm.cont?.resume(Unit)
             }
         }

@@ -335,7 +336,9 @@ class SimpleVirtDriver(
                 min(totalRequestedBurst, totalAllocatedBurst),
                 min(totalRequestedBurst, totalGrantedBurst), // We can run more than requested due to timing
                 totalOvercommissionedBurst,
-                totalInterferedBurst, // Might be smaller than zero due to FP rounding errors
+                totalInterferedBurst, // Might be smaller than zero due to FP rounding errors,
+                totalAllocatedUsage,
+                totalRequestedUsage,
                 vmCount, // Some VMs might already have finished, so keep initial VM count
                 server
             )
@@ -344,6 +347,25 @@ class SimpleVirtDriver(
     }

     /**
+     * The [Comparator] for [CpuRequest].
+     */
+    private val cpuRequestComparator: Comparator<CpuRequest> = Comparator { lhs, rhs ->
+        var cmp = lhs.limit.compareTo(rhs.limit)
+
+        if (cmp != 0) {
+            return@Comparator cmp
+        }
+
+        cmp = lhs.vm.server.uid.compareTo(rhs.vm.server.uid)
+
+        if (cmp != 0) {
+            return@Comparator cmp
+        }
+
+        lhs.vcpu.id.compareTo(rhs.vcpu.id)
+    }
+
+    /**
      * A request to schedule a virtual CPU on the host cpu.
      */
     internal data class CpuRequest(
@@ -351,7 +373,7 @@ class SimpleVirtDriver(
         val vcpu: ProcessingUnit,
         var burst: Long,
         val limit: Double
-    ) : Comparable<CpuRequest> {
+    ) {
         /**
          * The usage that was actually granted.
         */
@@ -364,22 +386,6 @@ class SimpleVirtDriver(
         override fun equals(other: Any?): Boolean = other is CpuRequest && vm == other.vm && vcpu == other.vcpu

         override fun hashCode(): Int = Objects.hash(vm, vcpu)
-
-        override fun compareTo(other: CpuRequest): Int {
-            var cmp = limit.compareTo(other.limit)
-
-            if (cmp != 0) {
-                return cmp
-            }
-
-            cmp = vm.server.uid.compareTo(other.vm.server.uid)
-
-            if (cmp != 0) {
-                return cmp
-            }
-
-            return vcpu.id.compareTo(other.vcpu.id)
-        }
     }

     internal inner class VmServerContext(
@@ -390,7 +396,7 @@ class SimpleVirtDriver(
         private var finalized: Boolean = false
         lateinit var burst: LongArray
         var deadline: Long = 0L
-        var chan = Channel<Unit>(Channel.RENDEZVOUS)
+        var cont: CancellableContinuation<Unit>? = null
         private var initialized: Boolean = false

         internal val job: Job = launch {
@@ -462,13 +468,13 @@ class SimpleVirtDriver(
             // Wait until the burst has been run or the coroutine is cancelled
             try {
                 schedulingQueue.send(SchedulerCommand.Schedule(this, requests))
-                chan.receive()
+                suspendCancellableCoroutine<Unit> { cont = it }
             } catch (e: CancellationException) {
                 // Deschedule the VM
                 withContext(NonCancellable) {
                     requests.forEach { it.isCancelled = true }
                     schedulingQueue.send(SchedulerCommand.Interrupt)
-                    chan.receive()
+                    suspendCancellableCoroutine<Unit> { cont = it }
                 }

                 e.assertFailure()
diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/PerformanceInterferenceModel.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/PerformanceInterferenceModel.kt
index 1efe7588..04056394 100644
--- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/PerformanceInterferenceModel.kt
+++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/PerformanceInterferenceModel.kt
@@ -14,15 +14,29 @@ const val IMAGE_PERF_INTERFERENCE_MODEL = "image:performance-interference"
  * @param items The [PerformanceInterferenceModelItem]s that make up this model.
  */
 data class PerformanceInterferenceModel(
-    val items: Set<PerformanceInterferenceModelItem>
+    val items: Set<PerformanceInterferenceModelItem>,
+    val random: Random = Random(0)
 ) {
     private var intersectingItems: List<PerformanceInterferenceModelItem> = emptyList()
+    private var comparator = Comparator<PerformanceInterferenceModelItem> { lhs, rhs ->
+        var cmp = lhs.performanceScore.compareTo(rhs.performanceScore)
+        if (cmp != 0) {
+            return@Comparator cmp
+        }
+
+        cmp = lhs.minServerLoad.compareTo(rhs.minServerLoad)
+        if (cmp != 0) {
+            return@Comparator cmp
+        }
+
+        0
+    }

     fun computeIntersectingItems(colocatedWorkloads: Set<Resource>) {
         val colocatedWorkloadIds = colocatedWorkloads.map { it.name }
         intersectingItems = items.filter { item ->
             colocatedWorkloadIds.intersect(item.workloadNames).size > 1
-        }
+        }.sortedWith(comparator)
     }

     fun apply(currentServerLoad: Double): Double {
@@ -30,11 +44,10 @@ data class PerformanceInterferenceModel(
             return 1.0
         }
         val score = intersectingItems
-            .filter { it.minServerLoad <= currentServerLoad }
-            .minBy { it.performanceScore }
+            .firstOrNull { it.minServerLoad <= currentServerLoad }

         // Apply performance penalty to (on average) only one of the VMs
-        return if (score != null && Random.nextInt(score.workloadNames.size) == 0) {
+        return if (score != null && random.nextInt(score.workloadNames.size) == 0) {
             score.performanceScore
         } else {
             1.0
@@ -66,7 +79,5 @@ data class PerformanceInterferenceModelItem(
         return true
     }

-    override fun hashCode(): Int {
-        return workloadNames.hashCode()
-    }
+    override fun hashCode(): Int = workloadNames.hashCode()
 }
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
index c0d6de03..245aa250 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
@@ -17,7 +17,7 @@ class Sc20Monitor(
     private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>()

     init {
-        outputFile.write("time,duration,requestedBurst,grantedBurst,overcommissionedBurst,interferedBurst,numberOfDeployedImages,server,hostState,hostUsage,powerDraw,failedVms\n")
+        outputFile.write("time,duration,requestedBurst,grantedBurst,overcommissionedBurst,interferedBurst,cpuUsage,cpuDemand,numberOfDeployedImages,server,hostState,hostUsage,powerDraw,failedVms\n")
     }

     suspend fun onVmStateChanged(server: Server) {}
@@ -32,6 +32,8 @@ class Sc20Monitor(
             0,
             0,
             0,
+            0.0,
+            0.0,
             0,
             server,
             duration
@@ -49,6 +51,8 @@ class Sc20Monitor(
         grantedBurst: Long,
         overcommissionedBurst: Long,
         interferedBurst: Long,
+        cpuUsage: Double,
+        cpuDemand: Double,
         numberOfDeployedImages: Int,
         hostServer: Server,
         duration: Long = 5 * 60 * 1000L
@@ -60,7 +64,7 @@ class Sc20Monitor(
         val usage = driver.usage.first()
         val powerDraw = driver.powerDraw.first()

-        outputFile.write("$time,$duration,$requestedBurst,$grantedBurst,$overcommissionedBurst,$interferedBurst,$numberOfDeployedImages,${hostServer.uid},${hostServer.state},$usage,$powerDraw")
+        outputFile.write("$time,$duration,$requestedBurst,$grantedBurst,$overcommissionedBurst,$interferedBurst,$cpuUsage,$cpuDemand,$numberOfDeployedImages,${hostServer.uid},${hostServer.state},$usage,$powerDraw")
         outputFile.newLine()
     }

diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
index ede18b40..c75bde30 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
@@ -201,6 +201,8 @@ fun main(args: Array<String>) {
                         event.grantedBurst,
                         event.overcommissionedBurst,
                         event.interferedBurst,
+                        event.cpuUsage,
+                        event.cpuDemand,
                         event.numberOfDeployedImages,
                         event.hostServer
                     )
diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt
index 776cbc4e..950d2630 100644
--- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt
+++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt
@@ -36,6 +36,7 @@ import java.io.BufferedReader
 import java.io.File
 import java.io.FileReader
 import java.util.UUID
+import kotlin.math.max

 /**
  * A [TraceReader] for the internal VM workload trace format.
@@ -82,7 +83,7 @@ class Sc20TraceReader(
             println(vmFile)
             val flopsHistory = mutableListOf<FlopsHistoryFragment>()
             var vmId = ""
-            var cores = -1
+            var maxCores = -1
             var requiredMemory = -1L

             BufferedReader(FileReader(vmFile)).use { reader ->
@@ -96,11 +97,12 @@ class Sc20TraceReader(

                     vmId = vmFile.name
                     val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
-                    cores = values[coreCol].trim().toInt()
+                    val cores = values[coreCol].trim().toInt()
                     val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz
-                    requiredMemory = values[provisionedMemoryCol].trim().toLong()
+                    requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong())
+                    maxCores = max(maxCores, cores)

-                    val flops: Long = (cpuUsage * 5 * 60 * cores).toLong()
+                    val flops: Long = (cpuUsage * 5 * 60).toLong()

                     if (flopsHistory.isEmpty()) {
                         flopsHistory.add(FlopsHistoryFragment(timestamp, flops, traceInterval, cpuUsage, cores))
@@ -115,8 +117,7 @@ class Sc20TraceReader(
                                 oldFragment.flops + flops,
                                 oldFragment.duration + traceInterval,
                                 cpuUsage,
-                                cores
-                            )
+                                cores)
                         )
                     }
                 }
@@ -136,7 +137,7 @@ class Sc20TraceReader(
                     vmId,
                     mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems),
                     flopsHistory,
-                    cores,
+                    maxCores,
                     requiredMemory
                 )
             )
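One of the behavioral changes above replaces `SimpleVirtDriver`'s per-VM rendezvous `Channel<Unit>` with a stored `CancellableContinuation<Unit>` that the scheduler resumes directly once a slice completes. A rough, self-contained sketch of that suspend-and-resume pattern is shown below; the `SliceBarrier` class and its members are hypothetical names used for illustration, not the driver's real API:

```kotlin
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlin.coroutines.resume

// Hypothetical barrier: callers suspend until the current scheduling slice completes,
// and the scheduler resumes the stored continuations instead of sending on a channel.
class SliceBarrier {
    private val waiters = mutableListOf<CancellableContinuation<Unit>>()

    suspend fun awaitSlice() = suspendCancellableCoroutine<Unit> { cont ->
        waiters += cont
        cont.invokeOnCancellation { waiters.remove(cont) }
    }

    fun completeSlice() {
        val resumed = waiters.toList()
        waiters.clear()
        resumed.forEach { it.resume(Unit) }
    }
}

fun main() = runBlocking {
    val barrier = SliceBarrier()
    val vcpu = launch {
        barrier.awaitSlice()
        println("burst completed or deadline exceeded")
    }
    delay(100)             // let the coroutine suspend on the barrier
    barrier.completeSlice()
    vcpu.join()
}
```

Resuming a stored continuation avoids the extra rendezvous hand-off that a `Channel<Unit>` adds on every scheduling round, which fits the performance motivation stated in the merge message.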
