summaryrefslogtreecommitdiff
path: root/opendc-format/src/main/kotlin/org
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-08-18 12:11:22 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-08-24 11:47:52 +0200
commita23ad09d5a1c4033781bd5403ad766cae83a2beb (patch)
treee117d0636847a30ec7115d9aab5d9de094f5251d /opendc-format/src/main/kotlin/org
parent51515bb255b3b32ca3020419a0c84130a4d8d370 (diff)
refactor(format): Clean up Bitbrains trace reader to enable re-use
This change updates the code for the Bitbrains trace reader and upgrades the TraceConverter to re-use existing code of the Bitbrains trace reader.
Diffstat (limited to 'opendc-format/src/main/kotlin/org')
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsRawTraceReader.kt100
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt97
2 files changed, 139 insertions, 58 deletions
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsRawTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsRawTraceReader.kt
new file mode 100644
index 00000000..ff6cdd02
--- /dev/null
+++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsRawTraceReader.kt
@@ -0,0 +1,100 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.format.trace.bitbrains
+
+import com.fasterxml.jackson.annotation.JsonProperty
+import com.fasterxml.jackson.databind.MappingIterator
+import com.fasterxml.jackson.dataformat.csv.CsvMapper
+import com.fasterxml.jackson.dataformat.csv.CsvSchema
+import java.io.InputStream
+
+/**
+ * An [Iterator] over the [Entry] rows parsed from a Bitbrains trace supplied as an [InputStream]; [close] closes the underlying iterator.
+ */
+public class BitbrainsRawTraceReader(input: InputStream) : Iterator<BitbrainsRawTraceReader.Entry>, AutoCloseable {
+ /**
+ * The [CsvSchema] used to parse the trace: eleven numeric columns, ';' as column separator, header usage and comments enabled.
+ */
+ private val schema = CsvSchema.builder()
+ .addColumn("Timestamp [ms]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("CPU cores", CsvSchema.ColumnType.NUMBER)
+ .addColumn("CPU capacity provisioned [MHZ]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("CPU usage [MHZ]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("CPU usage [%]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Memory capacity provisioned [KB]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Memory usage [KB]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Disk read throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Disk write throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Network received throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Network transmitted throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
+ .setAllowComments(true)
+ .setUseHeader(true)
+ .setColumnSeparator(';')
+ .build()
+
+ /**
+ * The [MappingIterator] that maps the rows of the input to [Entry] objects according to [schema]; all iteration and closing is delegated to it.
+ */
+ private val iterator: MappingIterator<Entry> = CsvMapper().readerFor(Entry::class.java).with(schema)
+ .readValues(input)
+
+ override fun hasNext(): Boolean {
+ return iterator.hasNext()
+ }
+
+ override fun next(): Entry {
+ return iterator.next()
+ }
+
+ override fun close() {
+ iterator.close()
+ }
+
+ /**
+ * A single entry (row) in the trace, with one property per column declared in the schema, bound by column name via [JsonProperty].
+ */
+ public data class Entry(
+ @JsonProperty("Timestamp [ms]")
+ val timestamp: Long,
+ @JsonProperty("CPU cores")
+ val cpuCores: Int,
+ @JsonProperty("CPU capacity provisioned [MHZ]")
+ val cpuCapacity: Double,
+ @JsonProperty("CPU usage [MHZ]")
+ val cpuUsage: Double,
+ @JsonProperty("CPU usage [%]")
+ val cpuUsagePct: Double,
+ @JsonProperty("Memory capacity provisioned [KB]")
+ val memCapacity: Double,
+ @JsonProperty("Memory usage [KB]")
+ val memUsage: Double,
+ @JsonProperty("Disk read throughput [KB/s]")
+ val diskRead: Double,
+ @JsonProperty("Disk write throughput [KB/s]")
+ val diskWrite: Double,
+ @JsonProperty("Network received throughput [KB/s]")
+ val netReceived: Double,
+ @JsonProperty("Network transmitted throughput [KB/s]")
+ val netTransmitted: Double
+ )
+}
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
index cd8021fe..9e4876df 100644
--- a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
+++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt
@@ -26,10 +26,10 @@ import org.opendc.format.trace.TraceEntry
import org.opendc.format.trace.TraceReader
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.compute.workload.SimWorkload
-import java.io.BufferedReader
import java.io.File
-import java.io.FileReader
+import java.io.FileInputStream
import java.util.*
+import kotlin.math.max
import kotlin.math.min
/**
@@ -48,74 +48,55 @@ public class BitbrainsTraceReader(traceDirectory: File) : TraceReader<SimWorkloa
*/
init {
val entries = mutableMapOf<Long, TraceEntry<SimWorkload>>()
-
- var timestampCol = 0
- var coreCol = 0
- var cpuUsageCol = 0
- var provisionedMemoryCol = 0
val traceInterval = 5 * 60 * 1000L
traceDirectory.walk()
.filterNot { it.isDirectory }
+ .filter { it.extension == "csv" }
.forEach { vmFile ->
- println(vmFile)
val flopsHistory = mutableListOf<SimTraceWorkload.Fragment>()
var vmId = -1L
- var cores = -1
- var requiredMemory = -1L
- var startTime = -1L
-
- BufferedReader(FileReader(vmFile)).use { reader ->
- reader.lineSequence()
- .filter { line ->
- // Ignore comments in the trace
- !line.startsWith("#") && line.isNotBlank()
+ var maxCores = Int.MIN_VALUE
+ var requiredMemory = Long.MIN_VALUE
+ var startTime = Long.MAX_VALUE
+ var lastTimestamp = Long.MIN_VALUE
+
+ BitbrainsRawTraceReader(FileInputStream(vmFile)).use { reader ->
+ reader.forEach { entry ->
+ val timestamp = entry.timestamp * 1000L
+ val cpuUsage = entry.cpuUsage
+ vmId = vmFile.nameWithoutExtension.trim().toLong()
+ val cores = entry.cpuCores
+ maxCores = max(maxCores, cores)
+ requiredMemory = max(requiredMemory, (entry.memCapacity / 1000).toLong())
+
+ if (lastTimestamp < 0) {
+ lastTimestamp = timestamp - 5 * 60 * 1000L
+ startTime = min(startTime, lastTimestamp)
}
- .forEachIndexed { idx, line ->
- val values = line.split(";\t")
-
- // Parse GWF header
- if (idx == 0) {
- val header = values.mapIndexed { col, name -> Pair(name.trim(), col) }.toMap()
- timestampCol = header["Timestamp [ms]"]!!
- coreCol = header["CPU cores"]!!
- cpuUsageCol = header["CPU usage [MHZ]"]!!
- provisionedMemoryCol = header["Memory capacity provisioned [KB]"]!!
- return@forEachIndexed
- }
- vmId = vmFile.nameWithoutExtension.trim().toLong()
- val timestamp = values[timestampCol].trim().toLong() - 5 * 60
- startTime = min(startTime, timestamp)
- cores = values[coreCol].trim().toInt()
- val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz
- requiredMemory = (values[provisionedMemoryCol].trim().toDouble() / 1000).toLong()
-
- if (flopsHistory.isEmpty()) {
- flopsHistory.add(SimTraceWorkload.Fragment(timestamp, traceInterval, cpuUsage, cores))
+ if (flopsHistory.isEmpty()) {
+ flopsHistory.add(SimTraceWorkload.Fragment(lastTimestamp, traceInterval, cpuUsage, cores))
+ } else {
+ val last = flopsHistory.last()
+ val duration = timestamp - lastTimestamp
+ // Perform run-length encoding
+ if (duration == 0L || last.usage == cpuUsage) {
+ flopsHistory[flopsHistory.size - 1] = last.copy(duration = last.duration + duration)
} else {
- if (flopsHistory.last().usage != cpuUsage) {
- flopsHistory.add(
- SimTraceWorkload.Fragment(
- timestamp,
- traceInterval,
- cpuUsage,
- cores
- )
- )
- } else {
- val oldFragment = flopsHistory.removeAt(flopsHistory.size - 1)
- flopsHistory.add(
- SimTraceWorkload.Fragment(
- oldFragment.timestamp,
- oldFragment.duration + traceInterval,
- cpuUsage,
- cores
- )
+ flopsHistory.add(
+ SimTraceWorkload.Fragment(
+ lastTimestamp,
+ duration,
+ cpuUsage,
+ cores
)
- }
+ )
}
}
+
+ lastTimestamp = timestamp
+ }
}
val uuid = UUID(0L, vmId)
@@ -127,7 +108,7 @@ public class BitbrainsTraceReader(traceDirectory: File) : TraceReader<SimWorkloa
startTime,
workload,
mapOf(
- "cores" to cores,
+ "cores" to maxCores,
"required-memory" to requiredMemory,
"workload" to workload
)