From 2358257c1080b7ce78270535f82f0b960d48261a Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 6 Jun 2022 16:21:21 +0200 Subject: refactor(trace/api): Introduce type system for trace API This change updates the trace API by introducing a limited type system for the table columns. Previously, the table columns could have any possible type representable by the JVM. With this change, we limit the available types to a small type system. --- .../trace/wfformat/WfFormatTaskTableReader.kt | 78 ++++++++++++++++------ .../opendc/trace/wfformat/WfFormatTraceFormat.kt | 17 +++-- .../trace/wfformat/WfFormatTaskTableReaderTest.kt | 6 +- .../trace/wfformat/WfFormatTraceFormatTest.kt | 16 ++--- 4 files changed, 77 insertions(+), 40 deletions(-) (limited to 'opendc-trace/opendc-trace-wfformat/src') diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt index d8eafa9c..0be9dec6 100644 --- a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt +++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt @@ -27,7 +27,10 @@ import com.fasterxml.jackson.core.JsonParser import com.fasterxml.jackson.core.JsonToken import org.opendc.trace.* import org.opendc.trace.conv.* +import org.opendc.trace.util.convertTo import java.time.Duration +import java.time.Instant +import java.util.* import kotlin.math.roundToInt /** @@ -95,41 +98,82 @@ internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableRe return hasJob } - override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + override fun resolve(name: String): Int { + return when (name) { + TASK_ID -> COL_ID + TASK_WORKFLOW_ID -> COL_WORKFLOW_ID + TASK_RUNTIME -> COL_RUNTIME + TASK_REQ_NCPUS -> COL_NPROC + TASK_PARENTS -> COL_PARENTS + TASK_CHILDREN -> COL_CHILDREN + else -> -1 + } + } override fun isNull(index: Int): Boolean { - check(index in 0..columns.size) { "Invalid column value" } + check(index in 0..COL_CHILDREN) { "Invalid column value" } return false } - override fun get(index: Int): Any? { + override fun getBoolean(index: Int): Boolean { + throw IllegalArgumentException("Invalid column") + } + + override fun getInt(index: Int): Int { + return when (index) { + COL_NPROC -> cores + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getLong(index: Int): Long { + throw IllegalArgumentException("Invalid column") + } + + override fun getFloat(index: Int): Float { + throw IllegalArgumentException("Invalid column") + } + + override fun getDouble(index: Int): Double { + throw IllegalArgumentException("Invalid column") + } + + override fun getString(index: Int): String? { return when (index) { COL_ID -> id COL_WORKFLOW_ID -> workflowId - COL_RUNTIME -> runtime - COL_PARENTS -> parents - COL_CHILDREN -> children - COL_NPROC -> getInt(index) else -> throw IllegalArgumentException("Invalid column") } } - override fun getBoolean(index: Int): Boolean { + override fun getUUID(index: Int): UUID? { throw IllegalArgumentException("Invalid column") } - override fun getInt(index: Int): Int { + override fun getInstant(index: Int): Instant? { + throw IllegalArgumentException("Invalid column") + } + + override fun getDuration(index: Int): Duration? { return when (index) { - COL_NPROC -> cores + COL_RUNTIME -> runtime else -> throw IllegalArgumentException("Invalid column") } } - override fun getLong(index: Int): Long { + override fun getList(index: Int, elementType: Class): List? { throw IllegalArgumentException("Invalid column") } - override fun getDouble(index: Int): Double { + override fun getSet(index: Int, elementType: Class): Set? { + return when (index) { + COL_PARENTS -> TYPE_PARENTS.convertTo(parents, elementType) + COL_CHILDREN -> TYPE_CHILDREN.convertTo(children, elementType) + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getMap(index: Int, keyType: Class, valueType: Class): Map? { throw IllegalArgumentException("Invalid column") } @@ -232,12 +276,6 @@ internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableRe private val COL_PARENTS = 5 private val COL_CHILDREN = 6 - private val columns = mapOf( - TASK_ID to COL_ID, - TASK_WORKFLOW_ID to COL_WORKFLOW_ID, - TASK_RUNTIME to COL_RUNTIME, - TASK_REQ_NCPUS to COL_NPROC, - TASK_PARENTS to COL_PARENTS, - TASK_CHILDREN to COL_CHILDREN, - ) + private val TYPE_PARENTS = TableColumnType.Set(TableColumnType.String) + private val TYPE_CHILDREN = TableColumnType.Set(TableColumnType.String) } diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt index 8db4c169..154fa061 100644 --- a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt +++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt @@ -50,20 +50,19 @@ public class WfFormatTraceFormat : TraceFormat { return when (table) { TABLE_TASKS -> TableDetails( listOf( - TASK_ID, - TASK_WORKFLOW_ID, - TASK_RUNTIME, - TASK_REQ_NCPUS, - TASK_PARENTS, - TASK_CHILDREN - ), - emptyList() + TableColumn(TASK_ID, TableColumnType.String), + TableColumn(TASK_WORKFLOW_ID, TableColumnType.String), + TableColumn(TASK_RUNTIME, TableColumnType.Duration), + TableColumn(TASK_REQ_NCPUS, TableColumnType.Int), + TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)), + TableColumn(TASK_CHILDREN, TableColumnType.Set(TableColumnType.String)) + ) ) else -> throw IllegalArgumentException("Table $table not supported") } } - override fun newReader(path: Path, table: String, projection: List>?): TableReader { + override fun newReader(path: Path, table: String, projection: List?): TableReader { return when (table) { TABLE_TASKS -> WfFormatTaskTableReader(factory.createParser(path.toFile())) else -> throw IllegalArgumentException("Table $table not supported") diff --git a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt index e27bc82c..9d9735b1 100644 --- a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt +++ b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt @@ -210,7 +210,7 @@ internal class WfFormatTaskTableReaderTest { val reader = WfFormatTaskTableReader(parser) assertTrue(reader.nextRow()) - assertEquals("test", reader.get(TASK_ID)) + assertEquals("test", reader.getString(TASK_ID)) assertFalse(reader.nextRow()) reader.close() @@ -281,7 +281,7 @@ internal class WfFormatTaskTableReaderTest { val reader = WfFormatTaskTableReader(parser) assertTrue(reader.nextRow()) - assertEquals(setOf("1"), reader.get(TASK_PARENTS)) + assertEquals(setOf("1"), reader.getSet(TASK_PARENTS, String::class.java)) assertFalse(reader.nextRow()) reader.close() @@ -337,7 +337,7 @@ internal class WfFormatTaskTableReaderTest { assertTrue(reader.nextRow()) assertTrue(reader.nextRow()) - assertEquals("test2", reader.get(TASK_ID)) + assertEquals("test2", reader.getString(TASK_ID)) assertFalse(reader.nextRow()) reader.close() diff --git a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt index 4a8b2792..a460c5f6 100644 --- a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt @@ -66,18 +66,18 @@ class WfFormatTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("makebwaindex_mammoth_mt_krause.fasta", reader.get(TASK_ID)) }, - { assertEquals("eager-nextflow-chameleon", reader.get(TASK_WORKFLOW_ID)) }, - { assertEquals(172000, reader.get(TASK_RUNTIME).toMillis()) }, - { assertEquals(emptySet(), reader.get(TASK_PARENTS)) }, + { assertEquals("makebwaindex_mammoth_mt_krause.fasta", reader.getString(TASK_ID)) }, + { assertEquals("eager-nextflow-chameleon", reader.getString(TASK_WORKFLOW_ID)) }, + { assertEquals(172000, reader.getDuration(TASK_RUNTIME)?.toMillis()) }, + { assertEquals(emptySet(), reader.getSet(TASK_PARENTS, String::class.java)) }, ) assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("makeseqdict_mammoth_mt_krause.fasta", reader.get(TASK_ID)) }, - { assertEquals("eager-nextflow-chameleon", reader.get(TASK_WORKFLOW_ID)) }, - { assertEquals(175000, reader.get(TASK_RUNTIME).toMillis()) }, - { assertEquals(setOf("makebwaindex_mammoth_mt_krause.fasta"), reader.get(TASK_PARENTS)) }, + { assertEquals("makeseqdict_mammoth_mt_krause.fasta", reader.getString(TASK_ID)) }, + { assertEquals("eager-nextflow-chameleon", reader.getString(TASK_WORKFLOW_ID)) }, + { assertEquals(175000, reader.getDuration(TASK_RUNTIME)?.toMillis()) }, + { assertEquals(setOf("makebwaindex_mammoth_mt_krause.fasta"), reader.getSet(TASK_PARENTS, String::class.java)) }, ) reader.close() -- cgit v1.2.3 From 9d759c9bc987965fae8b0c16c000772c546bf3a2 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 8 Jun 2022 15:06:14 +0200 Subject: test(trace): Add conformance suite for OpenDC trace API This change adds a re-usable test suite for the interface of the OpenDC trace API, so implementors can verify whether they match the specification of the interfaces. --- .../trace/wfformat/WfFormatTaskTableReader.kt | 17 +++++++++++++-- .../trace/wfformat/WfFormatTraceFormatTest.kt | 24 +++++++++++++++++++--- 2 files changed, 36 insertions(+), 5 deletions(-) (limited to 'opendc-trace/opendc-trace-wfformat/src') diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt index 0be9dec6..ca1a29d0 100644 --- a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt +++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt @@ -54,6 +54,7 @@ internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableRe // Check whether the document is not empty and starts with an object if (token == null) { + parser.close() break } else if (token != JsonToken.START_OBJECT) { throw JsonParseException(parser, "Expected object", parser.currentLocation) @@ -64,6 +65,7 @@ internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableRe ParserLevel.TRACE -> { // Seek for the workflow object in the file if (!seekWorkflow()) { + parser.close() break } else if (!parser.isExpectedStartObjectToken) { throw JsonParseException(parser, "Expected object", parser.currentLocation) @@ -111,7 +113,7 @@ internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableRe } override fun isNull(index: Int): Boolean { - check(index in 0..COL_CHILDREN) { "Invalid column value" } + require(index in 0..COL_CHILDREN) { "Invalid column value" } return false } @@ -120,6 +122,7 @@ internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableRe } override fun getInt(index: Int): Int { + checkActive() return when (index) { COL_NPROC -> cores else -> throw IllegalArgumentException("Invalid column") @@ -139,6 +142,7 @@ internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableRe } override fun getString(index: Int): String? { + checkActive() return when (index) { COL_ID -> id COL_WORKFLOW_ID -> workflowId @@ -155,6 +159,7 @@ internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableRe } override fun getDuration(index: Int): Duration? { + checkActive() return when (index) { COL_RUNTIME -> runtime else -> throw IllegalArgumentException("Invalid column") @@ -166,6 +171,7 @@ internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableRe } override fun getSet(index: Int, elementType: Class): Set? { + checkActive() return when (index) { COL_PARENTS -> TYPE_PARENTS.convertTo(parents, elementType) COL_CHILDREN -> TYPE_CHILDREN.convertTo(children, elementType) @@ -181,11 +187,18 @@ internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableRe parser.close() } + /** + * Helper method to check if the reader is active. + */ + private fun checkActive() { + check(level != ParserLevel.TOP && !parser.isClosed) { "No active row. Did you call nextRow()?" } + } + /** * Parse the trace and seek until the workflow description. */ private fun seekWorkflow(): Boolean { - while (parser.nextValue() != JsonToken.END_OBJECT) { + while (parser.nextValue() != JsonToken.END_OBJECT && !parser.isClosed) { when (parser.currentName) { "name" -> workflowId = parser.text "workflow" -> return true diff --git a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt index a460c5f6..40506d59 100644 --- a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt @@ -22,17 +22,20 @@ package org.opendc.trace.wfformat -import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.* import org.junit.jupiter.api.Assertions.* -import org.junit.jupiter.api.Test +import org.junit.jupiter.api.Assertions.assertAll import org.junit.jupiter.api.assertDoesNotThrow -import org.junit.jupiter.api.assertThrows +import org.opendc.trace.TableColumn +import org.opendc.trace.TableReader import org.opendc.trace.conv.* +import org.opendc.trace.testkit.TableReaderTestKit import java.nio.file.Paths /** * Test suite for the [WfFormatTraceFormat] class. */ +@DisplayName("WfFormat TraceFormat") class WfFormatTraceFormatTest { private val format = WfFormatTraceFormat() @@ -98,4 +101,19 @@ class WfFormatTraceFormatTest { reader.close() } } + + @DisplayName("TableReader for Tasks") + @Nested + inner class TasksTableReaderTest : TableReaderTestKit() { + override lateinit var reader: TableReader + override lateinit var columns: List + + @BeforeEach + fun setUp() { + val path = Paths.get("src/test/resources/trace.json") + + columns = format.getDetails(path, TABLE_TASKS).columns + reader = format.newReader(path, TABLE_TASKS, null) + } + } } -- cgit v1.2.3