diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-06-09 10:31:41 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2022-06-09 10:31:41 +0200 |
| commit | d146814bbbb86bfcb19ccb94250424703e9179e5 (patch) | |
| tree | bf20f51b434d56e60ad013568ac1a32b912a3b5e /opendc-trace/opendc-trace-wtf | |
| parent | 61b6550d7a476ab1aae45a5b9385dfd6ca4f6b6f (diff) | |
| parent | 9d759c9bc987965fae8b0c16c000772c546bf3a2 (diff) | |
merge: Introduce schema for trace API (#88)
This pull request updates the OpenDC trace API so that a schema can be specified
for the tables exposed by a trace. This makes it easier for API consumers
to understand the types exposed by the API.
## Implementation Notes :hammer_and_pick:
* Introduce type system for trace API
* Add benchmarks for odcvm trace format
* Add benchmarks for Azure trace format
* Add conformance suite for OpenDC trace API
## External Dependencies :four_leaf_clover:
* N/A
## Breaking API Changes :warning:
* Removal of the generically typed `TableColumn<T>`. Instead, non-generic `TableColumn`
instances are now used to describe the columns (name and type) belonging to a table.
* `TableReader` and `TableWriter` do not support accessing arbitrary objects
anymore. Instead, only the types supported by the type system are exposed.
Diffstat (limited to 'opendc-trace/opendc-trace-wtf')
5 files changed, 131 insertions, 73 deletions
diff --git a/opendc-trace/opendc-trace-wtf/build.gradle.kts b/opendc-trace/opendc-trace-wtf/build.gradle.kts index 35eb32e5..599087e1 100644 --- a/opendc-trace/opendc-trace-wtf/build.gradle.kts +++ b/opendc-trace/opendc-trace-wtf/build.gradle.kts @@ -32,5 +32,6 @@ dependencies { implementation(projects.opendcTrace.opendcTraceParquet) + testImplementation(projects.opendcTrace.opendcTraceTestkit) testRuntimeOnly(libs.slf4j.simple) } diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt index f0db78b7..7d2005b2 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt @@ -24,8 +24,12 @@ package org.opendc.trace.wtf import org.opendc.trace.* import org.opendc.trace.conv.* +import org.opendc.trace.util.convertTo import org.opendc.trace.util.parquet.LocalParquetReader import org.opendc.trace.wtf.parquet.Task +import java.time.Duration +import java.time.Instant +import java.util.* /** * A [TableReader] implementation for the WTF format. 
@@ -48,26 +52,39 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Task>) } } - override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + private val COL_ID = 0 + private val COL_WORKFLOW_ID = 1 + private val COL_SUBMIT_TIME = 2 + private val COL_WAIT_TIME = 3 + private val COL_RUNTIME = 4 + private val COL_REQ_NCPUS = 5 + private val COL_PARENTS = 6 + private val COL_CHILDREN = 7 + private val COL_GROUP_ID = 8 + private val COL_USER_ID = 9 - override fun isNull(index: Int): Boolean { - check(index in 0..columns.size) { "Invalid column index" } - return get(index) == null + private val TYPE_PARENTS = TableColumnType.Set(TableColumnType.String) + private val TYPE_CHILDREN = TableColumnType.Set(TableColumnType.String) + + override fun resolve(name: String): Int { + return when (name) { + TASK_ID -> COL_ID + TASK_WORKFLOW_ID -> COL_WORKFLOW_ID + TASK_SUBMIT_TIME -> COL_SUBMIT_TIME + TASK_WAIT_TIME -> COL_WAIT_TIME + TASK_RUNTIME -> COL_RUNTIME + TASK_REQ_NCPUS -> COL_REQ_NCPUS + TASK_PARENTS -> COL_PARENTS + TASK_CHILDREN -> COL_CHILDREN + TASK_GROUP_ID -> COL_GROUP_ID + TASK_USER_ID -> COL_USER_ID + else -> -1 + } } - override fun get(index: Int): Any? 
{ - val record = checkNotNull(record) { "Reader in invalid state" } - return when (index) { - COL_ID -> record.id - COL_WORKFLOW_ID -> record.workflowId - COL_SUBMIT_TIME -> record.submitTime - COL_WAIT_TIME -> record.waitTime - COL_RUNTIME -> record.runtime - COL_REQ_NCPUS, COL_GROUP_ID, COL_USER_ID -> getInt(index) - COL_PARENTS -> record.parents - COL_CHILDREN -> record.children - else -> throw IllegalArgumentException("Invalid column") - } + override fun isNull(index: Int): Boolean { + require(index in COL_ID..COL_USER_ID) { "Invalid column index" } + return false } override fun getBoolean(index: Int): Boolean { @@ -89,35 +106,62 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Task>) throw IllegalArgumentException("Invalid column") } + override fun getFloat(index: Int): Float { + throw IllegalArgumentException("Invalid column") + } + override fun getDouble(index: Int): Double { throw IllegalArgumentException("Invalid column") } - override fun close() { - reader.close() + override fun getString(index: Int): String { + val record = checkNotNull(record) { "Reader in invalid state" } + return when (index) { + COL_ID -> record.id + COL_WORKFLOW_ID -> record.workflowId + else -> throw IllegalArgumentException("Invalid column") + } } - private val COL_ID = 0 - private val COL_WORKFLOW_ID = 1 - private val COL_SUBMIT_TIME = 2 - private val COL_WAIT_TIME = 3 - private val COL_RUNTIME = 4 - private val COL_REQ_NCPUS = 5 - private val COL_PARENTS = 6 - private val COL_CHILDREN = 7 - private val COL_GROUP_ID = 8 - private val COL_USER_ID = 9 + override fun getUUID(index: Int): UUID? 
{ + throw IllegalArgumentException("Invalid column") + } - private val columns = mapOf( - TASK_ID to COL_ID, - TASK_WORKFLOW_ID to COL_WORKFLOW_ID, - TASK_SUBMIT_TIME to COL_SUBMIT_TIME, - TASK_WAIT_TIME to COL_WAIT_TIME, - TASK_RUNTIME to COL_RUNTIME, - TASK_REQ_NCPUS to COL_REQ_NCPUS, - TASK_PARENTS to COL_PARENTS, - TASK_CHILDREN to COL_CHILDREN, - TASK_GROUP_ID to COL_GROUP_ID, - TASK_USER_ID to COL_USER_ID, - ) + override fun getInstant(index: Int): Instant { + val record = checkNotNull(record) { "Reader in invalid state" } + return when (index) { + COL_SUBMIT_TIME -> record.submitTime + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getDuration(index: Int): Duration { + val record = checkNotNull(record) { "Reader in invalid state" } + return when (index) { + COL_WAIT_TIME -> record.waitTime + COL_RUNTIME -> record.runtime + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun <T> getList(index: Int, elementType: Class<T>): List<T>? { + throw IllegalArgumentException("Invalid column") + } + + override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? { + val record = checkNotNull(record) { "Reader in invalid state" } + return when (index) { + COL_PARENTS -> TYPE_PARENTS.convertTo(record.parents, elementType) + COL_CHILDREN -> TYPE_CHILDREN.convertTo(record.children, elementType) + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? 
{ + throw IllegalArgumentException("Invalid column") + } + + override fun close() { + reader.close() + } } diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt index e71253ac..c8408626 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt @@ -46,24 +46,23 @@ public class WtfTraceFormat : TraceFormat { return when (table) { TABLE_TASKS -> TableDetails( listOf( - TASK_ID, - TASK_WORKFLOW_ID, - TASK_SUBMIT_TIME, - TASK_WAIT_TIME, - TASK_RUNTIME, - TASK_REQ_NCPUS, - TASK_PARENTS, - TASK_CHILDREN, - TASK_GROUP_ID, - TASK_USER_ID - ), - listOf(TASK_SUBMIT_TIME) + TableColumn(TASK_ID, TableColumnType.String), + TableColumn(TASK_WORKFLOW_ID, TableColumnType.String), + TableColumn(TASK_SUBMIT_TIME, TableColumnType.Instant), + TableColumn(TASK_WAIT_TIME, TableColumnType.Duration), + TableColumn(TASK_RUNTIME, TableColumnType.Duration), + TableColumn(TASK_REQ_NCPUS, TableColumnType.Int), + TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)), + TableColumn(TASK_CHILDREN, TableColumnType.Set(TableColumnType.String)), + TableColumn(TASK_GROUP_ID, TableColumnType.Int), + TableColumn(TASK_USER_ID, TableColumnType.Int) + ) ) else -> throw IllegalArgumentException("Table $table not supported") } } - override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader { + override fun newReader(path: Path, table: String, projection: List<String>?): TableReader { return when (table) { TABLE_TASKS -> { val reader = LocalParquetReader(path.resolve("tasks/schema-1.0"), TaskReadSupport(projection)) diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt 
b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt index 8e7325de..a6087d9f 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt @@ -27,7 +27,6 @@ import org.apache.parquet.hadoop.api.InitContext import org.apache.parquet.hadoop.api.ReadSupport import org.apache.parquet.io.api.RecordMaterializer import org.apache.parquet.schema.* -import org.opendc.trace.TableColumn import org.opendc.trace.conv.* /** @@ -35,11 +34,11 @@ import org.opendc.trace.conv.* * * @param projection The projection of the table to read. */ -internal class TaskReadSupport(private val projection: List<TableColumn<*>>?) : ReadSupport<Task>() { +internal class TaskReadSupport(private val projection: List<String>?) : ReadSupport<Task>() { /** * Mapping of table columns to their Parquet column names. */ - private val colMap = mapOf<TableColumn<*>, String>( + private val colMap = mapOf( TASK_ID to "id", TASK_WORKFLOW_ID to "workflow_id", TASK_SUBMIT_TIME to "ts_submit", diff --git a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt index c0eb3f08..f6b821c2 100644 --- a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt @@ -22,10 +22,13 @@ package org.opendc.trace.wtf +import org.junit.jupiter.api.* import org.junit.jupiter.api.Assertions.* -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertThrows +import org.junit.jupiter.api.Assertions.assertAll +import org.opendc.trace.TableColumn +import org.opendc.trace.TableReader import org.opendc.trace.conv.* +import org.opendc.trace.testkit.TableReaderTestKit import java.nio.file.Paths 
import java.time.Duration import java.time.Instant @@ -33,6 +36,7 @@ import java.time.Instant /** * Test suite for the [WtfTraceFormat] class. */ +@DisplayName("WTF TraceFormat") class WtfTraceFormatTest { private val format = WtfTraceFormat() @@ -65,36 +69,47 @@ class WtfTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("362334516345962206", reader.get(TASK_ID)) }, - { assertEquals("1078341553348591493", reader.get(TASK_WORKFLOW_ID)) }, - { assertEquals(Instant.ofEpochMilli(245604), reader.get(TASK_SUBMIT_TIME)) }, - { assertEquals(Duration.ofMillis(8163), reader.get(TASK_RUNTIME)) }, + { assertEquals("362334516345962206", reader.getString(TASK_ID)) }, + { assertEquals("1078341553348591493", reader.getString(TASK_WORKFLOW_ID)) }, + { assertEquals(Instant.ofEpochMilli(245604), reader.getInstant(TASK_SUBMIT_TIME)) }, + { assertEquals(Duration.ofMillis(8163), reader.getDuration(TASK_RUNTIME)) }, { assertEquals( setOf("584055316413447529", "133113685133695608", "1008582348422865408"), - reader.get( - TASK_PARENTS - ) + reader.getSet(TASK_PARENTS, String::class.java) ) }, ) assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("502010169100446658", reader.get(TASK_ID)) }, - { assertEquals("1078341553348591493", reader.get(TASK_WORKFLOW_ID)) }, - { assertEquals(Instant.ofEpochMilli(251325), reader.get(TASK_SUBMIT_TIME)) }, - { assertEquals(Duration.ofMillis(8216), reader.get(TASK_RUNTIME)) }, + { assertEquals("502010169100446658", reader.getString(TASK_ID)) }, + { assertEquals("1078341553348591493", reader.getString(TASK_WORKFLOW_ID)) }, + { assertEquals(Instant.ofEpochMilli(251325), reader.getInstant(TASK_SUBMIT_TIME)) }, + { assertEquals(Duration.ofMillis(8216), reader.getDuration(TASK_RUNTIME)) }, { assertEquals( setOf("584055316413447529", "133113685133695608", "1008582348422865408"), - reader.get( - TASK_PARENTS - ) + reader.getSet(TASK_PARENTS, String::class.java) ) }, ) reader.close() } + + @DisplayName("TableReader for 
Tasks") + @Nested + inner class TasksTableReaderTest : TableReaderTestKit() { + override lateinit var reader: TableReader + override lateinit var columns: List<TableColumn> + + @BeforeEach + fun setUp() { + val path = Paths.get("src/test/resources/wtf-trace") + + columns = format.getDetails(path, TABLE_TASKS).columns + reader = format.newReader(path, TABLE_TASKS, null) + } + } } |
