summary | refs | log | tree | commit | diff
path: root/opendc-trace/opendc-trace-wfformat
diff options
context:
space:
mode:
author: Fabian Mastenbroek <mail.fabianm@gmail.com> 2022-06-09 10:31:41 +0200
committer: GitHub <noreply@github.com> 2022-06-09 10:31:41 +0200
commit: d146814bbbb86bfcb19ccb94250424703e9179e5 (patch)
tree: bf20f51b434d56e60ad013568ac1a32b912a3b5e /opendc-trace/opendc-trace-wfformat
parent: 61b6550d7a476ab1aae45a5b9385dfd6ca4f6b6f (diff)
parent: 9d759c9bc987965fae8b0c16c000772c546bf3a2 (diff)
merge: Introduce schema for trace API (#88)
This pull request updates the OpenDC trace API to support proper specification of a schema of the tables exposed by the traces. This functionality makes it easier for the API consumer to understand the types exposed by the API.

## Implementation Notes :hammer_and_pick:
* Introduce type system for trace API
* Add benchmarks for odcvm trace format
* Add benchmarks for Azure trace format
* Add conformance suite for OpenDC trace API

## External Dependencies :four_leaf_clover:
* N/A

## Breaking API Changes :warning:
* Removal of typed `TableColumn`. Instead, `TableColumn` instances are now used to describe the columns belonging to some table.
* `TableReader` and `TableWriter` do not support accessing arbitrary objects anymore. Instead, only the types supported by the type system are exposed.
Diffstat (limited to 'opendc-trace/opendc-trace-wfformat')
-rw-r--r-- opendc-trace/opendc-trace-wfformat/build.gradle.kts | 2
-rw-r--r-- opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt | 93
-rw-r--r-- opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt | 17
-rw-r--r-- opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt | 6
-rw-r--r-- opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt | 40
5 files changed, 114 insertions, 44 deletions
diff --git a/opendc-trace/opendc-trace-wfformat/build.gradle.kts b/opendc-trace/opendc-trace-wfformat/build.gradle.kts
index 875f7915..a0e22b16 100644
--- a/opendc-trace/opendc-trace-wfformat/build.gradle.kts
+++ b/opendc-trace/opendc-trace-wfformat/build.gradle.kts
@@ -31,4 +31,6 @@ dependencies {
api(projects.opendcTrace.opendcTraceApi)
implementation(libs.jackson.core)
+
+ testImplementation(projects.opendcTrace.opendcTraceTestkit)
}
diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt
index d8eafa9c..ca1a29d0 100644
--- a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt
+++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt
@@ -27,7 +27,10 @@ import com.fasterxml.jackson.core.JsonParser
import com.fasterxml.jackson.core.JsonToken
import org.opendc.trace.*
import org.opendc.trace.conv.*
+import org.opendc.trace.util.convertTo
import java.time.Duration
+import java.time.Instant
+import java.util.*
import kotlin.math.roundToInt
/**
@@ -51,6 +54,7 @@ internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableRe
// Check whether the document is not empty and starts with an object
if (token == null) {
+ parser.close()
break
} else if (token != JsonToken.START_OBJECT) {
throw JsonParseException(parser, "Expected object", parser.currentLocation)
@@ -61,6 +65,7 @@ internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableRe
ParserLevel.TRACE -> {
// Seek for the workflow object in the file
if (!seekWorkflow()) {
+ parser.close()
break
} else if (!parser.isExpectedStartObjectToken) {
throw JsonParseException(parser, "Expected object", parser.currentLocation)
@@ -95,41 +100,86 @@ internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableRe
return hasJob
}
- override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
+ override fun resolve(name: String): Int {
+ return when (name) {
+ TASK_ID -> COL_ID
+ TASK_WORKFLOW_ID -> COL_WORKFLOW_ID
+ TASK_RUNTIME -> COL_RUNTIME
+ TASK_REQ_NCPUS -> COL_NPROC
+ TASK_PARENTS -> COL_PARENTS
+ TASK_CHILDREN -> COL_CHILDREN
+ else -> -1
+ }
+ }
override fun isNull(index: Int): Boolean {
- check(index in 0..columns.size) { "Invalid column value" }
+ require(index in 0..COL_CHILDREN) { "Invalid column value" }
return false
}
- override fun get(index: Int): Any? {
+ override fun getBoolean(index: Int): Boolean {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInt(index: Int): Int {
+ checkActive()
+ return when (index) {
+ COL_NPROC -> cores
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getLong(index: Int): Long {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getFloat(index: Int): Float {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getDouble(index: Int): Double {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getString(index: Int): String? {
+ checkActive()
return when (index) {
COL_ID -> id
COL_WORKFLOW_ID -> workflowId
- COL_RUNTIME -> runtime
- COL_PARENTS -> parents
- COL_CHILDREN -> children
- COL_NPROC -> getInt(index)
else -> throw IllegalArgumentException("Invalid column")
}
}
- override fun getBoolean(index: Int): Boolean {
+ override fun getUUID(index: Int): UUID? {
throw IllegalArgumentException("Invalid column")
}
- override fun getInt(index: Int): Int {
+ override fun getInstant(index: Int): Instant? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getDuration(index: Int): Duration? {
+ checkActive()
return when (index) {
- COL_NPROC -> cores
+ COL_RUNTIME -> runtime
else -> throw IllegalArgumentException("Invalid column")
}
}
- override fun getLong(index: Int): Long {
+ override fun <T> getList(index: Int, elementType: Class<T>): List<T>? {
throw IllegalArgumentException("Invalid column")
}
- override fun getDouble(index: Int): Double {
+ override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? {
+ checkActive()
+ return when (index) {
+ COL_PARENTS -> TYPE_PARENTS.convertTo(parents, elementType)
+ COL_CHILDREN -> TYPE_CHILDREN.convertTo(children, elementType)
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? {
throw IllegalArgumentException("Invalid column")
}
@@ -138,10 +188,17 @@ internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableRe
}
/**
+ * Helper method to check if the reader is active.
+ */
+ private fun checkActive() {
+ check(level != ParserLevel.TOP && !parser.isClosed) { "No active row. Did you call nextRow()?" }
+ }
+
+ /**
* Parse the trace and seek until the workflow description.
*/
private fun seekWorkflow(): Boolean {
- while (parser.nextValue() != JsonToken.END_OBJECT) {
+ while (parser.nextValue() != JsonToken.END_OBJECT && !parser.isClosed) {
when (parser.currentName) {
"name" -> workflowId = parser.text
"workflow" -> return true
@@ -232,12 +289,6 @@ internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableRe
private val COL_PARENTS = 5
private val COL_CHILDREN = 6
- private val columns = mapOf(
- TASK_ID to COL_ID,
- TASK_WORKFLOW_ID to COL_WORKFLOW_ID,
- TASK_RUNTIME to COL_RUNTIME,
- TASK_REQ_NCPUS to COL_NPROC,
- TASK_PARENTS to COL_PARENTS,
- TASK_CHILDREN to COL_CHILDREN,
- )
+ private val TYPE_PARENTS = TableColumnType.Set(TableColumnType.String)
+ private val TYPE_CHILDREN = TableColumnType.Set(TableColumnType.String)
}
diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt
index 8db4c169..154fa061 100644
--- a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt
+++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt
@@ -50,20 +50,19 @@ public class WfFormatTraceFormat : TraceFormat {
return when (table) {
TABLE_TASKS -> TableDetails(
listOf(
- TASK_ID,
- TASK_WORKFLOW_ID,
- TASK_RUNTIME,
- TASK_REQ_NCPUS,
- TASK_PARENTS,
- TASK_CHILDREN
- ),
- emptyList()
+ TableColumn(TASK_ID, TableColumnType.String),
+ TableColumn(TASK_WORKFLOW_ID, TableColumnType.String),
+ TableColumn(TASK_RUNTIME, TableColumnType.Duration),
+ TableColumn(TASK_REQ_NCPUS, TableColumnType.Int),
+ TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)),
+ TableColumn(TASK_CHILDREN, TableColumnType.Set(TableColumnType.String))
+ )
)
else -> throw IllegalArgumentException("Table $table not supported")
}
}
- override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader {
+ override fun newReader(path: Path, table: String, projection: List<String>?): TableReader {
return when (table) {
TABLE_TASKS -> WfFormatTaskTableReader(factory.createParser(path.toFile()))
else -> throw IllegalArgumentException("Table $table not supported")
diff --git a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt
index e27bc82c..9d9735b1 100644
--- a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt
+++ b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt
@@ -210,7 +210,7 @@ internal class WfFormatTaskTableReaderTest {
val reader = WfFormatTaskTableReader(parser)
assertTrue(reader.nextRow())
- assertEquals("test", reader.get(TASK_ID))
+ assertEquals("test", reader.getString(TASK_ID))
assertFalse(reader.nextRow())
reader.close()
@@ -281,7 +281,7 @@ internal class WfFormatTaskTableReaderTest {
val reader = WfFormatTaskTableReader(parser)
assertTrue(reader.nextRow())
- assertEquals(setOf("1"), reader.get(TASK_PARENTS))
+ assertEquals(setOf("1"), reader.getSet(TASK_PARENTS, String::class.java))
assertFalse(reader.nextRow())
reader.close()
@@ -337,7 +337,7 @@ internal class WfFormatTaskTableReaderTest {
assertTrue(reader.nextRow())
assertTrue(reader.nextRow())
- assertEquals("test2", reader.get(TASK_ID))
+ assertEquals("test2", reader.getString(TASK_ID))
assertFalse(reader.nextRow())
reader.close()
diff --git a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt
index 4a8b2792..40506d59 100644
--- a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt
@@ -22,17 +22,20 @@
package org.opendc.trace.wfformat
-import org.junit.jupiter.api.Assertions
+import org.junit.jupiter.api.*
import org.junit.jupiter.api.Assertions.*
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.Assertions.assertAll
import org.junit.jupiter.api.assertDoesNotThrow
-import org.junit.jupiter.api.assertThrows
+import org.opendc.trace.TableColumn
+import org.opendc.trace.TableReader
import org.opendc.trace.conv.*
+import org.opendc.trace.testkit.TableReaderTestKit
import java.nio.file.Paths
/**
* Test suite for the [WfFormatTraceFormat] class.
*/
+@DisplayName("WfFormat TraceFormat")
class WfFormatTraceFormatTest {
private val format = WfFormatTraceFormat()
@@ -66,18 +69,18 @@ class WfFormatTraceFormatTest {
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals("makebwaindex_mammoth_mt_krause.fasta", reader.get(TASK_ID)) },
- { assertEquals("eager-nextflow-chameleon", reader.get(TASK_WORKFLOW_ID)) },
- { assertEquals(172000, reader.get(TASK_RUNTIME).toMillis()) },
- { assertEquals(emptySet<String>(), reader.get(TASK_PARENTS)) },
+ { assertEquals("makebwaindex_mammoth_mt_krause.fasta", reader.getString(TASK_ID)) },
+ { assertEquals("eager-nextflow-chameleon", reader.getString(TASK_WORKFLOW_ID)) },
+ { assertEquals(172000, reader.getDuration(TASK_RUNTIME)?.toMillis()) },
+ { assertEquals(emptySet<String>(), reader.getSet(TASK_PARENTS, String::class.java)) },
)
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals("makeseqdict_mammoth_mt_krause.fasta", reader.get(TASK_ID)) },
- { assertEquals("eager-nextflow-chameleon", reader.get(TASK_WORKFLOW_ID)) },
- { assertEquals(175000, reader.get(TASK_RUNTIME).toMillis()) },
- { assertEquals(setOf("makebwaindex_mammoth_mt_krause.fasta"), reader.get(TASK_PARENTS)) },
+ { assertEquals("makeseqdict_mammoth_mt_krause.fasta", reader.getString(TASK_ID)) },
+ { assertEquals("eager-nextflow-chameleon", reader.getString(TASK_WORKFLOW_ID)) },
+ { assertEquals(175000, reader.getDuration(TASK_RUNTIME)?.toMillis()) },
+ { assertEquals(setOf("makebwaindex_mammoth_mt_krause.fasta"), reader.getSet(TASK_PARENTS, String::class.java)) },
)
reader.close()
@@ -98,4 +101,19 @@ class WfFormatTraceFormatTest {
reader.close()
}
}
+
+ @DisplayName("TableReader for Tasks")
+ @Nested
+ inner class TasksTableReaderTest : TableReaderTestKit() {
+ override lateinit var reader: TableReader
+ override lateinit var columns: List<TableColumn>
+
+ @BeforeEach
+ fun setUp() {
+ val path = Paths.get("src/test/resources/trace.json")
+
+ columns = format.getDetails(path, TABLE_TASKS).columns
+ reader = format.newReader(path, TABLE_TASKS, null)
+ }
+ }
}