summaryrefslogtreecommitdiff
path: root/opendc-trace/opendc-trace-gwf
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-06-06 16:21:21 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2022-06-07 15:46:53 +0200
commit2358257c1080b7ce78270535f82f0b960d48261a (patch)
treebced69c02698e85f995aa9935ddcfb54df23a64f /opendc-trace/opendc-trace-gwf
parent61b6550d7a476ab1aae45a5b9385dfd6ca4f6b6f (diff)
refactor(trace/api): Introduce type system for trace API
This change updates the trace API by introducing a limited type system for the table columns. Previously, the table columns could have any possible type representable by the JVM. With this change, we limit the available types to a small type system.
Diffstat (limited to 'opendc-trace/opendc-trace-gwf')
-rw-r--r--opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt83
-rw-r--r--opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt19
-rw-r--r--opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt20
3 files changed, 79 insertions, 43 deletions
diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt
index 42a9469e..007ab90a 100644
--- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt
+++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt
@@ -27,8 +27,10 @@ import com.fasterxml.jackson.dataformat.csv.CsvParser
import com.fasterxml.jackson.dataformat.csv.CsvSchema
import org.opendc.trace.*
import org.opendc.trace.conv.*
+import org.opendc.trace.util.convertTo
import java.time.Duration
import java.time.Instant
+import java.util.*
import java.util.regex.Pattern
/**
@@ -68,46 +70,89 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader {
return true
}
- override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
+ override fun resolve(name: String): Int {
+ return when (name) {
+ TASK_ID -> COL_JOB_ID
+ TASK_WORKFLOW_ID -> COL_WORKFLOW_ID
+ TASK_SUBMIT_TIME -> COL_SUBMIT_TIME
+ TASK_RUNTIME -> COL_RUNTIME
+ TASK_ALLOC_NCPUS -> COL_NPROC
+ TASK_REQ_NCPUS -> COL_REQ_NPROC
+ TASK_PARENTS -> COL_DEPS
+ else -> -1
+ }
+ }
override fun isNull(index: Int): Boolean {
- check(index in 0..columns.size) { "Invalid column" }
+ check(index in 0..COL_DEPS) { "Invalid column" }
return false
}
- override fun get(index: Int): Any? {
+ override fun getBoolean(index: Int): Boolean {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInt(index: Int): Int {
+ return when (index) {
+ COL_REQ_NPROC -> reqNProcs
+ COL_NPROC -> nProcs
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getLong(index: Int): Long {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getFloat(index: Int): Float {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getDouble(index: Int): Double {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getString(index: Int): String? {
return when (index) {
COL_JOB_ID -> jobId
COL_WORKFLOW_ID -> workflowId
- COL_SUBMIT_TIME -> submitTime
- COL_RUNTIME -> runtime
- COL_REQ_NPROC -> getInt(index)
- COL_NPROC -> getInt(index)
- COL_DEPS -> dependencies
else -> throw IllegalArgumentException("Invalid column")
}
}
- override fun getBoolean(index: Int): Boolean {
+ override fun getUUID(index: Int): UUID? {
throw IllegalArgumentException("Invalid column")
}
- override fun getInt(index: Int): Int {
+ override fun getInstant(index: Int): Instant? {
return when (index) {
- COL_REQ_NPROC -> reqNProcs
- COL_NPROC -> nProcs
+ COL_SUBMIT_TIME -> submitTime
else -> throw IllegalArgumentException("Invalid column")
}
}
- override fun getLong(index: Int): Long {
+ override fun getDuration(index: Int): Duration? {
+ return when (index) {
+ COL_RUNTIME -> runtime
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun <T> getList(index: Int, elementType: Class<T>): List<T>? {
throw IllegalArgumentException("Invalid column")
}
- override fun getDouble(index: Int): Double {
+ override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? {
throw IllegalArgumentException("Invalid column")
}
+ override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? {
+ return when (index) {
+ COL_DEPS -> TYPE_DEPS.convertTo(dependencies, elementType)
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
override fun close() {
parser.close()
}
@@ -180,15 +225,7 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader {
private val COL_REQ_NPROC = 5
private val COL_DEPS = 6
- private val columns = mapOf(
- TASK_ID to COL_JOB_ID,
- TASK_WORKFLOW_ID to COL_WORKFLOW_ID,
- TASK_SUBMIT_TIME to COL_SUBMIT_TIME,
- TASK_RUNTIME to COL_RUNTIME,
- TASK_ALLOC_NCPUS to COL_NPROC,
- TASK_REQ_NCPUS to COL_REQ_NPROC,
- TASK_PARENTS to COL_DEPS
- )
+ private val TYPE_DEPS = TableColumnType.Set(TableColumnType.String)
companion object {
/**
diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt
index 8d9eab82..ca63b624 100644
--- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt
+++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt
@@ -56,21 +56,20 @@ public class GwfTraceFormat : TraceFormat {
return when (table) {
TABLE_TASKS -> TableDetails(
listOf(
- TASK_WORKFLOW_ID,
- TASK_ID,
- TASK_SUBMIT_TIME,
- TASK_RUNTIME,
- TASK_REQ_NCPUS,
- TASK_ALLOC_NCPUS,
- TASK_PARENTS,
- ),
- listOf(TASK_WORKFLOW_ID)
+ TableColumn(TASK_WORKFLOW_ID, TableColumnType.String),
+ TableColumn(TASK_ID, TableColumnType.String),
+ TableColumn(TASK_SUBMIT_TIME, TableColumnType.Instant),
+ TableColumn(TASK_RUNTIME, TableColumnType.Duration),
+ TableColumn(TASK_REQ_NCPUS, TableColumnType.Int),
+ TableColumn(TASK_ALLOC_NCPUS, TableColumnType.Int),
+ TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)),
+ )
)
else -> throw IllegalArgumentException("Table $table not supported")
}
}
- override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader {
+ override fun newReader(path: Path, table: String, projection: List<String>?): TableReader {
return when (table) {
TABLE_TASKS -> GwfTaskTableReader(factory.createParser(path.toFile()))
else -> throw IllegalArgumentException("Table $table not supported")
diff --git a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt
index 411d45d0..dd0e6066 100644
--- a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt
@@ -62,11 +62,11 @@ internal class GwfTraceFormatTest {
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals("0", reader.get(TASK_WORKFLOW_ID)) },
- { assertEquals("1", reader.get(TASK_ID)) },
- { assertEquals(Instant.ofEpochSecond(16), reader.get(TASK_SUBMIT_TIME)) },
- { assertEquals(Duration.ofSeconds(11), reader.get(TASK_RUNTIME)) },
- { assertEquals(emptySet<String>(), reader.get(TASK_PARENTS)) },
+ { assertEquals("0", reader.getString(TASK_WORKFLOW_ID)) },
+ { assertEquals("1", reader.getString(TASK_ID)) },
+ { assertEquals(Instant.ofEpochSecond(16), reader.getInstant(TASK_SUBMIT_TIME)) },
+ { assertEquals(Duration.ofSeconds(11), reader.getDuration(TASK_RUNTIME)) },
+ { assertEquals(emptySet<String>(), reader.getSet(TASK_PARENTS, String::class.java)) },
)
}
@@ -81,11 +81,11 @@ internal class GwfTraceFormatTest {
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals("0", reader.get(TASK_WORKFLOW_ID)) },
- { assertEquals("7", reader.get(TASK_ID)) },
- { assertEquals(Instant.ofEpochSecond(87), reader.get(TASK_SUBMIT_TIME)) },
- { assertEquals(Duration.ofSeconds(11), reader.get(TASK_RUNTIME)) },
- { assertEquals(setOf("4", "5", "6"), reader.get(TASK_PARENTS)) },
+ { assertEquals("0", reader.getString(TASK_WORKFLOW_ID)) },
+ { assertEquals("7", reader.getString(TASK_ID)) },
+ { assertEquals(Instant.ofEpochSecond(87), reader.getInstant(TASK_SUBMIT_TIME)) },
+ { assertEquals(Duration.ofSeconds(11), reader.getDuration(TASK_RUNTIME)) },
+ { assertEquals(setOf("4", "5", "6"), reader.getSet(TASK_PARENTS, String::class.java)) },
)
}
}