summaryrefslogtreecommitdiff
path: root/opendc-trace/opendc-trace-swf
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-21 12:04:15 +0200
committerGitHub <noreply@github.com>2021-09-21 12:04:15 +0200
commit322d91db03a7d74a00ec623ce624f979c0b77c03 (patch)
tree73201888564accde4cfa107f4ffdb15e9f93d45c /opendc-trace/opendc-trace-swf
parent453c25c4b453fa0af26bebbd8863abfb79218119 (diff)
parent68ef3700ed2f69bcf0118bb69eda71e6b1f4d54f (diff)
merge: Add support for trace writing
This pull request extends the trace API to support writing new traces. - Unify columns of different tables - Support column lookup via index - Use index lookup in trace loader - Add property for describing partition keys - Simplify TraceFormat SPI interface - Add support for writing traces **Breaking API Changes** - `TraceFormat` SPI interface has been redesigned.
Diffstat (limited to 'opendc-trace/opendc-trace-swf')
-rw-r--r--opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt60
-rw-r--r--opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt72
-rw-r--r--opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTrace.kt46
-rw-r--r--opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt47
-rw-r--r--opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt53
5 files changed, 82 insertions, 196 deletions
diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt
deleted file mode 100644
index 7ec0d607..00000000
--- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.swf
-
-import org.opendc.trace.*
-import java.nio.file.Path
-import kotlin.io.path.bufferedReader
-
-/**
- * A [Table] containing the tasks in a SWF trace.
- */
-internal class SwfTaskTable(private val path: Path) : Table {
- override val name: String = TABLE_TASKS
-
- override val isSynthetic: Boolean = false
-
- override val columns: List<TableColumn<*>> = listOf(
- TASK_ID,
- TASK_SUBMIT_TIME,
- TASK_WAIT_TIME,
- TASK_RUNTIME,
- TASK_REQ_NCPUS,
- TASK_ALLOC_NCPUS,
- TASK_PARENTS,
- TASK_STATUS,
- TASK_GROUP_ID,
- TASK_USER_ID
- )
-
- override fun newReader(): TableReader {
- val reader = path.bufferedReader()
- return SwfTaskTableReader(reader)
- }
-
- override fun newReader(partition: String): TableReader {
- throw IllegalArgumentException("Invalid partition $partition")
- }
-
- override fun toString(): String = "SwfTaskTable"
-}
diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt
index 3f49c770..2f6ea6ee 100644
--- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt
+++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt
@@ -69,64 +69,43 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea
return true
}
- override fun hasColumn(column: TableColumn<*>): Boolean {
- return when (column) {
- TASK_ID -> true
- TASK_SUBMIT_TIME -> true
- TASK_WAIT_TIME -> true
- TASK_RUNTIME -> true
- TASK_REQ_NCPUS -> true
- TASK_ALLOC_NCPUS -> true
- TASK_PARENTS -> true
- TASK_STATUS -> true
- TASK_GROUP_ID -> true
- TASK_USER_ID -> true
- else -> false
- }
+ override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
+
+ override fun isNull(index: Int): Boolean {
+ require(index in columns.values) { "Invalid column index" }
+ return false
}
- override fun <T> get(column: TableColumn<T>): T {
- val res: Any = when (column) {
- TASK_ID -> fields[COL_JOB_ID]
- TASK_SUBMIT_TIME -> Instant.ofEpochSecond(fields[COL_SUBMIT_TIME].toLong(10))
- TASK_WAIT_TIME -> Duration.ofSeconds(fields[COL_WAIT_TIME].toLong(10))
- TASK_RUNTIME -> Duration.ofSeconds(fields[COL_RUN_TIME].toLong(10))
- TASK_REQ_NCPUS -> getInt(TASK_REQ_NCPUS)
- TASK_ALLOC_NCPUS -> getInt(TASK_ALLOC_NCPUS)
- TASK_PARENTS -> {
- val parent = fields[COL_PARENT_JOB].toLong(10)
+ override fun get(index: Int): Any? {
+ return when (index) {
+ COL_JOB_ID -> fields[index]
+ COL_SUBMIT_TIME -> Instant.ofEpochSecond(fields[index].toLong(10))
+ COL_WAIT_TIME, COL_RUN_TIME -> Duration.ofSeconds(fields[index].toLong(10))
+ COL_REQ_NCPUS, COL_ALLOC_NCPUS, COL_STATUS, COL_GROUP_ID, COL_USER_ID -> getInt(index)
+ COL_PARENT_JOB -> {
+ val parent = fields[index].toLong(10)
if (parent < 0) emptySet() else setOf(parent)
}
- TASK_STATUS -> getInt(TASK_STATUS)
- TASK_GROUP_ID -> getInt(TASK_GROUP_ID)
- TASK_USER_ID -> getInt(TASK_USER_ID)
else -> throw IllegalArgumentException("Invalid column")
}
-
- @Suppress("UNCHECKED_CAST")
- return res as T
}
- override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ override fun getBoolean(index: Int): Boolean {
throw IllegalArgumentException("Invalid column")
}
- override fun getInt(column: TableColumn<Int>): Int {
- return when (column) {
- TASK_REQ_NCPUS -> fields[COL_REQ_NCPUS].toInt(10)
- TASK_ALLOC_NCPUS -> fields[COL_ALLOC_NCPUS].toInt(10)
- TASK_STATUS -> fields[COL_STATUS].toInt(10)
- TASK_GROUP_ID -> fields[COL_GROUP_ID].toInt(10)
- TASK_USER_ID -> fields[COL_USER_ID].toInt(10)
+ override fun getInt(index: Int): Int {
+ return when (index) {
+ COL_REQ_NCPUS, COL_ALLOC_NCPUS, COL_STATUS, COL_GROUP_ID, COL_USER_ID -> fields[index].toInt(10)
else -> throw IllegalArgumentException("Invalid column")
}
}
- override fun getLong(column: TableColumn<Long>): Long {
+ override fun getLong(index: Int): Long {
throw IllegalArgumentException("Invalid column")
}
- override fun getDouble(column: TableColumn<Double>): Double {
+ override fun getDouble(index: Int): Double {
throw IllegalArgumentException("Invalid column")
}
@@ -155,4 +134,17 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea
private val COL_PART_NUM = 15
private val COL_PARENT_JOB = 16
private val COL_PARENT_THINK_TIME = 17
+
+ private val columns = mapOf(
+ TASK_ID to COL_JOB_ID,
+ TASK_SUBMIT_TIME to COL_SUBMIT_TIME,
+ TASK_WAIT_TIME to COL_WAIT_TIME,
+ TASK_RUNTIME to COL_RUN_TIME,
+ TASK_ALLOC_NCPUS to COL_ALLOC_NCPUS,
+ TASK_REQ_NCPUS to COL_REQ_NCPUS,
+ TASK_STATUS to COL_STATUS,
+ TASK_USER_ID to COL_USER_ID,
+ TASK_GROUP_ID to COL_GROUP_ID,
+ TASK_PARENTS to COL_PARENT_JOB
+ )
}
diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTrace.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTrace.kt
deleted file mode 100644
index d4da735e..00000000
--- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTrace.kt
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.swf
-
-import org.opendc.trace.TABLE_TASKS
-import org.opendc.trace.Table
-import org.opendc.trace.Trace
-import java.nio.file.Path
-
-/**
- * [Trace] implementation for the SWF format.
- */
-public class SwfTrace internal constructor(private val path: Path) : Trace {
- override val tables: List<String> = listOf(TABLE_TASKS)
-
- override fun containsTable(name: String): Boolean = TABLE_TASKS == name
-
- override fun getTable(name: String): Table? {
- if (!containsTable(name)) {
- return null
- }
- return SwfTaskTable(path)
- }
-
- override fun toString(): String = "SwfTrace[$path]"
-}
diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt
index 36c3122e..1fd076d5 100644
--- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt
+++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt
@@ -22,10 +22,11 @@
package org.opendc.trace.swf
+import org.opendc.trace.*
+import org.opendc.trace.spi.TableDetails
import org.opendc.trace.spi.TraceFormat
-import java.net.URL
-import java.nio.file.Paths
-import kotlin.io.path.exists
+import java.nio.file.Path
+import kotlin.io.path.bufferedReader
/**
* Support for the Standard Workload Format (SWF) in OpenDC.
@@ -35,9 +36,41 @@ import kotlin.io.path.exists
public class SwfTraceFormat : TraceFormat {
override val name: String = "swf"
- override fun open(url: URL): SwfTrace {
- val path = Paths.get(url.toURI())
- require(path.exists()) { "URL $url does not exist" }
- return SwfTrace(path)
+ override fun create(path: Path) {
+ throw UnsupportedOperationException("Writing not supported for this format")
+ }
+
+ override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS)
+
+ override fun getDetails(path: Path, table: String): TableDetails {
+ return when (table) {
+ TABLE_TASKS -> TableDetails(
+ listOf(
+ TASK_ID,
+ TASK_SUBMIT_TIME,
+ TASK_WAIT_TIME,
+ TASK_RUNTIME,
+ TASK_REQ_NCPUS,
+ TASK_ALLOC_NCPUS,
+ TASK_PARENTS,
+ TASK_STATUS,
+ TASK_GROUP_ID,
+ TASK_USER_ID
+ ),
+ emptyList()
+ )
+ else -> throw IllegalArgumentException("Table $table not supported")
+ }
+ }
+
+ override fun newReader(path: Path, table: String): TableReader {
+ return when (table) {
+ TABLE_TASKS -> SwfTaskTableReader(path.bufferedReader())
+ else -> throw IllegalArgumentException("Table $table not supported")
+ }
+ }
+
+ override fun newWriter(path: Path, table: String): TableWriter {
+ throw UnsupportedOperationException("Writing not supported for this format")
}
}
diff --git a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt
index 828c2bfa..4dcd43f6 100644
--- a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt
@@ -27,61 +27,38 @@ import org.junit.jupiter.api.Assertions.*
import org.opendc.trace.TABLE_TASKS
import org.opendc.trace.TASK_ALLOC_NCPUS
import org.opendc.trace.TASK_ID
-import java.net.URL
+import java.nio.file.Paths
/**
* Test suite for the [SwfTraceFormat] class.
*/
internal class SwfTraceFormatTest {
- @Test
- fun testTraceExists() {
- val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf"))
- val format = SwfTraceFormat()
- assertDoesNotThrow {
- format.open(input)
- }
- }
-
- @Test
- fun testTraceDoesNotExists() {
- val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf"))
- val format = SwfTraceFormat()
- assertThrows<IllegalArgumentException> {
- format.open(URL(input.toString() + "help"))
- }
- }
+ private val format = SwfTraceFormat()
@Test
fun testTables() {
- val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf"))
- val trace = SwfTraceFormat().open(input)
+ val path = Paths.get(checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")).toURI())
- assertEquals(listOf(TABLE_TASKS), trace.tables)
+ assertEquals(listOf(TABLE_TASKS), format.getTables(path))
}
@Test
fun testTableExists() {
- val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf"))
- val table = SwfTraceFormat().open(input).getTable(TABLE_TASKS)
-
- assertNotNull(table)
- assertDoesNotThrow { table!!.newReader() }
+ val path = Paths.get(checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")).toURI())
+ assertDoesNotThrow { format.getDetails(path, TABLE_TASKS) }
}
@Test
fun testTableDoesNotExist() {
- val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf"))
- val trace = SwfTraceFormat().open(input)
+ val path = Paths.get(checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")).toURI())
- assertFalse(trace.containsTable("test"))
- assertNull(trace.getTable("test"))
+ assertThrows<IllegalArgumentException> { format.getDetails(path, "test") }
}
@Test
fun testReader() {
- val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf"))
- val trace = SwfTraceFormat().open(input)
- val reader = trace.getTable(TABLE_TASKS)!!.newReader()
+ val path = Paths.get(checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")).toURI())
+ val reader = format.newReader(path, TABLE_TASKS)
assertAll(
{ assertTrue(reader.nextRow()) },
@@ -94,14 +71,4 @@ internal class SwfTraceFormatTest {
reader.close()
}
-
- @Test
- fun testReaderPartition() {
- val input = checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf"))
- val trace = SwfTraceFormat().open(input)
-
- assertThrows<IllegalArgumentException> {
- trace.getTable(TABLE_TASKS)!!.newReader("test")
- }
- }
}