From 83497bd122983c7fc0d5cbbdc80b98d58c50cd75 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 10 Sep 2021 16:06:53 +0200 Subject: feat(trace): Support Materna traces from GWA This change adds support for the Materna traces from the Grid Workload Trace Archive (GWA). These traces are very similar to the Bitbrains traces, so they share the same base implementation. --- .../trace/bitbrains/BitbrainsResourceStateTable.kt | 1 + .../bitbrains/BitbrainsResourceStateTableReader.kt | 192 ++++++++++++++------- 2 files changed, 129 insertions(+), 64 deletions(-) diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt index 767ef919..846d5c8a 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt @@ -41,6 +41,7 @@ internal class BitbrainsResourceStateTable(private val factory: CsvFactory, priv Files.walk(path, 1) .filter { !Files.isDirectory(it) && it.extension == "csv" } .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) + .toSortedMap() override val name: String = TABLE_RESOURCE_STATES diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt index 5687ac7f..dab784c2 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt @@ -22,20 +22,42 @@ package org.opendc.trace.bitbrains +import com.fasterxml.jackson.core.JsonParseException import com.fasterxml.jackson.core.JsonToken import com.fasterxml.jackson.dataformat.csv.CsvParser import com.fasterxml.jackson.dataformat.csv.CsvSchema import org.opendc.trace.* +import java.text.NumberFormat import java.time.Instant +import java.time.LocalDateTime +import java.time.ZoneOffset +import java.time.format.DateTimeFormatter +import java.time.format.DateTimeParseException +import java.util.* /** * A [TableReader] for the Bitbrains resource state table. */ internal class BitbrainsResourceStateTableReader(private val partition: String, private val parser: CsvParser) : TableReader { /** - * The current parser state. + * The [DateTimeFormatter] used to parse the timestamps in case of the Materna trace. */ - private val state = RowState() + private val formatter = DateTimeFormatter.ofPattern("dd.MM.yyyy HH:mm:ss") + + /** + * The type of timestamps in the trace. + */ + private var timestampType: TimestampType = TimestampType.UNDECIDED + + /** + * The [NumberFormat] used to parse doubles containing a comma. + */ + private val nf = NumberFormat.getInstance(Locale.GERMAN) + + /** + * A flag to indicate that the trace contains decimals with a comma separator. + */ + private var usesCommaDecimalSeparator = false init { parser.schema = schema @@ -43,7 +65,7 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, override fun nextRow(): Boolean { // Reset the row state - state.reset() + reset() if (!nextStart()) { return false @@ -57,17 +79,32 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, } when (parser.currentName) { - "Timestamp [ms]" -> state.timestamp = Instant.ofEpochSecond(parser.longValue) - "CPU cores" -> state.cpuCores = parser.intValue - "CPU capacity provisioned [MHZ]" -> state.cpuCapacity = parser.doubleValue - "CPU usage [MHZ]" -> state.cpuUsage = parser.doubleValue - "CPU usage [%]" -> state.cpuUsagePct = parser.doubleValue - "Memory capacity provisioned [KB]" -> state.memCapacity = parser.doubleValue - "Memory usage [KB]" -> state.memUsage = parser.doubleValue - "Disk read throughput [KB/s]" -> state.diskRead = parser.doubleValue - "Disk write throughput [KB/s]" -> state.diskWrite = parser.doubleValue - "Network received throughput [KB/s]" -> state.netReceived = parser.doubleValue - "Network transmitted throughput [KB/s]" -> state.netTransmitted = parser.doubleValue + "Timestamp [ms]" -> { + timestamp = when (timestampType) { + TimestampType.UNDECIDED -> { + try { + val res = LocalDateTime.parse(parser.text, formatter).toInstant(ZoneOffset.UTC) + timestampType = TimestampType.DATE_TIME + res + } catch (e: DateTimeParseException) { + timestampType = TimestampType.EPOCH_MILLIS + Instant.ofEpochSecond(parser.longValue) + } + } + TimestampType.DATE_TIME -> LocalDateTime.parse(parser.text, formatter).toInstant(ZoneOffset.UTC) + TimestampType.EPOCH_MILLIS -> Instant.ofEpochSecond(parser.longValue) + } + } + "CPU cores" -> cpuCores = parser.intValue + "CPU capacity provisioned [MHZ]" -> cpuCapacity = parseSafeDouble() + "CPU usage [MHZ]" -> cpuUsage = parseSafeDouble() + "CPU usage [%]" -> cpuUsagePct = parseSafeDouble() / 100.0 // Convert to range [0, 1] + "Memory capacity provisioned [KB]" -> memCapacity = parseSafeDouble() + "Memory usage [KB]" -> memUsage = parseSafeDouble() + "Disk read throughput [KB/s]" -> diskRead = parseSafeDouble() + "Disk write throughput [KB/s]" -> diskWrite = parseSafeDouble() + "Network received throughput [KB/s]" -> netReceived = parseSafeDouble() + "Network transmitted throughput [KB/s]" -> netTransmitted = parseSafeDouble() } } @@ -95,17 +132,17 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, override fun get(column: TableColumn): T { val res: Any? = when (column) { RESOURCE_STATE_ID -> partition - RESOURCE_STATE_TIMESTAMP -> state.timestamp - RESOURCE_STATE_NCPUS -> state.cpuCores - RESOURCE_STATE_CPU_CAPACITY -> state.cpuCapacity - RESOURCE_STATE_CPU_USAGE -> state.cpuUsage - RESOURCE_STATE_CPU_USAGE_PCT -> state.cpuUsagePct - RESOURCE_STATE_MEM_CAPACITY -> state.memCapacity - RESOURCE_STATE_MEM_USAGE -> state.memUsage - RESOURCE_STATE_DISK_READ -> state.diskRead - RESOURCE_STATE_DISK_WRITE -> state.diskWrite - RESOURCE_STATE_NET_RX -> state.netReceived - RESOURCE_STATE_NET_TX -> state.netTransmitted + RESOURCE_STATE_TIMESTAMP -> timestamp + RESOURCE_STATE_NCPUS -> cpuCores + RESOURCE_STATE_CPU_CAPACITY -> cpuCapacity + RESOURCE_STATE_CPU_USAGE -> cpuUsage + RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct + RESOURCE_STATE_MEM_CAPACITY -> memCapacity + RESOURCE_STATE_MEM_USAGE -> memUsage + RESOURCE_STATE_DISK_READ -> diskRead + RESOURCE_STATE_DISK_WRITE -> diskWrite + RESOURCE_STATE_NET_RX -> netReceived + RESOURCE_STATE_NET_TX -> netTransmitted else -> throw IllegalArgumentException("Invalid column") } @@ -119,7 +156,7 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, override fun getInt(column: TableColumn): Int { return when (column) { - RESOURCE_STATE_NCPUS -> state.cpuCores + RESOURCE_STATE_NCPUS -> cpuCores else -> throw IllegalArgumentException("Invalid column") } } @@ -130,15 +167,15 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, override fun getDouble(column: TableColumn): Double { return when (column) { - RESOURCE_STATE_CPU_CAPACITY -> state.cpuCapacity - RESOURCE_STATE_CPU_USAGE -> state.cpuUsage - RESOURCE_STATE_CPU_USAGE_PCT -> state.cpuUsagePct - RESOURCE_STATE_MEM_CAPACITY -> state.memCapacity - RESOURCE_STATE_MEM_USAGE -> state.memUsage - RESOURCE_STATE_DISK_READ -> state.diskRead - RESOURCE_STATE_DISK_WRITE -> state.diskWrite - RESOURCE_STATE_NET_RX -> state.netReceived - RESOURCE_STATE_NET_TX -> state.netTransmitted + RESOURCE_STATE_CPU_CAPACITY -> cpuCapacity + RESOURCE_STATE_CPU_USAGE -> cpuUsage + RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct + RESOURCE_STATE_MEM_CAPACITY -> memCapacity + RESOURCE_STATE_MEM_USAGE -> memUsage + RESOURCE_STATE_DISK_READ -> diskRead + RESOURCE_STATE_DISK_WRITE -> diskWrite + RESOURCE_STATE_NET_RX -> netReceived + RESOURCE_STATE_NET_TX -> netTransmitted else -> throw IllegalArgumentException("Invalid column") } } @@ -161,37 +198,62 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, } /** - * The current row state. + * Try to parse the current value safely as double. */ - private class RowState { - var timestamp: Instant? = null - var cpuCores = -1 - var cpuCapacity = Double.NaN - var cpuUsage = Double.NaN - var cpuUsagePct = Double.NaN - var memCapacity = Double.NaN - var memUsage = Double.NaN - var diskRead = Double.NaN - var diskWrite = Double.NaN - var netReceived = Double.NaN - var netTransmitted = Double.NaN + private fun parseSafeDouble(): Double { + if (!usesCommaDecimalSeparator) { + try { + return parser.doubleValue + } catch (e: JsonParseException) { + usesCommaDecimalSeparator = true + } + } - /** - * Reset the state. - */ - fun reset() { - timestamp = null - cpuCores = -1 - cpuCapacity = Double.NaN - cpuUsage = Double.NaN - cpuUsagePct = Double.NaN - memCapacity = Double.NaN - memUsage = Double.NaN - diskRead = Double.NaN - diskWrite = Double.NaN - netReceived = Double.NaN - netTransmitted = Double.NaN + val text = parser.text + if (text.isBlank()) { + return 0.0 } + + return nf.parse(text).toDouble() + } + + /** + * State fields of the reader. + */ + private var timestamp: Instant? = null + private var cpuCores = -1 + private var cpuCapacity = Double.NaN + private var cpuUsage = Double.NaN + private var cpuUsagePct = Double.NaN + private var memCapacity = Double.NaN + private var memUsage = Double.NaN + private var diskRead = Double.NaN + private var diskWrite = Double.NaN + private var netReceived = Double.NaN + private var netTransmitted = Double.NaN + + /** + * Reset the state. + */ + private fun reset() { + timestamp = null + cpuCores = -1 + cpuCapacity = Double.NaN + cpuUsage = Double.NaN + cpuUsagePct = Double.NaN + memCapacity = Double.NaN + memUsage = Double.NaN + diskRead = Double.NaN + diskWrite = Double.NaN + netReceived = Double.NaN + netTransmitted = Double.NaN + } + + /** + * The type of the timestamp in the trace. + */ + private enum class TimestampType { + UNDECIDED, DATE_TIME, EPOCH_MILLIS } companion object { @@ -199,15 +261,17 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, * The [CsvSchema] that is used to parse the trace. */ private val schema = CsvSchema.builder() - .addColumn("Timestamp [ms]", CsvSchema.ColumnType.NUMBER) + .addColumn("Timestamp [ms]", CsvSchema.ColumnType.NUMBER_OR_STRING) .addColumn("CPU cores", CsvSchema.ColumnType.NUMBER) .addColumn("CPU capacity provisioned [MHZ]", CsvSchema.ColumnType.NUMBER) .addColumn("CPU usage [MHZ]", CsvSchema.ColumnType.NUMBER) .addColumn("CPU usage [%]", CsvSchema.ColumnType.NUMBER) .addColumn("Memory capacity provisioned [KB]", CsvSchema.ColumnType.NUMBER) .addColumn("Memory usage [KB]", CsvSchema.ColumnType.NUMBER) + .addColumn("Memory usage [%]", CsvSchema.ColumnType.NUMBER) .addColumn("Disk read throughput [KB/s]", CsvSchema.ColumnType.NUMBER) .addColumn("Disk write throughput [KB/s]", CsvSchema.ColumnType.NUMBER) + .addColumn("Disk size [GB]", CsvSchema.ColumnType.NUMBER) .addColumn("Network received throughput [KB/s]", CsvSchema.ColumnType.NUMBER) .addColumn("Network transmitted throughput [KB/s]", CsvSchema.ColumnType.NUMBER) .setAllowComments(true) -- cgit v1.2.3