author    Georgios Andreadis <info@gandreadis.com>  2020-05-19 11:00:40 +0200
committer Georgios Andreadis <info@gandreadis.com>  2020-05-20 16:02:42 +0200
commit    b85ef3f0a27b88515c7d4d23e3c569ecb55f529c (patch)
tree      d0f1dfaada8abcf7a88c86dcf490b8ea62fac1d1
parent    5e2e040cecddd3c9c048b0992d4635ed0515f490 (diff)
Fix issues in Azure trace reader
-rw-r--r--  opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt  33
1 file changed, 27 insertions(+), 6 deletions(-)
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt
index e28a497d..dc0a1ee4 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt
@@ -43,7 +43,7 @@ import kotlin.math.min
  */
 fun main(args: Array<String>) {
     if (args.size < 2) {
-        println("error: expected <INPUT> <OUTPUT>")
+        println("error: expected <OUTPUT> <INPUT> <TRACE-TYPE>")
         return
     }
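Note that the guard above still accepts a two-argument invocation even though args[2] (the trace type) is read further down in main, so a call without the third argument dies with an index error rather than the usage message. A minimal sketch of a stricter guard, matching the corrected usage string:

    // Sketch: require all three positional arguments up front, so a missing
    // trace type produces the usage message instead of an index error.
    if (args.size < 3) {
        println("error: expected <OUTPUT> <INPUT> <TRACE-TYPE>")
        return
    }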
@@ -71,15 +71,24 @@ fun main(args: Array<String>) {
     val dest = File(args[0])
     val traceDirectory = File(args[1])
+    val metaParquet = File(dest.absolutePath, "meta.parquet")
+    val traceParquet = File(dest.absolutePath, "trace.parquet")
 
-    val metaWriter = AvroParquetWriter.builder<GenericData.Record>(Path(dest.absolutePath, "meta.parquet"))
+    if (metaParquet.exists()) {
+        metaParquet.delete()
+    }
+    if (traceParquet.exists()) {
+        traceParquet.delete()
+    }
+
+    val metaWriter = AvroParquetWriter.builder<GenericData.Record>(Path(metaParquet.toURI()))
         .withSchema(metaSchema)
         .withCompressionCodec(CompressionCodecName.SNAPPY)
         .withPageSize(4 * 1024 * 1024) // For compression
         .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
         .build()
 
-    val writer = AvroParquetWriter.builder<GenericData.Record>(Path(dest.absolutePath, "trace.parquet"))
+    val writer = AvroParquetWriter.builder<GenericData.Record>(Path(traceParquet.toURI()))
         .withSchema(schema)
         .withCompressionCodec(CompressionCodecName.SNAPPY)
         .withPageSize(4 * 1024 * 1024) // For compression
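For reference, ParquetWriter refuses to overwrite an existing file (its default write mode is CREATE), which is presumably why this hunk deletes stale outputs before opening the writers. A self-contained sketch of the setup, mirroring the calls above; only the helper name openWriter is invented:

    import org.apache.avro.Schema
    import org.apache.avro.generic.GenericData
    import org.apache.hadoop.fs.Path
    import org.apache.parquet.avro.AvroParquetWriter
    import org.apache.parquet.hadoop.ParquetWriter
    import org.apache.parquet.hadoop.metadata.CompressionCodecName
    import java.io.File

    // Sketch: open a Snappy-compressed Parquet writer for Avro records,
    // deleting any stale output first so the CREATE-mode builder cannot fail.
    fun openWriter(file: File, schema: Schema): ParquetWriter<GenericData.Record> {
        if (file.exists()) {
            file.delete()
        }
        return AvroParquetWriter.builder<GenericData.Record>(Path(file.toURI()))
            .withSchema(schema)
            .withCompressionCodec(CompressionCodecName.SNAPPY)
            .withPageSize(4 * 1024 * 1024) // For compression
            .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
            .build()
    }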
@@ -87,12 +96,14 @@ fun main(args: Array<String>) {
         .build()
 
     val traceType = args[2]
+    val startTime = System.currentTimeMillis()
     val allFragments = if (traceType == "solvinity") {
         readSolvinityTrace(traceDirectory, metaSchema, metaWriter)
     } else {
         readAzureTrace(traceDirectory, metaSchema, metaWriter)
     }
     allFragments.sortWith(compareBy<Fragment> { it.tick }.thenBy { it.id })
+    println("Reading trace took ${(System.currentTimeMillis() - startTime) / 1000} seconds")
 
     for (fragment in allFragments) {
         val record = GenericData.Record(schema)
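The sort above fixes the order of all fragments before they are written out: the timestamp (tick) is the primary key and the VM id breaks ties, making the converted trace deterministic. A self-contained sketch of the comparator chain; the Fragment shape here is hypothetical, since only tick and id are implied by this diff:

    // Hypothetical stand-in for the converter's Fragment type; only `tick`
    // and `id` are implied by the sort in this hunk.
    data class Fragment(val id: String, val tick: Long)

    fun main() {
        val fragments = mutableListOf(
            Fragment("vm-b", 2L),
            Fragment("vm-a", 2L),
            Fragment("vm-a", 1L)
        )
        // Primary key: timestamp; tie-break: VM id.
        fragments.sortWith(compareBy<Fragment> { it.tick }.thenBy { it.id })
        // => [Fragment(id=vm-a, tick=1), Fragment(id=vm-a, tick=2), Fragment(id=vm-b, tick=2)]
        println(fragments)
    }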
@@ -240,7 +251,7 @@ fun readAzureTrace(
metaWriter: ParquetWriter<GenericData.Record>
): MutableList<Fragment> {
val random = Random(0)
- val fraction = 0.01
+ val fraction = 0.001
// Read VM table
val vmIdTableCol = 0
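Lowering fraction from 0.01 to 0.001 shrinks the sampled share of Azure VMs tenfold. This hunk does not show where fraction is applied; a plausible reading (an assumption, not confirmed by this commit) is seeded per-VM rejection sampling:

    import java.util.Random

    // Assumption: `fraction` is the probability of keeping a VM, and the fixed
    // seed (Random(0), as above) makes the sampled subset reproducible.
    fun keepVm(random: Random, fraction: Double): Boolean =
        random.nextDouble() < fraction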
@@ -270,12 +281,17 @@ fun readAzureTrace(
                 val values = line.split(",")
 
+                // Exclude VMs with a large number of cores (not specified exactly)
+                if (values[coreTableCol].contains(">")) {
+                    continue
+                }
+
                 vmId = values[vmIdTableCol].trim()
                 cores = values[coreTableCol].trim().toInt()
                 requiredMemory = values[provisionedMemoryTableCol].trim().toInt() * 1_000L // GB -> MB
                 vmIds.add(vmId)
-                vmIdToMetadata[vmId] = VmInfo(cores, requiredMemory, -1L, Long.MAX_VALUE)
+                vmIdToMetadata[vmId] = VmInfo(cores, requiredMemory, Long.MAX_VALUE, -1L)
             }
         }
     }
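The VmInfo change above is the core of the "fix issues" in the commit message: the old call passed -1 as the third argument and Long.MAX_VALUE as the fourth, which, judging by the meta-record hunk further down, are the submission (min) and end (max) timestamps in the wrong slots. With min/max tracking, bounds initialized that way can never tighten. A sketch of the intended invariant; the VmInfo shape is inferred from this diff, not taken from the source:

    // Inferred shape: the third constructor argument is the minimum
    // (submission) time, the fourth the maximum (end) time.
    class VmInfo(
        val cores: Int,
        val requiredMemory: Long,
        var minTime: Long, // must start at Long.MAX_VALUE
        var maxTime: Long  // must start at -1L
    )

    // The first observed reading tightens both bounds; with the old, swapped
    // initial values, minOf/maxOf could never change them.
    fun observe(info: VmInfo, timestamp: Long) {
        info.minTime = minOf(info.minTime, timestamp)
        info.maxTime = maxOf(info.maxTime, timestamp)
    }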
@@ -298,7 +314,6 @@ fun readAzureTrace(
     var timestamp: Long
     var vmId: String
     var cpuUsage: Double
-    var minTime = Long.MAX_VALUE
 
     BufferedReader(FileReader(readingsFile)).use { reader ->
         reader.lineSequence()
@@ -362,15 +377,21 @@ fun readAzureTrace(
     for (entry in vmIdToLastFragment) {
         if (entry.value != null) {
+            if (vmIdToFragments[entry.key] == null) {
+                vmIdToFragments[entry.key] = mutableListOf()
+            }
             vmIdToFragments[entry.key]!!.add(entry.value!!)
         }
     }
 
+    println("Read ${vmIdToLastFragment.size} VMs")
+
     for (entry in vmIdToMetadata) {
         val metaRecord = GenericData.Record(metaSchema)
         metaRecord.put("id", entry.key)
         metaRecord.put("submissionTime", entry.value.minTime)
         metaRecord.put("endTime", entry.value.maxTime)
+        println("${entry.value.minTime} - ${entry.value.maxTime}")
         metaRecord.put("maxCores", entry.value.cores)
         metaRecord.put("requiredMemory", entry.value.requiredMemory)
         metaWriter.write(metaRecord)
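As a side note, the null check added at the top of this hunk can be written more idiomatically with the standard library's getOrPut, which inserts the empty list on first access and is behaviorally identical here; a sketch using the same map names as the diff:

    // Sketch: getOrPut replaces the explicit null check plus !! dereference.
    for ((vmId, lastFragment) in vmIdToLastFragment) {
        if (lastFragment != null) {
            vmIdToFragments.getOrPut(vmId) { mutableListOf() }.add(lastFragment)
        }
    }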