From 23c1502c2668305fd5f4c38c6c794c985d2037e3 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 31 Aug 2021 14:56:08 +0200 Subject: refactor(trace): Move GWF trace reader into separate module This change starts the process of moving the different trace formats into separate modules. This change in particular moves the GWF trace format into a new module, opendc-trace-gwf. Furthermore, this change also implements the trace API for the GWF module. --- .../kotlin/org/opendc/trace/gwf/GwfTaskTable.kt | 59 ++++++ .../org/opendc/trace/gwf/GwfTaskTableReader.kt | 211 +++++++++++++++++++++ .../main/kotlin/org/opendc/trace/gwf/GwfTrace.kt | 46 +++++ .../kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt | 56 ++++++ .../services/org.opendc.trace.spi.TraceFormat | 1 + .../org/opendc/trace/gwf/GwfTraceFormatTest.kt | 109 +++++++++++ .../opendc-trace-gwf/src/test/resources/trace.gwf | 71 +++++++ 7 files changed, 553 insertions(+) create mode 100644 opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt create mode 100644 opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt create mode 100644 opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTrace.kt create mode 100644 opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt create mode 100644 opendc-trace/opendc-trace-gwf/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat create mode 100644 opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt create mode 100644 opendc-trace/opendc-trace-gwf/src/test/resources/trace.gwf (limited to 'opendc-trace/opendc-trace-gwf/src') diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt new file mode 100644 index 00000000..80a99d10 --- /dev/null +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.gwf + +import com.fasterxml.jackson.dataformat.csv.CsvFactory +import org.opendc.trace.* +import java.net.URL + +/** + * A [Table] containing the tasks in a GWF trace. + */ +internal class GwfTaskTable(private val factory: CsvFactory, private val url: URL) : Table { + override val name: String = TABLE_TASKS + + override val isSynthetic: Boolean = false + + override fun isSupported(column: TableColumn<*>): Boolean { + return when (column) { + TASK_WORKFLOW_ID -> true + TASK_ID -> true + TASK_SUBMIT_TIME -> true + TASK_RUNTIME -> true + TASK_REQ_NCPUS -> true + TASK_ALLOC_NCPUS -> true + TASK_PARENTS -> true + else -> false + } + } + + override fun newReader(): TableReader { + return GwfTaskTableReader(factory.createParser(url)) + } + + override fun newReader(partition: String): TableReader { + throw IllegalArgumentException("Invalid partition $partition") + } + + override fun toString(): String = "GwfTaskTable" +} diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt new file mode 100644 index 00000000..64b7d465 --- /dev/null +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt @@ -0,0 +1,211 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.gwf + +import com.fasterxml.jackson.core.JsonToken +import com.fasterxml.jackson.dataformat.csv.CsvParser +import com.fasterxml.jackson.dataformat.csv.CsvSchema +import org.opendc.trace.* +import java.util.regex.Pattern + +/** + * A [TableReader] implementation for the GWF format. + */ +internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { + /** + * The current parser state. + */ + private val state = RowState() + + init { + parser.schema = schema + } + + override fun nextRow(): Boolean { + // Reset the row state + state.reset() + + if (!nextStart()) { + return false + } + + while (true) { + val token = parser.nextValue() + + if (token == null || token == JsonToken.END_OBJECT) { + break + } + + when (parser.currentName) { + "WorkflowID" -> state.workflowId = parser.longValue + "JobID" -> state.jobId = parser.longValue + "SubmitTime" -> state.submitTime = parser.longValue + "RunTime" -> state.runtime = parser.longValue + "NProcs" -> state.nProcs = parser.intValue + "ReqNProcs" -> state.reqNProcs = parser.intValue + "Dependencies" -> parseParents(parser.valueAsString) + } + } + + return true + } + + override fun hasColumn(column: TableColumn<*>): Boolean { + return when (column) { + TASK_WORKFLOW_ID -> true + TASK_ID -> true + TASK_SUBMIT_TIME -> true + TASK_RUNTIME -> true + TASK_REQ_NCPUS -> true + TASK_ALLOC_NCPUS -> true + TASK_PARENTS -> true + else -> false + } + } + + override fun get(column: TableColumn): T { + val res: Any = when (column) { + TASK_WORKFLOW_ID -> state.workflowId + TASK_ID -> state.jobId + TASK_SUBMIT_TIME -> state.submitTime + TASK_RUNTIME -> state.runtime + TASK_REQ_NCPUS -> state.nProcs + TASK_ALLOC_NCPUS -> state.reqNProcs + TASK_PARENTS -> state.dependencies + else -> throw IllegalArgumentException("Invalid column") + } + + @Suppress("UNCHECKED_CAST") + return res as T + } + + override fun getBoolean(column: TableColumn): Boolean { + throw IllegalArgumentException("Invalid column") + } + + override fun getInt(column: TableColumn): Int { + return when (column) { + TASK_REQ_NCPUS -> state.nProcs + TASK_ALLOC_NCPUS -> state.reqNProcs + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getLong(column: TableColumn): Long { + return when (column) { + TASK_WORKFLOW_ID -> state.workflowId + TASK_ID -> state.jobId + TASK_SUBMIT_TIME -> state.submitTime + TASK_RUNTIME -> state.runtime + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getDouble(column: TableColumn): Double { + throw IllegalArgumentException("Invalid column") + } + + override fun close() { + parser.close() + } + + /** + * The pattern used to parse the parents. + */ + private val pattern = Pattern.compile("\\s+") + + /** + * Parse the parents into a set of longs. + */ + private fun parseParents(value: String): Set { + val result = mutableSetOf() + val deps = value.split(pattern) + + for (dep in deps) { + if (dep.isBlank()) { + continue + } + + result.add(dep.toLong(10)) + } + + return result + } + + /** + * Advance the parser until the next object start. + */ + private fun nextStart(): Boolean { + var token = parser.nextValue() + + while (token != null && token != JsonToken.START_OBJECT) { + token = parser.nextValue() + } + + return token != null + } + + /** + * The current row state. + */ + private class RowState { + var workflowId = -1L + var jobId = -1L + var submitTime = -1L + var runtime = -1L + var nProcs = -1 + var reqNProcs = -1 + var dependencies = emptySet() + + /** + * Reset the state. + */ + fun reset() { + workflowId = -1 + jobId = -1 + submitTime = -1 + runtime = -1 + nProcs = -1 + reqNProcs = -1 + dependencies = emptySet() + } + } + + companion object { + /** + * The [CsvSchema] that is used to parse the trace. + */ + private val schema = CsvSchema.builder() + .addColumn("WorkflowID", CsvSchema.ColumnType.NUMBER) + .addColumn("JobID", CsvSchema.ColumnType.NUMBER) + .addColumn("SubmitTime", CsvSchema.ColumnType.NUMBER) + .addColumn("RunTime", CsvSchema.ColumnType.NUMBER) + .addColumn("NProcs", CsvSchema.ColumnType.NUMBER) + .addColumn("ReqNProcs", CsvSchema.ColumnType.NUMBER) + .addColumn("Dependencies", CsvSchema.ColumnType.STRING) + .setAllowComments(true) + .setUseHeader(true) + .setColumnSeparator(',') + .build() + } +} diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTrace.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTrace.kt new file mode 100644 index 00000000..166c1e56 --- /dev/null +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTrace.kt @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.gwf + +import com.fasterxml.jackson.dataformat.csv.CsvFactory +import org.opendc.trace.* +import java.net.URL + +/** + * [Trace] implementation for the GWF format. + */ +public class GwfTrace internal constructor(private val factory: CsvFactory, private val url: URL) : Trace { + override val tables: List = listOf(TABLE_TASKS) + + override fun containsTable(name: String): Boolean = TABLE_TASKS == name + + override fun getTable(name: String): Table? { + if (!containsTable(name)) { + return null + } + + return GwfTaskTable(factory, url) + } + + override fun toString(): String = "GwfTrace[$url]" +} diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt new file mode 100644 index 00000000..6d542503 --- /dev/null +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.gwf + +import com.fasterxml.jackson.dataformat.csv.CsvFactory +import com.fasterxml.jackson.dataformat.csv.CsvParser +import org.opendc.trace.spi.TraceFormat +import java.net.URL +import java.nio.file.Paths +import kotlin.io.path.exists + +/** + * A [TraceFormat] implementation for the GWF trace format. + */ +public class GwfTraceFormat : TraceFormat { + /** + * The name of this trace format. + */ + override val name: String = "gwf" + + /** + * The [CsvFactory] used to create the parser. + */ + private val factory = CsvFactory() + .enable(CsvParser.Feature.ALLOW_COMMENTS) + .enable(CsvParser.Feature.TRIM_SPACES) + + /** + * Read the tasks in the GWF trace. + */ + public override fun open(url: URL): GwfTrace { + val path = Paths.get(url.toURI()) + require(path.exists()) { "URL $url does not exist" } + return GwfTrace(factory, url) + } +} diff --git a/opendc-trace/opendc-trace-gwf/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat b/opendc-trace/opendc-trace-gwf/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat new file mode 100644 index 00000000..99a874c8 --- /dev/null +++ b/opendc-trace/opendc-trace-gwf/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat @@ -0,0 +1 @@ +org.opendc.trace.gwf.GwfTraceFormat diff --git a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt new file mode 100644 index 00000000..6b0568fe --- /dev/null +++ b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt @@ -0,0 +1,109 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.gwf + +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll +import org.junit.jupiter.api.assertDoesNotThrow +import org.junit.jupiter.api.assertThrows +import org.opendc.trace.* +import java.net.URL + +/** + * Test suite for the [GwfTraceFormat] class. + */ +internal class GwfTraceFormatTest { + @Test + fun testTraceExists() { + val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")) + val format = GwfTraceFormat() + assertDoesNotThrow { + format.open(input) + } + } + + @Test + fun testTraceDoesNotExists() { + val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")) + val format = GwfTraceFormat() + assertThrows { + format.open(URL(input.toString() + "help")) + } + } + + @Test + fun testTables() { + val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")) + val format = GwfTraceFormat() + val trace = format.open(input) + + assertEquals(listOf(TABLE_TASKS), trace.tables) + } + + @Test + fun testTableExists() { + val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")) + val format = GwfTraceFormat() + val table = format.open(input).getTable(TABLE_TASKS) + + assertNotNull(table) + assertDoesNotThrow { table!!.newReader() } + } + + @Test + fun testTableDoesNotExist() { + val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")) + val format = GwfTraceFormat() + val trace = format.open(input) + + assertFalse(trace.containsTable("test")) + assertNull(trace.getTable("test")) + } + + @Test + fun testTableReader() { + val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")) + val format = GwfTraceFormat() + val table = format.open(input).getTable(TABLE_TASKS)!! + val reader = table.newReader() + + assertAll( + { assertTrue(reader.nextRow()) }, + { assertEquals(0L, reader.getLong(TASK_WORKFLOW_ID)) }, + { assertEquals(1L, reader.getLong(TASK_ID)) }, + { assertEquals(16, reader.getLong(TASK_SUBMIT_TIME)) }, + { assertEquals(11, reader.getLong(TASK_RUNTIME)) }, + { assertEquals(setOf(), reader.get(TASK_PARENTS)) }, + ) + } + + @Test + fun testTableReaderPartition() { + val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")) + val format = GwfTraceFormat() + val table = format.open(input).getTable(TABLE_TASKS)!! + + assertThrows { table.newReader("test") } + } +} diff --git a/opendc-trace/opendc-trace-gwf/src/test/resources/trace.gwf b/opendc-trace/opendc-trace-gwf/src/test/resources/trace.gwf new file mode 100644 index 00000000..2f99616d --- /dev/null +++ b/opendc-trace/opendc-trace-gwf/src/test/resources/trace.gwf @@ -0,0 +1,71 @@ +WorkflowID, JobID , SubmitTime, RunTime , NProcs , ReqNProcs , Dependencies +0 , 1 , 16 , 11 , 1 , 1 , +0 , 2 , 40 , 11 , 1 , 1 , 1 +0 , 3 , 40 , 11 , 1 , 1 , 1 +0 , 4 , 64 , 11 , 1 , 1 , 2 +0 , 5 , 63 , 11 , 1 , 1 , 3 +0 , 6 , 64 , 11 , 1 , 1 , 3 +0 , 7 , 87 , 11 , 1 , 1 , 4 5 6 +1 , 8 , 4 , 11 , 1 , 1 , +1 , 9 , 15 , 11 , 1 , 1 , 8 +1 , 10 , 15 , 11 , 1 , 1 , 8 +1 , 11 , 27 , 11 , 1 , 1 , 9 +1 , 12 , 27 , 11 , 1 , 1 , 10 +1 , 13 , 27 , 11 , 1 , 1 , 10 +1 , 14 , 38 , 11 , 1 , 1 , 12 11 13 +2 , 15 , 3 , 11 , 1 , 1 , +2 , 16 , 27 , 11 , 1 , 1 , 15 +2 , 17 , 27 , 11 , 1 , 1 , 15 +2 , 18 , 52 , 11 , 1 , 1 , 16 +2 , 19 , 51 , 11 , 1 , 1 , 17 +2 , 20 , 51 , 11 , 1 , 1 , 17 +2 , 21 , 75 , 11 , 1 , 1 , 20 18 19 +3 , 22 , 3 , 11 , 1 , 1 , +3 , 23 , 27 , 11 , 1 , 1 , 22 +3 , 24 , 27 , 11 , 1 , 1 , 22 +3 , 25 , 51 , 11 , 1 , 1 , 23 +3 , 26 , 50 , 11 , 1 , 1 , 24 +3 , 27 , 51 , 11 , 1 , 1 , 24 +3 , 28 , 75 , 11 , 1 , 1 , 25 27 26 +4 , 29 , 3 , 11 , 1 , 1 , +4 , 30 , 27 , 11 , 1 , 1 , 29 +4 , 31 , 27 , 11 , 1 , 1 , 29 +4 , 32 , 50 , 11 , 1 , 1 , 30 +4 , 33 , 50 , 11 , 1 , 1 , 31 +4 , 34 , 51 , 11 , 1 , 1 , 31 +4 , 35 , 74 , 11 , 1 , 1 , 33 32 34 +5 , 36 , 3 , 11 , 1 , 1 , +5 , 37 , 27 , 11 , 1 , 1 , 36 +5 , 38 , 26 , 11 , 1 , 1 , 36 +5 , 39 , 51 , 11 , 1 , 1 , 37 +5 , 40 , 50 , 11 , 1 , 1 , 38 +5 , 41 , 50 , 11 , 1 , 1 , 38 +5 , 42 , 74 , 11 , 1 , 1 , 39 40 41 +6 , 43 , 4 , 11 , 1 , 1 , +6 , 44 , 27 , 11 , 1 , 1 , 43 +6 , 45 , 27 , 11 , 1 , 1 , 43 +6 , 46 , 51 , 11 , 1 , 1 , 44 +6 , 47 , 51 , 11 , 1 , 1 , 45 +6 , 48 , 51 , 11 , 1 , 1 , 45 +6 , 49 , 75 , 11 , 1 , 1 , 46 47 48 +7 , 50 , 3 , 0 , 1 , 1 , +7 , 51 , 17 , 0 , 1 , 1 , 50 +7 , 52 , 17 , 0 , 1 , 1 , 50 +7 , 53 , 30 , 0 , 1 , 1 , 51 +7 , 54 , 30 , 0 , 1 , 1 , 52 +7 , 55 , 31 , 0 , 1 , 1 , 52 +7 , 56 , 44 , 0 , 1 , 1 , 55 54 53 +8 , 57 , 3 , 11 , 1 , 1 , +8 , 58 , 26 , 11 , 1 , 1 , 57 +8 , 59 , 27 , 11 , 1 , 1 , 57 +8 , 60 , 50 , 11 , 1 , 1 , 58 +8 , 61 , 51 , 11 , 1 , 1 , 59 +8 , 62 , 50 , 11 , 1 , 1 , 59 +8 , 63 , 74 , 11 , 1 , 1 , 62 61 60 +9 , 64 , 3 , 11 , 1 , 1 , +9 , 65 , 27 , 11 , 1 , 1 , 64 +9 , 66 , 27 , 11 , 1 , 1 , 64 +9 , 67 , 51 , 11 , 1 , 1 , 65 +9 , 68 , 50 , 11 , 1 , 1 , 66 +9 , 69 , 51 , 11 , 1 , 1 , 66 +9 , 70 , 74 , 11 , 1 , 1 , 68 69 67 -- cgit v1.2.3 From fa08b63bd749e9fbe1a1d04ef2ebd7a86453fa4b Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sat, 11 Sep 2021 10:52:30 +0200 Subject: perf(trace): Keep reader state in own class This change removes the external class that holds the state of the reader and instead puts the state in the reader implementation. Maintaining a separate class for the state increases the complexity and has worse performance characteristics due to the bytecode produced by Kotlin for property accesses. --- .../org/opendc/trace/gwf/GwfTaskTableReader.kt | 85 ++++++++++------------ 1 file changed, 39 insertions(+), 46 deletions(-) (limited to 'opendc-trace/opendc-trace-gwf/src') diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt index 64b7d465..fb9099bf 100644 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt @@ -32,18 +32,13 @@ import java.util.regex.Pattern * A [TableReader] implementation for the GWF format. */ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { - /** - * The current parser state. - */ - private val state = RowState() - init { parser.schema = schema } override fun nextRow(): Boolean { // Reset the row state - state.reset() + reset() if (!nextStart()) { return false @@ -57,12 +52,12 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { } when (parser.currentName) { - "WorkflowID" -> state.workflowId = parser.longValue - "JobID" -> state.jobId = parser.longValue - "SubmitTime" -> state.submitTime = parser.longValue - "RunTime" -> state.runtime = parser.longValue - "NProcs" -> state.nProcs = parser.intValue - "ReqNProcs" -> state.reqNProcs = parser.intValue + "WorkflowID" -> workflowId = parser.longValue + "JobID" -> jobId = parser.longValue + "SubmitTime" -> submitTime = parser.longValue + "RunTime" -> runtime = parser.longValue + "NProcs" -> nProcs = parser.intValue + "ReqNProcs" -> reqNProcs = parser.intValue "Dependencies" -> parseParents(parser.valueAsString) } } @@ -85,13 +80,13 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { override fun get(column: TableColumn): T { val res: Any = when (column) { - TASK_WORKFLOW_ID -> state.workflowId - TASK_ID -> state.jobId - TASK_SUBMIT_TIME -> state.submitTime - TASK_RUNTIME -> state.runtime - TASK_REQ_NCPUS -> state.nProcs - TASK_ALLOC_NCPUS -> state.reqNProcs - TASK_PARENTS -> state.dependencies + TASK_WORKFLOW_ID -> workflowId + TASK_ID -> jobId + TASK_SUBMIT_TIME -> submitTime + TASK_RUNTIME -> runtime + TASK_REQ_NCPUS -> nProcs + TASK_ALLOC_NCPUS -> reqNProcs + TASK_PARENTS -> dependencies else -> throw IllegalArgumentException("Invalid column") } @@ -105,18 +100,18 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { override fun getInt(column: TableColumn): Int { return when (column) { - TASK_REQ_NCPUS -> state.nProcs - TASK_ALLOC_NCPUS -> state.reqNProcs + TASK_REQ_NCPUS -> nProcs + TASK_ALLOC_NCPUS -> reqNProcs else -> throw IllegalArgumentException("Invalid column") } } override fun getLong(column: TableColumn): Long { return when (column) { - TASK_WORKFLOW_ID -> state.workflowId - TASK_ID -> state.jobId - TASK_SUBMIT_TIME -> state.submitTime - TASK_RUNTIME -> state.runtime + TASK_WORKFLOW_ID -> workflowId + TASK_ID -> jobId + TASK_SUBMIT_TIME -> submitTime + TASK_RUNTIME -> runtime else -> throw IllegalArgumentException("Invalid column") } } @@ -166,29 +161,27 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { } /** - * The current row state. + * Reader state fields. */ - private class RowState { - var workflowId = -1L - var jobId = -1L - var submitTime = -1L - var runtime = -1L - var nProcs = -1 - var reqNProcs = -1 - var dependencies = emptySet() + private var workflowId = -1L + private var jobId = -1L + private var submitTime = -1L + private var runtime = -1L + private var nProcs = -1 + private var reqNProcs = -1 + private var dependencies = emptySet() - /** - * Reset the state. - */ - fun reset() { - workflowId = -1 - jobId = -1 - submitTime = -1 - runtime = -1 - nProcs = -1 - reqNProcs = -1 - dependencies = emptySet() - } + /** + * Reset the state. + */ + private fun reset() { + workflowId = -1 + jobId = -1 + submitTime = -1 + runtime = -1 + nProcs = -1 + reqNProcs = -1 + dependencies = emptySet() } companion object { -- cgit v1.2.3 From b7be3400bb4b21d0cd7021e2baf1f6ce43aba189 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 10 Sep 2021 22:10:22 +0200 Subject: feat(trace): Add support for WfCommons (WorkflowHub) traces This change adds support for reading WfCommons workflow traces in OpenDC. This functionality is available in the new `opendc-trace-wfformat` module. --- .../org/opendc/trace/gwf/GwfTaskTableReader.kt | 36 ++++++++++------------ .../org/opendc/trace/gwf/GwfTraceFormatTest.kt | 12 +++++--- 2 files changed, 23 insertions(+), 25 deletions(-) (limited to 'opendc-trace/opendc-trace-gwf/src') diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt index fb9099bf..39eb5520 100644 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt @@ -26,6 +26,8 @@ import com.fasterxml.jackson.core.JsonToken import com.fasterxml.jackson.dataformat.csv.CsvParser import com.fasterxml.jackson.dataformat.csv.CsvSchema import org.opendc.trace.* +import java.time.Duration +import java.time.Instant import java.util.regex.Pattern /** @@ -52,10 +54,10 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { } when (parser.currentName) { - "WorkflowID" -> workflowId = parser.longValue - "JobID" -> jobId = parser.longValue - "SubmitTime" -> submitTime = parser.longValue - "RunTime" -> runtime = parser.longValue + "WorkflowID" -> workflowId = parser.text + "JobID" -> jobId = parser.text + "SubmitTime" -> submitTime = Instant.ofEpochSecond(parser.longValue) + "RunTime" -> runtime = Duration.ofSeconds(parser.longValue) "NProcs" -> nProcs = parser.intValue "ReqNProcs" -> reqNProcs = parser.intValue "Dependencies" -> parseParents(parser.valueAsString) @@ -79,7 +81,7 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { } override fun get(column: TableColumn): T { - val res: Any = when (column) { + val res: Any? = when (column) { TASK_WORKFLOW_ID -> workflowId TASK_ID -> jobId TASK_SUBMIT_TIME -> submitTime @@ -107,13 +109,7 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { } override fun getLong(column: TableColumn): Long { - return when (column) { - TASK_WORKFLOW_ID -> workflowId - TASK_ID -> jobId - TASK_SUBMIT_TIME -> submitTime - TASK_RUNTIME -> runtime - else -> throw IllegalArgumentException("Invalid column") - } + throw IllegalArgumentException("Invalid column") } override fun getDouble(column: TableColumn): Double { @@ -163,10 +159,10 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { /** * Reader state fields. */ - private var workflowId = -1L - private var jobId = -1L - private var submitTime = -1L - private var runtime = -1L + private var workflowId: String? = null + private var jobId: String? = null + private var submitTime: Instant? = null + private var runtime: Duration? = null private var nProcs = -1 private var reqNProcs = -1 private var dependencies = emptySet() @@ -175,10 +171,10 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { * Reset the state. */ private fun reset() { - workflowId = -1 - jobId = -1 - submitTime = -1 - runtime = -1 + workflowId = null + jobId = null + submitTime = null + runtime = null nProcs = -1 reqNProcs = -1 dependencies = emptySet() diff --git a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt index 6b0568fe..b209b979 100644 --- a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt @@ -29,6 +29,8 @@ import org.junit.jupiter.api.assertDoesNotThrow import org.junit.jupiter.api.assertThrows import org.opendc.trace.* import java.net.URL +import java.time.Duration +import java.time.Instant /** * Test suite for the [GwfTraceFormat] class. @@ -90,11 +92,11 @@ internal class GwfTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals(0L, reader.getLong(TASK_WORKFLOW_ID)) }, - { assertEquals(1L, reader.getLong(TASK_ID)) }, - { assertEquals(16, reader.getLong(TASK_SUBMIT_TIME)) }, - { assertEquals(11, reader.getLong(TASK_RUNTIME)) }, - { assertEquals(setOf(), reader.get(TASK_PARENTS)) }, + { assertEquals("0", reader.get(TASK_WORKFLOW_ID)) }, + { assertEquals("1", reader.get(TASK_ID)) }, + { assertEquals(Instant.ofEpochSecond(16), reader.get(TASK_SUBMIT_TIME)) }, + { assertEquals(Duration.ofSeconds(11), reader.get(TASK_RUNTIME)) }, + { assertEquals(emptySet(), reader.get(TASK_PARENTS)) }, ) } -- cgit v1.2.3 From 49dd8377c8bfde1e64e411c6a6f921c768b9b53b Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sun, 12 Sep 2021 11:22:07 +0200 Subject: refactor(trace): Add API for accessing available table columns This change adds a new API to the Table interface for accessing the table columns that the table supports. This does not necessarily mean that the column will have a value for every row, but that the table format has defined this particular column. --- .../kotlin/org/opendc/trace/gwf/GwfTaskTable.kt | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) (limited to 'opendc-trace/opendc-trace-gwf/src') diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt index 80a99d10..fd7bd068 100644 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt @@ -34,18 +34,15 @@ internal class GwfTaskTable(private val factory: CsvFactory, private val url: UR override val isSynthetic: Boolean = false - override fun isSupported(column: TableColumn<*>): Boolean { - return when (column) { - TASK_WORKFLOW_ID -> true - TASK_ID -> true - TASK_SUBMIT_TIME -> true - TASK_RUNTIME -> true - TASK_REQ_NCPUS -> true - TASK_ALLOC_NCPUS -> true - TASK_PARENTS -> true - else -> false - } - } + override val columns: List> = listOf( + TASK_WORKFLOW_ID, + TASK_ID, + TASK_SUBMIT_TIME, + TASK_RUNTIME, + TASK_REQ_NCPUS, + TASK_ALLOC_NCPUS, + TASK_PARENTS + ) override fun newReader(): TableReader { return GwfTaskTableReader(factory.createParser(url)) -- cgit v1.2.3 From 768bfa0d2ae763e359d74612385ce43c41afb432 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 20 Sep 2021 15:12:10 +0200 Subject: feat(trace): Support column lookup via index This change adds support for looking up the column value through the column index. This enables faster lookup when processing very large traces. --- .../org/opendc/trace/gwf/GwfTaskTableReader.kt | 69 ++++++++++++---------- 1 file changed, 39 insertions(+), 30 deletions(-) (limited to 'opendc-trace/opendc-trace-gwf/src') diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt index 39eb5520..aa4c543b 100644 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt @@ -67,52 +67,43 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { return true } - override fun hasColumn(column: TableColumn<*>): Boolean { - return when (column) { - TASK_WORKFLOW_ID -> true - TASK_ID -> true - TASK_SUBMIT_TIME -> true - TASK_RUNTIME -> true - TASK_REQ_NCPUS -> true - TASK_ALLOC_NCPUS -> true - TASK_PARENTS -> true - else -> false - } + override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + + override fun isNull(index: Int): Boolean { + check(index in 0..columns.size) { "Invalid column" } + return false } - override fun get(column: TableColumn): T { - val res: Any? = when (column) { - TASK_WORKFLOW_ID -> workflowId - TASK_ID -> jobId - TASK_SUBMIT_TIME -> submitTime - TASK_RUNTIME -> runtime - TASK_REQ_NCPUS -> nProcs - TASK_ALLOC_NCPUS -> reqNProcs - TASK_PARENTS -> dependencies + override fun get(index: Int): Any? { + return when (index) { + COL_JOB_ID -> jobId + COL_WORKFLOW_ID -> workflowId + COL_SUBMIT_TIME -> submitTime + COL_RUNTIME -> runtime + COL_REQ_NPROC -> getInt(index) + COL_NPROC -> getInt(index) + COL_DEPS -> dependencies else -> throw IllegalArgumentException("Invalid column") } - - @Suppress("UNCHECKED_CAST") - return res as T } - override fun getBoolean(column: TableColumn): Boolean { + override fun getBoolean(index: Int): Boolean { throw IllegalArgumentException("Invalid column") } - override fun getInt(column: TableColumn): Int { - return when (column) { - TASK_REQ_NCPUS -> nProcs - TASK_ALLOC_NCPUS -> reqNProcs + override fun getInt(index: Int): Int { + return when (index) { + COL_REQ_NPROC -> reqNProcs + COL_NPROC -> nProcs else -> throw IllegalArgumentException("Invalid column") } } - override fun getLong(column: TableColumn): Long { + override fun getLong(index: Int): Long { throw IllegalArgumentException("Invalid column") } - override fun getDouble(column: TableColumn): Double { + override fun getDouble(index: Int): Double { throw IllegalArgumentException("Invalid column") } @@ -180,6 +171,24 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { dependencies = emptySet() } + private val COL_WORKFLOW_ID = 0 + private val COL_JOB_ID = 1 + private val COL_SUBMIT_TIME = 2 + private val COL_RUNTIME = 3 + private val COL_NPROC = 4 + private val COL_REQ_NPROC = 5 + private val COL_DEPS = 6 + + private val columns = mapOf( + TASK_ID to COL_JOB_ID, + TASK_WORKFLOW_ID to COL_WORKFLOW_ID, + TASK_SUBMIT_TIME to COL_SUBMIT_TIME, + TASK_RUNTIME to COL_RUNTIME, + TASK_ALLOC_NCPUS to COL_NPROC, + TASK_REQ_NCPUS to COL_REQ_NPROC, + TASK_PARENTS to COL_DEPS + ) + companion object { /** * The [CsvSchema] that is used to parse the trace. -- cgit v1.2.3 From 140aafdaa711b0fdeacf99b9c7e70b706b8490f4 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 20 Sep 2021 15:40:13 +0200 Subject: feat(trace): Add property for describing partition keys --- .../src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt | 2 ++ 1 file changed, 2 insertions(+) (limited to 'opendc-trace/opendc-trace-gwf/src') diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt index fd7bd068..ca720de4 100644 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt @@ -44,6 +44,8 @@ internal class GwfTaskTable(private val factory: CsvFactory, private val url: UR TASK_PARENTS ) + override val partitionKeys: List> = listOf(TASK_WORKFLOW_ID) + override fun newReader(): TableReader { return GwfTaskTableReader(factory.createParser(url)) } -- cgit v1.2.3 From c7fff03408ee3109d0a39a96c043584a2d8f67ca Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 20 Sep 2021 22:04:23 +0200 Subject: refactor(trace): Simplify TraceFormat SPI interface This change simplifies the TraceFormat SPI interface by reducing the number of interfaces that implementors need to implement to only TraceFormat. --- .../kotlin/org/opendc/trace/gwf/GwfTaskTable.kt | 58 -------------------- .../main/kotlin/org/opendc/trace/gwf/GwfTrace.kt | 46 ---------------- .../kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt | 38 ++++++++++---- .../org/opendc/trace/gwf/GwfTraceFormatTest.kt | 61 ++++------------------ 4 files changed, 39 insertions(+), 164 deletions(-) delete mode 100644 opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt delete mode 100644 opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTrace.kt (limited to 'opendc-trace/opendc-trace-gwf/src') diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt deleted file mode 100644 index ca720de4..00000000 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.gwf - -import com.fasterxml.jackson.dataformat.csv.CsvFactory -import org.opendc.trace.* -import java.net.URL - -/** - * A [Table] containing the tasks in a GWF trace. - */ -internal class GwfTaskTable(private val factory: CsvFactory, private val url: URL) : Table { - override val name: String = TABLE_TASKS - - override val isSynthetic: Boolean = false - - override val columns: List> = listOf( - TASK_WORKFLOW_ID, - TASK_ID, - TASK_SUBMIT_TIME, - TASK_RUNTIME, - TASK_REQ_NCPUS, - TASK_ALLOC_NCPUS, - TASK_PARENTS - ) - - override val partitionKeys: List> = listOf(TASK_WORKFLOW_ID) - - override fun newReader(): TableReader { - return GwfTaskTableReader(factory.createParser(url)) - } - - override fun newReader(partition: String): TableReader { - throw IllegalArgumentException("Invalid partition $partition") - } - - override fun toString(): String = "GwfTaskTable" -} diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTrace.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTrace.kt deleted file mode 100644 index 166c1e56..00000000 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTrace.kt +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.gwf - -import com.fasterxml.jackson.dataformat.csv.CsvFactory -import org.opendc.trace.* -import java.net.URL - -/** - * [Trace] implementation for the GWF format. - */ -public class GwfTrace internal constructor(private val factory: CsvFactory, private val url: URL) : Trace { - override val tables: List = listOf(TABLE_TASKS) - - override fun containsTable(name: String): Boolean = TABLE_TASKS == name - - override fun getTable(name: String): Table? { - if (!containsTable(name)) { - return null - } - - return GwfTaskTable(factory, url) - } - - override fun toString(): String = "GwfTrace[$url]" -} diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt index 6d542503..0f7b9d6e 100644 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt @@ -24,10 +24,10 @@ package org.opendc.trace.gwf import com.fasterxml.jackson.dataformat.csv.CsvFactory import com.fasterxml.jackson.dataformat.csv.CsvParser +import org.opendc.trace.* +import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat -import java.net.URL -import java.nio.file.Paths -import kotlin.io.path.exists +import java.nio.file.Path /** * A [TraceFormat] implementation for the GWF trace format. @@ -45,12 +45,30 @@ public class GwfTraceFormat : TraceFormat { .enable(CsvParser.Feature.ALLOW_COMMENTS) .enable(CsvParser.Feature.TRIM_SPACES) - /** - * Read the tasks in the GWF trace. - */ - public override fun open(url: URL): GwfTrace { - val path = Paths.get(url.toURI()) - require(path.exists()) { "URL $url does not exist" } - return GwfTrace(factory, url) + override fun getTables(path: Path): List = listOf(TABLE_TASKS) + + override fun getDetails(path: Path, table: String): TableDetails { + return when (table) { + TABLE_TASKS -> TableDetails( + listOf( + TASK_WORKFLOW_ID, + TASK_ID, + TASK_SUBMIT_TIME, + TASK_RUNTIME, + TASK_REQ_NCPUS, + TASK_ALLOC_NCPUS, + TASK_PARENTS, + ), + listOf(TASK_WORKFLOW_ID) + ) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + + override fun newReader(path: Path, table: String): TableReader { + return when (table) { + TABLE_TASKS -> GwfTaskTableReader(factory.createParser(path.toFile())) + else -> throw IllegalArgumentException("Table $table not supported") + } } } diff --git a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt index b209b979..7fe403b2 100644 --- a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt @@ -22,13 +22,10 @@ package org.opendc.trace.gwf +import org.junit.jupiter.api.* import org.junit.jupiter.api.Assertions.* -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertAll -import org.junit.jupiter.api.assertDoesNotThrow -import org.junit.jupiter.api.assertThrows import org.opendc.trace.* -import java.net.URL +import java.nio.file.Paths import java.time.Duration import java.time.Instant @@ -36,59 +33,32 @@ import java.time.Instant * Test suite for the [GwfTraceFormat] class. */ internal class GwfTraceFormatTest { - @Test - fun testTraceExists() { - val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")) - val format = GwfTraceFormat() - assertDoesNotThrow { - format.open(input) - } - } - - @Test - fun testTraceDoesNotExists() { - val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")) - val format = GwfTraceFormat() - assertThrows { - format.open(URL(input.toString() + "help")) - } - } + private val format = GwfTraceFormat() @Test fun testTables() { - val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")) - val format = GwfTraceFormat() - val trace = format.open(input) + val path = Paths.get(checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")).toURI()) - assertEquals(listOf(TABLE_TASKS), trace.tables) + assertEquals(listOf(TABLE_TASKS), format.getTables(path)) } @Test fun testTableExists() { - val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")) - val format = GwfTraceFormat() - val table = format.open(input).getTable(TABLE_TASKS) - - assertNotNull(table) - assertDoesNotThrow { table!!.newReader() } + val path = Paths.get(checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")).toURI()) + assertDoesNotThrow { format.getDetails(path, TABLE_TASKS) } } @Test fun testTableDoesNotExist() { - val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")) - val format = GwfTraceFormat() - val trace = format.open(input) + val path = Paths.get(checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")).toURI()) - assertFalse(trace.containsTable("test")) - assertNull(trace.getTable("test")) + assertThrows { format.getDetails(path, "test") } } @Test fun testTableReader() { - val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")) - val format = GwfTraceFormat() - val table = format.open(input).getTable(TABLE_TASKS)!! - val reader = table.newReader() + val path = Paths.get(checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")).toURI()) + val reader = format.newReader(path, TABLE_TASKS) assertAll( { assertTrue(reader.nextRow()) }, @@ -99,13 +69,4 @@ internal class GwfTraceFormatTest { { assertEquals(emptySet(), reader.get(TASK_PARENTS)) }, ) } - - @Test - fun testTableReaderPartition() { - val input = checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")) - val format = GwfTraceFormat() - val table = format.open(input).getTable(TABLE_TASKS)!! - - assertThrows { table.newReader("test") } - } } -- cgit v1.2.3 From 68ef3700ed2f69bcf0118bb69eda71e6b1f4d54f Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 21 Sep 2021 11:34:34 +0200 Subject: feat(trace): Add support for writing traces This change adds a new API for writing traces in a trace format. Currently, writing is only supported by the OpenDC VM format, but over time the other formats will also have support for writing added. --- .../src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt | 8 ++++++++ 1 file changed, 8 insertions(+) (limited to 'opendc-trace/opendc-trace-gwf/src') diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt index 0f7b9d6e..d4287420 100644 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt @@ -45,6 +45,10 @@ public class GwfTraceFormat : TraceFormat { .enable(CsvParser.Feature.ALLOW_COMMENTS) .enable(CsvParser.Feature.TRIM_SPACES) + override fun create(path: Path) { + throw UnsupportedOperationException("Writing not supported for this format") + } + override fun getTables(path: Path): List = listOf(TABLE_TASKS) override fun getDetails(path: Path, table: String): TableDetails { @@ -71,4 +75,8 @@ public class GwfTraceFormat : TraceFormat { else -> throw IllegalArgumentException("Table $table not supported") } } + + override fun newWriter(path: Path, table: String): TableWriter { + throw UnsupportedOperationException("Writing not supported for this format") + } } -- cgit v1.2.3