summaryrefslogtreecommitdiff
path: root/opendc-trace/opendc-trace-wtf/src
diff options
context:
space:
mode:
author Fabian Mastenbroek <mail.fabianm@gmail.com> 2022-05-02 14:17:55 +0200
committer Fabian Mastenbroek <mail.fabianm@gmail.com> 2022-05-02 15:37:04 +0200
commit 670cd279ea7789e07b6d778a21fdec68347ab305 (patch)
tree df91a7fd81a5d1afc51a0380fd938adf701bc43b /opendc-trace/opendc-trace-wtf/src
parent 9411845b3f26536a1e6ea40504e396f19d25a09a (diff)
feat(trace/api): Add support for projecting tables
This change adds support for projecting certain columns of a table. This enables faster reading for tables with a high number of columns. Currently, we support projection in the Parquet-based workload formats. Other formats are text-based and will probably not benefit much from projection.
Diffstat (limited to 'opendc-trace/opendc-trace-wtf/src')
-rw-r--r-- opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt 4
-rw-r--r-- opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt 39
-rw-r--r-- opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt 2
3 files changed, 40 insertions, 5 deletions
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
index d6e42c8c..e71253ac 100644
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
@@ -63,10 +63,10 @@ public class WtfTraceFormat : TraceFormat {
}
}
- override fun newReader(path: Path, table: String): TableReader {
+ override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader {
return when (table) {
TABLE_TASKS -> {
- val reader = LocalParquetReader(path.resolve("tasks/schema-1.0"), TaskReadSupport())
+ val reader = LocalParquetReader(path.resolve("tasks/schema-1.0"), TaskReadSupport(projection))
WtfTaskTableReader(reader)
}
else -> throw IllegalArgumentException("Table $table not supported")
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt
index 0017a4a9..8e7325de 100644
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt
@@ -27,13 +27,48 @@ import org.apache.parquet.hadoop.api.InitContext
import org.apache.parquet.hadoop.api.ReadSupport
import org.apache.parquet.io.api.RecordMaterializer
import org.apache.parquet.schema.*
+import org.opendc.trace.TableColumn
+import org.opendc.trace.conv.*
/**
* A [ReadSupport] instance for [Task] objects.
+ *
+ * @param projection The projection of the table to read.
*/
-internal class TaskReadSupport : ReadSupport<Task>() {
+internal class TaskReadSupport(private val projection: List<TableColumn<*>>?) : ReadSupport<Task>() {
+ /**
+ * Mapping of table columns to their Parquet column names.
+ */
+ private val colMap = mapOf<TableColumn<*>, String>(
+ TASK_ID to "id",
+ TASK_WORKFLOW_ID to "workflow_id",
+ TASK_SUBMIT_TIME to "ts_submit",
+ TASK_WAIT_TIME to "wait_time",
+ TASK_RUNTIME to "runtime",
+ TASK_REQ_NCPUS to "resource_amount_requested",
+ TASK_PARENTS to "parents",
+ TASK_CHILDREN to "children",
+ TASK_GROUP_ID to "group_id",
+ TASK_USER_ID to "user_id"
+ )
+
override fun init(context: InitContext): ReadContext {
- return ReadContext(READ_SCHEMA)
+ val projectedSchema =
+ if (projection != null) {
+ Types.buildMessage()
+ .apply {
+ val fieldByName = READ_SCHEMA.fields.associateBy { it.name }
+
+ for (col in projection) {
+ val fieldName = colMap[col] ?: continue
+ addField(fieldByName.getValue(fieldName))
+ }
+ }
+ .named(READ_SCHEMA.name)
+ } else {
+ READ_SCHEMA
+ }
+ return ReadContext(projectedSchema)
}
override fun prepareForRead(
diff --git a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt
index 0f0e422d..c0eb3f08 100644
--- a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt
@@ -61,7 +61,7 @@ class WtfTraceFormatTest {
@Test
fun testTableReader() {
val path = Paths.get("src/test/resources/wtf-trace")
- val reader = format.newReader(path, TABLE_TASKS)
+ val reader = format.newReader(path, TABLE_TASKS, listOf(TASK_ID, TASK_WORKFLOW_ID, TASK_SUBMIT_TIME, TASK_RUNTIME, TASK_PARENTS))
assertAll(
{ assertTrue(reader.nextRow()) },