summaryrefslogtreecommitdiff
path: root/opendc-trace/opendc-trace-swf/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-06-09 10:31:41 +0200
committerGitHub <noreply@github.com>2022-06-09 10:31:41 +0200
commitd146814bbbb86bfcb19ccb94250424703e9179e5 (patch)
treebf20f51b434d56e60ad013568ac1a32b912a3b5e /opendc-trace/opendc-trace-swf/src
parent61b6550d7a476ab1aae45a5b9385dfd6ca4f6b6f (diff)
parent9d759c9bc987965fae8b0c16c000772c546bf3a2 (diff)
merge: Introduce schema for trace API (#88)
This pull request updates the OpenDC trace API to support proper specification of a schema of the tables exposed by the traces. This functionality makes it easier for the API consumer to understand the types exposed by the API. ## Implementation Notes :hammer_and_pick: * Introduce type system for trace API * Add benchmarks for odcvm trace format * Add benchmarks for Azure trace format * Add conformance suite for OpenDC trace API ## External Dependencies :four_leaf_clover: * N/A ## Breaking API Changes :warning: * Removal of typed `TableColumn`. Instead, `TableColumn` instances are now used to describe the columns belonging to some table. * `TableReader` and `TableWriter` do not support accessing arbitrary objects anymore. Instead, only the types supported by the type system are exposed.
Diffstat (limited to 'opendc-trace/opendc-trace-swf/src')
-rw-r--r--opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt124
-rw-r--r--opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt25
-rw-r--r--opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt23
3 files changed, 127 insertions, 45 deletions
diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt
index 40b604c3..b2734fe7 100644
--- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt
+++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt
@@ -27,12 +27,18 @@ import org.opendc.trace.conv.*
import java.io.BufferedReader
import java.time.Duration
import java.time.Instant
+import java.util.*
/**
* A [TableReader] implementation for the SWF format.
*/
internal class SwfTaskTableReader(private val reader: BufferedReader) : TableReader {
/**
+ * A flag to indicate the state of the reader
+ */
+ private var state = State.Pending
+
+ /**
* The current row.
*/
private var fields = emptyList<String>()
@@ -43,11 +49,23 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea
private val whitespace = "\\s+".toRegex()
override fun nextRow(): Boolean {
- var line: String
+ var line: String?
var num = 0
+ val state = state
+ if (state == State.Closed) {
+ return false
+ } else if (state == State.Pending) {
+ this.state = State.Active
+ }
+
while (true) {
- line = reader.readLine() ?: return false
+ line = reader.readLine()
+
+ if (line == null) {
+ this.state = State.Closed
+ return false
+ }
num++
if (line.isBlank()) {
@@ -61,7 +79,7 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea
break
}
- fields = line.trim().split(whitespace)
+ fields = line!!.trim().split(whitespace)
if (fields.size < 18) {
throw IllegalArgumentException("Invalid format at line $line")
@@ -70,48 +88,103 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea
return true
}
- override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
+ override fun resolve(name: String): Int {
+ return when (name) {
+ TASK_ID -> COL_JOB_ID
+ TASK_SUBMIT_TIME -> COL_SUBMIT_TIME
+ TASK_WAIT_TIME -> COL_WAIT_TIME
+ TASK_RUNTIME -> COL_RUN_TIME
+ TASK_ALLOC_NCPUS -> COL_ALLOC_NCPUS
+ TASK_REQ_NCPUS -> COL_REQ_NCPUS
+ TASK_STATUS -> COL_STATUS
+ TASK_USER_ID -> COL_USER_ID
+ TASK_GROUP_ID -> COL_GROUP_ID
+ TASK_PARENTS -> COL_PARENT_JOB
+ else -> -1
+ }
+ }
override fun isNull(index: Int): Boolean {
- require(index in columns.values) { "Invalid column index" }
+ require(index in COL_JOB_ID..COL_PARENT_THINK_TIME) { "Invalid column index" }
return false
}
- override fun get(index: Int): Any? {
+ override fun getBoolean(index: Int): Boolean {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInt(index: Int): Int {
+ check(state == State.Active) { "No active row" }
+ return when (index) {
+ COL_REQ_NCPUS, COL_ALLOC_NCPUS, COL_STATUS, COL_GROUP_ID, COL_USER_ID -> fields[index].toInt(10)
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getLong(index: Int): Long {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getFloat(index: Int): Float {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getDouble(index: Int): Double {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getString(index: Int): String {
+ check(state == State.Active) { "No active row" }
return when (index) {
COL_JOB_ID -> fields[index]
- COL_SUBMIT_TIME -> Instant.ofEpochSecond(fields[index].toLong(10))
- COL_WAIT_TIME, COL_RUN_TIME -> Duration.ofSeconds(fields[index].toLong(10))
- COL_REQ_NCPUS, COL_ALLOC_NCPUS, COL_STATUS, COL_GROUP_ID, COL_USER_ID -> getInt(index)
- COL_PARENT_JOB -> {
- val parent = fields[index].toLong(10)
- if (parent < 0) emptySet() else setOf(parent)
- }
else -> throw IllegalArgumentException("Invalid column")
}
}
- override fun getBoolean(index: Int): Boolean {
+ override fun getUUID(index: Int): UUID? {
throw IllegalArgumentException("Invalid column")
}
- override fun getInt(index: Int): Int {
+ override fun getInstant(index: Int): Instant? {
+ check(state == State.Active) { "No active row" }
return when (index) {
- COL_REQ_NCPUS, COL_ALLOC_NCPUS, COL_STATUS, COL_GROUP_ID, COL_USER_ID -> fields[index].toInt(10)
+ COL_SUBMIT_TIME -> Instant.ofEpochSecond(fields[index].toLong(10))
else -> throw IllegalArgumentException("Invalid column")
}
}
- override fun getLong(index: Int): Long {
+ override fun getDuration(index: Int): Duration? {
+ check(state == State.Active) { "No active row" }
+ return when (index) {
+ COL_WAIT_TIME, COL_RUN_TIME -> Duration.ofSeconds(fields[index].toLong(10))
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun <T> getList(index: Int, elementType: Class<T>): List<T>? {
throw IllegalArgumentException("Invalid column")
}
- override fun getDouble(index: Int): Double {
+ override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? {
+ check(state == State.Active) { "No active row" }
+ @Suppress("UNCHECKED_CAST")
+ return when (index) {
+ COL_PARENT_JOB -> {
+ require(elementType.isAssignableFrom(String::class.java))
+ val parent = fields[index].toLong(10)
+ if (parent < 0) emptySet() else setOf(parent)
+ }
+ else -> throw IllegalArgumentException("Invalid column")
+ } as Set<T>?
+ }
+
+ override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? {
throw IllegalArgumentException("Invalid column")
}
override fun close() {
reader.close()
+ state = State.Closed
}
/**
@@ -136,16 +209,7 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea
private val COL_PARENT_JOB = 16
private val COL_PARENT_THINK_TIME = 17
- private val columns = mapOf(
- TASK_ID to COL_JOB_ID,
- TASK_SUBMIT_TIME to COL_SUBMIT_TIME,
- TASK_WAIT_TIME to COL_WAIT_TIME,
- TASK_RUNTIME to COL_RUN_TIME,
- TASK_ALLOC_NCPUS to COL_ALLOC_NCPUS,
- TASK_REQ_NCPUS to COL_REQ_NCPUS,
- TASK_STATUS to COL_STATUS,
- TASK_USER_ID to COL_USER_ID,
- TASK_GROUP_ID to COL_GROUP_ID,
- TASK_PARENTS to COL_PARENT_JOB
- )
+ private enum class State {
+ Pending, Active, Closed
+ }
}
diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt
index 916a5eca..575a1740 100644
--- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt
+++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt
@@ -47,24 +47,23 @@ public class SwfTraceFormat : TraceFormat {
return when (table) {
TABLE_TASKS -> TableDetails(
listOf(
- TASK_ID,
- TASK_SUBMIT_TIME,
- TASK_WAIT_TIME,
- TASK_RUNTIME,
- TASK_REQ_NCPUS,
- TASK_ALLOC_NCPUS,
- TASK_PARENTS,
- TASK_STATUS,
- TASK_GROUP_ID,
- TASK_USER_ID
- ),
- emptyList()
+ TableColumn(TASK_ID, TableColumnType.String),
+ TableColumn(TASK_SUBMIT_TIME, TableColumnType.Instant),
+ TableColumn(TASK_WAIT_TIME, TableColumnType.Duration),
+ TableColumn(TASK_RUNTIME, TableColumnType.Duration),
+ TableColumn(TASK_REQ_NCPUS, TableColumnType.Int),
+ TableColumn(TASK_ALLOC_NCPUS, TableColumnType.Int),
+ TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)),
+ TableColumn(TASK_STATUS, TableColumnType.Int),
+ TableColumn(TASK_GROUP_ID, TableColumnType.Int),
+ TableColumn(TASK_USER_ID, TableColumnType.Int)
+ )
)
else -> throw IllegalArgumentException("Table $table not supported")
}
}
- override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader {
+ override fun newReader(path: Path, table: String, projection: List<String>?): TableReader {
return when (table) {
TABLE_TASKS -> SwfTaskTableReader(path.bufferedReader())
else -> throw IllegalArgumentException("Table $table not supported")
diff --git a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt
index c3d644e8..06a500d8 100644
--- a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt
@@ -24,14 +24,18 @@ package org.opendc.trace.swf
import org.junit.jupiter.api.*
import org.junit.jupiter.api.Assertions.*
+import org.opendc.trace.TableColumn
+import org.opendc.trace.TableReader
import org.opendc.trace.conv.TABLE_TASKS
import org.opendc.trace.conv.TASK_ALLOC_NCPUS
import org.opendc.trace.conv.TASK_ID
+import org.opendc.trace.testkit.TableReaderTestKit
import java.nio.file.Paths
/**
* Test suite for the [SwfTraceFormat] class.
*/
+@DisplayName("SWF TraceFormat")
internal class SwfTraceFormatTest {
private val format = SwfTraceFormat()
@@ -62,13 +66,28 @@ internal class SwfTraceFormatTest {
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals("1", reader.get(TASK_ID)) },
+ { assertEquals("1", reader.getString(TASK_ID)) },
{ assertEquals(306, reader.getInt(TASK_ALLOC_NCPUS)) },
{ assertTrue(reader.nextRow()) },
- { assertEquals("2", reader.get(TASK_ID)) },
+ { assertEquals("2", reader.getString(TASK_ID)) },
{ assertEquals(17, reader.getInt(TASK_ALLOC_NCPUS)) },
)
reader.close()
}
+
+ @DisplayName("TableReader for Tasks")
+ @Nested
+ inner class TasksTableReaderTest : TableReaderTestKit() {
+ override lateinit var reader: TableReader
+ override lateinit var columns: List<TableColumn>
+
+ @BeforeEach
+ fun setUp() {
+ val path = Paths.get(checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")).toURI())
+
+ columns = format.getDetails(path, TABLE_TASKS).columns
+ reader = format.newReader(path, TABLE_TASKS, null)
+ }
+ }
}