diff options
Diffstat (limited to 'opendc-trace/opendc-trace-swf/src')
3 files changed, 127 insertions, 45 deletions
diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt index 40b604c3..b2734fe7 100644 --- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt +++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt @@ -27,12 +27,18 @@ import org.opendc.trace.conv.* import java.io.BufferedReader import java.time.Duration import java.time.Instant +import java.util.* /** * A [TableReader] implementation for the SWF format. */ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableReader { /** + * A flag to indicate the state of the reader + */ + private var state = State.Pending + + /** * The current row. */ private var fields = emptyList<String>() @@ -43,11 +49,23 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea private val whitespace = "\\s+".toRegex() override fun nextRow(): Boolean { - var line: String + var line: String? var num = 0 + val state = state + if (state == State.Closed) { + return false + } else if (state == State.Pending) { + this.state = State.Active + } + while (true) { - line = reader.readLine() ?: return false + line = reader.readLine() + + if (line == null) { + this.state = State.Closed + return false + } num++ if (line.isBlank()) { @@ -61,7 +79,7 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea break } - fields = line.trim().split(whitespace) + fields = line!!.trim().split(whitespace) if (fields.size < 18) { throw IllegalArgumentException("Invalid format at line $line") @@ -70,48 +88,103 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea return true } - override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + override fun resolve(name: String): Int { + return when (name) { + TASK_ID -> COL_JOB_ID + TASK_SUBMIT_TIME -> COL_SUBMIT_TIME + TASK_WAIT_TIME -> COL_WAIT_TIME + TASK_RUNTIME -> COL_RUN_TIME + TASK_ALLOC_NCPUS -> COL_ALLOC_NCPUS + TASK_REQ_NCPUS -> COL_REQ_NCPUS + TASK_STATUS -> COL_STATUS + TASK_USER_ID -> COL_USER_ID + TASK_GROUP_ID -> COL_GROUP_ID + TASK_PARENTS -> COL_PARENT_JOB + else -> -1 + } + } override fun isNull(index: Int): Boolean { - require(index in columns.values) { "Invalid column index" } + require(index in COL_JOB_ID..COL_PARENT_THINK_TIME) { "Invalid column index" } return false } - override fun get(index: Int): Any? { + override fun getBoolean(index: Int): Boolean { + throw IllegalArgumentException("Invalid column") + } + + override fun getInt(index: Int): Int { + check(state == State.Active) { "No active row" } + return when (index) { + COL_REQ_NCPUS, COL_ALLOC_NCPUS, COL_STATUS, COL_GROUP_ID, COL_USER_ID -> fields[index].toInt(10) + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getLong(index: Int): Long { + throw IllegalArgumentException("Invalid column") + } + + override fun getFloat(index: Int): Float { + throw IllegalArgumentException("Invalid column") + } + + override fun getDouble(index: Int): Double { + throw IllegalArgumentException("Invalid column") + } + + override fun getString(index: Int): String { + check(state == State.Active) { "No active row" } return when (index) { COL_JOB_ID -> fields[index] - COL_SUBMIT_TIME -> Instant.ofEpochSecond(fields[index].toLong(10)) - COL_WAIT_TIME, COL_RUN_TIME -> Duration.ofSeconds(fields[index].toLong(10)) - COL_REQ_NCPUS, COL_ALLOC_NCPUS, COL_STATUS, COL_GROUP_ID, COL_USER_ID -> getInt(index) - COL_PARENT_JOB -> { - val parent = fields[index].toLong(10) - if (parent < 0) emptySet() else setOf(parent) - } else -> throw IllegalArgumentException("Invalid column") } } - override fun getBoolean(index: Int): Boolean { + override fun getUUID(index: Int): UUID? { throw IllegalArgumentException("Invalid column") } - override fun getInt(index: Int): Int { + override fun getInstant(index: Int): Instant? { + check(state == State.Active) { "No active row" } return when (index) { - COL_REQ_NCPUS, COL_ALLOC_NCPUS, COL_STATUS, COL_GROUP_ID, COL_USER_ID -> fields[index].toInt(10) + COL_SUBMIT_TIME -> Instant.ofEpochSecond(fields[index].toLong(10)) else -> throw IllegalArgumentException("Invalid column") } } - override fun getLong(index: Int): Long { + override fun getDuration(index: Int): Duration? { + check(state == State.Active) { "No active row" } + return when (index) { + COL_WAIT_TIME, COL_RUN_TIME -> Duration.ofSeconds(fields[index].toLong(10)) + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun <T> getList(index: Int, elementType: Class<T>): List<T>? { throw IllegalArgumentException("Invalid column") } - override fun getDouble(index: Int): Double { + override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? { + check(state == State.Active) { "No active row" } + @Suppress("UNCHECKED_CAST") + return when (index) { + COL_PARENT_JOB -> { + require(elementType.isAssignableFrom(String::class.java)) + val parent = fields[index].toLong(10) + if (parent < 0) emptySet() else setOf(parent) + } + else -> throw IllegalArgumentException("Invalid column") + } as Set<T>? + } + + override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? { throw IllegalArgumentException("Invalid column") } override fun close() { reader.close() + state = State.Closed } /** @@ -136,16 +209,7 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea private val COL_PARENT_JOB = 16 private val COL_PARENT_THINK_TIME = 17 - private val columns = mapOf( - TASK_ID to COL_JOB_ID, - TASK_SUBMIT_TIME to COL_SUBMIT_TIME, - TASK_WAIT_TIME to COL_WAIT_TIME, - TASK_RUNTIME to COL_RUN_TIME, - TASK_ALLOC_NCPUS to COL_ALLOC_NCPUS, - TASK_REQ_NCPUS to COL_REQ_NCPUS, - TASK_STATUS to COL_STATUS, - TASK_USER_ID to COL_USER_ID, - TASK_GROUP_ID to COL_GROUP_ID, - TASK_PARENTS to COL_PARENT_JOB - ) + private enum class State { + Pending, Active, Closed + } } diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt index 916a5eca..575a1740 100644 --- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt +++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt @@ -47,24 +47,23 @@ public class SwfTraceFormat : TraceFormat { return when (table) { TABLE_TASKS -> TableDetails( listOf( - TASK_ID, - TASK_SUBMIT_TIME, - TASK_WAIT_TIME, - TASK_RUNTIME, - TASK_REQ_NCPUS, - TASK_ALLOC_NCPUS, - TASK_PARENTS, - TASK_STATUS, - TASK_GROUP_ID, - TASK_USER_ID - ), - emptyList() + TableColumn(TASK_ID, TableColumnType.String), + TableColumn(TASK_SUBMIT_TIME, TableColumnType.Instant), + TableColumn(TASK_WAIT_TIME, TableColumnType.Duration), + TableColumn(TASK_RUNTIME, TableColumnType.Duration), + TableColumn(TASK_REQ_NCPUS, TableColumnType.Int), + TableColumn(TASK_ALLOC_NCPUS, TableColumnType.Int), + TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)), + TableColumn(TASK_STATUS, TableColumnType.Int), + TableColumn(TASK_GROUP_ID, TableColumnType.Int), + TableColumn(TASK_USER_ID, TableColumnType.Int) + ) ) else -> throw IllegalArgumentException("Table $table not supported") } } - override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader { + override fun newReader(path: Path, table: String, projection: List<String>?): TableReader { return when (table) { TABLE_TASKS -> SwfTaskTableReader(path.bufferedReader()) else -> throw IllegalArgumentException("Table $table not supported") diff --git a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt index c3d644e8..06a500d8 100644 --- a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt @@ -24,14 +24,18 @@ package org.opendc.trace.swf import org.junit.jupiter.api.* import org.junit.jupiter.api.Assertions.* +import org.opendc.trace.TableColumn +import org.opendc.trace.TableReader import org.opendc.trace.conv.TABLE_TASKS import org.opendc.trace.conv.TASK_ALLOC_NCPUS import org.opendc.trace.conv.TASK_ID +import org.opendc.trace.testkit.TableReaderTestKit import java.nio.file.Paths /** * Test suite for the [SwfTraceFormat] class. */ +@DisplayName("SWF TraceFormat") internal class SwfTraceFormatTest { private val format = SwfTraceFormat() @@ -62,13 +66,28 @@ internal class SwfTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("1", reader.get(TASK_ID)) }, + { assertEquals("1", reader.getString(TASK_ID)) }, { assertEquals(306, reader.getInt(TASK_ALLOC_NCPUS)) }, { assertTrue(reader.nextRow()) }, - { assertEquals("2", reader.get(TASK_ID)) }, + { assertEquals("2", reader.getString(TASK_ID)) }, { assertEquals(17, reader.getInt(TASK_ALLOC_NCPUS)) }, ) reader.close() } + + @DisplayName("TableReader for Tasks") + @Nested + inner class TasksTableReaderTest : TableReaderTestKit() { + override lateinit var reader: TableReader + override lateinit var columns: List<TableColumn> + + @BeforeEach + fun setUp() { + val path = Paths.get(checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")).toURI()) + + columns = format.getDetails(path, TABLE_TASKS).columns + reader = format.newReader(path, TABLE_TASKS, null) + } + } } |
