summary refs log tree commit diff
path: root/opendc-trace/opendc-trace-bitbrains/src
diff options
context:
space:
mode:
author Fabian Mastenbroek <mail.fabianm@gmail.com> 2021-09-10 16:06:53 +0200
committer Fabian Mastenbroek <mail.fabianm@gmail.com> 2021-09-10 17:11:52 +0200
commit 83497bd122983c7fc0d5cbbdc80b98d58c50cd75 (patch)
tree 143cdb68eb380e07ff23649a19bb1126d2a19812 /opendc-trace/opendc-trace-bitbrains/src
parent cae193284570d6ee9dbacdde57b3e4e367aa9d9f (diff)
feat(trace): Support Materna traces from GWA
This change adds support for the Materna traces from the Grid Workloads Archive (GWA). These traces are very similar to the Bitbrains traces, so they share the same base implementation.
Diffstat (limited to 'opendc-trace/opendc-trace-bitbrains/src')
-rw-r--r-- opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt 1
-rw-r--r-- opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt 192
2 files changed, 129 insertions, 64 deletions
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt
index 767ef919..846d5c8a 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt
@@ -41,6 +41,7 @@ internal class BitbrainsResourceStateTable(private val factory: CsvFactory, priv
Files.walk(path, 1)
.filter { !Files.isDirectory(it) && it.extension == "csv" }
.collect(Collectors.toMap({ it.nameWithoutExtension }, { it }))
+ .toSortedMap()
override val name: String = TABLE_RESOURCE_STATES
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt
index 5687ac7f..dab784c2 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt
@@ -22,20 +22,42 @@
package org.opendc.trace.bitbrains
+import com.fasterxml.jackson.core.JsonParseException
import com.fasterxml.jackson.core.JsonToken
import com.fasterxml.jackson.dataformat.csv.CsvParser
import com.fasterxml.jackson.dataformat.csv.CsvSchema
import org.opendc.trace.*
+import java.text.NumberFormat
import java.time.Instant
+import java.time.LocalDateTime
+import java.time.ZoneOffset
+import java.time.format.DateTimeFormatter
+import java.time.format.DateTimeParseException
+import java.util.*
/**
* A [TableReader] for the Bitbrains resource state table.
*/
internal class BitbrainsResourceStateTableReader(private val partition: String, private val parser: CsvParser) : TableReader {
/**
- * The current parser state.
+ * The [DateTimeFormatter] used to parse the timestamps in case of the Materna trace.
*/
- private val state = RowState()
+ private val formatter = DateTimeFormatter.ofPattern("dd.MM.yyyy HH:mm:ss")
+
+ /**
+ * The type of timestamps in the trace.
+ */
+ private var timestampType: TimestampType = TimestampType.UNDECIDED
+
+ /**
+ * The [NumberFormat] used to parse doubles containing a comma.
+ */
+ private val nf = NumberFormat.getInstance(Locale.GERMAN)
+
+ /**
+ * A flag to indicate that the trace contains decimals with a comma separator.
+ */
+ private var usesCommaDecimalSeparator = false
init {
parser.schema = schema
@@ -43,7 +65,7 @@ internal class BitbrainsResourceStateTableReader(private val partition: String,
override fun nextRow(): Boolean {
// Reset the row state
- state.reset()
+ reset()
if (!nextStart()) {
return false
@@ -57,17 +79,32 @@ internal class BitbrainsResourceStateTableReader(private val partition: String,
}
when (parser.currentName) {
- "Timestamp [ms]" -> state.timestamp = Instant.ofEpochSecond(parser.longValue)
- "CPU cores" -> state.cpuCores = parser.intValue
- "CPU capacity provisioned [MHZ]" -> state.cpuCapacity = parser.doubleValue
- "CPU usage [MHZ]" -> state.cpuUsage = parser.doubleValue
- "CPU usage [%]" -> state.cpuUsagePct = parser.doubleValue
- "Memory capacity provisioned [KB]" -> state.memCapacity = parser.doubleValue
- "Memory usage [KB]" -> state.memUsage = parser.doubleValue
- "Disk read throughput [KB/s]" -> state.diskRead = parser.doubleValue
- "Disk write throughput [KB/s]" -> state.diskWrite = parser.doubleValue
- "Network received throughput [KB/s]" -> state.netReceived = parser.doubleValue
- "Network transmitted throughput [KB/s]" -> state.netTransmitted = parser.doubleValue
+ "Timestamp [ms]" -> {
+ timestamp = when (timestampType) {
+ TimestampType.UNDECIDED -> {
+ try {
+ val res = LocalDateTime.parse(parser.text, formatter).toInstant(ZoneOffset.UTC)
+ timestampType = TimestampType.DATE_TIME
+ res
+ } catch (e: DateTimeParseException) {
+ timestampType = TimestampType.EPOCH_MILLIS
+ Instant.ofEpochSecond(parser.longValue)
+ }
+ }
+ TimestampType.DATE_TIME -> LocalDateTime.parse(parser.text, formatter).toInstant(ZoneOffset.UTC)
+ TimestampType.EPOCH_MILLIS -> Instant.ofEpochSecond(parser.longValue)
+ }
+ }
+ "CPU cores" -> cpuCores = parser.intValue
+ "CPU capacity provisioned [MHZ]" -> cpuCapacity = parseSafeDouble()
+ "CPU usage [MHZ]" -> cpuUsage = parseSafeDouble()
+ "CPU usage [%]" -> cpuUsagePct = parseSafeDouble() / 100.0 // Convert to range [0, 1]
+ "Memory capacity provisioned [KB]" -> memCapacity = parseSafeDouble()
+ "Memory usage [KB]" -> memUsage = parseSafeDouble()
+ "Disk read throughput [KB/s]" -> diskRead = parseSafeDouble()
+ "Disk write throughput [KB/s]" -> diskWrite = parseSafeDouble()
+ "Network received throughput [KB/s]" -> netReceived = parseSafeDouble()
+ "Network transmitted throughput [KB/s]" -> netTransmitted = parseSafeDouble()
}
}
@@ -95,17 +132,17 @@ internal class BitbrainsResourceStateTableReader(private val partition: String,
override fun <T> get(column: TableColumn<T>): T {
val res: Any? = when (column) {
RESOURCE_STATE_ID -> partition
- RESOURCE_STATE_TIMESTAMP -> state.timestamp
- RESOURCE_STATE_NCPUS -> state.cpuCores
- RESOURCE_STATE_CPU_CAPACITY -> state.cpuCapacity
- RESOURCE_STATE_CPU_USAGE -> state.cpuUsage
- RESOURCE_STATE_CPU_USAGE_PCT -> state.cpuUsagePct
- RESOURCE_STATE_MEM_CAPACITY -> state.memCapacity
- RESOURCE_STATE_MEM_USAGE -> state.memUsage
- RESOURCE_STATE_DISK_READ -> state.diskRead
- RESOURCE_STATE_DISK_WRITE -> state.diskWrite
- RESOURCE_STATE_NET_RX -> state.netReceived
- RESOURCE_STATE_NET_TX -> state.netTransmitted
+ RESOURCE_STATE_TIMESTAMP -> timestamp
+ RESOURCE_STATE_NCPUS -> cpuCores
+ RESOURCE_STATE_CPU_CAPACITY -> cpuCapacity
+ RESOURCE_STATE_CPU_USAGE -> cpuUsage
+ RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct
+ RESOURCE_STATE_MEM_CAPACITY -> memCapacity
+ RESOURCE_STATE_MEM_USAGE -> memUsage
+ RESOURCE_STATE_DISK_READ -> diskRead
+ RESOURCE_STATE_DISK_WRITE -> diskWrite
+ RESOURCE_STATE_NET_RX -> netReceived
+ RESOURCE_STATE_NET_TX -> netTransmitted
else -> throw IllegalArgumentException("Invalid column")
}
@@ -119,7 +156,7 @@ internal class BitbrainsResourceStateTableReader(private val partition: String,
override fun getInt(column: TableColumn<Int>): Int {
return when (column) {
- RESOURCE_STATE_NCPUS -> state.cpuCores
+ RESOURCE_STATE_NCPUS -> cpuCores
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -130,15 +167,15 @@ internal class BitbrainsResourceStateTableReader(private val partition: String,
override fun getDouble(column: TableColumn<Double>): Double {
return when (column) {
- RESOURCE_STATE_CPU_CAPACITY -> state.cpuCapacity
- RESOURCE_STATE_CPU_USAGE -> state.cpuUsage
- RESOURCE_STATE_CPU_USAGE_PCT -> state.cpuUsagePct
- RESOURCE_STATE_MEM_CAPACITY -> state.memCapacity
- RESOURCE_STATE_MEM_USAGE -> state.memUsage
- RESOURCE_STATE_DISK_READ -> state.diskRead
- RESOURCE_STATE_DISK_WRITE -> state.diskWrite
- RESOURCE_STATE_NET_RX -> state.netReceived
- RESOURCE_STATE_NET_TX -> state.netTransmitted
+ RESOURCE_STATE_CPU_CAPACITY -> cpuCapacity
+ RESOURCE_STATE_CPU_USAGE -> cpuUsage
+ RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct
+ RESOURCE_STATE_MEM_CAPACITY -> memCapacity
+ RESOURCE_STATE_MEM_USAGE -> memUsage
+ RESOURCE_STATE_DISK_READ -> diskRead
+ RESOURCE_STATE_DISK_WRITE -> diskWrite
+ RESOURCE_STATE_NET_RX -> netReceived
+ RESOURCE_STATE_NET_TX -> netTransmitted
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -161,37 +198,62 @@ internal class BitbrainsResourceStateTableReader(private val partition: String,
}
/**
- * The current row state.
+ * Try to parse the current value safely as double.
*/
- private class RowState {
- var timestamp: Instant? = null
- var cpuCores = -1
- var cpuCapacity = Double.NaN
- var cpuUsage = Double.NaN
- var cpuUsagePct = Double.NaN
- var memCapacity = Double.NaN
- var memUsage = Double.NaN
- var diskRead = Double.NaN
- var diskWrite = Double.NaN
- var netReceived = Double.NaN
- var netTransmitted = Double.NaN
+ private fun parseSafeDouble(): Double {
+ if (!usesCommaDecimalSeparator) {
+ try {
+ return parser.doubleValue
+ } catch (e: JsonParseException) {
+ usesCommaDecimalSeparator = true
+ }
+ }
- /**
- * Reset the state.
- */
- fun reset() {
- timestamp = null
- cpuCores = -1
- cpuCapacity = Double.NaN
- cpuUsage = Double.NaN
- cpuUsagePct = Double.NaN
- memCapacity = Double.NaN
- memUsage = Double.NaN
- diskRead = Double.NaN
- diskWrite = Double.NaN
- netReceived = Double.NaN
- netTransmitted = Double.NaN
+ val text = parser.text
+ if (text.isBlank()) {
+ return 0.0
}
+
+ return nf.parse(text).toDouble()
+ }
+
+ /**
+ * State fields of the reader.
+ */
+ private var timestamp: Instant? = null
+ private var cpuCores = -1
+ private var cpuCapacity = Double.NaN
+ private var cpuUsage = Double.NaN
+ private var cpuUsagePct = Double.NaN
+ private var memCapacity = Double.NaN
+ private var memUsage = Double.NaN
+ private var diskRead = Double.NaN
+ private var diskWrite = Double.NaN
+ private var netReceived = Double.NaN
+ private var netTransmitted = Double.NaN
+
+ /**
+ * Reset the state.
+ */
+ private fun reset() {
+ timestamp = null
+ cpuCores = -1
+ cpuCapacity = Double.NaN
+ cpuUsage = Double.NaN
+ cpuUsagePct = Double.NaN
+ memCapacity = Double.NaN
+ memUsage = Double.NaN
+ diskRead = Double.NaN
+ diskWrite = Double.NaN
+ netReceived = Double.NaN
+ netTransmitted = Double.NaN
+ }
+
+ /**
+ * The type of the timestamp in the trace.
+ */
+ private enum class TimestampType {
+ UNDECIDED, DATE_TIME, EPOCH_MILLIS
}
companion object {
@@ -199,15 +261,17 @@ internal class BitbrainsResourceStateTableReader(private val partition: String,
* The [CsvSchema] that is used to parse the trace.
*/
private val schema = CsvSchema.builder()
- .addColumn("Timestamp [ms]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Timestamp [ms]", CsvSchema.ColumnType.NUMBER_OR_STRING)
.addColumn("CPU cores", CsvSchema.ColumnType.NUMBER)
.addColumn("CPU capacity provisioned [MHZ]", CsvSchema.ColumnType.NUMBER)
.addColumn("CPU usage [MHZ]", CsvSchema.ColumnType.NUMBER)
.addColumn("CPU usage [%]", CsvSchema.ColumnType.NUMBER)
.addColumn("Memory capacity provisioned [KB]", CsvSchema.ColumnType.NUMBER)
.addColumn("Memory usage [KB]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Memory usage [%]", CsvSchema.ColumnType.NUMBER)
.addColumn("Disk read throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
.addColumn("Disk write throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Disk size [GB]", CsvSchema.ColumnType.NUMBER)
.addColumn("Network received throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
.addColumn("Network transmitted throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
.setAllowComments(true)