diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-09-20 15:12:10 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-09-20 15:12:10 +0200 |
| commit | 768bfa0d2ae763e359d74612385ce43c41afb432 (patch) | |
| tree | 1ca870d5dd75f7250e91ae7dec41a5e68a77856f /opendc-trace/opendc-trace-opendc/src | |
| parent | 55a4c8208cc44ac626f7b8c61a19d5ec725ec936 (diff) | |
feat(trace): Support column lookup via index
This change adds support for looking up the column value through the
column index. This enables faster lookup when processing very large
traces.
Diffstat (limited to 'opendc-trace/opendc-trace-opendc/src')
2 files changed, 88 insertions, 76 deletions
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt index e4b18735..b5043f82 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt @@ -55,54 +55,46 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea return record != null } - override fun hasColumn(column: TableColumn<*>): Boolean { - return when (column) { - RESOURCE_ID -> true - RESOURCE_STATE_TIMESTAMP -> true - RESOURCE_STATE_DURATION -> true - RESOURCE_CPU_COUNT -> true - RESOURCE_STATE_CPU_USAGE -> true - else -> false - } + override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + + override fun isNull(index: Int): Boolean { + check(index in 0..columns.size) { "Invalid column index" } + return get(index) == null } - override fun <T> get(column: TableColumn<T>): T { + override fun get(index: Int): Any? { val record = checkNotNull(record) { "Reader in invalid state" } - @Suppress("UNCHECKED_CAST") - val res: Any = when (column) { - RESOURCE_ID -> record[COL_ID].toString() - RESOURCE_STATE_TIMESTAMP -> Instant.ofEpochMilli(record[COL_TIMESTAMP] as Long) - RESOURCE_STATE_DURATION -> Duration.ofMillis(record[COL_DURATION] as Long) - RESOURCE_CPU_COUNT -> getInt(RESOURCE_CPU_COUNT) - RESOURCE_STATE_CPU_USAGE -> getDouble(RESOURCE_STATE_CPU_USAGE) + return when (index) { + COL_ID -> record[AVRO_COL_ID].toString() + COL_TIMESTAMP -> Instant.ofEpochMilli(record[AVRO_COL_TIMESTAMP] as Long) + COL_DURATION -> Duration.ofMillis(record[AVRO_COL_DURATION] as Long) + COL_CPU_COUNT -> getInt(index) + COL_CPU_USAGE -> getDouble(index) else -> throw IllegalArgumentException("Invalid column") } - - @Suppress("UNCHECKED_CAST") - return res as T } - override fun getBoolean(column: TableColumn<Boolean>): Boolean { + override fun getBoolean(index: Int): Boolean { throw IllegalArgumentException("Invalid column") } - override fun getInt(column: TableColumn<Int>): Int { + override fun getInt(index: Int): Int { val record = checkNotNull(record) { "Reader in invalid state" } - return when (column) { - RESOURCE_CPU_COUNT -> record[COL_CPU_COUNT] as Int + return when (index) { + COL_CPU_COUNT -> record[AVRO_COL_CPU_COUNT] as Int else -> throw IllegalArgumentException("Invalid column") } } - override fun getLong(column: TableColumn<Long>): Long { + override fun getLong(index: Int): Long { throw IllegalArgumentException("Invalid column") } - override fun getDouble(column: TableColumn<Double>): Double { + override fun getDouble(index: Int): Double { val record = checkNotNull(record) { "Reader in invalid state" } - return when (column) { - RESOURCE_STATE_CPU_USAGE -> (record[COL_CPU_USAGE] as Number).toDouble() + return when (index) { + COL_CPU_USAGE -> (record[AVRO_COL_CPU_USAGE] as Number).toDouble() else -> throw IllegalArgumentException("Invalid column") } } @@ -118,20 +110,34 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea */ private fun initColumns(schema: Schema) { try { - COL_ID = schema.getField("id").pos() - COL_TIMESTAMP = (schema.getField("timestamp") ?: schema.getField("time")).pos() - COL_DURATION = schema.getField("duration").pos() - COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("cores")).pos() - COL_CPU_USAGE = (schema.getField("cpu_usage") ?: schema.getField("cpuUsage")).pos() + AVRO_COL_ID = schema.getField("id").pos() + AVRO_COL_TIMESTAMP = (schema.getField("timestamp") ?: schema.getField("time")).pos() + AVRO_COL_DURATION = schema.getField("duration").pos() + AVRO_COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("cores")).pos() + AVRO_COL_CPU_USAGE = (schema.getField("cpu_usage") ?: schema.getField("cpuUsage")).pos() } catch (e: NullPointerException) { // This happens when the field we are trying to access does not exist throw IllegalArgumentException("Invalid schema", e) } } - private var COL_ID = -1 - private var COL_TIMESTAMP = -1 - private var COL_DURATION = -1 - private var COL_CPU_COUNT = -1 - private var COL_CPU_USAGE = -1 + private var AVRO_COL_ID = -1 + private var AVRO_COL_TIMESTAMP = -1 + private var AVRO_COL_DURATION = -1 + private var AVRO_COL_CPU_COUNT = -1 + private var AVRO_COL_CPU_USAGE = -1 + + private val COL_ID = 0 + private val COL_TIMESTAMP = 1 + private val COL_DURATION = 2 + private val COL_CPU_COUNT = 3 + private val COL_CPU_USAGE = 4 + + private val columns = mapOf( + RESOURCE_ID to COL_ID, + RESOURCE_STATE_TIMESTAMP to COL_TIMESTAMP, + RESOURCE_STATE_DURATION to COL_DURATION, + RESOURCE_CPU_COUNT to COL_CPU_COUNT, + RESOURCE_STATE_CPU_USAGE to COL_CPU_USAGE, + ) } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt index c52da62d..d93929aa 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt @@ -54,56 +54,48 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G return record != null } - override fun hasColumn(column: TableColumn<*>): Boolean { - return when (column) { - RESOURCE_ID -> true - RESOURCE_START_TIME -> true - RESOURCE_STOP_TIME -> true - RESOURCE_CPU_COUNT -> true - RESOURCE_MEM_CAPACITY -> true - else -> false - } + override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + + override fun isNull(index: Int): Boolean { + check(index in 0..columns.size) { "Invalid column index" } + return get(index) == null } - override fun <T> get(column: TableColumn<T>): T { + override fun get(index: Int): Any? { val record = checkNotNull(record) { "Reader in invalid state" } - @Suppress("UNCHECKED_CAST") - val res: Any = when (column) { - RESOURCE_ID -> record[COL_ID].toString() - RESOURCE_START_TIME -> Instant.ofEpochMilli(record[COL_START_TIME] as Long) - RESOURCE_STOP_TIME -> Instant.ofEpochMilli(record[COL_STOP_TIME] as Long) - RESOURCE_CPU_COUNT -> getInt(RESOURCE_CPU_COUNT) - RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_MEM_CAPACITY) + return when (index) { + COL_ID -> record[AVRO_COL_ID].toString() + COL_START_TIME -> Instant.ofEpochMilli(record[AVRO_COL_START_TIME] as Long) + COL_STOP_TIME -> Instant.ofEpochMilli(record[AVRO_COL_STOP_TIME] as Long) + COL_CPU_COUNT -> getInt(index) + COL_MEM_CAPACITY -> getDouble(index) else -> throw IllegalArgumentException("Invalid column") } - - @Suppress("UNCHECKED_CAST") - return res as T } - override fun getBoolean(column: TableColumn<Boolean>): Boolean { + override fun getBoolean(index: Int): Boolean { throw IllegalArgumentException("Invalid column") } - override fun getInt(column: TableColumn<Int>): Int { + override fun getInt(index: Int): Int { val record = checkNotNull(record) { "Reader in invalid state" } - return when (column) { - RESOURCE_CPU_COUNT -> record[COL_CPU_COUNT] as Int + return when (index) { + COL_CPU_COUNT -> record[AVRO_COL_CPU_COUNT] as Int else -> throw IllegalArgumentException("Invalid column") } } - override fun getLong(column: TableColumn<Long>): Long { + override fun getLong(index: Int): Long { throw IllegalArgumentException("Invalid column") } - override fun getDouble(column: TableColumn<Double>): Double { + override fun getDouble(index: Int): Double { val record = checkNotNull(record) { "Reader in invalid state" } - return when (column) { - RESOURCE_MEM_CAPACITY -> (record[COL_MEM_CAPACITY] as Number).toDouble() + return when (index) { + COL_MEM_CAPACITY -> (record[AVRO_COL_MEM_CAPACITY] as Number).toDouble() else -> throw IllegalArgumentException("Invalid column") } } @@ -119,20 +111,34 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G */ private fun initColumns(schema: Schema) { try { - COL_ID = schema.getField("id").pos() - COL_START_TIME = (schema.getField("start_time") ?: schema.getField("submissionTime")).pos() - COL_STOP_TIME = (schema.getField("stop_time") ?: schema.getField("endTime")).pos() - COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("maxCores")).pos() - COL_MEM_CAPACITY = (schema.getField("mem_capacity") ?: schema.getField("requiredMemory")).pos() + AVRO_COL_ID = schema.getField("id").pos() + AVRO_COL_START_TIME = (schema.getField("start_time") ?: schema.getField("submissionTime")).pos() + AVRO_COL_STOP_TIME = (schema.getField("stop_time") ?: schema.getField("endTime")).pos() + AVRO_COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("maxCores")).pos() + AVRO_COL_MEM_CAPACITY = (schema.getField("mem_capacity") ?: schema.getField("requiredMemory")).pos() } catch (e: NullPointerException) { // This happens when the field we are trying to access does not exist throw IllegalArgumentException("Invalid schema") } } - private var COL_ID = -1 - private var COL_START_TIME = -1 - private var COL_STOP_TIME = -1 - private var COL_CPU_COUNT = -1 - private var COL_MEM_CAPACITY = -1 + private var AVRO_COL_ID = -1 + private var AVRO_COL_START_TIME = -1 + private var AVRO_COL_STOP_TIME = -1 + private var AVRO_COL_CPU_COUNT = -1 + private var AVRO_COL_MEM_CAPACITY = -1 + + private val COL_ID = 0 + private val COL_START_TIME = 1 + private val COL_STOP_TIME = 2 + private val COL_CPU_COUNT = 3 + private val COL_MEM_CAPACITY = 4 + + private val columns = mapOf( + RESOURCE_ID to COL_ID, + RESOURCE_START_TIME to COL_START_TIME, + RESOURCE_STOP_TIME to COL_STOP_TIME, + RESOURCE_CPU_COUNT to COL_CPU_COUNT, + RESOURCE_MEM_CAPACITY to COL_MEM_CAPACITY, + ) } |
