diff options
Diffstat (limited to 'opendc-trace/opendc-trace-gwf/src')
5 files changed, 435 insertions, 0 deletions
diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt new file mode 100644 index 00000000..aa4c543b --- /dev/null +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt @@ -0,0 +1,209 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.gwf + +import com.fasterxml.jackson.core.JsonToken +import com.fasterxml.jackson.dataformat.csv.CsvParser +import com.fasterxml.jackson.dataformat.csv.CsvSchema +import org.opendc.trace.* +import java.time.Duration +import java.time.Instant +import java.util.regex.Pattern + +/** + * A [TableReader] implementation for the GWF format. + */ +internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { + init { + parser.schema = schema + } + + override fun nextRow(): Boolean { + // Reset the row state + reset() + + if (!nextStart()) { + return false + } + + while (true) { + val token = parser.nextValue() + + if (token == null || token == JsonToken.END_OBJECT) { + break + } + + when (parser.currentName) { + "WorkflowID" -> workflowId = parser.text + "JobID" -> jobId = parser.text + "SubmitTime" -> submitTime = Instant.ofEpochSecond(parser.longValue) + "RunTime" -> runtime = Duration.ofSeconds(parser.longValue) + "NProcs" -> nProcs = parser.intValue + "ReqNProcs" -> reqNProcs = parser.intValue + "Dependencies" -> parseParents(parser.valueAsString) + } + } + + return true + } + + override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + + override fun isNull(index: Int): Boolean { + check(index in 0..columns.size) { "Invalid column" } + return false + } + + override fun get(index: Int): Any? { + return when (index) { + COL_JOB_ID -> jobId + COL_WORKFLOW_ID -> workflowId + COL_SUBMIT_TIME -> submitTime + COL_RUNTIME -> runtime + COL_REQ_NPROC -> getInt(index) + COL_NPROC -> getInt(index) + COL_DEPS -> dependencies + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getBoolean(index: Int): Boolean { + throw IllegalArgumentException("Invalid column") + } + + override fun getInt(index: Int): Int { + return when (index) { + COL_REQ_NPROC -> reqNProcs + COL_NPROC -> nProcs + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getLong(index: Int): Long { + throw IllegalArgumentException("Invalid column") + } + + override fun getDouble(index: Int): Double { + throw IllegalArgumentException("Invalid column") + } + + override fun close() { + parser.close() + } + + /** + * The pattern used to parse the parents. + */ + private val pattern = Pattern.compile("\\s+") + + /** + * Parse the parents into a set of longs. + */ + private fun parseParents(value: String): Set<Long> { + val result = mutableSetOf<Long>() + val deps = value.split(pattern) + + for (dep in deps) { + if (dep.isBlank()) { + continue + } + + result.add(dep.toLong(10)) + } + + return result + } + + /** + * Advance the parser until the next object start. + */ + private fun nextStart(): Boolean { + var token = parser.nextValue() + + while (token != null && token != JsonToken.START_OBJECT) { + token = parser.nextValue() + } + + return token != null + } + + /** + * Reader state fields. + */ + private var workflowId: String? = null + private var jobId: String? = null + private var submitTime: Instant? = null + private var runtime: Duration? = null + private var nProcs = -1 + private var reqNProcs = -1 + private var dependencies = emptySet<Long>() + + /** + * Reset the state. + */ + private fun reset() { + workflowId = null + jobId = null + submitTime = null + runtime = null + nProcs = -1 + reqNProcs = -1 + dependencies = emptySet() + } + + private val COL_WORKFLOW_ID = 0 + private val COL_JOB_ID = 1 + private val COL_SUBMIT_TIME = 2 + private val COL_RUNTIME = 3 + private val COL_NPROC = 4 + private val COL_REQ_NPROC = 5 + private val COL_DEPS = 6 + + private val columns = mapOf( + TASK_ID to COL_JOB_ID, + TASK_WORKFLOW_ID to COL_WORKFLOW_ID, + TASK_SUBMIT_TIME to COL_SUBMIT_TIME, + TASK_RUNTIME to COL_RUNTIME, + TASK_ALLOC_NCPUS to COL_NPROC, + TASK_REQ_NCPUS to COL_REQ_NPROC, + TASK_PARENTS to COL_DEPS + ) + + companion object { + /** + * The [CsvSchema] that is used to parse the trace. + */ + private val schema = CsvSchema.builder() + .addColumn("WorkflowID", CsvSchema.ColumnType.NUMBER) + .addColumn("JobID", CsvSchema.ColumnType.NUMBER) + .addColumn("SubmitTime", CsvSchema.ColumnType.NUMBER) + .addColumn("RunTime", CsvSchema.ColumnType.NUMBER) + .addColumn("NProcs", CsvSchema.ColumnType.NUMBER) + .addColumn("ReqNProcs", CsvSchema.ColumnType.NUMBER) + .addColumn("Dependencies", CsvSchema.ColumnType.STRING) + .setAllowComments(true) + .setUseHeader(true) + .setColumnSeparator(',') + .build() + } +} diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt new file mode 100644 index 00000000..d4287420 --- /dev/null +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.gwf + +import com.fasterxml.jackson.dataformat.csv.CsvFactory +import com.fasterxml.jackson.dataformat.csv.CsvParser +import org.opendc.trace.* +import org.opendc.trace.spi.TableDetails +import org.opendc.trace.spi.TraceFormat +import java.nio.file.Path + +/** + * A [TraceFormat] implementation for the GWF trace format. + */ +public class GwfTraceFormat : TraceFormat { + /** + * The name of this trace format. + */ + override val name: String = "gwf" + + /** + * The [CsvFactory] used to create the parser. + */ + private val factory = CsvFactory() + .enable(CsvParser.Feature.ALLOW_COMMENTS) + .enable(CsvParser.Feature.TRIM_SPACES) + + override fun create(path: Path) { + throw UnsupportedOperationException("Writing not supported for this format") + } + + override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS) + + override fun getDetails(path: Path, table: String): TableDetails { + return when (table) { + TABLE_TASKS -> TableDetails( + listOf( + TASK_WORKFLOW_ID, + TASK_ID, + TASK_SUBMIT_TIME, + TASK_RUNTIME, + TASK_REQ_NCPUS, + TASK_ALLOC_NCPUS, + TASK_PARENTS, + ), + listOf(TASK_WORKFLOW_ID) + ) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + + override fun newReader(path: Path, table: String): TableReader { + return when (table) { + TABLE_TASKS -> GwfTaskTableReader(factory.createParser(path.toFile())) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + + override fun newWriter(path: Path, table: String): TableWriter { + throw UnsupportedOperationException("Writing not supported for this format") + } +} diff --git a/opendc-trace/opendc-trace-gwf/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat b/opendc-trace/opendc-trace-gwf/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat new file mode 100644 index 00000000..99a874c8 --- /dev/null +++ b/opendc-trace/opendc-trace-gwf/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat @@ -0,0 +1 @@ +org.opendc.trace.gwf.GwfTraceFormat diff --git a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt new file mode 100644 index 00000000..7fe403b2 --- /dev/null +++ b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.gwf + +import org.junit.jupiter.api.* +import org.junit.jupiter.api.Assertions.* +import org.opendc.trace.* +import java.nio.file.Paths +import java.time.Duration +import java.time.Instant + +/** + * Test suite for the [GwfTraceFormat] class. + */ +internal class GwfTraceFormatTest { + private val format = GwfTraceFormat() + + @Test + fun testTables() { + val path = Paths.get(checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")).toURI()) + + assertEquals(listOf(TABLE_TASKS), format.getTables(path)) + } + + @Test + fun testTableExists() { + val path = Paths.get(checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")).toURI()) + assertDoesNotThrow { format.getDetails(path, TABLE_TASKS) } + } + + @Test + fun testTableDoesNotExist() { + val path = Paths.get(checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")).toURI()) + + assertThrows<IllegalArgumentException> { format.getDetails(path, "test") } + } + + @Test + fun testTableReader() { + val path = Paths.get(checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")).toURI()) + val reader = format.newReader(path, TABLE_TASKS) + + assertAll( + { assertTrue(reader.nextRow()) }, + { assertEquals("0", reader.get(TASK_WORKFLOW_ID)) }, + { assertEquals("1", reader.get(TASK_ID)) }, + { assertEquals(Instant.ofEpochSecond(16), reader.get(TASK_SUBMIT_TIME)) }, + { assertEquals(Duration.ofSeconds(11), reader.get(TASK_RUNTIME)) }, + { assertEquals(emptySet<String>(), reader.get(TASK_PARENTS)) }, + ) + } +} diff --git a/opendc-trace/opendc-trace-gwf/src/test/resources/trace.gwf b/opendc-trace/opendc-trace-gwf/src/test/resources/trace.gwf new file mode 100644 index 00000000..2f99616d --- /dev/null +++ b/opendc-trace/opendc-trace-gwf/src/test/resources/trace.gwf @@ -0,0 +1,71 @@ +WorkflowID, JobID , SubmitTime, RunTime , NProcs , ReqNProcs , Dependencies +0 , 1 , 16 , 11 , 1 , 1 , +0 , 2 , 40 , 11 , 1 , 1 , 1 +0 , 3 , 40 , 11 , 1 , 1 , 1 +0 , 4 , 64 , 11 , 1 , 1 , 2 +0 , 5 , 63 , 11 , 1 , 1 , 3 +0 , 6 , 64 , 11 , 1 , 1 , 3 +0 , 7 , 87 , 11 , 1 , 1 , 4 5 6 +1 , 8 , 4 , 11 , 1 , 1 , +1 , 9 , 15 , 11 , 1 , 1 , 8 +1 , 10 , 15 , 11 , 1 , 1 , 8 +1 , 11 , 27 , 11 , 1 , 1 , 9 +1 , 12 , 27 , 11 , 1 , 1 , 10 +1 , 13 , 27 , 11 , 1 , 1 , 10 +1 , 14 , 38 , 11 , 1 , 1 , 12 11 13 +2 , 15 , 3 , 11 , 1 , 1 , +2 , 16 , 27 , 11 , 1 , 1 , 15 +2 , 17 , 27 , 11 , 1 , 1 , 15 +2 , 18 , 52 , 11 , 1 , 1 , 16 +2 , 19 , 51 , 11 , 1 , 1 , 17 +2 , 20 , 51 , 11 , 1 , 1 , 17 +2 , 21 , 75 , 11 , 1 , 1 , 20 18 19 +3 , 22 , 3 , 11 , 1 , 1 , +3 , 23 , 27 , 11 , 1 , 1 , 22 +3 , 24 , 27 , 11 , 1 , 1 , 22 +3 , 25 , 51 , 11 , 1 , 1 , 23 +3 , 26 , 50 , 11 , 1 , 1 , 24 +3 , 27 , 51 , 11 , 1 , 1 , 24 +3 , 28 , 75 , 11 , 1 , 1 , 25 27 26 +4 , 29 , 3 , 11 , 1 , 1 , +4 , 30 , 27 , 11 , 1 , 1 , 29 +4 , 31 , 27 , 11 , 1 , 1 , 29 +4 , 32 , 50 , 11 , 1 , 1 , 30 +4 , 33 , 50 , 11 , 1 , 1 , 31 +4 , 34 , 51 , 11 , 1 , 1 , 31 +4 , 35 , 74 , 11 , 1 , 1 , 33 32 34 +5 , 36 , 3 , 11 , 1 , 1 , +5 , 37 , 27 , 11 , 1 , 1 , 36 +5 , 38 , 26 , 11 , 1 , 1 , 36 +5 , 39 , 51 , 11 , 1 , 1 , 37 +5 , 40 , 50 , 11 , 1 , 1 , 38 +5 , 41 , 50 , 11 , 1 , 1 , 38 +5 , 42 , 74 , 11 , 1 , 1 , 39 40 41 +6 , 43 , 4 , 11 , 1 , 1 , +6 , 44 , 27 , 11 , 1 , 1 , 43 +6 , 45 , 27 , 11 , 1 , 1 , 43 +6 , 46 , 51 , 11 , 1 , 1 , 44 +6 , 47 , 51 , 11 , 1 , 1 , 45 +6 , 48 , 51 , 11 , 1 , 1 , 45 +6 , 49 , 75 , 11 , 1 , 1 , 46 47 48 +7 , 50 , 3 , 0 , 1 , 1 , +7 , 51 , 17 , 0 , 1 , 1 , 50 +7 , 52 , 17 , 0 , 1 , 1 , 50 +7 , 53 , 30 , 0 , 1 , 1 , 51 +7 , 54 , 30 , 0 , 1 , 1 , 52 +7 , 55 , 31 , 0 , 1 , 1 , 52 +7 , 56 , 44 , 0 , 1 , 1 , 55 54 53 +8 , 57 , 3 , 11 , 1 , 1 , +8 , 58 , 26 , 11 , 1 , 1 , 57 +8 , 59 , 27 , 11 , 1 , 1 , 57 +8 , 60 , 50 , 11 , 1 , 1 , 58 +8 , 61 , 51 , 11 , 1 , 1 , 59 +8 , 62 , 50 , 11 , 1 , 1 , 59 +8 , 63 , 74 , 11 , 1 , 1 , 62 61 60 +9 , 64 , 3 , 11 , 1 , 1 , +9 , 65 , 27 , 11 , 1 , 1 , 64 +9 , 66 , 27 , 11 , 1 , 1 , 64 +9 , 67 , 51 , 11 , 1 , 1 , 65 +9 , 68 , 50 , 11 , 1 , 1 , 66 +9 , 69 , 51 , 11 , 1 , 1 , 66 +9 , 70 , 74 , 11 , 1 , 1 , 68 69 67 |
