author     Fabian Mastenbroek <mail.fabianm@gmail.com>  2021-09-12 12:08:55 +0200
committer  GitHub <noreply@github.com>  2021-09-12 12:08:55 +0200
commit     2cd3bd18e548a72d64afe0e7f59487f4747d722f (patch)
tree       dc9e2fba5ca4d19a90934a8b68dbb8110ee34bb7 /opendc-trace
parent     cae193284570d6ee9dbacdde57b3e4e367aa9d9f (diff)
parent     992b65396f55c0e12b36823d191dea8e03dd45ba (diff)
merge: Add support for new trace formats
This pull request updates the trace API with the addition of several new trace formats.

- Add support for Materna traces from the GWA
- Keep reader state in its own class
- Parse the last column in the Solvinity trace format
- Add support for Azure VM traces
- Add support for WfCommons (WorkflowHub) traces
- Add an API for accessing the available table columns
- Add a synthetic resource table for the Bitbrains format
- Support dynamic resolution of trace formats

**Breaking API Changes**

- Replace `isSupported` by a list of `TableColumn`s
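As a quick illustration of the API surface introduced here, the sketch below opens a WfCommons trace through the new `Trace.open` entry point and checks column support through the new `columns` property instead of the removed `isSupported`. The input file name and the call site are hypothetical; only the `org.opendc.trace` identifiers come from this change set.

```kotlin
import org.opendc.trace.*
import java.nio.file.Paths

fun main() {
    // Open a WfCommons trace through the new Trace.open entry point
    // (format name "wfformat", registered via the SPI file added in this PR).
    // The file "trace.json" is a placeholder input.
    val trace = Trace.open(Paths.get("trace.json"), format = "wfformat")
    val table = checkNotNull(trace.getTable(TABLE_TASKS))

    // Before: table.isSupported(TASK_RUNTIME)
    // After:  inspect the list of supported columns.
    if (TASK_RUNTIME in table.columns) {
        val reader = table.newReader()
        while (reader.nextRow()) {
            println("${reader.get(TASK_ID)} runs for ${reader.get(TASK_RUNTIME)}")
        }
        reader.close()
    }
}
```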
Diffstat (limited to 'opendc-trace')
-rw-r--r--  opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt  4
-rw-r--r--  opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt  4
-rw-r--r--  opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt  17
-rw-r--r--  opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt  35
-rw-r--r--  opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt  34
-rw-r--r--  opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt  192
-rw-r--r--  opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTable.kt  61
-rw-r--r--  opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt  108
-rw-r--r--  opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt  12
-rw-r--r--  opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt  23
-rw-r--r--  opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt  21
-rw-r--r--  opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt  89
-rw-r--r--  opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt  12
-rw-r--r--  opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt  27
-rw-r--r--  opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt  18
-rw-r--r--  opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt  4
-rw-r--r--  opendc-trace/opendc-trace-wfformat/build.gradle.kts  37
-rw-r--r--  opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTable.kt  56
-rw-r--r--  opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt  234
-rw-r--r--  opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTrace.kt  47
-rw-r--r--  opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt  47
-rw-r--r--  opendc-trace/opendc-trace-wfformat/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat  1
-rw-r--r--  opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt  345
-rw-r--r--  opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt  133
-rw-r--r--  opendc-trace/opendc-trace-wfformat/src/test/resources/trace.json  1342
-rw-r--r--  opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt  27
-rw-r--r--  opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt  27
-rw-r--r--  opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTrace.kt  2
-rw-r--r--  opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt  22
29 files changed, 2740 insertions, 241 deletions
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt
index 44dec95b..e2e5ea6d 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt
@@ -41,7 +41,7 @@ public val RESOURCE_START_TIME: TableColumn<Instant> = TableColumn("resource:sta
* End time for the resource.
*/
@JvmField
-public val RESOURCE_END_TIME: TableColumn<Instant> = TableColumn("resource:end_time", Instant::class.java)
+public val RESOURCE_STOP_TIME: TableColumn<Instant> = TableColumn("resource:stop_time", Instant::class.java)
/**
* Number of CPUs for the resource.
@@ -50,7 +50,7 @@ public val RESOURCE_END_TIME: TableColumn<Instant> = TableColumn("resource:end_t
public val RESOURCE_NCPUS: TableColumn<Int> = intColumn("resource:num_cpus")
/**
- * Memory capacity for the resource.
+ * Memory capacity for the resource in KB.
*/
@JvmField
public val RESOURCE_MEM_CAPACITY: TableColumn<Double> = doubleColumn("resource:mem_capacity")
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt
index 11e5d6b7..6aca2051 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt
@@ -37,9 +37,9 @@ public interface Table {
public val isSynthetic: Boolean
/**
- * Determine whether the specified [column] is supported by this table.
+ * The list of columns supported in this table.
*/
- public fun isSupported(column: TableColumn<*>): Boolean
+ public val columns: List<TableColumn<*>>
/**
* Open a [TableReader] for this table.
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt
index 88bbc623..46920dce 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt
@@ -23,49 +23,52 @@
@file:JvmName("TaskColumns")
package org.opendc.trace
+import java.time.Duration
+import java.time.Instant
+
/**
* A column containing the task identifier.
*/
@JvmField
-public val TASK_ID: TableColumn<Long> = longColumn("task:id")
+public val TASK_ID: TableColumn<String> = stringColumn("task:id")
/**
* A column containing the identifier of the workflow.
*/
@JvmField
-public val TASK_WORKFLOW_ID: TableColumn<Long> = longColumn("task:workflow_id")
+public val TASK_WORKFLOW_ID: TableColumn<String> = stringColumn("task:workflow_id")
/**
* A column containing the submit time of the task.
*/
@JvmField
-public val TASK_SUBMIT_TIME: TableColumn<Long> = longColumn("task:submit_time")
+public val TASK_SUBMIT_TIME: TableColumn<Instant> = TableColumn("task:submit_time", type = Instant::class.java)
/**
* A column containing the wait time of the task.
*/
@JvmField
-public val TASK_WAIT_TIME: TableColumn<Long> = longColumn("task:wait_time")
+public val TASK_WAIT_TIME: TableColumn<Instant> = TableColumn("task:wait_time", type = Instant::class.java)
/**
* A column containing the runtime time of the task.
*/
@JvmField
-public val TASK_RUNTIME: TableColumn<Long> = longColumn("task:runtime")
+public val TASK_RUNTIME: TableColumn<Duration> = TableColumn("task:runtime", type = Duration::class.java)
/**
* A column containing the parents of a task.
*/
@Suppress("UNCHECKED_CAST")
@JvmField
-public val TASK_PARENTS: TableColumn<Set<Long>> = TableColumn("task:parents", type = Set::class.java as Class<Set<Long>>)
+public val TASK_PARENTS: TableColumn<Set<String>> = TableColumn("task:parents", type = Set::class.java as Class<Set<String>>)
/**
* A column containing the children of a task.
*/
@Suppress("UNCHECKED_CAST")
@JvmField
-public val TASK_CHILDREN: TableColumn<Set<Long>> = TableColumn("task:children", type = Set::class.java as Class<Set<Long>>)
+public val TASK_CHILDREN: TableColumn<Set<String>> = TableColumn("task:children", type = Set::class.java as Class<Set<String>>)
/**
* A column containing the requested CPUs of a task.
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt
index 36e93b52..0ae45e86 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt
@@ -22,6 +22,11 @@
package org.opendc.trace
+import org.opendc.trace.spi.TraceFormat
+import java.io.File
+import java.net.URL
+import java.nio.file.Path
+
/**
* A trace is a collection of related tables that characterize a workload.
*/
@@ -40,4 +45,34 @@ public interface Trace {
* Obtain a [Table] with the specified [name].
*/
public fun getTable(name: String): Table?
+
+ public companion object {
+ /**
+ * Open a [Trace] at the specified [url] in the given [format].
+ *
+ * @throws IllegalArgumentException if [format] is not supported.
+ */
+ public fun open(url: URL, format: String): Trace {
+ val provider = requireNotNull(TraceFormat.byName(format)) { "Unknown format $format" }
+ return provider.open(url)
+ }
+
+ /**
+ * Open a [Trace] at the specified [path] in the given [format].
+ *
+ * @throws IllegalArgumentException if [format] is not supported.
+ */
+ public fun open(path: File, format: String): Trace {
+ return open(path.toURI().toURL(), format)
+ }
+
+ /**
+ * Open a [Trace] at the specified [path] in the given [format].
+ *
+ * @throws IllegalArgumentException if [format] is not supported.
+ */
+ public fun open(path: Path, format: String): Trace {
+ return open(path.toUri().toURL(), format)
+ }
+ }
}
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt
index 767ef919..c9e5954d 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt
@@ -33,7 +33,7 @@ import kotlin.io.path.nameWithoutExtension
/**
* The resource state [Table] in the Bitbrains format.
*/
-internal class BitbrainsResourceStateTable(private val factory: CsvFactory, private val path: Path) : Table {
+internal class BitbrainsResourceStateTable(private val factory: CsvFactory, path: Path) : Table {
/**
* The partitions that belong to the table.
*/
@@ -41,28 +41,26 @@ internal class BitbrainsResourceStateTable(private val factory: CsvFactory, priv
Files.walk(path, 1)
.filter { !Files.isDirectory(it) && it.extension == "csv" }
.collect(Collectors.toMap({ it.nameWithoutExtension }, { it }))
+ .toSortedMap()
override val name: String = TABLE_RESOURCE_STATES
override val isSynthetic: Boolean = false
- override fun isSupported(column: TableColumn<*>): Boolean {
- return when (column) {
- RESOURCE_STATE_ID -> true
- RESOURCE_STATE_TIMESTAMP -> true
- RESOURCE_STATE_NCPUS -> true
- RESOURCE_STATE_CPU_CAPACITY -> true
- RESOURCE_STATE_CPU_USAGE -> true
- RESOURCE_STATE_CPU_USAGE_PCT -> true
- RESOURCE_STATE_MEM_CAPACITY -> true
- RESOURCE_STATE_MEM_USAGE -> true
- RESOURCE_STATE_DISK_READ -> true
- RESOURCE_STATE_DISK_WRITE -> true
- RESOURCE_STATE_NET_RX -> true
- RESOURCE_STATE_NET_TX -> true
- else -> false
- }
- }
+ override val columns: List<TableColumn<*>> = listOf(
+ RESOURCE_STATE_ID,
+ RESOURCE_STATE_TIMESTAMP,
+ RESOURCE_STATE_NCPUS,
+ RESOURCE_STATE_CPU_CAPACITY,
+ RESOURCE_STATE_CPU_USAGE,
+ RESOURCE_STATE_CPU_USAGE_PCT,
+ RESOURCE_STATE_MEM_CAPACITY,
+ RESOURCE_STATE_MEM_USAGE,
+ RESOURCE_STATE_DISK_READ,
+ RESOURCE_STATE_DISK_WRITE,
+ RESOURCE_STATE_NET_RX,
+ RESOURCE_STATE_NET_TX,
+ )
override fun newReader(): TableReader {
val it = partitions.iterator()
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt
index 5687ac7f..dab784c2 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt
@@ -22,20 +22,42 @@
package org.opendc.trace.bitbrains
+import com.fasterxml.jackson.core.JsonParseException
import com.fasterxml.jackson.core.JsonToken
import com.fasterxml.jackson.dataformat.csv.CsvParser
import com.fasterxml.jackson.dataformat.csv.CsvSchema
import org.opendc.trace.*
+import java.text.NumberFormat
import java.time.Instant
+import java.time.LocalDateTime
+import java.time.ZoneOffset
+import java.time.format.DateTimeFormatter
+import java.time.format.DateTimeParseException
+import java.util.*
/**
* A [TableReader] for the Bitbrains resource state table.
*/
internal class BitbrainsResourceStateTableReader(private val partition: String, private val parser: CsvParser) : TableReader {
/**
- * The current parser state.
+ * The [DateTimeFormatter] used to parse the timestamps in case of the Materna trace.
*/
- private val state = RowState()
+ private val formatter = DateTimeFormatter.ofPattern("dd.MM.yyyy HH:mm:ss")
+
+ /**
+ * The type of timestamps in the trace.
+ */
+ private var timestampType: TimestampType = TimestampType.UNDECIDED
+
+ /**
+ * The [NumberFormat] used to parse doubles containing a comma.
+ */
+ private val nf = NumberFormat.getInstance(Locale.GERMAN)
+
+ /**
+ * A flag to indicate that the trace contains decimals with a comma separator.
+ */
+ private var usesCommaDecimalSeparator = false
init {
parser.schema = schema
@@ -43,7 +65,7 @@ internal class BitbrainsResourceStateTableReader(private val partition: String,
override fun nextRow(): Boolean {
// Reset the row state
- state.reset()
+ reset()
if (!nextStart()) {
return false
@@ -57,17 +79,32 @@ internal class BitbrainsResourceStateTableReader(private val partition: String,
}
when (parser.currentName) {
- "Timestamp [ms]" -> state.timestamp = Instant.ofEpochSecond(parser.longValue)
- "CPU cores" -> state.cpuCores = parser.intValue
- "CPU capacity provisioned [MHZ]" -> state.cpuCapacity = parser.doubleValue
- "CPU usage [MHZ]" -> state.cpuUsage = parser.doubleValue
- "CPU usage [%]" -> state.cpuUsagePct = parser.doubleValue
- "Memory capacity provisioned [KB]" -> state.memCapacity = parser.doubleValue
- "Memory usage [KB]" -> state.memUsage = parser.doubleValue
- "Disk read throughput [KB/s]" -> state.diskRead = parser.doubleValue
- "Disk write throughput [KB/s]" -> state.diskWrite = parser.doubleValue
- "Network received throughput [KB/s]" -> state.netReceived = parser.doubleValue
- "Network transmitted throughput [KB/s]" -> state.netTransmitted = parser.doubleValue
+ "Timestamp [ms]" -> {
+ timestamp = when (timestampType) {
+ TimestampType.UNDECIDED -> {
+ try {
+ val res = LocalDateTime.parse(parser.text, formatter).toInstant(ZoneOffset.UTC)
+ timestampType = TimestampType.DATE_TIME
+ res
+ } catch (e: DateTimeParseException) {
+ timestampType = TimestampType.EPOCH_MILLIS
+ Instant.ofEpochSecond(parser.longValue)
+ }
+ }
+ TimestampType.DATE_TIME -> LocalDateTime.parse(parser.text, formatter).toInstant(ZoneOffset.UTC)
+ TimestampType.EPOCH_MILLIS -> Instant.ofEpochSecond(parser.longValue)
+ }
+ }
+ "CPU cores" -> cpuCores = parser.intValue
+ "CPU capacity provisioned [MHZ]" -> cpuCapacity = parseSafeDouble()
+ "CPU usage [MHZ]" -> cpuUsage = parseSafeDouble()
+ "CPU usage [%]" -> cpuUsagePct = parseSafeDouble() / 100.0 // Convert to range [0, 1]
+ "Memory capacity provisioned [KB]" -> memCapacity = parseSafeDouble()
+ "Memory usage [KB]" -> memUsage = parseSafeDouble()
+ "Disk read throughput [KB/s]" -> diskRead = parseSafeDouble()
+ "Disk write throughput [KB/s]" -> diskWrite = parseSafeDouble()
+ "Network received throughput [KB/s]" -> netReceived = parseSafeDouble()
+ "Network transmitted throughput [KB/s]" -> netTransmitted = parseSafeDouble()
}
}
@@ -95,17 +132,17 @@ internal class BitbrainsResourceStateTableReader(private val partition: String,
override fun <T> get(column: TableColumn<T>): T {
val res: Any? = when (column) {
RESOURCE_STATE_ID -> partition
- RESOURCE_STATE_TIMESTAMP -> state.timestamp
- RESOURCE_STATE_NCPUS -> state.cpuCores
- RESOURCE_STATE_CPU_CAPACITY -> state.cpuCapacity
- RESOURCE_STATE_CPU_USAGE -> state.cpuUsage
- RESOURCE_STATE_CPU_USAGE_PCT -> state.cpuUsagePct
- RESOURCE_STATE_MEM_CAPACITY -> state.memCapacity
- RESOURCE_STATE_MEM_USAGE -> state.memUsage
- RESOURCE_STATE_DISK_READ -> state.diskRead
- RESOURCE_STATE_DISK_WRITE -> state.diskWrite
- RESOURCE_STATE_NET_RX -> state.netReceived
- RESOURCE_STATE_NET_TX -> state.netTransmitted
+ RESOURCE_STATE_TIMESTAMP -> timestamp
+ RESOURCE_STATE_NCPUS -> cpuCores
+ RESOURCE_STATE_CPU_CAPACITY -> cpuCapacity
+ RESOURCE_STATE_CPU_USAGE -> cpuUsage
+ RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct
+ RESOURCE_STATE_MEM_CAPACITY -> memCapacity
+ RESOURCE_STATE_MEM_USAGE -> memUsage
+ RESOURCE_STATE_DISK_READ -> diskRead
+ RESOURCE_STATE_DISK_WRITE -> diskWrite
+ RESOURCE_STATE_NET_RX -> netReceived
+ RESOURCE_STATE_NET_TX -> netTransmitted
else -> throw IllegalArgumentException("Invalid column")
}
@@ -119,7 +156,7 @@ internal class BitbrainsResourceStateTableReader(private val partition: String,
override fun getInt(column: TableColumn<Int>): Int {
return when (column) {
- RESOURCE_STATE_NCPUS -> state.cpuCores
+ RESOURCE_STATE_NCPUS -> cpuCores
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -130,15 +167,15 @@ internal class BitbrainsResourceStateTableReader(private val partition: String,
override fun getDouble(column: TableColumn<Double>): Double {
return when (column) {
- RESOURCE_STATE_CPU_CAPACITY -> state.cpuCapacity
- RESOURCE_STATE_CPU_USAGE -> state.cpuUsage
- RESOURCE_STATE_CPU_USAGE_PCT -> state.cpuUsagePct
- RESOURCE_STATE_MEM_CAPACITY -> state.memCapacity
- RESOURCE_STATE_MEM_USAGE -> state.memUsage
- RESOURCE_STATE_DISK_READ -> state.diskRead
- RESOURCE_STATE_DISK_WRITE -> state.diskWrite
- RESOURCE_STATE_NET_RX -> state.netReceived
- RESOURCE_STATE_NET_TX -> state.netTransmitted
+ RESOURCE_STATE_CPU_CAPACITY -> cpuCapacity
+ RESOURCE_STATE_CPU_USAGE -> cpuUsage
+ RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct
+ RESOURCE_STATE_MEM_CAPACITY -> memCapacity
+ RESOURCE_STATE_MEM_USAGE -> memUsage
+ RESOURCE_STATE_DISK_READ -> diskRead
+ RESOURCE_STATE_DISK_WRITE -> diskWrite
+ RESOURCE_STATE_NET_RX -> netReceived
+ RESOURCE_STATE_NET_TX -> netTransmitted
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -161,37 +198,62 @@ internal class BitbrainsResourceStateTableReader(private val partition: String,
}
/**
- * The current row state.
+ * Try to parse the current value safely as double.
*/
- private class RowState {
- var timestamp: Instant? = null
- var cpuCores = -1
- var cpuCapacity = Double.NaN
- var cpuUsage = Double.NaN
- var cpuUsagePct = Double.NaN
- var memCapacity = Double.NaN
- var memUsage = Double.NaN
- var diskRead = Double.NaN
- var diskWrite = Double.NaN
- var netReceived = Double.NaN
- var netTransmitted = Double.NaN
+ private fun parseSafeDouble(): Double {
+ if (!usesCommaDecimalSeparator) {
+ try {
+ return parser.doubleValue
+ } catch (e: JsonParseException) {
+ usesCommaDecimalSeparator = true
+ }
+ }
- /**
- * Reset the state.
- */
- fun reset() {
- timestamp = null
- cpuCores = -1
- cpuCapacity = Double.NaN
- cpuUsage = Double.NaN
- cpuUsagePct = Double.NaN
- memCapacity = Double.NaN
- memUsage = Double.NaN
- diskRead = Double.NaN
- diskWrite = Double.NaN
- netReceived = Double.NaN
- netTransmitted = Double.NaN
+ val text = parser.text
+ if (text.isBlank()) {
+ return 0.0
}
+
+ return nf.parse(text).toDouble()
+ }
+
+ /**
+ * State fields of the reader.
+ */
+ private var timestamp: Instant? = null
+ private var cpuCores = -1
+ private var cpuCapacity = Double.NaN
+ private var cpuUsage = Double.NaN
+ private var cpuUsagePct = Double.NaN
+ private var memCapacity = Double.NaN
+ private var memUsage = Double.NaN
+ private var diskRead = Double.NaN
+ private var diskWrite = Double.NaN
+ private var netReceived = Double.NaN
+ private var netTransmitted = Double.NaN
+
+ /**
+ * Reset the state.
+ */
+ private fun reset() {
+ timestamp = null
+ cpuCores = -1
+ cpuCapacity = Double.NaN
+ cpuUsage = Double.NaN
+ cpuUsagePct = Double.NaN
+ memCapacity = Double.NaN
+ memUsage = Double.NaN
+ diskRead = Double.NaN
+ diskWrite = Double.NaN
+ netReceived = Double.NaN
+ netTransmitted = Double.NaN
+ }
+
+ /**
+ * The type of the timestamp in the trace.
+ */
+ private enum class TimestampType {
+ UNDECIDED, DATE_TIME, EPOCH_MILLIS
}
companion object {
@@ -199,15 +261,17 @@ internal class BitbrainsResourceStateTableReader(private val partition: String,
* The [CsvSchema] that is used to parse the trace.
*/
private val schema = CsvSchema.builder()
- .addColumn("Timestamp [ms]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Timestamp [ms]", CsvSchema.ColumnType.NUMBER_OR_STRING)
.addColumn("CPU cores", CsvSchema.ColumnType.NUMBER)
.addColumn("CPU capacity provisioned [MHZ]", CsvSchema.ColumnType.NUMBER)
.addColumn("CPU usage [MHZ]", CsvSchema.ColumnType.NUMBER)
.addColumn("CPU usage [%]", CsvSchema.ColumnType.NUMBER)
.addColumn("Memory capacity provisioned [KB]", CsvSchema.ColumnType.NUMBER)
.addColumn("Memory usage [KB]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Memory usage [%]", CsvSchema.ColumnType.NUMBER)
.addColumn("Disk read throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
.addColumn("Disk write throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Disk size [GB]", CsvSchema.ColumnType.NUMBER)
.addColumn("Network received throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
.addColumn("Network transmitted throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
.setAllowComments(true)
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTable.kt
new file mode 100644
index 00000000..bc4f0b7d
--- /dev/null
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTable.kt
@@ -0,0 +1,61 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.bitbrains
+
+import com.fasterxml.jackson.dataformat.csv.CsvFactory
+import org.opendc.trace.*
+import java.nio.file.Files
+import java.nio.file.Path
+import java.util.stream.Collectors
+import kotlin.io.path.extension
+import kotlin.io.path.nameWithoutExtension
+
+/**
+ * The resources [Table] in the Bitbrains format.
+ */
+internal class BitbrainsResourceTable(private val factory: CsvFactory, path: Path) : Table {
+ /**
+ * The VMs that belong to the table.
+ */
+ private val vms =
+ Files.walk(path, 1)
+ .filter { !Files.isDirectory(it) && it.extension == "csv" }
+ .collect(Collectors.toMap({ it.nameWithoutExtension }, { it }))
+ .toSortedMap()
+
+ override val name: String = TABLE_RESOURCES
+
+ override val isSynthetic: Boolean = true
+
+ override val columns: List<TableColumn<*>> = listOf(RESOURCE_ID)
+
+ override fun newReader(): TableReader {
+ return BitbrainsResourceTableReader(factory, vms)
+ }
+
+ override fun newReader(partition: String): TableReader {
+ throw IllegalArgumentException("Unknown partition $partition")
+ }
+
+ override fun toString(): String = "BitbrainsResourceTable"
+}
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt
new file mode 100644
index 00000000..c02dc5ae
--- /dev/null
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt
@@ -0,0 +1,108 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.bitbrains
+
+import com.fasterxml.jackson.dataformat.csv.CsvFactory
+import org.opendc.trace.*
+import java.nio.file.Path
+
+/**
+ * A [TableReader] for the Bitbrains resource table.
+ */
+internal class BitbrainsResourceTableReader(private val factory: CsvFactory, vms: Map<String, Path>) : TableReader {
+ /**
+ * An iterator to iterate over the resource entries.
+ */
+ private val it = vms.iterator()
+
+ override fun nextRow(): Boolean {
+ reset()
+
+ while (it.hasNext()) {
+ val (name, path) = it.next()
+
+ val parser = factory.createParser(path.toFile())
+ val reader = BitbrainsResourceStateTableReader(name, parser)
+
+ try {
+ if (!reader.nextRow()) {
+ continue
+ }
+
+ id = reader.get(RESOURCE_STATE_ID)
+ return true
+ } finally {
+ reader.close()
+ }
+ }
+
+ return false
+ }
+
+ override fun hasColumn(column: TableColumn<*>): Boolean {
+ return when (column) {
+ RESOURCE_ID -> true
+ else -> false
+ }
+ }
+
+ override fun <T> get(column: TableColumn<T>): T {
+ val res: Any? = when (column) {
+ RESOURCE_ID -> id
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ return res as T
+ }
+
+ override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInt(column: TableColumn<Int>): Int {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getLong(column: TableColumn<Long>): Long {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getDouble(column: TableColumn<Double>): Double {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun close() {}
+
+ /**
+ * State fields of the reader.
+ */
+ private var id: String? = null
+
+ /**
+ * Reset the state of the reader.
+ */
+ private fun reset() {
+ id = null
+ }
+}
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt
index 5a2d4243..bcd8dd52 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt
@@ -30,16 +30,16 @@ import java.nio.file.Path
* [Trace] implementation for the Bitbrains format.
*/
public class BitbrainsTrace internal constructor(private val factory: CsvFactory, private val path: Path) : Trace {
- override val tables: List<String> = listOf(TABLE_RESOURCE_STATES)
+ override val tables: List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES)
- override fun containsTable(name: String): Boolean = TABLE_RESOURCE_STATES == name
+ override fun containsTable(name: String): Boolean = tables.contains(name)
override fun getTable(name: String): Table? {
- if (!containsTable(name)) {
- return null
+ return when (name) {
+ TABLE_RESOURCES -> BitbrainsResourceTable(factory, path)
+ TABLE_RESOURCE_STATES -> BitbrainsResourceStateTable(factory, path)
+ else -> null
}
-
- return BitbrainsResourceStateTable(factory, path)
}
override fun toString(): String = "BitbrainsTrace[$path]"
diff --git a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt
index 550805d3..ff4a33f8 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt
@@ -25,9 +25,7 @@ package org.opendc.trace.bitbrains
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
-import org.opendc.trace.RESOURCE_STATE_CPU_USAGE
-import org.opendc.trace.RESOURCE_STATE_TIMESTAMP
-import org.opendc.trace.TABLE_RESOURCE_STATES
+import org.opendc.trace.*
import java.net.URL
/**
@@ -58,7 +56,7 @@ class BitbrainsTraceFormatTest {
val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv"))
val trace = format.open(url)
- assertEquals(listOf(TABLE_RESOURCE_STATES), trace.tables)
+ assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES), trace.tables)
}
@Test
@@ -82,6 +80,23 @@ class BitbrainsTraceFormatTest {
}
@Test
+ fun testResources() {
+ val format = BitbrainsTraceFormat()
+ val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv"))
+ val trace = format.open(url)
+
+ val reader = trace.getTable(TABLE_RESOURCES)!!.newReader()
+
+ assertAll(
+ { assertTrue(reader.nextRow()) },
+ { assertEquals("bitbrains", reader.get(RESOURCE_ID)) },
+ { assertFalse(reader.nextRow()) }
+ )
+
+ reader.close()
+ }
+
+ @Test
fun testSmoke() {
val format = BitbrainsTraceFormat()
val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv"))
diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt
index 80a99d10..fd7bd068 100644
--- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt
+++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt
@@ -34,18 +34,15 @@ internal class GwfTaskTable(private val factory: CsvFactory, private val url: UR
override val isSynthetic: Boolean = false
- override fun isSupported(column: TableColumn<*>): Boolean {
- return when (column) {
- TASK_WORKFLOW_ID -> true
- TASK_ID -> true
- TASK_SUBMIT_TIME -> true
- TASK_RUNTIME -> true
- TASK_REQ_NCPUS -> true
- TASK_ALLOC_NCPUS -> true
- TASK_PARENTS -> true
- else -> false
- }
- }
+ override val columns: List<TableColumn<*>> = listOf(
+ TASK_WORKFLOW_ID,
+ TASK_ID,
+ TASK_SUBMIT_TIME,
+ TASK_RUNTIME,
+ TASK_REQ_NCPUS,
+ TASK_ALLOC_NCPUS,
+ TASK_PARENTS
+ )
override fun newReader(): TableReader {
return GwfTaskTableReader(factory.createParser(url))
diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt
index 64b7d465..39eb5520 100644
--- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt
+++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt
@@ -26,24 +26,21 @@ import com.fasterxml.jackson.core.JsonToken
import com.fasterxml.jackson.dataformat.csv.CsvParser
import com.fasterxml.jackson.dataformat.csv.CsvSchema
import org.opendc.trace.*
+import java.time.Duration
+import java.time.Instant
import java.util.regex.Pattern
/**
* A [TableReader] implementation for the GWF format.
*/
internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader {
- /**
- * The current parser state.
- */
- private val state = RowState()
-
init {
parser.schema = schema
}
override fun nextRow(): Boolean {
// Reset the row state
- state.reset()
+ reset()
if (!nextStart()) {
return false
@@ -57,12 +54,12 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader {
}
when (parser.currentName) {
- "WorkflowID" -> state.workflowId = parser.longValue
- "JobID" -> state.jobId = parser.longValue
- "SubmitTime" -> state.submitTime = parser.longValue
- "RunTime" -> state.runtime = parser.longValue
- "NProcs" -> state.nProcs = parser.intValue
- "ReqNProcs" -> state.reqNProcs = parser.intValue
+ "WorkflowID" -> workflowId = parser.text
+ "JobID" -> jobId = parser.text
+ "SubmitTime" -> submitTime = Instant.ofEpochSecond(parser.longValue)
+ "RunTime" -> runtime = Duration.ofSeconds(parser.longValue)
+ "NProcs" -> nProcs = parser.intValue
+ "ReqNProcs" -> reqNProcs = parser.intValue
"Dependencies" -> parseParents(parser.valueAsString)
}
}
@@ -84,14 +81,14 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader {
}
override fun <T> get(column: TableColumn<T>): T {
- val res: Any = when (column) {
- TASK_WORKFLOW_ID -> state.workflowId
- TASK_ID -> state.jobId
- TASK_SUBMIT_TIME -> state.submitTime
- TASK_RUNTIME -> state.runtime
- TASK_REQ_NCPUS -> state.nProcs
- TASK_ALLOC_NCPUS -> state.reqNProcs
- TASK_PARENTS -> state.dependencies
+ val res: Any? = when (column) {
+ TASK_WORKFLOW_ID -> workflowId
+ TASK_ID -> jobId
+ TASK_SUBMIT_TIME -> submitTime
+ TASK_RUNTIME -> runtime
+ TASK_REQ_NCPUS -> nProcs
+ TASK_ALLOC_NCPUS -> reqNProcs
+ TASK_PARENTS -> dependencies
else -> throw IllegalArgumentException("Invalid column")
}
@@ -105,20 +102,14 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader {
override fun getInt(column: TableColumn<Int>): Int {
return when (column) {
- TASK_REQ_NCPUS -> state.nProcs
- TASK_ALLOC_NCPUS -> state.reqNProcs
+ TASK_REQ_NCPUS -> nProcs
+ TASK_ALLOC_NCPUS -> reqNProcs
else -> throw IllegalArgumentException("Invalid column")
}
}
override fun getLong(column: TableColumn<Long>): Long {
- return when (column) {
- TASK_WORKFLOW_ID -> state.workflowId
- TASK_ID -> state.jobId
- TASK_SUBMIT_TIME -> state.submitTime
- TASK_RUNTIME -> state.runtime
- else -> throw IllegalArgumentException("Invalid column")
- }
+ throw IllegalArgumentException("Invalid column")
}
override fun getDouble(column: TableColumn<Double>): Double {
@@ -166,29 +157,27 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader {
}
/**
- * The current row state.
+ * Reader state fields.
*/
- private class RowState {
- var workflowId = -1L
- var jobId = -1L
- var submitTime = -1L
- var runtime = -1L
- var nProcs = -1
- var reqNProcs = -1
- var dependencies = emptySet<Long>()
+ private var workflowId: String? = null
+ private var jobId: String? = null
+ private var submitTime: Instant? = null
+ private var runtime: Duration? = null
+ private var nProcs = -1
+ private var reqNProcs = -1
+ private var dependencies = emptySet<Long>()
- /**
- * Reset the state.
- */
- fun reset() {
- workflowId = -1
- jobId = -1
- submitTime = -1
- runtime = -1
- nProcs = -1
- reqNProcs = -1
- dependencies = emptySet()
- }
+ /**
+ * Reset the state.
+ */
+ private fun reset() {
+ workflowId = null
+ jobId = null
+ submitTime = null
+ runtime = null
+ nProcs = -1
+ reqNProcs = -1
+ dependencies = emptySet()
}
companion object {
diff --git a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt
index 6b0568fe..b209b979 100644
--- a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt
@@ -29,6 +29,8 @@ import org.junit.jupiter.api.assertDoesNotThrow
import org.junit.jupiter.api.assertThrows
import org.opendc.trace.*
import java.net.URL
+import java.time.Duration
+import java.time.Instant
/**
* Test suite for the [GwfTraceFormat] class.
@@ -90,11 +92,11 @@ internal class GwfTraceFormatTest {
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals(0L, reader.getLong(TASK_WORKFLOW_ID)) },
- { assertEquals(1L, reader.getLong(TASK_ID)) },
- { assertEquals(16, reader.getLong(TASK_SUBMIT_TIME)) },
- { assertEquals(11, reader.getLong(TASK_RUNTIME)) },
- { assertEquals(setOf<Long>(), reader.get(TASK_PARENTS)) },
+ { assertEquals("0", reader.get(TASK_WORKFLOW_ID)) },
+ { assertEquals("1", reader.get(TASK_ID)) },
+ { assertEquals(Instant.ofEpochSecond(16), reader.get(TASK_SUBMIT_TIME)) },
+ { assertEquals(Duration.ofSeconds(11), reader.get(TASK_RUNTIME)) },
+ { assertEquals(emptySet<String>(), reader.get(TASK_PARENTS)) },
)
}
diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt
index 12a51a2f..7ec0d607 100644
--- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt
+++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt
@@ -34,21 +34,18 @@ internal class SwfTaskTable(private val path: Path) : Table {
override val isSynthetic: Boolean = false
- override fun isSupported(column: TableColumn<*>): Boolean {
- return when (column) {
- TASK_ID -> true
- TASK_SUBMIT_TIME -> true
- TASK_WAIT_TIME -> true
- TASK_RUNTIME -> true
- TASK_REQ_NCPUS -> true
- TASK_ALLOC_NCPUS -> true
- TASK_PARENTS -> true
- TASK_STATUS -> true
- TASK_GROUP_ID -> true
- TASK_USER_ID -> true
- else -> false
- }
- }
+ override val columns: List<TableColumn<*>> = listOf(
+ TASK_ID,
+ TASK_SUBMIT_TIME,
+ TASK_WAIT_TIME,
+ TASK_RUNTIME,
+ TASK_REQ_NCPUS,
+ TASK_ALLOC_NCPUS,
+ TASK_PARENTS,
+ TASK_STATUS,
+ TASK_GROUP_ID,
+ TASK_USER_ID
+ )
override fun newReader(): TableReader {
val reader = path.bufferedReader()
diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt
index 5f879a54..3f49c770 100644
--- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt
+++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt
@@ -24,6 +24,8 @@ package org.opendc.trace.swf
import org.opendc.trace.*
import java.io.BufferedReader
+import java.time.Duration
+import java.time.Instant
/**
* A [TableReader] implementation for the SWF format.
@@ -85,10 +87,10 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea
override fun <T> get(column: TableColumn<T>): T {
val res: Any = when (column) {
- TASK_ID -> getLong(TASK_ID)
- TASK_SUBMIT_TIME -> getLong(TASK_SUBMIT_TIME)
- TASK_WAIT_TIME -> getLong(TASK_WAIT_TIME)
- TASK_RUNTIME -> getLong(TASK_RUNTIME)
+ TASK_ID -> fields[COL_JOB_ID]
+ TASK_SUBMIT_TIME -> Instant.ofEpochSecond(fields[COL_SUBMIT_TIME].toLong(10))
+ TASK_WAIT_TIME -> Duration.ofSeconds(fields[COL_WAIT_TIME].toLong(10))
+ TASK_RUNTIME -> Duration.ofSeconds(fields[COL_RUN_TIME].toLong(10))
TASK_REQ_NCPUS -> getInt(TASK_REQ_NCPUS)
TASK_ALLOC_NCPUS -> getInt(TASK_ALLOC_NCPUS)
TASK_PARENTS -> {
@@ -121,13 +123,7 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea
}
override fun getLong(column: TableColumn<Long>): Long {
- return when (column) {
- TASK_ID -> fields[COL_JOB_ID].toLong(10)
- TASK_SUBMIT_TIME -> fields[COL_SUBMIT_TIME].toLong(10)
- TASK_WAIT_TIME -> fields[COL_WAIT_TIME].toLong(10)
- TASK_RUNTIME -> fields[COL_RUN_TIME].toLong(10)
- else -> throw IllegalArgumentException("Invalid column")
- }
+ throw IllegalArgumentException("Invalid column")
}
override fun getDouble(column: TableColumn<Double>): Double {
diff --git a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt
index 9686891b..828c2bfa 100644
--- a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt
@@ -85,10 +85,10 @@ internal class SwfTraceFormatTest {
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals(1, reader.getLong(TASK_ID)) },
+ { assertEquals("1", reader.get(TASK_ID)) },
{ assertEquals(306, reader.getInt(TASK_ALLOC_NCPUS)) },
{ assertTrue(reader.nextRow()) },
- { assertEquals(2, reader.getLong(TASK_ID)) },
+ { assertEquals("2", reader.get(TASK_ID)) },
{ assertEquals(17, reader.getInt(TASK_ALLOC_NCPUS)) },
)
diff --git a/opendc-trace/opendc-trace-wfformat/build.gradle.kts b/opendc-trace/opendc-trace-wfformat/build.gradle.kts
new file mode 100644
index 00000000..2d336d03
--- /dev/null
+++ b/opendc-trace/opendc-trace-wfformat/build.gradle.kts
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+description = "Support for WfCommons workload traces in OpenDC"
+
+/* Build configuration */
+plugins {
+ `kotlin-library-conventions`
+ `testing-conventions`
+ `jacoco-conventions`
+}
+
+dependencies {
+ api(platform(projects.opendcPlatform))
+ api(projects.opendcTrace.opendcTraceApi)
+
+ implementation(libs.jackson.core)
+}
diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTable.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTable.kt
new file mode 100644
index 00000000..7b7f979f
--- /dev/null
+++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTable.kt
@@ -0,0 +1,56 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.wfformat
+
+import com.fasterxml.jackson.core.JsonFactory
+import org.opendc.trace.*
+import java.nio.file.Path
+
+/**
+ * A [Table] containing the tasks in a WfCommons workload trace.
+ */
+internal class WfFormatTaskTable(private val factory: JsonFactory, private val path: Path) : Table {
+ override val name: String = TABLE_TASKS
+
+ override val isSynthetic: Boolean = false
+
+ override val columns: List<TableColumn<*>> = listOf(
+ TASK_ID,
+ TASK_WORKFLOW_ID,
+ TASK_RUNTIME,
+ TASK_REQ_NCPUS,
+ TASK_PARENTS,
+ TASK_CHILDREN
+ )
+
+ override fun newReader(): TableReader {
+ val parser = factory.createParser(path.toFile())
+ return WfFormatTaskTableReader(parser)
+ }
+
+ override fun newReader(partition: String): TableReader {
+ throw IllegalArgumentException("Invalid partition $partition")
+ }
+
+ override fun toString(): String = "WfFormatTaskTable"
+}
diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt
new file mode 100644
index 00000000..4408ba5c
--- /dev/null
+++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt
@@ -0,0 +1,234 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.wfformat
+
+import com.fasterxml.jackson.core.JsonParseException
+import com.fasterxml.jackson.core.JsonParser
+import com.fasterxml.jackson.core.JsonToken
+import org.opendc.trace.*
+import java.time.Duration
+import kotlin.math.roundToInt
+
+/**
+ * A [TableReader] implementation for the WfCommons workload trace format.
+ */
+internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableReader {
+ /**
+ * The current nesting of the parser.
+ */
+ private var level: ParserLevel = ParserLevel.TOP
+
+ override fun nextRow(): Boolean {
+ reset()
+
+ var hasJob = false
+
+ while (!hasJob) {
+ when (level) {
+ ParserLevel.TOP -> {
+ val token = parser.nextToken()
+
+ // Check whether the document is not empty and starts with an object
+ if (token == null) {
+ break
+ } else if (token != JsonToken.START_OBJECT) {
+ throw JsonParseException(parser, "Expected object", parser.currentLocation)
+ } else {
+ level = ParserLevel.TRACE
+ }
+ }
+ ParserLevel.TRACE -> {
+ // Seek for the workflow object in the file
+ if (!seekWorkflow()) {
+ break
+ } else if (!parser.isExpectedStartObjectToken) {
+ throw JsonParseException(parser, "Expected object", parser.currentLocation)
+ } else {
+ level = ParserLevel.WORKFLOW
+ }
+ }
+ ParserLevel.WORKFLOW -> {
+ // Seek for the jobs object in the file
+ level = if (!seekJobs()) {
+ ParserLevel.TRACE
+ } else if (!parser.isExpectedStartArrayToken) {
+ throw JsonParseException(parser, "Expected array", parser.currentLocation)
+ } else {
+ ParserLevel.JOB
+ }
+ }
+ ParserLevel.JOB -> {
+ when (parser.nextToken()) {
+ JsonToken.END_ARRAY -> level = ParserLevel.WORKFLOW
+ JsonToken.START_OBJECT -> {
+ parseJob()
+ hasJob = true
+ break
+ }
+ else -> throw JsonParseException(parser, "Unexpected token", parser.currentLocation)
+ }
+ }
+ }
+ }
+
+ return hasJob
+ }
+
+ override fun hasColumn(column: TableColumn<*>): Boolean {
+ return when (column) {
+ TASK_ID -> true
+ TASK_WORKFLOW_ID -> true
+ TASK_RUNTIME -> true
+ TASK_REQ_NCPUS -> true
+ TASK_PARENTS -> true
+ TASK_CHILDREN -> true
+ else -> false
+ }
+ }
+
+ override fun <T> get(column: TableColumn<T>): T {
+ val res: Any? = when (column) {
+ TASK_ID -> id
+ TASK_WORKFLOW_ID -> workflowId
+ TASK_RUNTIME -> runtime
+ TASK_PARENTS -> parents
+ TASK_CHILDREN -> children
+ TASK_REQ_NCPUS -> getInt(TASK_REQ_NCPUS)
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ return res as T
+ }
+
+ override fun getBoolean(column: TableColumn<Boolean>): Boolean {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInt(column: TableColumn<Int>): Int {
+ return when (column) {
+ TASK_REQ_NCPUS -> cores
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getLong(column: TableColumn<Long>): Long {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getDouble(column: TableColumn<Double>): Double {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun close() {
+ parser.close()
+ }
+
+ /**
+ * Parse the trace and seek until the workflow description.
+ */
+ private fun seekWorkflow(): Boolean {
+ while (parser.nextValue() != JsonToken.END_OBJECT) {
+ when (parser.currentName) {
+ "name" -> workflowId = parser.text
+ "workflow" -> return true
+ else -> parser.skipChildren()
+ }
+ }
+
+ return false
+ }
+
+ /**
+ * Parse the workflow description in the file and seek until the first job.
+ */
+ private fun seekJobs(): Boolean {
+ while (parser.nextValue() != JsonToken.END_OBJECT) {
+ when (parser.currentName) {
+ "jobs" -> return true
+ else -> parser.skipChildren()
+ }
+ }
+
+ return false
+ }
+
+ /**
+ * Parse a single job in the file.
+ */
+ private fun parseJob() {
+ while (parser.nextValue() != JsonToken.END_OBJECT) {
+ when (parser.currentName) {
+ "name" -> id = parser.text
+ "parents" -> parents = parseIds()
+ "children" -> children = parseIds()
+ "runtime" -> runtime = Duration.ofSeconds(parser.numberValue.toLong())
+ "cores" -> cores = parser.floatValue.roundToInt()
+ else -> parser.skipChildren()
+ }
+ }
+ }
+
+ /**
+ * Parse the parents/children of a job.
+ */
+ private fun parseIds(): Set<String> {
+ if (!parser.isExpectedStartArrayToken) {
+ throw JsonParseException(parser, "Expected array", parser.currentLocation)
+ }
+
+ val ids = mutableSetOf<String>()
+
+ while (parser.nextToken() != JsonToken.END_ARRAY) {
+ if (parser.currentToken != JsonToken.VALUE_STRING) {
+ throw JsonParseException(parser, "Expected token", parser.currentLocation)
+ }
+
+ ids.add(parser.valueAsString)
+ }
+
+ return ids
+ }
+
+ private enum class ParserLevel {
+ TOP, TRACE, WORKFLOW, JOB
+ }
+
+ /**
+ * State fields for the parser.
+ */
+ private var id: String? = null
+ private var workflowId: String? = null
+ private var runtime: Duration? = null
+ private var parents: Set<String>? = null
+ private var children: Set<String>? = null
+ private var cores = -1
+
+ private fun reset() {
+ id = null
+ runtime = null
+ parents = null
+ children = null
+ cores = -1
+ }
+}
diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTrace.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTrace.kt
new file mode 100644
index 00000000..2d9c79fb
--- /dev/null
+++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTrace.kt
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.wfformat
+
+import com.fasterxml.jackson.core.JsonFactory
+import org.opendc.trace.TABLE_TASKS
+import org.opendc.trace.Table
+import org.opendc.trace.Trace
+import java.nio.file.Path
+
+/**
+ * [Trace] implementation for the WfCommons workload trace format.
+ */
+public class WfFormatTrace internal constructor(private val factory: JsonFactory, private val path: Path) : Trace {
+ override val tables: List<String> = listOf(TABLE_TASKS)
+
+ override fun containsTable(name: String): Boolean = TABLE_TASKS == name
+
+ override fun getTable(name: String): Table? {
+ return when (name) {
+ TABLE_TASKS -> WfFormatTaskTable(factory, path)
+ else -> null
+ }
+ }
+
+ override fun toString(): String = "WfFormatTrace[$path]"
+}
diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt
new file mode 100644
index 00000000..ff8d054c
--- /dev/null
+++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.wfformat
+
+import com.fasterxml.jackson.core.JsonFactory
+import org.opendc.trace.spi.TraceFormat
+import java.net.URL
+import java.nio.file.Paths
+import kotlin.io.path.exists
+
+/**
+ * A [TraceFormat] implementation for the WfCommons workload trace format.
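+ *
+ * A minimal usage sketch; the `trace.json` path below is illustrative and any
+ * WfCommons instance file can be substituted:
+ *
+ * ```kotlin
+ * val format = WfFormatTraceFormat()
+ * val trace = format.open(File("trace.json").toURI().toURL())
+ * val reader = trace.getTable(TABLE_TASKS)!!.newReader()
+ * while (reader.nextRow()) {
+ *     println(reader.get(TASK_ID))
+ * }
+ * reader.close()
+ * ```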
+ */
+public class WfFormatTraceFormat : TraceFormat {
+ /**
+     * The [JsonFactory] that is used to create JSON parsers.
+ */
+ private val factory = JsonFactory()
+
+ override val name: String = "wfformat"
+
+ override fun open(url: URL): WfFormatTrace {
+ val path = Paths.get(url.toURI())
+ require(path.exists()) { "URL $url does not exist" }
+ return WfFormatTrace(factory, path)
+ }
+}
diff --git a/opendc-trace/opendc-trace-wfformat/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat b/opendc-trace/opendc-trace-wfformat/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat
new file mode 100644
index 00000000..ee3aa2f6
--- /dev/null
+++ b/opendc-trace/opendc-trace-wfformat/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat
@@ -0,0 +1 @@
+org.opendc.trace.wfformat.WfFormatTraceFormat
diff --git a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt
new file mode 100644
index 00000000..b07f27ed
--- /dev/null
+++ b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt
@@ -0,0 +1,345 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.wfformat
+
+import com.fasterxml.jackson.core.JsonFactory
+import com.fasterxml.jackson.core.JsonParseException
+import org.junit.jupiter.api.Assertions.*
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertDoesNotThrow
+import org.junit.jupiter.api.assertThrows
+import org.opendc.trace.TASK_ID
+import org.opendc.trace.TASK_PARENTS
+
+/**
+ * Test suite for the [WfFormatTaskTableReader] class.
+ */
+internal class WfFormatTaskTableReaderTest {
+ /**
+ * The [JsonFactory] used to construct the parser.
+ */
+ private val factory = JsonFactory()
+
+ @Test
+ fun testEmptyInput() {
+ val content = ""
+ val parser = factory.createParser(content)
+ val reader = WfFormatTaskTableReader(parser)
+
+ assertFalse(reader.nextRow())
+ reader.close()
+ }
+
+ @Test
+ fun testTopLevelArrayInput() {
+ val content = "[]"
+ val parser = factory.createParser(content)
+ val reader = WfFormatTaskTableReader(parser)
+
+ assertThrows<JsonParseException> {
+ while (reader.nextRow()) {
+ continue
+ }
+ }
+
+ reader.close()
+ }
+
+ @Test
+ fun testNoWorkflow() {
+ val content = """
+ {
+ "name": "eager-nextflow-chameleon"
+ }
+ """.trimIndent()
+ val parser = factory.createParser(content)
+ val reader = WfFormatTaskTableReader(parser)
+
+ assertDoesNotThrow {
+ while (reader.nextRow()) {
+ continue
+ }
+ }
+
+ reader.close()
+ }
+
+ @Test
+ fun testWorkflowArrayType() {
+ val content = """
+ {
+ "name": "eager-nextflow-chameleon",
+ "workflow": []
+ }
+ """.trimIndent()
+ val parser = factory.createParser(content)
+ val reader = WfFormatTaskTableReader(parser)
+
+ assertThrows<JsonParseException> {
+ while (reader.nextRow()) {
+ continue
+ }
+ }
+
+ reader.close()
+ }
+
+ @Test
+ fun testWorkflowNullType() {
+ val content = """
+ {
+ "name": "eager-nextflow-chameleon",
+ "workflow": null
+ }
+ """.trimIndent()
+ val parser = factory.createParser(content)
+ val reader = WfFormatTaskTableReader(parser)
+
+ assertThrows<JsonParseException> {
+ while (reader.nextRow()) {
+ continue
+ }
+ }
+
+ reader.close()
+ }
+
+ @Test
+ fun testNoJobs() {
+ val content = """
+ {
+ "name": "eager-nextflow-chameleon",
+ "workflow": {
+
+ }
+ }
+ """.trimIndent()
+ val parser = factory.createParser(content)
+ val reader = WfFormatTaskTableReader(parser)
+
+ assertDoesNotThrow { reader.nextRow() }
+
+ reader.close()
+ }
+
+ @Test
+ fun testJobsObjectType() {
+ val content = """
+ {
+ "name": "eager-nextflow-chameleon",
+ "workflow": { "jobs": {} }
+ }
+ """.trimIndent()
+ val parser = factory.createParser(content)
+ val reader = WfFormatTaskTableReader(parser)
+
+ assertThrows<JsonParseException> { reader.nextRow() }
+
+ reader.close()
+ }
+
+ @Test
+ fun testJobsNullType() {
+ val content = """
+ {
+ "name": "eager-nextflow-chameleon",
+ "workflow": { "jobs": null }
+ }
+ """.trimIndent()
+ val parser = factory.createParser(content)
+ val reader = WfFormatTaskTableReader(parser)
+
+ assertThrows<JsonParseException> { reader.nextRow() }
+
+ reader.close()
+ }
+
+ @Test
+ fun testJobsInvalidChildType() {
+ val content = """
+ {
+ "name": "eager-nextflow-chameleon",
+ "workflow": {
+ "jobs": [1]
+ }
+ }
+ """.trimIndent()
+ val parser = factory.createParser(content)
+ val reader = WfFormatTaskTableReader(parser)
+
+ assertThrows<JsonParseException> { reader.nextRow() }
+
+ reader.close()
+ }
+
+ @Test
+ fun testJobsValidChildType() {
+ val content = """
+ {
+ "name": "eager-nextflow-chameleon",
+ "workflow": {
+ "jobs": [
+ {
+ "name": "test"
+ }
+ ]
+ }
+ }
+ """.trimIndent()
+ val parser = factory.createParser(content)
+ val reader = WfFormatTaskTableReader(parser)
+
+ assertTrue(reader.nextRow())
+ assertEquals("test", reader.get(TASK_ID))
+ assertFalse(reader.nextRow())
+
+ reader.close()
+ }
+
+ @Test
+ fun testJobsInvalidParents() {
+ val content = """
+ {
+ "name": "eager-nextflow-chameleon",
+ "workflow": {
+ "jobs": [
+ {
+ "name": "test",
+                        "parents": 1
+                    }
+ ]
+ }
+ }
+ """.trimIndent()
+ val parser = factory.createParser(content)
+ val reader = WfFormatTaskTableReader(parser)
+
+ assertThrows<JsonParseException> { reader.nextRow() }
+
+ reader.close()
+ }
+
+ @Test
+ fun testJobsInvalidParentsItem() {
+ val content = """
+ {
+ "name": "eager-nextflow-chameleon",
+ "workflow": {
+ "jobs": [
+ {
+ "name": "test",
+                        "parents": [1]
+                    }
+ ]
+ }
+ }
+ """.trimIndent()
+ val parser = factory.createParser(content)
+ val reader = WfFormatTaskTableReader(parser)
+
+ assertThrows<JsonParseException> { reader.nextRow() }
+
+ reader.close()
+ }
+
+ @Test
+ fun testJobsValidParents() {
+ val content = """
+ {
+ "name": "eager-nextflow-chameleon",
+ "workflow": {
+ "jobs": [
+ {
+ "name": "test",
+ "parents": ["1"]
+ }
+ ]
+ }
+ }
+ """.trimIndent()
+ val parser = factory.createParser(content)
+ val reader = WfFormatTaskTableReader(parser)
+
+ assertTrue(reader.nextRow())
+ assertEquals(setOf("1"), reader.get(TASK_PARENTS))
+ assertFalse(reader.nextRow())
+
+ reader.close()
+ }
+
+ @Test
+ fun testJobsInvalidSecondEntry() {
+ val content = """
+ {
+ "workflow": {
+ "jobs": [
+ {
+ "name": "test",
+ "parents": ["1"]
+ },
+ "test"
+ ]
+ }
+ }
+ """.trimIndent()
+ val parser = factory.createParser(content)
+ val reader = WfFormatTaskTableReader(parser)
+
+ assertDoesNotThrow { reader.nextRow() }
+ assertThrows<JsonParseException> { reader.nextRow() }
+
+ reader.close()
+ }
+
+ @Test
+ fun testDuplicateJobsArray() {
+ val content = """
+ {
+ "name": "eager-nextflow-chameleon",
+ "workflow": {
+ "jobs": [
+ {
+ "name": "test",
+ "parents": ["1"]
+ }
+ ],
+ "jobs": [
+ {
+ "name": "test2",
+ "parents": ["test"]
+ }
+ ]
+ }
+ }
+ """.trimIndent()
+ val parser = factory.createParser(content)
+ val reader = WfFormatTaskTableReader(parser)
+
+ assertTrue(reader.nextRow())
+ assertTrue(reader.nextRow())
+ assertEquals("test2", reader.get(TASK_ID))
+ assertFalse(reader.nextRow())
+
+ reader.close()
+ }
+}
diff --git a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt
new file mode 100644
index 00000000..0bfc8840
--- /dev/null
+++ b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt
@@ -0,0 +1,133 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.wfformat
+
+import org.junit.jupiter.api.Assertions.*
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertDoesNotThrow
+import org.junit.jupiter.api.assertThrows
+import org.opendc.trace.*
+import java.io.File
+import java.net.URL
+
+/**
+ * Test suite for the [WfFormatTraceFormat] class.
+ */
+class WfFormatTraceFormatTest {
+ @Test
+ fun testTraceExists() {
+ val input = File("src/test/resources/trace.json").toURI().toURL()
+ val format = WfFormatTraceFormat()
+ assertDoesNotThrow { format.open(input) }
+ }
+
+ @Test
+    fun testTraceDoesNotExist() {
+ val input = File("src/test/resources/trace.json").toURI().toURL()
+ val format = WfFormatTraceFormat()
+ assertThrows<IllegalArgumentException> { format.open(URL(input.toString() + "help")) }
+ }
+
+ @Test
+ fun testTables() {
+ val input = File("src/test/resources/trace.json").toURI().toURL()
+ val format = WfFormatTraceFormat()
+ val trace = format.open(input)
+
+ assertEquals(listOf(TABLE_TASKS), trace.tables)
+ }
+
+ @Test
+ fun testTableExists() {
+ val input = File("src/test/resources/trace.json").toURI().toURL()
+ val format = WfFormatTraceFormat()
+ val table = format.open(input).getTable(TABLE_TASKS)
+
+ assertNotNull(table)
+ assertDoesNotThrow { table!!.newReader() }
+ }
+
+ @Test
+ fun testTableDoesNotExist() {
+ val input = File("src/test/resources/trace.json").toURI().toURL()
+ val format = WfFormatTraceFormat()
+ val trace = format.open(input)
+
+ assertFalse(trace.containsTable("test"))
+ assertNull(trace.getTable("test"))
+ }
+
+ /**
+ * Smoke test for parsing WfCommons traces.
+ */
+ @Test
+ fun testTableReader() {
+ val input = File("src/test/resources/trace.json").toURI().toURL()
+ val trace = WfFormatTraceFormat().open(input)
+ val reader = trace.getTable(TABLE_TASKS)!!.newReader()
+
+ assertAll(
+ { assertTrue(reader.nextRow()) },
+ { assertEquals("makebwaindex_mammoth_mt_krause.fasta", reader.get(TASK_ID)) },
+ { assertEquals("eager-nextflow-chameleon", reader.get(TASK_WORKFLOW_ID)) },
+ { assertEquals(172000, reader.get(TASK_RUNTIME).toMillis()) },
+ { assertEquals(emptySet<String>(), reader.get(TASK_PARENTS)) },
+ )
+
+ assertAll(
+ { assertTrue(reader.nextRow()) },
+ { assertEquals("makeseqdict_mammoth_mt_krause.fasta", reader.get(TASK_ID)) },
+ { assertEquals("eager-nextflow-chameleon", reader.get(TASK_WORKFLOW_ID)) },
+ { assertEquals(175000, reader.get(TASK_RUNTIME).toMillis()) },
+ { assertEquals(setOf("makebwaindex_mammoth_mt_krause.fasta"), reader.get(TASK_PARENTS)) },
+ )
+
+ reader.close()
+ }
+
+ /**
+ * Test full iteration of the table.
+ */
+ @Test
+ fun testTableReaderFull() {
+ val input = File("src/test/resources/trace.json").toURI().toURL()
+ val trace = WfFormatTraceFormat().open(input)
+ val reader = trace.getTable(TABLE_TASKS)!!.newReader()
+
+ assertDoesNotThrow {
+ while (reader.nextRow()) {
+ // reader.get(TASK_ID)
+ }
+ reader.close()
+ }
+ }
+
+ @Test
+ fun testTableReaderPartition() {
+ val input = File("src/test/resources/trace.json").toURI().toURL()
+ val format = WfFormatTraceFormat()
+ val table = format.open(input).getTable(TABLE_TASKS)!!
+
+ assertThrows<IllegalArgumentException> { table.newReader("test") }
+ }
+}
diff --git a/opendc-trace/opendc-trace-wfformat/src/test/resources/trace.json b/opendc-trace/opendc-trace-wfformat/src/test/resources/trace.json
new file mode 100644
index 00000000..d21f024d
--- /dev/null
+++ b/opendc-trace/opendc-trace-wfformat/src/test/resources/trace.json
@@ -0,0 +1,1342 @@
+{
+ "name": "eager-nextflow-chameleon",
+ "description": "Instance generated with WfCommons - https://wfcommons.org",
+ "createdAt": "2021-09-06T03:43:31.762479",
+ "schemaVersion": "1.2",
+ "author": {
+ "name": "cc",
+ "email": "support@wfcommons.org"
+ },
+ "wms": {
+ "name": "Nextflow",
+ "version": "21.04.3",
+ "url": "https://www.nextflow.io"
+ },
+ "workflow": {
+ "executedAt": "20210906T034331+0000",
+ "makespan": 275,
+ "jobs": [
+ {
+ "name": "makebwaindex_mammoth_mt_krause.fasta",
+ "type": "compute",
+ "runtime": 172.182,
+ "command": {
+ "program": "makebwaindex",
+ "arguments": [
+ "bwa",
+ "index",
+ "Mammoth_MT_Krause.fasta",
+ "mkdir",
+ "BWAIndex",
+ "&&",
+ "mv",
+ "Mammoth_MT_Krause.fasta*",
+ "BWAIndex"
+ ]
+ },
+ "parents": [],
+ "children": [
+ "makeseqdict_mammoth_mt_krause.fasta"
+ ],
+ "files": [],
+ "cores": 1.0,
+ "id": "ID000001",
+ "category": "makebwaindex",
+ "avgCPU": 5.8,
+ "bytesRead": 124,
+ "bytesWritten": 126,
+ "memory": 4248
+ },
+ {
+ "name": "makeseqdict_mammoth_mt_krause.fasta",
+ "type": "compute",
+ "runtime": 175.427,
+ "command": {
+ "program": "makeseqdict",
+ "arguments": [
+ "picard",
+ "-Xmx6144M",
+ "CreateSequenceDictionary",
+ "R=Mammoth_MT_Krause.fasta",
+ "O=\"Mammoth_MT_Krause.dict\""
+ ]
+ },
+ "parents": [
+ "makebwaindex_mammoth_mt_krause.fasta"
+ ],
+ "children": [
+ "makefastaindex_mammoth_mt_krause.fasta"
+ ],
+ "files": [],
+ "cores": 1.0,
+ "id": "ID000003",
+ "category": "makeseqdict",
+ "avgCPU": 83.5,
+ "bytesRead": 22728,
+ "bytesWritten": 1300,
+ "memory": 104416
+ },
+ {
+ "name": "makefastaindex_mammoth_mt_krause.fasta",
+ "type": "compute",
+ "runtime": 170.797,
+ "command": {
+ "program": "makefastaindex",
+ "arguments": [
+ "samtools",
+ "faidx",
+ "Mammoth_MT_Krause.fasta"
+ ]
+ },
+ "parents": [
+ "makeseqdict_mammoth_mt_krause.fasta"
+ ],
+ "children": [
+ "output_documentation"
+ ],
+ "files": [],
+ "cores": 1.0,
+ "id": "ID000002",
+ "category": "makefastaindex",
+ "avgCPU": 23.8,
+ "bytesRead": 66,
+ "bytesWritten": 4,
+ "memory": 6096
+ },
+ {
+ "name": "output_documentation",
+ "type": "compute",
+ "runtime": 173.479,
+ "command": {
+ "program": "output_documentation",
+ "arguments": [
+ "markdown_to_html.py",
+ "output.md",
+ "-o",
+ "results_description.html"
+ ]
+ },
+ "parents": [
+ "makefastaindex_mammoth_mt_krause.fasta"
+ ],
+ "children": [
+ "get_software_versions"
+ ],
+ "files": [],
+ "cores": 1.0,
+ "id": "ID000005",
+ "category": "output_documentation",
+ "avgCPU": 84.0,
+ "bytesRead": 8222,
+ "bytesWritten": 15165,
+ "memory": 11488
+ },
+ {
+ "name": "get_software_versions",
+ "type": "compute",
+ "runtime": 183.445,
+ "command": {
+ "program": "get_software_versions",
+ "arguments": [
+ "echo",
+ "2.3.5",
+ "&>",
+ "v_pipeline.txt",
+ "echo",
+ "21.04.3",
+ "&>",
+ "v_nextflow.txt",
+ "fastqc",
+ "--version",
+ "&>",
+ "v_fastqc.txt",
+ "2>&1",
+ "||",
+ "true",
+ "AdapterRemoval",
+ "--version",
+ "&>",
+ "v_adapterremoval.txt",
+ "2>&1",
+ "||",
+ "true",
+ "fastp",
+ "--version",
+ "&>",
+ "v_fastp.txt",
+ "2>&1",
+ "||",
+ "true",
+ "bwa",
+ "&>",
+ "v_bwa.txt",
+ "2>&1",
+ "||",
+ "true",
+ "circulargenerator",
+ "--help",
+ "|",
+ "head",
+ "-n",
+ "1",
+ "&>",
+ "v_circulargenerator.txt",
+ "2>&1",
+ "||",
+ "true",
+ "samtools",
+ "--version",
+ "&>",
+ "v_samtools.txt",
+ "2>&1",
+ "||",
+ "true",
+ "dedup",
+ "-v",
+ "&>",
+ "v_dedup.txt",
+ "2>&1",
+ "||",
+ "true",
+ "##",
+ "bioconda",
+ "recipe",
+ "of",
+ "picard",
+ "is",
+ "incorrectly",
+ "set",
+ "up",
+ "and",
+ "extra",
+ "warning",
+ "made",
+ "with",
+ "stderr,",
+ "this",
+ "ugly",
+ "command",
+ "ensures",
+ "only",
+ "version",
+ "exported",
+ "(",
+ "exec",
+ "7>&1",
+ "picard",
+ "MarkDuplicates",
+ "--version",
+ "2>&1",
+ ">&7",
+ "|",
+ "grep",
+ "-v",
+ "/",
+ ">&2",
+ ")",
+ "2>",
+ "v_markduplicates.txt",
+ "||",
+ "true",
+ "qualimap",
+ "--version",
+ "&>",
+ "v_qualimap.txt",
+ "2>&1",
+ "||",
+ "true",
+ "preseq",
+ "&>",
+ "v_preseq.txt",
+ "2>&1",
+ "||",
+ "true",
+ "gatk",
+ "--version",
+ "2>&1",
+ "|",
+ "head",
+ "-n",
+ "1",
+ ">",
+ "v_gatk.txt",
+ "2>&1",
+ "||",
+ "true",
+ "gatk3",
+ "--version",
+ "2>&1",
+ ">",
+ "v_gatk3.txt",
+ "2>&1",
+ "||",
+ "true",
+ "freebayes",
+ "--version",
+ "&>",
+ "v_freebayes.txt",
+ "2>&1",
+ "||",
+ "true",
+ "bedtools",
+ "--version",
+ "&>",
+ "v_bedtools.txt",
+ "2>&1",
+ "||",
+ "true",
+ "damageprofiler",
+ "--version",
+ "&>",
+ "v_damageprofiler.txt",
+ "2>&1",
+ "||",
+ "true",
+ "bam",
+ "--version",
+ "&>",
+ "v_bamutil.txt",
+ "2>&1",
+ "||",
+ "true",
+ "pmdtools",
+ "--version",
+ "&>",
+ "v_pmdtools.txt",
+ "2>&1",
+ "||",
+ "true",
+ "angsd",
+ "-h",
+ "|&",
+ "head",
+ "-n",
+ "1",
+ "|",
+ "cut",
+ "-d",
+ "-f3-4",
+ "&>",
+ "v_angsd.txt",
+ "2>&1",
+ "||",
+ "true",
+ "multivcfanalyzer",
+ "--help",
+ "|",
+ "head",
+ "-n",
+ "1",
+ "&>",
+ "v_multivcfanalyzer.txt",
+ "2>&1",
+ "||",
+ "true",
+ "malt-run",
+ "--help",
+ "|&",
+ "tail",
+ "-n",
+ "3",
+ "|",
+ "head",
+ "-n",
+ "1",
+ "|",
+ "cut",
+ "-f",
+ "2",
+ "-d(",
+ "|",
+ "cut",
+ "-f",
+ "1",
+ "-d",
+ ",",
+ "&>",
+ "v_malt.txt",
+ "2>&1",
+ "||",
+ "true",
+ "MaltExtract",
+ "--help",
+ "|",
+ "head",
+ "-n",
+ "2",
+ "|",
+ "tail",
+ "-n",
+ "1",
+ "&>",
+ "v_maltextract.txt",
+ "2>&1",
+ "||",
+ "true",
+ "multiqc",
+ "--version",
+ "&>",
+ "v_multiqc.txt",
+ "2>&1",
+ "||",
+ "true",
+ "vcf2genome",
+ "-h",
+ "|&",
+ "head",
+ "-n",
+ "1",
+ "&>",
+ "v_vcf2genome.txt",
+ "||",
+ "true",
+ "mtnucratio",
+ "--help",
+ "&>",
+ "v_mtnucratiocalculator.txt",
+ "||",
+ "true",
+ "sexdeterrmine",
+ "--version",
+ "&>",
+ "v_sexdeterrmine.txt",
+ "||",
+ "true",
+ "kraken2",
+ "--version",
+ "|",
+ "head",
+ "-n",
+ "1",
+ "&>",
+ "v_kraken.txt",
+ "||",
+ "true",
+ "endorS.py",
+ "--version",
+ "&>",
+ "v_endorSpy.txt",
+ "||",
+ "true",
+ "pileupCaller",
+ "--version",
+ "&>",
+ "v_sequencetools.txt",
+ "2>&1",
+ "||",
+ "true",
+ "bowtie2",
+ "--version",
+ "|",
+ "grep",
+ "-a",
+ "bowtie2-.*",
+ "-fdebug",
+ ">",
+ "v_bowtie2.txt",
+ "||",
+ "true",
+ "eigenstrat_snp_coverage",
+ "--version",
+ "|",
+ "cut",
+ "-d",
+ "-f2",
+ ">v_eigenstrat_snp_coverage.txt",
+ "||",
+ "true",
+ "mapDamage",
+ "--version",
+ ">",
+ "v_mapdamage.txt",
+ "||",
+ "true",
+ "bbduk.sh",
+ "|",
+ "grep",
+ "Last",
+ "modified",
+ "|",
+ "cut",
+ "-d",
+ "-f",
+ "3-99",
+ ">",
+ "v_bbduk.txt",
+ "||",
+ "true",
+ "scrape_software_versions.py",
+ "&>",
+ "software_versions_mqc.yaml"
+ ]
+ },
+ "parents": [
+ "output_documentation"
+ ],
+ "children": [
+ "fastqc_jk2782_l1",
+ "fastqc_jk2802_l2"
+ ],
+ "files": [],
+ "cores": 2.0,
+ "id": "ID000006",
+ "category": "get_software_versions",
+ "avgCPU": 147.8,
+ "bytesRead": 172760,
+ "bytesWritten": 1048,
+ "memory": 387324
+ },
+ {
+ "name": "fastqc_jk2782_l1",
+ "type": "compute",
+ "runtime": 175.205,
+ "command": {
+ "program": "fastqc",
+ "arguments": [
+ "fastqc",
+ "-t",
+ "2",
+ "-q",
+ "JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq.gz",
+ "JK2782_TGGCCGATCAACGA_L008_R2_001.fastq.gz.tengrand.fq.gz",
+ "rename",
+ "s/_fastqc.zip$/_raw_fastqc.zip/",
+ "*_fastqc.zip",
+ "rename",
+ "s/_fastqc.html$/_raw_fastqc.html/",
+ "*_fastqc.html"
+ ]
+ },
+ "parents": [
+ "get_software_versions"
+ ],
+ "children": [
+ "adapter_removal_jk2782_l1",
+ "adapter_removal_jk2802_l2"
+ ],
+ "files": [],
+ "cores": 2.0,
+ "id": "ID000007",
+ "category": "fastqc",
+ "avgCPU": 161.8,
+ "bytesRead": 35981,
+ "bytesWritten": 3967,
+ "memory": 270124
+ },
+ {
+ "name": "adapter_removal_jk2782_l1",
+ "type": "compute",
+ "runtime": 172.643,
+ "command": {
+ "program": "adapter_removal",
+ "arguments": [
+ "mkdir",
+ "-p",
+ "output",
+ "AdapterRemoval",
+ "--file1",
+ "JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq.gz",
+ "--file2",
+ "JK2782_TGGCCGATCAACGA_L008_R2_001.fastq.gz.tengrand.fq.gz",
+ "--basename",
+ "JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq_L1.pe",
+ "--gzip",
+ "--threads",
+ "2",
+ "--qualitymax",
+ "41",
+ "--collapse",
+ "--trimns",
+ "--trimqualities",
+ "--adapter1",
+ "AGATCGGAAGAGCACACGTCTGAACTCCAGTCAC",
+ "--adapter2",
+ "AGATCGGAAGAGCGTCGTGTAGGGAAAGAGTGTA",
+ "--minlength",
+ "30",
+ "--minquality",
+ "20",
+ "--minadapteroverlap",
+ "1",
+ "cat",
+ "*.collapsed.gz",
+ "*.collapsed.truncated.gz",
+ "*.singleton.truncated.gz",
+ "*.pair1.truncated.gz",
+ "*.pair2.truncated.gz",
+ ">",
+ "output/JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq_L1.pe.combined.tmp.fq.gz",
+ "mv",
+ "*.settings",
+ "output/",
+ "##",
+ "Add",
+ "R_",
+ "and",
+ "L_",
+ "for",
+ "unmerged",
+ "reads",
+ "for",
+ "DeDup",
+ "compatibility",
+ "AdapterRemovalFixPrefix",
+ "-Xmx4g",
+ "output/JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq_L1.pe.combined.tmp.fq.gz",
+ "|",
+ "pigz",
+ "-p",
+ "1",
+ ">",
+ "output/JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq_L1.pe.combined.fq.gz"
+ ]
+ },
+ "parents": [
+ "fastqc_jk2782_l1",
+ "fastqc_jk2802_l2"
+ ],
+ "children": [
+ "fastqc_after_clipping_jk2782_l1",
+ "fastqc_after_clipping_jk2802_l2"
+ ],
+ "files": [],
+ "cores": 2.0,
+ "id": "ID000008",
+ "category": "adapter_removal",
+ "avgCPU": 160.9,
+ "bytesRead": 17357,
+ "bytesWritten": 4405,
+ "memory": 79308
+ },
+ {
+ "name": "fastqc_jk2802_l2",
+ "type": "compute",
+ "runtime": 177.338,
+ "command": {
+ "program": "fastqc",
+ "arguments": [
+ "fastqc",
+ "-q",
+ "JK2802_AGAATAACCTACCA_L008_R1_001.fastq.gz.tengrand.fq.gz",
+ "rename",
+ "s/_fastqc.zip$/_raw_fastqc.zip/",
+ "*_fastqc.zip",
+ "rename",
+ "s/_fastqc.html$/_raw_fastqc.html/",
+ "*_fastqc.html"
+ ]
+ },
+ "parents": [
+ "get_software_versions"
+ ],
+ "children": [
+ "adapter_removal_jk2782_l1",
+ "adapter_removal_jk2802_l2"
+ ],
+ "files": [],
+ "cores": 2.0,
+ "id": "ID000009",
+ "category": "fastqc",
+ "avgCPU": 120.1,
+ "bytesRead": 24457,
+ "bytesWritten": 2181,
+ "memory": 181060
+ },
+ {
+ "name": "adapter_removal_jk2802_l2",
+ "type": "compute",
+ "runtime": 174.313,
+ "command": {
+ "program": "adapter_removal",
+ "arguments": [
+ "mkdir",
+ "-p",
+ "output",
+ "AdapterRemoval",
+ "--file1",
+ "JK2802_AGAATAACCTACCA_L008_R1_001.fastq.gz.tengrand.fq.gz",
+ "--basename",
+ "JK2802_AGAATAACCTACCA_L008_R1_001.fastq.gz.tengrand.fq_L2.se",
+ "--gzip",
+ "--threads",
+ "2",
+ "--qualitymax",
+ "41",
+ "--trimns",
+ "--trimqualities",
+ "--adapter1",
+ "AGATCGGAAGAGCACACGTCTGAACTCCAGTCAC",
+ "--adapter2",
+ "AGATCGGAAGAGCGTCGTGTAGGGAAAGAGTGTA",
+ "--minlength",
+ "30",
+ "--minquality",
+ "20",
+ "--minadapteroverlap",
+ "1",
+ "mv",
+ "*.settings",
+ "*.se.truncated.gz",
+ "output/"
+ ]
+ },
+ "parents": [
+ "fastqc_jk2782_l1",
+ "fastqc_jk2802_l2"
+ ],
+ "children": [
+ "fastqc_after_clipping_jk2782_l1",
+ "fastqc_after_clipping_jk2802_l2"
+ ],
+ "files": [],
+ "cores": 2.0,
+ "id": "ID000010",
+ "category": "adapter_removal",
+ "avgCPU": 106.5,
+ "bytesRead": 683,
+ "bytesWritten": 897,
+ "memory": 12136
+ },
+ {
+ "name": "fastqc_after_clipping_jk2782_l1",
+ "type": "compute",
+ "runtime": 15.371,
+ "command": {
+ "program": "fastqc_after_clipping",
+ "arguments": [
+ "fastqc",
+ "-t",
+ "2",
+ "-q",
+ "JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq_L1.pe.combined.fq.gz"
+ ]
+ },
+ "parents": [
+ "adapter_removal_jk2782_l1",
+ "adapter_removal_jk2802_l2"
+ ],
+ "children": [
+ "bwa_jk2802",
+ "bwa_jk2782"
+ ],
+ "files": [],
+ "cores": 2.0,
+ "id": "ID000013",
+ "category": "fastqc_after_clipping",
+ "avgCPU": 133.3,
+ "bytesRead": 23788,
+ "bytesWritten": 1998,
+ "memory": 215020
+ },
+ {
+ "name": "fastqc_after_clipping_jk2802_l2",
+ "type": "compute",
+ "runtime": 15.272,
+ "command": {
+ "program": "fastqc_after_clipping",
+ "arguments": [
+ "fastqc",
+ "-t",
+ "2",
+ "-q",
+ "JK2802_AGAATAACCTACCA_L008_R1_001.fastq.gz.tengrand.fq_L2.se.truncated.gz"
+ ]
+ },
+ "parents": [
+ "adapter_removal_jk2782_l1",
+ "adapter_removal_jk2802_l2"
+ ],
+ "children": [
+ "bwa_jk2802",
+ "bwa_jk2782"
+ ],
+ "files": [],
+ "cores": 2.0,
+ "id": "ID000014",
+ "category": "fastqc_after_clipping",
+ "avgCPU": 124.1,
+ "bytesRead": 23882,
+ "bytesWritten": 2143,
+ "memory": 213064
+ },
+ {
+ "name": "bwa_jk2802",
+ "type": "compute",
+ "runtime": 9.566,
+ "command": {
+ "program": "bwa",
+ "arguments": [
+ "bwa",
+ "aln",
+ "-t",
+ "2",
+ "BWAIndex/Mammoth_MT_Krause.fasta",
+ "JK2802_AGAATAACCTACCA_L008_R1_001.fastq.gz.tengrand.fq_L2.se.truncated.gz",
+ "-n",
+ "0.04",
+ "-l",
+ "1024",
+ "-k",
+ "2",
+ "-o",
+ "1",
+ "-f",
+ "JK2802.sai",
+ "bwa",
+ "samse",
+ "-r",
+ "\"@RGtID:ILLUMINA-JK2802tSM:JK2802tPL:illuminatPU:ILLUMINA-JK2802-SE\"",
+ "BWAIndex/Mammoth_MT_Krause.fasta",
+ "JK2802.sai",
+ "JK2802_AGAATAACCTACCA_L008_R1_001.fastq.gz.tengrand.fq_L2.se.truncated.gz",
+ "|",
+ "samtools",
+ "sort",
+ "-@",
+ "1",
+ "-O",
+ "bam",
+ "-",
+ ">",
+ "\"JK2802\"_\"SE\".mapped.bam",
+ "samtools",
+ "index",
+ "\"JK2802\"_\"SE\".mapped.bam"
+ ]
+ },
+ "parents": [
+ "fastqc_after_clipping_jk2782_l1",
+ "fastqc_after_clipping_jk2802_l2"
+ ],
+ "children": [
+ "samtools_flagstat_jk2782",
+ "samtools_flagstat_jk2802"
+ ],
+ "files": [],
+ "cores": 2.0,
+ "id": "ID000016",
+ "category": "bwa",
+ "avgCPU": 15.7,
+ "bytesRead": 3774,
+ "bytesWritten": 3367,
+ "memory": 10628
+ },
+ {
+ "name": "bwa_jk2782",
+ "type": "compute",
+ "runtime": 9.652,
+ "command": {
+ "program": "bwa",
+ "arguments": [
+ "bwa",
+ "aln",
+ "-t",
+ "2",
+ "BWAIndex/Mammoth_MT_Krause.fasta",
+ "JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq_L1.pe.combined.fq.gz",
+ "-n",
+ "0.04",
+ "-l",
+ "1024",
+ "-k",
+ "2",
+ "-o",
+ "1",
+ "-f",
+ "JK2782.sai",
+ "bwa",
+ "samse",
+ "-r",
+ "\"@RGtID:ILLUMINA-JK2782tSM:JK2782tPL:illuminatPU:ILLUMINA-JK2782-PE\"",
+ "BWAIndex/Mammoth_MT_Krause.fasta",
+ "JK2782.sai",
+ "JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq_L1.pe.combined.fq.gz",
+ "|",
+ "samtools",
+ "sort",
+ "-@",
+ "1",
+ "-O",
+ "bam",
+ "-",
+ ">",
+ "\"JK2782\"_\"PE\".mapped.bam",
+ "samtools",
+ "index",
+ "\"JK2782\"_\"PE\".mapped.bam"
+ ]
+ },
+ "parents": [
+ "fastqc_after_clipping_jk2782_l1",
+ "fastqc_after_clipping_jk2802_l2"
+ ],
+ "children": [
+ "samtools_flagstat_jk2782",
+ "samtools_flagstat_jk2802"
+ ],
+ "files": [],
+ "cores": 2.0,
+ "id": "ID000015",
+ "category": "bwa",
+ "avgCPU": 69.8,
+ "bytesRead": 3705,
+ "bytesWritten": 3355,
+ "memory": 12876
+ },
+ {
+ "name": "samtools_flagstat_jk2782",
+ "type": "compute",
+ "runtime": 13.011,
+ "command": {
+ "program": "samtools_flagstat",
+ "arguments": [
+ "samtools",
+ "flagstat",
+ "JK2782_PE.mapped.bam",
+ ">",
+ "JK2782_flagstat.stats"
+ ]
+ },
+ "parents": [
+ "bwa_jk2802",
+ "bwa_jk2782"
+ ],
+ "children": [
+ "markduplicates_jk2782",
+ "markduplicates_jk2802"
+ ],
+ "files": [],
+ "cores": 1.0,
+ "id": "ID000026",
+ "category": "samtools_flagstat",
+ "avgCPU": 30.1,
+ "bytesRead": 478,
+ "bytesWritten": 5,
+ "memory": 6468
+ },
+ {
+ "name": "samtools_flagstat_jk2802",
+ "type": "compute",
+ "runtime": 13.129,
+ "command": {
+ "program": "samtools_flagstat",
+ "arguments": [
+ "samtools",
+ "flagstat",
+ "JK2802_SE.mapped.bam",
+ ">",
+ "JK2802_flagstat.stats"
+ ]
+ },
+ "parents": [
+ "bwa_jk2802",
+ "bwa_jk2782"
+ ],
+ "children": [
+ "markduplicates_jk2782",
+ "markduplicates_jk2802"
+ ],
+ "files": [],
+ "cores": 1.0,
+ "id": "ID000024",
+ "category": "samtools_flagstat",
+ "avgCPU": 118.5,
+ "bytesRead": 551,
+ "bytesWritten": 5
+ },
+ {
+ "name": "markduplicates_jk2782",
+ "type": "compute",
+ "runtime": 22.655,
+ "command": {
+ "program": "markduplicates",
+ "arguments": [
+ "mv",
+ "JK2782_PE.mapped.bam",
+ "JK2782.bam",
+ "picard",
+ "-Xmx4096M",
+ "MarkDuplicates",
+ "INPUT=JK2782.bam",
+ "OUTPUT=JK2782_rmdup.bam",
+ "REMOVE_DUPLICATES=TRUE",
+ "AS=TRUE",
+ "METRICS_FILE=\"JK2782_rmdup.metrics\"",
+ "VALIDATION_STRINGENCY=SILENT",
+ "samtools",
+ "index",
+ "JK2782_rmdup.bam"
+ ]
+ },
+ "parents": [
+ "samtools_flagstat_jk2782",
+ "samtools_flagstat_jk2802"
+ ],
+ "children": [
+ "preseq_jk2782",
+ "preseq_jk2802"
+ ],
+ "files": [],
+ "cores": 2.0,
+ "id": "ID000021",
+ "category": "markduplicates",
+ "avgCPU": 173.6,
+ "bytesRead": 24055,
+ "bytesWritten": 2319,
+ "memory": 1400048
+ },
+ {
+ "name": "markduplicates_jk2802",
+ "type": "compute",
+ "runtime": 21.545,
+ "command": {
+ "program": "markduplicates",
+ "arguments": [
+ "mv",
+ "JK2802_SE.mapped.bam",
+ "JK2802.bam",
+ "picard",
+ "-Xmx4096M",
+ "MarkDuplicates",
+ "INPUT=JK2802.bam",
+ "OUTPUT=JK2802_rmdup.bam",
+ "REMOVE_DUPLICATES=TRUE",
+ "AS=TRUE",
+ "METRICS_FILE=\"JK2802_rmdup.metrics\"",
+ "VALIDATION_STRINGENCY=SILENT",
+ "samtools",
+ "index",
+ "JK2802_rmdup.bam"
+ ]
+ },
+ "parents": [
+ "samtools_flagstat_jk2782",
+ "samtools_flagstat_jk2802"
+ ],
+ "children": [
+ "preseq_jk2782",
+ "preseq_jk2802"
+ ],
+ "files": [],
+ "cores": 2.0,
+ "id": "ID000020",
+ "category": "markduplicates",
+ "avgCPU": 182.6,
+ "bytesRead": 24242,
+ "bytesWritten": 2466,
+ "memory": 1404624
+ },
+ {
+ "name": "preseq_jk2782",
+ "type": "compute",
+ "runtime": 12.299,
+ "command": {
+ "program": "preseq",
+ "arguments": [
+ "preseq",
+ "c_curve",
+ "-s",
+ "1000",
+ "-o",
+ "JK2782_PE.mapped.ccurve",
+ "-B",
+ "JK2782_PE.mapped.bam"
+ ]
+ },
+ "parents": [
+ "markduplicates_jk2782",
+ "markduplicates_jk2802"
+ ],
+ "children": [
+ "endorspy_jk2782",
+ "endorspy_jk2802"
+ ],
+ "files": [],
+ "cores": 1.0,
+ "id": "ID000030",
+ "category": "preseq",
+ "avgCPU": 81.9,
+ "bytesRead": 473,
+ "bytesWritten": 4,
+ "memory": 12032
+ },
+ {
+ "name": "preseq_jk2802",
+ "type": "compute",
+ "runtime": 10.188,
+ "command": {
+ "program": "preseq",
+ "arguments": [
+ "preseq",
+ "c_curve",
+ "-s",
+ "1000",
+ "-o",
+ "JK2802_SE.mapped.ccurve",
+ "-B",
+ "JK2802_SE.mapped.bam"
+ ]
+ },
+ "parents": [
+ "markduplicates_jk2782",
+ "markduplicates_jk2802"
+ ],
+ "children": [
+ "endorspy_jk2782",
+ "endorspy_jk2802"
+ ],
+ "files": [],
+ "cores": 1.0,
+ "id": "ID000027",
+ "category": "preseq",
+ "avgCPU": 77.6,
+ "bytesRead": 548,
+ "bytesWritten": 4,
+ "memory": 11972
+ },
+ {
+ "name": "endorspy_jk2782",
+ "type": "compute",
+ "runtime": 7.537,
+ "command": {
+ "program": "endorspy",
+ "arguments": [
+ "endorS.py",
+ "-o",
+ "json",
+ "-n",
+ "JK2782",
+ "JK2782_flagstat.stats"
+ ]
+ },
+ "parents": [
+ "preseq_jk2782",
+ "preseq_jk2802"
+ ],
+ "children": [
+ "damageprofiler_jk2802",
+ "damageprofiler_jk2782"
+ ],
+ "files": [],
+ "cores": 1.0,
+ "id": "ID000031",
+ "category": "endorspy",
+ "avgCPU": 44.7,
+ "bytesRead": 623,
+ "bytesWritten": 4,
+ "memory": 12264
+ },
+ {
+ "name": "endorspy_jk2802",
+ "type": "compute",
+ "runtime": 8.0,
+ "command": {
+ "program": "endorspy",
+ "arguments": [
+ "endorS.py",
+ "-o",
+ "json",
+ "-n",
+ "JK2802",
+ "JK2802_flagstat.stats"
+ ]
+ },
+ "parents": [
+ "preseq_jk2782",
+ "preseq_jk2802"
+ ],
+ "children": [
+ "damageprofiler_jk2802",
+ "damageprofiler_jk2782"
+ ],
+ "files": [],
+ "cores": 1.0,
+ "id": "ID000032",
+ "category": "endorspy",
+ "avgCPU": 54.0,
+ "bytesRead": 623,
+ "bytesWritten": 4,
+ "memory": 12224
+ },
+ {
+ "name": "damageprofiler_jk2802",
+ "type": "compute",
+ "runtime": 18.596,
+ "command": {
+ "program": "damageprofiler",
+ "arguments": [
+ "damageprofiler",
+ "-Xmx4g",
+ "-i",
+ "JK2802_rmdup.bam",
+ "-r",
+ "Mammoth_MT_Krause.fasta",
+ "-l",
+ "100",
+ "-t",
+ "15",
+ "-o",
+ ".",
+ "-yaxis_damageplot",
+ "0.30"
+ ]
+ },
+ "parents": [
+ "endorspy_jk2782",
+ "endorspy_jk2802"
+ ],
+ "children": [
+ "qualimap_jk2802",
+ "qualimap_jk2782"
+ ],
+ "files": [],
+ "cores": 1.0,
+ "id": "ID000033",
+ "category": "damageprofiler",
+ "avgCPU": 88.6,
+ "bytesRead": 25744,
+ "bytesWritten": 391,
+ "memory": 242940
+ },
+ {
+ "name": "damageprofiler_jk2782",
+ "type": "compute",
+ "runtime": 16.736,
+ "command": {
+ "program": "damageprofiler",
+ "arguments": [
+ "damageprofiler",
+ "-Xmx4g",
+ "-i",
+ "JK2782_rmdup.bam",
+ "-r",
+ "Mammoth_MT_Krause.fasta",
+ "-l",
+ "100",
+ "-t",
+ "15",
+ "-o",
+ ".",
+ "-yaxis_damageplot",
+ "0.30"
+ ]
+ },
+ "parents": [
+ "endorspy_jk2782",
+ "endorspy_jk2802"
+ ],
+ "children": [
+ "qualimap_jk2802",
+ "qualimap_jk2782"
+ ],
+ "files": [],
+ "cores": 1.0,
+ "id": "ID000036",
+ "category": "damageprofiler",
+ "avgCPU": 88.3,
+ "bytesRead": 25661,
+ "bytesWritten": 327,
+ "memory": 198276
+ },
+ {
+ "name": "qualimap_jk2802",
+ "type": "compute",
+ "runtime": 15.368,
+ "command": {
+ "program": "qualimap",
+ "arguments": [
+ "qualimap",
+ "bamqc",
+ "-bam",
+ "JK2802_rmdup.bam",
+ "-nt",
+ "2",
+ "-outdir",
+ ".",
+ "-outformat",
+ "\"HTML\"",
+ "--java-mem-size=4G"
+ ]
+ },
+ "parents": [
+ "damageprofiler_jk2802",
+ "damageprofiler_jk2782"
+ ],
+ "children": [
+ "multiqc_1"
+ ],
+ "files": [],
+ "cores": 2.0,
+ "id": "ID000053",
+ "category": "qualimap",
+ "avgCPU": 177.7,
+ "bytesRead": 35038,
+ "bytesWritten": 1712,
+ "memory": 209440
+ },
+ {
+ "name": "qualimap_jk2782",
+ "type": "compute",
+ "runtime": 14.223,
+ "command": {
+ "program": "qualimap",
+ "arguments": [
+ "qualimap",
+ "bamqc",
+ "-bam",
+ "JK2782_rmdup.bam",
+ "-nt",
+ "2",
+ "-outdir",
+ ".",
+ "-outformat",
+ "\"HTML\"",
+ "--java-mem-size=4G"
+ ]
+ },
+ "parents": [
+ "damageprofiler_jk2802",
+ "damageprofiler_jk2782"
+ ],
+ "children": [
+ "multiqc_1"
+ ],
+ "files": [],
+ "cores": 2.0,
+ "id": "ID000054",
+ "category": "qualimap",
+ "avgCPU": 181.9,
+ "bytesRead": 34954,
+ "bytesWritten": 1937,
+ "memory": 232196
+ },
+ {
+ "name": "multiqc_1",
+ "type": "compute",
+ "runtime": 46.376,
+ "command": {
+ "program": "multiqc",
+ "arguments": [
+ "multiqc",
+ "-f",
+ "multiqc_config.yaml",
+ "."
+ ]
+ },
+ "parents": [
+ "qualimap_jk2802",
+ "qualimap_jk2782"
+ ],
+ "children": [],
+ "files": [],
+ "cores": 1.0,
+ "id": "ID000056",
+ "category": "multiqc",
+ "avgCPU": 93.0,
+ "bytesRead": 1215169,
+ "bytesWritten": 22599,
+ "memory": 139496
+ }
+ ]
+ }
+}
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt
index be26f540..74202718 100644
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt
@@ -35,21 +35,18 @@ internal class WtfTaskTable(private val path: Path) : Table {
override val isSynthetic: Boolean = false
- override fun isSupported(column: TableColumn<*>): Boolean {
- return when (column) {
- TASK_ID -> true
- TASK_WORKFLOW_ID -> true
- TASK_SUBMIT_TIME -> true
- TASK_WAIT_TIME -> true
- TASK_RUNTIME -> true
- TASK_REQ_NCPUS -> true
- TASK_PARENTS -> true
- TASK_CHILDREN -> true
- TASK_GROUP_ID -> true
- TASK_USER_ID -> true
- else -> false
- }
- }
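+    // Columns exposed by this table; this list replaces the removed isSupported(column) check.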
+ override val columns: List<TableColumn<*>> = listOf(
+ TASK_ID,
+ TASK_WORKFLOW_ID,
+ TASK_SUBMIT_TIME,
+ TASK_WAIT_TIME,
+ TASK_RUNTIME,
+ TASK_REQ_NCPUS,
+ TASK_PARENTS,
+ TASK_CHILDREN,
+ TASK_GROUP_ID,
+ TASK_USER_ID
+ )
override fun newReader(): TableReader {
val reader = LocalParquetReader<GenericRecord>(path.resolve("tasks/schema-1.0"))
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
index b6789542..5e2463f8 100644
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
@@ -25,6 +25,8 @@ package org.opendc.trace.wtf
import org.apache.avro.generic.GenericRecord
import org.opendc.trace.*
import org.opendc.trace.util.parquet.LocalParquetReader
+import java.time.Duration
+import java.time.Instant
/**
* A [TableReader] implementation for the WTF format.
@@ -61,14 +63,14 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Generic
@Suppress("UNCHECKED_CAST")
val res: Any = when (column) {
- TASK_ID -> record["id"]
- TASK_WORKFLOW_ID -> record["workflow_id"]
- TASK_SUBMIT_TIME -> record["ts_submit"]
- TASK_WAIT_TIME -> record["wait_time"]
- TASK_RUNTIME -> record["runtime"]
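+            // Identifiers are mapped to strings and epoch/duration values to java.time
+            // types before being returned.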
+ TASK_ID -> (record["id"] as Long).toString()
+ TASK_WORKFLOW_ID -> (record["workflow_id"] as Long).toString()
+ TASK_SUBMIT_TIME -> Instant.ofEpochMilli(record["ts_submit"] as Long)
+ TASK_WAIT_TIME -> Duration.ofMillis(record["wait_time"] as Long)
+ TASK_RUNTIME -> Duration.ofMillis(record["runtime"] as Long)
TASK_REQ_NCPUS -> (record["resource_amount_requested"] as Double).toInt()
- TASK_PARENTS -> (record["parents"] as ArrayList<GenericRecord>).map { it["item"] as Long }.toSet()
- TASK_CHILDREN -> (record["children"] as ArrayList<GenericRecord>).map { it["item"] as Long }.toSet()
+ TASK_PARENTS -> (record["parents"] as ArrayList<GenericRecord>).map { it["item"].toString() }.toSet()
+ TASK_CHILDREN -> (record["children"] as ArrayList<GenericRecord>).map { it["item"].toString() }.toSet()
TASK_GROUP_ID -> record["group_id"]
TASK_USER_ID -> record["user_id"]
else -> throw IllegalArgumentException("Invalid column")
@@ -94,16 +96,7 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Generic
}
override fun getLong(column: TableColumn<Long>): Long {
- val record = checkNotNull(record) { "Reader in invalid state" }
-
- return when (column) {
- TASK_ID -> record["id"] as Long
- TASK_WORKFLOW_ID -> record["workflow_id"] as Long
- TASK_SUBMIT_TIME -> record["ts_submit"] as Long
- TASK_WAIT_TIME -> record["wait_time"] as Long
- TASK_RUNTIME -> record["runtime"] as Long
- else -> throw IllegalArgumentException("Invalid column")
- }
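+        // None of the remaining columns is long-typed after the conversion above,
+        // so any getLong call is rejected.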
+ throw IllegalArgumentException("Invalid column")
}
override fun getDouble(column: TableColumn<Double>): Double {
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTrace.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTrace.kt
index 7eff0f5a..a755a107 100644
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTrace.kt
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTrace.kt
@@ -43,5 +43,5 @@ public class WtfTrace internal constructor(private val path: Path) : Trace {
return WtfTaskTable(path)
}
- override fun toString(): String = "SwfTrace[$path]"
+ override fun toString(): String = "WtfTrace[$path]"
}
diff --git a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt
index a05a523e..b155f265 100644
--- a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt
@@ -28,6 +28,8 @@ import org.junit.jupiter.api.assertThrows
import org.opendc.trace.*
import java.io.File
import java.net.URL
+import java.time.Duration
+import java.time.Instant
/**
* Test suite for the [WtfTraceFormat] class.
@@ -91,20 +93,20 @@ class WtfTraceFormatTest {
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals(362334516345962206, reader.getLong(TASK_ID)) },
- { assertEquals(1078341553348591493, reader.getLong(TASK_WORKFLOW_ID)) },
- { assertEquals(245604, reader.getLong(TASK_SUBMIT_TIME)) },
- { assertEquals(8163, reader.getLong(TASK_RUNTIME)) },
- { assertEquals(setOf(584055316413447529, 133113685133695608, 1008582348422865408), reader.get(TASK_PARENTS)) },
+ { assertEquals("362334516345962206", reader.get(TASK_ID)) },
+ { assertEquals("1078341553348591493", reader.get(TASK_WORKFLOW_ID)) },
+ { assertEquals(Instant.ofEpochMilli(245604), reader.get(TASK_SUBMIT_TIME)) },
+ { assertEquals(Duration.ofMillis(8163), reader.get(TASK_RUNTIME)) },
+ { assertEquals(setOf("584055316413447529", "133113685133695608", "1008582348422865408"), reader.get(TASK_PARENTS)) },
)
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals(502010169100446658, reader.getLong(TASK_ID)) },
- { assertEquals(1078341553348591493, reader.getLong(TASK_WORKFLOW_ID)) },
- { assertEquals(251325, reader.getLong(TASK_SUBMIT_TIME)) },
- { assertEquals(8216, reader.getLong(TASK_RUNTIME)) },
- { assertEquals(setOf(584055316413447529, 133113685133695608, 1008582348422865408), reader.get(TASK_PARENTS)) },
+ { assertEquals("502010169100446658", reader.get(TASK_ID)) },
+ { assertEquals("1078341553348591493", reader.get(TASK_WORKFLOW_ID)) },
+ { assertEquals(Instant.ofEpochMilli(251325), reader.get(TASK_SUBMIT_TIME)) },
+ { assertEquals(Duration.ofMillis(8216), reader.get(TASK_RUNTIME)) },
+ { assertEquals(setOf("584055316413447529", "133113685133695608", "1008582348422865408"), reader.get(TASK_PARENTS)) },
)
reader.close()