From 998466e611438e9f4381e5d693ef4119a3cf8905 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 15 Apr 2020 00:17:18 +0200 Subject: bug: Address uid collision issue --- .../atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'opendc/opendc-compute/src') diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt index 844938db..08f04760 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt @@ -120,7 +120,7 @@ public class SimpleBareMetalDriver( /** * The internal random instance. */ - private val random = Random(0) + private val random = Random(uid.leastSignificantBits xor uid.mostSignificantBits) override suspend fun init(): Node = withContext(domain.coroutineContext) { nodeState.value @@ -134,7 +134,7 @@ public class SimpleBareMetalDriver( val events = EventFlow() val server = Server( - UUID(node.uid.leastSignificantBits xor node.uid.mostSignificantBits, random.nextLong()), + UUID(random.nextLong(), random.nextLong()), node.name, emptyMap(), flavor, @@ -151,7 +151,7 @@ public class SimpleBareMetalDriver( override suspend fun stop(): Node = withContext(domain.coroutineContext) { val node = nodeState.value - if (node.state == NodeState.SHUTOFF || node.state == NodeState.ERROR) { + if (node.state == NodeState.SHUTOFF) { return@withContext node } -- cgit v1.2.3 From a77829c7a8cf300a99a695423d85f905e0209286 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 15 Apr 2020 12:16:42 +0200 Subject: perf: Optimize performance interference --- .../core/workload/PerformanceInterferenceModel.kt | 129 +++++++++++++++++++++ .../atlarge/opendc/compute/virt/HypervisorEvent.kt | 1 + .../opendc/compute/virt/driver/SimpleVirtDriver.kt | 50 ++++---- .../virt/service/SimpleVirtProvisioningService.kt | 12 +- .../virt/service/VirtProvisioningService.kt | 5 - 5 files changed, 168 insertions(+), 29 deletions(-) create mode 100644 opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt (limited to 'opendc/opendc-compute/src') diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt new file mode 100644 index 00000000..ddf9bb33 --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt @@ -0,0 +1,129 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.compute.core.workload + +import com.atlarge.opendc.compute.core.Server +import java.util.SortedSet +import java.util.TreeSet +import kotlin.random.Random + +/** + * Meta-data key for the [PerformanceInterferenceModel] of an image. + */ +const val IMAGE_PERF_INTERFERENCE_MODEL = "image:performance-interference" + +/** + * Performance Interference Model describing the variability incurred by different sets of workloads if colocated. + * + * @param items The [PerformanceInterferenceModelItem]s that make up this model. + */ +class PerformanceInterferenceModel( + items: Set, + val random: Random = Random(0) +) { + private var intersectingItems: List = emptyList() + private val comparator = Comparator { lhs, rhs -> + var cmp = lhs.performanceScore.compareTo(rhs.performanceScore) + if (cmp != 0) { + return@Comparator cmp + } + + cmp = lhs.minServerLoad.compareTo(rhs.minServerLoad) + if (cmp != 0) { + return@Comparator cmp + } + + lhs.hashCode().compareTo(rhs.hashCode()) + } + val items = TreeSet(comparator) + private val colocatedWorkloads = TreeSet() + + init { + this.items.addAll(items) + } + + fun vmStarted(server: Server) { + colocatedWorkloads.add(server.image.name) + intersectingItems = items.filter { item -> doesMatch(item) } + } + + fun vmStopped(server: Server) { + colocatedWorkloads.remove(server.image.name) + intersectingItems = items.filter { item -> doesMatch(item) } + } + + private fun doesMatch(item: PerformanceInterferenceModelItem): Boolean { + var count = 0 + for (name in item.workloadNames.subSet(colocatedWorkloads.first(), colocatedWorkloads.last() + "\u0000")) { + if (name in colocatedWorkloads) + count++ + if (count > 1) + return true + } + return false + } + + fun apply(currentServerLoad: Double): Double { + if (intersectingItems.isEmpty()) { + return 1.0 + } + val score = intersectingItems + .firstOrNull { it.minServerLoad <= currentServerLoad } + + // Apply performance penalty to (on average) only one of the VMs + return if (score != null && random.nextInt(score.workloadNames.size) == 0) { + score.performanceScore + } else { + 1.0 + } + } +} + +/** + * Model describing how a specific set of workloads causes performance variability for each workload. + * + * @param workloadNames The names of the workloads that together cause performance variability for each workload in the set. + * @param minServerLoad The minimum total server load at which this interference is activated and noticeable. + * @param performanceScore The performance score that should be applied to each workload's performance. 1 means no + * influence, <1 means that performance degrades, and >1 means that performance improves. + */ +data class PerformanceInterferenceModelItem( + val workloadNames: SortedSet, + val minServerLoad: Double, + val performanceScore: Double +) { + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (javaClass != other?.javaClass) return false + + other as PerformanceInterferenceModelItem + + if (workloadNames != other.workloadNames) return false + + return true + } + + override fun hashCode(): Int = workloadNames.hashCode() +} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt index 7c088bc8..e7344fa6 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt @@ -26,6 +26,7 @@ package com.atlarge.opendc.compute.virt import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.virt.driver.VirtDriver +import com.atlarge.opendc.compute.virt.service.VirtProvisioningService /** * An event that is emitted by a [VirtDriver]. diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt index 73f9dd5c..f32f407c 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt @@ -40,8 +40,8 @@ import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.virt.HypervisorEvent import com.atlarge.opendc.core.services.ServiceKey import com.atlarge.opendc.core.services.ServiceRegistry -import com.atlarge.opendc.core.workload.IMAGE_PERF_INTERFERENCE_MODEL -import com.atlarge.opendc.core.workload.PerformanceInterferenceModel +import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel import kotlinx.coroutines.CancellableContinuation import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope @@ -53,9 +53,6 @@ import kotlinx.coroutines.NonCancellable import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.filter -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.coroutines.selects.SelectClause0 import kotlinx.coroutines.selects.select @@ -65,6 +62,7 @@ import java.util.Objects import java.util.TreeSet import java.util.UUID import kotlin.coroutines.resume +import kotlin.coroutines.suspendCoroutine import kotlin.math.ceil import kotlin.math.max import kotlin.math.min @@ -101,15 +99,6 @@ class SimpleVirtDriver( override val events: Flow = eventFlow init { - events.filter { it is HypervisorEvent.VmsUpdated }.onEach { - val imagesRunning = vms.map { it.server.image }.toSet() - vms.forEach { - val performanceModel = - it.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? - performanceModel?.computeIntersectingItems(imagesRunning) - } - }.launchIn(this) - launch { try { scheduler() @@ -140,6 +129,7 @@ class SimpleVirtDriver( ) availableMemory -= requiredMemory vms.add(VmServerContext(server, events, simulationContext.domain)) + vmStarted(server) eventFlow.emit(HypervisorEvent.VmsUpdated(this, vms.size, availableMemory)) return server } @@ -148,6 +138,22 @@ class SimpleVirtDriver( eventFlow.close() } + private fun vmStarted(server: Server) { + vms.forEach { + val performanceModel = + it.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? + performanceModel?.vmStarted(server) + } + } + + private fun vmStopped(server: Server) { + vms.forEach { + val performanceModel = + it.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? + performanceModel?.vmStopped(server) + } + } + /** * A scheduling command processed by the scheduler. */ @@ -325,7 +331,7 @@ class SimpleVirtDriver( requests.removeAll(vmRequests) // Return vCPU `run` call: the requested burst was completed or deadline was exceeded - vm.cont?.resume(Unit) + vm.chan.send(Unit) } } @@ -395,7 +401,7 @@ class SimpleVirtDriver( private var finalized: Boolean = false lateinit var burst: LongArray var deadline: Long = 0L - var cont: CancellableContinuation? = null + val chan: Channel = Channel(Channel.CONFLATED) private var initialized: Boolean = false internal val job: Job = launch { @@ -443,6 +449,7 @@ class SimpleVirtDriver( server = server.copy(state = serverState) availableMemory += server.flavor.memorySize vms.remove(this) + vmStopped(server) eventFlow.emit(HypervisorEvent.VmsUpdated(this@SimpleVirtDriver, vms.size, availableMemory)) events.close() } @@ -466,15 +473,12 @@ class SimpleVirtDriver( // Wait until the burst has been run or the coroutine is cancelled try { schedulingQueue.send(SchedulerCommand.Schedule(this, requests)) - suspendCancellableCoroutine { cont = it } + chan.receive() } catch (e: CancellationException) { // Deschedule the VM - withContext(NonCancellable) { - requests.forEach { it.isCancelled = true } - schedulingQueue.send(SchedulerCommand.Interrupt) - suspendCancellableCoroutine { cont = it } - } - + requests.forEach { it.isCancelled = true } + schedulingQueue.send(SchedulerCommand.Interrupt) + chan.receive() e.assertFailure() } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt index 6200ad7c..5ed58fee 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt @@ -55,7 +55,10 @@ class SimpleVirtProvisioningService( */ private val activeImages: MutableSet = mutableSetOf() - override val hypervisorEvents: Flow = EventFlow() + public var submittedVms = 0L + public var queuedVms = 0L + public var runningVms = 0L + public var finishedVms = 0L /** * The allocation logic to use. @@ -87,6 +90,8 @@ class SimpleVirtProvisioningService( image: Image, flavor: Flavor ): Server = withContext(coroutineContext) { + submittedVms++ + queuedVms++ suspendCancellableCoroutine { cont -> val vmInstance = ImageView(name, image, flavor, cont) incomingImages += vmInstance @@ -145,6 +150,8 @@ class SimpleVirtProvisioningService( ) imageInstance.server = server imageInstance.continuation.resume(server) + queuedVms-- + runningVms++ activeImages += imageInstance server.events @@ -152,6 +159,9 @@ class SimpleVirtProvisioningService( when (event) { is ServerEvent.StateChanged -> { if (event.server.state == ServerState.SHUTOFF) { + runningVms-- + finishedVms++ + activeImages -= imageInstance selectedHv.provisionedCores -= server.flavor.cpuCount diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt index 550048a4..695f7274 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt @@ -17,11 +17,6 @@ interface VirtProvisioningService { */ val allocationPolicy: AllocationPolicy - /** - * The events emitted by the hypervisors. - */ - public val hypervisorEvents: Flow - /** * Obtain the active hypervisors for this provisioner. */ -- cgit v1.2.3 From eab4c190142f54291ed235e4e18f3a35385a541c Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 15 Apr 2020 16:12:25 +0200 Subject: perf: Optimize trace loading for memory usage --- .../atlarge/opendc/compute/core/image/VmImage.kt | 22 ++++++++++++---------- .../core/workload/PerformanceInterferenceModel.kt | 2 +- .../atlarge/opendc/compute/virt/HypervisorEvent.kt | 1 - .../opendc/compute/virt/driver/SimpleVirtDriver.kt | 6 ------ .../virt/service/SimpleVirtProvisioningService.kt | 2 -- .../virt/service/VirtProvisioningService.kt | 2 -- 6 files changed, 13 insertions(+), 22 deletions(-) (limited to 'opendc/opendc-compute/src') diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt index b0688f99..b37f05a7 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt @@ -14,7 +14,7 @@ class VmImage( public override val uid: UUID, public override val name: String, public override val tags: TagContainer, - public val flopsHistory: List, + public val flopsHistory: Sequence, public val maxCores: Int, public val requiredMemory: Long ) : Image { @@ -23,17 +23,19 @@ class VmImage( val clock = simulationContext.clock val job = coroutineContext[Job]!! - for (fragment in flopsHistory) { - job.ensureActive() + for (fragments in flopsHistory.chunked(1024)) { + for (fragment in fragments) { + job.ensureActive() - if (fragment.flops == 0L) { - delay(fragment.duration) - } else { - val cores = min(fragment.cores, ctx.server.flavor.cpuCount) - val burst = LongArray(cores) { fragment.flops / cores } - val usage = DoubleArray(cores) { fragment.usage / cores } + if (fragment.flops == 0L) { + delay(fragment.duration) + } else { + val cores = min(fragment.cores, ctx.server.flavor.cpuCount) + val burst = LongArray(cores) { fragment.flops / cores } + val usage = DoubleArray(cores) { fragment.usage / cores } - ctx.run(burst, usage, clock.millis() + fragment.duration) + ctx.run(burst, usage, clock.millis() + fragment.duration) + } } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt index ddf9bb33..45024a49 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt @@ -64,7 +64,7 @@ class PerformanceInterferenceModel( this.items.addAll(items) } - fun vmStarted(server: Server) { + fun vmStarted(server: Server) { colocatedWorkloads.add(server.image.name) intersectingItems = items.filter { item -> doesMatch(item) } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt index e7344fa6..7c088bc8 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt @@ -26,7 +26,6 @@ package com.atlarge.opendc.compute.virt import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.virt.driver.VirtDriver -import com.atlarge.opendc.compute.virt.service.VirtProvisioningService /** * An event that is emitted by a [VirtDriver]. diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt index f32f407c..2c25c0fa 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt @@ -42,27 +42,21 @@ import com.atlarge.opendc.core.services.ServiceKey import com.atlarge.opendc.core.services.ServiceRegistry import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel -import kotlinx.coroutines.CancellableContinuation import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.InternalCoroutinesApi import kotlinx.coroutines.Job -import kotlinx.coroutines.NonCancellable import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.launch import kotlinx.coroutines.selects.SelectClause0 import kotlinx.coroutines.selects.select -import kotlinx.coroutines.suspendCancellableCoroutine -import kotlinx.coroutines.withContext import java.util.Objects import java.util.TreeSet import java.util.UUID -import kotlin.coroutines.resume -import kotlin.coroutines.suspendCoroutine import kotlin.math.ceil import kotlin.math.max import kotlin.math.min diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt index 5ed58fee..09036d0d 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt @@ -1,7 +1,6 @@ package com.atlarge.opendc.compute.virt.service import com.atlarge.odcsim.SimulationContext -import com.atlarge.odcsim.flow.EventFlow import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.Server @@ -20,7 +19,6 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.Job import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt index 695f7274..2ad7df84 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt @@ -3,10 +3,8 @@ package com.atlarge.opendc.compute.virt.service import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.image.Image -import com.atlarge.opendc.compute.virt.HypervisorEvent import com.atlarge.opendc.compute.virt.driver.VirtDriver import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy -import kotlinx.coroutines.flow.Flow /** * A service for VM provisioning on a cloud. -- cgit v1.2.3 From e097aeb16d77c260126b65c7f13330076d800d52 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 16 Apr 2020 01:35:33 +0200 Subject: perf: Convert traces to Parquet format --- .../com/atlarge/opendc/compute/core/image/VmImage.kt | 2 +- .../opendc/compute/virt/driver/SimpleVirtDriver.kt | 18 +++++++++++------- 2 files changed, 12 insertions(+), 8 deletions(-) (limited to 'opendc/opendc-compute/src') diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt index b37f05a7..e3227540 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt @@ -23,7 +23,7 @@ class VmImage( val clock = simulationContext.clock val job = coroutineContext[Job]!! - for (fragments in flopsHistory.chunked(1024)) { + for (fragments in flopsHistory.chunked(128)) { for (fragment in fragments) { job.ensureActive() diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt index 2c25c0fa..8a32bc43 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt @@ -57,6 +57,9 @@ import kotlinx.coroutines.selects.select import java.util.Objects import java.util.TreeSet import java.util.UUID +import kotlin.coroutines.Continuation +import kotlin.coroutines.resume +import kotlin.coroutines.suspendCoroutine import kotlin.math.ceil import kotlin.math.max import kotlin.math.min @@ -325,7 +328,7 @@ class SimpleVirtDriver( requests.removeAll(vmRequests) // Return vCPU `run` call: the requested burst was completed or deadline was exceeded - vm.chan.send(Unit) + vm.chan?.resume(Unit) } } @@ -371,7 +374,7 @@ class SimpleVirtDriver( val vm: VmServerContext, val vcpu: ProcessingUnit, var burst: Long, - val limit: Double + var limit: Double ) { /** * The usage that was actually granted. @@ -395,7 +398,7 @@ class SimpleVirtDriver( private var finalized: Boolean = false lateinit var burst: LongArray var deadline: Long = 0L - val chan: Channel = Channel(Channel.CONFLATED) + var chan: Continuation? = null private var initialized: Boolean = false internal val job: Job = launch { @@ -452,6 +455,7 @@ class SimpleVirtDriver( require(burst.size == limit.size) { "Array dimensions do not match" } this.deadline = deadline this.burst = burst + val requests = cpus.asSequence() .take(burst.size) .mapIndexed { i, cpu -> @@ -466,13 +470,13 @@ class SimpleVirtDriver( // Wait until the burst has been run or the coroutine is cancelled try { - schedulingQueue.send(SchedulerCommand.Schedule(this, requests)) - chan.receive() + schedulingQueue.offer(SchedulerCommand.Schedule(this, requests)) + suspendCoroutine { chan = it } } catch (e: CancellationException) { // Deschedule the VM requests.forEach { it.isCancelled = true } - schedulingQueue.send(SchedulerCommand.Interrupt) - chan.receive() + schedulingQueue.offer(SchedulerCommand.Interrupt) + suspendCoroutine { chan = it } e.assertFailure() } } -- cgit v1.2.3 From 6cd93b57945b289b2e14556f7ceaa193326eff78 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 16 Apr 2020 15:30:34 +0200 Subject: bug: Fix issues related to early termination --- .../atlarge/opendc/compute/core/image/VmImage.kt | 20 +++++++-------- .../opendc/compute/virt/driver/SimpleVirtDriver.kt | 4 +-- .../virt/service/SimpleVirtProvisioningService.kt | 30 +++++++++++++++++++--- 3 files changed, 37 insertions(+), 17 deletions(-) (limited to 'opendc/opendc-compute/src') diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt index e3227540..36bbfa45 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt @@ -23,19 +23,17 @@ class VmImage( val clock = simulationContext.clock val job = coroutineContext[Job]!! - for (fragments in flopsHistory.chunked(128)) { - for (fragment in fragments) { - job.ensureActive() + for (fragment in flopsHistory) { + job.ensureActive() - if (fragment.flops == 0L) { - delay(fragment.duration) - } else { - val cores = min(fragment.cores, ctx.server.flavor.cpuCount) - val burst = LongArray(cores) { fragment.flops / cores } - val usage = DoubleArray(cores) { fragment.usage / cores } + if (fragment.flops == 0L) { + delay(fragment.duration) + } else { + val cores = min(fragment.cores, ctx.server.flavor.cpuCount) + val burst = LongArray(cores) { fragment.flops / cores } + val usage = DoubleArray(cores) { fragment.usage / cores } - ctx.run(burst, usage, clock.millis() + fragment.duration) - } + ctx.run(burst, usage, clock.millis() + fragment.duration) } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt index 8a32bc43..53fa463b 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt @@ -243,7 +243,7 @@ class SimpleVirtDriver( } // XXX We set the minimum duration to 5 minutes here to prevent the rounding issues that are occurring with the FLOPs. - duration = max(300.0, ceil(duration)) + duration = 300.0 val totalAllocatedUsage = maxUsage - availableUsage var totalAllocatedBurst = 0L @@ -335,7 +335,7 @@ class SimpleVirtDriver( eventFlow.emit( HypervisorEvent.SliceFinished( this@SimpleVirtDriver, - totalRequestedSubBurst, + totalRequestedBurst, min(totalRequestedSubBurst, totalGrantedBurst), // We can run more than requested due to timing totalOvercommissionedBurst, totalInterferedBurst, // Might be smaller than zero due to FP rounding errors, diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt index 09036d0d..520f6dc5 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt @@ -26,6 +26,7 @@ import kotlinx.coroutines.suspendCancellableCoroutine import kotlinx.coroutines.withContext import kotlin.coroutines.Continuation import kotlin.coroutines.resume +import kotlin.math.max @OptIn(ExperimentalCoroutinesApi::class) class SimpleVirtProvisioningService( @@ -57,6 +58,10 @@ class SimpleVirtProvisioningService( public var queuedVms = 0L public var runningVms = 0L public var finishedVms = 0L + public var unscheduledVms = 0L + + private var maxCores = 0 + private var maxMemory = 0L /** * The allocation logic to use. @@ -124,15 +129,24 @@ class SimpleVirtProvisioningService( } private suspend fun schedule() { - val log = simulationContext.log + val clock = simulationContext.clock val imagesToBeScheduled = incomingImages.toSet() for (imageInstance in imagesToBeScheduled) { val requiredMemory = (imageInstance.image as VmImage).requiredMemory - val selectedHv = allocationLogic.select(availableHypervisors, imageInstance) ?: break + val selectedHv = allocationLogic.select(availableHypervisors, imageInstance) + + if (selectedHv == null) { + if (requiredMemory > maxMemory || imageInstance.flavor.cpuCount > maxCores) { + unscheduledVms++ + println("[${clock.millis()}] CANNOT SPAWN ${imageInstance.image}") + } + + break + } try { - log.info("Spawning ${imageInstance.image} on ${selectedHv.server}") + println("[${clock.millis()}] SPAWN ${imageInstance.image} on ${selectedHv.server.uid} ${selectedHv.server.name} ${selectedHv.server.flavor}") incomingImages -= imageInstance // Speculatively update the hypervisor view information to prevent other images in the queue from @@ -157,6 +171,7 @@ class SimpleVirtProvisioningService( when (event) { is ServerEvent.StateChanged -> { if (event.server.state == ServerState.SHUTOFF) { + println("[${clock.millis()}] FINISH ${event.server.uid} ${event.server.name} ${event.server.flavor}") runningVms-- finishedVms++ @@ -178,6 +193,8 @@ class SimpleVirtProvisioningService( selectedHv.numberOfActiveServers-- selectedHv.provisionedCores -= imageInstance.flavor.cpuCount selectedHv.availableMemory += requiredMemory + } catch (e: Throwable) { + e.printStackTrace() } } } @@ -196,13 +213,18 @@ class SimpleVirtProvisioningService( server.flavor.memorySize, 0 ) + maxCores = max(maxCores, server.flavor.cpuCount) + maxMemory = max(maxMemory, server.flavor.memorySize) hypervisors[server] = hv } } ServerState.SHUTOFF, ServerState.ERROR -> { val hv = hypervisors[server] ?: return availableHypervisors -= hv - requestCycle() + + if (incomingImages.isNotEmpty()) { + requestCycle() + } } else -> throw IllegalStateException() } -- cgit v1.2.3