summary | refs | log | tree | commit | diff
path: root/opendc-trace/opendc-trace-gwf/src
diff options
context:
space:
mode:
author Fabian Mastenbroek <mail.fabianm@gmail.com> 2021-09-12 12:08:55 +0200
committer GitHub <noreply@github.com> 2021-09-12 12:08:55 +0200
commit 2cd3bd18e548a72d64afe0e7f59487f4747d722f (patch)
tree dc9e2fba5ca4d19a90934a8b68dbb8110ee34bb7 /opendc-trace/opendc-trace-gwf/src
parent cae193284570d6ee9dbacdde57b3e4e367aa9d9f (diff)
parent 992b65396f55c0e12b36823d191dea8e03dd45ba (diff)
merge: Add support for new trace formats
This pull request updates the trace API with the addition of several new trace formats.
- Add support for Materna traces from GWA
- Keep reader state in own class
- Parse last column in Solvinity trace format
- Add support for Azure VM traces
- Add support for WfCommons (WorkflowHub) traces
- Add API for accessing available table columns
- Add synthetic resource table for Bitbrains format
- Support dynamic resolving of trace formats
**Breaking API Changes**
- Replace `isSupported` by a list of `TableColumns`
Diffstat (limited to 'opendc-trace/opendc-trace-gwf/src')
-rw-r--r-- opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt 21
-rw-r--r-- opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt 89
-rw-r--r-- opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt 12
3 files changed, 55 insertions, 67 deletions
diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt
index 80a99d10..fd7bd068 100644
--- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt
+++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt
@@ -34,18 +34,15 @@ internal class GwfTaskTable(private val factory: CsvFactory, private val url: UR
override val isSynthetic: Boolean = false
- override fun isSupported(column: TableColumn<*>): Boolean {
- return when (column) {
- TASK_WORKFLOW_ID -> true
- TASK_ID -> true
- TASK_SUBMIT_TIME -> true
- TASK_RUNTIME -> true
- TASK_REQ_NCPUS -> true
- TASK_ALLOC_NCPUS -> true
- TASK_PARENTS -> true
- else -> false
- }
- }
+ override val columns: List<TableColumn<*>> = listOf(
+ TASK_WORKFLOW_ID,
+ TASK_ID,
+ TASK_SUBMIT_TIME,
+ TASK_RUNTIME,
+ TASK_REQ_NCPUS,
+ TASK_ALLOC_NCPUS,
+ TASK_PARENTS
+ )
override fun newReader(): TableReader {
return GwfTaskTableReader(factory.createParser(url))
diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt
index 64b7d465..39eb5520 100644
--- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt
+++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt
@@ -26,24 +26,21 @@ import com.fasterxml.jackson.core.JsonToken
import com.fasterxml.jackson.dataformat.csv.CsvParser
import com.fasterxml.jackson.dataformat.csv.CsvSchema
import org.opendc.trace.*
+import java.time.Duration
+import java.time.Instant
import java.util.regex.Pattern
/**
* A [TableReader] implementation for the GWF format.
*/
internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader {
- /**
- * The current parser state.
- */
- private val state = RowState()
-
init {
parser.schema = schema
}
override fun nextRow(): Boolean {
// Reset the row state
- state.reset()
+ reset()
if (!nextStart()) {
return false
@@ -57,12 +54,12 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader {
}
when (parser.currentName) {
- "WorkflowID" -> state.workflowId = parser.longValue
- "JobID" -> state.jobId = parser.longValue
- "SubmitTime" -> state.submitTime = parser.longValue
- "RunTime" -> state.runtime = parser.longValue
- "NProcs" -> state.nProcs = parser.intValue
- "ReqNProcs" -> state.reqNProcs = parser.intValue
+ "WorkflowID" -> workflowId = parser.text
+ "JobID" -> jobId = parser.text
+ "SubmitTime" -> submitTime = Instant.ofEpochSecond(parser.longValue)
+ "RunTime" -> runtime = Duration.ofSeconds(parser.longValue)
+ "NProcs" -> nProcs = parser.intValue
+ "ReqNProcs" -> reqNProcs = parser.intValue
"Dependencies" -> parseParents(parser.valueAsString)
}
}
@@ -84,14 +81,14 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader {
}
override fun <T> get(column: TableColumn<T>): T {
- val res: Any = when (column) {
- TASK_WORKFLOW_ID -> state.workflowId
- TASK_ID -> state.jobId
- TASK_SUBMIT_TIME -> state.submitTime
- TASK_RUNTIME -> state.runtime
- TASK_REQ_NCPUS -> state.nProcs
- TASK_ALLOC_NCPUS -> state.reqNProcs
- TASK_PARENTS -> state.dependencies
+ val res: Any? = when (column) {
+ TASK_WORKFLOW_ID -> workflowId
+ TASK_ID -> jobId
+ TASK_SUBMIT_TIME -> submitTime
+ TASK_RUNTIME -> runtime
+ TASK_REQ_NCPUS -> nProcs
+ TASK_ALLOC_NCPUS -> reqNProcs
+ TASK_PARENTS -> dependencies
else -> throw IllegalArgumentException("Invalid column")
}
@@ -105,20 +102,14 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader {
override fun getInt(column: TableColumn<Int>): Int {
return when (column) {
- TASK_REQ_NCPUS -> state.nProcs
- TASK_ALLOC_NCPUS -> state.reqNProcs
+ TASK_REQ_NCPUS -> nProcs
+ TASK_ALLOC_NCPUS -> reqNProcs
else -> throw IllegalArgumentException("Invalid column")
}
}
override fun getLong(column: TableColumn<Long>): Long {
- return when (column) {
- TASK_WORKFLOW_ID -> state.workflowId
- TASK_ID -> state.jobId
- TASK_SUBMIT_TIME -> state.submitTime
- TASK_RUNTIME -> state.runtime
- else -> throw IllegalArgumentException("Invalid column")
- }
+ throw IllegalArgumentException("Invalid column")
}
override fun getDouble(column: TableColumn<Double>): Double {
@@ -166,29 +157,27 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader {
}
/**
- * The current row state.
+ * Reader state fields.
*/
- private class RowState {
- var workflowId = -1L
- var jobId = -1L
- var submitTime = -1L
- var runtime = -1L
- var nProcs = -1
- var reqNProcs = -1
- var dependencies = emptySet<Long>()
+ private var workflowId: String? = null
+ private var jobId: String? = null
+ private var submitTime: Instant? = null
+ private var runtime: Duration? = null
+ private var nProcs = -1
+ private var reqNProcs = -1
+ private var dependencies = emptySet<Long>()
- /**
- * Reset the state.
- */
- fun reset() {
- workflowId = -1
- jobId = -1
- submitTime = -1
- runtime = -1
- nProcs = -1
- reqNProcs = -1
- dependencies = emptySet()
- }
+ /**
+ * Reset the state.
+ */
+ private fun reset() {
+ workflowId = null
+ jobId = null
+ submitTime = null
+ runtime = null
+ nProcs = -1
+ reqNProcs = -1
+ dependencies = emptySet()
}
companion object {
diff --git a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt
index 6b0568fe..b209b979 100644
--- a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt
@@ -29,6 +29,8 @@ import org.junit.jupiter.api.assertDoesNotThrow
import org.junit.jupiter.api.assertThrows
import org.opendc.trace.*
import java.net.URL
+import java.time.Duration
+import java.time.Instant
/**
* Test suite for the [GwfTraceFormat] class.
@@ -90,11 +92,11 @@ internal class GwfTraceFormatTest {
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals(0L, reader.getLong(TASK_WORKFLOW_ID)) },
- { assertEquals(1L, reader.getLong(TASK_ID)) },
- { assertEquals(16, reader.getLong(TASK_SUBMIT_TIME)) },
- { assertEquals(11, reader.getLong(TASK_RUNTIME)) },
- { assertEquals(setOf<Long>(), reader.get(TASK_PARENTS)) },
+ { assertEquals("0", reader.get(TASK_WORKFLOW_ID)) },
+ { assertEquals("1", reader.get(TASK_ID)) },
+ { assertEquals(Instant.ofEpochSecond(16), reader.get(TASK_SUBMIT_TIME)) },
+ { assertEquals(Duration.ofSeconds(11), reader.get(TASK_RUNTIME)) },
+ { assertEquals(emptySet<String>(), reader.get(TASK_PARENTS)) },
)
}