From fa08b63bd749e9fbe1a1d04ef2ebd7a86453fa4b Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sat, 11 Sep 2021 10:52:30 +0200 Subject: perf(trace): Keep reader state in own class This change removes the external class that holds the state of the reader and instead puts the state in the reader implementation. Maintaining a separate class for the state increases the complexity and has worse performance characteristics due to the bytecode produced by Kotlin for property accesses. --- .../org/opendc/trace/gwf/GwfTaskTableReader.kt | 85 ++++++++++------------ 1 file changed, 39 insertions(+), 46 deletions(-) (limited to 'opendc-trace/opendc-trace-gwf/src') diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt index 64b7d465..fb9099bf 100644 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt @@ -32,18 +32,13 @@ import java.util.regex.Pattern * A [TableReader] implementation for the GWF format. */ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { - /** - * The current parser state. 
- */ - private val state = RowState() - init { parser.schema = schema } override fun nextRow(): Boolean { // Reset the row state - state.reset() + reset() if (!nextStart()) { return false @@ -57,12 +52,12 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { } when (parser.currentName) { - "WorkflowID" -> state.workflowId = parser.longValue - "JobID" -> state.jobId = parser.longValue - "SubmitTime" -> state.submitTime = parser.longValue - "RunTime" -> state.runtime = parser.longValue - "NProcs" -> state.nProcs = parser.intValue - "ReqNProcs" -> state.reqNProcs = parser.intValue + "WorkflowID" -> workflowId = parser.longValue + "JobID" -> jobId = parser.longValue + "SubmitTime" -> submitTime = parser.longValue + "RunTime" -> runtime = parser.longValue + "NProcs" -> nProcs = parser.intValue + "ReqNProcs" -> reqNProcs = parser.intValue "Dependencies" -> parseParents(parser.valueAsString) } } @@ -85,13 +80,13 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { override fun get(column: TableColumn): T { val res: Any = when (column) { - TASK_WORKFLOW_ID -> state.workflowId - TASK_ID -> state.jobId - TASK_SUBMIT_TIME -> state.submitTime - TASK_RUNTIME -> state.runtime - TASK_REQ_NCPUS -> state.nProcs - TASK_ALLOC_NCPUS -> state.reqNProcs - TASK_PARENTS -> state.dependencies + TASK_WORKFLOW_ID -> workflowId + TASK_ID -> jobId + TASK_SUBMIT_TIME -> submitTime + TASK_RUNTIME -> runtime + TASK_REQ_NCPUS -> nProcs + TASK_ALLOC_NCPUS -> reqNProcs + TASK_PARENTS -> dependencies else -> throw IllegalArgumentException("Invalid column") } @@ -105,18 +100,18 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { override fun getInt(column: TableColumn): Int { return when (column) { - TASK_REQ_NCPUS -> state.nProcs - TASK_ALLOC_NCPUS -> state.reqNProcs + TASK_REQ_NCPUS -> nProcs + TASK_ALLOC_NCPUS -> reqNProcs else -> throw IllegalArgumentException("Invalid column") } } override fun 
getLong(column: TableColumn): Long { return when (column) { - TASK_WORKFLOW_ID -> state.workflowId - TASK_ID -> state.jobId - TASK_SUBMIT_TIME -> state.submitTime - TASK_RUNTIME -> state.runtime + TASK_WORKFLOW_ID -> workflowId + TASK_ID -> jobId + TASK_SUBMIT_TIME -> submitTime + TASK_RUNTIME -> runtime else -> throw IllegalArgumentException("Invalid column") } } @@ -166,29 +161,27 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { } /** - * The current row state. + * Reader state fields. */ - private class RowState { - var workflowId = -1L - var jobId = -1L - var submitTime = -1L - var runtime = -1L - var nProcs = -1 - var reqNProcs = -1 - var dependencies = emptySet() + private var workflowId = -1L + private var jobId = -1L + private var submitTime = -1L + private var runtime = -1L + private var nProcs = -1 + private var reqNProcs = -1 + private var dependencies = emptySet() - /** - * Reset the state. - */ - fun reset() { - workflowId = -1 - jobId = -1 - submitTime = -1 - runtime = -1 - nProcs = -1 - reqNProcs = -1 - dependencies = emptySet() - } + /** + * Reset the state. + */ + private fun reset() { + workflowId = -1 + jobId = -1 + submitTime = -1 + runtime = -1 + nProcs = -1 + reqNProcs = -1 + dependencies = emptySet() } companion object { -- cgit v1.2.3 From b7be3400bb4b21d0cd7021e2baf1f6ce43aba189 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 10 Sep 2021 22:10:22 +0200 Subject: feat(trace): Add support for WfCommons (WorkflowHub) traces This change adds support for reading WfCommons workflow traces in OpenDC. This functionality is available in the new `opendc-trace-wfformat` module. 
--- .../org/opendc/trace/gwf/GwfTaskTableReader.kt | 36 ++++++++++------------ .../org/opendc/trace/gwf/GwfTraceFormatTest.kt | 12 +++++--- 2 files changed, 23 insertions(+), 25 deletions(-) (limited to 'opendc-trace/opendc-trace-gwf/src') diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt index fb9099bf..39eb5520 100644 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt @@ -26,6 +26,8 @@ import com.fasterxml.jackson.core.JsonToken import com.fasterxml.jackson.dataformat.csv.CsvParser import com.fasterxml.jackson.dataformat.csv.CsvSchema import org.opendc.trace.* +import java.time.Duration +import java.time.Instant import java.util.regex.Pattern /** @@ -52,10 +54,10 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { } when (parser.currentName) { - "WorkflowID" -> workflowId = parser.longValue - "JobID" -> jobId = parser.longValue - "SubmitTime" -> submitTime = parser.longValue - "RunTime" -> runtime = parser.longValue + "WorkflowID" -> workflowId = parser.text + "JobID" -> jobId = parser.text + "SubmitTime" -> submitTime = Instant.ofEpochSecond(parser.longValue) + "RunTime" -> runtime = Duration.ofSeconds(parser.longValue) "NProcs" -> nProcs = parser.intValue "ReqNProcs" -> reqNProcs = parser.intValue "Dependencies" -> parseParents(parser.valueAsString) @@ -79,7 +81,7 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { } override fun get(column: TableColumn): T { - val res: Any = when (column) { + val res: Any? 
= when (column) { TASK_WORKFLOW_ID -> workflowId TASK_ID -> jobId TASK_SUBMIT_TIME -> submitTime @@ -107,13 +109,7 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { } override fun getLong(column: TableColumn): Long { - return when (column) { - TASK_WORKFLOW_ID -> workflowId - TASK_ID -> jobId - TASK_SUBMIT_TIME -> submitTime - TASK_RUNTIME -> runtime - else -> throw IllegalArgumentException("Invalid column") - } + throw IllegalArgumentException("Invalid column") } override fun getDouble(column: TableColumn): Double { @@ -163,10 +159,10 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { /** * Reader state fields. */ - private var workflowId = -1L - private var jobId = -1L - private var submitTime = -1L - private var runtime = -1L + private var workflowId: String? = null + private var jobId: String? = null + private var submitTime: Instant? = null + private var runtime: Duration? = null private var nProcs = -1 private var reqNProcs = -1 private var dependencies = emptySet() @@ -175,10 +171,10 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { * Reset the state. 
*/ private fun reset() { - workflowId = -1 - jobId = -1 - submitTime = -1 - runtime = -1 + workflowId = null + jobId = null + submitTime = null + runtime = null nProcs = -1 reqNProcs = -1 dependencies = emptySet() diff --git a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt index 6b0568fe..b209b979 100644 --- a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt @@ -29,6 +29,8 @@ import org.junit.jupiter.api.assertDoesNotThrow import org.junit.jupiter.api.assertThrows import org.opendc.trace.* import java.net.URL +import java.time.Duration +import java.time.Instant /** * Test suite for the [GwfTraceFormat] class. @@ -90,11 +92,11 @@ internal class GwfTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals(0L, reader.getLong(TASK_WORKFLOW_ID)) }, - { assertEquals(1L, reader.getLong(TASK_ID)) }, - { assertEquals(16, reader.getLong(TASK_SUBMIT_TIME)) }, - { assertEquals(11, reader.getLong(TASK_RUNTIME)) }, - { assertEquals(setOf(), reader.get(TASK_PARENTS)) }, + { assertEquals("0", reader.get(TASK_WORKFLOW_ID)) }, + { assertEquals("1", reader.get(TASK_ID)) }, + { assertEquals(Instant.ofEpochSecond(16), reader.get(TASK_SUBMIT_TIME)) }, + { assertEquals(Duration.ofSeconds(11), reader.get(TASK_RUNTIME)) }, + { assertEquals(emptySet(), reader.get(TASK_PARENTS)) }, ) } -- cgit v1.2.3 From 49dd8377c8bfde1e64e411c6a6f921c768b9b53b Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sun, 12 Sep 2021 11:22:07 +0200 Subject: refactor(trace): Add API for accessing available table columns This change adds a new API to the Table interface for accessing the table columns that the table supports. 
This does not necessarily mean that the column will have a value for every row, but that the table format has defined this particular column. --- .../kotlin/org/opendc/trace/gwf/GwfTaskTable.kt | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) (limited to 'opendc-trace/opendc-trace-gwf/src') diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt index 80a99d10..fd7bd068 100644 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt @@ -34,18 +34,15 @@ internal class GwfTaskTable(private val factory: CsvFactory, private val url: UR override val isSynthetic: Boolean = false - override fun isSupported(column: TableColumn<*>): Boolean { - return when (column) { - TASK_WORKFLOW_ID -> true - TASK_ID -> true - TASK_SUBMIT_TIME -> true - TASK_RUNTIME -> true - TASK_REQ_NCPUS -> true - TASK_ALLOC_NCPUS -> true - TASK_PARENTS -> true - else -> false - } - } + override val columns: List<TableColumn<*>> = listOf( + TASK_WORKFLOW_ID, + TASK_ID, + TASK_SUBMIT_TIME, + TASK_RUNTIME, + TASK_REQ_NCPUS, + TASK_ALLOC_NCPUS, + TASK_PARENTS + ) override fun newReader(): TableReader { return GwfTaskTableReader(factory.createParser(url)) -- cgit v1.2.3