author     Georgios Andreadis <info@gandreadis.com>  2020-05-14 18:05:18 +0200
committer  Georgios Andreadis <info@gandreadis.com>  2020-05-20 16:02:42 +0200
commit     5e2e040cecddd3c9c048b0992d4635ed0515f490 (patch)
tree       026caf34cf42963b2426e097800fb145aaedf101 /opendc/opendc-experiments-sc20/src
parent     70ad01d793f88b1bef7d7988d24bff384ddbb3b9 (diff)
Add Azure trace reader
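
The converter writes Parquet output to args[0], reads the raw trace from
args[1], and now selects the reader via a new third argument, args[2]
("solvinity" or "azure"). For the Azure case, the trace directory is
expected to contain vmtable.csv and a readings/ subdirectory. A minimal
Kotlin sketch of an invocation, with illustrative paths:

    main(arrayOf("out/", "traces/AzurePublicDataset/", "azure"))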
Diffstat (limited to 'opendc/opendc-experiments-sc20/src')
-rw-r--r--  opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt  242
1 file changed, 210 insertions, 32 deletions
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt
index 04cdd302..e28a497d 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt
@@ -24,14 +24,17 @@
package com.atlarge.opendc.experiments.sc20.trace
+import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericData
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import java.io.BufferedReader
import java.io.File
import java.io.FileReader
+import java.util.Random
import kotlin.math.max
import kotlin.math.min
@@ -66,20 +69,8 @@ fun main(args: Array<String>) {
.name("flops").type().longType().noDefault()
.endRecord()
- val timestampCol = 0
- val cpuUsageCol = 1
- val coreCol = 12
- val vmIdCol = 19
- val provisionedMemoryCol = 20
- val traceInterval = 5 * 60 * 1000L
-
val dest = File(args[0])
val traceDirectory = File(args[1])
- val vms =
- traceDirectory.walk()
- .filterNot { it.isDirectory }
- .filter { it.extension == "csv" || it.extension == "txt" }
- .toList()
val metaWriter = AvroParquetWriter.builder<GenericData.Record>(Path(dest.absolutePath, "meta.parquet"))
.withSchema(metaSchema)
@@ -88,9 +79,68 @@ fun main(args: Array<String>) {
        .withRowGroupSize(16 * 1024 * 1024) // For write buffering (row group size)
.build()
+ val writer = AvroParquetWriter.builder<GenericData.Record>(Path(dest.absolutePath, "trace.parquet"))
+ .withSchema(schema)
+ .withCompressionCodec(CompressionCodecName.SNAPPY)
+ .withPageSize(4 * 1024 * 1024) // For compression
+        .withRowGroupSize(16 * 1024 * 1024) // For write buffering (row group size)
+ .build()
+
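+    // Select the trace reader based on the trace type passed as the third program argument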
+ val traceType = args[2]
+ val allFragments = if (traceType == "solvinity") {
+ readSolvinityTrace(traceDirectory, metaSchema, metaWriter)
+ } else {
+ readAzureTrace(traceDirectory, metaSchema, metaWriter)
+ }
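+    // Order fragments by timestamp, breaking ties by VM id, so the trace is written in chronological order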
+ allFragments.sortWith(compareBy<Fragment> { it.tick }.thenBy { it.id })
+
+ for (fragment in allFragments) {
+ val record = GenericData.Record(schema)
+ record.put("id", fragment.id)
+ record.put("time", fragment.tick)
+ record.put("duration", fragment.duration)
+ record.put("cores", fragment.cores)
+ record.put("cpuUsage", fragment.usage)
+ record.put("flops", fragment.flops)
+
+ writer.write(record)
+ }
+
+ writer.close()
+ metaWriter.close()
+}
+
+data class Fragment(
+ val id: String,
+ val tick: Long,
+ val flops: Long,
+ val duration: Long,
+ val usage: Double,
+ val cores: Int
+)
+
+/**
+ * Reads the confidential Solvinity trace.
+ */
+fun readSolvinityTrace(
+ traceDirectory: File,
+ metaSchema: Schema,
+ metaWriter: ParquetWriter<GenericData.Record>
+): MutableList<Fragment> {
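+    // Column indices and the 5-minute sampling interval of the Solvinity trace format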
+ val timestampCol = 0
+ val cpuUsageCol = 1
+ val coreCol = 12
+ val vmIdCol = 19
+ val provisionedMemoryCol = 20
+ val traceInterval = 5 * 60 * 1000L
+
val allFragments = mutableListOf<Fragment>()
- vms
+
+ traceDirectory.walk()
+ .filterNot { it.isDirectory }
+ .filter { it.extension == "csv" || it.extension == "txt" }
+ .toList()
.forEachIndexed { idx, vmFile ->
println(vmFile)
@@ -176,29 +226,157 @@ fun main(args: Array<String>) {
metaWriter.write(metaRecord)
}
- val writer = AvroParquetWriter.builder<GenericData.Record>(Path(dest.absolutePath, "trace.parquet"))
- .withSchema(schema)
- .withCompressionCodec(CompressionCodecName.SNAPPY)
- .withPageSize(4 * 1024 * 1024) // For compression
- .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
- .build()
+ return allFragments
+}
- allFragments.sortWith(compareBy<Fragment> { it.tick }.thenBy { it.id })
+/**
+ * Reads the Azure cloud trace.
+ *
+ * See https://github.com/Azure/AzurePublicDataset/ for a description of the trace format.
+ */
+fun readAzureTrace(
+ traceDirectory: File,
+ metaSchema: Schema,
+ metaWriter: ParquetWriter<GenericData.Record>
+): MutableList<Fragment> {
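+    // Sample a fixed fraction (1%) of the VMs; seeding the RNG keeps the sample reproducible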
+ val random = Random(0)
+ val fraction = 0.01
- for (fragment in allFragments) {
- val record = GenericData.Record(schema)
- record.put("id", fragment.id)
- record.put("time", fragment.tick)
- record.put("duration", fragment.duration)
- record.put("cores", fragment.cores)
- record.put("cpuUsage", fragment.usage)
- record.put("flops", fragment.flops)
+ // Read VM table
+ val vmIdTableCol = 0
+ val coreTableCol = 9
+ val provisionedMemoryTableCol = 10
- writer.write(record)
+ var vmId: String
+ var cores: Int
+ var requiredMemory: Long
+
+ val vmIds = mutableSetOf<String>()
+ val vmIdToMetadata = mutableMapOf<String, VmInfo>()
+
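+    // Stream vmtable.csv in chunks to keep memory usage bounded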
+ BufferedReader(FileReader(File(traceDirectory, "vmtable.csv"))).use { reader ->
+ reader.lineSequence()
+ .chunked(1024)
+ .forEach { lines ->
+ for (line in lines) {
+ // Ignore comments in the trace
+ if (line.startsWith("#") || line.isBlank()) {
+ continue
+ }
+ // Sample only a fraction of the VMs
+ if (random.nextDouble() > fraction) {
+ continue
+ }
+
+ val values = line.split(",")
+
+ vmId = values[vmIdTableCol].trim()
+ cores = values[coreTableCol].trim().toInt()
+ requiredMemory = values[provisionedMemoryTableCol].trim().toInt() * 1_000L // GB -> MB
+
+ vmIds.add(vmId)
+                    vmIdToMetadata[vmId] = VmInfo(cores, requiredMemory, Long.MAX_VALUE, -1L) // Start min/max at opposite extremes so the updates below take effect
+ }
+ }
}
- writer.close()
- metaWriter.close()
+ // Read VM metric reading files
+ val timestampCol = 0
+ val vmIdCol = 1
+ val cpuUsageCol = 4
+ val traceInterval = 5 * 60 * 1000L
+
+ val vmIdToFragments = mutableMapOf<String, MutableList<Fragment>>()
+ val vmIdToLastFragment = mutableMapOf<String, Fragment?>()
+ val allFragments = mutableListOf<Fragment>()
+
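+    // Walk over all CPU readings files and convert the readings of sampled VMs into fragments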
+ File(traceDirectory, "readings").walk()
+ .filterNot { it.isDirectory }
+ .filter { it.extension == "csv" }
+ .toList()
+ .forEach { readingsFile ->
+ var timestamp: Long
+ var vmId: String
+ var cpuUsage: Double
+ var minTime = Long.MAX_VALUE
+
+ BufferedReader(FileReader(readingsFile)).use { reader ->
+ reader.lineSequence()
+ .chunked(128)
+ .forEach { lines ->
+ for (line in lines) {
+ // Ignore comments in the trace
+ if (line.startsWith("#") || line.isBlank()) {
+ continue
+ }
+
+ val values = line.split(",")
+ vmId = values[vmIdCol].trim()
+
+ // Ignore readings for VMs not in the sample
+ if (!vmIds.contains(vmId)) {
+ continue
+ }
+
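+                            // Shift the reading timestamp back one 5-minute interval and convert seconds to milliseconds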
+ timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
+ vmIdToMetadata[vmId]!!.minTime = min(vmIdToMetadata[vmId]!!.minTime, timestamp)
+ cpuUsage = values[cpuUsageCol].trim().toDouble() * 4_000 // MHz
+ vmIdToMetadata[vmId]!!.maxTime = max(vmIdToMetadata[vmId]!!.maxTime, timestamp)
+
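+                            // Approximate the work done in the interval as usage (MHz) times its 300-second duration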
+ val flops: Long = (cpuUsage * 5 * 60).toLong()
+ val lastFragment = vmIdToLastFragment[vmId]
+
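+                            // Merge consecutive idle readings into one growing fragment; otherwise flush the previous fragment and start a new one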
+ vmIdToLastFragment[vmId] =
+ if (lastFragment != null && lastFragment.flops == 0L && flops == 0L) {
+ Fragment(
+ vmId,
+ lastFragment.tick,
+ lastFragment.flops + flops,
+ lastFragment.duration + traceInterval,
+ cpuUsage,
+ vmIdToMetadata[vmId]!!.cores
+ )
+ } else {
+ val fragment =
+ Fragment(
+ vmId,
+ timestamp,
+ flops,
+ traceInterval,
+ cpuUsage,
+ vmIdToMetadata[vmId]!!.cores
+ )
+                                    if (lastFragment != null) {
+                                        // Flush the fragment that this new reading closed off
+                                        vmIdToFragments.getOrPut(vmId) { mutableListOf() }.add(lastFragment)
+                                        allFragments.add(lastFragment)
+                                    }
+ fragment
+ }
+ }
+ }
+ }
+ }
+
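+    // Flush the final pending fragment of every VM into the output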
+    for (entry in vmIdToLastFragment) {
+        val fragment = entry.value
+        if (fragment != null) {
+            vmIdToFragments.getOrPut(entry.key) { mutableListOf() }.add(fragment)
+            allFragments.add(fragment)
+        }
+    }
+
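+    // Emit one metadata record per sampled VM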
+ for (entry in vmIdToMetadata) {
+ val metaRecord = GenericData.Record(metaSchema)
+ metaRecord.put("id", entry.key)
+ metaRecord.put("submissionTime", entry.value.minTime)
+ metaRecord.put("endTime", entry.value.maxTime)
+ metaRecord.put("maxCores", entry.value.cores)
+ metaRecord.put("requiredMemory", entry.value.requiredMemory)
+ metaWriter.write(metaRecord)
+ }
+
+ return allFragments
}
-data class Fragment(val id: String, val tick: Long, val flops: Long, val duration: Long, val usage: Double, val cores: Int)
+class VmInfo(val cores: Int, val requiredMemory: Long, var minTime: Long, var maxTime: Long)