From 998466e611438e9f4381e5d693ef4119a3cf8905 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 15 Apr 2020 00:17:18 +0200 Subject: bug: Address uid collision issue --- .../atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt | 6 +++--- .../opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt | 5 ++++- 2 files changed, 7 insertions(+), 4 deletions(-) (limited to 'opendc') diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt index 844938db..08f04760 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt @@ -120,7 +120,7 @@ public class SimpleBareMetalDriver( /** * The internal random instance. */ - private val random = Random(0) + private val random = Random(uid.leastSignificantBits xor uid.mostSignificantBits) override suspend fun init(): Node = withContext(domain.coroutineContext) { nodeState.value @@ -134,7 +134,7 @@ public class SimpleBareMetalDriver( val events = EventFlow() val server = Server( - UUID(node.uid.leastSignificantBits xor node.uid.mostSignificantBits, random.nextLong()), + UUID(random.nextLong(), random.nextLong()), node.name, emptyMap(), flavor, @@ -151,7 +151,7 @@ public class SimpleBareMetalDriver( override suspend fun stop(): Node = withContext(domain.coroutineContext) { val node = nodeState.value - if (node.state == NodeState.SHUTOFF || node.state == NodeState.ERROR) { + if (node.state == NodeState.SHUTOFF) { return@withContext node } diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt index 89a59e1c..2ef0db97 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt @@ -41,6 +41,7 @@ import com.atlarge.opendc.format.environment.EnvironmentReader import java.io.BufferedReader import java.io.File import java.io.FileReader +import java.util.Random import java.util.UUID /** @@ -67,6 +68,7 @@ class Sc20ClusterEnvironmentReader( var coresPerHost: Int val nodes = mutableListOf() + val random = Random(0) BufferedReader(FileReader(environmentFile)).use { reader -> reader.lineSequence() @@ -87,6 +89,7 @@ class Sc20ClusterEnvironmentReader( return@forEachIndexed } + clusterIdx++ clusterId = values[clusterIdCol].trim() speed = values[speedCol].trim().toDouble() * 1000.0 numberOfHosts = values[numberOfHostsCol].trim().toInt() @@ -100,7 +103,7 @@ class Sc20ClusterEnvironmentReader( nodes.add( SimpleBareMetalDriver( dom.newDomain("node-$clusterId-$it"), - UUID((clusterIdx++).toLong(), it.toLong()), + UUID(random.nextLong(), random.nextLong()), "node-$clusterId-$it", mapOf(NODE_CLUSTER to clusterId), List(coresPerHost) { coreId -> -- cgit v1.2.3 From b5fab8f707d4aeb0d045b53f571c3dc826c69570 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 15 Apr 2020 00:19:06 +0200 Subject: bug: Report shutdown state of all machines --- .../src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) 
(limited to 'opendc') diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt index 7e6398bb..e6c36e5d 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt @@ -22,6 +22,7 @@ class Sc20Monitor( suspend fun onVmStateChanged(server: Server) {} + suspend fun serverStateChanged(driver: VirtDriver, server: Server) { val lastServerState = lastServerStates[server] if (server.state == ServerState.SHUTOFF && lastServerState != null) { @@ -57,8 +58,6 @@ class Sc20Monitor( hostServer: Server, duration: Long = 5 * 60 * 1000L ) { - lastServerStates.remove(hostServer) - // Assume for now that the host is not virtualized and measure the current power draw val driver = hostServer.services[BareMetalDriver.Key] val usage = driver.usage.first() -- cgit v1.2.3 From ae23970faa77c89408a4e98cb9259fb53e222bd3 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 15 Apr 2020 03:02:16 +0200 Subject: perf: Convert output format to parquet --- opendc/opendc-experiments-sc20/build.gradle.kts | 5 +- .../atlarge/opendc/experiments/sc20/Sc20Monitor.kt | 58 +++++++++++++++++----- .../opendc/experiments/sc20/TestExperiment.kt | 2 +- 3 files changed, 51 insertions(+), 14 deletions(-) (limited to 'opendc') diff --git a/opendc/opendc-experiments-sc20/build.gradle.kts b/opendc/opendc-experiments-sc20/build.gradle.kts index 28b8ae12..d23456a8 100644 --- a/opendc/opendc-experiments-sc20/build.gradle.kts +++ b/opendc/opendc-experiments-sc20/build.gradle.kts @@ -40,7 +40,10 @@ dependencies { implementation(kotlin("stdlib")) implementation("com.xenomachina:kotlin-argparser:2.0.7") api("com.fasterxml.jackson.module:jackson-module-kotlin:2.9.8") - + implementation("org.apache.parquet:parquet-avro:1.11.0") + implementation("org.apache.hadoop:hadoop-client:3.2.1") { + exclude(group = "org.slf4j", module = "slf4j-log4j12") + } runtimeOnly("org.slf4j:slf4j-simple:${Library.SLF4J}") runtimeOnly(project(":odcsim:odcsim-engine-omega")) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt index e6c36e5d..02a982dc 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt @@ -6,23 +6,44 @@ import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.metal.driver.BareMetalDriver import com.atlarge.opendc.compute.virt.driver.VirtDriver import kotlinx.coroutines.flow.first -import java.io.BufferedWriter +import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericData +import org.apache.hadoop.fs.Path +import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.hadoop.metadata.CompressionCodecName import java.io.Closeable -import java.io.FileWriter class Sc20Monitor( destination: String ) : Closeable { - private val outputFile = BufferedWriter(FileWriter(destination)) private val lastServerStates = mutableMapOf<Server, Pair<ServerState, Long>>() - - init { -
outputFile.write("time,duration,requestedBurst,grantedBurst,overcommissionedBurst,interferedBurst,cpuUsage,cpuDemand,numberOfDeployedImages,server,hostState,hostUsage,powerDraw\n") - } + private val schema = SchemaBuilder + .record("slice") + .namespace("com.atlarge.opendc.experiments.sc20") + .fields() + .name("time").type().longType().noDefault() + .name("duration").type().longType().noDefault() + .name("requestedBurst").type().longType().noDefault() + .name("grantedBurst").type().longType().noDefault() + .name("overcommisionedBurst").type().longType().noDefault() + .name("interferedBurst").type().longType().noDefault() + .name("cpuUsage").type().doubleType().noDefault() + .name("cpuDemand").type().doubleType().noDefault() + .name("numberOfDeployedImages").type().intType().noDefault() + .name("server").type().stringType().noDefault() + .name("hostState").type().stringType().noDefault() + .name("hostUsage").type().doubleType().noDefault() + .name("powerDraw").type().doubleType().noDefault() + .endRecord() + private val writer = AvroParquetWriter.builder(Path(destination)) + .withSchema(schema) + .withCompressionCodec(CompressionCodecName.SNAPPY) + .withPageSize(4 * 1024 * 1024) // For compression + .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) + .build() suspend fun onVmStateChanged(server: Server) {} - suspend fun serverStateChanged(driver: VirtDriver, server: Server) { val lastServerState = lastServerStates[server] if (server.state == ServerState.SHUTOFF && lastServerState != null) { @@ -63,12 +84,25 @@ class Sc20Monitor( val usage = driver.usage.first() val powerDraw = driver.powerDraw.first() - outputFile.write("$time,$duration,$requestedBurst,$grantedBurst,$overcommissionedBurst,$interferedBurst,$cpuUsage,$cpuDemand,$numberOfDeployedImages,${hostServer.uid},${hostServer.state},$usage,$powerDraw") - outputFile.newLine() + val record = GenericData.Record(schema) + record.put("time", time) + record.put("duration", duration) + record.put("requestedBurst", requestedBurst) + record.put("grantedBurst", grantedBurst) + record.put("overcommisionedBurst", overcommissionedBurst) + record.put("interferedBurst", interferedBurst) + record.put("cpuUsage", cpuUsage) + record.put("cpuDemand", cpuDemand) + record.put("numberOfDeployedImages", numberOfDeployedImages) + record.put("server", hostServer.uid) + record.put("hostState", hostServer.state) + record.put("hostUsage", usage) + record.put("powerDraw", powerDraw) + + writer.write(record) } override fun close() { - outputFile.flush() - outputFile.close() + writer.close() } } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt index c75bde30..f3b5061c 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt @@ -70,7 +70,7 @@ class ExperimentParameters(parser: ArgParser) { val environmentFile by parser.storing("path to the environment file") val performanceInterferenceFile by parser.storing("path to the performance interference file").default { null } val outputFile by parser.storing("path to where the output should be stored") - .default { "sc20-experiment-results.csv" } + .default { "data/results-${System.currentTimeMillis()}.parquet" } val selectedVms by parser.storing("the VMs to run") { 
parseVMs(this) } .default { emptyList() } val selectedVmsFile by parser.storing("path to a file containing the VMs to run") { -- cgit v1.2.3 From a77829c7a8cf300a99a695423d85f905e0209286 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 15 Apr 2020 12:16:42 +0200 Subject: perf: Optimize performance interference --- .../core/workload/PerformanceInterferenceModel.kt | 129 +++++++++++++++++++++ .../atlarge/opendc/compute/virt/HypervisorEvent.kt | 1 + .../opendc/compute/virt/driver/SimpleVirtDriver.kt | 50 ++++---- .../virt/service/SimpleVirtProvisioningService.kt | 12 +- .../virt/service/VirtProvisioningService.kt | 5 - .../core/workload/PerformanceInterferenceModel.kt | 83 ------------- .../atlarge/opendc/experiments/sc20/Sc20Monitor.kt | 16 +++ .../opendc/experiments/sc20/TestExperiment.kt | 18 ++- .../trace/PerformanceInterferenceModelReader.kt | 2 +- .../sc20/Sc20PerformanceInterferenceReader.kt | 7 +- .../opendc/format/trace/sc20/Sc20TraceReader.kt | 11 +- .../opendc/format/trace/vm/VmTraceReader.kt | 11 +- 12 files changed, 209 insertions(+), 136 deletions(-) create mode 100644 opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt delete mode 100644 opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/PerformanceInterferenceModel.kt (limited to 'opendc') diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt new file mode 100644 index 00000000..ddf9bb33 --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt @@ -0,0 +1,129 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.compute.core.workload + +import com.atlarge.opendc.compute.core.Server +import java.util.SortedSet +import java.util.TreeSet +import kotlin.random.Random + +/** + * Meta-data key for the [PerformanceInterferenceModel] of an image. + */ +const val IMAGE_PERF_INTERFERENCE_MODEL = "image:performance-interference" + +/** + * Performance Interference Model describing the variability incurred by different sets of workloads if colocated. + * + * @param items The [PerformanceInterferenceModelItem]s that make up this model. 
+ */ +class PerformanceInterferenceModel( + items: Set<PerformanceInterferenceModelItem>, + val random: Random = Random(0) +) { + private var intersectingItems: List<PerformanceInterferenceModelItem> = emptyList() + private val comparator = Comparator<PerformanceInterferenceModelItem> { lhs, rhs -> + var cmp = lhs.performanceScore.compareTo(rhs.performanceScore) + if (cmp != 0) { + return@Comparator cmp + } + + cmp = lhs.minServerLoad.compareTo(rhs.minServerLoad) + if (cmp != 0) { + return@Comparator cmp + } + + lhs.hashCode().compareTo(rhs.hashCode()) + } + val items = TreeSet(comparator) + private val colocatedWorkloads = TreeSet<String>() + + init { + this.items.addAll(items) + } + + fun vmStarted(server: Server) { + colocatedWorkloads.add(server.image.name) + intersectingItems = items.filter { item -> doesMatch(item) } + } + + fun vmStopped(server: Server) { + colocatedWorkloads.remove(server.image.name) + intersectingItems = items.filter { item -> doesMatch(item) } + } + + private fun doesMatch(item: PerformanceInterferenceModelItem): Boolean { + var count = 0 + for (name in item.workloadNames.subSet(colocatedWorkloads.first(), colocatedWorkloads.last() + "\u0000")) { + if (name in colocatedWorkloads) + count++ + if (count > 1) + return true + } + return false + } + + fun apply(currentServerLoad: Double): Double { + if (intersectingItems.isEmpty()) { + return 1.0 + } + val score = intersectingItems + .firstOrNull { it.minServerLoad <= currentServerLoad } + + // Apply performance penalty to (on average) only one of the VMs + return if (score != null && random.nextInt(score.workloadNames.size) == 0) { + score.performanceScore + } else { + 1.0 + } + } +} + +/** + * Model describing how a specific set of workloads causes performance variability for each workload. + * + * @param workloadNames The names of the workloads that together cause performance variability for each workload in the set. + * @param minServerLoad The minimum total server load at which this interference is activated and noticeable. + * @param performanceScore The performance score that should be applied to each workload's performance. 1 means no + * influence, <1 means that performance degrades, and >1 means that performance improves. + */ +data class PerformanceInterferenceModelItem( + val workloadNames: SortedSet<String>, + val minServerLoad: Double, + val performanceScore: Double +) { + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (javaClass != other?.javaClass) return false + + other as PerformanceInterferenceModelItem + + if (workloadNames != other.workloadNames) return false + + return true + } + + override fun hashCode(): Int = workloadNames.hashCode() +} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt index 7c088bc8..e7344fa6 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt @@ -26,6 +26,7 @@ package com.atlarge.opendc.compute.virt import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.virt.driver.VirtDriver +import com.atlarge.opendc.compute.virt.service.VirtProvisioningService /** * An event that is emitted by a [VirtDriver].
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt index 73f9dd5c..f32f407c 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt @@ -40,8 +40,8 @@ import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.virt.HypervisorEvent import com.atlarge.opendc.core.services.ServiceKey import com.atlarge.opendc.core.services.ServiceRegistry -import com.atlarge.opendc.core.workload.IMAGE_PERF_INTERFERENCE_MODEL -import com.atlarge.opendc.core.workload.PerformanceInterferenceModel +import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel import kotlinx.coroutines.CancellableContinuation import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope @@ -53,9 +53,6 @@ import kotlinx.coroutines.NonCancellable import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.filter -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.coroutines.selects.SelectClause0 import kotlinx.coroutines.selects.select @@ -65,6 +62,7 @@ import java.util.Objects import java.util.TreeSet import java.util.UUID import kotlin.coroutines.resume +import kotlin.coroutines.suspendCoroutine import kotlin.math.ceil import kotlin.math.max import kotlin.math.min @@ -101,15 +99,6 @@ class SimpleVirtDriver( override val events: Flow = eventFlow init { - events.filter { it is HypervisorEvent.VmsUpdated }.onEach { - val imagesRunning = vms.map { it.server.image }.toSet() - vms.forEach { - val performanceModel = - it.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? - performanceModel?.computeIntersectingItems(imagesRunning) - } - }.launchIn(this) - launch { try { scheduler() @@ -140,6 +129,7 @@ class SimpleVirtDriver( ) availableMemory -= requiredMemory vms.add(VmServerContext(server, events, simulationContext.domain)) + vmStarted(server) eventFlow.emit(HypervisorEvent.VmsUpdated(this, vms.size, availableMemory)) return server } @@ -148,6 +138,22 @@ class SimpleVirtDriver( eventFlow.close() } + private fun vmStarted(server: Server) { + vms.forEach { + val performanceModel = + it.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? + performanceModel?.vmStarted(server) + } + } + + private fun vmStopped(server: Server) { + vms.forEach { + val performanceModel = + it.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? + performanceModel?.vmStopped(server) + } + } + /** * A scheduling command processed by the scheduler. */ @@ -325,7 +331,7 @@ class SimpleVirtDriver( requests.removeAll(vmRequests) // Return vCPU `run` call: the requested burst was completed or deadline was exceeded - vm.cont?.resume(Unit) + vm.chan.send(Unit) } } @@ -395,7 +401,7 @@ class SimpleVirtDriver( private var finalized: Boolean = false lateinit var burst: LongArray var deadline: Long = 0L - var cont: CancellableContinuation? 
= null + val chan: Channel = Channel(Channel.CONFLATED) private var initialized: Boolean = false internal val job: Job = launch { @@ -443,6 +449,7 @@ class SimpleVirtDriver( server = server.copy(state = serverState) availableMemory += server.flavor.memorySize vms.remove(this) + vmStopped(server) eventFlow.emit(HypervisorEvent.VmsUpdated(this@SimpleVirtDriver, vms.size, availableMemory)) events.close() } @@ -466,15 +473,12 @@ class SimpleVirtDriver( // Wait until the burst has been run or the coroutine is cancelled try { schedulingQueue.send(SchedulerCommand.Schedule(this, requests)) - suspendCancellableCoroutine { cont = it } + chan.receive() } catch (e: CancellationException) { // Deschedule the VM - withContext(NonCancellable) { - requests.forEach { it.isCancelled = true } - schedulingQueue.send(SchedulerCommand.Interrupt) - suspendCancellableCoroutine { cont = it } - } - + requests.forEach { it.isCancelled = true } + schedulingQueue.send(SchedulerCommand.Interrupt) + chan.receive() e.assertFailure() } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt index 6200ad7c..5ed58fee 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt @@ -55,7 +55,10 @@ class SimpleVirtProvisioningService( */ private val activeImages: MutableSet = mutableSetOf() - override val hypervisorEvents: Flow = EventFlow() + public var submittedVms = 0L + public var queuedVms = 0L + public var runningVms = 0L + public var finishedVms = 0L /** * The allocation logic to use. @@ -87,6 +90,8 @@ class SimpleVirtProvisioningService( image: Image, flavor: Flavor ): Server = withContext(coroutineContext) { + submittedVms++ + queuedVms++ suspendCancellableCoroutine { cont -> val vmInstance = ImageView(name, image, flavor, cont) incomingImages += vmInstance @@ -145,6 +150,8 @@ class SimpleVirtProvisioningService( ) imageInstance.server = server imageInstance.continuation.resume(server) + queuedVms-- + runningVms++ activeImages += imageInstance server.events @@ -152,6 +159,9 @@ class SimpleVirtProvisioningService( when (event) { is ServerEvent.StateChanged -> { if (event.server.state == ServerState.SHUTOFF) { + runningVms-- + finishedVms++ + activeImages -= imageInstance selectedHv.provisionedCores -= server.flavor.cpuCount diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt index 550048a4..695f7274 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt @@ -17,11 +17,6 @@ interface VirtProvisioningService { */ val allocationPolicy: AllocationPolicy - /** - * The events emitted by the hypervisors. - */ - public val hypervisorEvents: Flow - /** * Obtain the active hypervisors for this provisioner. 
*/ diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/PerformanceInterferenceModel.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/PerformanceInterferenceModel.kt deleted file mode 100644 index 04056394..00000000 --- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/workload/PerformanceInterferenceModel.kt +++ /dev/null @@ -1,83 +0,0 @@ -package com.atlarge.opendc.core.workload - -import com.atlarge.opendc.core.resource.Resource -import kotlin.random.Random - -/** - * Meta-data key for the [PerformanceInterferenceModel] of an image. - */ -const val IMAGE_PERF_INTERFERENCE_MODEL = "image:performance-interference" - -/** - * Performance Interference Model describing the variability incurred by different sets of workloads if colocated. - * - * @param items The [PerformanceInterferenceModelItem]s that make up this model. - */ -data class PerformanceInterferenceModel( - val items: Set, - val random: Random = Random(0) -) { - private var intersectingItems: List = emptyList() - private var comparator = Comparator { lhs, rhs -> - var cmp = lhs.performanceScore.compareTo(rhs.performanceScore) - if (cmp != 0) { - return@Comparator cmp - } - - cmp = lhs.minServerLoad.compareTo(rhs.minServerLoad) - if (cmp != 0) { - return@Comparator cmp - } - - 0 - } - - fun computeIntersectingItems(colocatedWorkloads: Set) { - val colocatedWorkloadIds = colocatedWorkloads.map { it.name } - intersectingItems = items.filter { item -> - colocatedWorkloadIds.intersect(item.workloadNames).size > 1 - }.sortedWith(comparator) - } - - fun apply(currentServerLoad: Double): Double { - if (intersectingItems.isEmpty()) { - return 1.0 - } - val score = intersectingItems - .firstOrNull { it.minServerLoad <= currentServerLoad } - - // Apply performance penalty to (on average) only one of the VMs - return if (score != null && random.nextInt(score.workloadNames.size) == 0) { - score.performanceScore - } else { - 1.0 - } - } -} - -/** - * Model describing how a specific set of workloads causes performance variability for each workload. - * - * @param workloadNames The names of the workloads that together cause performance variability for each workload in the set. - * @param minServerLoad The minimum total server load at which this interference is activated and noticeable. - * @param performanceScore The performance score that should be applied to each workload's performance. 1 means no - * influence, <1 means that performance degrades, and >1 means that performance improves. 
- */ -data class PerformanceInterferenceModelItem( - val workloadNames: Set, - val minServerLoad: Double, - val performanceScore: Double -) { - override fun equals(other: Any?): Boolean { - if (this === other) return true - if (javaClass != other?.javaClass) return false - - other as PerformanceInterferenceModelItem - - if (workloadNames != other.workloadNames) return false - - return true - } - - override fun hashCode(): Int = workloadNames.hashCode() -} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt index 02a982dc..464f4fc6 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt @@ -34,6 +34,10 @@ class Sc20Monitor( .name("hostState").type().stringType().noDefault() .name("hostUsage").type().doubleType().noDefault() .name("powerDraw").type().doubleType().noDefault() + .name("totalSubmittedVms").type().longType().noDefault() + .name("totalQueuedVms").type().longType().noDefault() + .name("totalRunningVms").type().longType().noDefault() + .name("totalFinishedVms").type().longType().noDefault() .endRecord() private val writer = AvroParquetWriter.builder(Path(destination)) .withSchema(schema) @@ -58,6 +62,10 @@ class Sc20Monitor( 0.0, 0, server, + 0, + 0, + 0, + 0, duration ) } @@ -77,6 +85,10 @@ class Sc20Monitor( cpuDemand: Double, numberOfDeployedImages: Int, hostServer: Server, + submittedVms: Long, + queuedVms: Long, + runningVms: Long, + finishedVms: Long, duration: Long = 5 * 60 * 1000L ) { // Assume for now that the host is not virtualized and measure the current power draw @@ -98,6 +110,10 @@ class Sc20Monitor( record.put("hostState", hostServer.state) record.put("hostUsage", usage) record.put("powerDraw", powerDraw) + record.put("totalSubmittedVms", submittedVms) + record.put("totalQueuedVms", queuedVms) + record.put("totalRunningVms", runningVms) + record.put("totalFinishedVms", finishedVms) writer.write(record) } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt index f3b5061c..08815720 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt @@ -204,7 +204,11 @@ fun main(args: Array) { event.cpuUsage, event.cpuDemand, event.numberOfDeployedImages, - event.hostServer + event.hostServer, + scheduler.submittedVms, + scheduler.queuedVms, + scheduler.runningVms, + scheduler.finishedVms ) } } @@ -229,15 +233,14 @@ fun main(args: Array) { null } + var submitted = 0L val finish = Channel(Channel.RENDEZVOUS) - var submitted = 0 - var finished = 0 val reader = Sc20TraceReader(File(traceDirectory), performanceInterferenceModel, getSelectedVmList()) while (reader.hasNext()) { val (time, workload) = reader.next() - delay(max(0, time - simulationContext.clock.millis())) submitted++ + delay(max(0, time - simulationContext.clock.millis())) launch { chan.send(Unit) val server = scheduler.deploy( @@ -250,12 +253,7 @@ fun main(args: Array) { if (it is ServerEvent.StateChanged) monitor.onVmStateChanged(it.server) - // Detect whether the VM has finished 
running - if (it.server.state == ServerState.SHUTOFF) { - finished++ - } - - if (finished == submitted && !reader.hasNext()) { + if (scheduler.submittedVms == submitted && scheduler.runningVms <= 1 && !reader.hasNext()) { finish.send(Unit) } } diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/PerformanceInterferenceModelReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/PerformanceInterferenceModelReader.kt index f9ebba3d..a653e643 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/PerformanceInterferenceModelReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/PerformanceInterferenceModelReader.kt @@ -24,7 +24,7 @@ package com.atlarge.opendc.format.trace -import com.atlarge.opendc.core.workload.PerformanceInterferenceModel +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel import java.io.Closeable /** diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt index daa1fdf8..8562cefe 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt @@ -24,13 +24,14 @@ package com.atlarge.opendc.format.trace.sc20 -import com.atlarge.opendc.core.workload.PerformanceInterferenceModel -import com.atlarge.opendc.core.workload.PerformanceInterferenceModelItem +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModelItem import com.atlarge.opendc.format.trace.PerformanceInterferenceModelReader import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue import java.io.InputStream +import java.util.TreeSet /** * A parser for the JSON performance interference setup files used for the SC20 paper. 
@@ -49,7 +50,7 @@ class Sc20PerformanceInterferenceReader(input: InputStream, mapper: ObjectMapper return PerformanceInterferenceModel( performanceInterferenceModel.map { item -> PerformanceInterferenceModelItem( - item.vms.toSet(), + TreeSet(item.vms), item.minServerLoad, item.performanceScore ) diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt index c40cb039..96daa2ce 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt @@ -28,8 +28,8 @@ import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment import com.atlarge.opendc.compute.core.image.VmImage import com.atlarge.opendc.compute.core.workload.VmWorkload import com.atlarge.opendc.core.User -import com.atlarge.opendc.core.workload.IMAGE_PERF_INTERFERENCE_MODEL -import com.atlarge.opendc.core.workload.PerformanceInterferenceModel +import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel import com.atlarge.opendc.format.trace.TraceEntry import com.atlarge.opendc.format.trace.TraceReader import java.io.BufferedReader @@ -127,9 +127,10 @@ class Sc20TraceReader( val uuid = UUID(0, idx.toLong()) - val relevantPerformanceInterferenceModelItems = PerformanceInterferenceModel( - performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId) }.toSet() - ) + val relevantPerformanceInterferenceModelItems = + PerformanceInterferenceModel( + performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId) }.toSet() + ) val vmWorkload = VmWorkload( uuid, "VM Workload $vmId", UnnamedUser, diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt index fbe77654..be9ddfaa 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt @@ -28,8 +28,8 @@ import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment import com.atlarge.opendc.compute.core.image.VmImage import com.atlarge.opendc.compute.core.workload.VmWorkload import com.atlarge.opendc.core.User -import com.atlarge.opendc.core.workload.IMAGE_PERF_INTERFERENCE_MODEL -import com.atlarge.opendc.core.workload.PerformanceInterferenceModel +import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel import com.atlarge.opendc.format.trace.TraceEntry import com.atlarge.opendc.format.trace.TraceReader import java.io.BufferedReader @@ -123,9 +123,10 @@ class VmTraceReader( val uuid = UUID(0L, vmId) - val relevantPerformanceInterferenceModelItems = PerformanceInterferenceModel( - performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId.toString()) }.toSet() - ) + val relevantPerformanceInterferenceModelItems = + PerformanceInterferenceModel( + performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId.toString()) }.toSet() + ) val vmWorkload = VmWorkload( uuid, "VM Workload $vmId", UnnamedUser, -- cgit v1.2.3 From eab4c190142f54291ed235e4e18f3a35385a541c Mon Sep 17 00:00:00 2001 From: 
Fabian Mastenbroek Date: Wed, 15 Apr 2020 16:12:25 +0200 Subject: perf: Optimize trace loading for memory usage --- .../atlarge/opendc/compute/core/image/VmImage.kt | 22 ++++--- .../core/workload/PerformanceInterferenceModel.kt | 2 +- .../atlarge/opendc/compute/virt/HypervisorEvent.kt | 1 - .../opendc/compute/virt/driver/SimpleVirtDriver.kt | 6 -- .../virt/service/SimpleVirtProvisioningService.kt | 2 - .../virt/service/VirtProvisioningService.kt | 2 - opendc/opendc-experiments-sc20/build.gradle.kts | 1 + .../atlarge/opendc/experiments/sc20/Sc20Monitor.kt | 21 ++++--- .../opendc/experiments/sc20/TestExperiment.kt | 9 +-- .../opendc/format/trace/sc20/Sc20TraceReader.kt | 71 ++++++++++++++++------ .../opendc/format/trace/vm/VmTraceReader.kt | 2 +- 11 files changed, 82 insertions(+), 57 deletions(-) (limited to 'opendc') diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt index b0688f99..b37f05a7 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt @@ -14,7 +14,7 @@ class VmImage( public override val uid: UUID, public override val name: String, public override val tags: TagContainer, - public val flopsHistory: List, + public val flopsHistory: Sequence, public val maxCores: Int, public val requiredMemory: Long ) : Image { @@ -23,17 +23,19 @@ class VmImage( val clock = simulationContext.clock val job = coroutineContext[Job]!! - for (fragment in flopsHistory) { - job.ensureActive() + for (fragments in flopsHistory.chunked(1024)) { + for (fragment in fragments) { + job.ensureActive() - if (fragment.flops == 0L) { - delay(fragment.duration) - } else { - val cores = min(fragment.cores, ctx.server.flavor.cpuCount) - val burst = LongArray(cores) { fragment.flops / cores } - val usage = DoubleArray(cores) { fragment.usage / cores } + if (fragment.flops == 0L) { + delay(fragment.duration) + } else { + val cores = min(fragment.cores, ctx.server.flavor.cpuCount) + val burst = LongArray(cores) { fragment.flops / cores } + val usage = DoubleArray(cores) { fragment.usage / cores } - ctx.run(burst, usage, clock.millis() + fragment.duration) + ctx.run(burst, usage, clock.millis() + fragment.duration) + } } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt index ddf9bb33..45024a49 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt @@ -64,7 +64,7 @@ class PerformanceInterferenceModel( this.items.addAll(items) } - fun vmStarted(server: Server) { + fun vmStarted(server: Server) { colocatedWorkloads.add(server.image.name) intersectingItems = items.filter { item -> doesMatch(item) } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt index e7344fa6..7c088bc8 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt +++ 
b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorEvent.kt @@ -26,7 +26,6 @@ package com.atlarge.opendc.compute.virt import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.virt.driver.VirtDriver -import com.atlarge.opendc.compute.virt.service.VirtProvisioningService /** * An event that is emitted by a [VirtDriver]. diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt index f32f407c..2c25c0fa 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt @@ -42,27 +42,21 @@ import com.atlarge.opendc.core.services.ServiceKey import com.atlarge.opendc.core.services.ServiceRegistry import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel -import kotlinx.coroutines.CancellableContinuation import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.InternalCoroutinesApi import kotlinx.coroutines.Job -import kotlinx.coroutines.NonCancellable import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.launch import kotlinx.coroutines.selects.SelectClause0 import kotlinx.coroutines.selects.select -import kotlinx.coroutines.suspendCancellableCoroutine -import kotlinx.coroutines.withContext import java.util.Objects import java.util.TreeSet import java.util.UUID -import kotlin.coroutines.resume -import kotlin.coroutines.suspendCoroutine import kotlin.math.ceil import kotlin.math.max import kotlin.math.min diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt index 5ed58fee..09036d0d 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt @@ -1,7 +1,6 @@ package com.atlarge.opendc.compute.virt.service import com.atlarge.odcsim.SimulationContext -import com.atlarge.odcsim.flow.EventFlow import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.Server @@ -20,7 +19,6 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.Job import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt index 695f7274..2ad7df84 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt +++ 
b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/VirtProvisioningService.kt @@ -3,10 +3,8 @@ package com.atlarge.opendc.compute.virt.service import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.image.Image -import com.atlarge.opendc.compute.virt.HypervisorEvent import com.atlarge.opendc.compute.virt.driver.VirtDriver import com.atlarge.opendc.compute.virt.service.allocation.AllocationPolicy -import kotlinx.coroutines.flow.Flow /** * A service for VM provisioning on a cloud. diff --git a/opendc/opendc-experiments-sc20/build.gradle.kts b/opendc/opendc-experiments-sc20/build.gradle.kts index d23456a8..4b73cedd 100644 --- a/opendc/opendc-experiments-sc20/build.gradle.kts +++ b/opendc/opendc-experiments-sc20/build.gradle.kts @@ -32,6 +32,7 @@ plugins { application { mainClassName = "com.atlarge.opendc.experiments.sc20.TestExperimentKt" + applicationDefaultJvmArgs = listOf("-Xmx3096M") } dependencies { diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt index 464f4fc6..7c198e56 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt @@ -25,7 +25,7 @@ class Sc20Monitor( .name("duration").type().longType().noDefault() .name("requestedBurst").type().longType().noDefault() .name("grantedBurst").type().longType().noDefault() - .name("overcommisionedBurst").type().longType().noDefault() + .name("overcommissionedBurst").type().longType().noDefault() .name("interferedBurst").type().longType().noDefault() .name("cpuUsage").type().doubleType().noDefault() .name("cpuDemand").type().doubleType().noDefault() @@ -48,7 +48,14 @@ class Sc20Monitor( suspend fun onVmStateChanged(server: Server) {} - suspend fun serverStateChanged(driver: VirtDriver, server: Server) { + suspend fun serverStateChanged( + driver: VirtDriver, + server: Server, + submittedVms: Long, + queuedVms: Long, + runningVms: Long, + finishedVms: Long + ) { val lastServerState = lastServerStates[server] if (server.state == ServerState.SHUTOFF && lastServerState != null) { val duration = simulationContext.clock.millis() - lastServerState.second @@ -62,10 +69,10 @@ class Sc20Monitor( 0.0, 0, server, - 0, - 0, - 0, - 0, + submittedVms, + queuedVms, + runningVms, + finishedVms, duration ) } @@ -101,7 +108,7 @@ class Sc20Monitor( record.put("duration", duration) record.put("requestedBurst", requestedBurst) record.put("grantedBurst", grantedBurst) - record.put("overcommisionedBurst", overcommissionedBurst) + record.put("overcommissionedBurst", overcommissionedBurst) record.put("interferedBurst", interferedBurst) record.put("cpuUsage", cpuUsage) record.put("cpuDemand", cpuDemand) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt index 08815720..3aef80e6 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt @@ -29,7 +29,6 @@ import com.atlarge.odcsim.SimulationEngineProvider import 
com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.ServerEvent -import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.metal.NODE_CLUSTER import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.compute.virt.HypervisorEvent @@ -174,20 +173,16 @@ fun main(args: Array) { delay(10) val hypervisors = scheduler.drivers() - var availableHypervisors = hypervisors.size // Monitor hypervisor events for (hypervisor in hypervisors) { // TODO Do not expose VirtDriver directly but use Hypervisor class. - monitor.serverStateChanged(hypervisor, (hypervisor as SimpleVirtDriver).server) + monitor.serverStateChanged(hypervisor, (hypervisor as SimpleVirtDriver).server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) hypervisor.server.events .onEach { event -> when (event) { is ServerEvent.StateChanged -> { - monitor.serverStateChanged(hypervisor, event.server) - - if (event.server.state == ServerState.ERROR) - availableHypervisors -= 1 + monitor.serverStateChanged(hypervisor, event.server, scheduler.submittedVms, scheduler.queuedVms, scheduler.runningVms, scheduler.finishedVms) } } } diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt index 96daa2ce..d4eef029 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt @@ -37,6 +37,7 @@ import java.io.File import java.io.FileReader import java.util.UUID import kotlin.math.max +import kotlin.math.min /** * A [TraceReader] for the internal VM workload trace format. @@ -81,10 +82,13 @@ class Sc20TraceReader( vms .forEachIndexed { idx, vmFile -> println(vmFile) - val flopsHistory = mutableListOf() + var vmId = "" var maxCores = -1 var requiredMemory = -1L + var timestamp = -1L + var cores = -1 + var minTime = Long.MAX_VALUE BufferedReader(FileReader(vmFile)).use { reader -> reader.lineSequence() @@ -96,33 +100,61 @@ class Sc20TraceReader( val values = line.split(" ") vmId = vmFile.name - val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L - val cores = values[coreCol].trim().toInt() - val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz + timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L + cores = values[coreCol].trim().toInt() requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) maxCores = max(maxCores, cores) + minTime = min(minTime, timestamp) + } + } - val flops: Long = (cpuUsage * 5 * 60).toLong() - - if (flopsHistory.isEmpty()) { - flopsHistory.add(FlopsHistoryFragment(timestamp, flops, traceInterval, cpuUsage, cores)) - } else { - // Restrict merging to empty fragments for now - if (flopsHistory.last().flops == 0L && flops == 0L) { - val oldFragment = flopsHistory.removeAt(flopsHistory.size - 1) - flopsHistory.add( + val flopsFragments = sequence { + var last: FlopsHistoryFragment? 
= null + + BufferedReader(FileReader(vmFile)).use { reader -> + reader.lineSequence() + .chunked(1024) + .forEach { lines -> + val res = ArrayList(lines.size) + for (line in lines) { + // Ignore comments in the trace + if (line.startsWith("#") || line.isBlank()) { + continue + } + + val values = line.split(" ") + val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz + requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) + maxCores = max(maxCores, cores) + + val flops: Long = (cpuUsage * 5 * 60).toLong() + + last = if (last != null && last!!.flops == 0L && flops == 0L) { + val oldFragment = last!! FlopsHistoryFragment( oldFragment.tick, oldFragment.flops + flops, oldFragment.duration + traceInterval, cpuUsage, - cores) - ) - } else { - flopsHistory.add(FlopsHistoryFragment(timestamp, flops, traceInterval, cpuUsage, cores)) + cores + ) + } else { + val fragment = + FlopsHistoryFragment(timestamp, flops, traceInterval, cpuUsage, cores) + if (last != null) { + res.add(last!!) + } + fragment + } } + + yieldAll(res) } + + if (last != null) { + yield(last!!) } + } } val uuid = UUID(0, idx.toLong()) @@ -131,20 +163,19 @@ class Sc20TraceReader( PerformanceInterferenceModel( performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId) }.toSet() ) - val vmWorkload = VmWorkload( uuid, "VM Workload $vmId", UnnamedUser, VmImage( uuid, vmId, mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), - flopsHistory, + flopsFragments, maxCores, requiredMemory ) ) entries[uuid] = TraceEntryImpl( - flopsHistory.firstOrNull()?.tick ?: -1, + minTime, vmWorkload ) } diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt index be9ddfaa..fe1049d9 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/vm/VmTraceReader.kt @@ -134,7 +134,7 @@ class VmTraceReader( uuid, vmId.toString(), mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), - flopsHistory, + flopsHistory.asSequence(), cores, requiredMemory ) -- cgit v1.2.3 From 9a7bfac2475b1169c4aa9dee820dd30f412a39c1 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 15 Apr 2020 17:46:31 +0200 Subject: feat: Add support for seeding experiments --- .../kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt | 8 +++++--- .../com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt | 7 +++++-- 2 files changed, 10 insertions(+), 5 deletions(-) (limited to 'opendc') diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt index 3aef80e6..79e749b5 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt @@ -76,6 +76,8 @@ class ExperimentParameters(parser: ArgParser) { parseVMs(FileReader(File(this)).readText()) } .default { emptyList() } + val seed by parser.storing("the random seed") { toInt() } + .default(0) val failures by parser.flagging("-x", "--failures", help = "enable (correlated) machine failures") val allocationPolicy by parser.storing("name of VM allocation policy to 
use").default("core-mem") @@ -134,7 +136,7 @@ fun main(args: Array) { "active-servers-inv" to NumberOfActiveServersAllocationPolicy(true), "provisioned-cores" to ProvisionedCoresAllocationPolicy(), "provisioned-cores-inv" to ProvisionedCoresAllocationPolicy(true), - "random" to RandomAllocationPolicy() + "random" to RandomAllocationPolicy(Random(seed)) ) if (allocationPolicy !in allocationPolicies) { @@ -215,7 +217,7 @@ fun main(args: Array) { val domain = root.newDomain(name = "failures") domain.launch { chan.receive() - val random = Random(0) + val random = Random(seed) val injectors = mutableMapOf() for (node in bareMetalProvisioner.nodes()) { val cluster = node.metadata[NODE_CLUSTER] as String @@ -231,7 +233,7 @@ fun main(args: Array) { var submitted = 0L val finish = Channel(Channel.RENDEZVOUS) - val reader = Sc20TraceReader(File(traceDirectory), performanceInterferenceModel, getSelectedVmList()) + val reader = Sc20TraceReader(File(traceDirectory), performanceInterferenceModel, getSelectedVmList(), Random(seed)) while (reader.hasNext()) { val (time, workload) = reader.next() submitted++ diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt index d4eef029..da678c07 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt @@ -38,6 +38,7 @@ import java.io.FileReader import java.util.UUID import kotlin.math.max import kotlin.math.min +import kotlin.random.Random /** * A [TraceReader] for the internal VM workload trace format. @@ -48,7 +49,8 @@ import kotlin.math.min class Sc20TraceReader( traceDirectory: File, performanceInterferenceModel: PerformanceInterferenceModel, - selectedVms: List + selectedVms: List, + random: Random ) : TraceReader { /** * The internal iterator to use for this reader. 
@@ -161,7 +163,8 @@ class Sc20TraceReader( val relevantPerformanceInterferenceModelItems = PerformanceInterferenceModel( - performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId) }.toSet() + performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId) }.toSet(), + Random(random.nextInt()) ) val vmWorkload = VmWorkload( uuid, "VM Workload $vmId", UnnamedUser, -- cgit v1.2.3 From 0a8ec3216642b0991d53baff5498916fc859009a Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 15 Apr 2020 17:54:12 +0200 Subject: feat: Print settings to console --- .../kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt | 8 ++++++++ 1 file changed, 8 insertions(+) (limited to 'opendc') diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt index 79e749b5..552dcb63 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt @@ -119,6 +119,14 @@ fun createFaultInjector(domain: Domain, random: Random): FaultInjector { @OptIn(ExperimentalCoroutinesApi::class) fun main(args: Array) { ArgParser(args).parseInto(::ExperimentParameters).run { + println("trace-directory: $traceDirectory") + println("environment-file: $environmentFile") + println("performance-interference-file: $performanceInterferenceFile") + println("selected-vms-file: $selectedVmsFile") + println("seed: $seed") + println("failures: $failures") + println("allocation-policy: $allocationPolicy") + val start = System.currentTimeMillis() val monitor = Sc20Monitor(outputFile) -- cgit v1.2.3 From e097aeb16d77c260126b65c7f13330076d800d52 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 16 Apr 2020 01:35:33 +0200 Subject: perf: Convert traces to Parquet format --- .../atlarge/opendc/compute/core/image/VmImage.kt | 2 +- .../opendc/compute/virt/driver/SimpleVirtDriver.kt | 18 +- opendc/opendc-experiments-sc20/build.gradle.kts | 2 +- .../experiments/sc20/Sc20ParquetTraceReader.kt | 229 +++++++++++++++++++++ .../opendc/experiments/sc20/TestExperiment.kt | 7 +- .../opendc/experiments/sc20/TraceConverter.kt | 190 +++++++++++++++++ .../opendc/format/trace/sc20/Sc20TraceReader.kt | 11 +- 7 files changed, 441 insertions(+), 18 deletions(-) create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt create mode 100644 opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt (limited to 'opendc') diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt index b37f05a7..e3227540 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt @@ -23,7 +23,7 @@ class VmImage( val clock = simulationContext.clock val job = coroutineContext[Job]!! 
- for (fragments in flopsHistory.chunked(1024)) { + for (fragments in flopsHistory.chunked(128)) { for (fragment in fragments) { job.ensureActive() diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt index 2c25c0fa..8a32bc43 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt @@ -57,6 +57,9 @@ import kotlinx.coroutines.selects.select import java.util.Objects import java.util.TreeSet import java.util.UUID +import kotlin.coroutines.Continuation +import kotlin.coroutines.resume +import kotlin.coroutines.suspendCoroutine import kotlin.math.ceil import kotlin.math.max import kotlin.math.min @@ -325,7 +328,7 @@ class SimpleVirtDriver( requests.removeAll(vmRequests) // Return vCPU `run` call: the requested burst was completed or deadline was exceeded - vm.chan.send(Unit) + vm.chan?.resume(Unit) } } @@ -371,7 +374,7 @@ class SimpleVirtDriver( val vm: VmServerContext, val vcpu: ProcessingUnit, var burst: Long, - val limit: Double + var limit: Double ) { /** * The usage that was actually granted. @@ -395,7 +398,7 @@ class SimpleVirtDriver( private var finalized: Boolean = false lateinit var burst: LongArray var deadline: Long = 0L - val chan: Channel = Channel(Channel.CONFLATED) + var chan: Continuation? = null private var initialized: Boolean = false internal val job: Job = launch { @@ -452,6 +455,7 @@ class SimpleVirtDriver( require(burst.size == limit.size) { "Array dimensions do not match" } this.deadline = deadline this.burst = burst + val requests = cpus.asSequence() .take(burst.size) .mapIndexed { i, cpu -> @@ -466,13 +470,13 @@ class SimpleVirtDriver( // Wait until the burst has been run or the coroutine is cancelled try { - schedulingQueue.send(SchedulerCommand.Schedule(this, requests)) - chan.receive() + schedulingQueue.offer(SchedulerCommand.Schedule(this, requests)) + suspendCoroutine { chan = it } } catch (e: CancellationException) { // Deschedule the VM requests.forEach { it.isCancelled = true } - schedulingQueue.send(SchedulerCommand.Interrupt) - chan.receive() + schedulingQueue.offer(SchedulerCommand.Interrupt) + suspendCoroutine { chan = it } e.assertFailure() } } diff --git a/opendc/opendc-experiments-sc20/build.gradle.kts b/opendc/opendc-experiments-sc20/build.gradle.kts index 4b73cedd..a78ea745 100644 --- a/opendc/opendc-experiments-sc20/build.gradle.kts +++ b/opendc/opendc-experiments-sc20/build.gradle.kts @@ -32,7 +32,7 @@ plugins { application { mainClassName = "com.atlarge.opendc.experiments.sc20.TestExperimentKt" - applicationDefaultJvmArgs = listOf("-Xmx3096M") + applicationDefaultJvmArgs = listOf("-Xmx2200M", "-Xms1800M") } dependencies { diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt new file mode 100644 index 00000000..30456204 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt @@ -0,0 +1,229 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files 
(the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.experiments.sc20 + +import com.atlarge.opendc.compute.core.image.FlopsHistoryFragment +import com.atlarge.opendc.compute.core.image.VmImage +import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL +import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel +import com.atlarge.opendc.compute.core.workload.VmWorkload +import com.atlarge.opendc.core.User +import com.atlarge.opendc.format.trace.TraceEntry +import com.atlarge.opendc.format.trace.TraceReader +import org.apache.avro.generic.GenericData +import org.apache.hadoop.fs.Path +import org.apache.parquet.avro.AvroParquetReader +import org.apache.parquet.filter2.compat.FilterCompat +import org.apache.parquet.filter2.predicate.FilterApi +import org.apache.parquet.filter2.predicate.Statistics +import org.apache.parquet.filter2.predicate.UserDefinedPredicate +import org.apache.parquet.hadoop.ParquetReader +import org.apache.parquet.io.api.Binary +import java.io.File +import java.io.Serializable +import java.util.Deque +import java.util.SortedSet +import java.util.TreeSet +import java.util.UUID +import java.util.concurrent.LinkedBlockingDeque +import kotlin.random.Random + +/** + * A [TraceReader] for the internal VM workload trace format. + * + * @param traceFile The directory of the traces. + * @param performanceInterferenceModel The performance model covering the workload in the VM trace. + */ +@OptIn(ExperimentalStdlibApi::class) +class Sc20ParquetTraceReader( + traceFile: File, + performanceInterferenceModel: PerformanceInterferenceModel, + selectedVms: List, + random: Random +) : TraceReader { + /** + * The internal iterator to use for this reader. + */ + private val iterator: Iterator> + + /** + * Fill the buffers of the VMs + */ + private fun pull(reader: ParquetReader, buffers: Map>) { + if (!hasNext) { + return + } + + repeat(buffers.size) { + val record = reader.read() + + if (record == null) { + hasNext = false + reader.close() + return + } + + val id = record["id"].toString() + val tick = record["time"] as Long + val duration = record["duration"] as Long + val cores = record["cores"] as Int + val cpuUsage = record["cpuUsage"] as Double + val flops = record["flops"] as Long + + val fragment = FlopsHistoryFragment( + tick, + flops, + duration, + cpuUsage, + cores + ) + + buffers[id]?.add(fragment) + } + } + + /** + * A flag to indicate whether the reader has more entries. + */ + private var hasNext: Boolean = true + + /** + * Initialize the reader. 
+ */ + init { + val entries = mutableMapOf>() + val buffers = mutableMapOf>() + + val filter = + if (selectedVms.isEmpty()) + null + else + FilterCompat.get(FilterApi.userDefined(FilterApi.binaryColumn("id"), SelectedVmFilter(TreeSet(selectedVms)))) + + val metaReader = AvroParquetReader.builder(Path(traceFile.absolutePath, "meta.parquet")) + .disableCompatibility() + .run { if (filter != null) withFilter(filter) else this } + .build() + + val reader = AvroParquetReader.builder(Path(traceFile.absolutePath, "trace.parquet")) + .disableCompatibility() + .run { if (filter != null) withFilter(filter) else this } + .build() + + var idx = 0 + while (true) { + val record = metaReader.read() ?: break + val id = record["id"].toString() + val submissionTime = record["submissionTime"] as Long + val maxCores = record["maxCores"] as Int + val requiredMemory = record["requiredMemory"] as Long + + println(id) + + val buffer = LinkedBlockingDeque() + buffers[id] = buffer + val fragments = sequence { + while (true) { + if (buffer.isEmpty()) { + if (hasNext) { + pull(reader, buffers) + continue + } else { + break + } + } + yield(buffer.poll()) + } + } + val uuid = UUID(0, (idx++).toLong()) + val relevantPerformanceInterferenceModelItems = + PerformanceInterferenceModel( + performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSet(), + Random(random.nextInt()) + ) + val vmWorkload = VmWorkload( + uuid, "VM Workload $id", UnnamedUser, + VmImage( + uuid, + id, + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), + fragments, + maxCores, + requiredMemory + ) + ) + + entries[uuid] = TraceEntryImpl( + submissionTime, + vmWorkload + ) + } + + metaReader.close() + + // Create the entry iterator + iterator = entries.values.sortedBy { it.submissionTime }.iterator() + } + + override fun hasNext(): Boolean = iterator.hasNext() + + override fun next(): TraceEntry = iterator.next() + + override fun close() {} + + private class SelectedVmFilter(val selectedVms: SortedSet) : UserDefinedPredicate(), Serializable { + override fun keep(value: Binary?): Boolean = value != null && selectedVms.contains(value.toStringUsingUTF8()) + + override fun canDrop(statistics: Statistics): Boolean { + val min = statistics.min + val max = statistics.max + + return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isEmpty() + } + + override fun inverseCanDrop(statistics: Statistics): Boolean { + val min = statistics.min + val max = statistics.max + + return selectedVms.subSet(min.toStringUsingUTF8(), max.toStringUsingUTF8() + "\u0000").isNotEmpty() + } + } + + /** + * An unnamed user. + */ + private object UnnamedUser : User { + override val name: String = "" + override val uid: UUID = UUID.randomUUID() + } + + /** + * An entry in the trace. 
+ */ + private data class TraceEntryImpl( + override var submissionTime: Long, + override val workload: VmWorkload + ) : TraceEntry +} diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt index 552dcb63..19055ad3 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt @@ -44,7 +44,6 @@ import com.atlarge.opendc.core.failure.FailureDomain import com.atlarge.opendc.core.failure.FaultInjector import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader -import com.atlarge.opendc.format.trace.sc20.Sc20TraceReader import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue import com.xenomachina.argparser.ArgParser @@ -241,7 +240,7 @@ fun main(args: Array) { var submitted = 0L val finish = Channel(Channel.RENDEZVOUS) - val reader = Sc20TraceReader(File(traceDirectory), performanceInterferenceModel, getSelectedVmList(), Random(seed)) + val reader = Sc20ParquetTraceReader(File(traceDirectory), performanceInterferenceModel, getSelectedVmList(), Random(seed)) while (reader.hasNext()) { val (time, workload) = reader.next() submitted++ @@ -267,8 +266,10 @@ fun main(args: Array) { } finish.receive() - scheduler.terminate() failureDomain?.cancel() + launch { + scheduler.terminate() + } println(simulationContext.clock.instant()) println("${System.currentTimeMillis() - start} milliseconds") } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt new file mode 100644 index 00000000..7f429b89 --- /dev/null +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt @@ -0,0 +1,190 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package com.atlarge.opendc.experiments.sc20 + +import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericData +import org.apache.hadoop.fs.Path +import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.hadoop.metadata.CompressionCodecName +import java.io.BufferedReader +import java.io.File +import java.io.FileReader +import kotlin.math.max +import kotlin.math.min + +/** + * + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +fun main() { + val metaSchema = SchemaBuilder + .record("meta") + .namespace("com.atlarge.opendc.format.sc20") + .fields() + .name("id").type().stringType().noDefault() + .name("submissionTime").type().longType().noDefault() + .name("maxCores").type().intType().noDefault() + .name("requiredMemory").type().longType().noDefault() + .endRecord() + val schema = SchemaBuilder + .record("trace") + .namespace("com.atlarge.opendc.format.sc20") + .fields() + .name("id").type().stringType().noDefault() + .name("time").type().longType().noDefault() + .name("duration").type().longType().noDefault() + .name("cores").type().intType().noDefault() + .name("cpuUsage").type().doubleType().noDefault() + .name("flops").type().longType().noDefault() + .endRecord() + + val timestampCol = 0 + val cpuUsageCol = 1 + val coreCol = 12 + val vmIdCol = 19 + val provisionedMemoryCol = 20 + val traceInterval = 5 * 60 * 1000L + + val dest = File("../traces/solvinity/small-parquet") + val traceDirectory = File("../traces/solvinity/small") + val vms = + traceDirectory.walk() + .filterNot { it.isDirectory } + .filter { it.extension == "csv" || it.extension == "txt" } + .toList() + + val metaWriter = AvroParquetWriter.builder(Path(dest.absolutePath, "meta.parquet")) + .withSchema(metaSchema) + .withCompressionCodec(CompressionCodecName.SNAPPY) + .withPageSize(4 * 1024 * 1024) // For compression + .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) + .build() + + val allFragments = mutableListOf() + + vms + .forEachIndexed { idx, vmFile -> + println(vmFile) + + var vmId = "" + var maxCores = -1 + var requiredMemory = -1L + var timestamp = -1L + var cores = -1 + var minTime = Long.MAX_VALUE + + val flopsFragments = sequence { + var last: Fragment? = null + + BufferedReader(FileReader(vmFile)).use { reader -> + reader.lineSequence() + .chunked(128) + .forEach { lines -> + for (line in lines) { + // Ignore comments in the trace + if (line.startsWith("#") || line.isBlank()) { + continue + } + + val values = line.split(" ") + + vmId = vmFile.name + timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L + cores = values[coreCol].trim().toInt() + requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) + maxCores = max(maxCores, cores) + minTime = min(minTime, timestamp) + val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz + requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) + maxCores = max(maxCores, cores) + + val flops: Long = (cpuUsage * 5 * 60).toLong() + + last = if (last != null && last!!.flops == 0L && flops == 0L) { + val oldFragment = last!! + Fragment( + vmId, + oldFragment.tick, + oldFragment.flops + flops, + oldFragment.duration + traceInterval, + cpuUsage, + cores + ) + } else { + val fragment = + Fragment(vmId, timestamp, flops, traceInterval, cpuUsage, cores) + if (last != null) { + yield(last!!) + } + fragment + } + } + } + } + + if (last != null) { + yield(last!!) 
+ } + } + + flopsFragments.forEach { fragment -> + allFragments.add(fragment) + } + + val metaRecord = GenericData.Record(metaSchema) + metaRecord.put("id", vmId) + metaRecord.put("submissionTime", minTime) + metaRecord.put("maxCores", maxCores) + metaRecord.put("requiredMemory", requiredMemory) + metaWriter.write(metaRecord) + } + + val writer = AvroParquetWriter.builder(Path(dest.absolutePath, "trace.parquet")) + .withSchema(schema) + .withCompressionCodec(CompressionCodecName.SNAPPY) + .withPageSize(4 * 1024 * 1024) // For compression + .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) + .build() + + allFragments.sortWith(compareBy { it.tick }.thenBy { it.id }) + + for (fragment in allFragments) { + val record = GenericData.Record(schema) + record.put("id", fragment.id) + record.put("time", fragment.tick) + record.put("duration", fragment.duration) + record.put("cores", fragment.cores) + record.put("cpuUsage", fragment.usage) + record.put("flops", fragment.flops) + + writer.write(record) + } + + writer.close() + metaWriter.close() +} + +data class Fragment(val id: String, val tick: Long, val flops: Long, val duration: Long, val usage: Double, val cores: Int) diff --git a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt index da678c07..2e2159ba 100644 --- a/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt +++ b/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/trace/sc20/Sc20TraceReader.kt @@ -115,9 +115,9 @@ class Sc20TraceReader( BufferedReader(FileReader(vmFile)).use { reader -> reader.lineSequence() - .chunked(1024) + .chunked(128) .forEach { lines -> - val res = ArrayList(lines.size) + // val res = ArrayList(lines.size) for (line in lines) { // Ignore comments in the trace if (line.startsWith("#") || line.isBlank()) { @@ -144,13 +144,12 @@ class Sc20TraceReader( val fragment = FlopsHistoryFragment(timestamp, flops, traceInterval, cpuUsage, cores) if (last != null) { - res.add(last!!) + yield(last!!) 
} fragment } } - - yieldAll(res) + // yieldAll(res) } if (last != null) { @@ -172,7 +171,7 @@ class Sc20TraceReader( uuid, vmId, mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), - flopsFragments, + flopsFragments.asSequence(), maxCores, requiredMemory ) -- cgit v1.2.3 From 6cd93b57945b289b2e14556f7ceaa193326eff78 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 16 Apr 2020 15:30:34 +0200 Subject: bug: Fix issues related to early termination --- .../atlarge/opendc/compute/core/image/VmImage.kt | 20 ++-- .../opendc/compute/virt/driver/SimpleVirtDriver.kt | 4 +- .../virt/service/SimpleVirtProvisioningService.kt | 30 ++++- opendc/opendc-experiments-sc20/build.gradle.kts | 2 +- .../atlarge/opendc/experiments/sc20/Sc20Monitor.kt | 23 +++- .../experiments/sc20/Sc20ParquetTraceReader.kt | 121 ++++++++++++++------- .../opendc/experiments/sc20/TestExperiment.kt | 27 +++-- .../opendc/experiments/sc20/TraceConverter.kt | 7 +- 8 files changed, 160 insertions(+), 74 deletions(-) (limited to 'opendc') diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt index e3227540..36bbfa45 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/VmImage.kt @@ -23,19 +23,17 @@ class VmImage( val clock = simulationContext.clock val job = coroutineContext[Job]!! - for (fragments in flopsHistory.chunked(128)) { - for (fragment in fragments) { - job.ensureActive() + for (fragment in flopsHistory) { + job.ensureActive() - if (fragment.flops == 0L) { - delay(fragment.duration) - } else { - val cores = min(fragment.cores, ctx.server.flavor.cpuCount) - val burst = LongArray(cores) { fragment.flops / cores } - val usage = DoubleArray(cores) { fragment.usage / cores } + if (fragment.flops == 0L) { + delay(fragment.duration) + } else { + val cores = min(fragment.cores, ctx.server.flavor.cpuCount) + val burst = LongArray(cores) { fragment.flops / cores } + val usage = DoubleArray(cores) { fragment.usage / cores } - ctx.run(burst, usage, clock.millis() + fragment.duration) - } + ctx.run(burst, usage, clock.millis() + fragment.duration) } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt index 8a32bc43..53fa463b 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt @@ -243,7 +243,7 @@ class SimpleVirtDriver( } // XXX We set the minimum duration to 5 minutes here to prevent the rounding issues that are occurring with the FLOPs. 
- duration = max(300.0, ceil(duration)) + duration = 300.0 val totalAllocatedUsage = maxUsage - availableUsage var totalAllocatedBurst = 0L @@ -335,7 +335,7 @@ class SimpleVirtDriver( eventFlow.emit( HypervisorEvent.SliceFinished( this@SimpleVirtDriver, - totalRequestedSubBurst, + totalRequestedBurst, min(totalRequestedSubBurst, totalGrantedBurst), // We can run more than requested due to timing totalOvercommissionedBurst, totalInterferedBurst, // Might be smaller than zero due to FP rounding errors, diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt index 09036d0d..520f6dc5 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt @@ -26,6 +26,7 @@ import kotlinx.coroutines.suspendCancellableCoroutine import kotlinx.coroutines.withContext import kotlin.coroutines.Continuation import kotlin.coroutines.resume +import kotlin.math.max @OptIn(ExperimentalCoroutinesApi::class) class SimpleVirtProvisioningService( @@ -57,6 +58,10 @@ class SimpleVirtProvisioningService( public var queuedVms = 0L public var runningVms = 0L public var finishedVms = 0L + public var unscheduledVms = 0L + + private var maxCores = 0 + private var maxMemory = 0L /** * The allocation logic to use. @@ -124,15 +129,24 @@ class SimpleVirtProvisioningService( } private suspend fun schedule() { - val log = simulationContext.log + val clock = simulationContext.clock val imagesToBeScheduled = incomingImages.toSet() for (imageInstance in imagesToBeScheduled) { val requiredMemory = (imageInstance.image as VmImage).requiredMemory - val selectedHv = allocationLogic.select(availableHypervisors, imageInstance) ?: break + val selectedHv = allocationLogic.select(availableHypervisors, imageInstance) + + if (selectedHv == null) { + if (requiredMemory > maxMemory || imageInstance.flavor.cpuCount > maxCores) { + unscheduledVms++ + println("[${clock.millis()}] CANNOT SPAWN ${imageInstance.image}") + } + + break + } try { - log.info("Spawning ${imageInstance.image} on ${selectedHv.server}") + println("[${clock.millis()}] SPAWN ${imageInstance.image} on ${selectedHv.server.uid} ${selectedHv.server.name} ${selectedHv.server.flavor}") incomingImages -= imageInstance // Speculatively update the hypervisor view information to prevent other images in the queue from @@ -157,6 +171,7 @@ class SimpleVirtProvisioningService( when (event) { is ServerEvent.StateChanged -> { if (event.server.state == ServerState.SHUTOFF) { + println("[${clock.millis()}] FINISH ${event.server.uid} ${event.server.name} ${event.server.flavor}") runningVms-- finishedVms++ @@ -178,6 +193,8 @@ class SimpleVirtProvisioningService( selectedHv.numberOfActiveServers-- selectedHv.provisionedCores -= imageInstance.flavor.cpuCount selectedHv.availableMemory += requiredMemory + } catch (e: Throwable) { + e.printStackTrace() } } } @@ -196,13 +213,18 @@ class SimpleVirtProvisioningService( server.flavor.memorySize, 0 ) + maxCores = max(maxCores, server.flavor.cpuCount) + maxMemory = max(maxMemory, server.flavor.memorySize) hypervisors[server] = hv } } ServerState.SHUTOFF, ServerState.ERROR -> { val hv = hypervisors[server] ?: return availableHypervisors -= hv - requestCycle() + + if (incomingImages.isNotEmpty()) { + 
requestCycle() + } } else -> throw IllegalStateException() } diff --git a/opendc/opendc-experiments-sc20/build.gradle.kts b/opendc/opendc-experiments-sc20/build.gradle.kts index a78ea745..8611ffa7 100644 --- a/opendc/opendc-experiments-sc20/build.gradle.kts +++ b/opendc/opendc-experiments-sc20/build.gradle.kts @@ -32,7 +32,7 @@ plugins { application { mainClassName = "com.atlarge.opendc.experiments.sc20.TestExperimentKt" - applicationDefaultJvmArgs = listOf("-Xmx2200M", "-Xms1800M") + applicationDefaultJvmArgs = listOf("-Xmx2500M", "-Xms1800M") } dependencies { diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt index 7c198e56..bac0de21 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt @@ -12,6 +12,8 @@ import org.apache.hadoop.fs.Path import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.metadata.CompressionCodecName import java.io.Closeable +import java.util.concurrent.ArrayBlockingQueue +import kotlin.concurrent.thread class Sc20Monitor( destination: String @@ -45,6 +47,19 @@ class Sc20Monitor( .withPageSize(4 * 1024 * 1024) // For compression .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) .build() + private val queue = ArrayBlockingQueue(2048) + private val writerThread = thread(start = true, name = "sc20-writer") { + try { + while (true) { + val record = queue.take() + writer.write(record) + } + } catch (e: InterruptedException) { + // Do not rethrow this + } finally { + writer.close() + } + } suspend fun onVmStateChanged(server: Server) {} @@ -122,10 +137,14 @@ class Sc20Monitor( record.put("totalRunningVms", runningVms) record.put("totalFinishedVms", finishedVms) - writer.write(record) + queue.put(record) } override fun close() { - writer.close() + // Busy loop to wait for writer thread to finish + while (queue.isNotEmpty()) { + Thread.sleep(500) + } + writerThread.interrupt() } } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt index 30456204..24ff9eed 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt @@ -39,7 +39,6 @@ import org.apache.parquet.filter2.compat.FilterCompat import org.apache.parquet.filter2.predicate.FilterApi import org.apache.parquet.filter2.predicate.Statistics import org.apache.parquet.filter2.predicate.UserDefinedPredicate -import org.apache.parquet.hadoop.ParquetReader import org.apache.parquet.io.api.Binary import java.io.File import java.io.Serializable @@ -47,7 +46,9 @@ import java.util.Deque import java.util.SortedSet import java.util.TreeSet import java.util.UUID +import java.util.concurrent.ArrayBlockingQueue import java.util.concurrent.LinkedBlockingDeque +import kotlin.concurrent.thread import kotlin.random.Random /** @@ -69,37 +70,82 @@ class Sc20ParquetTraceReader( private val iterator: Iterator> /** - * Fill the buffers of the VMs + * The intermediate buffer to store the read records in. 
*/ - private fun pull(reader: ParquetReader, buffers: Map>) { + private val queue = ArrayBlockingQueue>(128) + + /** + * An optional filter for filtering the selected VMs + */ + private val filter = + if (selectedVms.isEmpty()) + null + else + FilterCompat.get(FilterApi.userDefined(FilterApi.binaryColumn("id"), SelectedVmFilter(TreeSet(selectedVms)))) + + /** + * A poisonous fragment. + */ + private val poison = Pair("\u0000", FlopsHistoryFragment(0, 0, 0, 0.0, 0)) + + /** + * The thread to read the records in. + */ + private val readerThread = thread(start = true, name = "sc20-reader") { + val reader = AvroParquetReader.builder(Path(traceFile.absolutePath, "trace.parquet")) + .disableCompatibility() + .run { if (filter != null) withFilter(filter) else this } + .build() + + try { + while (true) { + val record = reader.read() + + if (record == null) { + queue.put(poison) + break + } + + val id = record["id"].toString() + val tick = record["time"] as Long + val duration = record["duration"] as Long + val cores = record["cores"] as Int + val cpuUsage = record["cpuUsage"] as Double + val flops = record["flops"] as Long + + val fragment = FlopsHistoryFragment( + tick, + flops, + duration, + cpuUsage, + cores + ) + + queue.put(id to fragment) + } + } catch (e: InterruptedException) { + // Do not rethrow this + } finally { + reader.close() + } + } + + /** + * Fill the buffers with the VMs + */ + private fun pull(buffers: Map>) { if (!hasNext) { return } - repeat(buffers.size) { - val record = reader.read() + repeat(16) { + val (id, fragment) = queue.take() - if (record == null) { + if (id == poison.first) { hasNext = false - reader.close() return } - val id = record["id"].toString() - val tick = record["time"] as Long - val duration = record["duration"] as Long - val cores = record["cores"] as Int - val cpuUsage = record["cpuUsage"] as Double - val flops = record["flops"] as Long - - val fragment = FlopsHistoryFragment( - tick, - flops, - duration, - cpuUsage, - cores - ) - buffers[id]?.add(fragment) } } @@ -116,30 +162,20 @@ class Sc20ParquetTraceReader( val entries = mutableMapOf>() val buffers = mutableMapOf>() - val filter = - if (selectedVms.isEmpty()) - null - else - FilterCompat.get(FilterApi.userDefined(FilterApi.binaryColumn("id"), SelectedVmFilter(TreeSet(selectedVms)))) - val metaReader = AvroParquetReader.builder(Path(traceFile.absolutePath, "meta.parquet")) .disableCompatibility() .run { if (filter != null) withFilter(filter) else this } .build() - val reader = AvroParquetReader.builder(Path(traceFile.absolutePath, "trace.parquet")) - .disableCompatibility() - .run { if (filter != null) withFilter(filter) else this } - .build() - var idx = 0 while (true) { val record = metaReader.read() ?: break val id = record["id"].toString() val submissionTime = record["submissionTime"] as Long + val endTime = record["endTime"] as Long val maxCores = record["maxCores"] as Int val requiredMemory = record["requiredMemory"] as Long - + val uuid = UUID(0, (idx++).toLong()) println(id) val buffer = LinkedBlockingDeque() @@ -148,16 +184,23 @@ class Sc20ParquetTraceReader( while (true) { if (buffer.isEmpty()) { if (hasNext) { - pull(reader, buffers) + pull(buffers) continue } else { break } } - yield(buffer.poll()) + + val fragment = buffer.poll() + yield(fragment) + + if (fragment.tick >= endTime) { + break + } } + + buffers.remove(id) } - val uuid = UUID(0, (idx++).toLong()) val relevantPerformanceInterferenceModelItems = PerformanceInterferenceModel( performanceInterferenceModel.items.filter { 
it.workloadNames.contains(id) }.toSet(), @@ -191,7 +234,9 @@ class Sc20ParquetTraceReader( override fun next(): TraceEntry = iterator.next() - override fun close() {} + override fun close() { + readerThread.interrupt() + } private class SelectedVmFilter(val selectedVms: SortedSet) : UserDefinedPredicate(), Serializable { override fun keep(value: Binary?): Boolean = value != null && selectedVms.contains(value.toStringUsingUTF8()) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt index 19055ad3..8478b592 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt @@ -237,13 +237,10 @@ fun main(args: Array) { null } - var submitted = 0L - val finish = Channel(Channel.RENDEZVOUS) - + val finished = Channel(Channel.RENDEZVOUS) val reader = Sc20ParquetTraceReader(File(traceDirectory), performanceInterferenceModel, getSelectedVmList(), Random(seed)) while (reader.hasNext()) { val (time, workload) = reader.next() - submitted++ delay(max(0, time - simulationContext.clock.millis())) launch { chan.send(Unit) @@ -254,24 +251,26 @@ fun main(args: Array) { // Monitor server events server.events .onEach { - if (it is ServerEvent.StateChanged) + if (it is ServerEvent.StateChanged) { monitor.onVmStateChanged(it.server) - - if (scheduler.submittedVms == submitted && scheduler.runningVms <= 1 && !reader.hasNext()) { - finish.send(Unit) } + + finished.send(Unit) } .collect() } } - finish.receive() - failureDomain?.cancel() - launch { - scheduler.terminate() + while (scheduler.finishedVms + scheduler.unscheduledVms != scheduler.submittedVms || reader.hasNext()) { + finished.receive() } - println(simulationContext.clock.instant()) - println("${System.currentTimeMillis() - start} milliseconds") + + println("Finish SUBMIT=${scheduler.submittedVms} FAIL=${scheduler.unscheduledVms} QUEUE=${scheduler.queuedVms} RUNNING=${scheduler.runningVms} FINISH=${scheduler.finishedVms}") + + failureDomain?.cancel() + scheduler.terminate() + reader.close() + println("[${simulationContext.clock.millis()}] DONE ${System.currentTimeMillis() - start} milliseconds") } runBlocking { diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt index 7f429b89..d005a157 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TraceConverter.kt @@ -46,6 +46,7 @@ fun main() { .fields() .name("id").type().stringType().noDefault() .name("submissionTime").type().longType().noDefault() + .name("endTime").type().longType().noDefault() .name("maxCores").type().intType().noDefault() .name("requiredMemory").type().longType().noDefault() .endRecord() @@ -92,7 +93,6 @@ fun main() { var vmId = "" var maxCores = -1 var requiredMemory = -1L - var timestamp = -1L var cores = -1 var minTime = Long.MAX_VALUE @@ -112,7 +112,7 @@ fun main() { val values = line.split(" ") vmId = vmFile.name - timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L + val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L cores = 
values[coreCol].trim().toInt() requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) maxCores = max(maxCores, cores) @@ -150,13 +150,16 @@ fun main() { } } + var maxTime = Long.MIN_VALUE flopsFragments.forEach { fragment -> allFragments.add(fragment) + maxTime = max(maxTime, fragment.tick) } val metaRecord = GenericData.Record(metaSchema) metaRecord.put("id", vmId) metaRecord.put("submissionTime", minTime) + metaRecord.put("endTime", maxTime) metaRecord.put("maxCores", maxCores) metaRecord.put("requiredMemory", requiredMemory) metaWriter.write(metaRecord) -- cgit v1.2.3 From 3e056406616860c77168d827f1ca9d8d3c79c08e Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 20 Apr 2020 12:13:09 +0200 Subject: perf: Minor tweaks in trace fetching --- .../experiments/sc20/Sc20ParquetTraceReader.kt | 138 ++++++++++++--------- .../opendc/experiments/sc20/TestExperiment.kt | 4 +- 2 files changed, 80 insertions(+), 62 deletions(-) (limited to 'opendc') diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt index 24ff9eed..0a7718e9 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20ParquetTraceReader.kt @@ -42,12 +42,10 @@ import org.apache.parquet.filter2.predicate.UserDefinedPredicate import org.apache.parquet.io.api.Binary import java.io.File import java.io.Serializable -import java.util.Deque import java.util.SortedSet import java.util.TreeSet import java.util.UUID import java.util.concurrent.ArrayBlockingQueue -import java.util.concurrent.LinkedBlockingDeque import kotlin.concurrent.thread import kotlin.random.Random @@ -72,7 +70,7 @@ class Sc20ParquetTraceReader( /** * The intermediate buffer to store the read records in. */ - private val queue = ArrayBlockingQueue>(128) + private val queue = ArrayBlockingQueue>(1024) /** * An optional filter for filtering the selected VMs @@ -133,20 +131,20 @@ class Sc20ParquetTraceReader( /** * Fill the buffers with the VMs */ - private fun pull(buffers: Map>) { + private fun pull(buffers: Map>>) { if (!hasNext) { return } - repeat(16) { - val (id, fragment) = queue.take() + val fragments = mutableListOf>() + queue.drainTo(fragments) + for ((id, fragment) in fragments) { if (id == poison.first) { hasNext = false return } - - buffers[id]?.add(fragment) + buffers[id]?.forEach { it.add(fragment) } } } @@ -159,75 +157,93 @@ class Sc20ParquetTraceReader( * Initialize the reader. 
*/ init { - val entries = mutableMapOf>() - val buffers = mutableMapOf>() + val takenIds = mutableSetOf() + val entries = mutableMapOf() + val buffers = mutableMapOf>>() val metaReader = AvroParquetReader.builder(Path(traceFile.absolutePath, "meta.parquet")) .disableCompatibility() .run { if (filter != null) withFilter(filter) else this } .build() - var idx = 0 while (true) { val record = metaReader.read() ?: break val id = record["id"].toString() - val submissionTime = record["submissionTime"] as Long - val endTime = record["endTime"] as Long - val maxCores = record["maxCores"] as Int - val requiredMemory = record["requiredMemory"] as Long - val uuid = UUID(0, (idx++).toLong()) - println(id) - - val buffer = LinkedBlockingDeque() - buffers[id] = buffer - val fragments = sequence { - while (true) { - if (buffer.isEmpty()) { - if (hasNext) { - pull(buffers) - continue - } else { - break + entries[id] = record + } + + metaReader.close() + + val selection = if (selectedVms.isEmpty()) entries.keys else selectedVms + + // Create the entry iterator + iterator = selection.asSequence() + .mapNotNull { entries[it] } + .mapIndexed { index, record -> + val id = record["id"].toString() + val submissionTime = record["submissionTime"] as Long + val endTime = record["endTime"] as Long + val maxCores = record["maxCores"] as Int + val requiredMemory = record["requiredMemory"] as Long + val uid = UUID.nameUUIDFromBytes("$id-$index".toByteArray()) + + assert(uid !in takenIds) + takenIds += uid + + println(id) + + val internalBuffer = mutableListOf() + val externalBuffer = mutableListOf() + buffers.getOrPut(id) { mutableListOf() }.add(externalBuffer) + val fragments = sequence { + repeat@while (true) { + if (externalBuffer.isEmpty()) { + if (hasNext) { + pull(buffers) + continue + } else { + break + } } - } - val fragment = buffer.poll() - yield(fragment) + internalBuffer.addAll(externalBuffer) + externalBuffer.clear() - if (fragment.tick >= endTime) { - break - } - } + for (fragment in internalBuffer) { + yield(fragment) - buffers.remove(id) - } - val relevantPerformanceInterferenceModelItems = - PerformanceInterferenceModel( - performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSet(), - Random(random.nextInt()) - ) - val vmWorkload = VmWorkload( - uuid, "VM Workload $id", UnnamedUser, - VmImage( - uuid, - id, - mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), - fragments, - maxCores, - requiredMemory - ) - ) + if (fragment.tick >= endTime) { + break@repeat + } + } - entries[uuid] = TraceEntryImpl( - submissionTime, - vmWorkload - ) - } + internalBuffer.clear() + } - metaReader.close() + buffers.remove(id) + } + val relevantPerformanceInterferenceModelItems = + PerformanceInterferenceModel( + performanceInterferenceModel.items.filter { it.workloadNames.contains(id) }.toSet(), + Random(random.nextInt()) + ) + val vmWorkload = VmWorkload( + uid, "VM Workload $id", UnnamedUser, + VmImage( + uid, + id, + mapOf(IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems), + fragments, + maxCores, + requiredMemory + ) + ) - // Create the entry iterator - iterator = entries.values.sortedBy { it.submissionTime }.iterator() + TraceEntryImpl(submissionTime, vmWorkload) + } + .sortedBy { it.submissionTime } + .toList() + .iterator() } override fun hasNext(): Boolean = iterator.hasNext() diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt 
b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
index 8478b592..028cfb9a 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
@@ -237,10 +237,12 @@ fun main(args: Array) {
             null
         }
 
+        var submitted = 0L
         val finished = Channel(Channel.RENDEZVOUS)
         val reader = Sc20ParquetTraceReader(File(traceDirectory), performanceInterferenceModel, getSelectedVmList(), Random(seed))
         while (reader.hasNext()) {
             val (time, workload) = reader.next()
+            submitted++
             delay(max(0, time - simulationContext.clock.millis()))
             launch {
                 chan.send(Unit)
@@ -261,7 +263,7 @@ fun main(args: Array) {
             }
         }
 
-        while (scheduler.finishedVms + scheduler.unscheduledVms != scheduler.submittedVms || reader.hasNext()) {
+        while (scheduler.finishedVms + scheduler.unscheduledVms != submitted || reader.hasNext()) {
             finished.receive()
         }
 
-- cgit v1.2.3
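
The Parquet-reader commits above move trace loading onto a dedicated thread that hands records to the simulation through a bounded queue terminated by a poison-pill sentinel, and the last commit batches that hand-off with drainTo. The following standalone Kotlin sketch illustrates only that queue pattern; TraceRecord, the queue capacity, and the record counts are illustrative stand-ins and are not taken from the patches.

import java.util.concurrent.ArrayBlockingQueue
import kotlin.concurrent.thread

// Hypothetical record type standing in for the (id, fragment) pairs used by the trace reader.
data class TraceRecord(val id: String, val payload: Long)

fun main() {
    // Bounded queue: the producer blocks once the consumer falls 1024 records behind.
    val queue = ArrayBlockingQueue<TraceRecord>(1024)
    // Poison pill marking end-of-stream, analogous to the "\u0000" sentinel in the patches.
    val poison = TraceRecord("\u0000", -1)

    val producer = thread(name = "reader") {
        for (i in 0 until 10_000) {
            queue.put(TraceRecord("vm-${i % 8}", i.toLong())) // blocks while the queue is full
        }
        queue.put(poison)
    }

    val batch = mutableListOf<TraceRecord>()
    var consumed = 0
    var done = false
    while (!done) {
        batch.add(queue.take())   // wait for at least one record
        queue.drainTo(batch)      // then pull everything else already buffered in one call
        for (record in batch) {
            if (record.id == poison.id) {
                done = true
                break
            }
            consumed++            // a real consumer would append to per-VM fragment buffers here
        }
        batch.clear()
    }

    producer.join()
    println("consumed $consumed records")
}

Taking one element with a blocking take() and then draining the rest amortises queue synchronisation over a whole batch rather than paying it once per record, which is the usual motivation for this producer/consumer shape.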