From 5c6bf9739aa0ffd9651df4fcb4cd46a8545144f0 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 31 Aug 2021 18:08:14 +0200 Subject: refactor(trace): Implement trace API for SWF reader This change updates the SWF trace reader to support the new streaming trace API. --- .../kotlin/org/opendc/trace/swf/SwfTaskTable.kt | 63 ++++++++ .../org/opendc/trace/swf/SwfTaskTableReader.kt | 162 +++++++++++++++++++++ .../main/kotlin/org/opendc/trace/swf/SwfTrace.kt | 46 ++++++ .../kotlin/org/opendc/trace/swf/SwfTraceFormat.kt | 43 ++++++ .../services/org.opendc.trace.spi.TraceFormat | 1 + .../org/opendc/trace/swf/SwfTraceFormatTest.kt | 107 ++++++++++++++ .../opendc-trace-swf/src/test/resources/trace.swf | 6 + 7 files changed, 428 insertions(+) create mode 100644 opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt create mode 100644 opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt create mode 100644 opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTrace.kt create mode 100644 opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt create mode 100644 opendc-trace/opendc-trace-swf/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat create mode 100644 opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt create mode 100644 opendc-trace/opendc-trace-swf/src/test/resources/trace.swf (limited to 'opendc-trace/opendc-trace-swf/src') diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt new file mode 100644 index 00000000..12a51a2f --- /dev/null +++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.swf + +import org.opendc.trace.* +import java.nio.file.Path +import kotlin.io.path.bufferedReader + +/** + * A [Table] containing the tasks in a SWF trace. + */ +internal class SwfTaskTable(private val path: Path) : Table { + override val name: String = TABLE_TASKS + + override val isSynthetic: Boolean = false + + override fun isSupported(column: TableColumn<*>): Boolean { + return when (column) { + TASK_ID -> true + TASK_SUBMIT_TIME -> true + TASK_WAIT_TIME -> true + TASK_RUNTIME -> true + TASK_REQ_NCPUS -> true + TASK_ALLOC_NCPUS -> true + TASK_PARENTS -> true + TASK_STATUS -> true + TASK_GROUP_ID -> true + TASK_USER_ID -> true + else -> false + } + } + + override fun newReader(): TableReader { + val reader = path.bufferedReader() + return SwfTaskTableReader(reader) + } + + override fun newReader(partition: String): TableReader { + throw IllegalArgumentException("Invalid partition $partition") + } + + override fun toString(): String = "SwfTaskTable" +} diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt new file mode 100644 index 00000000..5f879a54 --- /dev/null +++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt @@ -0,0 +1,162 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.swf + +import org.opendc.trace.* +import java.io.BufferedReader + +/** + * A [TableReader] implementation for the SWF format. + */ +internal class SwfTaskTableReader(private val reader: BufferedReader) : TableReader { + /** + * The current row. + */ + private var fields = emptyList() + + /** + * A [Regex] object to match whitespace. + */ + private val whitespace = "\\s+".toRegex() + + override fun nextRow(): Boolean { + var line: String + var num = 0 + + while (true) { + line = reader.readLine() ?: return false + num++ + + if (line.isBlank()) { + // Ignore empty lines + continue + } else if (line.startsWith(";")) { + // Ignore comments for now + continue + } + + break + } + + fields = line.trim().split(whitespace) + + if (fields.size < 18) { + throw IllegalArgumentException("Invalid format at line $line") + } + + return true + } + + override fun hasColumn(column: TableColumn<*>): Boolean { + return when (column) { + TASK_ID -> true + TASK_SUBMIT_TIME -> true + TASK_WAIT_TIME -> true + TASK_RUNTIME -> true + TASK_REQ_NCPUS -> true + TASK_ALLOC_NCPUS -> true + TASK_PARENTS -> true + TASK_STATUS -> true + TASK_GROUP_ID -> true + TASK_USER_ID -> true + else -> false + } + } + + override fun get(column: TableColumn): T { + val res: Any = when (column) { + TASK_ID -> getLong(TASK_ID) + TASK_SUBMIT_TIME -> getLong(TASK_SUBMIT_TIME) + TASK_WAIT_TIME -> getLong(TASK_WAIT_TIME) + TASK_RUNTIME -> getLong(TASK_RUNTIME) + TASK_REQ_NCPUS -> getInt(TASK_REQ_NCPUS) + TASK_ALLOC_NCPUS -> getInt(TASK_ALLOC_NCPUS) + TASK_PARENTS -> { + val parent = fields[COL_PARENT_JOB].toLong(10) + if (parent < 0) emptySet() else setOf(parent) + } + TASK_STATUS -> getInt(TASK_STATUS) + TASK_GROUP_ID -> getInt(TASK_GROUP_ID) + TASK_USER_ID -> getInt(TASK_USER_ID) + else -> throw IllegalArgumentException("Invalid column") + } + + @Suppress("UNCHECKED_CAST") + return res as T + } + + override fun getBoolean(column: TableColumn): Boolean { + throw IllegalArgumentException("Invalid column") + } + + override fun getInt(column: TableColumn): Int { + return when (column) { + TASK_REQ_NCPUS -> fields[COL_REQ_NCPUS].toInt(10) + TASK_ALLOC_NCPUS -> fields[COL_ALLOC_NCPUS].toInt(10) + TASK_STATUS -> fields[COL_STATUS].toInt(10) + TASK_GROUP_ID -> fields[COL_GROUP_ID].toInt(10) + TASK_USER_ID -> fields[COL_USER_ID].toInt(10) + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getLong(column: TableColumn): Long { + return when (column) { + TASK_ID -> fields[COL_JOB_ID].toLong(10) + TASK_SUBMIT_TIME -> fields[COL_SUBMIT_TIME].toLong(10) + TASK_WAIT_TIME -> fields[COL_WAIT_TIME].toLong(10) + TASK_RUNTIME -> fields[COL_RUN_TIME].toLong(10) + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getDouble(column: TableColumn): Double { + throw IllegalArgumentException("Invalid column") + } + + override fun close() { + reader.close() + } + + /** + * Default column indices for the SWF format. + */ + private val COL_JOB_ID = 0 + private val COL_SUBMIT_TIME = 1 + private val COL_WAIT_TIME = 2 + private val COL_RUN_TIME = 3 + private val COL_ALLOC_NCPUS = 4 + private val COL_AVG_CPU_TIME = 5 + private val COL_USED_MEM = 6 + private val COL_REQ_NCPUS = 7 + private val COL_REQ_TIME = 8 + private val COL_REQ_MEM = 9 + private val COL_STATUS = 10 + private val COL_USER_ID = 11 + private val COL_GROUP_ID = 12 + private val COL_EXEC_NUM = 13 + private val COL_QUEUE_NUM = 14 + private val COL_PART_NUM = 15 + private val COL_PARENT_JOB = 16 + private val COL_PARENT_THINK_TIME = 17 +} diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTrace.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTrace.kt new file mode 100644 index 00000000..d4da735e --- /dev/null +++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTrace.kt @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.swf + +import org.opendc.trace.TABLE_TASKS +import org.opendc.trace.Table +import org.opendc.trace.Trace +import java.nio.file.Path + +/** + * [Trace] implementation for the SWF format. + */ +public class SwfTrace internal constructor(private val path: Path) : Trace { + override val tables: List = listOf(TABLE_TASKS) + + override fun containsTable(name: String): Boolean = TABLE_TASKS == name + + override fun getTable(name: String): Table? { + if (!containsTable(name)) { + return null + } + return SwfTaskTable(path) + } + + override fun toString(): String = "SwfTrace[$path]" +} diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt new file mode 100644 index 00000000..36c3122e --- /dev/null +++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.swf + +import org.opendc.trace.spi.TraceFormat +import java.net.URL +import java.nio.file.Paths +import kotlin.io.path.exists + +/** + * Support for the Standard Workload Format (SWF) in OpenDC. + * + * The standard is defined by the PWA, see here: https://www.cse.huji.ac.il/labs/parallel/workload/swf.html + */ +public class SwfTraceFormat : TraceFormat { + override val name: String = "swf" + + override fun open(url: URL): SwfTrace { + val path = Paths.get(url.toURI()) + require(path.exists()) { "URL $url does not exist" } + return SwfTrace(path) + } +} diff --git a/opendc-trace/opendc-trace-swf/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat b/opendc-trace/opendc-trace-swf/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat new file mode 100644 index 00000000..6c6b0eb2 --- /dev/null +++ b/opendc-trace/opendc-trace-swf/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat @@ -0,0 +1 @@ +org.opendc.trace.swf.SwfTraceFormat diff --git a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt new file mode 100644 index 00000000..9686891b --- /dev/null +++ b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt @@ -0,0 +1,107 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.swf + +import org.junit.jupiter.api.* +import org.junit.jupiter.api.Assertions.* +import org.opendc.trace.TABLE_TASKS +import org.opendc.trace.TASK_ALLOC_NCPUS +import org.opendc.trace.TASK_ID +import java.net.URL + +/** + * Test suite for the [SwfTraceFormat] class. + */ +internal class SwfTraceFormatTest { + @Test + fun testTraceExists() { + val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")) + val format = SwfTraceFormat() + assertDoesNotThrow { + format.open(input) + } + } + + @Test + fun testTraceDoesNotExists() { + val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")) + val format = SwfTraceFormat() + assertThrows { + format.open(URL(input.toString() + "help")) + } + } + + @Test + fun testTables() { + val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")) + val trace = SwfTraceFormat().open(input) + + assertEquals(listOf(TABLE_TASKS), trace.tables) + } + + @Test + fun testTableExists() { + val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")) + val table = SwfTraceFormat().open(input).getTable(TABLE_TASKS) + + assertNotNull(table) + assertDoesNotThrow { table!!.newReader() } + } + + @Test + fun testTableDoesNotExist() { + val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")) + val trace = SwfTraceFormat().open(input) + + assertFalse(trace.containsTable("test")) + assertNull(trace.getTable("test")) + } + + @Test + fun testReader() { + val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")) + val trace = SwfTraceFormat().open(input) + val reader = trace.getTable(TABLE_TASKS)!!.newReader() + + assertAll( + { assertTrue(reader.nextRow()) }, + { assertEquals(1, reader.getLong(TASK_ID)) }, + { assertEquals(306, reader.getInt(TASK_ALLOC_NCPUS)) }, + { assertTrue(reader.nextRow()) }, + { assertEquals(2, reader.getLong(TASK_ID)) }, + { assertEquals(17, reader.getInt(TASK_ALLOC_NCPUS)) }, + ) + + reader.close() + } + + @Test + fun testReaderPartition() { + val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")) + val trace = SwfTraceFormat().open(input) + + assertThrows { + trace.getTable(TABLE_TASKS)!!.newReader("test") + } + } +} diff --git a/opendc-trace/opendc-trace-swf/src/test/resources/trace.swf b/opendc-trace/opendc-trace-swf/src/test/resources/trace.swf new file mode 100644 index 00000000..c3ecf890 --- /dev/null +++ b/opendc-trace/opendc-trace-swf/src/test/resources/trace.swf @@ -0,0 +1,6 @@ +; Excerpt from the PWA: CTC-SP2-1996-3.1-cln.swf + 1 0 588530 937 306 142.00 -1 -1 35100 -1 1 97 -1 307 3 -1 -1 -1 + 2 164472 356587 75 17 2.00 -1 -1 300 -1 1 81 -1 195 3 -1 -1 -1 + 3 197154 459987 35268 306 32792 -1 -1 35100 -1 0 97 -1 307 3 -1 -1 -1 + 4 310448 50431 29493 64 28745 -1 -1 64800 -1 1 38 -1 38 1 -1 -1 -1 + 5 310541 50766 29063 64 28191 -1 -1 64800 -1 1 38 -1 69 1 -1 -1 -1 -- cgit v1.2.3 From b7be3400bb4b21d0cd7021e2baf1f6ce43aba189 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 10 Sep 2021 22:10:22 +0200 Subject: feat(trace): Add support for WfCommons (WorkflowHub) traces This change adds support for reading WfCommons workflow traces in OpenDC. This functionality is available in the new `opendc-trace-wfformat` module. --- .../kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt | 18 +++++++----------- .../kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt | 4 ++-- 2 files changed, 9 insertions(+), 13 deletions(-) (limited to 'opendc-trace/opendc-trace-swf/src') diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt index 5f879a54..3f49c770 100644 --- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt +++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt @@ -24,6 +24,8 @@ package org.opendc.trace.swf import org.opendc.trace.* import java.io.BufferedReader +import java.time.Duration +import java.time.Instant /** * A [TableReader] implementation for the SWF format. @@ -85,10 +87,10 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea override fun get(column: TableColumn): T { val res: Any = when (column) { - TASK_ID -> getLong(TASK_ID) - TASK_SUBMIT_TIME -> getLong(TASK_SUBMIT_TIME) - TASK_WAIT_TIME -> getLong(TASK_WAIT_TIME) - TASK_RUNTIME -> getLong(TASK_RUNTIME) + TASK_ID -> fields[COL_JOB_ID] + TASK_SUBMIT_TIME -> Instant.ofEpochSecond(fields[COL_SUBMIT_TIME].toLong(10)) + TASK_WAIT_TIME -> Duration.ofSeconds(fields[COL_WAIT_TIME].toLong(10)) + TASK_RUNTIME -> Duration.ofSeconds(fields[COL_RUN_TIME].toLong(10)) TASK_REQ_NCPUS -> getInt(TASK_REQ_NCPUS) TASK_ALLOC_NCPUS -> getInt(TASK_ALLOC_NCPUS) TASK_PARENTS -> { @@ -121,13 +123,7 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea } override fun getLong(column: TableColumn): Long { - return when (column) { - TASK_ID -> fields[COL_JOB_ID].toLong(10) - TASK_SUBMIT_TIME -> fields[COL_SUBMIT_TIME].toLong(10) - TASK_WAIT_TIME -> fields[COL_WAIT_TIME].toLong(10) - TASK_RUNTIME -> fields[COL_RUN_TIME].toLong(10) - else -> throw IllegalArgumentException("Invalid column") - } + throw IllegalArgumentException("Invalid column") } override fun getDouble(column: TableColumn): Double { diff --git a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt index 9686891b..828c2bfa 100644 --- a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt @@ -85,10 +85,10 @@ internal class SwfTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals(1, reader.getLong(TASK_ID)) }, + { assertEquals("1", reader.get(TASK_ID)) }, { assertEquals(306, reader.getInt(TASK_ALLOC_NCPUS)) }, { assertTrue(reader.nextRow()) }, - { assertEquals(2, reader.getLong(TASK_ID)) }, + { assertEquals("2", reader.get(TASK_ID)) }, { assertEquals(17, reader.getInt(TASK_ALLOC_NCPUS)) }, ) -- cgit v1.2.3 From 49dd8377c8bfde1e64e411c6a6f921c768b9b53b Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sun, 12 Sep 2021 11:22:07 +0200 Subject: refactor(trace): Add API for accessing available table columns This change adds a new API to the Table interface for accessing the table columns that the table supports. This does not necessarily mean that the column will have a value for every row, but that the table format has defined this particular column. --- .../kotlin/org/opendc/trace/swf/SwfTaskTable.kt | 27 ++++++++++------------ 1 file changed, 12 insertions(+), 15 deletions(-) (limited to 'opendc-trace/opendc-trace-swf/src') diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt index 12a51a2f..7ec0d607 100644 --- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt +++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt @@ -34,21 +34,18 @@ internal class SwfTaskTable(private val path: Path) : Table { override val isSynthetic: Boolean = false - override fun isSupported(column: TableColumn<*>): Boolean { - return when (column) { - TASK_ID -> true - TASK_SUBMIT_TIME -> true - TASK_WAIT_TIME -> true - TASK_RUNTIME -> true - TASK_REQ_NCPUS -> true - TASK_ALLOC_NCPUS -> true - TASK_PARENTS -> true - TASK_STATUS -> true - TASK_GROUP_ID -> true - TASK_USER_ID -> true - else -> false - } - } + override val columns: List> = listOf( + TASK_ID, + TASK_SUBMIT_TIME, + TASK_WAIT_TIME, + TASK_RUNTIME, + TASK_REQ_NCPUS, + TASK_ALLOC_NCPUS, + TASK_PARENTS, + TASK_STATUS, + TASK_GROUP_ID, + TASK_USER_ID + ) override fun newReader(): TableReader { val reader = path.bufferedReader() -- cgit v1.2.3 From 768bfa0d2ae763e359d74612385ce43c41afb432 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 20 Sep 2021 15:12:10 +0200 Subject: feat(trace): Support column lookup via index This change adds support for looking up the column value through the column index. This enables faster lookup when processing very large traces. --- .../org/opendc/trace/swf/SwfTaskTableReader.kt | 72 ++++++++++------------ 1 file changed, 32 insertions(+), 40 deletions(-) (limited to 'opendc-trace/opendc-trace-swf/src') diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt index 3f49c770..2f6ea6ee 100644 --- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt +++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt @@ -69,64 +69,43 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea return true } - override fun hasColumn(column: TableColumn<*>): Boolean { - return when (column) { - TASK_ID -> true - TASK_SUBMIT_TIME -> true - TASK_WAIT_TIME -> true - TASK_RUNTIME -> true - TASK_REQ_NCPUS -> true - TASK_ALLOC_NCPUS -> true - TASK_PARENTS -> true - TASK_STATUS -> true - TASK_GROUP_ID -> true - TASK_USER_ID -> true - else -> false - } + override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + + override fun isNull(index: Int): Boolean { + require(index in columns.values) { "Invalid column index" } + return false } - override fun get(column: TableColumn): T { - val res: Any = when (column) { - TASK_ID -> fields[COL_JOB_ID] - TASK_SUBMIT_TIME -> Instant.ofEpochSecond(fields[COL_SUBMIT_TIME].toLong(10)) - TASK_WAIT_TIME -> Duration.ofSeconds(fields[COL_WAIT_TIME].toLong(10)) - TASK_RUNTIME -> Duration.ofSeconds(fields[COL_RUN_TIME].toLong(10)) - TASK_REQ_NCPUS -> getInt(TASK_REQ_NCPUS) - TASK_ALLOC_NCPUS -> getInt(TASK_ALLOC_NCPUS) - TASK_PARENTS -> { - val parent = fields[COL_PARENT_JOB].toLong(10) + override fun get(index: Int): Any? { + return when (index) { + COL_JOB_ID -> fields[index] + COL_SUBMIT_TIME -> Instant.ofEpochSecond(fields[index].toLong(10)) + COL_WAIT_TIME, COL_RUN_TIME -> Duration.ofSeconds(fields[index].toLong(10)) + COL_REQ_NCPUS, COL_ALLOC_NCPUS, COL_STATUS, COL_GROUP_ID, COL_USER_ID -> getInt(index) + COL_PARENT_JOB -> { + val parent = fields[index].toLong(10) if (parent < 0) emptySet() else setOf(parent) } - TASK_STATUS -> getInt(TASK_STATUS) - TASK_GROUP_ID -> getInt(TASK_GROUP_ID) - TASK_USER_ID -> getInt(TASK_USER_ID) else -> throw IllegalArgumentException("Invalid column") } - - @Suppress("UNCHECKED_CAST") - return res as T } - override fun getBoolean(column: TableColumn): Boolean { + override fun getBoolean(index: Int): Boolean { throw IllegalArgumentException("Invalid column") } - override fun getInt(column: TableColumn): Int { - return when (column) { - TASK_REQ_NCPUS -> fields[COL_REQ_NCPUS].toInt(10) - TASK_ALLOC_NCPUS -> fields[COL_ALLOC_NCPUS].toInt(10) - TASK_STATUS -> fields[COL_STATUS].toInt(10) - TASK_GROUP_ID -> fields[COL_GROUP_ID].toInt(10) - TASK_USER_ID -> fields[COL_USER_ID].toInt(10) + override fun getInt(index: Int): Int { + return when (index) { + COL_REQ_NCPUS, COL_ALLOC_NCPUS, COL_STATUS, COL_GROUP_ID, COL_USER_ID -> fields[index].toInt(10) else -> throw IllegalArgumentException("Invalid column") } } - override fun getLong(column: TableColumn): Long { + override fun getLong(index: Int): Long { throw IllegalArgumentException("Invalid column") } - override fun getDouble(column: TableColumn): Double { + override fun getDouble(index: Int): Double { throw IllegalArgumentException("Invalid column") } @@ -155,4 +134,17 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea private val COL_PART_NUM = 15 private val COL_PARENT_JOB = 16 private val COL_PARENT_THINK_TIME = 17 + + private val columns = mapOf( + TASK_ID to COL_JOB_ID, + TASK_SUBMIT_TIME to COL_SUBMIT_TIME, + TASK_WAIT_TIME to COL_WAIT_TIME, + TASK_RUNTIME to COL_RUN_TIME, + TASK_ALLOC_NCPUS to COL_ALLOC_NCPUS, + TASK_REQ_NCPUS to COL_REQ_NCPUS, + TASK_STATUS to COL_STATUS, + TASK_USER_ID to COL_USER_ID, + TASK_GROUP_ID to COL_GROUP_ID, + TASK_PARENTS to COL_PARENT_JOB + ) } -- cgit v1.2.3 From 140aafdaa711b0fdeacf99b9c7e70b706b8490f4 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 20 Sep 2021 15:40:13 +0200 Subject: feat(trace): Add property for describing partition keys --- .../src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt | 2 ++ 1 file changed, 2 insertions(+) (limited to 'opendc-trace/opendc-trace-swf/src') diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt index 7ec0d607..4898779d 100644 --- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt +++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt @@ -47,6 +47,8 @@ internal class SwfTaskTable(private val path: Path) : Table { TASK_USER_ID ) + override val partitionKeys: List> = emptyList() + override fun newReader(): TableReader { val reader = path.bufferedReader() return SwfTaskTableReader(reader) -- cgit v1.2.3 From c7fff03408ee3109d0a39a96c043584a2d8f67ca Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 20 Sep 2021 22:04:23 +0200 Subject: refactor(trace): Simplify TraceFormat SPI interface This change simplifies the TraceFormat SPI interface by reducing the number of interfaces that implementors need to implement to only TraceFormat. --- .../kotlin/org/opendc/trace/swf/SwfTaskTable.kt | 62 ---------------------- .../main/kotlin/org/opendc/trace/swf/SwfTrace.kt | 46 ---------------- .../kotlin/org/opendc/trace/swf/SwfTraceFormat.kt | 39 +++++++++++--- .../org/opendc/trace/swf/SwfTraceFormatTest.kt | 53 ++++-------------- 4 files changed, 42 insertions(+), 158 deletions(-) delete mode 100644 opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt delete mode 100644 opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTrace.kt (limited to 'opendc-trace/opendc-trace-swf/src') diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt deleted file mode 100644 index 4898779d..00000000 --- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.swf - -import org.opendc.trace.* -import java.nio.file.Path -import kotlin.io.path.bufferedReader - -/** - * A [Table] containing the tasks in a SWF trace. - */ -internal class SwfTaskTable(private val path: Path) : Table { - override val name: String = TABLE_TASKS - - override val isSynthetic: Boolean = false - - override val columns: List> = listOf( - TASK_ID, - TASK_SUBMIT_TIME, - TASK_WAIT_TIME, - TASK_RUNTIME, - TASK_REQ_NCPUS, - TASK_ALLOC_NCPUS, - TASK_PARENTS, - TASK_STATUS, - TASK_GROUP_ID, - TASK_USER_ID - ) - - override val partitionKeys: List> = emptyList() - - override fun newReader(): TableReader { - val reader = path.bufferedReader() - return SwfTaskTableReader(reader) - } - - override fun newReader(partition: String): TableReader { - throw IllegalArgumentException("Invalid partition $partition") - } - - override fun toString(): String = "SwfTaskTable" -} diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTrace.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTrace.kt deleted file mode 100644 index d4da735e..00000000 --- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTrace.kt +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.swf - -import org.opendc.trace.TABLE_TASKS -import org.opendc.trace.Table -import org.opendc.trace.Trace -import java.nio.file.Path - -/** - * [Trace] implementation for the SWF format. - */ -public class SwfTrace internal constructor(private val path: Path) : Trace { - override val tables: List = listOf(TABLE_TASKS) - - override fun containsTable(name: String): Boolean = TABLE_TASKS == name - - override fun getTable(name: String): Table? { - if (!containsTable(name)) { - return null - } - return SwfTaskTable(path) - } - - override fun toString(): String = "SwfTrace[$path]" -} diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt index 36c3122e..4cb7e49e 100644 --- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt +++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt @@ -22,10 +22,11 @@ package org.opendc.trace.swf +import org.opendc.trace.* +import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat -import java.net.URL -import java.nio.file.Paths -import kotlin.io.path.exists +import java.nio.file.Path +import kotlin.io.path.bufferedReader /** * Support for the Standard Workload Format (SWF) in OpenDC. @@ -35,9 +36,33 @@ import kotlin.io.path.exists public class SwfTraceFormat : TraceFormat { override val name: String = "swf" - override fun open(url: URL): SwfTrace { - val path = Paths.get(url.toURI()) - require(path.exists()) { "URL $url does not exist" } - return SwfTrace(path) + override fun getTables(path: Path): List = listOf(TABLE_TASKS) + + override fun getDetails(path: Path, table: String): TableDetails { + return when (table) { + TABLE_TASKS -> TableDetails( + listOf( + TASK_ID, + TASK_SUBMIT_TIME, + TASK_WAIT_TIME, + TASK_RUNTIME, + TASK_REQ_NCPUS, + TASK_ALLOC_NCPUS, + TASK_PARENTS, + TASK_STATUS, + TASK_GROUP_ID, + TASK_USER_ID + ), + emptyList() + ) + else -> throw IllegalArgumentException("Table $table not supported") + } + } + + override fun newReader(path: Path, table: String): TableReader { + return when (table) { + TABLE_TASKS -> SwfTaskTableReader(path.bufferedReader()) + else -> throw IllegalArgumentException("Table $table not supported") + } } } diff --git a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt index 828c2bfa..4dcd43f6 100644 --- a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt @@ -27,61 +27,38 @@ import org.junit.jupiter.api.Assertions.* import org.opendc.trace.TABLE_TASKS import org.opendc.trace.TASK_ALLOC_NCPUS import org.opendc.trace.TASK_ID -import java.net.URL +import java.nio.file.Paths /** * Test suite for the [SwfTraceFormat] class. */ internal class SwfTraceFormatTest { - @Test - fun testTraceExists() { - val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")) - val format = SwfTraceFormat() - assertDoesNotThrow { - format.open(input) - } - } - - @Test - fun testTraceDoesNotExists() { - val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")) - val format = SwfTraceFormat() - assertThrows { - format.open(URL(input.toString() + "help")) - } - } + private val format = SwfTraceFormat() @Test fun testTables() { - val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")) - val trace = SwfTraceFormat().open(input) + val path = Paths.get(checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")).toURI()) - assertEquals(listOf(TABLE_TASKS), trace.tables) + assertEquals(listOf(TABLE_TASKS), format.getTables(path)) } @Test fun testTableExists() { - val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")) - val table = SwfTraceFormat().open(input).getTable(TABLE_TASKS) - - assertNotNull(table) - assertDoesNotThrow { table!!.newReader() } + val path = Paths.get(checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")).toURI()) + assertDoesNotThrow { format.getDetails(path, TABLE_TASKS) } } @Test fun testTableDoesNotExist() { - val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")) - val trace = SwfTraceFormat().open(input) + val path = Paths.get(checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")).toURI()) - assertFalse(trace.containsTable("test")) - assertNull(trace.getTable("test")) + assertThrows { format.getDetails(path, "test") } } @Test fun testReader() { - val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")) - val trace = SwfTraceFormat().open(input) - val reader = trace.getTable(TABLE_TASKS)!!.newReader() + val path = Paths.get(checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")).toURI()) + val reader = format.newReader(path, TABLE_TASKS) assertAll( { assertTrue(reader.nextRow()) }, @@ -94,14 +71,4 @@ internal class SwfTraceFormatTest { reader.close() } - - @Test - fun testReaderPartition() { - val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")) - val trace = SwfTraceFormat().open(input) - - assertThrows { - trace.getTable(TABLE_TASKS)!!.newReader("test") - } - } } -- cgit v1.2.3 From 68ef3700ed2f69bcf0118bb69eda71e6b1f4d54f Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 21 Sep 2021 11:34:34 +0200 Subject: feat(trace): Add support for writing traces This change adds a new API for writing traces in a trace format. Currently, writing is only supported by the OpenDC VM format, but over time the other formats will also have support for writing added. --- .../src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt | 8 ++++++++ 1 file changed, 8 insertions(+) (limited to 'opendc-trace/opendc-trace-swf/src') diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt index 4cb7e49e..1fd076d5 100644 --- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt +++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt @@ -36,6 +36,10 @@ import kotlin.io.path.bufferedReader public class SwfTraceFormat : TraceFormat { override val name: String = "swf" + override fun create(path: Path) { + throw UnsupportedOperationException("Writing not supported for this format") + } + override fun getTables(path: Path): List = listOf(TABLE_TASKS) override fun getDetails(path: Path, table: String): TableDetails { @@ -65,4 +69,8 @@ public class SwfTraceFormat : TraceFormat { else -> throw IllegalArgumentException("Table $table not supported") } } + + override fun newWriter(path: Path, table: String): TableWriter { + throw UnsupportedOperationException("Writing not supported for this format") + } } -- cgit v1.2.3