diff options
| -rw-r--r-- | simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt | 120 |
1 files changed, 117 insertions, 3 deletions
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt index d6726910..6c599517 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt @@ -104,10 +104,15 @@ fun main(args: Array<String>) { val vmPlacements = Sc20VmPlacementReader(vmPlacementFile.inputStream().buffered()).construct() readSolvinityTrace(traceDirectory, metaSchema, metaWriter, clusters, vmPlacements) - } else { + } else if (traceType == "bitbrains") { + readBitbrainsTrace(traceDirectory, metaSchema, metaWriter) + } else if (traceType == "azure") { val seed = args[3].toLong() readAzureTrace(traceDirectory, metaSchema, metaWriter, seed) + } else { + throw IllegalArgumentException("Unsupported trace type") } + allFragments.sortWith(compareBy<Fragment> { it.tick }.thenBy { it.id }) for (fragment in allFragments) { @@ -176,7 +181,7 @@ fun readSolvinityTrace( continue } - val values = line.split(" ") + val values = line.split("\t") val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L if (timestamp < minTimestamp) { @@ -221,7 +226,7 @@ fun readSolvinityTrace( continue } - val values = line.split(" ") + val values = line.split("\t") vmId = vmFile.name @@ -301,6 +306,115 @@ fun readSolvinityTrace( } /** + * Reads the confidential Solvinity trace. + */ +fun readBitbrainsTrace( + traceDirectory: File, + metaSchema: Schema, + metaWriter: ParquetWriter<GenericData.Record> +): MutableList<Fragment> { + val timestampCol = 0 + val cpuUsageCol = 3 + val coreCol = 1 + val provisionedMemoryCol = 5 + val traceInterval = 5 * 60 * 1000L + + val allFragments = mutableListOf<Fragment>() + + traceDirectory.walk() + .filterNot { it.isDirectory } + .filter { it.extension == "csv" || it.extension == "txt" } + .toList() + .forEachIndexed { idx, vmFile -> + println(vmFile) + + var vmId = "" + var maxCores = -1 + var requiredMemory = -1L + var cores = -1 + var minTime = Long.MAX_VALUE + + val flopsFragments = sequence { + var last: Fragment? = null + + BufferedReader(FileReader(vmFile)).use { reader -> + reader.lineSequence() + .drop(1) + .chunked(128) + .forEach { lines -> + for (line in lines) { + // Ignore comments in the trace + if (line.startsWith("#") || line.isBlank()) { + continue + } + + val values = line.split(";\t") + + vmId = vmFile.name + + val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L + + cores = values[coreCol].trim().toInt() + requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toDouble().toLong()) + maxCores = max(maxCores, cores) + minTime = min(minTime, timestamp) + val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz + + val flops: Long = (cpuUsage * 5 * 60).toLong() + + last = if (last != null && last!!.flops == 0L && flops == 0L) { + val oldFragment = last!! + Fragment( + vmId, + oldFragment.tick, + oldFragment.flops + flops, + oldFragment.duration + traceInterval, + cpuUsage, + cores + ) + } else { + val fragment = + Fragment( + vmId, + timestamp, + flops, + traceInterval, + cpuUsage, + cores + ) + if (last != null) { + yield(last!!) + } + fragment + } + } + } + } + + if (last != null) { + yield(last!!) + } + } + + var maxTime = Long.MIN_VALUE + flopsFragments.forEach { fragment -> + allFragments.add(fragment) + maxTime = max(maxTime, fragment.tick) + } + + val metaRecord = GenericData.Record(metaSchema) + metaRecord.put("id", vmId) + metaRecord.put("submissionTime", minTime) + metaRecord.put("endTime", maxTime) + metaRecord.put("maxCores", maxCores) + metaRecord.put("requiredMemory", requiredMemory) + metaWriter.write(metaRecord) + } + + return allFragments +} + +/** * Reads the Azure cloud trace. * * See https://github.com/Azure/AzurePublicDataset/ for a definition of the trace. |
