summaryrefslogtreecommitdiff
path: root/opendc-trace/opendc-trace-bitbrains/src
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-trace/opendc-trace-bitbrains/src')
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt1
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt192
2 files changed, 129 insertions, 64 deletions
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt
index 767ef919..846d5c8a 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt
@@ -41,6 +41,7 @@ internal class BitbrainsResourceStateTable(private val factory: CsvFactory, priv
Files.walk(path, 1)
.filter { !Files.isDirectory(it) && it.extension == "csv" }
.collect(Collectors.toMap({ it.nameWithoutExtension }, { it }))
+ .toSortedMap()
override val name: String = TABLE_RESOURCE_STATES
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt
index 5687ac7f..dab784c2 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt
@@ -22,20 +22,42 @@
package org.opendc.trace.bitbrains
+import com.fasterxml.jackson.core.JsonParseException
import com.fasterxml.jackson.core.JsonToken
import com.fasterxml.jackson.dataformat.csv.CsvParser
import com.fasterxml.jackson.dataformat.csv.CsvSchema
import org.opendc.trace.*
+import java.text.NumberFormat
import java.time.Instant
+import java.time.LocalDateTime
+import java.time.ZoneOffset
+import java.time.format.DateTimeFormatter
+import java.time.format.DateTimeParseException
+import java.util.*
/**
* A [TableReader] for the Bitbrains resource state table.
*/
internal class BitbrainsResourceStateTableReader(private val partition: String, private val parser: CsvParser) : TableReader {
/**
- * The current parser state.
+ * The [DateTimeFormatter] used to parse the timestamps in case of the Materna trace.
*/
- private val state = RowState()
+ private val formatter = DateTimeFormatter.ofPattern("dd.MM.yyyy HH:mm:ss")
+
+ /**
+ * The type of timestamps in the trace.
+ */
+ private var timestampType: TimestampType = TimestampType.UNDECIDED
+
+ /**
+ * The [NumberFormat] used to parse doubles containing a comma.
+ */
+ private val nf = NumberFormat.getInstance(Locale.GERMAN)
+
+ /**
+ * A flag to indicate that the trace contains decimals with a comma separator.
+ */
+ private var usesCommaDecimalSeparator = false
init {
parser.schema = schema
@@ -43,7 +65,7 @@ internal class BitbrainsResourceStateTableReader(private val partition: String,
override fun nextRow(): Boolean {
// Reset the row state
- state.reset()
+ reset()
if (!nextStart()) {
return false
@@ -57,17 +79,32 @@ internal class BitbrainsResourceStateTableReader(private val partition: String,
}
when (parser.currentName) {
- "Timestamp [ms]" -> state.timestamp = Instant.ofEpochSecond(parser.longValue)
- "CPU cores" -> state.cpuCores = parser.intValue
- "CPU capacity provisioned [MHZ]" -> state.cpuCapacity = parser.doubleValue
- "CPU usage [MHZ]" -> state.cpuUsage = parser.doubleValue
- "CPU usage [%]" -> state.cpuUsagePct = parser.doubleValue
- "Memory capacity provisioned [KB]" -> state.memCapacity = parser.doubleValue
- "Memory usage [KB]" -> state.memUsage = parser.doubleValue
- "Disk read throughput [KB/s]" -> state.diskRead = parser.doubleValue
- "Disk write throughput [KB/s]" -> state.diskWrite = parser.doubleValue
- "Network received throughput [KB/s]" -> state.netReceived = parser.doubleValue
- "Network transmitted throughput [KB/s]" -> state.netTransmitted = parser.doubleValue
+ "Timestamp [ms]" -> {
+ timestamp = when (timestampType) {
+ TimestampType.UNDECIDED -> {
+ try {
+ val res = LocalDateTime.parse(parser.text, formatter).toInstant(ZoneOffset.UTC)
+ timestampType = TimestampType.DATE_TIME
+ res
+ } catch (e: DateTimeParseException) {
+ timestampType = TimestampType.EPOCH_MILLIS
+ Instant.ofEpochSecond(parser.longValue)
+ }
+ }
+ TimestampType.DATE_TIME -> LocalDateTime.parse(parser.text, formatter).toInstant(ZoneOffset.UTC)
+ TimestampType.EPOCH_MILLIS -> Instant.ofEpochSecond(parser.longValue)
+ }
+ }
+ "CPU cores" -> cpuCores = parser.intValue
+ "CPU capacity provisioned [MHZ]" -> cpuCapacity = parseSafeDouble()
+ "CPU usage [MHZ]" -> cpuUsage = parseSafeDouble()
+ "CPU usage [%]" -> cpuUsagePct = parseSafeDouble() / 100.0 // Convert to range [0, 1]
+ "Memory capacity provisioned [KB]" -> memCapacity = parseSafeDouble()
+ "Memory usage [KB]" -> memUsage = parseSafeDouble()
+ "Disk read throughput [KB/s]" -> diskRead = parseSafeDouble()
+ "Disk write throughput [KB/s]" -> diskWrite = parseSafeDouble()
+ "Network received throughput [KB/s]" -> netReceived = parseSafeDouble()
+ "Network transmitted throughput [KB/s]" -> netTransmitted = parseSafeDouble()
}
}
@@ -95,17 +132,17 @@ internal class BitbrainsResourceStateTableReader(private val partition: String,
override fun <T> get(column: TableColumn<T>): T {
val res: Any? = when (column) {
RESOURCE_STATE_ID -> partition
- RESOURCE_STATE_TIMESTAMP -> state.timestamp
- RESOURCE_STATE_NCPUS -> state.cpuCores
- RESOURCE_STATE_CPU_CAPACITY -> state.cpuCapacity
- RESOURCE_STATE_CPU_USAGE -> state.cpuUsage
- RESOURCE_STATE_CPU_USAGE_PCT -> state.cpuUsagePct
- RESOURCE_STATE_MEM_CAPACITY -> state.memCapacity
- RESOURCE_STATE_MEM_USAGE -> state.memUsage
- RESOURCE_STATE_DISK_READ -> state.diskRead
- RESOURCE_STATE_DISK_WRITE -> state.diskWrite
- RESOURCE_STATE_NET_RX -> state.netReceived
- RESOURCE_STATE_NET_TX -> state.netTransmitted
+ RESOURCE_STATE_TIMESTAMP -> timestamp
+ RESOURCE_STATE_NCPUS -> cpuCores
+ RESOURCE_STATE_CPU_CAPACITY -> cpuCapacity
+ RESOURCE_STATE_CPU_USAGE -> cpuUsage
+ RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct
+ RESOURCE_STATE_MEM_CAPACITY -> memCapacity
+ RESOURCE_STATE_MEM_USAGE -> memUsage
+ RESOURCE_STATE_DISK_READ -> diskRead
+ RESOURCE_STATE_DISK_WRITE -> diskWrite
+ RESOURCE_STATE_NET_RX -> netReceived
+ RESOURCE_STATE_NET_TX -> netTransmitted
else -> throw IllegalArgumentException("Invalid column")
}
@@ -119,7 +156,7 @@ internal class BitbrainsResourceStateTableReader(private val partition: String,
override fun getInt(column: TableColumn<Int>): Int {
return when (column) {
- RESOURCE_STATE_NCPUS -> state.cpuCores
+ RESOURCE_STATE_NCPUS -> cpuCores
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -130,15 +167,15 @@ internal class BitbrainsResourceStateTableReader(private val partition: String,
override fun getDouble(column: TableColumn<Double>): Double {
return when (column) {
- RESOURCE_STATE_CPU_CAPACITY -> state.cpuCapacity
- RESOURCE_STATE_CPU_USAGE -> state.cpuUsage
- RESOURCE_STATE_CPU_USAGE_PCT -> state.cpuUsagePct
- RESOURCE_STATE_MEM_CAPACITY -> state.memCapacity
- RESOURCE_STATE_MEM_USAGE -> state.memUsage
- RESOURCE_STATE_DISK_READ -> state.diskRead
- RESOURCE_STATE_DISK_WRITE -> state.diskWrite
- RESOURCE_STATE_NET_RX -> state.netReceived
- RESOURCE_STATE_NET_TX -> state.netTransmitted
+ RESOURCE_STATE_CPU_CAPACITY -> cpuCapacity
+ RESOURCE_STATE_CPU_USAGE -> cpuUsage
+ RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct
+ RESOURCE_STATE_MEM_CAPACITY -> memCapacity
+ RESOURCE_STATE_MEM_USAGE -> memUsage
+ RESOURCE_STATE_DISK_READ -> diskRead
+ RESOURCE_STATE_DISK_WRITE -> diskWrite
+ RESOURCE_STATE_NET_RX -> netReceived
+ RESOURCE_STATE_NET_TX -> netTransmitted
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -161,37 +198,62 @@ internal class BitbrainsResourceStateTableReader(private val partition: String,
}
/**
- * The current row state.
+ * Try to parse the current value safely as double.
*/
- private class RowState {
- var timestamp: Instant? = null
- var cpuCores = -1
- var cpuCapacity = Double.NaN
- var cpuUsage = Double.NaN
- var cpuUsagePct = Double.NaN
- var memCapacity = Double.NaN
- var memUsage = Double.NaN
- var diskRead = Double.NaN
- var diskWrite = Double.NaN
- var netReceived = Double.NaN
- var netTransmitted = Double.NaN
+ private fun parseSafeDouble(): Double {
+ if (!usesCommaDecimalSeparator) {
+ try {
+ return parser.doubleValue
+ } catch (e: JsonParseException) {
+ usesCommaDecimalSeparator = true
+ }
+ }
- /**
- * Reset the state.
- */
- fun reset() {
- timestamp = null
- cpuCores = -1
- cpuCapacity = Double.NaN
- cpuUsage = Double.NaN
- cpuUsagePct = Double.NaN
- memCapacity = Double.NaN
- memUsage = Double.NaN
- diskRead = Double.NaN
- diskWrite = Double.NaN
- netReceived = Double.NaN
- netTransmitted = Double.NaN
+ val text = parser.text
+ if (text.isBlank()) {
+ return 0.0
}
+
+ return nf.parse(text).toDouble()
+ }
+
+ /**
+ * State fields of the reader.
+ */
+ private var timestamp: Instant? = null
+ private var cpuCores = -1
+ private var cpuCapacity = Double.NaN
+ private var cpuUsage = Double.NaN
+ private var cpuUsagePct = Double.NaN
+ private var memCapacity = Double.NaN
+ private var memUsage = Double.NaN
+ private var diskRead = Double.NaN
+ private var diskWrite = Double.NaN
+ private var netReceived = Double.NaN
+ private var netTransmitted = Double.NaN
+
+ /**
+ * Reset the state.
+ */
+ private fun reset() {
+ timestamp = null
+ cpuCores = -1
+ cpuCapacity = Double.NaN
+ cpuUsage = Double.NaN
+ cpuUsagePct = Double.NaN
+ memCapacity = Double.NaN
+ memUsage = Double.NaN
+ diskRead = Double.NaN
+ diskWrite = Double.NaN
+ netReceived = Double.NaN
+ netTransmitted = Double.NaN
+ }
+
+ /**
+ * The type of the timestamp in the trace.
+ */
+ private enum class TimestampType {
+ UNDECIDED, DATE_TIME, EPOCH_MILLIS
}
companion object {
@@ -199,15 +261,17 @@ internal class BitbrainsResourceStateTableReader(private val partition: String,
* The [CsvSchema] that is used to parse the trace.
*/
private val schema = CsvSchema.builder()
- .addColumn("Timestamp [ms]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Timestamp [ms]", CsvSchema.ColumnType.NUMBER_OR_STRING)
.addColumn("CPU cores", CsvSchema.ColumnType.NUMBER)
.addColumn("CPU capacity provisioned [MHZ]", CsvSchema.ColumnType.NUMBER)
.addColumn("CPU usage [MHZ]", CsvSchema.ColumnType.NUMBER)
.addColumn("CPU usage [%]", CsvSchema.ColumnType.NUMBER)
.addColumn("Memory capacity provisioned [KB]", CsvSchema.ColumnType.NUMBER)
.addColumn("Memory usage [KB]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Memory usage [%]", CsvSchema.ColumnType.NUMBER)
.addColumn("Disk read throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
.addColumn("Disk write throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Disk size [GB]", CsvSchema.ColumnType.NUMBER)
.addColumn("Network received throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
.addColumn("Network transmitted throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
.setAllowComments(true)