| field | value | date |
|---|---|---|
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-08-18 12:11:22 +0200 |
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-08-24 11:47:52 +0200 |
| commit | a23ad09d5a1c4033781bd5403ad766cae83a2beb (patch) | |
| tree | e117d0636847a30ec7115d9aab5d9de094f5251d /opendc-format/src/main/kotlin/org | |
| parent | 51515bb255b3b32ca3020419a0c84130a4d8d370 (diff) | |
refactor(format): Clean up Bitbrains trace reader to enable re-use
This change cleans up the code of the Bitbrains trace reader and
updates the TraceConverter to re-use the existing code of that reader.
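
For context, the snippet below sketches how the new `BitbrainsRawTraceReader` introduced by this change is meant to be re-used: it wraps an `InputStream` of a single per-VM Bitbrains CSV file and exposes the rows as an iterator of typed entries. The trace path and the printed fields are illustrative only and are not part of the commit:

```kotlin
import org.opendc.format.trace.bitbrains.BitbrainsRawTraceReader
import java.io.FileInputStream

fun main() {
    // Hypothetical path to one per-VM CSV file of the Bitbrains trace.
    val path = "traces/bitbrains/1.csv"

    // The reader is both an Iterator<Entry> and AutoCloseable, so it can be
    // consumed inside a use-block, just like the updated BitbrainsTraceReader does.
    BitbrainsRawTraceReader(FileInputStream(path)).use { reader ->
        reader.forEach { entry ->
            println("t=${entry.timestamp} cpu=${entry.cpuUsage} MHz (cores=${entry.cpuCores})")
        }
    }
}
```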
Diffstat (limited to 'opendc-format/src/main/kotlin/org')
2 files changed, 139 insertions, 58 deletions
```diff
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsRawTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsRawTraceReader.kt
new file mode 100644
index 00000000..ff6cdd02
--- /dev/null
+++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsRawTraceReader.kt
@@ -0,0 +1,100 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.format.trace.bitbrains
+
+import com.fasterxml.jackson.annotation.JsonProperty
+import com.fasterxml.jackson.databind.MappingIterator
+import com.fasterxml.jackson.dataformat.csv.CsvMapper
+import com.fasterxml.jackson.dataformat.csv.CsvSchema
+import java.io.InputStream
+
+/**
+ * A trace reader that enables the user to read Bitbrains specific trace data.
+ */
+public class BitbrainsRawTraceReader(input: InputStream) : Iterator<BitbrainsRawTraceReader.Entry>, AutoCloseable {
+    /**
+     * The [CsvSchema] that is used to parse the trace.
+     */
+    private val schema = CsvSchema.builder()
+        .addColumn("Timestamp [ms]", CsvSchema.ColumnType.NUMBER)
+        .addColumn("CPU cores", CsvSchema.ColumnType.NUMBER)
+        .addColumn("CPU capacity provisioned [MHZ]", CsvSchema.ColumnType.NUMBER)
+        .addColumn("CPU usage [MHZ]", CsvSchema.ColumnType.NUMBER)
+        .addColumn("CPU usage [%]", CsvSchema.ColumnType.NUMBER)
+        .addColumn("Memory capacity provisioned [KB]", CsvSchema.ColumnType.NUMBER)
+        .addColumn("Memory usage [KB]", CsvSchema.ColumnType.NUMBER)
+        .addColumn("Disk read throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
+        .addColumn("Disk write throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
+        .addColumn("Network received throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
+        .addColumn("Network transmitted throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
+        .setAllowComments(true)
+        .setUseHeader(true)
+        .setColumnSeparator(';')
+        .build()
+
+    /**
+     * The mapping iterator to use.
+     */
+    private val iterator: MappingIterator<Entry> = CsvMapper().readerFor(Entry::class.java).with(schema)
+        .readValues(input)
+
+    override fun hasNext(): Boolean {
+        return iterator.hasNext()
+    }
+
+    override fun next(): Entry {
+        return iterator.next()
+    }
+
+    override fun close() {
+        iterator.close()
+    }
+
+    /**
+     * A single entry in the trace.
+     */
+    public data class Entry(
+        @JsonProperty("Timestamp [ms]")
+        val timestamp: Long,
+        @JsonProperty("CPU cores")
+        val cpuCores: Int,
+        @JsonProperty("CPU capacity provisioned [MHZ]")
+        val cpuCapacity: Double,
+        @JsonProperty("CPU usage [MHZ]")
+        val cpuUsage: Double,
+        @JsonProperty("CPU usage [%]")
+        val cpuUsagePct: Double,
+        @JsonProperty("Memory capacity provisioned [KB]")
+        val memCapacity: Double,
+        @JsonProperty("Memory usage [KB]")
+        val memUsage: Double,
+        @JsonProperty("Disk read throughput [KB/s]")
+        val diskRead: Double,
+        @JsonProperty("Disk write throughput [KB/s]")
+        val diskWrite: Double,
+        @JsonProperty("Network received throughput [KB/s]")
+        val netReceived: Double,
+        @JsonProperty("Network transmitted throughput [KB/s]")
+        val netTransmitted: Double
+    )
+}
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
index cd8021fe..9e4876df 100644
--- a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
+++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
@@ -26,10 +26,10 @@ import org.opendc.format.trace.TraceEntry
 import org.opendc.format.trace.TraceReader
 import org.opendc.simulator.compute.workload.SimTraceWorkload
 import org.opendc.simulator.compute.workload.SimWorkload
-import java.io.BufferedReader
 import java.io.File
-import java.io.FileReader
+import java.io.FileInputStream
 import java.util.*
+import kotlin.math.max
 import kotlin.math.min
 
 /**
@@ -48,74 +48,55 @@ public class BitbrainsTraceReader(traceDirectory: File) : TraceReader<SimWorkloa
      */
    init {
         val entries = mutableMapOf<Long, TraceEntry<SimWorkload>>()
-
-        var timestampCol = 0
-        var coreCol = 0
-        var cpuUsageCol = 0
-        var provisionedMemoryCol = 0
         val traceInterval = 5 * 60 * 1000L
 
         traceDirectory.walk()
             .filterNot { it.isDirectory }
+            .filter { it.extension == "csv" }
             .forEach { vmFile ->
-                println(vmFile)
                 val flopsHistory = mutableListOf<SimTraceWorkload.Fragment>()
 
                 var vmId = -1L
-                cores = -1
-                requiredMemory = -1L
-                startTime = -1L
-
-                BufferedReader(FileReader(vmFile)).use { reader ->
-                    reader.lineSequence()
-                        .filter { line ->
-                            // Ignore comments in the trace
-                            !line.startsWith("#") && line.isNotBlank()
+                var maxCores = Int.MIN_VALUE
+                var requiredMemory = Long.MIN_VALUE
+                var startTime = Long.MAX_VALUE
+                var lastTimestamp = Long.MIN_VALUE
+
+                BitbrainsRawTraceReader(FileInputStream(vmFile)).use { reader ->
+                    reader.forEach { entry ->
+                        val timestamp = entry.timestamp * 1000L
+                        val cpuUsage = entry.cpuUsage
+                        vmId = vmFile.nameWithoutExtension.trim().toLong()
+                        val cores = entry.cpuCores
+                        maxCores = max(maxCores, cores)
+                        requiredMemory = max(requiredMemory, (entry.memCapacity / 1000).toLong())
+
+                        if (lastTimestamp < 0) {
+                            lastTimestamp = timestamp - 5 * 60 * 1000L
+                            startTime = min(startTime, lastTimestamp)
                         }
-                        .forEachIndexed { idx, line ->
-                            val values = line.split(";\t")
-
-                            // Parse GWF header
-                            if (idx == 0) {
-                                val header = values.mapIndexed { col, name -> Pair(name.trim(), col) }.toMap()
-                                timestampCol = header["Timestamp [ms]"]!!
-                                coreCol = header["CPU cores"]!!
-                                cpuUsageCol = header["CPU usage [MHZ]"]!!
-                                provisionedMemoryCol = header["Memory capacity provisioned [KB]"]!!
-                                return@forEachIndexed
-                            }
-                            vmId = vmFile.nameWithoutExtension.trim().toLong()
-                            val timestamp = values[timestampCol].trim().toLong() - 5 * 60
-                            startTime = min(startTime, timestamp)
-                            cores = values[coreCol].trim().toInt()
-                            val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz
-                            requiredMemory = (values[provisionedMemoryCol].trim().toDouble() / 1000).toLong()
-
-                            if (flopsHistory.isEmpty()) {
-                                flopsHistory.add(SimTraceWorkload.Fragment(timestamp, traceInterval, cpuUsage, cores))
+                        if (flopsHistory.isEmpty()) {
+                            flopsHistory.add(SimTraceWorkload.Fragment(lastTimestamp, traceInterval, cpuUsage, cores))
+                        } else {
+                            val last = flopsHistory.last()
+                            val duration = timestamp - lastTimestamp
+                            // Perform run-length encoding
+                            if (duration == 0L || last.usage == cpuUsage) {
+                                flopsHistory[flopsHistory.size - 1] = last.copy(duration = last.duration + duration)
                             } else {
-                                if (flopsHistory.last().usage != cpuUsage) {
-                                    flopsHistory.add(
-                                        SimTraceWorkload.Fragment(
-                                            timestamp,
-                                            traceInterval,
-                                            cpuUsage,
-                                            cores
-                                        )
-                                    )
-                                } else {
-                                    val oldFragment = flopsHistory.removeAt(flopsHistory.size - 1)
-                                    flopsHistory.add(
-                                        SimTraceWorkload.Fragment(
-                                            oldFragment.timestamp,
-                                            oldFragment.duration + traceInterval,
-                                            cpuUsage,
-                                            cores
-                                        )
+                                flopsHistory.add(
+                                    SimTraceWorkload.Fragment(
+                                        lastTimestamp,
+                                        duration,
+                                        cpuUsage,
+                                        cores
                                     )
-                                }
+                                )
                             }
                         }
+
+                        lastTimestamp = timestamp
+                    }
                 }
 
                 val uuid = UUID(0L, vmId)
@@ -127,7 +108,7 @@ public class BitbrainsTraceReader(traceDirectory: File) : TraceReader<SimWorkloa
                 startTime,
                 workload,
                 mapOf(
-                    "cores" to cores,
+                    "cores" to maxCores,
                     "required-memory" to requiredMemory,
                     "workload" to workload
                 )
```
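
The heart of the rewritten parsing loop above is a run-length encoding step: a sample whose CPU usage equals the usage of the previous fragment (or that contributes zero duration) extends that fragment instead of appending a new one. The sketch below isolates that merging rule with a simplified `Fragment` stand-in for `SimTraceWorkload.Fragment`; it is illustrative only and not code from the commit:

```kotlin
// Simplified stand-in for SimTraceWorkload.Fragment as used in the diff above; illustrative only.
data class Fragment(val timestamp: Long, val duration: Long, val usage: Double, val cores: Int)

// Mirrors the run-length encoding in the updated reader: a sample with the same usage as the
// previous fragment (or with zero duration) extends that fragment, otherwise a new one is added.
fun appendSample(history: MutableList<Fragment>, lastTimestamp: Long, timestamp: Long, usage: Double, cores: Int) {
    val duration = timestamp - lastTimestamp
    if (history.isEmpty()) {
        history.add(Fragment(lastTimestamp, duration, usage, cores))
        return
    }
    val last = history.last()
    if (duration == 0L || last.usage == usage) {
        // Extend the previous fragment instead of emitting a new one.
        history[history.size - 1] = last.copy(duration = last.duration + duration)
    } else {
        history.add(Fragment(lastTimestamp, duration, usage, cores))
    }
}

fun main() {
    val history = mutableListOf<Fragment>()
    var lastTimestamp = 0L

    // Three samples at 5-minute (300 000 ms) intervals; the first two share the same usage.
    for ((timestamp, usage) in listOf(300_000L to 2926.0, 600_000L to 2926.0, 900_000L to 1463.0)) {
        appendSample(history, lastTimestamp, timestamp, usage, 3)
        lastTimestamp = timestamp
    }

    // Prints two fragments: the equal-usage samples collapse into one 600 000 ms fragment,
    // followed by a 300 000 ms fragment carrying the new usage value.
    println(history)
}
```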
