From b85ef3f0a27b88515c7d4d23e3c569ecb55f529c Mon Sep 17 00:00:00 2001
From: Georgios Andreadis
Date: Tue, 19 May 2020 11:00:40 +0200
Subject: Fix issues in Azure trace reader

---
 .../experiments/sc20/trace/Sc20TraceConverter.kt | 33 ++++++++++++++++++----
 1 file changed, 27 insertions(+), 6 deletions(-)

diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt
index e28a497d..dc0a1ee4 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt
@@ -43,7 +43,7 @@ import kotlin.math.min
  */
 fun main(args: Array<String>) {
     if (args.size < 2) {
-        println("error: expected ")
+        println("error: expected ")
         return
     }
 
@@ -71,15 +71,24 @@ fun main(args: Array<String>) {
 
     val dest = File(args[0])
     val traceDirectory = File(args[1])
+    val metaParquet = File(dest.absolutePath, "meta.parquet")
+    val traceParquet = File(dest.absolutePath, "trace.parquet")
 
-    val metaWriter = AvroParquetWriter.builder<GenericData.Record>(Path(dest.absolutePath, "meta.parquet"))
+    if (metaParquet.exists()) {
+        metaParquet.delete()
+    }
+    if (traceParquet.exists()) {
+        traceParquet.delete()
+    }
+
+    val metaWriter = AvroParquetWriter.builder<GenericData.Record>(Path(metaParquet.toURI()))
         .withSchema(metaSchema)
         .withCompressionCodec(CompressionCodecName.SNAPPY)
         .withPageSize(4 * 1024 * 1024) // For compression
         .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
         .build()
 
-    val writer = AvroParquetWriter.builder<GenericData.Record>(Path(dest.absolutePath, "trace.parquet"))
+    val writer = AvroParquetWriter.builder<GenericData.Record>(Path(traceParquet.toURI()))
         .withSchema(schema)
         .withCompressionCodec(CompressionCodecName.SNAPPY)
         .withPageSize(4 * 1024 * 1024) // For compression
@@ -87,12 +96,14 @@
         .build()
 
     val traceType = args[2]
+    val startTime = System.currentTimeMillis()
     val allFragments = if (traceType == "solvinity") {
         readSolvinityTrace(traceDirectory, metaSchema, metaWriter)
     } else {
         readAzureTrace(traceDirectory, metaSchema, metaWriter)
     }
     allFragments.sortWith(compareBy<Fragment> { it.tick }.thenBy { it.id })
+    println("Reading trace took ${(System.currentTimeMillis() - startTime) / 1000} seconds")
 
     for (fragment in allFragments) {
         val record = GenericData.Record(schema)
@@ -240,7 +251,7 @@ fun readAzureTrace(
     metaWriter: ParquetWriter<GenericData.Record>
 ): MutableList<Fragment> {
     val random = Random(0)
-    val fraction = 0.01
+    val fraction = 0.001
 
     // Read VM table
     val vmIdTableCol = 0
@@ -270,12 +281,17 @@ fun readAzureTrace(
 
                 val values = line.split(",")
 
+                // Exclude VMs with a large number of cores (not specified exactly)
+                if (values[coreTableCol].contains(">")) {
+                    continue
+                }
+
                 vmId = values[vmIdTableCol].trim()
                 cores = values[coreTableCol].trim().toInt()
                 requiredMemory = values[provisionedMemoryTableCol].trim().toInt() * 1_000L // GB -> MB
 
                 vmIds.add(vmId)
-                vmIdToMetadata[vmId] = VmInfo(cores, requiredMemory, -1L, Long.MAX_VALUE)
+                vmIdToMetadata[vmId] = VmInfo(cores, requiredMemory, Long.MAX_VALUE, -1L)
             }
         }
     }
@@ -298,7 +314,6 @@ fun readAzureTrace(
     var timestamp: Long
     var vmId: String
    var cpuUsage: Double
-    var minTime = Long.MAX_VALUE
 
    BufferedReader(FileReader(readingsFile)).use { reader ->
         reader.lineSequence()
@@ -362,15 +377,21 @@ fun readAzureTrace(
 
     for (entry in vmIdToLastFragment) {
         if (entry.value != null) {
+            if (vmIdToFragments[entry.key] == null) {
+                vmIdToFragments[entry.key] = mutableListOf()
+            }
             vmIdToFragments[entry.key]!!.add(entry.value!!)
         }
     }
 
+    println("Read ${vmIdToLastFragment.size} VMs")
+
     for (entry in vmIdToMetadata) {
         val metaRecord = GenericData.Record(metaSchema)
         metaRecord.put("id", entry.key)
         metaRecord.put("submissionTime", entry.value.minTime)
         metaRecord.put("endTime", entry.value.maxTime)
+        println("${entry.value.minTime} - ${entry.value.maxTime}")
         metaRecord.put("maxCores", entry.value.cores)
         metaRecord.put("requiredMemory", entry.value.requiredMemory)
         metaWriter.write(metaRecord)
-- 
cgit v1.2.3