| author | Georgios Andreadis <info@gandreadis.com> | 2020-05-14 18:05:18 +0200 |
|---|---|---|
| committer | Georgios Andreadis <info@gandreadis.com> | 2020-05-20 16:02:42 +0200 |
| commit | 5e2e040cecddd3c9c048b0992d4635ed0515f490 (patch) | |
| tree | 026caf34cf42963b2426e097800fb145aaedf101 /opendc/opendc-experiments-sc20/src | |
| parent | 70ad01d793f88b1bef7d7988d24bff384ddbb3b9 (diff) | |
Add Azure trace reader
Diffstat (limited to 'opendc/opendc-experiments-sc20/src')
| -rw-r--r-- | opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt | 242 |
1 file changed, 210 insertions, 32 deletions
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt
index 04cdd302..e28a497d 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt
@@ -24,14 +24,17 @@ package com.atlarge.opendc.experiments.sc20.trace
 
+import org.apache.avro.Schema
 import org.apache.avro.SchemaBuilder
 import org.apache.avro.generic.GenericData
 import org.apache.hadoop.fs.Path
 import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.hadoop.ParquetWriter
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import java.io.BufferedReader
 import java.io.File
 import java.io.FileReader
+import java.util.Random
 import kotlin.math.max
 import kotlin.math.min
 
@@ -66,20 +69,8 @@ fun main(args: Array<String>) {
         .name("flops").type().longType().noDefault()
         .endRecord()
 
-    val timestampCol = 0
-    val cpuUsageCol = 1
-    val coreCol = 12
-    val vmIdCol = 19
-    val provisionedMemoryCol = 20
-    val traceInterval = 5 * 60 * 1000L
-
     val dest = File(args[0])
     val traceDirectory = File(args[1])
-    val vms =
-        traceDirectory.walk()
-            .filterNot { it.isDirectory }
-            .filter { it.extension == "csv" || it.extension == "txt" }
-            .toList()
 
     val metaWriter = AvroParquetWriter.builder<GenericData.Record>(Path(dest.absolutePath, "meta.parquet"))
         .withSchema(metaSchema)
@@ -88,9 +79,68 @@ fun main(args: Array<String>) {
         .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
         .build()
 
+    val writer = AvroParquetWriter.builder<GenericData.Record>(Path(dest.absolutePath, "trace.parquet"))
+        .withSchema(schema)
+        .withCompressionCodec(CompressionCodecName.SNAPPY)
+        .withPageSize(4 * 1024 * 1024) // For compression
+        .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
+        .build()
+
+    val traceType = args[2]
+    val allFragments = if (traceType == "solvinity") {
+        readSolvinityTrace(traceDirectory, metaSchema, metaWriter)
+    } else {
+        readAzureTrace(traceDirectory, metaSchema, metaWriter)
+    }
+    allFragments.sortWith(compareBy<Fragment> { it.tick }.thenBy { it.id })
+
+    for (fragment in allFragments) {
+        val record = GenericData.Record(schema)
+        record.put("id", fragment.id)
+        record.put("time", fragment.tick)
+        record.put("duration", fragment.duration)
+        record.put("cores", fragment.cores)
+        record.put("cpuUsage", fragment.usage)
+        record.put("flops", fragment.flops)
+
+        writer.write(record)
+    }
+
+    writer.close()
+    metaWriter.close()
+}
+
+data class Fragment(
+    val id: String,
+    val tick: Long,
+    val flops: Long,
+    val duration: Long,
+    val usage: Double,
+    val cores: Int
+)
+
+/**
+ * Reads the confidential Solvinity trace.
+ */
+fun readSolvinityTrace(
+    traceDirectory: File,
+    metaSchema: Schema,
+    metaWriter: ParquetWriter<GenericData.Record>
+): MutableList<Fragment> {
+    val timestampCol = 0
+    val cpuUsageCol = 1
+    val coreCol = 12
+    val vmIdCol = 19
+    val provisionedMemoryCol = 20
+    val traceInterval = 5 * 60 * 1000L
+
     val allFragments = mutableListOf<Fragment>()
 
-    vms
+
+    traceDirectory.walk()
+        .filterNot { it.isDirectory }
+        .filter { it.extension == "csv" || it.extension == "txt" }
+        .toList()
         .forEachIndexed { idx, vmFile ->
             println(vmFile)
 
@@ -176,29 +226,157 @@ fun main(args: Array<String>) {
             metaWriter.write(metaRecord)
         }
 
-    val writer = AvroParquetWriter.builder<GenericData.Record>(Path(dest.absolutePath, "trace.parquet"))
-        .withSchema(schema)
-        .withCompressionCodec(CompressionCodecName.SNAPPY)
-        .withPageSize(4 * 1024 * 1024) // For compression
-        .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
-        .build()
+    return allFragments
+}
 
-    allFragments.sortWith(compareBy<Fragment> { it.tick }.thenBy { it.id })
+/**
+ * Reads the Azure cloud trace.
+ *
+ * See https://github.com/Azure/AzurePublicDataset/ for a definition of the trace.
+ */
+fun readAzureTrace(
+    traceDirectory: File,
+    metaSchema: Schema,
+    metaWriter: ParquetWriter<GenericData.Record>
+): MutableList<Fragment> {
+    val random = Random(0)
+    val fraction = 0.01
 
-    for (fragment in allFragments) {
-        val record = GenericData.Record(schema)
-        record.put("id", fragment.id)
-        record.put("time", fragment.tick)
-        record.put("duration", fragment.duration)
-        record.put("cores", fragment.cores)
-        record.put("cpuUsage", fragment.usage)
-        record.put("flops", fragment.flops)
+    // Read VM table
+    val vmIdTableCol = 0
+    val coreTableCol = 9
+    val provisionedMemoryTableCol = 10
 
-        writer.write(record)
+    var vmId: String
+    var cores: Int
+    var requiredMemory: Long
+
+    val vmIds = mutableSetOf<String>()
+    val vmIdToMetadata = mutableMapOf<String, VmInfo>()
+
+    BufferedReader(FileReader(File(traceDirectory, "vmtable.csv"))).use { reader ->
+        reader.lineSequence()
+            .chunked(1024)
+            .forEach { lines ->
+                for (line in lines) {
+                    // Ignore comments in the trace
+                    if (line.startsWith("#") || line.isBlank()) {
+                        continue
+                    }
+                    // Sample only a fraction of the VMs
+                    if (random.nextDouble() > fraction) {
+                        continue
+                    }
+
+                    val values = line.split(",")
+
+                    vmId = values[vmIdTableCol].trim()
+                    cores = values[coreTableCol].trim().toInt()
+                    requiredMemory = values[provisionedMemoryTableCol].trim().toInt() * 1_000L // GB -> MB
+
+                    vmIds.add(vmId)
+                    vmIdToMetadata[vmId] = VmInfo(cores, requiredMemory, -1L, Long.MAX_VALUE)
+                }
+            }
     }
 
-    writer.close()
-    metaWriter.close()
+    // Read VM metric reading files
+    val timestampCol = 0
+    val vmIdCol = 1
+    val cpuUsageCol = 4
+    val traceInterval = 5 * 60 * 1000L
+
+    val vmIdToFragments = mutableMapOf<String, MutableList<Fragment>>()
+    val vmIdToLastFragment = mutableMapOf<String, Fragment?>()
+    val allFragments = mutableListOf<Fragment>()
+
+    File(traceDirectory, "readings").walk()
+        .filterNot { it.isDirectory }
+        .filter { it.extension == "csv" }
+        .toList()
+        .forEach { readingsFile ->
+            var timestamp: Long
+            var vmId: String
+            var cpuUsage: Double
+            var minTime = Long.MAX_VALUE
+
+            BufferedReader(FileReader(readingsFile)).use { reader ->
+                reader.lineSequence()
+                    .chunked(128)
+                    .forEach { lines ->
+                        for (line in lines) {
+                            // Ignore comments in the trace
+                            if (line.startsWith("#") || line.isBlank()) {
+                                continue
+                            }
+
+                            val values = line.split(",")
+                            vmId = values[vmIdCol].trim()
+
+                            // Ignore readings for VMs not in the sample
+                            if (!vmIds.contains(vmId)) {
+                                continue
+                            }
+
+                            timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
+                            vmIdToMetadata[vmId]!!.minTime = min(vmIdToMetadata[vmId]!!.minTime, timestamp)
+                            cpuUsage = values[cpuUsageCol].trim().toDouble() * 4_000 // MHz
+                            vmIdToMetadata[vmId]!!.maxTime = max(vmIdToMetadata[vmId]!!.maxTime, timestamp)
+
+                            val flops: Long = (cpuUsage * 5 * 60).toLong()
+                            val lastFragment = vmIdToLastFragment[vmId]
+
+                            vmIdToLastFragment[vmId] =
+                                if (lastFragment != null && lastFragment.flops == 0L && flops == 0L) {
+                                    Fragment(
+                                        vmId,
+                                        lastFragment.tick,
+                                        lastFragment.flops + flops,
+                                        lastFragment.duration + traceInterval,
+                                        cpuUsage,
+                                        vmIdToMetadata[vmId]!!.cores
+                                    )
+                                } else {
+                                    val fragment =
+                                        Fragment(
+                                            vmId,
+                                            timestamp,
+                                            flops,
+                                            traceInterval,
+                                            cpuUsage,
+                                            vmIdToMetadata[vmId]!!.cores
+                                        )
+                                    if (lastFragment != null) {
+                                        if (vmIdToFragments[vmId] == null) {
+                                            vmIdToFragments[vmId] = mutableListOf()
+                                        }
+                                        vmIdToFragments[vmId]!!.add(lastFragment)
+                                        allFragments.add(lastFragment)
+                                    }
+                                    fragment
+                                }
+                        }
+                    }
+            }
+        }
+
+    for (entry in vmIdToLastFragment) {
+        if (entry.value != null) {
+            vmIdToFragments[entry.key]!!.add(entry.value!!)
+        }
+    }
+
+    for (entry in vmIdToMetadata) {
+        val metaRecord = GenericData.Record(metaSchema)
+        metaRecord.put("id", entry.key)
+        metaRecord.put("submissionTime", entry.value.minTime)
+        metaRecord.put("endTime", entry.value.maxTime)
+        metaRecord.put("maxCores", entry.value.cores)
+        metaRecord.put("requiredMemory", entry.value.requiredMemory)
+        metaWriter.write(metaRecord)
+    }
+
+    return allFragments
 }
 
-data class Fragment(val id: String, val tick: Long, val flops: Long, val duration: Long, val usage: Double, val cores: Int)
+class VmInfo(val cores: Int, val requiredMemory: Long, var minTime: Long, var maxTime: Long)
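As a usage sketch (not part of the commit itself): the new trace type is selected through the converter's third program argument. `main` expects the output directory, the trace directory, and the trace type, and any value other than "solvinity" routes to `readAzureTrace`, which looks for `vmtable.csv` and a `readings/` directory of CSV files inside the trace directory. The driver function and the paths below are hypothetical placeholders.

```kotlin
import com.atlarge.opendc.experiments.sc20.trace.main

// Hypothetical driver; the directory paths are placeholders, not part of the commit.
fun convertAzureTrace() {
    main(
        arrayOf(
            "/data/opendc/azure-parquet", // args[0]: destination for meta.parquet and trace.parquet
            "/data/traces/azure",         // args[1]: directory containing vmtable.csv and readings/
            "azure"                       // args[2]: any value other than "solvinity" selects readAzureTrace
        )
    )
}
```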
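Along the same lines, a minimal sketch for sanity-checking the converter output, assuming the same parquet-avro dependency the writer uses is on the classpath: the `trace.parquet` file can be read back with `AvroParquetReader`. The helper name and the printing logic are illustrative only; the field names mirror the schema written in the diff above.

```kotlin
import org.apache.avro.generic.GenericData
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetReader

// Illustrative sketch: print the first few fragment records from trace.parquet.
fun dumpTraceHead(destPath: String, limit: Int = 5) {
    val reader = AvroParquetReader.builder<GenericData.Record>(Path(destPath, "trace.parquet")).build()
    reader.use {
        var count = 0
        while (count < limit) {
            // read() returns null once the file is exhausted
            val record = it.read() ?: break
            // Field names match the schema the converter writes: id, time, duration, cores, cpuUsage, flops
            println(
                "id=${record.get("id")} time=${record.get("time")} duration=${record.get("duration")} " +
                    "cores=${record.get("cores")} cpuUsage=${record.get("cpuUsage")} flops=${record.get("flops")}"
            )
            count++
        }
    }
}
```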
