diff options
Diffstat (limited to 'opendc-trace/opendc-trace-wtf/src')
3 files changed, 40 insertions, 5 deletions
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt index d6e42c8c..e71253ac 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt @@ -63,10 +63,10 @@ public class WtfTraceFormat : TraceFormat { } } - override fun newReader(path: Path, table: String): TableReader { + override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader { return when (table) { TABLE_TASKS -> { - val reader = LocalParquetReader(path.resolve("tasks/schema-1.0"), TaskReadSupport()) + val reader = LocalParquetReader(path.resolve("tasks/schema-1.0"), TaskReadSupport(projection)) WtfTaskTableReader(reader) } else -> throw IllegalArgumentException("Table $table not supported") diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt index 0017a4a9..8e7325de 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt @@ -27,13 +27,48 @@ import org.apache.parquet.hadoop.api.InitContext import org.apache.parquet.hadoop.api.ReadSupport import org.apache.parquet.io.api.RecordMaterializer import org.apache.parquet.schema.* +import org.opendc.trace.TableColumn +import org.opendc.trace.conv.* /** * A [ReadSupport] instance for [Task] objects. + * + * @param projection The projection of the table to read. */ -internal class TaskReadSupport : ReadSupport<Task>() { +internal class TaskReadSupport(private val projection: List<TableColumn<*>>?) : ReadSupport<Task>() { + /** + * Mapping of table columns to their Parquet column names. + */ + private val colMap = mapOf<TableColumn<*>, String>( + TASK_ID to "id", + TASK_WORKFLOW_ID to "workflow_id", + TASK_SUBMIT_TIME to "ts_submit", + TASK_WAIT_TIME to "wait_time", + TASK_RUNTIME to "runtime", + TASK_REQ_NCPUS to "resource_amount_requested", + TASK_PARENTS to "parents", + TASK_CHILDREN to "children", + TASK_GROUP_ID to "group_id", + TASK_USER_ID to "user_id" + ) + override fun init(context: InitContext): ReadContext { - return ReadContext(READ_SCHEMA) + val projectedSchema = + if (projection != null) { + Types.buildMessage() + .apply { + val fieldByName = READ_SCHEMA.fields.associateBy { it.name } + + for (col in projection) { + val fieldName = colMap[col] ?: continue + addField(fieldByName.getValue(fieldName)) + } + } + .named(READ_SCHEMA.name) + } else { + READ_SCHEMA + } + return ReadContext(projectedSchema) } override fun prepareForRead( diff --git a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt index 0f0e422d..c0eb3f08 100644 --- a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt @@ -61,7 +61,7 @@ class WtfTraceFormatTest { @Test fun testTableReader() { val path = Paths.get("src/test/resources/wtf-trace") - val reader = format.newReader(path, TABLE_TASKS) + val reader = format.newReader(path, TABLE_TASKS, listOf(TASK_ID, TASK_WORKFLOW_ID, TASK_SUBMIT_TIME, TASK_RUNTIME, TASK_PARENTS)) assertAll( { assertTrue(reader.nextRow()) }, |
