diff options
Diffstat (limited to 'opendc-trace/opendc-trace-opendc/src/main')
2 files changed, 88 insertions, 76 deletions
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt index e4b18735..b5043f82 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt @@ -55,54 +55,46 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea return record != null } - override fun hasColumn(column: TableColumn<*>): Boolean { - return when (column) { - RESOURCE_ID -> true - RESOURCE_STATE_TIMESTAMP -> true - RESOURCE_STATE_DURATION -> true - RESOURCE_CPU_COUNT -> true - RESOURCE_STATE_CPU_USAGE -> true - else -> false - } + override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + + override fun isNull(index: Int): Boolean { + check(index in 0..columns.size) { "Invalid column index" } + return get(index) == null } - override fun <T> get(column: TableColumn<T>): T { + override fun get(index: Int): Any? { val record = checkNotNull(record) { "Reader in invalid state" } - @Suppress("UNCHECKED_CAST") - val res: Any = when (column) { - RESOURCE_ID -> record[COL_ID].toString() - RESOURCE_STATE_TIMESTAMP -> Instant.ofEpochMilli(record[COL_TIMESTAMP] as Long) - RESOURCE_STATE_DURATION -> Duration.ofMillis(record[COL_DURATION] as Long) - RESOURCE_CPU_COUNT -> getInt(RESOURCE_CPU_COUNT) - RESOURCE_STATE_CPU_USAGE -> getDouble(RESOURCE_STATE_CPU_USAGE) + return when (index) { + COL_ID -> record[AVRO_COL_ID].toString() + COL_TIMESTAMP -> Instant.ofEpochMilli(record[AVRO_COL_TIMESTAMP] as Long) + COL_DURATION -> Duration.ofMillis(record[AVRO_COL_DURATION] as Long) + COL_CPU_COUNT -> getInt(index) + COL_CPU_USAGE -> getDouble(index) else -> throw IllegalArgumentException("Invalid column") } - - @Suppress("UNCHECKED_CAST") - return res as T } - override fun getBoolean(column: TableColumn<Boolean>): Boolean { + override fun getBoolean(index: Int): Boolean { throw IllegalArgumentException("Invalid column") } - override fun getInt(column: TableColumn<Int>): Int { + override fun getInt(index: Int): Int { val record = checkNotNull(record) { "Reader in invalid state" } - return when (column) { - RESOURCE_CPU_COUNT -> record[COL_CPU_COUNT] as Int + return when (index) { + COL_CPU_COUNT -> record[AVRO_COL_CPU_COUNT] as Int else -> throw IllegalArgumentException("Invalid column") } } - override fun getLong(column: TableColumn<Long>): Long { + override fun getLong(index: Int): Long { throw IllegalArgumentException("Invalid column") } - override fun getDouble(column: TableColumn<Double>): Double { + override fun getDouble(index: Int): Double { val record = checkNotNull(record) { "Reader in invalid state" } - return when (column) { - RESOURCE_STATE_CPU_USAGE -> (record[COL_CPU_USAGE] as Number).toDouble() + return when (index) { + COL_CPU_USAGE -> (record[AVRO_COL_CPU_USAGE] as Number).toDouble() else -> throw IllegalArgumentException("Invalid column") } } @@ -118,20 +110,34 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea */ private fun initColumns(schema: Schema) { try { - COL_ID = schema.getField("id").pos() - COL_TIMESTAMP = (schema.getField("timestamp") ?: schema.getField("time")).pos() - COL_DURATION = schema.getField("duration").pos() - COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("cores")).pos() - COL_CPU_USAGE = (schema.getField("cpu_usage") ?: schema.getField("cpuUsage")).pos() + AVRO_COL_ID = schema.getField("id").pos() + AVRO_COL_TIMESTAMP = (schema.getField("timestamp") ?: schema.getField("time")).pos() + AVRO_COL_DURATION = schema.getField("duration").pos() + AVRO_COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("cores")).pos() + AVRO_COL_CPU_USAGE = (schema.getField("cpu_usage") ?: schema.getField("cpuUsage")).pos() } catch (e: NullPointerException) { // This happens when the field we are trying to access does not exist throw IllegalArgumentException("Invalid schema", e) } } - private var COL_ID = -1 - private var COL_TIMESTAMP = -1 - private var COL_DURATION = -1 - private var COL_CPU_COUNT = -1 - private var COL_CPU_USAGE = -1 + private var AVRO_COL_ID = -1 + private var AVRO_COL_TIMESTAMP = -1 + private var AVRO_COL_DURATION = -1 + private var AVRO_COL_CPU_COUNT = -1 + private var AVRO_COL_CPU_USAGE = -1 + + private val COL_ID = 0 + private val COL_TIMESTAMP = 1 + private val COL_DURATION = 2 + private val COL_CPU_COUNT = 3 + private val COL_CPU_USAGE = 4 + + private val columns = mapOf( + RESOURCE_ID to COL_ID, + RESOURCE_STATE_TIMESTAMP to COL_TIMESTAMP, + RESOURCE_STATE_DURATION to COL_DURATION, + RESOURCE_CPU_COUNT to COL_CPU_COUNT, + RESOURCE_STATE_CPU_USAGE to COL_CPU_USAGE, + ) } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt index c52da62d..d93929aa 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt @@ -54,56 +54,48 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G return record != null } - override fun hasColumn(column: TableColumn<*>): Boolean { - return when (column) { - RESOURCE_ID -> true - RESOURCE_START_TIME -> true - RESOURCE_STOP_TIME -> true - RESOURCE_CPU_COUNT -> true - RESOURCE_MEM_CAPACITY -> true - else -> false - } + override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + + override fun isNull(index: Int): Boolean { + check(index in 0..columns.size) { "Invalid column index" } + return get(index) == null } - override fun <T> get(column: TableColumn<T>): T { + override fun get(index: Int): Any? { val record = checkNotNull(record) { "Reader in invalid state" } - @Suppress("UNCHECKED_CAST") - val res: Any = when (column) { - RESOURCE_ID -> record[COL_ID].toString() - RESOURCE_START_TIME -> Instant.ofEpochMilli(record[COL_START_TIME] as Long) - RESOURCE_STOP_TIME -> Instant.ofEpochMilli(record[COL_STOP_TIME] as Long) - RESOURCE_CPU_COUNT -> getInt(RESOURCE_CPU_COUNT) - RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_MEM_CAPACITY) + return when (index) { + COL_ID -> record[AVRO_COL_ID].toString() + COL_START_TIME -> Instant.ofEpochMilli(record[AVRO_COL_START_TIME] as Long) + COL_STOP_TIME -> Instant.ofEpochMilli(record[AVRO_COL_STOP_TIME] as Long) + COL_CPU_COUNT -> getInt(index) + COL_MEM_CAPACITY -> getDouble(index) else -> throw IllegalArgumentException("Invalid column") } - - @Suppress("UNCHECKED_CAST") - return res as T } - override fun getBoolean(column: TableColumn<Boolean>): Boolean { + override fun getBoolean(index: Int): Boolean { throw IllegalArgumentException("Invalid column") } - override fun getInt(column: TableColumn<Int>): Int { + override fun getInt(index: Int): Int { val record = checkNotNull(record) { "Reader in invalid state" } - return when (column) { - RESOURCE_CPU_COUNT -> record[COL_CPU_COUNT] as Int + return when (index) { + COL_CPU_COUNT -> record[AVRO_COL_CPU_COUNT] as Int else -> throw IllegalArgumentException("Invalid column") } } - override fun getLong(column: TableColumn<Long>): Long { + override fun getLong(index: Int): Long { throw IllegalArgumentException("Invalid column") } - override fun getDouble(column: TableColumn<Double>): Double { + override fun getDouble(index: Int): Double { val record = checkNotNull(record) { "Reader in invalid state" } - return when (column) { - RESOURCE_MEM_CAPACITY -> (record[COL_MEM_CAPACITY] as Number).toDouble() + return when (index) { + COL_MEM_CAPACITY -> (record[AVRO_COL_MEM_CAPACITY] as Number).toDouble() else -> throw IllegalArgumentException("Invalid column") } } @@ -119,20 +111,34 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G */ private fun initColumns(schema: Schema) { try { - COL_ID = schema.getField("id").pos() - COL_START_TIME = (schema.getField("start_time") ?: schema.getField("submissionTime")).pos() - COL_STOP_TIME = (schema.getField("stop_time") ?: schema.getField("endTime")).pos() - COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("maxCores")).pos() - COL_MEM_CAPACITY = (schema.getField("mem_capacity") ?: schema.getField("requiredMemory")).pos() + AVRO_COL_ID = schema.getField("id").pos() + AVRO_COL_START_TIME = (schema.getField("start_time") ?: schema.getField("submissionTime")).pos() + AVRO_COL_STOP_TIME = (schema.getField("stop_time") ?: schema.getField("endTime")).pos() + AVRO_COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("maxCores")).pos() + AVRO_COL_MEM_CAPACITY = (schema.getField("mem_capacity") ?: schema.getField("requiredMemory")).pos() } catch (e: NullPointerException) { // This happens when the field we are trying to access does not exist throw IllegalArgumentException("Invalid schema") } } - private var COL_ID = -1 - private var COL_START_TIME = -1 - private var COL_STOP_TIME = -1 - private var COL_CPU_COUNT = -1 - private var COL_MEM_CAPACITY = -1 + private var AVRO_COL_ID = -1 + private var AVRO_COL_START_TIME = -1 + private var AVRO_COL_STOP_TIME = -1 + private var AVRO_COL_CPU_COUNT = -1 + private var AVRO_COL_MEM_CAPACITY = -1 + + private val COL_ID = 0 + private val COL_START_TIME = 1 + private val COL_STOP_TIME = 2 + private val COL_CPU_COUNT = 3 + private val COL_MEM_CAPACITY = 4 + + private val columns = mapOf( + RESOURCE_ID to COL_ID, + RESOURCE_START_TIME to COL_START_TIME, + RESOURCE_STOP_TIME to COL_STOP_TIME, + RESOURCE_CPU_COUNT to COL_CPU_COUNT, + RESOURCE_MEM_CAPACITY to COL_MEM_CAPACITY, + ) } |
