From fa08b63bd749e9fbe1a1d04ef2ebd7a86453fa4b Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sat, 11 Sep 2021 10:52:30 +0200 Subject: perf(trace): Keep reader state in own class This change removes the external class that holds the state of the reader and instead puts the state in the reader implementation. Maintaining a separate class for the state increases the complexity and has worse performance characteristics due to the bytecode produced by Kotlin for property accesses. --- .../capelin/trace/RawParquetTraceReader.kt | 4 +- .../capelin/trace/bp/BPResourceTable.kt | 2 +- .../capelin/trace/bp/BPResourceTableReader.kt | 10 +- .../capelin/trace/sv/SvResourceStateTable.kt | 5 +- .../capelin/trace/sv/SvResourceStateTableReader.kt | 142 +++++++++------------ 5 files changed, 73 insertions(+), 90 deletions(-) (limited to 'opendc-experiments/opendc-experiments-capelin/src') diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt index fa4e9ed8..ca937328 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt @@ -90,9 +90,9 @@ class RawParquetTraceReader(private val path: File) { } val submissionTime = reader.get(RESOURCE_START_TIME) - val endTime = reader.get(RESOURCE_END_TIME) + val endTime = reader.get(RESOURCE_STOP_TIME) val maxCores = reader.getInt(RESOURCE_NCPUS) - val requiredMemory = reader.getDouble(RESOURCE_MEM_CAPACITY) + val requiredMemory = reader.getDouble(RESOURCE_MEM_CAPACITY) / 1000.0 // Convert from KB to MB val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray()) val vmFragments = fragments.getValue(id).asSequence() diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt index 74d1e574..bff8c55e 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt @@ -38,7 +38,7 @@ internal class BPResourceTable(private val path: Path) : Table { return when (column) { RESOURCE_ID -> true RESOURCE_START_TIME -> true - RESOURCE_END_TIME -> true + RESOURCE_STOP_TIME -> true RESOURCE_NCPUS -> true RESOURCE_MEM_CAPACITY -> true else -> false diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt index 0a105783..4416aae8 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt @@ -45,7 +45,7 @@ internal class BPResourceTableReader(private val reader: LocalParquetReader true RESOURCE_START_TIME -> true - RESOURCE_END_TIME -> true + RESOURCE_STOP_TIME -> true RESOURCE_NCPUS -> true RESOURCE_MEM_CAPACITY -> true else -> false @@ -59,9 +59,9 @@ internal class BPResourceTableReader(private val reader: LocalParquetReader record["id"].toString() RESOURCE_START_TIME -> Instant.ofEpochMilli(record["submissionTime"] as Long) - RESOURCE_END_TIME -> Instant.ofEpochMilli(record["endTime"] as Long) - RESOURCE_NCPUS -> record["maxCores"] - RESOURCE_MEM_CAPACITY -> (record["requiredMemory"] as Number).toDouble() + RESOURCE_STOP_TIME -> Instant.ofEpochMilli(record["endTime"] as Long) + RESOURCE_NCPUS -> getInt(RESOURCE_NCPUS) + RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_MEM_CAPACITY) else -> throw IllegalArgumentException("Invalid column") } @@ -90,7 +90,7 @@ internal class BPResourceTableReader(private val reader: LocalParquetReader (record["requiredMemory"] as Number).toDouble() + RESOURCE_MEM_CAPACITY -> (record["requiredMemory"] as Number).toDouble() * 1000.0 // MB to KB else -> throw IllegalArgumentException("Invalid column") } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt index 24abb109..3a9bda69 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt @@ -31,7 +31,7 @@ import kotlin.io.path.extension import kotlin.io.path.nameWithoutExtension /** - * The resource state [Table] in the Bitbrains format. + * The resource state [Table] in the extended Bitbrains format. */ internal class SvResourceStateTable(path: Path) : Table { /** @@ -40,6 +40,7 @@ internal class SvResourceStateTable(path: Path) : Table { private val partitions = Files.walk(path, 1) .filter { !Files.isDirectory(it) && it.extension == "txt" } .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) + .toSortedMap() override val name: String = TABLE_RESOURCE_STATES @@ -126,7 +127,7 @@ internal class SvResourceStateTable(path: Path) : Table { } } - override fun toString(): String = "BitbrainsCompositeTableReader" + override fun toString(): String = "SvCompositeTableReader" } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt index 1a556f8d..a7d2d70a 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt @@ -30,13 +30,8 @@ import java.time.Instant * A [TableReader] for the Bitbrains resource state table. */ internal class SvResourceStateTableReader(private val reader: BufferedReader) : TableReader { - /** - * The current parser state. - */ - private val state = RowState() - override fun nextRow(): Boolean { - state.reset() + reset() var line: String var num = 0 @@ -75,18 +70,18 @@ internal class SvResourceStateTableReader(private val reader: BufferedReader) : val field = line.subSequence(start, end) as String when (col++) { - COL_TIMESTAMP -> state.timestamp = Instant.ofEpochSecond(field.toLong(10)) - COL_CPU_USAGE -> state.cpuUsage = field.toDouble() - COL_CPU_DEMAND -> state.cpuDemand = field.toDouble() - COL_DISK_READ -> state.diskRead = field.toDouble() - COL_DISK_WRITE -> state.diskWrite = field.toDouble() - COL_CLUSTER_ID -> state.cluster = field.trim() - COL_NCPUS -> state.cpuCores = field.toInt(10) - COL_CPU_READY_PCT -> state.cpuReadyPct = field.toDouble() - COL_POWERED_ON -> state.poweredOn = field.toInt(10) == 1 - COL_CPU_CAPACITY -> state.cpuCapacity = field.toDouble() - COL_ID -> state.id = field.trim() - COL_MEM_CAPACITY -> state.memCapacity = field.toDouble() + COL_TIMESTAMP -> timestamp = Instant.ofEpochSecond(field.toLong(10)) + COL_CPU_USAGE -> cpuUsage = field.toDouble() + COL_CPU_DEMAND -> cpuDemand = field.toDouble() + COL_DISK_READ -> diskRead = field.toDouble() + COL_DISK_WRITE -> diskWrite = field.toDouble() + COL_CLUSTER_ID -> cluster = field.trim() + COL_NCPUS -> cpuCores = field.toInt(10) + COL_CPU_READY_PCT -> cpuReadyPct = field.toDouble() + COL_POWERED_ON -> poweredOn = field.toInt(10) == 1 + COL_CPU_CAPACITY -> cpuCapacity = field.toDouble() + COL_ID -> id = field.trim() + COL_MEM_CAPACITY -> memCapacity = field.toDouble() } } @@ -113,16 +108,16 @@ internal class SvResourceStateTableReader(private val reader: BufferedReader) : override fun get(column: TableColumn): T { val res: Any? = when (column) { - RESOURCE_STATE_ID -> state.id - RESOURCE_STATE_CLUSTER_ID -> state.cluster - RESOURCE_STATE_TIMESTAMP -> state.timestamp - RESOURCE_STATE_NCPUS -> state.cpuCores - RESOURCE_STATE_CPU_CAPACITY -> state.cpuCapacity - RESOURCE_STATE_CPU_USAGE -> state.cpuUsage - RESOURCE_STATE_CPU_USAGE_PCT -> state.cpuUsage / state.cpuCapacity - RESOURCE_STATE_MEM_CAPACITY -> state.memCapacity - RESOURCE_STATE_DISK_READ -> state.diskRead - RESOURCE_STATE_DISK_WRITE -> state.diskWrite + RESOURCE_STATE_ID -> id + RESOURCE_STATE_CLUSTER_ID -> cluster + RESOURCE_STATE_TIMESTAMP -> timestamp + RESOURCE_STATE_NCPUS -> getInt(RESOURCE_STATE_NCPUS) + RESOURCE_STATE_CPU_CAPACITY -> getDouble(RESOURCE_STATE_CPU_CAPACITY) + RESOURCE_STATE_CPU_USAGE -> getDouble(RESOURCE_STATE_CPU_USAGE) + RESOURCE_STATE_CPU_USAGE_PCT -> getDouble(RESOURCE_STATE_CPU_USAGE_PCT) + RESOURCE_STATE_MEM_CAPACITY -> getDouble(RESOURCE_STATE_MEM_CAPACITY) + RESOURCE_STATE_DISK_READ -> getDouble(RESOURCE_STATE_DISK_READ) + RESOURCE_STATE_DISK_WRITE -> getDouble(RESOURCE_STATE_DISK_WRITE) else -> throw IllegalArgumentException("Invalid column") } @@ -132,14 +127,14 @@ internal class SvResourceStateTableReader(private val reader: BufferedReader) : override fun getBoolean(column: TableColumn): Boolean { return when (column) { - RESOURCE_STATE_POWERED_ON -> state.poweredOn + RESOURCE_STATE_POWERED_ON -> poweredOn else -> throw IllegalArgumentException("Invalid column") } } override fun getInt(column: TableColumn): Int { return when (column) { - RESOURCE_STATE_NCPUS -> state.cpuCores + RESOURCE_STATE_NCPUS -> cpuCores else -> throw IllegalArgumentException("Invalid column") } } @@ -150,12 +145,13 @@ internal class SvResourceStateTableReader(private val reader: BufferedReader) : override fun getDouble(column: TableColumn): Double { return when (column) { - RESOURCE_STATE_CPU_CAPACITY -> state.cpuCapacity - RESOURCE_STATE_CPU_USAGE -> state.cpuUsage - RESOURCE_STATE_CPU_USAGE_PCT -> state.cpuUsage / state.cpuCapacity - RESOURCE_STATE_MEM_CAPACITY -> state.memCapacity - RESOURCE_STATE_DISK_READ -> state.diskRead - RESOURCE_STATE_DISK_WRITE -> state.diskWrite + RESOURCE_STATE_CPU_CAPACITY -> cpuCapacity + RESOURCE_STATE_CPU_USAGE -> cpuUsage + RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsage / cpuCapacity + RESOURCE_STATE_CPU_DEMAND -> cpuDemand + RESOURCE_STATE_MEM_CAPACITY -> memCapacity + RESOURCE_STATE_DISK_READ -> diskRead + RESOURCE_STATE_DISK_WRITE -> diskWrite else -> throw IllegalArgumentException("Invalid column") } } @@ -165,51 +161,37 @@ internal class SvResourceStateTableReader(private val reader: BufferedReader) : } /** - * The current row state. + * State fields of the reader. */ - private class RowState { - @JvmField - var id: String? = null - @JvmField - var cluster: String? = null - @JvmField - var timestamp: Instant? = null - @JvmField - var cpuCores = -1 - @JvmField - var cpuCapacity = Double.NaN - @JvmField - var cpuUsage = Double.NaN - @JvmField - var cpuDemand = Double.NaN - @JvmField - var cpuReadyPct = Double.NaN - @JvmField - var memCapacity = Double.NaN - @JvmField - var diskRead = Double.NaN - @JvmField - var diskWrite = Double.NaN - @JvmField - var poweredOn: Boolean = false - - /** - * Reset the state. - */ - fun reset() { - id = null - timestamp = null - cluster = null - cpuCores = -1 - cpuCapacity = Double.NaN - cpuUsage = Double.NaN - cpuDemand = Double.NaN - cpuReadyPct = Double.NaN - memCapacity = Double.NaN - diskRead = Double.NaN - diskWrite = Double.NaN - poweredOn = false - } + private var id: String? = null + private var cluster: String? = null + private var timestamp: Instant? = null + private var cpuCores = -1 + private var cpuCapacity = Double.NaN + private var cpuUsage = Double.NaN + private var cpuDemand = Double.NaN + private var cpuReadyPct = Double.NaN + private var memCapacity = Double.NaN + private var diskRead = Double.NaN + private var diskWrite = Double.NaN + private var poweredOn: Boolean = false + + /** + * Reset the state of the reader. + */ + private fun reset() { + id = null + timestamp = null + cluster = null + cpuCores = -1 + cpuCapacity = Double.NaN + cpuUsage = Double.NaN + cpuDemand = Double.NaN + cpuReadyPct = Double.NaN + memCapacity = Double.NaN + diskRead = Double.NaN + diskWrite = Double.NaN + poweredOn = false } /** -- cgit v1.2.3