diff options
Diffstat (limited to 'opendc-trace/opendc-trace-gwf/src/main')
| -rw-r--r-- | opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt | 21 | ||||
| -rw-r--r-- | opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt | 89 |
2 files changed, 48 insertions, 62 deletions
diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt index 80a99d10..fd7bd068 100644 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt @@ -34,18 +34,15 @@ internal class GwfTaskTable(private val factory: CsvFactory, private val url: UR override val isSynthetic: Boolean = false - override fun isSupported(column: TableColumn<*>): Boolean { - return when (column) { - TASK_WORKFLOW_ID -> true - TASK_ID -> true - TASK_SUBMIT_TIME -> true - TASK_RUNTIME -> true - TASK_REQ_NCPUS -> true - TASK_ALLOC_NCPUS -> true - TASK_PARENTS -> true - else -> false - } - } + override val columns: List<TableColumn<*>> = listOf( + TASK_WORKFLOW_ID, + TASK_ID, + TASK_SUBMIT_TIME, + TASK_RUNTIME, + TASK_REQ_NCPUS, + TASK_ALLOC_NCPUS, + TASK_PARENTS + ) override fun newReader(): TableReader { return GwfTaskTableReader(factory.createParser(url)) diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt index 64b7d465..39eb5520 100644 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt @@ -26,24 +26,21 @@ import com.fasterxml.jackson.core.JsonToken import com.fasterxml.jackson.dataformat.csv.CsvParser import com.fasterxml.jackson.dataformat.csv.CsvSchema import org.opendc.trace.* +import java.time.Duration +import java.time.Instant import java.util.regex.Pattern /** * A [TableReader] implementation for the GWF format. */ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { - /** - * The current parser state. - */ - private val state = RowState() - init { parser.schema = schema } override fun nextRow(): Boolean { // Reset the row state - state.reset() + reset() if (!nextStart()) { return false @@ -57,12 +54,12 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { } when (parser.currentName) { - "WorkflowID" -> state.workflowId = parser.longValue - "JobID" -> state.jobId = parser.longValue - "SubmitTime" -> state.submitTime = parser.longValue - "RunTime" -> state.runtime = parser.longValue - "NProcs" -> state.nProcs = parser.intValue - "ReqNProcs" -> state.reqNProcs = parser.intValue + "WorkflowID" -> workflowId = parser.text + "JobID" -> jobId = parser.text + "SubmitTime" -> submitTime = Instant.ofEpochSecond(parser.longValue) + "RunTime" -> runtime = Duration.ofSeconds(parser.longValue) + "NProcs" -> nProcs = parser.intValue + "ReqNProcs" -> reqNProcs = parser.intValue "Dependencies" -> parseParents(parser.valueAsString) } } @@ -84,14 +81,14 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { } override fun <T> get(column: TableColumn<T>): T { - val res: Any = when (column) { - TASK_WORKFLOW_ID -> state.workflowId - TASK_ID -> state.jobId - TASK_SUBMIT_TIME -> state.submitTime - TASK_RUNTIME -> state.runtime - TASK_REQ_NCPUS -> state.nProcs - TASK_ALLOC_NCPUS -> state.reqNProcs - TASK_PARENTS -> state.dependencies + val res: Any? = when (column) { + TASK_WORKFLOW_ID -> workflowId + TASK_ID -> jobId + TASK_SUBMIT_TIME -> submitTime + TASK_RUNTIME -> runtime + TASK_REQ_NCPUS -> nProcs + TASK_ALLOC_NCPUS -> reqNProcs + TASK_PARENTS -> dependencies else -> throw IllegalArgumentException("Invalid column") } @@ -105,20 +102,14 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { override fun getInt(column: TableColumn<Int>): Int { return when (column) { - TASK_REQ_NCPUS -> state.nProcs - TASK_ALLOC_NCPUS -> state.reqNProcs + TASK_REQ_NCPUS -> nProcs + TASK_ALLOC_NCPUS -> reqNProcs else -> throw IllegalArgumentException("Invalid column") } } override fun getLong(column: TableColumn<Long>): Long { - return when (column) { - TASK_WORKFLOW_ID -> state.workflowId - TASK_ID -> state.jobId - TASK_SUBMIT_TIME -> state.submitTime - TASK_RUNTIME -> state.runtime - else -> throw IllegalArgumentException("Invalid column") - } + throw IllegalArgumentException("Invalid column") } override fun getDouble(column: TableColumn<Double>): Double { @@ -166,29 +157,27 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { } /** - * The current row state. + * Reader state fields. */ - private class RowState { - var workflowId = -1L - var jobId = -1L - var submitTime = -1L - var runtime = -1L - var nProcs = -1 - var reqNProcs = -1 - var dependencies = emptySet<Long>() + private var workflowId: String? = null + private var jobId: String? = null + private var submitTime: Instant? = null + private var runtime: Duration? = null + private var nProcs = -1 + private var reqNProcs = -1 + private var dependencies = emptySet<Long>() - /** - * Reset the state. - */ - fun reset() { - workflowId = -1 - jobId = -1 - submitTime = -1 - runtime = -1 - nProcs = -1 - reqNProcs = -1 - dependencies = emptySet() - } + /** + * Reset the state. + */ + private fun reset() { + workflowId = null + jobId = null + submitTime = null + runtime = null + nProcs = -1 + reqNProcs = -1 + dependencies = emptySet() } companion object { |
