summaryrefslogtreecommitdiff
path: root/opendc-trace/opendc-trace-wtf/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-20 15:12:10 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-20 15:12:10 +0200
commit768bfa0d2ae763e359d74612385ce43c41afb432 (patch)
tree1ca870d5dd75f7250e91ae7dec41a5e68a77856f /opendc-trace/opendc-trace-wtf/src
parent55a4c8208cc44ac626f7b8c61a19d5ec725ec936 (diff)
feat(trace): Support column lookup via index
This change adds support for looking up the column value through the column index. This enables faster lookup when processing very large traces.
Diffstat (limited to 'opendc-trace/opendc-trace-wtf/src')
-rw-r--r--opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt132
1 files changed, 93 insertions, 39 deletions
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
index 5e2463f8..45ec25dd 100644
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
@@ -22,6 +22,7 @@
package org.opendc.trace.wtf
+import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.opendc.trace.*
import org.opendc.trace.util.parquet.LocalParquetReader
@@ -37,73 +38,126 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Generic
*/
private var record: GenericRecord? = null
+ /**
+ * A flag to indicate that the columns have been initialized.
+ */
+ private var hasInitializedColumns = false
+
override fun nextRow(): Boolean {
- record = reader.read()
+ val record = reader.read()
+ this.record = record
+
+ if (!hasInitializedColumns && record != null) {
+ initColumns(record.schema)
+ hasInitializedColumns = true
+ }
+
return record != null
}
- override fun hasColumn(column: TableColumn<*>): Boolean {
- return when (column) {
- TASK_ID -> true
- TASK_WORKFLOW_ID -> true
- TASK_SUBMIT_TIME -> true
- TASK_WAIT_TIME -> true
- TASK_RUNTIME -> true
- TASK_REQ_NCPUS -> true
- TASK_PARENTS -> true
- TASK_CHILDREN -> true
- TASK_GROUP_ID -> true
- TASK_USER_ID -> true
- else -> false
- }
+ override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
+
+ override fun isNull(index: Int): Boolean {
+ check(index in 0..columns.size) { "Invalid column index" }
+ return get(index) == null
}
- override fun <T> get(column: TableColumn<T>): T {
+ override fun get(index: Int): Any? {
val record = checkNotNull(record) { "Reader in invalid state" }
-
@Suppress("UNCHECKED_CAST")
- val res: Any = when (column) {
- TASK_ID -> (record["id"] as Long).toString()
- TASK_WORKFLOW_ID -> (record["workflow_id"] as Long).toString()
- TASK_SUBMIT_TIME -> Instant.ofEpochMilli(record["ts_submit"] as Long)
- TASK_WAIT_TIME -> Duration.ofMillis(record["wait_time"] as Long)
- TASK_RUNTIME -> Duration.ofMillis(record["runtime"] as Long)
- TASK_REQ_NCPUS -> (record["resource_amount_requested"] as Double).toInt()
- TASK_PARENTS -> (record["parents"] as ArrayList<GenericRecord>).map { it["item"].toString() }.toSet()
- TASK_CHILDREN -> (record["children"] as ArrayList<GenericRecord>).map { it["item"].toString() }.toSet()
- TASK_GROUP_ID -> record["group_id"]
- TASK_USER_ID -> record["user_id"]
+ return when (index) {
+ COL_ID -> (record[AVRO_COL_ID] as Long).toString()
+ COL_WORKFLOW_ID -> (record[AVRO_COL_WORKFLOW_ID] as Long).toString()
+ COL_SUBMIT_TIME -> Instant.ofEpochMilli(record[AVRO_COL_SUBMIT_TIME] as Long)
+ COL_WAIT_TIME -> Duration.ofMillis(record[AVRO_COL_WAIT_TIME] as Long)
+ COL_RUNTIME -> Duration.ofMillis(record[AVRO_COL_RUNTIME] as Long)
+ COL_REQ_NCPUS, COL_GROUP_ID, COL_USER_ID -> getInt(index)
+ COL_PARENTS -> (record[AVRO_COL_PARENTS] as ArrayList<GenericRecord>).map { it["item"].toString() }.toSet()
+ COL_CHILDREN -> (record[AVRO_COL_CHILDREN] as ArrayList<GenericRecord>).map { it["item"].toString() }.toSet()
else -> throw IllegalArgumentException("Invalid column")
}
-
- @Suppress("UNCHECKED_CAST")
- return res as T
}
- override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ override fun getBoolean(index: Int): Boolean {
throw IllegalArgumentException("Invalid column")
}
- override fun getInt(column: TableColumn<Int>): Int {
+ override fun getInt(index: Int): Int {
val record = checkNotNull(record) { "Reader in invalid state" }
- return when (column) {
- TASK_REQ_NCPUS -> (record["resource_amount_requested"] as Double).toInt()
- TASK_GROUP_ID -> record["group_id"] as Int
- TASK_USER_ID -> record["user_id"] as Int
+ return when (index) {
+ COL_REQ_NCPUS -> (record[AVRO_COL_REQ_NCPUS] as Double).toInt()
+ COL_GROUP_ID -> record[AVRO_COL_GROUP_ID] as Int
+ COL_USER_ID -> record[AVRO_COL_USER_ID] as Int
else -> throw IllegalArgumentException("Invalid column")
}
}
- override fun getLong(column: TableColumn<Long>): Long {
+ override fun getLong(index: Int): Long {
throw IllegalArgumentException("Invalid column")
}
- override fun getDouble(column: TableColumn<Double>): Double {
+ override fun getDouble(index: Int): Double {
throw IllegalArgumentException("Invalid column")
}
override fun close() {
reader.close()
}
+
+ /**
+ * Initialize the columns for the reader based on [schema].
+ */
+ private fun initColumns(schema: Schema) {
+ try {
+ AVRO_COL_ID = schema.getField("id").pos()
+ AVRO_COL_WORKFLOW_ID = schema.getField("workflow_id").pos()
+ AVRO_COL_SUBMIT_TIME = schema.getField("ts_submit").pos()
+ AVRO_COL_WAIT_TIME = schema.getField("wait_time").pos()
+ AVRO_COL_RUNTIME = schema.getField("runtime").pos()
+ AVRO_COL_REQ_NCPUS = schema.getField("resource_amount_requested").pos()
+ AVRO_COL_PARENTS = schema.getField("parents").pos()
+ AVRO_COL_CHILDREN = schema.getField("children").pos()
+ AVRO_COL_GROUP_ID = schema.getField("group_id").pos()
+ AVRO_COL_USER_ID = schema.getField("user_id").pos()
+ } catch (e: NullPointerException) {
+ // This happens when the field we are trying to access does not exist
+ throw IllegalArgumentException("Invalid schema", e)
+ }
+ }
+
+ private var AVRO_COL_ID = -1
+ private var AVRO_COL_WORKFLOW_ID = -1
+ private var AVRO_COL_SUBMIT_TIME = -1
+ private var AVRO_COL_WAIT_TIME = -1
+ private var AVRO_COL_RUNTIME = -1
+ private var AVRO_COL_REQ_NCPUS = -1
+ private var AVRO_COL_PARENTS = -1
+ private var AVRO_COL_CHILDREN = -1
+ private var AVRO_COL_GROUP_ID = -1
+ private var AVRO_COL_USER_ID = -1
+
+ private val COL_ID = 0
+ private val COL_WORKFLOW_ID = 1
+ private val COL_SUBMIT_TIME = 2
+ private val COL_WAIT_TIME = 3
+ private val COL_RUNTIME = 4
+ private val COL_REQ_NCPUS = 5
+ private val COL_PARENTS = 6
+ private val COL_CHILDREN = 7
+ private val COL_GROUP_ID = 8
+ private val COL_USER_ID = 9
+
+ private val columns = mapOf(
+ TASK_ID to COL_ID,
+ TASK_WORKFLOW_ID to COL_WORKFLOW_ID,
+ TASK_SUBMIT_TIME to COL_SUBMIT_TIME,
+ TASK_WAIT_TIME to COL_WAIT_TIME,
+ TASK_RUNTIME to COL_RUNTIME,
+ TASK_REQ_NCPUS to COL_REQ_NCPUS,
+ TASK_PARENTS to COL_PARENTS,
+ TASK_CHILDREN to COL_CHILDREN,
+ TASK_GROUP_ID to COL_GROUP_ID,
+ TASK_USER_ID to COL_USER_ID,
+ )
}