summaryrefslogtreecommitdiff
path: root/opendc-trace/opendc-trace-wtf/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-06-06 16:21:21 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2022-06-07 15:46:53 +0200
commit2358257c1080b7ce78270535f82f0b960d48261a (patch)
treebced69c02698e85f995aa9935ddcfb54df23a64f /opendc-trace/opendc-trace-wtf/src
parent61b6550d7a476ab1aae45a5b9385dfd6ca4f6b6f (diff)
refactor(trace/api): Introduce type system for trace API
This change updates the trace API by introducing a limited type system for the table columns. Previously, the table columns could have any possible type representable by the JVM. With this change, we limit the available types to a small type system.
Diffstat (limited to 'opendc-trace/opendc-trace-wtf/src')
-rw-r--r--opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt126
-rw-r--r--opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt25
-rw-r--r--opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt5
-rw-r--r--opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt24
4 files changed, 109 insertions, 71 deletions
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
index f0db78b7..bb5eb668 100644
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
@@ -24,8 +24,12 @@ package org.opendc.trace.wtf
import org.opendc.trace.*
import org.opendc.trace.conv.*
+import org.opendc.trace.util.convertTo
import org.opendc.trace.util.parquet.LocalParquetReader
import org.opendc.trace.wtf.parquet.Task
+import java.time.Duration
+import java.time.Instant
+import java.util.*
/**
* A [TableReader] implementation for the WTF format.
@@ -48,26 +52,39 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Task>)
}
}
- override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
+ private val COL_ID = 0
+ private val COL_WORKFLOW_ID = 1
+ private val COL_SUBMIT_TIME = 2
+ private val COL_WAIT_TIME = 3
+ private val COL_RUNTIME = 4
+ private val COL_REQ_NCPUS = 5
+ private val COL_PARENTS = 6
+ private val COL_CHILDREN = 7
+ private val COL_GROUP_ID = 8
+ private val COL_USER_ID = 9
- override fun isNull(index: Int): Boolean {
- check(index in 0..columns.size) { "Invalid column index" }
- return get(index) == null
+ private val TYPE_PARENTS = TableColumnType.Set(TableColumnType.String)
+ private val TYPE_CHILDREN = TableColumnType.Set(TableColumnType.String)
+
+ override fun resolve(name: String): Int {
+ return when (name) {
+ TASK_ID -> COL_ID
+ TASK_WORKFLOW_ID -> COL_WORKFLOW_ID
+ TASK_SUBMIT_TIME -> COL_SUBMIT_TIME
+ TASK_WAIT_TIME -> COL_WAIT_TIME
+ TASK_RUNTIME -> COL_RUNTIME
+ TASK_REQ_NCPUS -> COL_REQ_NCPUS
+ TASK_PARENTS -> COL_PARENTS
+ TASK_CHILDREN -> COL_CHILDREN
+ TASK_GROUP_ID -> COL_GROUP_ID
+ TASK_USER_ID -> COL_USER_ID
+ else -> -1
+ }
}
- override fun get(index: Int): Any? {
- val record = checkNotNull(record) { "Reader in invalid state" }
- return when (index) {
- COL_ID -> record.id
- COL_WORKFLOW_ID -> record.workflowId
- COL_SUBMIT_TIME -> record.submitTime
- COL_WAIT_TIME -> record.waitTime
- COL_RUNTIME -> record.runtime
- COL_REQ_NCPUS, COL_GROUP_ID, COL_USER_ID -> getInt(index)
- COL_PARENTS -> record.parents
- COL_CHILDREN -> record.children
- else -> throw IllegalArgumentException("Invalid column")
- }
+ override fun isNull(index: Int): Boolean {
+ check(index in COL_ID..COL_USER_ID) { "Invalid column index" }
+ return false
}
override fun getBoolean(index: Int): Boolean {
@@ -89,35 +106,62 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Task>)
throw IllegalArgumentException("Invalid column")
}
+ override fun getFloat(index: Int): Float {
+ throw IllegalArgumentException("Invalid column")
+ }
+
override fun getDouble(index: Int): Double {
throw IllegalArgumentException("Invalid column")
}
- override fun close() {
- reader.close()
+ override fun getString(index: Int): String {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+ return when (index) {
+ COL_ID -> record.id
+ COL_WORKFLOW_ID -> record.workflowId
+ else -> throw IllegalArgumentException("Invalid column")
+ }
}
- private val COL_ID = 0
- private val COL_WORKFLOW_ID = 1
- private val COL_SUBMIT_TIME = 2
- private val COL_WAIT_TIME = 3
- private val COL_RUNTIME = 4
- private val COL_REQ_NCPUS = 5
- private val COL_PARENTS = 6
- private val COL_CHILDREN = 7
- private val COL_GROUP_ID = 8
- private val COL_USER_ID = 9
+ override fun getUUID(index: Int): UUID? {
+ throw IllegalArgumentException("Invalid column")
+ }
- private val columns = mapOf(
- TASK_ID to COL_ID,
- TASK_WORKFLOW_ID to COL_WORKFLOW_ID,
- TASK_SUBMIT_TIME to COL_SUBMIT_TIME,
- TASK_WAIT_TIME to COL_WAIT_TIME,
- TASK_RUNTIME to COL_RUNTIME,
- TASK_REQ_NCPUS to COL_REQ_NCPUS,
- TASK_PARENTS to COL_PARENTS,
- TASK_CHILDREN to COL_CHILDREN,
- TASK_GROUP_ID to COL_GROUP_ID,
- TASK_USER_ID to COL_USER_ID,
- )
+ override fun getInstant(index: Int): Instant {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+ return when (index) {
+ COL_SUBMIT_TIME -> record.submitTime
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getDuration(index: Int): Duration {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+ return when (index) {
+ COL_WAIT_TIME -> record.waitTime
+ COL_RUNTIME -> record.runtime
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun <T> getList(index: Int, elementType: Class<T>): List<T>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+ return when (index) {
+ COL_PARENTS -> TYPE_PARENTS.convertTo(record.parents, elementType)
+ COL_CHILDREN -> TYPE_CHILDREN.convertTo(record.children, elementType)
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun close() {
+ reader.close()
+ }
}
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
index e71253ac..c8408626 100644
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
@@ -46,24 +46,23 @@ public class WtfTraceFormat : TraceFormat {
return when (table) {
TABLE_TASKS -> TableDetails(
listOf(
- TASK_ID,
- TASK_WORKFLOW_ID,
- TASK_SUBMIT_TIME,
- TASK_WAIT_TIME,
- TASK_RUNTIME,
- TASK_REQ_NCPUS,
- TASK_PARENTS,
- TASK_CHILDREN,
- TASK_GROUP_ID,
- TASK_USER_ID
- ),
- listOf(TASK_SUBMIT_TIME)
+ TableColumn(TASK_ID, TableColumnType.String),
+ TableColumn(TASK_WORKFLOW_ID, TableColumnType.String),
+ TableColumn(TASK_SUBMIT_TIME, TableColumnType.Instant),
+ TableColumn(TASK_WAIT_TIME, TableColumnType.Duration),
+ TableColumn(TASK_RUNTIME, TableColumnType.Duration),
+ TableColumn(TASK_REQ_NCPUS, TableColumnType.Int),
+ TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)),
+ TableColumn(TASK_CHILDREN, TableColumnType.Set(TableColumnType.String)),
+ TableColumn(TASK_GROUP_ID, TableColumnType.Int),
+ TableColumn(TASK_USER_ID, TableColumnType.Int)
+ )
)
else -> throw IllegalArgumentException("Table $table not supported")
}
}
- override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader {
+ override fun newReader(path: Path, table: String, projection: List<String>?): TableReader {
return when (table) {
TABLE_TASKS -> {
val reader = LocalParquetReader(path.resolve("tasks/schema-1.0"), TaskReadSupport(projection))
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt
index 8e7325de..a6087d9f 100644
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt
@@ -27,7 +27,6 @@ import org.apache.parquet.hadoop.api.InitContext
import org.apache.parquet.hadoop.api.ReadSupport
import org.apache.parquet.io.api.RecordMaterializer
import org.apache.parquet.schema.*
-import org.opendc.trace.TableColumn
import org.opendc.trace.conv.*
/**
@@ -35,11 +34,11 @@ import org.opendc.trace.conv.*
*
* @param projection The projection of the table to read.
*/
-internal class TaskReadSupport(private val projection: List<TableColumn<*>>?) : ReadSupport<Task>() {
+internal class TaskReadSupport(private val projection: List<String>?) : ReadSupport<Task>() {
/**
* Mapping of table columns to their Parquet column names.
*/
- private val colMap = mapOf<TableColumn<*>, String>(
+ private val colMap = mapOf(
TASK_ID to "id",
TASK_WORKFLOW_ID to "workflow_id",
TASK_SUBMIT_TIME to "ts_submit",
diff --git a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt
index c0eb3f08..2312035a 100644
--- a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt
@@ -65,32 +65,28 @@ class WtfTraceFormatTest {
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals("362334516345962206", reader.get(TASK_ID)) },
- { assertEquals("1078341553348591493", reader.get(TASK_WORKFLOW_ID)) },
- { assertEquals(Instant.ofEpochMilli(245604), reader.get(TASK_SUBMIT_TIME)) },
- { assertEquals(Duration.ofMillis(8163), reader.get(TASK_RUNTIME)) },
+ { assertEquals("362334516345962206", reader.getString(TASK_ID)) },
+ { assertEquals("1078341553348591493", reader.getString(TASK_WORKFLOW_ID)) },
+ { assertEquals(Instant.ofEpochMilli(245604), reader.getInstant(TASK_SUBMIT_TIME)) },
+ { assertEquals(Duration.ofMillis(8163), reader.getDuration(TASK_RUNTIME)) },
{
assertEquals(
setOf("584055316413447529", "133113685133695608", "1008582348422865408"),
- reader.get(
- TASK_PARENTS
- )
+ reader.getSet(TASK_PARENTS, String::class.java)
)
},
)
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals("502010169100446658", reader.get(TASK_ID)) },
- { assertEquals("1078341553348591493", reader.get(TASK_WORKFLOW_ID)) },
- { assertEquals(Instant.ofEpochMilli(251325), reader.get(TASK_SUBMIT_TIME)) },
- { assertEquals(Duration.ofMillis(8216), reader.get(TASK_RUNTIME)) },
+ { assertEquals("502010169100446658", reader.getString(TASK_ID)) },
+ { assertEquals("1078341553348591493", reader.getString(TASK_WORKFLOW_ID)) },
+ { assertEquals(Instant.ofEpochMilli(251325), reader.getInstant(TASK_SUBMIT_TIME)) },
+ { assertEquals(Duration.ofMillis(8216), reader.getDuration(TASK_RUNTIME)) },
{
assertEquals(
setOf("584055316413447529", "133113685133695608", "1008582348422865408"),
- reader.get(
- TASK_PARENTS
- )
+ reader.getSet(TASK_PARENTS, String::class.java)
)
},
)