summaryrefslogtreecommitdiff
path: root/opendc-trace/opendc-trace-opendc/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-trace/opendc-trace-opendc/src/main')
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt82
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt82
2 files changed, 88 insertions, 76 deletions
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
index e4b18735..b5043f82 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
@@ -55,54 +55,46 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
return record != null
}
- override fun hasColumn(column: TableColumn<*>): Boolean {
- return when (column) {
- RESOURCE_ID -> true
- RESOURCE_STATE_TIMESTAMP -> true
- RESOURCE_STATE_DURATION -> true
- RESOURCE_CPU_COUNT -> true
- RESOURCE_STATE_CPU_USAGE -> true
- else -> false
- }
+ override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
+
+ override fun isNull(index: Int): Boolean {
+ check(index in 0..columns.size) { "Invalid column index" }
+ return get(index) == null
}
- override fun <T> get(column: TableColumn<T>): T {
+ override fun get(index: Int): Any? {
val record = checkNotNull(record) { "Reader in invalid state" }
- @Suppress("UNCHECKED_CAST")
- val res: Any = when (column) {
- RESOURCE_ID -> record[COL_ID].toString()
- RESOURCE_STATE_TIMESTAMP -> Instant.ofEpochMilli(record[COL_TIMESTAMP] as Long)
- RESOURCE_STATE_DURATION -> Duration.ofMillis(record[COL_DURATION] as Long)
- RESOURCE_CPU_COUNT -> getInt(RESOURCE_CPU_COUNT)
- RESOURCE_STATE_CPU_USAGE -> getDouble(RESOURCE_STATE_CPU_USAGE)
+ return when (index) {
+ COL_ID -> record[AVRO_COL_ID].toString()
+ COL_TIMESTAMP -> Instant.ofEpochMilli(record[AVRO_COL_TIMESTAMP] as Long)
+ COL_DURATION -> Duration.ofMillis(record[AVRO_COL_DURATION] as Long)
+ COL_CPU_COUNT -> getInt(index)
+ COL_CPU_USAGE -> getDouble(index)
else -> throw IllegalArgumentException("Invalid column")
}
-
- @Suppress("UNCHECKED_CAST")
- return res as T
}
- override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ override fun getBoolean(index: Int): Boolean {
throw IllegalArgumentException("Invalid column")
}
- override fun getInt(column: TableColumn<Int>): Int {
+ override fun getInt(index: Int): Int {
val record = checkNotNull(record) { "Reader in invalid state" }
- return when (column) {
- RESOURCE_CPU_COUNT -> record[COL_CPU_COUNT] as Int
+ return when (index) {
+ COL_CPU_COUNT -> record[AVRO_COL_CPU_COUNT] as Int
else -> throw IllegalArgumentException("Invalid column")
}
}
- override fun getLong(column: TableColumn<Long>): Long {
+ override fun getLong(index: Int): Long {
throw IllegalArgumentException("Invalid column")
}
- override fun getDouble(column: TableColumn<Double>): Double {
+ override fun getDouble(index: Int): Double {
val record = checkNotNull(record) { "Reader in invalid state" }
- return when (column) {
- RESOURCE_STATE_CPU_USAGE -> (record[COL_CPU_USAGE] as Number).toDouble()
+ return when (index) {
+ COL_CPU_USAGE -> (record[AVRO_COL_CPU_USAGE] as Number).toDouble()
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -118,20 +110,34 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
*/
private fun initColumns(schema: Schema) {
try {
- COL_ID = schema.getField("id").pos()
- COL_TIMESTAMP = (schema.getField("timestamp") ?: schema.getField("time")).pos()
- COL_DURATION = schema.getField("duration").pos()
- COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("cores")).pos()
- COL_CPU_USAGE = (schema.getField("cpu_usage") ?: schema.getField("cpuUsage")).pos()
+ AVRO_COL_ID = schema.getField("id").pos()
+ AVRO_COL_TIMESTAMP = (schema.getField("timestamp") ?: schema.getField("time")).pos()
+ AVRO_COL_DURATION = schema.getField("duration").pos()
+ AVRO_COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("cores")).pos()
+ AVRO_COL_CPU_USAGE = (schema.getField("cpu_usage") ?: schema.getField("cpuUsage")).pos()
} catch (e: NullPointerException) {
// This happens when the field we are trying to access does not exist
throw IllegalArgumentException("Invalid schema", e)
}
}
- private var COL_ID = -1
- private var COL_TIMESTAMP = -1
- private var COL_DURATION = -1
- private var COL_CPU_COUNT = -1
- private var COL_CPU_USAGE = -1
+ private var AVRO_COL_ID = -1
+ private var AVRO_COL_TIMESTAMP = -1
+ private var AVRO_COL_DURATION = -1
+ private var AVRO_COL_CPU_COUNT = -1
+ private var AVRO_COL_CPU_USAGE = -1
+
+ private val COL_ID = 0
+ private val COL_TIMESTAMP = 1
+ private val COL_DURATION = 2
+ private val COL_CPU_COUNT = 3
+ private val COL_CPU_USAGE = 4
+
+ private val columns = mapOf(
+ RESOURCE_ID to COL_ID,
+ RESOURCE_STATE_TIMESTAMP to COL_TIMESTAMP,
+ RESOURCE_STATE_DURATION to COL_DURATION,
+ RESOURCE_CPU_COUNT to COL_CPU_COUNT,
+ RESOURCE_STATE_CPU_USAGE to COL_CPU_USAGE,
+ )
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
index c52da62d..d93929aa 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
@@ -54,56 +54,48 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G
return record != null
}
- override fun hasColumn(column: TableColumn<*>): Boolean {
- return when (column) {
- RESOURCE_ID -> true
- RESOURCE_START_TIME -> true
- RESOURCE_STOP_TIME -> true
- RESOURCE_CPU_COUNT -> true
- RESOURCE_MEM_CAPACITY -> true
- else -> false
- }
+ override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
+
+ override fun isNull(index: Int): Boolean {
+ check(index in 0..columns.size) { "Invalid column index" }
+ return get(index) == null
}
- override fun <T> get(column: TableColumn<T>): T {
+ override fun get(index: Int): Any? {
val record = checkNotNull(record) { "Reader in invalid state" }
- @Suppress("UNCHECKED_CAST")
- val res: Any = when (column) {
- RESOURCE_ID -> record[COL_ID].toString()
- RESOURCE_START_TIME -> Instant.ofEpochMilli(record[COL_START_TIME] as Long)
- RESOURCE_STOP_TIME -> Instant.ofEpochMilli(record[COL_STOP_TIME] as Long)
- RESOURCE_CPU_COUNT -> getInt(RESOURCE_CPU_COUNT)
- RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_MEM_CAPACITY)
+ return when (index) {
+ COL_ID -> record[AVRO_COL_ID].toString()
+ COL_START_TIME -> Instant.ofEpochMilli(record[AVRO_COL_START_TIME] as Long)
+ COL_STOP_TIME -> Instant.ofEpochMilli(record[AVRO_COL_STOP_TIME] as Long)
+ COL_CPU_COUNT -> getInt(index)
+ COL_MEM_CAPACITY -> getDouble(index)
else -> throw IllegalArgumentException("Invalid column")
}
-
- @Suppress("UNCHECKED_CAST")
- return res as T
}
- override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ override fun getBoolean(index: Int): Boolean {
throw IllegalArgumentException("Invalid column")
}
- override fun getInt(column: TableColumn<Int>): Int {
+ override fun getInt(index: Int): Int {
val record = checkNotNull(record) { "Reader in invalid state" }
- return when (column) {
- RESOURCE_CPU_COUNT -> record[COL_CPU_COUNT] as Int
+ return when (index) {
+ COL_CPU_COUNT -> record[AVRO_COL_CPU_COUNT] as Int
else -> throw IllegalArgumentException("Invalid column")
}
}
- override fun getLong(column: TableColumn<Long>): Long {
+ override fun getLong(index: Int): Long {
throw IllegalArgumentException("Invalid column")
}
- override fun getDouble(column: TableColumn<Double>): Double {
+ override fun getDouble(index: Int): Double {
val record = checkNotNull(record) { "Reader in invalid state" }
- return when (column) {
- RESOURCE_MEM_CAPACITY -> (record[COL_MEM_CAPACITY] as Number).toDouble()
+ return when (index) {
+ COL_MEM_CAPACITY -> (record[AVRO_COL_MEM_CAPACITY] as Number).toDouble()
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -119,20 +111,34 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G
*/
private fun initColumns(schema: Schema) {
try {
- COL_ID = schema.getField("id").pos()
- COL_START_TIME = (schema.getField("start_time") ?: schema.getField("submissionTime")).pos()
- COL_STOP_TIME = (schema.getField("stop_time") ?: schema.getField("endTime")).pos()
- COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("maxCores")).pos()
- COL_MEM_CAPACITY = (schema.getField("mem_capacity") ?: schema.getField("requiredMemory")).pos()
+ AVRO_COL_ID = schema.getField("id").pos()
+ AVRO_COL_START_TIME = (schema.getField("start_time") ?: schema.getField("submissionTime")).pos()
+ AVRO_COL_STOP_TIME = (schema.getField("stop_time") ?: schema.getField("endTime")).pos()
+ AVRO_COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("maxCores")).pos()
+ AVRO_COL_MEM_CAPACITY = (schema.getField("mem_capacity") ?: schema.getField("requiredMemory")).pos()
} catch (e: NullPointerException) {
// This happens when the field we are trying to access does not exist
throw IllegalArgumentException("Invalid schema")
}
}
- private var COL_ID = -1
- private var COL_START_TIME = -1
- private var COL_STOP_TIME = -1
- private var COL_CPU_COUNT = -1
- private var COL_MEM_CAPACITY = -1
+ private var AVRO_COL_ID = -1
+ private var AVRO_COL_START_TIME = -1
+ private var AVRO_COL_STOP_TIME = -1
+ private var AVRO_COL_CPU_COUNT = -1
+ private var AVRO_COL_MEM_CAPACITY = -1
+
+ private val COL_ID = 0
+ private val COL_START_TIME = 1
+ private val COL_STOP_TIME = 2
+ private val COL_CPU_COUNT = 3
+ private val COL_MEM_CAPACITY = 4
+
+ private val columns = mapOf(
+ RESOURCE_ID to COL_ID,
+ RESOURCE_START_TIME to COL_START_TIME,
+ RESOURCE_STOP_TIME to COL_STOP_TIME,
+ RESOURCE_CPU_COUNT to COL_CPU_COUNT,
+ RESOURCE_MEM_CAPACITY to COL_MEM_CAPACITY,
+ )
}