diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-08-18 12:11:22 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-08-24 11:47:52 +0200 |
| commit | a23ad09d5a1c4033781bd5403ad766cae83a2beb (patch) | |
| tree | e117d0636847a30ec7115d9aab5d9de094f5251d /opendc-experiments/opendc-experiments-capelin | |
| parent | 51515bb255b3b32ca3020419a0c84130a4d8d370 (diff) | |
refactor(format): Clean up Bitbrains trace reader to enable re-use
This change updates the code for the Bitbrains trace reader and upgrades
the TraceConverter to re-use existing code of the Bitbrains trace
reader.
Diffstat (limited to 'opendc-experiments/opendc-experiments-capelin')
| -rw-r--r-- | opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt | 127 |
1 file changed, 27 insertions, 100 deletions
```diff
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt
index 7cd1f159..d7daa35b 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt
@@ -26,12 +26,7 @@ import com.github.ajalt.clikt.core.CliktCommand
 import com.github.ajalt.clikt.parameters.arguments.argument
 import com.github.ajalt.clikt.parameters.groups.OptionGroup
 import com.github.ajalt.clikt.parameters.groups.groupChoice
-import com.github.ajalt.clikt.parameters.options.convert
-import com.github.ajalt.clikt.parameters.options.default
-import com.github.ajalt.clikt.parameters.options.defaultLazy
-import com.github.ajalt.clikt.parameters.options.option
-import com.github.ajalt.clikt.parameters.options.required
-import com.github.ajalt.clikt.parameters.options.split
+import com.github.ajalt.clikt.parameters.options.*
 import com.github.ajalt.clikt.parameters.types.file
 import com.github.ajalt.clikt.parameters.types.long
 import me.tongfei.progressbar.ProgressBar
@@ -41,11 +36,13 @@ import org.apache.avro.generic.GenericData
 import org.apache.parquet.avro.AvroParquetWriter
 import org.apache.parquet.hadoop.ParquetWriter
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import org.opendc.format.trace.bitbrains.BitbrainsTraceReader
 import org.opendc.format.util.LocalOutputFile
+import org.opendc.simulator.compute.workload.SimTraceWorkload
 import java.io.BufferedReader
 import java.io.File
 import java.io.FileReader
-import java.util.Random
+import java.util.*
 import kotlin.math.max
 import kotlin.math.min
 
@@ -340,106 +337,36 @@ class BitbrainsConversion : TraceConversion("Bitbrains") {
         metaSchema: Schema,
         metaWriter: ParquetWriter<GenericData.Record>
     ): MutableList<Fragment> {
-        val timestampCol = 0
-        val cpuUsageCol = 3
-        val coreCol = 1
-        val provisionedMemoryCol = 5
-        val traceInterval = 5 * 60 * 1000L
-
-        val allFragments = mutableListOf<Fragment>()
-
-        traceDirectory.walk()
-            .filterNot { it.isDirectory }
-            .filter { it.extension == "csv" || it.extension == "txt" }
-            .toList()
-            .forEach { vmFile ->
-                println(vmFile)
-
-                var vmId = ""
-                var maxCores = -1
-                var requiredMemory = -1L
-                var cores: Int
-                var minTime = Long.MAX_VALUE
-
-                val flopsFragments = sequence {
-                    var last: Fragment? = null
-
-                    BufferedReader(FileReader(vmFile)).use { reader ->
-                        reader.lineSequence()
-                            .drop(1)
-                            .chunked(128)
-                            .forEach { lines ->
-                                for (line in lines) {
-                                    // Ignore comments in the trace
-                                    if (line.startsWith("#") || line.isBlank()) {
-                                        continue
-                                    }
-
-                                    val values = line.split(";\t")
-
-                                    vmId = vmFile.name
-
-                                    val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
-
-                                    cores = values[coreCol].trim().toInt()
-                                    val provisionedMemory = values[provisionedMemoryCol].trim().toDouble() // KB
-                                    requiredMemory = max(requiredMemory, (provisionedMemory / 1000).toLong())
-                                    maxCores = max(maxCores, cores)
-                                    minTime = min(minTime, timestamp)
-                                    val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz
-
-                                    val flops: Long = (cpuUsage * 5 * 60).toLong()
-
-                                    last = if (last != null && last!!.flops == 0L && flops == 0L) {
-                                        val oldFragment = last!!
-                                        Fragment(
-                                            vmId,
-                                            oldFragment.tick,
-                                            oldFragment.flops + flops,
-                                            oldFragment.duration + traceInterval,
-                                            cpuUsage,
-                                            cores
-                                        )
-                                    } else {
-                                        val fragment =
-                                            Fragment(
-                                                vmId,
-                                                timestamp,
-                                                flops,
-                                                traceInterval,
-                                                cpuUsage,
-                                                cores
-                                            )
-                                        if (last != null) {
-                                            yield(last!!)
-                                        }
-                                        fragment
-                                    }
-                                }
-                            }
-                    }
-
-                    if (last != null) {
-                        yield(last!!)
-                    }
-                }
-
+        val fragments = mutableListOf<Fragment>()
+        BitbrainsTraceReader(traceDirectory).use { reader ->
+            reader.forEach { entry ->
+                val trace = (entry.workload as SimTraceWorkload).trace
                 var maxTime = Long.MIN_VALUE
-                flopsFragments.forEach { fragment ->
-                    allFragments.add(fragment)
-                    maxTime = max(maxTime, fragment.tick)
+                trace.forEach { fragment ->
+                    val flops: Long = (fragment.usage * fragment.duration / 1000).toLong()
+                    fragments.add(
+                        Fragment(
+                            entry.name,
+                            fragment.timestamp,
+                            flops,
+                            fragment.duration,
+                            fragment.usage,
+                            fragment.cores
+                        )
+                    )
+                    maxTime = max(maxTime, fragment.timestamp + fragment.duration)
                 }
 
                 val metaRecord = GenericData.Record(metaSchema)
-                metaRecord.put("id", vmId)
-                metaRecord.put("submissionTime", minTime)
+                metaRecord.put("id", entry.name)
+                metaRecord.put("submissionTime", entry.start)
                 metaRecord.put("endTime", maxTime)
-                metaRecord.put("maxCores", maxCores)
-                metaRecord.put("requiredMemory", requiredMemory)
+                metaRecord.put("maxCores", entry.meta["cores"])
+                metaRecord.put("requiredMemory", entry.meta["required-memory"])
                 metaWriter.write(metaRecord)
             }
-
-        return allFragments
+        }
+        return fragments
     }
 }
 
```
