author     Georgios Andreadis <info@gandreadis.com>     2020-07-25 12:10:12 +0200
committer  Fabian Mastenbroek <mail.fabianm@gmail.com>   2020-08-24 19:48:27 +0200
commit     b9d92d91273804da1dc8a1ccd93f04ceb573263a (patch)
tree       097231b04c56f81b55c54ff9403e74385ef70b6c /simulator/opendc
parent     fcf9147c116de46a57b662f7a84255faf5581d67 (diff)
parent     537a9405998af49cdcd437dd54e8fcfaa9fe9aaa (diff)
Merge pull request #14 from atlarge-research/feat/bitbrains-converter
Add Bitbrains trace converter
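This merge replaces the positional-argument conversion script with a Clikt command: each supported trace format (Solvinity, Bitbrains, Azure) is modeled as an OptionGroup and selected via --type, so format-specific flags such as --vm-placements or --seed only apply to the chosen conversion. Below is a minimal, self-contained sketch of that pattern; the class bodies and the run() behavior are illustrative only and are not code from this commit.

    import com.github.ajalt.clikt.core.CliktCommand
    import com.github.ajalt.clikt.parameters.groups.OptionGroup
    import com.github.ajalt.clikt.parameters.groups.groupChoice
    import com.github.ajalt.clikt.parameters.options.default
    import com.github.ajalt.clikt.parameters.options.option
    import com.github.ajalt.clikt.parameters.types.long

    // Each conversion is an option group; its options are only active when that group is selected.
    sealed class Conversion(name: String) : OptionGroup(name)

    class BitbrainsConversion : Conversion("Bitbrains")

    class AzureConversion : Conversion("Azure") {
        // Group-specific option, analogous to --seed in the real command.
        val seed by option(help = "seed for trace sampling").long().default(0)
    }

    class ConverterCommand : CliktCommand(name = "trace-converter") {
        // --type picks which OptionGroup becomes active.
        private val type by option("-t", "--type").groupChoice(
            "bitbrains" to BitbrainsConversion(),
            "azure" to AzureConversion()
        )

        override fun run() {
            when (val conversion = type) {
                is AzureConversion -> echo("Azure conversion, seed=${conversion.seed}")
                is BitbrainsConversion -> echo("Bitbrains conversion")
                null -> echo("no --type given")
            }
        }
    }

    fun main(args: Array<String>) = ConverterCommand().main(args)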
Diffstat (limited to 'simulator/opendc')
-rw-r--r--   simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt   825
1 file changed, 493 insertions, 332 deletions
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt
index d6726910..56ddbb6d 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt
@@ -25,6 +25,18 @@
package com.atlarge.opendc.experiments.sc20.trace
import com.atlarge.opendc.format.trace.sc20.Sc20VmPlacementReader
+import com.github.ajalt.clikt.core.CliktCommand
+import com.github.ajalt.clikt.parameters.arguments.argument
+import com.github.ajalt.clikt.parameters.groups.OptionGroup
+import com.github.ajalt.clikt.parameters.groups.groupChoice
+import com.github.ajalt.clikt.parameters.options.convert
+import com.github.ajalt.clikt.parameters.options.default
+import com.github.ajalt.clikt.parameters.options.defaultLazy
+import com.github.ajalt.clikt.parameters.options.option
+import com.github.ajalt.clikt.parameters.options.required
+import com.github.ajalt.clikt.parameters.options.split
+import com.github.ajalt.clikt.parameters.types.file
+import com.github.ajalt.clikt.parameters.types.long
import java.io.BufferedReader
import java.io.File
import java.io.FileReader
@@ -41,189 +53,154 @@ import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
/**
- * A script to convert a trace in text format into a Parquet trace.
+ * Represents the command for converting traces
*/
-fun main(args: Array<String>) {
- if (args.size < 4) {
- println("error: expected <OUTPUT> <INPUT> <TRACE-TYPE> <SEED/CLUSTERS+MAPPING>")
- return
- }
-
- val metaSchema = SchemaBuilder
- .record("meta")
- .namespace("com.atlarge.opendc.format.sc20")
- .fields()
- .name("id").type().stringType().noDefault()
- .name("submissionTime").type().longType().noDefault()
- .name("endTime").type().longType().noDefault()
- .name("maxCores").type().intType().noDefault()
- .name("requiredMemory").type().longType().noDefault()
- .endRecord()
- val schema = SchemaBuilder
- .record("trace")
- .namespace("com.atlarge.opendc.format.sc20")
- .fields()
- .name("id").type().stringType().noDefault()
- .name("time").type().longType().noDefault()
- .name("duration").type().longType().noDefault()
- .name("cores").type().intType().noDefault()
- .name("cpuUsage").type().doubleType().noDefault()
- .name("flops").type().longType().noDefault()
- .endRecord()
-
- val dest = File(args[0])
- val traceDirectory = File(args[1])
- val metaParquet = File(dest.absolutePath, "meta.parquet")
- val traceParquet = File(dest.absolutePath, "trace.parquet")
-
- if (metaParquet.exists()) {
- metaParquet.delete()
- }
- if (traceParquet.exists()) {
- traceParquet.delete()
- }
+class TraceConverterCli : CliktCommand(name = "trace-converter") {
+ /**
+ * The directory where the trace should be stored.
+ */
+ private val outputPath by option("-O", "--output", help = "path to store the trace")
+ .file(canBeFile = false, mustExist = false)
+ .defaultLazy { File("output") }
+
+ /**
+ * The directory where the input trace is located.
+ */
+ private val inputPath by argument("input", help = "path to the input trace")
+ .file(canBeFile = false)
+
+ /**
+ * The input type of the trace.
+ */
+ val type by option("-t", "--type", help = "input type of trace").groupChoice(
+ "solvinity" to SolvinityConversion(),
+ "bitbrains" to BitbrainsConversion(),
+ "azure" to AzureConversion()
+ )
+
+ override fun run() {
+ val metaSchema = SchemaBuilder
+ .record("meta")
+ .namespace("com.atlarge.opendc.format.sc20")
+ .fields()
+ .name("id").type().stringType().noDefault()
+ .name("submissionTime").type().longType().noDefault()
+ .name("endTime").type().longType().noDefault()
+ .name("maxCores").type().intType().noDefault()
+ .name("requiredMemory").type().longType().noDefault()
+ .endRecord()
+ val schema = SchemaBuilder
+ .record("trace")
+ .namespace("com.atlarge.opendc.format.sc20")
+ .fields()
+ .name("id").type().stringType().noDefault()
+ .name("time").type().longType().noDefault()
+ .name("duration").type().longType().noDefault()
+ .name("cores").type().intType().noDefault()
+ .name("cpuUsage").type().doubleType().noDefault()
+ .name("flops").type().longType().noDefault()
+ .endRecord()
+
+ val metaParquet = File(outputPath, "meta.parquet")
+ val traceParquet = File(outputPath, "trace.parquet")
+
+ if (metaParquet.exists()) {
+ metaParquet.delete()
+ }
+ if (traceParquet.exists()) {
+ traceParquet.delete()
+ }
- val metaWriter = AvroParquetWriter.builder<GenericData.Record>(Path(metaParquet.toURI()))
- .withSchema(metaSchema)
- .withCompressionCodec(CompressionCodecName.SNAPPY)
- .withPageSize(4 * 1024 * 1024) // For compression
- .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
- .build()
-
- val writer = AvroParquetWriter.builder<GenericData.Record>(Path(traceParquet.toURI()))
- .withSchema(schema)
- .withCompressionCodec(CompressionCodecName.SNAPPY)
- .withPageSize(4 * 1024 * 1024) // For compression
- .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
- .build()
-
- val traceType = args[2]
- val allFragments = if (traceType == "solvinity") {
- val clusters = args[3].split(",")
- val vmPlacementFile = File(args[4])
- val vmPlacements = Sc20VmPlacementReader(vmPlacementFile.inputStream().buffered()).construct()
-
- readSolvinityTrace(traceDirectory, metaSchema, metaWriter, clusters, vmPlacements)
- } else {
- val seed = args[3].toLong()
- readAzureTrace(traceDirectory, metaSchema, metaWriter, seed)
- }
- allFragments.sortWith(compareBy<Fragment> { it.tick }.thenBy { it.id })
-
- for (fragment in allFragments) {
- val record = GenericData.Record(schema)
- record.put("id", fragment.id)
- record.put("time", fragment.tick)
- record.put("duration", fragment.duration)
- record.put("cores", fragment.cores)
- record.put("cpuUsage", fragment.usage)
- record.put("flops", fragment.flops)
-
- writer.write(record)
+ val metaWriter = AvroParquetWriter.builder<GenericData.Record>(Path(metaParquet.toURI()))
+ .withSchema(metaSchema)
+ .withCompressionCodec(CompressionCodecName.SNAPPY)
+ .withPageSize(4 * 1024 * 1024) // For compression
+ .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
+ .build()
+
+ val writer = AvroParquetWriter.builder<GenericData.Record>(Path(traceParquet.toURI()))
+ .withSchema(schema)
+ .withCompressionCodec(CompressionCodecName.SNAPPY)
+ .withPageSize(4 * 1024 * 1024) // For compression
+ .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
+ .build()
+
+ try {
+ val type = type ?: throw IllegalArgumentException("Invalid trace conversion")
+ val allFragments = type.read(inputPath, metaSchema, metaWriter)
+ allFragments.sortWith(compareBy<Fragment> { it.tick }.thenBy { it.id })
+
+ for (fragment in allFragments) {
+ val record = GenericData.Record(schema)
+ record.put("id", fragment.id)
+ record.put("time", fragment.tick)
+ record.put("duration", fragment.duration)
+ record.put("cores", fragment.cores)
+ record.put("cpuUsage", fragment.usage)
+ record.put("flops", fragment.flops)
+
+ writer.write(record)
+ }
+ } finally {
+ writer.close()
+ metaWriter.close()
+ }
}
-
- writer.close()
- metaWriter.close()
}
-data class Fragment(
- val id: String,
- val tick: Long,
- val flops: Long,
- val duration: Long,
- val usage: Double,
- val cores: Int
-)
-
/**
- * Reads the confidential Solvinity trace.
+ * The supported trace conversions.
*/
-fun readSolvinityTrace(
- traceDirectory: File,
- metaSchema: Schema,
- metaWriter: ParquetWriter<GenericData.Record>,
- clusters: List<String>,
- vmPlacements: Map<String, String>
-): MutableList<Fragment> {
- val timestampCol = 0
- val cpuUsageCol = 1
- val coreCol = 12
- val provisionedMemoryCol = 20
- val traceInterval = 5 * 60 * 1000L
-
- // Identify start time of the entire trace
- var minTimestamp = Long.MAX_VALUE
- traceDirectory.walk()
- .filterNot { it.isDirectory }
- .filter { it.extension == "csv" || it.extension == "txt" }
- .toList()
- .forEach { vmFile ->
- BufferedReader(FileReader(vmFile)).use { reader ->
- reader.lineSequence()
- .chunked(128)
- .forEachIndexed { idx, lines ->
- for (line in lines) {
- // Ignore comments in the trace
- if (line.startsWith("#") || line.isBlank()) {
- continue
- }
-
- val vmId = vmFile.name
-
- // Check if VM in topology
- val clusterName = vmPlacements[vmId]
- if (clusterName == null || !clusters.contains(clusterName)) {
- continue
- }
-
- val values = line.split(" ")
- val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
-
- if (timestamp < minTimestamp) {
- minTimestamp = timestamp
- }
- return@forEach
- }
- }
- }
- }
-
- println("Start of trace at $minTimestamp")
-
- val allFragments = mutableListOf<Fragment>()
-
- val begin = 15 * 24 * 60 * 60 * 1000L
- val end = 45 * 24 * 60 * 60 * 1000L
-
- traceDirectory.walk()
- .filterNot { it.isDirectory }
- .filter { it.extension == "csv" || it.extension == "txt" }
- .toList()
- .forEachIndexed { idx, vmFile ->
- println(vmFile)
-
- var vmId = ""
- var maxCores = -1
- var requiredMemory = -1L
- var cores = -1
- var minTime = Long.MAX_VALUE
-
- val flopsFragments = sequence {
- var last: Fragment? = null
+sealed class TraceConversion(name: String) : OptionGroup(name) {
+ /**
+ * Read the fragments of the trace.
+ */
+ abstract fun read(
+ traceDirectory: File,
+ metaSchema: Schema,
+ metaWriter: ParquetWriter<GenericData.Record>
+ ): MutableList<Fragment>
+}
+class SolvinityConversion : TraceConversion("Solvinity") {
+ val clusters by option()
+ .split(",")
+
+ val vmPlacements by option("--vm-placements", help = "file containing the VM placements")
+ .file(canBeDir = false)
+ .convert { it.inputStream().buffered().use { Sc20VmPlacementReader(it).construct() } }
+ .required()
+
+ override fun read(
+ traceDirectory: File,
+ metaSchema: Schema,
+ metaWriter: ParquetWriter<GenericData.Record>
+ ): MutableList<Fragment> {
+ val clusters = clusters?.toSet() ?: emptySet()
+ val timestampCol = 0
+ val cpuUsageCol = 1
+ val coreCol = 12
+ val provisionedMemoryCol = 20
+ val traceInterval = 5 * 60 * 1000L
+
+ // Identify start time of the entire trace
+ var minTimestamp = Long.MAX_VALUE
+ traceDirectory.walk()
+ .filterNot { it.isDirectory }
+ .filter { it.extension == "csv" || it.extension == "txt" }
+ .toList()
+ .forEach { vmFile ->
BufferedReader(FileReader(vmFile)).use { reader ->
reader.lineSequence()
.chunked(128)
- .forEach { lines ->
+ .forEachIndexed { idx, lines ->
for (line in lines) {
// Ignore comments in the trace
if (line.startsWith("#") || line.isBlank()) {
continue
}
- val values = line.split(" ")
-
- vmId = vmFile.name
+ val vmId = vmFile.name
// Check if VM in topology
val clusterName = vmPlacements[vmId]
@@ -231,230 +208,414 @@ fun readSolvinityTrace(
continue
}
- val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L - minTimestamp
- if (begin > timestamp || timestamp > end) {
- continue
+ val values = line.split("\t")
+ val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
+
+ if (timestamp < minTimestamp) {
+ minTimestamp = timestamp
}
+ return@forEach
+ }
+ }
+ }
+ }
- cores = values[coreCol].trim().toInt()
- requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong())
- maxCores = max(maxCores, cores)
- minTime = min(minTime, timestamp)
- val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz
- requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong())
- maxCores = max(maxCores, cores)
+ println("Start of trace at $minTimestamp")
+
+ val allFragments = mutableListOf<Fragment>()
+
+ val begin = 15 * 24 * 60 * 60 * 1000L
+ val end = 45 * 24 * 60 * 60 * 1000L
+
+ traceDirectory.walk()
+ .filterNot { it.isDirectory }
+ .filter { it.extension == "csv" || it.extension == "txt" }
+ .toList()
+ .forEachIndexed { idx, vmFile ->
+ println(vmFile)
+
+ var vmId = ""
+ var maxCores = -1
+ var requiredMemory = -1L
+ var cores = -1
+ var minTime = Long.MAX_VALUE
+
+ val flopsFragments = sequence {
+ var last: Fragment? = null
+
+ BufferedReader(FileReader(vmFile)).use { reader ->
+ reader.lineSequence()
+ .chunked(128)
+ .forEach { lines ->
+ for (line in lines) {
+ // Ignore comments in the trace
+ if (line.startsWith("#") || line.isBlank()) {
+ continue
+ }
- val flops: Long = (cpuUsage * 5 * 60).toLong()
+ val values = line.split("\t")
- last = if (last != null && last!!.flops == 0L && flops == 0L) {
- val oldFragment = last!!
- Fragment(
- vmId,
- oldFragment.tick,
- oldFragment.flops + flops,
- oldFragment.duration + traceInterval,
- cpuUsage,
- cores
- )
- } else {
- val fragment =
+ vmId = vmFile.name
+
+ // Check if VM in topology
+ val clusterName = vmPlacements[vmId]
+ if (clusterName == null || !clusters.contains(clusterName)) {
+ continue
+ }
+
+ val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L - minTimestamp
+ if (begin > timestamp || timestamp > end) {
+ continue
+ }
+
+ cores = values[coreCol].trim().toInt()
+ requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong())
+ maxCores = max(maxCores, cores)
+ minTime = min(minTime, timestamp)
+ val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz
+ requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong())
+ maxCores = max(maxCores, cores)
+
+ val flops: Long = (cpuUsage * 5 * 60).toLong()
+
+ last = if (last != null && last!!.flops == 0L && flops == 0L) {
+ val oldFragment = last!!
Fragment(
vmId,
- timestamp,
- flops,
- traceInterval,
+ oldFragment.tick,
+ oldFragment.flops + flops,
+ oldFragment.duration + traceInterval,
cpuUsage,
cores
)
- if (last != null) {
- yield(last!!)
+ } else {
+ val fragment =
+ Fragment(
+ vmId,
+ timestamp,
+ flops,
+ traceInterval,
+ cpuUsage,
+ cores
+ )
+ if (last != null) {
+ yield(last!!)
+ }
+ fragment
}
- fragment
}
}
- }
- }
+ }
- if (last != null) {
- yield(last!!)
+ if (last != null) {
+ yield(last!!)
+ }
}
- }
- var maxTime = Long.MIN_VALUE
- flopsFragments.filter { it.tick in begin until end }.forEach { fragment ->
- allFragments.add(fragment)
- maxTime = max(maxTime, fragment.tick)
- }
+ var maxTime = Long.MIN_VALUE
+ flopsFragments.filter { it.tick in begin until end }.forEach { fragment ->
+ allFragments.add(fragment)
+ maxTime = max(maxTime, fragment.tick)
+ }
- if (minTime in begin until end) {
- val metaRecord = GenericData.Record(metaSchema)
- metaRecord.put("id", vmId)
- metaRecord.put("submissionTime", minTime)
- metaRecord.put("endTime", maxTime)
- metaRecord.put("maxCores", maxCores)
- metaRecord.put("requiredMemory", requiredMemory)
- metaWriter.write(metaRecord)
+ if (minTime in begin until end) {
+ val metaRecord = GenericData.Record(metaSchema)
+ metaRecord.put("id", vmId)
+ metaRecord.put("submissionTime", minTime)
+ metaRecord.put("endTime", maxTime)
+ metaRecord.put("maxCores", maxCores)
+ metaRecord.put("requiredMemory", requiredMemory)
+ metaWriter.write(metaRecord)
+ }
}
- }
- return allFragments
+ return allFragments
+ }
}
/**
- * Reads the Azure cloud trace.
- *
- * See https://github.com/Azure/AzurePublicDataset/ for a definition of the trace.
+ * Conversion of the Bitbrains public trace.
*/
-fun readAzureTrace(
- traceDirectory: File,
- metaSchema: Schema,
- metaWriter: ParquetWriter<GenericData.Record>,
- seed: Long
-): MutableList<Fragment> {
- val random = Random(seed)
- val fraction = 0.01
-
- // Read VM table
- val vmIdTableCol = 0
- val coreTableCol = 9
- val provisionedMemoryTableCol = 10
-
- var vmId: String
- var cores: Int
- var requiredMemory: Long
-
- val vmIds = mutableSetOf<String>()
- val vmIdToMetadata = mutableMapOf<String, VmInfo>()
-
- BufferedReader(FileReader(File(traceDirectory, "vmtable.csv"))).use { reader ->
- reader.lineSequence()
- .chunked(1024)
- .forEach { lines ->
- for (line in lines) {
- // Ignore comments in the trace
- if (line.startsWith("#") || line.isBlank()) {
- continue
- }
- // Sample only a fraction of the VMs
- if (random.nextDouble() > fraction) {
- continue
- }
+class BitbrainsConversion : TraceConversion("Bitbrains") {
+ override fun read(
+ traceDirectory: File,
+ metaSchema: Schema,
+ metaWriter: ParquetWriter<GenericData.Record>
+ ): MutableList<Fragment> {
+ val timestampCol = 0
+ val cpuUsageCol = 3
+ val coreCol = 1
+ val provisionedMemoryCol = 5
+ val traceInterval = 5 * 60 * 1000L
+
+ val allFragments = mutableListOf<Fragment>()
+
+ traceDirectory.walk()
+ .filterNot { it.isDirectory }
+ .filter { it.extension == "csv" || it.extension == "txt" }
+ .toList()
+ .forEachIndexed { idx, vmFile ->
+ println(vmFile)
+
+ var vmId = ""
+ var maxCores = -1
+ var requiredMemory = -1L
+ var cores = -1
+ var minTime = Long.MAX_VALUE
+
+ val flopsFragments = sequence {
+ var last: Fragment? = null
+
+ BufferedReader(FileReader(vmFile)).use { reader ->
+ reader.lineSequence()
+ .drop(1)
+ .chunked(128)
+ .forEach { lines ->
+ for (line in lines) {
+ // Ignore comments in the trace
+ if (line.startsWith("#") || line.isBlank()) {
+ continue
+ }
- val values = line.split(",")
+ val values = line.split(";\t")
- // Exclude VMs with a large number of cores (not specified exactly)
- if (values[coreTableCol].contains(">")) {
- continue
- }
+ vmId = vmFile.name
+
+ val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
+
+ cores = values[coreCol].trim().toInt()
+ requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toDouble().toLong())
+ maxCores = max(maxCores, cores)
+ minTime = min(minTime, timestamp)
+ val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz
+
+ val flops: Long = (cpuUsage * 5 * 60).toLong()
- vmId = values[vmIdTableCol].trim()
- cores = values[coreTableCol].trim().toInt()
- requiredMemory = values[provisionedMemoryTableCol].trim().toInt() * 1_000L // GB -> MB
+ last = if (last != null && last!!.flops == 0L && flops == 0L) {
+ val oldFragment = last!!
+ Fragment(
+ vmId,
+ oldFragment.tick,
+ oldFragment.flops + flops,
+ oldFragment.duration + traceInterval,
+ cpuUsage,
+ cores
+ )
+ } else {
+ val fragment =
+ Fragment(
+ vmId,
+ timestamp,
+ flops,
+ traceInterval,
+ cpuUsage,
+ cores
+ )
+ if (last != null) {
+ yield(last!!)
+ }
+ fragment
+ }
+ }
+ }
+ }
- vmIds.add(vmId)
- vmIdToMetadata[vmId] = VmInfo(cores, requiredMemory, Long.MAX_VALUE, -1L)
+ if (last != null) {
+ yield(last!!)
+ }
}
- }
- }
- // Read VM metric reading files
- val timestampCol = 0
- val vmIdCol = 1
- val cpuUsageCol = 4
- val traceInterval = 5 * 60 * 1000L
+ var maxTime = Long.MIN_VALUE
+ flopsFragments.forEach { fragment ->
+ allFragments.add(fragment)
+ maxTime = max(maxTime, fragment.tick)
+ }
- val vmIdToFragments = mutableMapOf<String, MutableList<Fragment>>()
- val vmIdToLastFragment = mutableMapOf<String, Fragment?>()
- val allFragments = mutableListOf<Fragment>()
+ val metaRecord = GenericData.Record(metaSchema)
+ metaRecord.put("id", vmId)
+ metaRecord.put("submissionTime", minTime)
+ metaRecord.put("endTime", maxTime)
+ metaRecord.put("maxCores", maxCores)
+ metaRecord.put("requiredMemory", requiredMemory)
+ metaWriter.write(metaRecord)
+ }
- for (i in ProgressBar.wrap((1..195).toList(), "Reading Trace")) {
- val readingsFile = File(File(traceDirectory, "readings"), "readings-$i.csv")
- var timestamp: Long
- var cpuUsage: Double
+ return allFragments
+ }
+}
- BufferedReader(FileReader(readingsFile)).use { reader ->
+/**
+ * Conversion of the Azure public VM trace.
+ */
+class AzureConversion : TraceConversion("Azure") {
+ val seed by option(help = "seed for trace sampling")
+ .long()
+ .default(0)
+
+ override fun read(
+ traceDirectory: File,
+ metaSchema: Schema,
+ metaWriter: ParquetWriter<GenericData.Record>
+ ): MutableList<Fragment> {
+ val random = Random(seed)
+ val fraction = 0.01
+
+ // Read VM table
+ val vmIdTableCol = 0
+ val coreTableCol = 9
+ val provisionedMemoryTableCol = 10
+
+ var vmId: String
+ var cores: Int
+ var requiredMemory: Long
+
+ val vmIds = mutableSetOf<String>()
+ val vmIdToMetadata = mutableMapOf<String, VmInfo>()
+
+ BufferedReader(FileReader(File(traceDirectory, "vmtable.csv"))).use { reader ->
reader.lineSequence()
- .chunked(128)
+ .chunked(1024)
.forEach { lines ->
for (line in lines) {
// Ignore comments in the trace
if (line.startsWith("#") || line.isBlank()) {
continue
}
+ // Sample only a fraction of the VMs
+ if (random.nextDouble() > fraction) {
+ continue
+ }
val values = line.split(",")
- vmId = values[vmIdCol].trim()
- // Ignore readings for VMs not in the sample
- if (!vmIds.contains(vmId)) {
+ // Exclude VMs with a large number of cores (not specified exactly)
+ if (values[coreTableCol].contains(">")) {
continue
}
- timestamp = values[timestampCol].trim().toLong() * 1000L
- vmIdToMetadata[vmId]!!.minTime = min(vmIdToMetadata[vmId]!!.minTime, timestamp)
- cpuUsage = values[cpuUsageCol].trim().toDouble() * 3_000 // MHz
- vmIdToMetadata[vmId]!!.maxTime = max(vmIdToMetadata[vmId]!!.maxTime, timestamp)
-
- val flops: Long = (cpuUsage * 5 * 60).toLong()
- val lastFragment = vmIdToLastFragment[vmId]
-
- vmIdToLastFragment[vmId] =
- if (lastFragment != null && lastFragment.flops == 0L && flops == 0L) {
- Fragment(
- vmId,
- lastFragment.tick,
- lastFragment.flops + flops,
- lastFragment.duration + traceInterval,
- cpuUsage,
- vmIdToMetadata[vmId]!!.cores
- )
- } else {
- val fragment =
+ vmId = values[vmIdTableCol].trim()
+ cores = values[coreTableCol].trim().toInt()
+ requiredMemory = values[provisionedMemoryTableCol].trim().toInt() * 1_000L // GB -> MB
+
+ vmIds.add(vmId)
+ vmIdToMetadata[vmId] = VmInfo(cores, requiredMemory, Long.MAX_VALUE, -1L)
+ }
+ }
+ }
+
+ // Read VM metric reading files
+ val timestampCol = 0
+ val vmIdCol = 1
+ val cpuUsageCol = 4
+ val traceInterval = 5 * 60 * 1000L
+
+ val vmIdToFragments = mutableMapOf<String, MutableList<Fragment>>()
+ val vmIdToLastFragment = mutableMapOf<String, Fragment?>()
+ val allFragments = mutableListOf<Fragment>()
+
+ for (i in ProgressBar.wrap((1..195).toList(), "Reading Trace")) {
+ val readingsFile = File(File(traceDirectory, "readings"), "readings-$i.csv")
+ var timestamp: Long
+ var cpuUsage: Double
+
+ BufferedReader(FileReader(readingsFile)).use { reader ->
+ reader.lineSequence()
+ .chunked(128)
+ .forEach { lines ->
+ for (line in lines) {
+ // Ignore comments in the trace
+ if (line.startsWith("#") || line.isBlank()) {
+ continue
+ }
+
+ val values = line.split(",")
+ vmId = values[vmIdCol].trim()
+
+ // Ignore readings for VMs not in the sample
+ if (!vmIds.contains(vmId)) {
+ continue
+ }
+
+ timestamp = values[timestampCol].trim().toLong() * 1000L
+ vmIdToMetadata[vmId]!!.minTime = min(vmIdToMetadata[vmId]!!.minTime, timestamp)
+ cpuUsage = values[cpuUsageCol].trim().toDouble() * 3_000 // MHz
+ vmIdToMetadata[vmId]!!.maxTime = max(vmIdToMetadata[vmId]!!.maxTime, timestamp)
+
+ val flops: Long = (cpuUsage * 5 * 60).toLong()
+ val lastFragment = vmIdToLastFragment[vmId]
+
+ vmIdToLastFragment[vmId] =
+ if (lastFragment != null && lastFragment.flops == 0L && flops == 0L) {
Fragment(
vmId,
- timestamp,
- flops,
- traceInterval,
+ lastFragment.tick,
+ lastFragment.flops + flops,
+ lastFragment.duration + traceInterval,
cpuUsage,
vmIdToMetadata[vmId]!!.cores
)
- if (lastFragment != null) {
- if (vmIdToFragments[vmId] == null) {
- vmIdToFragments[vmId] = mutableListOf()
+ } else {
+ val fragment =
+ Fragment(
+ vmId,
+ timestamp,
+ flops,
+ traceInterval,
+ cpuUsage,
+ vmIdToMetadata[vmId]!!.cores
+ )
+ if (lastFragment != null) {
+ if (vmIdToFragments[vmId] == null) {
+ vmIdToFragments[vmId] = mutableListOf()
+ }
+ vmIdToFragments[vmId]!!.add(lastFragment)
+ allFragments.add(lastFragment)
}
- vmIdToFragments[vmId]!!.add(lastFragment)
- allFragments.add(lastFragment)
+ fragment
}
- fragment
- }
+ }
}
- }
+ }
}
- }
- for (entry in vmIdToLastFragment) {
- if (entry.value != null) {
- if (vmIdToFragments[entry.key] == null) {
- vmIdToFragments[entry.key] = mutableListOf()
+ for (entry in vmIdToLastFragment) {
+ if (entry.value != null) {
+ if (vmIdToFragments[entry.key] == null) {
+ vmIdToFragments[entry.key] = mutableListOf()
+ }
+ vmIdToFragments[entry.key]!!.add(entry.value!!)
}
- vmIdToFragments[entry.key]!!.add(entry.value!!)
}
- }
- println("Read ${vmIdToLastFragment.size} VMs")
-
- for (entry in vmIdToMetadata) {
- val metaRecord = GenericData.Record(metaSchema)
- metaRecord.put("id", entry.key)
- metaRecord.put("submissionTime", entry.value.minTime)
- metaRecord.put("endTime", entry.value.maxTime)
- println("${entry.value.minTime} - ${entry.value.maxTime}")
- metaRecord.put("maxCores", entry.value.cores)
- metaRecord.put("requiredMemory", entry.value.requiredMemory)
- metaWriter.write(metaRecord)
- }
+ println("Read ${vmIdToLastFragment.size} VMs")
+
+ for (entry in vmIdToMetadata) {
+ val metaRecord = GenericData.Record(metaSchema)
+ metaRecord.put("id", entry.key)
+ metaRecord.put("submissionTime", entry.value.minTime)
+ metaRecord.put("endTime", entry.value.maxTime)
+ println("${entry.value.minTime} - ${entry.value.maxTime}")
+ metaRecord.put("maxCores", entry.value.cores)
+ metaRecord.put("requiredMemory", entry.value.requiredMemory)
+ metaWriter.write(metaRecord)
+ }
- return allFragments
+ return allFragments
+ }
}
+data class Fragment(
+ val id: String,
+ val tick: Long,
+ val flops: Long,
+ val duration: Long,
+ val usage: Double,
+ val cores: Int
+)
+
class VmInfo(val cores: Int, val requiredMemory: Long, var minTime: Long, var maxTime: Long)
+
+/**
+ * A script to convert a trace in text format into a Parquet trace.
+ */
+fun main(args: Array<String>) = TraceConverterCli().main(args)
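
For reference, a hypothetical way to drive the converter programmatically; the flag names and trace-type choices come from the options defined above, while the paths are made up for illustration.

    // Hypothetical usage, not part of the commit. Equivalent CLI:
    //   trace-converter -t bitbrains -O bitbrains-parquet /data/bitbrains/fastStorage
    fun convertBitbrainsExample() {
        TraceConverterCli().main(
            arrayOf(
                "--type", "bitbrains",            // choices: solvinity, bitbrains, azure
                "--output", "bitbrains-parquet",  // directory for meta.parquet and trace.parquet
                "/data/bitbrains/fastStorage"     // input trace directory (illustrative path)
            )
        )
    }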