diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-06-06 16:21:21 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-06-07 15:46:53 +0200 |
| commit | 2358257c1080b7ce78270535f82f0b960d48261a (patch) | |
| tree | bced69c02698e85f995aa9935ddcfb54df23a64f /opendc-trace/opendc-trace-wtf/src | |
| parent | 61b6550d7a476ab1aae45a5b9385dfd6ca4f6b6f (diff) | |
refactor(trace/api): Introduce type system for trace API
This change updates the trace API by introducing a limited type system
for the table columns. Previously, the table columns could have any
possible type representable by the JVM. With this change, we limit the
available types to a small type system.
Diffstat (limited to 'opendc-trace/opendc-trace-wtf/src')
4 files changed, 109 insertions, 71 deletions
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt index f0db78b7..bb5eb668 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt @@ -24,8 +24,12 @@ package org.opendc.trace.wtf import org.opendc.trace.* import org.opendc.trace.conv.* +import org.opendc.trace.util.convertTo import org.opendc.trace.util.parquet.LocalParquetReader import org.opendc.trace.wtf.parquet.Task +import java.time.Duration +import java.time.Instant +import java.util.* /** * A [TableReader] implementation for the WTF format. @@ -48,26 +52,39 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Task>) } } - override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + private val COL_ID = 0 + private val COL_WORKFLOW_ID = 1 + private val COL_SUBMIT_TIME = 2 + private val COL_WAIT_TIME = 3 + private val COL_RUNTIME = 4 + private val COL_REQ_NCPUS = 5 + private val COL_PARENTS = 6 + private val COL_CHILDREN = 7 + private val COL_GROUP_ID = 8 + private val COL_USER_ID = 9 - override fun isNull(index: Int): Boolean { - check(index in 0..columns.size) { "Invalid column index" } - return get(index) == null + private val TYPE_PARENTS = TableColumnType.Set(TableColumnType.String) + private val TYPE_CHILDREN = TableColumnType.Set(TableColumnType.String) + + override fun resolve(name: String): Int { + return when (name) { + TASK_ID -> COL_ID + TASK_WORKFLOW_ID -> COL_WORKFLOW_ID + TASK_SUBMIT_TIME -> COL_SUBMIT_TIME + TASK_WAIT_TIME -> COL_WAIT_TIME + TASK_RUNTIME -> COL_RUNTIME + TASK_REQ_NCPUS -> COL_REQ_NCPUS + TASK_PARENTS -> COL_PARENTS + TASK_CHILDREN -> COL_CHILDREN + TASK_GROUP_ID -> COL_GROUP_ID + TASK_USER_ID -> COL_USER_ID + else -> -1 + } } - override fun get(index: Int): Any? { - val record = checkNotNull(record) { "Reader in invalid state" } - return when (index) { - COL_ID -> record.id - COL_WORKFLOW_ID -> record.workflowId - COL_SUBMIT_TIME -> record.submitTime - COL_WAIT_TIME -> record.waitTime - COL_RUNTIME -> record.runtime - COL_REQ_NCPUS, COL_GROUP_ID, COL_USER_ID -> getInt(index) - COL_PARENTS -> record.parents - COL_CHILDREN -> record.children - else -> throw IllegalArgumentException("Invalid column") - } + override fun isNull(index: Int): Boolean { + check(index in COL_ID..COL_USER_ID) { "Invalid column index" } + return false } override fun getBoolean(index: Int): Boolean { @@ -89,35 +106,62 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Task>) throw IllegalArgumentException("Invalid column") } + override fun getFloat(index: Int): Float { + throw IllegalArgumentException("Invalid column") + } + override fun getDouble(index: Int): Double { throw IllegalArgumentException("Invalid column") } - override fun close() { - reader.close() + override fun getString(index: Int): String { + val record = checkNotNull(record) { "Reader in invalid state" } + return when (index) { + COL_ID -> record.id + COL_WORKFLOW_ID -> record.workflowId + else -> throw IllegalArgumentException("Invalid column") + } } - private val COL_ID = 0 - private val COL_WORKFLOW_ID = 1 - private val COL_SUBMIT_TIME = 2 - private val COL_WAIT_TIME = 3 - private val COL_RUNTIME = 4 - private val COL_REQ_NCPUS = 5 - private val COL_PARENTS = 6 - private val COL_CHILDREN = 7 - private val COL_GROUP_ID = 8 - private val COL_USER_ID = 9 + override fun getUUID(index: Int): UUID? { + throw IllegalArgumentException("Invalid column") + } - private val columns = mapOf( - TASK_ID to COL_ID, - TASK_WORKFLOW_ID to COL_WORKFLOW_ID, - TASK_SUBMIT_TIME to COL_SUBMIT_TIME, - TASK_WAIT_TIME to COL_WAIT_TIME, - TASK_RUNTIME to COL_RUNTIME, - TASK_REQ_NCPUS to COL_REQ_NCPUS, - TASK_PARENTS to COL_PARENTS, - TASK_CHILDREN to COL_CHILDREN, - TASK_GROUP_ID to COL_GROUP_ID, - TASK_USER_ID to COL_USER_ID, - ) + override fun getInstant(index: Int): Instant { + val record = checkNotNull(record) { "Reader in invalid state" } + return when (index) { + COL_SUBMIT_TIME -> record.submitTime + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getDuration(index: Int): Duration { + val record = checkNotNull(record) { "Reader in invalid state" } + return when (index) { + COL_WAIT_TIME -> record.waitTime + COL_RUNTIME -> record.runtime + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun <T> getList(index: Int, elementType: Class<T>): List<T>? { + throw IllegalArgumentException("Invalid column") + } + + override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? { + val record = checkNotNull(record) { "Reader in invalid state" } + return when (index) { + COL_PARENTS -> TYPE_PARENTS.convertTo(record.parents, elementType) + COL_CHILDREN -> TYPE_CHILDREN.convertTo(record.children, elementType) + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? { + throw IllegalArgumentException("Invalid column") + } + + override fun close() { + reader.close() + } } diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt index e71253ac..c8408626 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt @@ -46,24 +46,23 @@ public class WtfTraceFormat : TraceFormat { return when (table) { TABLE_TASKS -> TableDetails( listOf( - TASK_ID, - TASK_WORKFLOW_ID, - TASK_SUBMIT_TIME, - TASK_WAIT_TIME, - TASK_RUNTIME, - TASK_REQ_NCPUS, - TASK_PARENTS, - TASK_CHILDREN, - TASK_GROUP_ID, - TASK_USER_ID - ), - listOf(TASK_SUBMIT_TIME) + TableColumn(TASK_ID, TableColumnType.String), + TableColumn(TASK_WORKFLOW_ID, TableColumnType.String), + TableColumn(TASK_SUBMIT_TIME, TableColumnType.Instant), + TableColumn(TASK_WAIT_TIME, TableColumnType.Duration), + TableColumn(TASK_RUNTIME, TableColumnType.Duration), + TableColumn(TASK_REQ_NCPUS, TableColumnType.Int), + TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)), + TableColumn(TASK_CHILDREN, TableColumnType.Set(TableColumnType.String)), + TableColumn(TASK_GROUP_ID, TableColumnType.Int), + TableColumn(TASK_USER_ID, TableColumnType.Int) + ) ) else -> throw IllegalArgumentException("Table $table not supported") } } - override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader { + override fun newReader(path: Path, table: String, projection: List<String>?): TableReader { return when (table) { TABLE_TASKS -> { val reader = LocalParquetReader(path.resolve("tasks/schema-1.0"), TaskReadSupport(projection)) diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt index 8e7325de..a6087d9f 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt @@ -27,7 +27,6 @@ import org.apache.parquet.hadoop.api.InitContext import org.apache.parquet.hadoop.api.ReadSupport import org.apache.parquet.io.api.RecordMaterializer import org.apache.parquet.schema.* -import org.opendc.trace.TableColumn import org.opendc.trace.conv.* /** @@ -35,11 +34,11 @@ import org.opendc.trace.conv.* * * @param projection The projection of the table to read. */ -internal class TaskReadSupport(private val projection: List<TableColumn<*>>?) : ReadSupport<Task>() { +internal class TaskReadSupport(private val projection: List<String>?) : ReadSupport<Task>() { /** * Mapping of table columns to their Parquet column names. */ - private val colMap = mapOf<TableColumn<*>, String>( + private val colMap = mapOf( TASK_ID to "id", TASK_WORKFLOW_ID to "workflow_id", TASK_SUBMIT_TIME to "ts_submit", diff --git a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt index c0eb3f08..2312035a 100644 --- a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt @@ -65,32 +65,28 @@ class WtfTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("362334516345962206", reader.get(TASK_ID)) }, - { assertEquals("1078341553348591493", reader.get(TASK_WORKFLOW_ID)) }, - { assertEquals(Instant.ofEpochMilli(245604), reader.get(TASK_SUBMIT_TIME)) }, - { assertEquals(Duration.ofMillis(8163), reader.get(TASK_RUNTIME)) }, + { assertEquals("362334516345962206", reader.getString(TASK_ID)) }, + { assertEquals("1078341553348591493", reader.getString(TASK_WORKFLOW_ID)) }, + { assertEquals(Instant.ofEpochMilli(245604), reader.getInstant(TASK_SUBMIT_TIME)) }, + { assertEquals(Duration.ofMillis(8163), reader.getDuration(TASK_RUNTIME)) }, { assertEquals( setOf("584055316413447529", "133113685133695608", "1008582348422865408"), - reader.get( - TASK_PARENTS - ) + reader.getSet(TASK_PARENTS, String::class.java) ) }, ) assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("502010169100446658", reader.get(TASK_ID)) }, - { assertEquals("1078341553348591493", reader.get(TASK_WORKFLOW_ID)) }, - { assertEquals(Instant.ofEpochMilli(251325), reader.get(TASK_SUBMIT_TIME)) }, - { assertEquals(Duration.ofMillis(8216), reader.get(TASK_RUNTIME)) }, + { assertEquals("502010169100446658", reader.getString(TASK_ID)) }, + { assertEquals("1078341553348591493", reader.getString(TASK_WORKFLOW_ID)) }, + { assertEquals(Instant.ofEpochMilli(251325), reader.getInstant(TASK_SUBMIT_TIME)) }, + { assertEquals(Duration.ofMillis(8216), reader.getDuration(TASK_RUNTIME)) }, { assertEquals( setOf("584055316413447529", "133113685133695608", "1008582348422865408"), - reader.get( - TASK_PARENTS - ) + reader.getSet(TASK_PARENTS, String::class.java) ) }, ) |
