diff options
| author | Georgios Andreadis <g.andreadis@student.tudelft.nl> | 2020-04-20 15:21:35 +0200 |
|---|---|---|
| committer | Georgios Andreadis <g.andreadis@student.tudelft.nl> | 2020-04-20 15:21:35 +0200 |
| commit | 6981d5581e2ce5c6df42dfbf133c350bd9c35a0f (patch) | |
| tree | 74e8993babb28dd56950f1da69eda2d97735a0e8 | |
| parent | 60372f0022d423efd5267ef4008d9afcbe870911 (diff) | |
| parent | 3e056406616860c77168d827f1ca9d8d3c79c08e (diff) | |
Merge branch 'bug/experiment-issues' into '2.x'
Address issues during experiments
See merge request opendc/opendc-simulator!61
18 files changed, 894 insertions, 213 deletions
@@ -1,5 +1,9 @@ # virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml hs_err_pid* + +# data +data/ + ### JetBrains # Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and Webstorm # Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt index b0688f99..36bbfa45 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt @@ -14,7 +14,7 @@ class VmImage( public override val uid: UUID, public override val name: String, public override val tags: TagContainer, - public val flopsHistory: List<FlopsHistoryFragment>, + public val flopsHistory: Sequence<FlopsHistoryFragment>, public val maxCores: Int, public val requiredMemory: Long ) : Image { diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt new file mode 100644 index 00000000..45024a49 --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt @@ -0,0 +1,129 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.compute.core.workload + +import com.atlarge.opendc.compute.core.Server +import java.util.SortedSet +import java.util.TreeSet +import kotlin.random.Random + +/** + * Meta-data key for the [PerformanceInterferenceModel] of an image. + */ +const val IMAGE_PERF_INTERFERENCE_MODEL = "image:performance-interference" + +/** + * Performance Interference Model describing the variability incurred by different sets of workloads if colocated. + * + * @param items The [PerformanceInterferenceModelItem]s that make up this model. + */ +class PerformanceInterferenceModel( + items: Set<PerformanceInterferenceModelItem>, + val random: Random = Random(0) +) { + private var intersectingItems: List<PerformanceInterferenceModelItem> = emptyList() + private val comparator = Comparator<PerformanceInterferenceModelItem> { lhs, rhs -> + var cmp = lhs.performanceScore.compareTo(rhs.performanceScore) + if (cmp != 0) { + return@Comparator cmp + } + + cmp = lhs.minServerLoad.compareTo(rhs.minServerLoad) + if (cmp != 0) { + return@Comparator cmp + } + + lhs.hashCode().compareTo(rhs.hashCode()) + } + val items = TreeSet(comparator) + private val colocatedWorkloads = TreeSet<String>() + + init { + this.items.addAll(items) + } + + fun vmStarted(server: Server) { + colocatedWorkloads.add(server.image.name) + intersectingItems = items.filter { item -> doesMatch(item) } + } + + fun vmStopped(server: Server) { + colocatedWorkloads.remove(server.image.name) + intersectingItems = items.filter { item -> doesMatch(item) } + } + + private fun doesMatch(item: PerformanceInterferenceModelItem): Boolean { + var count = 0 + for (name in item.workloadNames.subSet(colocatedWorkloads.first(), colocatedWorkloads.last() + "\u0000")) { + if (name in colocatedWorkloads) + count++ + if (count > 1) + return true + } + return false + } + + fun apply(currentServerLoad: Double): Double { + if (intersectingItems.isEmpty()) { + return 1.0 + } + val score = intersectingItems + .firstOrNull { it.minServerLoad <= currentServerLoad } + + // Apply performance penalty to (on average) only one of the VMs + return if (score != null && random.nextInt(score.workloadNames.size) == 0) { + score.performanceScore + } else { + 1.0 + } + } +} + +/** + * Model describing how a specific set of workloads causes performance variability for each workload. + * + * @param workloadNames The names of the workloads that together cause performance variability for each workload in the set. + * @param minServerLoad The minimum total server load at which this interference is activated and noticeable. + * @param performanceScore The performance score that should be applied to each workload's performance. 1 means no + * influence, <1 means that performance degrades, and >1 means that performance improves. + */ +data class PerformanceInterferenceModelItem( + val workloadNames: SortedSet<String>, + val minServerLoad: Double, + val performanceScore: Double +) { + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (javaClass != other?.javaClass) return false + + other as PerformanceInterferenceModelItem + + if (workloadNames != other.workloadNames) return false + + return true + } + + override fun hashCode(): Int = workloadNames.hashCode() +} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt index 844938db..08f04760 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt @@ -120,7 +120,7 @@ public class SimpleBareMetalDriver( /** * The internal random instance. */ - private val random = Random(0) + private val random = Random(uid.leastSignificantBits xor uid.mostSignificantBits) override suspend fun init(): Node = withContext(domain.coroutineContext) { nodeState.value @@ -134,7 +134,7 @@ public class SimpleBareMetalDriver( val events = EventFlow<ServerEvent>() val server = Server( - UUID(node.uid.leastSignificantBits xor node.uid.mostSignificantBits, random.nextLong()), + UUID(random.nextLong(), random.nextLong()), node.name, emptyMap(), flavor, @@ -151,7 +151,7 @@ public class SimpleBareMetalDriver( override suspend fun stop(): Node = withContext(domain.coroutineContext) { val node = nodeState.value - if (node.state == NodeState.SHUTOFF || node.state == NodeState.ERROR) { + if (node.state == NodeState.SHUTOFF) { return@withContext node } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt index 73f9dd5c..53fa463b 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt @@ -40,31 +40,26 @@ import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.virt.HypervisorEvent import com.atlarge.opendc.core.services.ServiceKey import com.atlarge.opendc.core.services.ServiceRegistry -import com.atlarge.opendc.core.workload.IMAGE_PERF_INTERFERENCE_MODEL -import com.atlarge.opendc.core.workload.PerformanceInterferenceModel -import kotlinx.coroutines.CancellableContinuation +import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.InternalCoroutinesApi import kotlinx.coroutines.Job -import kotlinx.coroutines.NonCancellable import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.filter -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.coroutines.selects.SelectClause0 import kotlinx.coroutines.selects.select -import kotlinx.coroutines.suspendCancellableCoroutine -import kotlinx.coroutines.withContext import java.util.Objects import java.util.TreeSet import java.util.UUID +import kotlin.coroutines.Continuation import kotlin.coroutines.resume +import kotlin.coroutines.suspendCoroutine import kotlin.math.ceil import kotlin.math.max import kotlin.math.min @@ -101,15 +96,6 @@ class SimpleVirtDriver( override val events: Flow<HypervisorEvent> = eventFlow init { - events.filter { it is HypervisorEvent.VmsUpdated }.onEach { - val imagesRunning = vms.map { it.server.image }.toSet() - vms.forEach { - val performanceModel = - it.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? - performanceModel?.computeIntersectingItems(imagesRunning) - } - }.launchIn(this) - launch { try { scheduler() @@ -140,6 +126,7 @@ class SimpleVirtDriver( ) availableMemory -= requiredMemory vms.add(VmServerContext(server, events, simulationContext.domain)) + vmStarted(server) eventFlow.emit(HypervisorEvent.VmsUpdated(this, vms.size, availableMemory)) return server } @@ -148,6 +135,22 @@ class SimpleVirtDriver( eventFlow.close() } + private fun vmStarted(server: Server) { + vms.forEach { + val performanceModel = + it.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? + performanceModel?.vmStarted(server) + } + } + + private fun vmStopped(server: Server) { + vms.forEach { + val performanceModel = + it.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? + performanceModel?.vmStopped(server) + } + } + /** * A scheduling command processed by the scheduler. */ @@ -240,7 +243,7 @@ class SimpleVirtDriver( } // XXX We set the minimum duration to 5 minutes here to prevent the rounding issues that are occurring with the FLOPs. - duration = max(300.0, ceil(duration)) + duration = 300.0 val totalAllocatedUsage = maxUsage - availableUsage var totalAllocatedBurst = 0L @@ -325,14 +328,14 @@ class SimpleVirtDriver( requests.removeAll(vmRequests) // Return vCPU `run` call: the requested burst was completed or deadline was exceeded - vm.cont?.resume(Unit) + vm.chan?.resume(Unit) } } eventFlow.emit( HypervisorEvent.SliceFinished( this@SimpleVirtDriver, - totalRequestedSubBurst, + totalRequestedBurst, min(totalRequestedSubBurst, totalGrantedBurst), // We can run more than requested due to timing totalOvercommissionedBurst, totalInterferedBurst, // Might be smaller than zero due to FP rounding errors, @@ -371,7 +374,7 @@ class SimpleVirtDriver( val vm: VmServerContext, val vcpu: ProcessingUnit, var burst: Long, - val limit: Double + var limit: Double ) { /** * The usage that was actually granted. @@ -395,7 +398,7 @@ class SimpleVirtDriver( private var finalized: Boolean = false lateinit var burst: LongArray var deadline: Long = 0L - var cont: CancellableContinuation<Unit>? = null + var chan: Continuation<Unit>? = null private var initialized: Boolean = false internal val job: Job = launch { @@ -443,6 +446,7 @@ class SimpleVirtDriver( server = server.copy(state = serverState) availableMemory += server.flavor.memorySize vms.remove(this) + vmStopped(server) eventFlow.emit(HypervisorEvent.VmsUpdated(this@SimpleVirtDriver, vms.size, availableMemory)) events.close() } @@ -451,6 +455,7 @@ class SimpleVirtDriver( require(burst.size == limit.size) { "Array dimensions do not match" } this.deadline = deadline this.burst = burst + val requests = cpus.asSequence() .take(burst.size) .mapIndexed { i, cpu -> @@ -465,16 +470,13 @@ class SimpleVirtDriver( // Wait until the burst has been run or the coroutine is cancelled try { - schedulingQueue.send(SchedulerCommand.Schedule(this, requests)) - suspendCancellableCoroutine<Unit> { cont = it } + schedulingQueue.offer(SchedulerCommand.Schedule(this, requests)) + suspendCoroutine<Unit> { chan = it } } catch (e: CancellationException) { // Deschedule the VM - withContext(NonCancellable) { - requests.forEach { it.isCancelled = true } - schedulingQueue.send(SchedulerCommand.Interrupt) - suspendCancellableCoroutine<Unit> { cont = it } - } - + requests.forEach { it.isCancelled = true } + schedulingQueue.offer(SchedulerCommand.Interrupt) + suspendCoroutine<Unit> { chan = it } e.assertFailure() } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt index 6200ad7c..520f6dc5 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt @@ -1,7 +1,6 @@ package com.atlarge.opendc.compute.virt.service import com.atlarge.odcsim.SimulationContext -import com.atlarge.odcsim.flow.EventFlow import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.Server @@ -20,7 +19,6 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.Job import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch @@ -28,6 +26,7 @@ import kotlinx.coroutines.suspendCancellableCoroutine import kotlinx.coroutines.withContext import kotlin.coroutines.Continuation import kotlin.coroutines.resume +import kotlin.math.max @OptIn(ExperimentalCoroutinesApi::class) class SimpleVirtProvisioningService( @@ -55,7 +54,14 @@ class SimpleVirtProvisioningService( */ private val activeImages: MutableSet<ImageView> = mutableSetOf() - override val hypervisorEvents: Flow<HypervisorEvent> = EventFlow() + public var submittedVms = 0L + public var queuedVms = 0L + public var runningVms = 0L + public var finishedVms = 0L + public var unscheduledVms = 0L + + private var maxCores = 0 + private var maxMemory = 0L /** * The allocation logic to use. @@ -87,6 +93,8 @@ class SimpleVirtProvisioningService( image: Image, flavor: Flavor ): Server = withContext(coroutineContext) { + submittedVms++ + queuedVms++ suspendCancellableCoroutine<Server> { cont -> val vmInstance = ImageView(name, image, flavor, cont) incomingImages += vmInstance @@ -121,15 +129,24 @@ class SimpleVirtProvisioningService( } private suspend fun schedule() { - val log = simulationContext.log + val clock = simulationContext.clock val imagesToBeScheduled = incomingImages.toSet() for (imageInstance in imagesToBeScheduled) { val requiredMemory = (imageInstance.image as VmImage).requiredMemory - val selectedHv = allocationLogic.select(availableHypervisors, imageInstance) ?: break + val selectedHv = allocationLogic.select(availableHypervisors, imageInstance) + + if (selectedHv == null) { + if (requiredMemory > maxMemory || imageInstance.flavor.cpuCount > maxCores) { + unscheduledVms++ + println("[${clock.millis()}] CANNOT SPAWN ${imageInstance.image}") + } + + break + } try { - log.info("Spawning ${imageInstance.image} on ${selectedHv.server}") + println("[${clock.millis()}] SPAWN ${imageInstance.image} on ${selectedHv.server.uid} ${selectedHv.server.name} ${selectedHv.server.flavor}") incomingImages -= imageInstance // Speculatively update the hypervisor view information to prevent other images in the queue from @@ -145,6 +162,8 @@ class SimpleVirtProvisioningService( ) imageInstance.server = server imageInstance.continuation.resume(server) + queuedVms-- + runningVms++ activeImages += imageInstance server.events @@ -152,6 +171,10 @@ class SimpleVirtProvisioningService( when (event) { is ServerEvent.StateChanged -> { if (event.server.state == ServerState.SHUTOFF) { + println("[${clock.millis()}] FINISH ${event.server.uid} ${event.server.name} ${event.server.flavor}") + runningVms-- + finishedVms++ + activeImages -= imageInstance selectedHv.provisionedCores -= server.flavor.cpuCount @@ -170,6 +193,8 @@ class SimpleVirtProvisioningService( selectedHv.numberOfActiveServers-- selectedHv.provisionedCores -= imageInstance.flavor.cpuCount selectedHv.availableMemory += requiredMemory + } catch (e: Throwable) { + e.printStackTrace() } } } @@ -188,13 +213,18 @@ class SimpleVirtProvisioningService( server.flavor.memorySize, 0 ) + maxCores = max(maxCores, server.flavor.cpuCount) + maxMemory = max(maxMemory, server.flavor.memorySize) hypervisors[server] = hv } } ServerState.SHUTOFF, ServerState.ERROR -> { val hv = hypervisors[server] ?: return availableHypervisors -= hv - requestCycle() + + if (incomingImages.isNotEmpty()) { + requestCycle() + } } else -> throw IllegalStateException() } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt index 550048a4..2ad7df84 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt @@ -3,10 +3,8 @@ package com.atlarge.opendc.compute.virt.service import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.image.Image -import com.atlarge.opendc.compute.virt.HypervisorEvent import com.atlarge.opendc.compute.virt.driver.VirtDriver import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy -import kotlinx.coroutines.flow.Flow /** * A service for VM provisioning on a cloud. @@ -18,11 +16,6 @@ interface VirtProvisioningService { val allocationPolicy: AllocationPolicy /** - * The events emitted by the hypervisors. - */ - public val hypervisorEvents: Flow<HypervisorEvent> - - /** * Obtain the active hypervisors for this provisioner. */ public suspend fun drivers(): Set<VirtDriver> diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/PerformanceInterferenceModel.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/PerformanceInterferenceModel.kt deleted file mode 100644 index 04056394..00000000 --- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/PerformanceInterferenceModel.kt +++ /dev/null @@ -1,83 +0,0 @@ -package com.atlarge.opendc.core.workload - -import com.atlarge.opendc.core.resource.Resource -import kotlin.random.Random - -/** - * Meta-data key for the [PerformanceInterferenceModel] of an image. - */ -const val IMAGE_PERF_INTERFERENCE_MODEL = "image:performance-interference" - -/** - * Performance Interference Model describing the variability incurred by different sets of workloads if colocated. - * - * @param items The [PerformanceInterferenceModelItem]s that make up this model. - */ -data class PerformanceInterferenceModel( - val items: Set<PerformanceInterferenceModelItem>, - val random: Random = Random(0) -) { - private var intersectingItems: List<PerformanceInterferenceModelItem> = emptyList() - private var comparator = Comparator<PerformanceInterferenceModelItem> { lhs, rhs -> - var cmp = lhs.performanceScore.compareTo(rhs.performanceScore) - if (cmp != 0) { - return@Comparator cmp - } - - cmp = lhs.minServerLoad.compareTo(rhs.minServerLoad) - if (cmp != 0) { - return@Comparator cmp - } - - 0 - } - - fun computeIntersectingItems(colocatedWorkloads: Set<Resource>) { - val colocatedWorkloadIds = colocatedWorkloads.map { it.name } - intersectingItems = items.filter { item -> - colocatedWorkloadIds.intersect(item.workloadNames).size > 1 - }.sortedWith(comparator) - } - - fun apply(currentServerLoad: Double): Double { - if (intersectingItems.isEmpty()) { - return 1.0 - } - val score = intersectingItems - .firstOrNull { it.minServerLoad <= currentServerLoad } - - // Apply performance penalty to (on average) only one of the VMs - return if (score != null && random.nextInt(score.workloadNames.size) == 0) { - score.performanceScore - } else { - 1.0 - } - } -} - -/** - * Model describing how a specific set of workloads causes performance variability for each workload. - * - * @param workloadNames The names of the workloads that together cause performance variability for each workload in the set. - * @param minServerLoad The minimum total server load at which this interference is activated and noticeable. - * @param performanceScore The performance score that should be applied to each workload's performance. 1 means no - * influence, <1 means that performance degrades, and >1 means that performance improves. - */ -data class PerformanceInterferenceModelItem( - val workloadNames: Set<String>, - val minServerLoad: Double, - val performanceScore: Double -) { - override fun equals(other: Any?): Boolean { - if (this === other) return true - if (javaClass != other?.javaClass) return false - - other as PerformanceInterferenceModelItem - - if (workloadNames != other.workloadNames) return false - - return true - } - - override fun hashCode(): Int = workloadNames.hashCode() -} diff --git a/opendc/opendc-experiments-sc20/build.gradle.kts b/opendc/opendc-experiments-sc20/build.gradle.kts index 28b8ae12..8611ffa7 100644 --- a/opendc/opendc-experiments-sc20/build.gradle.kts +++ b/opendc/opendc-experiments-sc20/build.gradle.kts @@ -32,6 +32,7 @@ plugins { application { mainClassName = "com.atlarge.opendc.experiments.sc20.TestExperimentKt" + applicationDefaultJvmArgs = listOf("-Xmx2500M", "-Xms1800M") } dependencies { @@ -40,7 +41,10 @@ dependencies { implementation(kotlin("stdlib")) implementation("com.xenomachina:kotlin-argparser:2.0.7") api("com.fasterxml.jackson.module:jackson-module-kotlin:2.9.8") - + implementation("org.apache.parquet:parquet-avro:1.11.0") + implementation("org.apache.hadoop:hadoop-client:3.2.1") { + exclude(group = "org.slf4j", module = "slf4j-log4j12") + } runtimeOnly("org.slf4j:slf4j-simple:${Library.SLF4J}") runtimeOnly(project(":odcsim:odcsim-engine-omega")) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt index 7e6398bb..bac0de21 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt @@ -6,23 +6,71 @@ import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.metal.driver.BareMetalDriver import com.atlarge.opendc.compute.virt.driver.VirtDriver import kotlinx.coroutines.flow.first -import java.io.BufferedWriter +import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericData +import org.apache.hadoop.fs.Path +import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.hadoop.metadata.CompressionCodecName import java.io.Closeable -import java.io.FileWriter +import java.util.concurrent.ArrayBlockingQueue +import kotlin.concurrent.thread class Sc20Monitor( destination: String ) : Closeable { - private val outputFile = BufferedWriter(FileWriter(destination)) private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>() - - init { - outputFile.write("time,duration,requestedBurst,grantedBurst,overcommissionedBurst,interferedBurst,cpuUsage,cpuDemand,numberOfDeployedImages,server,hostState,hostUsage,powerDraw\n") + private val schema = SchemaBuilder + .record("slice") + .namespace("com.atlarge.opendc.experiments.sc20") + .fields() + .name("time").type().longType().noDefault() + .name("duration").type().longType().noDefault() + .name("requestedBurst").type().longType().noDefault() + .name("grantedBurst").type().longType().noDefault() + .name("overcommissionedBurst").type().longType().noDefault() + .name("interferedBurst").type().longType().noDefault() + .name("cpuUsage").type().doubleType().noDefault() + .name("cpuDemand").type().doubleType().noDefault() + .name("numberOfDeployedImages").type().intType().noDefault() + .name("server").type().stringType().noDefault() + .name("hostState").type().stringType().noDefault() + .name("hostUsage").type().doubleType().noDefault() + .name("powerDraw").type().doubleType().noDefault() + .name("totalSubmittedVms").type().longType().noDefault() + .name("totalQueuedVms").type().longType().noDefault() + .name("totalRunningVms").type().longType().noDefault() + .name("totalFinishedVms").type().longType().noDefault() + .endRecord() + private val writer = AvroParquetWriter.builder<GenericData.Record>(Path(destination)) + .withSchema(schema) + .withCompressionCodec(CompressionCodecName.SNAPPY) + .withPageSize(4 * 1024 * 1024) // For compression + .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) + .build() + private val queue = ArrayBlockingQueue<GenericData.Record>(2048) + private val writerThread = thread(start = true, name = "sc20-writer") { + try { + while (true) { + val record = queue.take() + writer.write(record) + } + } catch (e: InterruptedException) { + // Do not rethrow this + } finally { + writer.close() + } } suspend fun onVmStateChanged(server: Server) {} - suspend fun serverStateChanged(driver: VirtDriver, server: Server) { + suspend fun serverStateChanged( + driver: VirtDriver, + server: Server, + submittedVms: Long, + queuedVms: Long, + runningVms: Long, + finishedVms: Long + ) { val lastServerState = lastServerStates[server] if (server.state == ServerState.SHUTOFF && lastServerState != null) { val duration = simulationContext.clock.millis() - lastServerState.second @@ -36,6 +84,10 @@ class Sc20Monitor( 0.0, 0, server, + submittedVms, + queuedVms, + runningVms, + finishedVms, duration ) } @@ -55,21 +107,44 @@ class Sc20Monitor( cpuDemand: Double, numberOfDeployedImages: Int, hostServer: Server, + submittedVms: Long, + queuedVms: Long, + runningVms: Long, + finishedVms: Long, duration: Long = 5 * 60 * 1000L ) { - lastServerStates.remove(hostServer) - // Assume for now that the host is not virtualized and measure the current power draw val driver = hostServer.services[BareMetalDriver.Key] val usage = driver.usage.first() val powerDraw = driver.powerDraw.first() - outputFile.write("$time,$duration,$requestedBurst,$grantedBurst,$overcommissionedBurst,$interferedBurst,$cpuUsage,$cpuDemand,$numberOfDeployedImages,${hostServer.uid},${hostServer.state},$usage,$powerDraw") - outputFile.newLine() + val record = GenericData.Record(schema) + record.put("time", time) + record.put("duration", duration) + record.put("requestedBurst", requestedBurst) + record.put("grantedBurst", grantedBurst) + record.put("overcommissionedBurst", overcommissionedBurst) + record.put("interferedBurst", interferedBurst) + record.put("cpuUsage", cpuUsage) + record.put("cpuDemand", cpuDemand) + record.put("numberOfDeployedImages", numberOfDeployedImages) + record.put("server", hostServer.uid) + record.put("hostState", hostServer.state) + record.put("hostUsage", usage) + record.put("powerDraw", powerDraw) + record.put("totalSubmittedVms", submittedVms) + record.put("totalQueuedVms", queuedVms) + record.put("totalRunningVms", runningVms) + record.put("totalFinishedVms", finishedVms) + + queue.put(record) } override fun close() { - outputFile.flush() - outputFile.close() + // Busy loop to wait for writer thread to finish + while (queue.isNotEmpty()) { + Thread.sleep(500) + } + writerThread.interrupt() } } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt new file mode 100644 index 00000000..0a7718e9 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt @@ -0,0 +1,290 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20 + +import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment +import com.atlarge.opendc.compute.core.image.VmImage +import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel +import com.atlarge.opendc.compute.core.workload.VmWorkload +import com.atlarge.opendc.core.User +import com.atlarge.opendc.format.trace.TraceEntry +import com.atlarge.opendc.format.trace.TraceReader +import org.apache.avro.generic.GenericData +import org.apache.hadoop.fs.Path +import org.apache.parquet.avro.AvroParquetReader +import org.apache.parquet.filter2.compat.FilterCompat +import org.apache.parquet.filter2.predicate.FilterApi +import org.apache.parquet.filter2.predicate.Statistics +import org.apache.parquet.filter2.predicate.UserDefinedPredicate +import org.apache.parquet.io.api.Binary +import java.io.File +import java.io.Serializable +import java.util.SortedSet +import java.util.TreeSet +import java.util.UUID +import java.util.concurrent.ArrayBlockingQueue +import kotlin.concurrent.thread +import kotlin.random.Random + +/** + * A [TraceReader] for the internal VM workload trace format. + * + * @param traceFile The directory of the traces. + * @param performanceInterferenceModel The performance model covering the workload in the VM trace. + */ +@OptIn(ExperimentalStdlibApi::class) +class Sc20ParquetTraceReader( + traceFile: File, + performanceInterferenceModel: PerformanceInterferenceModel, + selectedVms: List<String>, + random: Random +) : TraceReader<VmWorkload> { + /** + * The internal iterator to use for this reader. + */ + private val iterator: Iterator<TraceEntry<VmWorkload>> + + /** + * The intermediate buffer to store the read records in. + */ + private val queue = ArrayBlockingQueue<Pair<String, FlopsHistoryFragment>>(1024) + + /** + * An optional filter for filtering the selected VMs + */ + private val filter = + if (selectedVms.isEmpty()) + null + else + FilterCompat.get(FilterApi.userDefined(FilterApi.binaryColumn("id"), SelectedVmFilter(TreeSet(selectedVms)))) + + /** + * A poisonous fragment. + */ + private val poison = Pair("\u0000", FlopsHistoryFragment(0, 0, 0, 0.0, 0)) + + /** + * The thread to read the records in. + */ + private val readerThread = thread(start = true, name = "sc20-reader") { + val reader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "trace.parquet")) + .disableCompatibility() + .run { if (filter != null) withFilter(filter) else this } + .build() + + try { + while (true) { + val record = reader.read() + + if (record == null) { + queue.put(poison) + break + } + + val id = record["id"].toString() + val tick = record["time"] as Long + val duration = record["duration"] as Long + val cores = record["cores"] as Int + val cpuUsage = record["cpuUsage"] as Double + val flops = record["flops"] as Long + + val fragment = FlopsHistoryFragment( + tick, + flops, + duration, + cpuUsage, + cores + ) + + queue.put(id to fragment) + } + } catch (e: InterruptedException) { + // Do not rethrow this + } finally { + reader.close() + } + } + + /** + * Fill the buffers with the VMs + */ + private fun pull(buffers: Map<String, List<MutableList<FlopsHistoryFragment>>>) { + if (!hasNext) { + return + } + + val fragments = mutableListOf<Pair<String, FlopsHistoryFragment>>() + queue.drainTo(fragments) + + for ((id, fragment) in fragments) { + if (id == poison.first) { + hasNext = false + return + } + buffers[id]?.forEach { it.add(fragment) } + } + } + + /** + * A flag to indicate whether the reader has more entries. + */ + private var hasNext: Boolean = true + + /** + * Initialize the reader. + */ + init { + val takenIds = mutableSetOf<UUID>() + val entries = mutableMapOf<String, GenericData.Record>() + val buffers = mutableMapOf<String, MutableList<MutableList<FlopsHistoryFragment>>>() + + val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "meta.parquet")) + .disableCompatibility() + .run { if (filter != null) withFilter(filter) else this } + .build() + + while (true) { + val record = metaReader.read() ?: break + val id = record["id"].toString() + entries[id] = record + } + + metaReader.close() + + val selection = if (selectedVms.isEmpty()) entries.keys else selectedVms + + // Create the entry iterator + iterator = selection.asSequence() + .mapNotNull { entries[it] } + .mapIndexed { index, record -> + val id = record["id"].toString() + val submissionTime = record["submissionTime"] as Long + val endTime = record["endTime"] as Long + val maxCores = record["maxCores"] as Int + val requiredMemory = record["requiredMemory"] as Long + val uid = UUID.nameUUIDFromBytes("$id-$index".toByteArray()) + + assert(uid !in takenIds) + takenIds += uid + + println(id) + + val internalBuffer = mutableListOf<FlopsHistoryFragment>() + val externalBuffer = mutableListOf<FlopsHistoryFragment>() + buffers.getOrPut(id) { mutableListOf() }.add(externalBuffer) + val fragments = sequence<FlopsHistoryFragment> { + repeat@while (true) { + if (externalBuffer.isEmpty()) { + if (hasNext) { + pull(buffers) + continue + } else { + break + } + } + + internalBuffer.addAll(externalBuffer) + externalBuffer.clear() + + for (fragment in internalBuffer) { + yield(fragment) + + if (fragment.tick >= endTime) { + break@repeat + } + } + + internalBuffer.clear() + } + + buffers.remove(id) + } + val relevantPerformanceInterferenceModelItems = + PerformanceInterferenceModel( + performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSet(), + Random(random.nextInt()) + ) + val vmWorkload = VmWorkload( + uid, "VM Workload $id", UnnamedUser, + VmImage( + uid, + id, + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), + fragments, + maxCores, + requiredMemory + ) + ) + + TraceEntryImpl(submissionTime, vmWorkload) + } + .sortedBy { it.submissionTime } + .toList() + .iterator() + } + + override fun hasNext(): Boolean = iterator.hasNext() + + override fun next(): TraceEntry<VmWorkload> = iterator.next() + + override fun close() { + readerThread.interrupt() + } + + private class SelectedVmFilter(val selectedVms: SortedSet<String>) : UserDefinedPredicate<Binary>(), Serializable { + override fun keep(value: Binary?): Boolean = value != null && selectedVms.contains(value.toStringUsingUTF8()) + + override fun canDrop(statistics: Statistics<Binary>): Boolean { + val min = statistics.min + val max = statistics.max + + return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isEmpty() + } + + override fun inverseCanDrop(statistics: Statistics<Binary>): Boolean { + val min = statistics.min + val max = statistics.max + + return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isNotEmpty() + } + } + + /** + * An unnamed user. + */ + private object UnnamedUser : User { + override val name: String = "<unnamed>" + override val uid: UUID = UUID.randomUUID() + } + + /** + * An entry in the trace. + */ + private data class TraceEntryImpl( + override var submissionTime: Long, + override val workload: VmWorkload + ) : TraceEntry<VmWorkload> +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt index c75bde30..028cfb9a 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt @@ -29,7 +29,6 @@ import com.atlarge.odcsim.SimulationEngineProvider import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.ServerEvent -import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.metal.NODE_CLUSTER import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.compute.virt.HypervisorEvent @@ -45,7 +44,6 @@ import com.atlarge.opendc.core.failure.FailureDomain import com.atlarge.opendc.core.failure.FaultInjector import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader -import com.atlarge.opendc.format.trace.sc20.Sc20TraceReader import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue import com.xenomachina.argparser.ArgParser @@ -70,13 +68,15 @@ class ExperimentParameters(parser: ArgParser) { val environmentFile by parser.storing("path to the environment file") val performanceInterferenceFile by parser.storing("path to the performance interference file").default { null } val outputFile by parser.storing("path to where the output should be stored") - .default { "sc20-experiment-results.csv" } + .default { "data/results-${System.currentTimeMillis()}.parquet" } val selectedVms by parser.storing("the VMs to run") { parseVMs(this) } .default { emptyList() } val selectedVmsFile by parser.storing("path to a file containing the VMs to run") { parseVMs(FileReader(File(this)).readText()) } .default { emptyList() } + val seed by parser.storing("the random seed") { toInt() } + .default(0) val failures by parser.flagging("-x", "--failures", help = "enable (correlated) machine failures") val allocationPolicy by parser.storing("name of VM allocation policy to use").default("core-mem") @@ -118,6 +118,14 @@ fun createFaultInjector(domain: Domain, random: Random): FaultInjector { @OptIn(ExperimentalCoroutinesApi::class) fun main(args: Array<String>) { ArgParser(args).parseInto(::ExperimentParameters).run { + println("trace-directory: $traceDirectory") + println("environment-file: $environmentFile") + println("performance-interference-file: $performanceInterferenceFile") + println("selected-vms-file: $selectedVmsFile") + println("seed: $seed") + println("failures: $failures") + println("allocation-policy: $allocationPolicy") + val start = System.currentTimeMillis() val monitor = Sc20Monitor(outputFile) @@ -135,7 +143,7 @@ fun main(args: Array<String>) { "active-servers-inv" to NumberOfActiveServersAllocationPolicy(true), "provisioned-cores" to ProvisionedCoresAllocationPolicy(), "provisioned-cores-inv" to ProvisionedCoresAllocationPolicy(true), - "random" to RandomAllocationPolicy() + "random" to RandomAllocationPolicy(Random(seed)) ) if (allocationPolicy !in allocationPolicies) { @@ -174,20 +182,16 @@ fun main(args: Array<String>) { delay(10) val hypervisors = scheduler.drivers() - var availableHypervisors = hypervisors.size // Monitor hypervisor events for (hypervisor in hypervisors) { // TODO Do not expose VirtDriver directly but use Hypervisor class. - monitor.serverStateChanged(hypervisor, (hypervisor as SimpleVirtDriver).server) + monitor.serverStateChanged(hypervisor, (hypervisor as SimpleVirtDriver).server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) hypervisor.server.events .onEach { event -> when (event) { is ServerEvent.StateChanged -> { - monitor.serverStateChanged(hypervisor, event.server) - - if (event.server.state == ServerState.ERROR) - availableHypervisors -= 1 + monitor.serverStateChanged(hypervisor, event.server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) } } } @@ -204,7 +208,11 @@ fun main(args: Array<String>) { event.cpuUsage, event.cpuDemand, event.numberOfDeployedImages, - event.hostServer + event.hostServer, + scheduler.submittedVms, + scheduler.queuedVms, + scheduler.runningVms, + scheduler.finishedVms ) } } @@ -216,7 +224,7 @@ fun main(args: Array<String>) { val domain = root.newDomain(name = "failures") domain.launch { chan.receive() - val random = Random(0) + val random = Random(seed) val injectors = mutableMapOf<String, FaultInjector>() for (node in bareMetalProvisioner.nodes()) { val cluster = node.metadata[NODE_CLUSTER] as String @@ -229,15 +237,13 @@ fun main(args: Array<String>) { null } - val finish = Channel<Unit>(Channel.RENDEZVOUS) - - var submitted = 0 - var finished = 0 - val reader = Sc20TraceReader(File(traceDirectory), performanceInterferenceModel, getSelectedVmList()) + var submitted = 0L + val finished = Channel<Unit>(Channel.RENDEZVOUS) + val reader = Sc20ParquetTraceReader(File(traceDirectory), performanceInterferenceModel, getSelectedVmList(), Random(seed)) while (reader.hasNext()) { val (time, workload) = reader.next() - delay(max(0, time - simulationContext.clock.millis())) submitted++ + delay(max(0, time - simulationContext.clock.millis())) launch { chan.send(Unit) val server = scheduler.deploy( @@ -247,27 +253,26 @@ fun main(args: Array<String>) { // Monitor server events server.events .onEach { - if (it is ServerEvent.StateChanged) + if (it is ServerEvent.StateChanged) { monitor.onVmStateChanged(it.server) - - // Detect whether the VM has finished running - if (it.server.state == ServerState.SHUTOFF) { - finished++ } - if (finished == submitted && !reader.hasNext()) { - finish.send(Unit) - } + finished.send(Unit) } .collect() } } - finish.receive() - scheduler.terminate() + while (scheduler.finishedVms + scheduler.unscheduledVms != submitted || reader.hasNext()) { + finished.receive() + } + + println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") + failureDomain?.cancel() - println(simulationContext.clock.instant()) - println("${System.currentTimeMillis() - start} milliseconds") + scheduler.terminate() + reader.close() + println("[${simulationContext.clock.millis()}] DONE ${System.currentTimeMillis() - start} milliseconds") } runBlocking { diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt new file mode 100644 index 00000000..d005a157 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt @@ -0,0 +1,193 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20 + +import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericData +import org.apache.hadoop.fs.Path +import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.hadoop.metadata.CompressionCodecName +import java.io.BufferedReader +import java.io.File +import java.io.FileReader +import kotlin.math.max +import kotlin.math.min + +/** + * + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +fun main() { + val metaSchema = SchemaBuilder + .record("meta") + .namespace("com.atlarge.opendc.format.sc20") + .fields() + .name("id").type().stringType().noDefault() + .name("submissionTime").type().longType().noDefault() + .name("endTime").type().longType().noDefault() + .name("maxCores").type().intType().noDefault() + .name("requiredMemory").type().longType().noDefault() + .endRecord() + val schema = SchemaBuilder + .record("trace") + .namespace("com.atlarge.opendc.format.sc20") + .fields() + .name("id").type().stringType().noDefault() + .name("time").type().longType().noDefault() + .name("duration").type().longType().noDefault() + .name("cores").type().intType().noDefault() + .name("cpuUsage").type().doubleType().noDefault() + .name("flops").type().longType().noDefault() + .endRecord() + + val timestampCol = 0 + val cpuUsageCol = 1 + val coreCol = 12 + val vmIdCol = 19 + val provisionedMemoryCol = 20 + val traceInterval = 5 * 60 * 1000L + + val dest = File("../traces/solvinity/small-parquet") + val traceDirectory = File("../traces/solvinity/small") + val vms = + traceDirectory.walk() + .filterNot { it.isDirectory } + .filter { it.extension == "csv" || it.extension == "txt" } + .toList() + + val metaWriter = AvroParquetWriter.builder<GenericData.Record>(Path(dest.absolutePath, "meta.parquet")) + .withSchema(metaSchema) + .withCompressionCodec(CompressionCodecName.SNAPPY) + .withPageSize(4 * 1024 * 1024) // For compression + .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) + .build() + + val allFragments = mutableListOf<Fragment>() + + vms + .forEachIndexed { idx, vmFile -> + println(vmFile) + + var vmId = "" + var maxCores = -1 + var requiredMemory = -1L + var cores = -1 + var minTime = Long.MAX_VALUE + + val flopsFragments = sequence { + var last: Fragment? = null + + BufferedReader(FileReader(vmFile)).use { reader -> + reader.lineSequence() + .chunked(128) + .forEach { lines -> + for (line in lines) { + // Ignore comments in the trace + if (line.startsWith("#") || line.isBlank()) { + continue + } + + val values = line.split(" ") + + vmId = vmFile.name + val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L + cores = values[coreCol].trim().toInt() + requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) + maxCores = max(maxCores, cores) + minTime = min(minTime, timestamp) + val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz + requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) + maxCores = max(maxCores, cores) + + val flops: Long = (cpuUsage * 5 * 60).toLong() + + last = if (last != null && last!!.flops == 0L && flops == 0L) { + val oldFragment = last!! + Fragment( + vmId, + oldFragment.tick, + oldFragment.flops + flops, + oldFragment.duration + traceInterval, + cpuUsage, + cores + ) + } else { + val fragment = + Fragment(vmId, timestamp, flops, traceInterval, cpuUsage, cores) + if (last != null) { + yield(last!!) + } + fragment + } + } + } + } + + if (last != null) { + yield(last!!) + } + } + + var maxTime = Long.MIN_VALUE + flopsFragments.forEach { fragment -> + allFragments.add(fragment) + maxTime = max(maxTime, fragment.tick) + } + + val metaRecord = GenericData.Record(metaSchema) + metaRecord.put("id", vmId) + metaRecord.put("submissionTime", minTime) + metaRecord.put("endTime", maxTime) + metaRecord.put("maxCores", maxCores) + metaRecord.put("requiredMemory", requiredMemory) + metaWriter.write(metaRecord) + } + + val writer = AvroParquetWriter.builder<GenericData.Record>(Path(dest.absolutePath, "trace.parquet")) + .withSchema(schema) + .withCompressionCodec(CompressionCodecName.SNAPPY) + .withPageSize(4 * 1024 * 1024) // For compression + .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) + .build() + + allFragments.sortWith(compareBy<Fragment> { it.tick }.thenBy { it.id }) + + for (fragment in allFragments) { + val record = GenericData.Record(schema) + record.put("id", fragment.id) + record.put("time", fragment.tick) + record.put("duration", fragment.duration) + record.put("cores", fragment.cores) + record.put("cpuUsage", fragment.usage) + record.put("flops", fragment.flops) + + writer.write(record) + } + + writer.close() + metaWriter.close() +} + +data class Fragment(val id: String, val tick: Long, val flops: Long, val duration: Long, val usage: Double, val cores: Int) diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt index 89a59e1c..2ef0db97 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt @@ -41,6 +41,7 @@ import com.atlarge.opendc.format.environment.EnvironmentReader import java.io.BufferedReader import java.io.File import java.io.FileReader +import java.util.Random import java.util.UUID /** @@ -67,6 +68,7 @@ class Sc20ClusterEnvironmentReader( var coresPerHost: Int val nodes = mutableListOf<SimpleBareMetalDriver>() + val random = Random(0) BufferedReader(FileReader(environmentFile)).use { reader -> reader.lineSequence() @@ -87,6 +89,7 @@ class Sc20ClusterEnvironmentReader( return@forEachIndexed } + clusterIdx++ clusterId = values[clusterIdCol].trim() speed = values[speedCol].trim().toDouble() * 1000.0 numberOfHosts = values[numberOfHostsCol].trim().toInt() @@ -100,7 +103,7 @@ class Sc20ClusterEnvironmentReader( nodes.add( SimpleBareMetalDriver( dom.newDomain("node-$clusterId-$it"), - UUID((clusterIdx++).toLong(), it.toLong()), + UUID(random.nextLong(), random.nextLong()), "node-$clusterId-$it", mapOf(NODE_CLUSTER to clusterId), List(coresPerHost) { coreId -> diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/PerformanceInterferenceModelReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/PerformanceInterferenceModelReader.kt index f9ebba3d..a653e643 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/PerformanceInterferenceModelReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/PerformanceInterferenceModelReader.kt @@ -24,7 +24,7 @@ package com.atlarge.opendc.format.trace -import com.atlarge.opendc.core.workload.PerformanceInterferenceModel +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel import java.io.Closeable /** diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt index daa1fdf8..8562cefe 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt @@ -24,13 +24,14 @@ package com.atlarge.opendc.format.trace.sc20 -import com.atlarge.opendc.core.workload.PerformanceInterferenceModel -import com.atlarge.opendc.core.workload.PerformanceInterferenceModelItem +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModelItem import com.atlarge.opendc.format.trace.PerformanceInterferenceModelReader import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue import java.io.InputStream +import java.util.TreeSet /** * A parser for the JSON performance interference setup files used for the SC20 paper. @@ -49,7 +50,7 @@ class Sc20PerformanceInterferenceReader(input: InputStream, mapper: ObjectMapper return PerformanceInterferenceModel( performanceInterferenceModel.map { item -> PerformanceInterferenceModelItem( - item.vms.toSet(), + TreeSet(item.vms), item.minServerLoad, item.performanceScore ) diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt index c40cb039..2e2159ba 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt @@ -28,8 +28,8 @@ import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment import com.atlarge.opendc.compute.core.image.VmImage import com.atlarge.opendc.compute.core.workload.VmWorkload import com.atlarge.opendc.core.User -import com.atlarge.opendc.core.workload.IMAGE_PERF_INTERFERENCE_MODEL -import com.atlarge.opendc.core.workload.PerformanceInterferenceModel +import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel import com.atlarge.opendc.format.trace.TraceEntry import com.atlarge.opendc.format.trace.TraceReader import java.io.BufferedReader @@ -37,6 +37,8 @@ import java.io.File import java.io.FileReader import java.util.UUID import kotlin.math.max +import kotlin.math.min +import kotlin.random.Random /** * A [TraceReader] for the internal VM workload trace format. @@ -47,7 +49,8 @@ import kotlin.math.max class Sc20TraceReader( traceDirectory: File, performanceInterferenceModel: PerformanceInterferenceModel, - selectedVms: List<String> + selectedVms: List<String>, + random: Random ) : TraceReader<VmWorkload> { /** * The internal iterator to use for this reader. @@ -81,10 +84,13 @@ class Sc20TraceReader( vms .forEachIndexed { idx, vmFile -> println(vmFile) - val flopsHistory = mutableListOf<FlopsHistoryFragment>() + var vmId = "" var maxCores = -1 var requiredMemory = -1L + var timestamp = -1L + var cores = -1 + var minTime = Long.MAX_VALUE BufferedReader(FileReader(vmFile)).use { reader -> reader.lineSequence() @@ -96,54 +102,82 @@ class Sc20TraceReader( val values = line.split(" ") vmId = vmFile.name - val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L - val cores = values[coreCol].trim().toInt() - val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz + timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L + cores = values[coreCol].trim().toInt() requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) maxCores = max(maxCores, cores) + minTime = min(minTime, timestamp) + } + } - val flops: Long = (cpuUsage * 5 * 60).toLong() - - if (flopsHistory.isEmpty()) { - flopsHistory.add(FlopsHistoryFragment(timestamp, flops, traceInterval, cpuUsage, cores)) - } else { - // Restrict merging to empty fragments for now - if (flopsHistory.last().flops == 0L && flops == 0L) { - val oldFragment = flopsHistory.removeAt(flopsHistory.size - 1) - flopsHistory.add( + val flopsFragments = sequence { + var last: FlopsHistoryFragment? = null + + BufferedReader(FileReader(vmFile)).use { reader -> + reader.lineSequence() + .chunked(128) + .forEach { lines -> + // val res = ArrayList<FlopsHistoryFragment>(lines.size) + for (line in lines) { + // Ignore comments in the trace + if (line.startsWith("#") || line.isBlank()) { + continue + } + + val values = line.split(" ") + val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz + requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) + maxCores = max(maxCores, cores) + + val flops: Long = (cpuUsage * 5 * 60).toLong() + + last = if (last != null && last!!.flops == 0L && flops == 0L) { + val oldFragment = last!! FlopsHistoryFragment( oldFragment.tick, oldFragment.flops + flops, oldFragment.duration + traceInterval, cpuUsage, - cores) - ) - } else { - flopsHistory.add(FlopsHistoryFragment(timestamp, flops, traceInterval, cpuUsage, cores)) + cores + ) + } else { + val fragment = + FlopsHistoryFragment(timestamp, flops, traceInterval, cpuUsage, cores) + if (last != null) { + yield(last!!) + } + fragment + } } + // yieldAll(res) } + + if (last != null) { + yield(last!!) } + } } val uuid = UUID(0, idx.toLong()) - val relevantPerformanceInterferenceModelItems = PerformanceInterferenceModel( - performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId) }.toSet() - ) - + val relevantPerformanceInterferenceModelItems = + PerformanceInterferenceModel( + performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId) }.toSet(), + Random(random.nextInt()) + ) val vmWorkload = VmWorkload( uuid, "VM Workload $vmId", UnnamedUser, VmImage( uuid, vmId, mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), - flopsHistory, + flopsFragments.asSequence(), maxCores, requiredMemory ) ) entries[uuid] = TraceEntryImpl( - flopsHistory.firstOrNull()?.tick ?: -1, + minTime, vmWorkload ) } diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt index fbe77654..fe1049d9 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt @@ -28,8 +28,8 @@ import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment import com.atlarge.opendc.compute.core.image.VmImage import com.atlarge.opendc.compute.core.workload.VmWorkload import com.atlarge.opendc.core.User -import com.atlarge.opendc.core.workload.IMAGE_PERF_INTERFERENCE_MODEL -import com.atlarge.opendc.core.workload.PerformanceInterferenceModel +import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel import com.atlarge.opendc.format.trace.TraceEntry import com.atlarge.opendc.format.trace.TraceReader import java.io.BufferedReader @@ -123,9 +123,10 @@ class VmTraceReader( val uuid = UUID(0L, vmId) - val relevantPerformanceInterferenceModelItems = PerformanceInterferenceModel( - performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId.toString()) }.toSet() - ) + val relevantPerformanceInterferenceModelItems = + PerformanceInterferenceModel( + performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId.toString()) }.toSet() + ) val vmWorkload = VmWorkload( uuid, "VM Workload $vmId", UnnamedUser, @@ -133,7 +134,7 @@ class VmTraceReader( uuid, vmId.toString(), mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), - flopsHistory, + flopsHistory.asSequence(), cores, requiredMemory ) |
