From 2358257c1080b7ce78270535f82f0b960d48261a Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 6 Jun 2022 16:21:21 +0200 Subject: refactor(trace/api): Introduce type system for trace API This change updates the trace API by introducing a limited type system for the table columns. Previously, the table columns could have any possible type representable by the JVM. With this change, we limit the available types to a small type system. --- .../org/opendc/trace/gwf/GwfTaskTableReader.kt | 83 ++++++++++++++++------ .../kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt | 19 +++-- .../org/opendc/trace/gwf/GwfTraceFormatTest.kt | 20 +++--- 3 files changed, 79 insertions(+), 43 deletions(-) (limited to 'opendc-trace/opendc-trace-gwf/src') diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt index 42a9469e..007ab90a 100644 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt @@ -27,8 +27,10 @@ import com.fasterxml.jackson.dataformat.csv.CsvParser import com.fasterxml.jackson.dataformat.csv.CsvSchema import org.opendc.trace.* import org.opendc.trace.conv.* +import org.opendc.trace.util.convertTo import java.time.Duration import java.time.Instant +import java.util.* import java.util.regex.Pattern /** @@ -68,46 +70,89 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { return true } - override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + override fun resolve(name: String): Int { + return when (name) { + TASK_ID -> COL_JOB_ID + TASK_WORKFLOW_ID -> COL_WORKFLOW_ID + TASK_SUBMIT_TIME -> COL_SUBMIT_TIME + TASK_RUNTIME -> COL_RUNTIME + TASK_ALLOC_NCPUS -> COL_NPROC + TASK_REQ_NCPUS -> COL_REQ_NPROC + TASK_PARENTS -> COL_DEPS + else -> -1 + } + } override fun isNull(index: Int): Boolean { - check(index in 0..columns.size) { "Invalid column" } + check(index in 0..COL_DEPS) { "Invalid column" } return false } - override fun get(index: Int): Any? { + override fun getBoolean(index: Int): Boolean { + throw IllegalArgumentException("Invalid column") + } + + override fun getInt(index: Int): Int { + return when (index) { + COL_REQ_NPROC -> reqNProcs + COL_NPROC -> nProcs + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getLong(index: Int): Long { + throw IllegalArgumentException("Invalid column") + } + + override fun getFloat(index: Int): Float { + throw IllegalArgumentException("Invalid column") + } + + override fun getDouble(index: Int): Double { + throw IllegalArgumentException("Invalid column") + } + + override fun getString(index: Int): String? { return when (index) { COL_JOB_ID -> jobId COL_WORKFLOW_ID -> workflowId - COL_SUBMIT_TIME -> submitTime - COL_RUNTIME -> runtime - COL_REQ_NPROC -> getInt(index) - COL_NPROC -> getInt(index) - COL_DEPS -> dependencies else -> throw IllegalArgumentException("Invalid column") } } - override fun getBoolean(index: Int): Boolean { + override fun getUUID(index: Int): UUID? { throw IllegalArgumentException("Invalid column") } - override fun getInt(index: Int): Int { + override fun getInstant(index: Int): Instant? { return when (index) { - COL_REQ_NPROC -> reqNProcs - COL_NPROC -> nProcs + COL_SUBMIT_TIME -> submitTime else -> throw IllegalArgumentException("Invalid column") } } - override fun getLong(index: Int): Long { + override fun getDuration(index: Int): Duration? { + return when (index) { + COL_RUNTIME -> runtime + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getList(index: Int, elementType: Class): List? { throw IllegalArgumentException("Invalid column") } - override fun getDouble(index: Int): Double { + override fun getMap(index: Int, keyType: Class, valueType: Class): Map? { throw IllegalArgumentException("Invalid column") } + override fun getSet(index: Int, elementType: Class): Set? { + return when (index) { + COL_DEPS -> TYPE_DEPS.convertTo(dependencies, elementType) + else -> throw IllegalArgumentException("Invalid column") + } + } + override fun close() { parser.close() } @@ -180,15 +225,7 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { private val COL_REQ_NPROC = 5 private val COL_DEPS = 6 - private val columns = mapOf( - TASK_ID to COL_JOB_ID, - TASK_WORKFLOW_ID to COL_WORKFLOW_ID, - TASK_SUBMIT_TIME to COL_SUBMIT_TIME, - TASK_RUNTIME to COL_RUNTIME, - TASK_ALLOC_NCPUS to COL_NPROC, - TASK_REQ_NCPUS to COL_REQ_NPROC, - TASK_PARENTS to COL_DEPS - ) + private val TYPE_DEPS = TableColumnType.Set(TableColumnType.String) companion object { /** diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt index 8d9eab82..ca63b624 100644 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt @@ -56,21 +56,20 @@ public class GwfTraceFormat : TraceFormat { return when (table) { TABLE_TASKS -> TableDetails( listOf( - TASK_WORKFLOW_ID, - TASK_ID, - TASK_SUBMIT_TIME, - TASK_RUNTIME, - TASK_REQ_NCPUS, - TASK_ALLOC_NCPUS, - TASK_PARENTS, - ), - listOf(TASK_WORKFLOW_ID) + TableColumn(TASK_WORKFLOW_ID, TableColumnType.String), + TableColumn(TASK_ID, TableColumnType.String), + TableColumn(TASK_SUBMIT_TIME, TableColumnType.Instant), + TableColumn(TASK_RUNTIME, TableColumnType.Duration), + TableColumn(TASK_REQ_NCPUS, TableColumnType.Int), + TableColumn(TASK_ALLOC_NCPUS, TableColumnType.Int), + TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)), + ) ) else -> throw IllegalArgumentException("Table $table not supported") } } - override fun newReader(path: Path, table: String, projection: List>?): TableReader { + override fun newReader(path: Path, table: String, projection: List?): TableReader { return when (table) { TABLE_TASKS -> GwfTaskTableReader(factory.createParser(path.toFile())) else -> throw IllegalArgumentException("Table $table not supported") diff --git a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt index 411d45d0..dd0e6066 100644 --- a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt @@ -62,11 +62,11 @@ internal class GwfTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("0", reader.get(TASK_WORKFLOW_ID)) }, - { assertEquals("1", reader.get(TASK_ID)) }, - { assertEquals(Instant.ofEpochSecond(16), reader.get(TASK_SUBMIT_TIME)) }, - { assertEquals(Duration.ofSeconds(11), reader.get(TASK_RUNTIME)) }, - { assertEquals(emptySet(), reader.get(TASK_PARENTS)) }, + { assertEquals("0", reader.getString(TASK_WORKFLOW_ID)) }, + { assertEquals("1", reader.getString(TASK_ID)) }, + { assertEquals(Instant.ofEpochSecond(16), reader.getInstant(TASK_SUBMIT_TIME)) }, + { assertEquals(Duration.ofSeconds(11), reader.getDuration(TASK_RUNTIME)) }, + { assertEquals(emptySet(), reader.getSet(TASK_PARENTS, String::class.java)) }, ) } @@ -81,11 +81,11 @@ internal class GwfTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("0", reader.get(TASK_WORKFLOW_ID)) }, - { assertEquals("7", reader.get(TASK_ID)) }, - { assertEquals(Instant.ofEpochSecond(87), reader.get(TASK_SUBMIT_TIME)) }, - { assertEquals(Duration.ofSeconds(11), reader.get(TASK_RUNTIME)) }, - { assertEquals(setOf("4", "5", "6"), reader.get(TASK_PARENTS)) }, + { assertEquals("0", reader.getString(TASK_WORKFLOW_ID)) }, + { assertEquals("7", reader.getString(TASK_ID)) }, + { assertEquals(Instant.ofEpochSecond(87), reader.getInstant(TASK_SUBMIT_TIME)) }, + { assertEquals(Duration.ofSeconds(11), reader.getDuration(TASK_RUNTIME)) }, + { assertEquals(setOf("4", "5", "6"), reader.getSet(TASK_PARENTS, String::class.java)) }, ) } } -- cgit v1.2.3 From 9d759c9bc987965fae8b0c16c000772c546bf3a2 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 8 Jun 2022 15:06:14 +0200 Subject: test(trace): Add conformance suite for OpenDC trace API This change adds a re-usable test suite for the interface of the OpenDC trace API, so implementors can verify whether they match the specification of the interfaces. --- .../org/opendc/trace/gwf/GwfTaskTableReader.kt | 25 ++++++++++++++++++++-- .../org/opendc/trace/gwf/GwfTraceFormatTest.kt | 19 ++++++++++++++++ 2 files changed, 42 insertions(+), 2 deletions(-) (limited to 'opendc-trace/opendc-trace-gwf/src') diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt index 007ab90a..f9a171e9 100644 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt @@ -37,15 +37,24 @@ import java.util.regex.Pattern * A [TableReader] implementation for the GWF format. */ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { + /** + * A flag to indicate whether a single row has been read already. + */ + private var isStarted = false + init { parser.schema = schema } override fun nextRow(): Boolean { + if (!isStarted) { + isStarted = true + } + // Reset the row state reset() - if (!nextStart()) { + if (parser.isClosed || !nextStart()) { return false } @@ -84,7 +93,7 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { } override fun isNull(index: Int): Boolean { - check(index in 0..COL_DEPS) { "Invalid column" } + require(index in 0..COL_DEPS) { "Invalid column" } return false } @@ -93,6 +102,7 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { } override fun getInt(index: Int): Int { + checkActive() return when (index) { COL_REQ_NPROC -> reqNProcs COL_NPROC -> nProcs @@ -113,6 +123,7 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { } override fun getString(index: Int): String? { + checkActive() return when (index) { COL_JOB_ID -> jobId COL_WORKFLOW_ID -> workflowId @@ -125,6 +136,7 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { } override fun getInstant(index: Int): Instant? { + checkActive() return when (index) { COL_SUBMIT_TIME -> submitTime else -> throw IllegalArgumentException("Invalid column") @@ -132,6 +144,7 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { } override fun getDuration(index: Int): Duration? { + checkActive() return when (index) { COL_RUNTIME -> runtime else -> throw IllegalArgumentException("Invalid column") @@ -147,6 +160,7 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { } override fun getSet(index: Int, elementType: Class): Set? { + checkActive() return when (index) { COL_DEPS -> TYPE_DEPS.convertTo(dependencies, elementType) else -> throw IllegalArgumentException("Invalid column") @@ -157,6 +171,13 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { parser.close() } + /** + * Helper method to check if the reader is active. + */ + private fun checkActive() { + check(isStarted && !parser.isClosed) { "No active row. Did you call nextRow()?" } + } + /** * The pattern used to parse the parents. */ diff --git a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt index dd0e6066..a8c3a715 100644 --- a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt @@ -24,7 +24,10 @@ package org.opendc.trace.gwf import org.junit.jupiter.api.* import org.junit.jupiter.api.Assertions.* +import org.opendc.trace.TableColumn +import org.opendc.trace.TableReader import org.opendc.trace.conv.* +import org.opendc.trace.testkit.TableReaderTestKit import java.nio.file.Paths import java.time.Duration import java.time.Instant @@ -32,6 +35,7 @@ import java.time.Instant /** * Test suite for the [GwfTraceFormat] class. */ +@DisplayName("GWF TraceFormat") internal class GwfTraceFormatTest { private val format = GwfTraceFormat() @@ -88,4 +92,19 @@ internal class GwfTraceFormatTest { { assertEquals(setOf("4", "5", "6"), reader.getSet(TASK_PARENTS, String::class.java)) }, ) } + + @DisplayName("TableReader for Tasks") + @Nested + inner class TasksTableReaderTest : TableReaderTestKit() { + override lateinit var reader: TableReader + override lateinit var columns: List + + @BeforeEach + fun setUp() { + val path = Paths.get(checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")).toURI()) + + columns = format.getDetails(path, TABLE_TASKS).columns + reader = format.newReader(path, TABLE_TASKS, null) + } + } } -- cgit v1.2.3