| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-09-12 12:08:55 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-09-12 12:08:55 +0200 |
| commit | 2cd3bd18e548a72d64afe0e7f59487f4747d722f (patch) | |
| tree | dc9e2fba5ca4d19a90934a8b68dbb8110ee34bb7 /opendc-trace | |
| parent | cae193284570d6ee9dbacdde57b3e4e367aa9d9f (diff) | |
| parent | 992b65396f55c0e12b36823d191dea8e03dd45ba (diff) | |
merge: Add support for new trace formats
This pull request updates the trace API and adds support for several new trace formats:
- Add support for Materna traces from GWA
- Keep reader state in its own class
- Parse last column in Solvinity trace format
- Add support for Azure VM traces
- Add support for WfCommons (WorkflowHub) traces
- Add API for accessing available table columns
- Add synthetic resource table for Bitbrains format
- Support dynamic resolving of trace formats (see the usage sketch after this list)
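With dynamic format resolution, a trace can now be opened by format name through the new `Trace.open` companion functions and read via the table API. A minimal usage sketch, assuming a local WfCommons trace; the file name `workflow.json` is only an illustrative placeholder:

```kotlin
import org.opendc.trace.*
import java.nio.file.Paths

fun main() {
    // "workflow.json" is a placeholder path for a WfCommons trace file.
    val trace = Trace.open(Paths.get("workflow.json"), format = "wfformat")
    val reader = trace.getTable(TABLE_TASKS)!!.newReader()

    while (reader.nextRow()) {
        // TASK_ID and TASK_RUNTIME are now typed as String and Duration.
        println("${reader.get(TASK_ID)} ran for ${reader.get(TASK_RUNTIME)}")
    }

    reader.close()
}
```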
**Breaking API Changes**
- Replace `isSupported` with a list of `TableColumn`s (see the migration sketch below)
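Callers that previously probed individual columns with `isSupported` can check membership in the new `columns` list instead. A brief migration sketch; the helper name is illustrative only:

```kotlin
import org.opendc.trace.*

// Before: table.isSupported(RESOURCE_STATE_CPU_USAGE)
// After: supported columns are exposed as a List<TableColumn<*>>.
fun supportsCpuUsage(table: Table): Boolean =
    RESOURCE_STATE_CPU_USAGE in table.columns
```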
Diffstat (limited to 'opendc-trace')
29 files changed, 2740 insertions, 241 deletions
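One detail worth noting in the Bitbrains reader changes below: Materna partitions store timestamps as `dd.MM.yyyy HH:mm:ss` date-times rather than epoch seconds, so the reader decides the timestamp type on first use and falls back accordingly. A simplified, standalone sketch of that fallback; the helper name `parseTimestamp` is illustrative and not part of the API:

```kotlin
import java.time.Instant
import java.time.LocalDateTime
import java.time.ZoneOffset
import java.time.format.DateTimeFormatter
import java.time.format.DateTimeParseException

// Formatter for the date-time style timestamps found in the Materna trace.
private val formatter = DateTimeFormatter.ofPattern("dd.MM.yyyy HH:mm:ss")

/** Parse a timestamp that is either a Materna-style date-time or epoch seconds. */
fun parseTimestamp(text: String): Instant =
    try {
        LocalDateTime.parse(text, formatter).toInstant(ZoneOffset.UTC)
    } catch (e: DateTimeParseException) {
        Instant.ofEpochSecond(text.trim().toLong())
    }
```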
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt index 44dec95b..e2e5ea6d 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt @@ -41,7 +41,7 @@ public val RESOURCE_START_TIME: TableColumn<Instant> = TableColumn("resource:sta * End time for the resource. */ @JvmField -public val RESOURCE_END_TIME: TableColumn<Instant> = TableColumn("resource:end_time", Instant::class.java) +public val RESOURCE_STOP_TIME: TableColumn<Instant> = TableColumn("resource:stop_time", Instant::class.java) /** * Number of CPUs for the resource. @@ -50,7 +50,7 @@ public val RESOURCE_END_TIME: TableColumn<Instant> = TableColumn("resource:end_t public val RESOURCE_NCPUS: TableColumn<Int> = intColumn("resource:num_cpus") /** - * Memory capacity for the resource. + * Memory capacity for the resource in KB. */ @JvmField public val RESOURCE_MEM_CAPACITY: TableColumn<Double> = doubleColumn("resource:mem_capacity") diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt index 11e5d6b7..6aca2051 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt @@ -37,9 +37,9 @@ public interface Table { public val isSynthetic: Boolean /** - * Determine whether the specified [column] is supported by this table. + * The list of columns supported in this table. */ - public fun isSupported(column: TableColumn<*>): Boolean + public val columns: List<TableColumn<*>> /** * Open a [TableReader] for this table. diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt index 88bbc623..46920dce 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt @@ -23,49 +23,52 @@ @file:JvmName("TaskColumns") package org.opendc.trace +import java.time.Duration +import java.time.Instant + /** * A column containing the task identifier. */ @JvmField -public val TASK_ID: TableColumn<Long> = longColumn("task:id") +public val TASK_ID: TableColumn<String> = stringColumn("task:id") /** * A column containing the identifier of the workflow. */ @JvmField -public val TASK_WORKFLOW_ID: TableColumn<Long> = longColumn("task:workflow_id") +public val TASK_WORKFLOW_ID: TableColumn<String> = stringColumn("task:workflow_id") /** * A column containing the submit time of the task. */ @JvmField -public val TASK_SUBMIT_TIME: TableColumn<Long> = longColumn("task:submit_time") +public val TASK_SUBMIT_TIME: TableColumn<Instant> = TableColumn("task:submit_time", type = Instant::class.java) /** * A column containing the wait time of the task. */ @JvmField -public val TASK_WAIT_TIME: TableColumn<Long> = longColumn("task:wait_time") +public val TASK_WAIT_TIME: TableColumn<Instant> = TableColumn("task:wait_time", type = Instant::class.java) /** * A column containing the runtime time of the task. 
*/ @JvmField -public val TASK_RUNTIME: TableColumn<Long> = longColumn("task:runtime") +public val TASK_RUNTIME: TableColumn<Duration> = TableColumn("task:runtime", type = Duration::class.java) /** * A column containing the parents of a task. */ @Suppress("UNCHECKED_CAST") @JvmField -public val TASK_PARENTS: TableColumn<Set<Long>> = TableColumn("task:parents", type = Set::class.java as Class<Set<Long>>) +public val TASK_PARENTS: TableColumn<Set<String>> = TableColumn("task:parents", type = Set::class.java as Class<Set<String>>) /** * A column containing the children of a task. */ @Suppress("UNCHECKED_CAST") @JvmField -public val TASK_CHILDREN: TableColumn<Set<Long>> = TableColumn("task:children", type = Set::class.java as Class<Set<Long>>) +public val TASK_CHILDREN: TableColumn<Set<String>> = TableColumn("task:children", type = Set::class.java as Class<Set<String>>) /** * A column containing the requested CPUs of a task. diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt index 36e93b52..0ae45e86 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt @@ -22,6 +22,11 @@ package org.opendc.trace +import org.opendc.trace.spi.TraceFormat +import java.io.File +import java.net.URL +import java.nio.file.Path + /** * A trace is a collection of related tables that characterize a workload. */ @@ -40,4 +45,34 @@ public interface Trace { * Obtain a [Table] with the specified [name]. */ public fun getTable(name: String): Table? + + public companion object { + /** + * Open a [Trace] at the specified [url] in the given [format]. + * + * @throws IllegalArgumentException if [format] is not supported. + */ + public fun open(url: URL, format: String): Trace { + val provider = requireNotNull(TraceFormat.byName(format)) { "Unknown format $format" } + return provider.open(url) + } + + /** + * Open a [Trace] at the specified [path] in the given [format]. + * + * @throws IllegalArgumentException if [format] is not supported. + */ + public fun open(path: File, format: String): Trace { + return open(path.toURI().toURL(), format) + } + + /** + * Open a [Trace] at the specified [path] in the given [format]. + * + * @throws IllegalArgumentException if [format] is not supported. + */ + public fun open(path: Path, format: String): Trace { + return open(path.toUri().toURL(), format) + } + } } diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt index 767ef919..c9e5954d 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt @@ -33,7 +33,7 @@ import kotlin.io.path.nameWithoutExtension /** * The resource state [Table] in the Bitbrains format. */ -internal class BitbrainsResourceStateTable(private val factory: CsvFactory, private val path: Path) : Table { +internal class BitbrainsResourceStateTable(private val factory: CsvFactory, path: Path) : Table { /** * The partitions that belong to the table. 
*/ @@ -41,28 +41,26 @@ internal class BitbrainsResourceStateTable(private val factory: CsvFactory, priv Files.walk(path, 1) .filter { !Files.isDirectory(it) && it.extension == "csv" } .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) + .toSortedMap() override val name: String = TABLE_RESOURCE_STATES override val isSynthetic: Boolean = false - override fun isSupported(column: TableColumn<*>): Boolean { - return when (column) { - RESOURCE_STATE_ID -> true - RESOURCE_STATE_TIMESTAMP -> true - RESOURCE_STATE_NCPUS -> true - RESOURCE_STATE_CPU_CAPACITY -> true - RESOURCE_STATE_CPU_USAGE -> true - RESOURCE_STATE_CPU_USAGE_PCT -> true - RESOURCE_STATE_MEM_CAPACITY -> true - RESOURCE_STATE_MEM_USAGE -> true - RESOURCE_STATE_DISK_READ -> true - RESOURCE_STATE_DISK_WRITE -> true - RESOURCE_STATE_NET_RX -> true - RESOURCE_STATE_NET_TX -> true - else -> false - } - } + override val columns: List<TableColumn<*>> = listOf( + RESOURCE_STATE_ID, + RESOURCE_STATE_TIMESTAMP, + RESOURCE_STATE_NCPUS, + RESOURCE_STATE_CPU_CAPACITY, + RESOURCE_STATE_CPU_USAGE, + RESOURCE_STATE_CPU_USAGE_PCT, + RESOURCE_STATE_MEM_CAPACITY, + RESOURCE_STATE_MEM_USAGE, + RESOURCE_STATE_DISK_READ, + RESOURCE_STATE_DISK_WRITE, + RESOURCE_STATE_NET_RX, + RESOURCE_STATE_NET_TX, + ) override fun newReader(): TableReader { val it = partitions.iterator() diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt index 5687ac7f..dab784c2 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt @@ -22,20 +22,42 @@ package org.opendc.trace.bitbrains +import com.fasterxml.jackson.core.JsonParseException import com.fasterxml.jackson.core.JsonToken import com.fasterxml.jackson.dataformat.csv.CsvParser import com.fasterxml.jackson.dataformat.csv.CsvSchema import org.opendc.trace.* +import java.text.NumberFormat import java.time.Instant +import java.time.LocalDateTime +import java.time.ZoneOffset +import java.time.format.DateTimeFormatter +import java.time.format.DateTimeParseException +import java.util.* /** * A [TableReader] for the Bitbrains resource state table. */ internal class BitbrainsResourceStateTableReader(private val partition: String, private val parser: CsvParser) : TableReader { /** - * The current parser state. + * The [DateTimeFormatter] used to parse the timestamps in case of the Materna trace. */ - private val state = RowState() + private val formatter = DateTimeFormatter.ofPattern("dd.MM.yyyy HH:mm:ss") + + /** + * The type of timestamps in the trace. + */ + private var timestampType: TimestampType = TimestampType.UNDECIDED + + /** + * The [NumberFormat] used to parse doubles containing a comma. + */ + private val nf = NumberFormat.getInstance(Locale.GERMAN) + + /** + * A flag to indicate that the trace contains decimals with a comma separator. 
+ */ + private var usesCommaDecimalSeparator = false init { parser.schema = schema @@ -43,7 +65,7 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, override fun nextRow(): Boolean { // Reset the row state - state.reset() + reset() if (!nextStart()) { return false @@ -57,17 +79,32 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, } when (parser.currentName) { - "Timestamp [ms]" -> state.timestamp = Instant.ofEpochSecond(parser.longValue) - "CPU cores" -> state.cpuCores = parser.intValue - "CPU capacity provisioned [MHZ]" -> state.cpuCapacity = parser.doubleValue - "CPU usage [MHZ]" -> state.cpuUsage = parser.doubleValue - "CPU usage [%]" -> state.cpuUsagePct = parser.doubleValue - "Memory capacity provisioned [KB]" -> state.memCapacity = parser.doubleValue - "Memory usage [KB]" -> state.memUsage = parser.doubleValue - "Disk read throughput [KB/s]" -> state.diskRead = parser.doubleValue - "Disk write throughput [KB/s]" -> state.diskWrite = parser.doubleValue - "Network received throughput [KB/s]" -> state.netReceived = parser.doubleValue - "Network transmitted throughput [KB/s]" -> state.netTransmitted = parser.doubleValue + "Timestamp [ms]" -> { + timestamp = when (timestampType) { + TimestampType.UNDECIDED -> { + try { + val res = LocalDateTime.parse(parser.text, formatter).toInstant(ZoneOffset.UTC) + timestampType = TimestampType.DATE_TIME + res + } catch (e: DateTimeParseException) { + timestampType = TimestampType.EPOCH_MILLIS + Instant.ofEpochSecond(parser.longValue) + } + } + TimestampType.DATE_TIME -> LocalDateTime.parse(parser.text, formatter).toInstant(ZoneOffset.UTC) + TimestampType.EPOCH_MILLIS -> Instant.ofEpochSecond(parser.longValue) + } + } + "CPU cores" -> cpuCores = parser.intValue + "CPU capacity provisioned [MHZ]" -> cpuCapacity = parseSafeDouble() + "CPU usage [MHZ]" -> cpuUsage = parseSafeDouble() + "CPU usage [%]" -> cpuUsagePct = parseSafeDouble() / 100.0 // Convert to range [0, 1] + "Memory capacity provisioned [KB]" -> memCapacity = parseSafeDouble() + "Memory usage [KB]" -> memUsage = parseSafeDouble() + "Disk read throughput [KB/s]" -> diskRead = parseSafeDouble() + "Disk write throughput [KB/s]" -> diskWrite = parseSafeDouble() + "Network received throughput [KB/s]" -> netReceived = parseSafeDouble() + "Network transmitted throughput [KB/s]" -> netTransmitted = parseSafeDouble() } } @@ -95,17 +132,17 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, override fun <T> get(column: TableColumn<T>): T { val res: Any? 
= when (column) { RESOURCE_STATE_ID -> partition - RESOURCE_STATE_TIMESTAMP -> state.timestamp - RESOURCE_STATE_NCPUS -> state.cpuCores - RESOURCE_STATE_CPU_CAPACITY -> state.cpuCapacity - RESOURCE_STATE_CPU_USAGE -> state.cpuUsage - RESOURCE_STATE_CPU_USAGE_PCT -> state.cpuUsagePct - RESOURCE_STATE_MEM_CAPACITY -> state.memCapacity - RESOURCE_STATE_MEM_USAGE -> state.memUsage - RESOURCE_STATE_DISK_READ -> state.diskRead - RESOURCE_STATE_DISK_WRITE -> state.diskWrite - RESOURCE_STATE_NET_RX -> state.netReceived - RESOURCE_STATE_NET_TX -> state.netTransmitted + RESOURCE_STATE_TIMESTAMP -> timestamp + RESOURCE_STATE_NCPUS -> cpuCores + RESOURCE_STATE_CPU_CAPACITY -> cpuCapacity + RESOURCE_STATE_CPU_USAGE -> cpuUsage + RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct + RESOURCE_STATE_MEM_CAPACITY -> memCapacity + RESOURCE_STATE_MEM_USAGE -> memUsage + RESOURCE_STATE_DISK_READ -> diskRead + RESOURCE_STATE_DISK_WRITE -> diskWrite + RESOURCE_STATE_NET_RX -> netReceived + RESOURCE_STATE_NET_TX -> netTransmitted else -> throw IllegalArgumentException("Invalid column") } @@ -119,7 +156,7 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, override fun getInt(column: TableColumn<Int>): Int { return when (column) { - RESOURCE_STATE_NCPUS -> state.cpuCores + RESOURCE_STATE_NCPUS -> cpuCores else -> throw IllegalArgumentException("Invalid column") } } @@ -130,15 +167,15 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, override fun getDouble(column: TableColumn<Double>): Double { return when (column) { - RESOURCE_STATE_CPU_CAPACITY -> state.cpuCapacity - RESOURCE_STATE_CPU_USAGE -> state.cpuUsage - RESOURCE_STATE_CPU_USAGE_PCT -> state.cpuUsagePct - RESOURCE_STATE_MEM_CAPACITY -> state.memCapacity - RESOURCE_STATE_MEM_USAGE -> state.memUsage - RESOURCE_STATE_DISK_READ -> state.diskRead - RESOURCE_STATE_DISK_WRITE -> state.diskWrite - RESOURCE_STATE_NET_RX -> state.netReceived - RESOURCE_STATE_NET_TX -> state.netTransmitted + RESOURCE_STATE_CPU_CAPACITY -> cpuCapacity + RESOURCE_STATE_CPU_USAGE -> cpuUsage + RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct + RESOURCE_STATE_MEM_CAPACITY -> memCapacity + RESOURCE_STATE_MEM_USAGE -> memUsage + RESOURCE_STATE_DISK_READ -> diskRead + RESOURCE_STATE_DISK_WRITE -> diskWrite + RESOURCE_STATE_NET_RX -> netReceived + RESOURCE_STATE_NET_TX -> netTransmitted else -> throw IllegalArgumentException("Invalid column") } } @@ -161,37 +198,62 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, } /** - * The current row state. + * Try to parse the current value safely as double. */ - private class RowState { - var timestamp: Instant? = null - var cpuCores = -1 - var cpuCapacity = Double.NaN - var cpuUsage = Double.NaN - var cpuUsagePct = Double.NaN - var memCapacity = Double.NaN - var memUsage = Double.NaN - var diskRead = Double.NaN - var diskWrite = Double.NaN - var netReceived = Double.NaN - var netTransmitted = Double.NaN + private fun parseSafeDouble(): Double { + if (!usesCommaDecimalSeparator) { + try { + return parser.doubleValue + } catch (e: JsonParseException) { + usesCommaDecimalSeparator = true + } + } - /** - * Reset the state. 
- */ - fun reset() { - timestamp = null - cpuCores = -1 - cpuCapacity = Double.NaN - cpuUsage = Double.NaN - cpuUsagePct = Double.NaN - memCapacity = Double.NaN - memUsage = Double.NaN - diskRead = Double.NaN - diskWrite = Double.NaN - netReceived = Double.NaN - netTransmitted = Double.NaN + val text = parser.text + if (text.isBlank()) { + return 0.0 } + + return nf.parse(text).toDouble() + } + + /** + * State fields of the reader. + */ + private var timestamp: Instant? = null + private var cpuCores = -1 + private var cpuCapacity = Double.NaN + private var cpuUsage = Double.NaN + private var cpuUsagePct = Double.NaN + private var memCapacity = Double.NaN + private var memUsage = Double.NaN + private var diskRead = Double.NaN + private var diskWrite = Double.NaN + private var netReceived = Double.NaN + private var netTransmitted = Double.NaN + + /** + * Reset the state. + */ + private fun reset() { + timestamp = null + cpuCores = -1 + cpuCapacity = Double.NaN + cpuUsage = Double.NaN + cpuUsagePct = Double.NaN + memCapacity = Double.NaN + memUsage = Double.NaN + diskRead = Double.NaN + diskWrite = Double.NaN + netReceived = Double.NaN + netTransmitted = Double.NaN + } + + /** + * The type of the timestamp in the trace. + */ + private enum class TimestampType { + UNDECIDED, DATE_TIME, EPOCH_MILLIS } companion object { @@ -199,15 +261,17 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, * The [CsvSchema] that is used to parse the trace. */ private val schema = CsvSchema.builder() - .addColumn("Timestamp [ms]", CsvSchema.ColumnType.NUMBER) + .addColumn("Timestamp [ms]", CsvSchema.ColumnType.NUMBER_OR_STRING) .addColumn("CPU cores", CsvSchema.ColumnType.NUMBER) .addColumn("CPU capacity provisioned [MHZ]", CsvSchema.ColumnType.NUMBER) .addColumn("CPU usage [MHZ]", CsvSchema.ColumnType.NUMBER) .addColumn("CPU usage [%]", CsvSchema.ColumnType.NUMBER) .addColumn("Memory capacity provisioned [KB]", CsvSchema.ColumnType.NUMBER) .addColumn("Memory usage [KB]", CsvSchema.ColumnType.NUMBER) + .addColumn("Memory usage [%]", CsvSchema.ColumnType.NUMBER) .addColumn("Disk read throughput [KB/s]", CsvSchema.ColumnType.NUMBER) .addColumn("Disk write throughput [KB/s]", CsvSchema.ColumnType.NUMBER) + .addColumn("Disk size [GB]", CsvSchema.ColumnType.NUMBER) .addColumn("Network received throughput [KB/s]", CsvSchema.ColumnType.NUMBER) .addColumn("Network transmitted throughput [KB/s]", CsvSchema.ColumnType.NUMBER) .setAllowComments(true) diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTable.kt new file mode 100644 index 00000000..bc4f0b7d --- /dev/null +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTable.kt @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.bitbrains + +import com.fasterxml.jackson.dataformat.csv.CsvFactory +import org.opendc.trace.* +import java.nio.file.Files +import java.nio.file.Path +import java.util.stream.Collectors +import kotlin.io.path.extension +import kotlin.io.path.nameWithoutExtension + +/** + * The resources [Table] in the Bitbrains format. + */ +internal class BitbrainsResourceTable(private val factory: CsvFactory, path: Path) : Table { + /** + * The VMs that belong to the table. + */ + private val vms = + Files.walk(path, 1) + .filter { !Files.isDirectory(it) && it.extension == "csv" } + .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) + .toSortedMap() + + override val name: String = TABLE_RESOURCES + + override val isSynthetic: Boolean = true + + override val columns: List<TableColumn<*>> = listOf(RESOURCE_ID) + + override fun newReader(): TableReader { + return BitbrainsResourceTableReader(factory, vms) + } + + override fun newReader(partition: String): TableReader { + throw IllegalArgumentException("Unknown partition $partition") + } + + override fun toString(): String = "BitbrainsResourceTable" +} diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt new file mode 100644 index 00000000..c02dc5ae --- /dev/null +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt @@ -0,0 +1,108 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.bitbrains + +import com.fasterxml.jackson.dataformat.csv.CsvFactory +import org.opendc.trace.* +import java.nio.file.Path + +/** + * A [TableReader] for the Bitbrains resource table. 
+ */ +internal class BitbrainsResourceTableReader(private val factory: CsvFactory, vms: Map<String, Path>) : TableReader { + /** + * An iterator to iterate over the resource entries. + */ + private val it = vms.iterator() + + override fun nextRow(): Boolean { + reset() + + while (it.hasNext()) { + val (name, path) = it.next() + + val parser = factory.createParser(path.toFile()) + val reader = BitbrainsResourceStateTableReader(name, parser) + + try { + if (!reader.nextRow()) { + continue + } + + id = reader.get(RESOURCE_STATE_ID) + return true + } finally { + reader.close() + } + } + + return false + } + + override fun hasColumn(column: TableColumn<*>): Boolean { + return when (column) { + RESOURCE_ID -> true + else -> false + } + } + + override fun <T> get(column: TableColumn<T>): T { + val res: Any? = when (column) { + RESOURCE_ID -> id + else -> throw IllegalArgumentException("Invalid column") + } + + @Suppress("UNCHECKED_CAST") + return res as T + } + + override fun getBoolean(column: TableColumn<Boolean>): Boolean { + throw IllegalArgumentException("Invalid column") + } + + override fun getInt(column: TableColumn<Int>): Int { + throw IllegalArgumentException("Invalid column") + } + + override fun getLong(column: TableColumn<Long>): Long { + throw IllegalArgumentException("Invalid column") + } + + override fun getDouble(column: TableColumn<Double>): Double { + throw IllegalArgumentException("Invalid column") + } + + override fun close() {} + + /** + * State fields of the reader. + */ + private var id: String? = null + + /** + * Reset the state of the reader. + */ + private fun reset() { + id = null + } +} diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt index 5a2d4243..bcd8dd52 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt @@ -30,16 +30,16 @@ import java.nio.file.Path * [Trace] implementation for the Bitbrains format. */ public class BitbrainsTrace internal constructor(private val factory: CsvFactory, private val path: Path) : Trace { - override val tables: List<String> = listOf(TABLE_RESOURCE_STATES) + override val tables: List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) - override fun containsTable(name: String): Boolean = TABLE_RESOURCE_STATES == name + override fun containsTable(name: String): Boolean = tables.contains(name) override fun getTable(name: String): Table? 
{ - if (!containsTable(name)) { - return null + return when (name) { + TABLE_RESOURCES -> BitbrainsResourceTable(factory, path) + TABLE_RESOURCE_STATES -> BitbrainsResourceStateTable(factory, path) + else -> null } - - return BitbrainsResourceStateTable(factory, path) } override fun toString(): String = "BitbrainsTrace[$path]" diff --git a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt index 550805d3..ff4a33f8 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt @@ -25,9 +25,7 @@ package org.opendc.trace.bitbrains import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows -import org.opendc.trace.RESOURCE_STATE_CPU_USAGE -import org.opendc.trace.RESOURCE_STATE_TIMESTAMP -import org.opendc.trace.TABLE_RESOURCE_STATES +import org.opendc.trace.* import java.net.URL /** @@ -58,7 +56,7 @@ class BitbrainsTraceFormatTest { val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv")) val trace = format.open(url) - assertEquals(listOf(TABLE_RESOURCE_STATES), trace.tables) + assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES), trace.tables) } @Test @@ -82,6 +80,23 @@ class BitbrainsTraceFormatTest { } @Test + fun testResources() { + val format = BitbrainsTraceFormat() + val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv")) + val trace = format.open(url) + + val reader = trace.getTable(TABLE_RESOURCES)!!.newReader() + + assertAll( + { assertTrue(reader.nextRow()) }, + { assertEquals("bitbrains", reader.get(RESOURCE_ID)) }, + { assertFalse(reader.nextRow()) } + ) + + reader.close() + } + + @Test fun testSmoke() { val format = BitbrainsTraceFormat() val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv")) diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt index 80a99d10..fd7bd068 100644 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt @@ -34,18 +34,15 @@ internal class GwfTaskTable(private val factory: CsvFactory, private val url: UR override val isSynthetic: Boolean = false - override fun isSupported(column: TableColumn<*>): Boolean { - return when (column) { - TASK_WORKFLOW_ID -> true - TASK_ID -> true - TASK_SUBMIT_TIME -> true - TASK_RUNTIME -> true - TASK_REQ_NCPUS -> true - TASK_ALLOC_NCPUS -> true - TASK_PARENTS -> true - else -> false - } - } + override val columns: List<TableColumn<*>> = listOf( + TASK_WORKFLOW_ID, + TASK_ID, + TASK_SUBMIT_TIME, + TASK_RUNTIME, + TASK_REQ_NCPUS, + TASK_ALLOC_NCPUS, + TASK_PARENTS + ) override fun newReader(): TableReader { return GwfTaskTableReader(factory.createParser(url)) diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt index 64b7d465..39eb5520 100644 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt +++ 
b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt @@ -26,24 +26,21 @@ import com.fasterxml.jackson.core.JsonToken import com.fasterxml.jackson.dataformat.csv.CsvParser import com.fasterxml.jackson.dataformat.csv.CsvSchema import org.opendc.trace.* +import java.time.Duration +import java.time.Instant import java.util.regex.Pattern /** * A [TableReader] implementation for the GWF format. */ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { - /** - * The current parser state. - */ - private val state = RowState() - init { parser.schema = schema } override fun nextRow(): Boolean { // Reset the row state - state.reset() + reset() if (!nextStart()) { return false @@ -57,12 +54,12 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { } when (parser.currentName) { - "WorkflowID" -> state.workflowId = parser.longValue - "JobID" -> state.jobId = parser.longValue - "SubmitTime" -> state.submitTime = parser.longValue - "RunTime" -> state.runtime = parser.longValue - "NProcs" -> state.nProcs = parser.intValue - "ReqNProcs" -> state.reqNProcs = parser.intValue + "WorkflowID" -> workflowId = parser.text + "JobID" -> jobId = parser.text + "SubmitTime" -> submitTime = Instant.ofEpochSecond(parser.longValue) + "RunTime" -> runtime = Duration.ofSeconds(parser.longValue) + "NProcs" -> nProcs = parser.intValue + "ReqNProcs" -> reqNProcs = parser.intValue "Dependencies" -> parseParents(parser.valueAsString) } } @@ -84,14 +81,14 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { } override fun <T> get(column: TableColumn<T>): T { - val res: Any = when (column) { - TASK_WORKFLOW_ID -> state.workflowId - TASK_ID -> state.jobId - TASK_SUBMIT_TIME -> state.submitTime - TASK_RUNTIME -> state.runtime - TASK_REQ_NCPUS -> state.nProcs - TASK_ALLOC_NCPUS -> state.reqNProcs - TASK_PARENTS -> state.dependencies + val res: Any? = when (column) { + TASK_WORKFLOW_ID -> workflowId + TASK_ID -> jobId + TASK_SUBMIT_TIME -> submitTime + TASK_RUNTIME -> runtime + TASK_REQ_NCPUS -> nProcs + TASK_ALLOC_NCPUS -> reqNProcs + TASK_PARENTS -> dependencies else -> throw IllegalArgumentException("Invalid column") } @@ -105,20 +102,14 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { override fun getInt(column: TableColumn<Int>): Int { return when (column) { - TASK_REQ_NCPUS -> state.nProcs - TASK_ALLOC_NCPUS -> state.reqNProcs + TASK_REQ_NCPUS -> nProcs + TASK_ALLOC_NCPUS -> reqNProcs else -> throw IllegalArgumentException("Invalid column") } } override fun getLong(column: TableColumn<Long>): Long { - return when (column) { - TASK_WORKFLOW_ID -> state.workflowId - TASK_ID -> state.jobId - TASK_SUBMIT_TIME -> state.submitTime - TASK_RUNTIME -> state.runtime - else -> throw IllegalArgumentException("Invalid column") - } + throw IllegalArgumentException("Invalid column") } override fun getDouble(column: TableColumn<Double>): Double { @@ -166,29 +157,27 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { } /** - * The current row state. + * Reader state fields. */ - private class RowState { - var workflowId = -1L - var jobId = -1L - var submitTime = -1L - var runtime = -1L - var nProcs = -1 - var reqNProcs = -1 - var dependencies = emptySet<Long>() + private var workflowId: String? = null + private var jobId: String? = null + private var submitTime: Instant? = null + private var runtime: Duration? 
= null + private var nProcs = -1 + private var reqNProcs = -1 + private var dependencies = emptySet<Long>() - /** - * Reset the state. - */ - fun reset() { - workflowId = -1 - jobId = -1 - submitTime = -1 - runtime = -1 - nProcs = -1 - reqNProcs = -1 - dependencies = emptySet() - } + /** + * Reset the state. + */ + private fun reset() { + workflowId = null + jobId = null + submitTime = null + runtime = null + nProcs = -1 + reqNProcs = -1 + dependencies = emptySet() } companion object { diff --git a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt index 6b0568fe..b209b979 100644 --- a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt @@ -29,6 +29,8 @@ import org.junit.jupiter.api.assertDoesNotThrow import org.junit.jupiter.api.assertThrows import org.opendc.trace.* import java.net.URL +import java.time.Duration +import java.time.Instant /** * Test suite for the [GwfTraceFormat] class. @@ -90,11 +92,11 @@ internal class GwfTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals(0L, reader.getLong(TASK_WORKFLOW_ID)) }, - { assertEquals(1L, reader.getLong(TASK_ID)) }, - { assertEquals(16, reader.getLong(TASK_SUBMIT_TIME)) }, - { assertEquals(11, reader.getLong(TASK_RUNTIME)) }, - { assertEquals(setOf<Long>(), reader.get(TASK_PARENTS)) }, + { assertEquals("0", reader.get(TASK_WORKFLOW_ID)) }, + { assertEquals("1", reader.get(TASK_ID)) }, + { assertEquals(Instant.ofEpochSecond(16), reader.get(TASK_SUBMIT_TIME)) }, + { assertEquals(Duration.ofSeconds(11), reader.get(TASK_RUNTIME)) }, + { assertEquals(emptySet<String>(), reader.get(TASK_PARENTS)) }, ) } diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt index 12a51a2f..7ec0d607 100644 --- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt +++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt @@ -34,21 +34,18 @@ internal class SwfTaskTable(private val path: Path) : Table { override val isSynthetic: Boolean = false - override fun isSupported(column: TableColumn<*>): Boolean { - return when (column) { - TASK_ID -> true - TASK_SUBMIT_TIME -> true - TASK_WAIT_TIME -> true - TASK_RUNTIME -> true - TASK_REQ_NCPUS -> true - TASK_ALLOC_NCPUS -> true - TASK_PARENTS -> true - TASK_STATUS -> true - TASK_GROUP_ID -> true - TASK_USER_ID -> true - else -> false - } - } + override val columns: List<TableColumn<*>> = listOf( + TASK_ID, + TASK_SUBMIT_TIME, + TASK_WAIT_TIME, + TASK_RUNTIME, + TASK_REQ_NCPUS, + TASK_ALLOC_NCPUS, + TASK_PARENTS, + TASK_STATUS, + TASK_GROUP_ID, + TASK_USER_ID + ) override fun newReader(): TableReader { val reader = path.bufferedReader() diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt index 5f879a54..3f49c770 100644 --- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt +++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt @@ -24,6 +24,8 @@ package org.opendc.trace.swf import org.opendc.trace.* import java.io.BufferedReader +import 
java.time.Duration +import java.time.Instant /** * A [TableReader] implementation for the SWF format. @@ -85,10 +87,10 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea override fun <T> get(column: TableColumn<T>): T { val res: Any = when (column) { - TASK_ID -> getLong(TASK_ID) - TASK_SUBMIT_TIME -> getLong(TASK_SUBMIT_TIME) - TASK_WAIT_TIME -> getLong(TASK_WAIT_TIME) - TASK_RUNTIME -> getLong(TASK_RUNTIME) + TASK_ID -> fields[COL_JOB_ID] + TASK_SUBMIT_TIME -> Instant.ofEpochSecond(fields[COL_SUBMIT_TIME].toLong(10)) + TASK_WAIT_TIME -> Duration.ofSeconds(fields[COL_WAIT_TIME].toLong(10)) + TASK_RUNTIME -> Duration.ofSeconds(fields[COL_RUN_TIME].toLong(10)) TASK_REQ_NCPUS -> getInt(TASK_REQ_NCPUS) TASK_ALLOC_NCPUS -> getInt(TASK_ALLOC_NCPUS) TASK_PARENTS -> { @@ -121,13 +123,7 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea } override fun getLong(column: TableColumn<Long>): Long { - return when (column) { - TASK_ID -> fields[COL_JOB_ID].toLong(10) - TASK_SUBMIT_TIME -> fields[COL_SUBMIT_TIME].toLong(10) - TASK_WAIT_TIME -> fields[COL_WAIT_TIME].toLong(10) - TASK_RUNTIME -> fields[COL_RUN_TIME].toLong(10) - else -> throw IllegalArgumentException("Invalid column") - } + throw IllegalArgumentException("Invalid column") } override fun getDouble(column: TableColumn<Double>): Double { diff --git a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt index 9686891b..828c2bfa 100644 --- a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt @@ -85,10 +85,10 @@ internal class SwfTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals(1, reader.getLong(TASK_ID)) }, + { assertEquals("1", reader.get(TASK_ID)) }, { assertEquals(306, reader.getInt(TASK_ALLOC_NCPUS)) }, { assertTrue(reader.nextRow()) }, - { assertEquals(2, reader.getLong(TASK_ID)) }, + { assertEquals("2", reader.get(TASK_ID)) }, { assertEquals(17, reader.getInt(TASK_ALLOC_NCPUS)) }, ) diff --git a/opendc-trace/opendc-trace-wfformat/build.gradle.kts b/opendc-trace/opendc-trace-wfformat/build.gradle.kts new file mode 100644 index 00000000..2d336d03 --- /dev/null +++ b/opendc-trace/opendc-trace-wfformat/build.gradle.kts @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +description = "Support for WfCommons workload traces in OpenDC" + +/* Build configuration */ +plugins { + `kotlin-library-conventions` + `testing-conventions` + `jacoco-conventions` +} + +dependencies { + api(platform(projects.opendcPlatform)) + api(projects.opendcTrace.opendcTraceApi) + + implementation(libs.jackson.core) +} diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTable.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTable.kt new file mode 100644 index 00000000..7b7f979f --- /dev/null +++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTable.kt @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.wfformat + +import com.fasterxml.jackson.core.JsonFactory +import org.opendc.trace.* +import java.nio.file.Path + +/** + * A [Table] containing the tasks in a WfCommons workload trace. 
+ */ +internal class WfFormatTaskTable(private val factory: JsonFactory, private val path: Path) : Table { + override val name: String = TABLE_TASKS + + override val isSynthetic: Boolean = false + + override val columns: List<TableColumn<*>> = listOf( + TASK_ID, + TASK_WORKFLOW_ID, + TASK_RUNTIME, + TASK_REQ_NCPUS, + TASK_PARENTS, + TASK_CHILDREN + ) + + override fun newReader(): TableReader { + val parser = factory.createParser(path.toFile()) + return WfFormatTaskTableReader(parser) + } + + override fun newReader(partition: String): TableReader { + throw IllegalArgumentException("Invalid partition $partition") + } + + override fun toString(): String = "WfFormatTaskTable" +} diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt new file mode 100644 index 00000000..4408ba5c --- /dev/null +++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt @@ -0,0 +1,234 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.wfformat + +import com.fasterxml.jackson.core.JsonParseException +import com.fasterxml.jackson.core.JsonParser +import com.fasterxml.jackson.core.JsonToken +import org.opendc.trace.* +import java.time.Duration +import kotlin.math.roundToInt + +/** + * A [TableReader] implementation for the WfCommons workload trace format. + */ +internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableReader { + /** + * The current nesting of the parser. 
+ */ + private var level: ParserLevel = ParserLevel.TOP + + override fun nextRow(): Boolean { + reset() + + var hasJob = false + + while (!hasJob) { + when (level) { + ParserLevel.TOP -> { + val token = parser.nextToken() + + // Check whether the document is not empty and starts with an object + if (token == null) { + break + } else if (token != JsonToken.START_OBJECT) { + throw JsonParseException(parser, "Expected object", parser.currentLocation) + } else { + level = ParserLevel.TRACE + } + } + ParserLevel.TRACE -> { + // Seek for the workflow object in the file + if (!seekWorkflow()) { + break + } else if (!parser.isExpectedStartObjectToken) { + throw JsonParseException(parser, "Expected object", parser.currentLocation) + } else { + level = ParserLevel.WORKFLOW + } + } + ParserLevel.WORKFLOW -> { + // Seek for the jobs object in the file + level = if (!seekJobs()) { + ParserLevel.TRACE + } else if (!parser.isExpectedStartArrayToken) { + throw JsonParseException(parser, "Expected array", parser.currentLocation) + } else { + ParserLevel.JOB + } + } + ParserLevel.JOB -> { + when (parser.nextToken()) { + JsonToken.END_ARRAY -> level = ParserLevel.WORKFLOW + JsonToken.START_OBJECT -> { + parseJob() + hasJob = true + break + } + else -> throw JsonParseException(parser, "Unexpected token", parser.currentLocation) + } + } + } + } + + return hasJob + } + + override fun hasColumn(column: TableColumn<*>): Boolean { + return when (column) { + TASK_ID -> true + TASK_WORKFLOW_ID -> true + TASK_RUNTIME -> true + TASK_REQ_NCPUS -> true + TASK_PARENTS -> true + TASK_CHILDREN -> true + else -> false + } + } + + override fun <T> get(column: TableColumn<T>): T { + val res: Any? = when (column) { + TASK_ID -> id + TASK_WORKFLOW_ID -> workflowId + TASK_RUNTIME -> runtime + TASK_PARENTS -> parents + TASK_CHILDREN -> children + TASK_REQ_NCPUS -> getInt(TASK_REQ_NCPUS) + else -> throw IllegalArgumentException("Invalid column") + } + + @Suppress("UNCHECKED_CAST") + return res as T + } + + override fun getBoolean(column: TableColumn<Boolean>): Boolean { + throw IllegalArgumentException("Invalid column") + } + + override fun getInt(column: TableColumn<Int>): Int { + return when (column) { + TASK_REQ_NCPUS -> cores + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getLong(column: TableColumn<Long>): Long { + throw IllegalArgumentException("Invalid column") + } + + override fun getDouble(column: TableColumn<Double>): Double { + throw IllegalArgumentException("Invalid column") + } + + override fun close() { + parser.close() + } + + /** + * Parse the trace and seek until the workflow description. + */ + private fun seekWorkflow(): Boolean { + while (parser.nextValue() != JsonToken.END_OBJECT) { + when (parser.currentName) { + "name" -> workflowId = parser.text + "workflow" -> return true + else -> parser.skipChildren() + } + } + + return false + } + + /** + * Parse the workflow description in the file and seek until the first job. + */ + private fun seekJobs(): Boolean { + while (parser.nextValue() != JsonToken.END_OBJECT) { + when (parser.currentName) { + "jobs" -> return true + else -> parser.skipChildren() + } + } + + return false + } + + /** + * Parse a single job in the file. 
+ */ + private fun parseJob() { + while (parser.nextValue() != JsonToken.END_OBJECT) { + when (parser.currentName) { + "name" -> id = parser.text + "parents" -> parents = parseIds() + "children" -> children = parseIds() + "runtime" -> runtime = Duration.ofSeconds(parser.numberValue.toLong()) + "cores" -> cores = parser.floatValue.roundToInt() + else -> parser.skipChildren() + } + } + } + + /** + * Parse the parents/children of a job. + */ + private fun parseIds(): Set<String> { + if (!parser.isExpectedStartArrayToken) { + throw JsonParseException(parser, "Expected array", parser.currentLocation) + } + + val ids = mutableSetOf<String>() + + while (parser.nextToken() != JsonToken.END_ARRAY) { + if (parser.currentToken != JsonToken.VALUE_STRING) { + throw JsonParseException(parser, "Expected token", parser.currentLocation) + } + + ids.add(parser.valueAsString) + } + + return ids + } + + private enum class ParserLevel { + TOP, TRACE, WORKFLOW, JOB + } + + /** + * State fields for the parser. + */ + private var id: String? = null + private var workflowId: String? = null + private var runtime: Duration? = null + private var parents: Set<String>? = null + private var children: Set<String>? = null + private var cores = -1 + + private fun reset() { + id = null + runtime = null + parents = null + children = null + cores = -1 + } +} diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTrace.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTrace.kt new file mode 100644 index 00000000..2d9c79fb --- /dev/null +++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTrace.kt @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.wfformat + +import com.fasterxml.jackson.core.JsonFactory +import org.opendc.trace.TABLE_TASKS +import org.opendc.trace.Table +import org.opendc.trace.Trace +import java.nio.file.Path + +/** + * [Trace] implementation for the WfCommons workload trace format. + */ +public class WfFormatTrace internal constructor(private val factory: JsonFactory, private val path: Path) : Trace { + override val tables: List<String> = listOf(TABLE_TASKS) + + override fun containsTable(name: String): Boolean = TABLE_TASKS == name + + override fun getTable(name: String): Table? 
{ + return when (name) { + TABLE_TASKS -> WfFormatTaskTable(factory, path) + else -> null + } + } + + override fun toString(): String = "WfFormatTrace[$path]" +} diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt new file mode 100644 index 00000000..ff8d054c --- /dev/null +++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.wfformat + +import com.fasterxml.jackson.core.JsonFactory +import org.opendc.trace.spi.TraceFormat +import java.net.URL +import java.nio.file.Paths +import kotlin.io.path.exists + +/** + * A [TraceFormat] implementation for the WfCommons workload trace format. + */ +public class WfFormatTraceFormat : TraceFormat { + /** + * The [JsonFactory] that is used to created JSON parsers. 
+ */ + private val factory = JsonFactory() + + override val name: String = "wfformat" + + override fun open(url: URL): WfFormatTrace { + val path = Paths.get(url.toURI()) + require(path.exists()) { "URL $url does not exist" } + return WfFormatTrace(factory, path) + } +} diff --git a/opendc-trace/opendc-trace-wfformat/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat b/opendc-trace/opendc-trace-wfformat/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat new file mode 100644 index 00000000..ee3aa2f6 --- /dev/null +++ b/opendc-trace/opendc-trace-wfformat/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat @@ -0,0 +1 @@ +org.opendc.trace.wfformat.WfFormatTraceFormat diff --git a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt new file mode 100644 index 00000000..b07f27ed --- /dev/null +++ b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt @@ -0,0 +1,345 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.wfformat + +import com.fasterxml.jackson.core.JsonFactory +import com.fasterxml.jackson.core.JsonParseException +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertDoesNotThrow +import org.junit.jupiter.api.assertThrows +import org.opendc.trace.TASK_ID +import org.opendc.trace.TASK_PARENTS + +/** + * Test suite for the [WfFormatTaskTableReader] class. + */ +internal class WfFormatTaskTableReaderTest { + /** + * The [JsonFactory] used to construct the parser. 
+ */ + private val factory = JsonFactory() + + @Test + fun testEmptyInput() { + val content = "" + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertFalse(reader.nextRow()) + reader.close() + } + + @Test + fun testTopLevelArrayInput() { + val content = "[]" + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertThrows<JsonParseException> { + while (reader.nextRow()) { + continue + } + } + + reader.close() + } + + @Test + fun testNoWorkflow() { + val content = """ + { + "name": "eager-nextflow-chameleon" + } + """.trimIndent() + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertDoesNotThrow { + while (reader.nextRow()) { + continue + } + } + + reader.close() + } + + @Test + fun testWorkflowArrayType() { + val content = """ + { + "name": "eager-nextflow-chameleon", + "workflow": [] + } + """.trimIndent() + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertThrows<JsonParseException> { + while (reader.nextRow()) { + continue + } + } + + reader.close() + } + + @Test + fun testWorkflowNullType() { + val content = """ + { + "name": "eager-nextflow-chameleon", + "workflow": null + } + """.trimIndent() + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertThrows<JsonParseException> { + while (reader.nextRow()) { + continue + } + } + + reader.close() + } + + @Test + fun testNoJobs() { + val content = """ + { + "name": "eager-nextflow-chameleon", + "workflow": { + + } + } + """.trimIndent() + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertDoesNotThrow { reader.nextRow() } + + reader.close() + } + + @Test + fun testJobsObjectType() { + val content = """ + { + "name": "eager-nextflow-chameleon", + "workflow": { "jobs": {} } + } + """.trimIndent() + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertThrows<JsonParseException> { reader.nextRow() } + + reader.close() + } + + @Test + fun testJobsNullType() { + val content = """ + { + "name": "eager-nextflow-chameleon", + "workflow": { "jobs": null } + } + """.trimIndent() + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertThrows<JsonParseException> { reader.nextRow() } + + reader.close() + } + + @Test + fun testJobsInvalidChildType() { + val content = """ + { + "name": "eager-nextflow-chameleon", + "workflow": { + "jobs": [1] + } + } + """.trimIndent() + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertThrows<JsonParseException> { reader.nextRow() } + + reader.close() + } + + @Test + fun testJobsValidChildType() { + val content = """ + { + "name": "eager-nextflow-chameleon", + "workflow": { + "jobs": [ + { + "name": "test" + } + ] + } + } + """.trimIndent() + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertTrue(reader.nextRow()) + assertEquals("test", reader.get(TASK_ID)) + assertFalse(reader.nextRow()) + + reader.close() + } + + @Test + fun testJobsInvalidParents() { + val content = """ + { + "name": "eager-nextflow-chameleon", + "workflow": { + "jobs": [ + { + "name": "test", + "parents": 1, + } + ] + } + } + """.trimIndent() + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertThrows<JsonParseException> { 
reader.nextRow() } + + reader.close() + } + + @Test + fun testJobsInvalidParentsItem() { + val content = """ + { + "name": "eager-nextflow-chameleon", + "workflow": { + "jobs": [ + { + "name": "test", + "parents": [1], + } + ] + } + } + """.trimIndent() + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertThrows<JsonParseException> { reader.nextRow() } + + reader.close() + } + + @Test + fun testJobsValidParents() { + val content = """ + { + "name": "eager-nextflow-chameleon", + "workflow": { + "jobs": [ + { + "name": "test", + "parents": ["1"] + } + ] + } + } + """.trimIndent() + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertTrue(reader.nextRow()) + assertEquals(setOf("1"), reader.get(TASK_PARENTS)) + assertFalse(reader.nextRow()) + + reader.close() + } + + @Test + fun testJobsInvalidSecondEntry() { + val content = """ + { + "workflow": { + "jobs": [ + { + "name": "test", + "parents": ["1"] + }, + "test" + ] + } + } + """.trimIndent() + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertDoesNotThrow { reader.nextRow() } + assertThrows<JsonParseException> { reader.nextRow() } + + reader.close() + } + + @Test + fun testDuplicateJobsArray() { + val content = """ + { + "name": "eager-nextflow-chameleon", + "workflow": { + "jobs": [ + { + "name": "test", + "parents": ["1"] + } + ], + "jobs": [ + { + "name": "test2", + "parents": ["test"] + } + ] + } + } + """.trimIndent() + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertTrue(reader.nextRow()) + assertTrue(reader.nextRow()) + assertEquals("test2", reader.get(TASK_ID)) + assertFalse(reader.nextRow()) + + reader.close() + } +} diff --git a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt new file mode 100644 index 00000000..0bfc8840 --- /dev/null +++ b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt @@ -0,0 +1,133 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package org.opendc.trace.wfformat + +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertDoesNotThrow +import org.junit.jupiter.api.assertThrows +import org.opendc.trace.* +import java.io.File +import java.net.URL + +/** + * Test suite for the [WfFormatTraceFormat] class. + */ +class WfFormatTraceFormatTest { + @Test + fun testTraceExists() { + val input = File("src/test/resources/trace.json").toURI().toURL() + val format = WfFormatTraceFormat() + assertDoesNotThrow { format.open(input) } + } + + @Test + fun testTraceDoesNotExists() { + val input = File("src/test/resources/trace.json").toURI().toURL() + val format = WfFormatTraceFormat() + assertThrows<IllegalArgumentException> { format.open(URL(input.toString() + "help")) } + } + + @Test + fun testTables() { + val input = File("src/test/resources/trace.json").toURI().toURL() + val format = WfFormatTraceFormat() + val trace = format.open(input) + + assertEquals(listOf(TABLE_TASKS), trace.tables) + } + + @Test + fun testTableExists() { + val input = File("src/test/resources/trace.json").toURI().toURL() + val format = WfFormatTraceFormat() + val table = format.open(input).getTable(TABLE_TASKS) + + assertNotNull(table) + assertDoesNotThrow { table!!.newReader() } + } + + @Test + fun testTableDoesNotExist() { + val input = File("src/test/resources/trace.json").toURI().toURL() + val format = WfFormatTraceFormat() + val trace = format.open(input) + + assertFalse(trace.containsTable("test")) + assertNull(trace.getTable("test")) + } + + /** + * Smoke test for parsing WfCommons traces. + */ + @Test + fun testTableReader() { + val input = File("src/test/resources/trace.json").toURI().toURL() + val trace = WfFormatTraceFormat().open(input) + val reader = trace.getTable(TABLE_TASKS)!!.newReader() + + assertAll( + { assertTrue(reader.nextRow()) }, + { assertEquals("makebwaindex_mammoth_mt_krause.fasta", reader.get(TASK_ID)) }, + { assertEquals("eager-nextflow-chameleon", reader.get(TASK_WORKFLOW_ID)) }, + { assertEquals(172000, reader.get(TASK_RUNTIME).toMillis()) }, + { assertEquals(emptySet<String>(), reader.get(TASK_PARENTS)) }, + ) + + assertAll( + { assertTrue(reader.nextRow()) }, + { assertEquals("makeseqdict_mammoth_mt_krause.fasta", reader.get(TASK_ID)) }, + { assertEquals("eager-nextflow-chameleon", reader.get(TASK_WORKFLOW_ID)) }, + { assertEquals(175000, reader.get(TASK_RUNTIME).toMillis()) }, + { assertEquals(setOf("makebwaindex_mammoth_mt_krause.fasta"), reader.get(TASK_PARENTS)) }, + ) + + reader.close() + } + + /** + * Test full iteration of the table. + */ + @Test + fun testTableReaderFull() { + val input = File("src/test/resources/trace.json").toURI().toURL() + val trace = WfFormatTraceFormat().open(input) + val reader = trace.getTable(TABLE_TASKS)!!.newReader() + + assertDoesNotThrow { + while (reader.nextRow()) { + // reader.get(TASK_ID) + } + reader.close() + } + } + + @Test + fun testTableReaderPartition() { + val input = File("src/test/resources/trace.json").toURI().toURL() + val format = WfFormatTraceFormat() + val table = format.open(input).getTable(TABLE_TASKS)!! 
+ + assertThrows<IllegalArgumentException> { table.newReader("test") } + } +} diff --git a/opendc-trace/opendc-trace-wfformat/src/test/resources/trace.json b/opendc-trace/opendc-trace-wfformat/src/test/resources/trace.json new file mode 100644 index 00000000..d21f024d --- /dev/null +++ b/opendc-trace/opendc-trace-wfformat/src/test/resources/trace.json @@ -0,0 +1,1342 @@ +{ + "name": "eager-nextflow-chameleon", + "description": "Instance generated with WfCommons - https://wfcommons.org", + "createdAt": "2021-09-06T03:43:31.762479", + "schemaVersion": "1.2", + "author": { + "name": "cc", + "email": "support@wfcommons.org" + }, + "wms": { + "name": "Nextflow", + "version": "21.04.3", + "url": "https://www.nextflow.io" + }, + "workflow": { + "executedAt": "20210906T034331+0000", + "makespan": 275, + "jobs": [ + { + "name": "makebwaindex_mammoth_mt_krause.fasta", + "type": "compute", + "runtime": 172.182, + "command": { + "program": "makebwaindex", + "arguments": [ + "bwa", + "index", + "Mammoth_MT_Krause.fasta", + "mkdir", + "BWAIndex", + "&&", + "mv", + "Mammoth_MT_Krause.fasta*", + "BWAIndex" + ] + }, + "parents": [], + "children": [ + "makeseqdict_mammoth_mt_krause.fasta" + ], + "files": [], + "cores": 1.0, + "id": "ID000001", + "category": "makebwaindex", + "avgCPU": 5.8, + "bytesRead": 124, + "bytesWritten": 126, + "memory": 4248 + }, + { + "name": "makeseqdict_mammoth_mt_krause.fasta", + "type": "compute", + "runtime": 175.427, + "command": { + "program": "makeseqdict", + "arguments": [ + "picard", + "-Xmx6144M", + "CreateSequenceDictionary", + "R=Mammoth_MT_Krause.fasta", + "O=\"Mammoth_MT_Krause.dict\"" + ] + }, + "parents": [ + "makebwaindex_mammoth_mt_krause.fasta" + ], + "children": [ + "makefastaindex_mammoth_mt_krause.fasta" + ], + "files": [], + "cores": 1.0, + "id": "ID000003", + "category": "makeseqdict", + "avgCPU": 83.5, + "bytesRead": 22728, + "bytesWritten": 1300, + "memory": 104416 + }, + { + "name": "makefastaindex_mammoth_mt_krause.fasta", + "type": "compute", + "runtime": 170.797, + "command": { + "program": "makefastaindex", + "arguments": [ + "samtools", + "faidx", + "Mammoth_MT_Krause.fasta" + ] + }, + "parents": [ + "makeseqdict_mammoth_mt_krause.fasta" + ], + "children": [ + "output_documentation" + ], + "files": [], + "cores": 1.0, + "id": "ID000002", + "category": "makefastaindex", + "avgCPU": 23.8, + "bytesRead": 66, + "bytesWritten": 4, + "memory": 6096 + }, + { + "name": "output_documentation", + "type": "compute", + "runtime": 173.479, + "command": { + "program": "output_documentation", + "arguments": [ + "markdown_to_html.py", + "output.md", + "-o", + "results_description.html" + ] + }, + "parents": [ + "makefastaindex_mammoth_mt_krause.fasta" + ], + "children": [ + "get_software_versions" + ], + "files": [], + "cores": 1.0, + "id": "ID000005", + "category": "output_documentation", + "avgCPU": 84.0, + "bytesRead": 8222, + "bytesWritten": 15165, + "memory": 11488 + }, + { + "name": "get_software_versions", + "type": "compute", + "runtime": 183.445, + "command": { + "program": "get_software_versions", + "arguments": [ + "echo", + "2.3.5", + "&>", + "v_pipeline.txt", + "echo", + "21.04.3", + "&>", + "v_nextflow.txt", + "fastqc", + "--version", + "&>", + "v_fastqc.txt", + "2>&1", + "||", + "true", + "AdapterRemoval", + "--version", + "&>", + "v_adapterremoval.txt", + "2>&1", + "||", + "true", + "fastp", + "--version", + "&>", + "v_fastp.txt", + "2>&1", + "||", + "true", + "bwa", + "&>", + "v_bwa.txt", + "2>&1", + "||", + "true", + "circulargenerator", + 
"--help", + "|", + "head", + "-n", + "1", + "&>", + "v_circulargenerator.txt", + "2>&1", + "||", + "true", + "samtools", + "--version", + "&>", + "v_samtools.txt", + "2>&1", + "||", + "true", + "dedup", + "-v", + "&>", + "v_dedup.txt", + "2>&1", + "||", + "true", + "##", + "bioconda", + "recipe", + "of", + "picard", + "is", + "incorrectly", + "set", + "up", + "and", + "extra", + "warning", + "made", + "with", + "stderr,", + "this", + "ugly", + "command", + "ensures", + "only", + "version", + "exported", + "(", + "exec", + "7>&1", + "picard", + "MarkDuplicates", + "--version", + "2>&1", + ">&7", + "|", + "grep", + "-v", + "/", + ">&2", + ")", + "2>", + "v_markduplicates.txt", + "||", + "true", + "qualimap", + "--version", + "&>", + "v_qualimap.txt", + "2>&1", + "||", + "true", + "preseq", + "&>", + "v_preseq.txt", + "2>&1", + "||", + "true", + "gatk", + "--version", + "2>&1", + "|", + "head", + "-n", + "1", + ">", + "v_gatk.txt", + "2>&1", + "||", + "true", + "gatk3", + "--version", + "2>&1", + ">", + "v_gatk3.txt", + "2>&1", + "||", + "true", + "freebayes", + "--version", + "&>", + "v_freebayes.txt", + "2>&1", + "||", + "true", + "bedtools", + "--version", + "&>", + "v_bedtools.txt", + "2>&1", + "||", + "true", + "damageprofiler", + "--version", + "&>", + "v_damageprofiler.txt", + "2>&1", + "||", + "true", + "bam", + "--version", + "&>", + "v_bamutil.txt", + "2>&1", + "||", + "true", + "pmdtools", + "--version", + "&>", + "v_pmdtools.txt", + "2>&1", + "||", + "true", + "angsd", + "-h", + "|&", + "head", + "-n", + "1", + "|", + "cut", + "-d", + "-f3-4", + "&>", + "v_angsd.txt", + "2>&1", + "||", + "true", + "multivcfanalyzer", + "--help", + "|", + "head", + "-n", + "1", + "&>", + "v_multivcfanalyzer.txt", + "2>&1", + "||", + "true", + "malt-run", + "--help", + "|&", + "tail", + "-n", + "3", + "|", + "head", + "-n", + "1", + "|", + "cut", + "-f", + "2", + "-d(", + "|", + "cut", + "-f", + "1", + "-d", + ",", + "&>", + "v_malt.txt", + "2>&1", + "||", + "true", + "MaltExtract", + "--help", + "|", + "head", + "-n", + "2", + "|", + "tail", + "-n", + "1", + "&>", + "v_maltextract.txt", + "2>&1", + "||", + "true", + "multiqc", + "--version", + "&>", + "v_multiqc.txt", + "2>&1", + "||", + "true", + "vcf2genome", + "-h", + "|&", + "head", + "-n", + "1", + "&>", + "v_vcf2genome.txt", + "||", + "true", + "mtnucratio", + "--help", + "&>", + "v_mtnucratiocalculator.txt", + "||", + "true", + "sexdeterrmine", + "--version", + "&>", + "v_sexdeterrmine.txt", + "||", + "true", + "kraken2", + "--version", + "|", + "head", + "-n", + "1", + "&>", + "v_kraken.txt", + "||", + "true", + "endorS.py", + "--version", + "&>", + "v_endorSpy.txt", + "||", + "true", + "pileupCaller", + "--version", + "&>", + "v_sequencetools.txt", + "2>&1", + "||", + "true", + "bowtie2", + "--version", + "|", + "grep", + "-a", + "bowtie2-.*", + "-fdebug", + ">", + "v_bowtie2.txt", + "||", + "true", + "eigenstrat_snp_coverage", + "--version", + "|", + "cut", + "-d", + "-f2", + ">v_eigenstrat_snp_coverage.txt", + "||", + "true", + "mapDamage", + "--version", + ">", + "v_mapdamage.txt", + "||", + "true", + "bbduk.sh", + "|", + "grep", + "Last", + "modified", + "|", + "cut", + "-d", + "-f", + "3-99", + ">", + "v_bbduk.txt", + "||", + "true", + "scrape_software_versions.py", + "&>", + "software_versions_mqc.yaml" + ] + }, + "parents": [ + "output_documentation" + ], + "children": [ + "fastqc_jk2782_l1", + "fastqc_jk2802_l2" + ], + "files": [], + "cores": 2.0, + "id": "ID000006", + "category": "get_software_versions", + "avgCPU": 147.8, + 
"bytesRead": 172760, + "bytesWritten": 1048, + "memory": 387324 + }, + { + "name": "fastqc_jk2782_l1", + "type": "compute", + "runtime": 175.205, + "command": { + "program": "fastqc", + "arguments": [ + "fastqc", + "-t", + "2", + "-q", + "JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq.gz", + "JK2782_TGGCCGATCAACGA_L008_R2_001.fastq.gz.tengrand.fq.gz", + "rename", + "s/_fastqc.zip$/_raw_fastqc.zip/", + "*_fastqc.zip", + "rename", + "s/_fastqc.html$/_raw_fastqc.html/", + "*_fastqc.html" + ] + }, + "parents": [ + "get_software_versions" + ], + "children": [ + "adapter_removal_jk2782_l1", + "adapter_removal_jk2802_l2" + ], + "files": [], + "cores": 2.0, + "id": "ID000007", + "category": "fastqc", + "avgCPU": 161.8, + "bytesRead": 35981, + "bytesWritten": 3967, + "memory": 270124 + }, + { + "name": "adapter_removal_jk2782_l1", + "type": "compute", + "runtime": 172.643, + "command": { + "program": "adapter_removal", + "arguments": [ + "mkdir", + "-p", + "output", + "AdapterRemoval", + "--file1", + "JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq.gz", + "--file2", + "JK2782_TGGCCGATCAACGA_L008_R2_001.fastq.gz.tengrand.fq.gz", + "--basename", + "JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq_L1.pe", + "--gzip", + "--threads", + "2", + "--qualitymax", + "41", + "--collapse", + "--trimns", + "--trimqualities", + "--adapter1", + "AGATCGGAAGAGCACACGTCTGAACTCCAGTCAC", + "--adapter2", + "AGATCGGAAGAGCGTCGTGTAGGGAAAGAGTGTA", + "--minlength", + "30", + "--minquality", + "20", + "--minadapteroverlap", + "1", + "cat", + "*.collapsed.gz", + "*.collapsed.truncated.gz", + "*.singleton.truncated.gz", + "*.pair1.truncated.gz", + "*.pair2.truncated.gz", + ">", + "output/JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq_L1.pe.combined.tmp.fq.gz", + "mv", + "*.settings", + "output/", + "##", + "Add", + "R_", + "and", + "L_", + "for", + "unmerged", + "reads", + "for", + "DeDup", + "compatibility", + "AdapterRemovalFixPrefix", + "-Xmx4g", + "output/JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq_L1.pe.combined.tmp.fq.gz", + "|", + "pigz", + "-p", + "1", + ">", + "output/JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq_L1.pe.combined.fq.gz" + ] + }, + "parents": [ + "fastqc_jk2782_l1", + "fastqc_jk2802_l2" + ], + "children": [ + "fastqc_after_clipping_jk2782_l1", + "fastqc_after_clipping_jk2802_l2" + ], + "files": [], + "cores": 2.0, + "id": "ID000008", + "category": "adapter_removal", + "avgCPU": 160.9, + "bytesRead": 17357, + "bytesWritten": 4405, + "memory": 79308 + }, + { + "name": "fastqc_jk2802_l2", + "type": "compute", + "runtime": 177.338, + "command": { + "program": "fastqc", + "arguments": [ + "fastqc", + "-q", + "JK2802_AGAATAACCTACCA_L008_R1_001.fastq.gz.tengrand.fq.gz", + "rename", + "s/_fastqc.zip$/_raw_fastqc.zip/", + "*_fastqc.zip", + "rename", + "s/_fastqc.html$/_raw_fastqc.html/", + "*_fastqc.html" + ] + }, + "parents": [ + "get_software_versions" + ], + "children": [ + "adapter_removal_jk2782_l1", + "adapter_removal_jk2802_l2" + ], + "files": [], + "cores": 2.0, + "id": "ID000009", + "category": "fastqc", + "avgCPU": 120.1, + "bytesRead": 24457, + "bytesWritten": 2181, + "memory": 181060 + }, + { + "name": "adapter_removal_jk2802_l2", + "type": "compute", + "runtime": 174.313, + "command": { + "program": "adapter_removal", + "arguments": [ + "mkdir", + "-p", + "output", + "AdapterRemoval", + "--file1", + "JK2802_AGAATAACCTACCA_L008_R1_001.fastq.gz.tengrand.fq.gz", + "--basename", + "JK2802_AGAATAACCTACCA_L008_R1_001.fastq.gz.tengrand.fq_L2.se", + "--gzip", + 
"--threads", + "2", + "--qualitymax", + "41", + "--trimns", + "--trimqualities", + "--adapter1", + "AGATCGGAAGAGCACACGTCTGAACTCCAGTCAC", + "--adapter2", + "AGATCGGAAGAGCGTCGTGTAGGGAAAGAGTGTA", + "--minlength", + "30", + "--minquality", + "20", + "--minadapteroverlap", + "1", + "mv", + "*.settings", + "*.se.truncated.gz", + "output/" + ] + }, + "parents": [ + "fastqc_jk2782_l1", + "fastqc_jk2802_l2" + ], + "children": [ + "fastqc_after_clipping_jk2782_l1", + "fastqc_after_clipping_jk2802_l2" + ], + "files": [], + "cores": 2.0, + "id": "ID000010", + "category": "adapter_removal", + "avgCPU": 106.5, + "bytesRead": 683, + "bytesWritten": 897, + "memory": 12136 + }, + { + "name": "fastqc_after_clipping_jk2782_l1", + "type": "compute", + "runtime": 15.371, + "command": { + "program": "fastqc_after_clipping", + "arguments": [ + "fastqc", + "-t", + "2", + "-q", + "JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq_L1.pe.combined.fq.gz" + ] + }, + "parents": [ + "adapter_removal_jk2782_l1", + "adapter_removal_jk2802_l2" + ], + "children": [ + "bwa_jk2802", + "bwa_jk2782" + ], + "files": [], + "cores": 2.0, + "id": "ID000013", + "category": "fastqc_after_clipping", + "avgCPU": 133.3, + "bytesRead": 23788, + "bytesWritten": 1998, + "memory": 215020 + }, + { + "name": "fastqc_after_clipping_jk2802_l2", + "type": "compute", + "runtime": 15.272, + "command": { + "program": "fastqc_after_clipping", + "arguments": [ + "fastqc", + "-t", + "2", + "-q", + "JK2802_AGAATAACCTACCA_L008_R1_001.fastq.gz.tengrand.fq_L2.se.truncated.gz" + ] + }, + "parents": [ + "adapter_removal_jk2782_l1", + "adapter_removal_jk2802_l2" + ], + "children": [ + "bwa_jk2802", + "bwa_jk2782" + ], + "files": [], + "cores": 2.0, + "id": "ID000014", + "category": "fastqc_after_clipping", + "avgCPU": 124.1, + "bytesRead": 23882, + "bytesWritten": 2143, + "memory": 213064 + }, + { + "name": "bwa_jk2802", + "type": "compute", + "runtime": 9.566, + "command": { + "program": "bwa", + "arguments": [ + "bwa", + "aln", + "-t", + "2", + "BWAIndex/Mammoth_MT_Krause.fasta", + "JK2802_AGAATAACCTACCA_L008_R1_001.fastq.gz.tengrand.fq_L2.se.truncated.gz", + "-n", + "0.04", + "-l", + "1024", + "-k", + "2", + "-o", + "1", + "-f", + "JK2802.sai", + "bwa", + "samse", + "-r", + "\"@RGtID:ILLUMINA-JK2802tSM:JK2802tPL:illuminatPU:ILLUMINA-JK2802-SE\"", + "BWAIndex/Mammoth_MT_Krause.fasta", + "JK2802.sai", + "JK2802_AGAATAACCTACCA_L008_R1_001.fastq.gz.tengrand.fq_L2.se.truncated.gz", + "|", + "samtools", + "sort", + "-@", + "1", + "-O", + "bam", + "-", + ">", + "\"JK2802\"_\"SE\".mapped.bam", + "samtools", + "index", + "\"JK2802\"_\"SE\".mapped.bam" + ] + }, + "parents": [ + "fastqc_after_clipping_jk2782_l1", + "fastqc_after_clipping_jk2802_l2" + ], + "children": [ + "samtools_flagstat_jk2782", + "samtools_flagstat_jk2802" + ], + "files": [], + "cores": 2.0, + "id": "ID000016", + "category": "bwa", + "avgCPU": 15.7, + "bytesRead": 3774, + "bytesWritten": 3367, + "memory": 10628 + }, + { + "name": "bwa_jk2782", + "type": "compute", + "runtime": 9.652, + "command": { + "program": "bwa", + "arguments": [ + "bwa", + "aln", + "-t", + "2", + "BWAIndex/Mammoth_MT_Krause.fasta", + "JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq_L1.pe.combined.fq.gz", + "-n", + "0.04", + "-l", + "1024", + "-k", + "2", + "-o", + "1", + "-f", + "JK2782.sai", + "bwa", + "samse", + "-r", + "\"@RGtID:ILLUMINA-JK2782tSM:JK2782tPL:illuminatPU:ILLUMINA-JK2782-PE\"", + "BWAIndex/Mammoth_MT_Krause.fasta", + "JK2782.sai", + 
"JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq_L1.pe.combined.fq.gz", + "|", + "samtools", + "sort", + "-@", + "1", + "-O", + "bam", + "-", + ">", + "\"JK2782\"_\"PE\".mapped.bam", + "samtools", + "index", + "\"JK2782\"_\"PE\".mapped.bam" + ] + }, + "parents": [ + "fastqc_after_clipping_jk2782_l1", + "fastqc_after_clipping_jk2802_l2" + ], + "children": [ + "samtools_flagstat_jk2782", + "samtools_flagstat_jk2802" + ], + "files": [], + "cores": 2.0, + "id": "ID000015", + "category": "bwa", + "avgCPU": 69.8, + "bytesRead": 3705, + "bytesWritten": 3355, + "memory": 12876 + }, + { + "name": "samtools_flagstat_jk2782", + "type": "compute", + "runtime": 13.011, + "command": { + "program": "samtools_flagstat", + "arguments": [ + "samtools", + "flagstat", + "JK2782_PE.mapped.bam", + ">", + "JK2782_flagstat.stats" + ] + }, + "parents": [ + "bwa_jk2802", + "bwa_jk2782" + ], + "children": [ + "markduplicates_jk2782", + "markduplicates_jk2802" + ], + "files": [], + "cores": 1.0, + "id": "ID000026", + "category": "samtools_flagstat", + "avgCPU": 30.1, + "bytesRead": 478, + "bytesWritten": 5, + "memory": 6468 + }, + { + "name": "samtools_flagstat_jk2802", + "type": "compute", + "runtime": 13.129, + "command": { + "program": "samtools_flagstat", + "arguments": [ + "samtools", + "flagstat", + "JK2802_SE.mapped.bam", + ">", + "JK2802_flagstat.stats" + ] + }, + "parents": [ + "bwa_jk2802", + "bwa_jk2782" + ], + "children": [ + "markduplicates_jk2782", + "markduplicates_jk2802" + ], + "files": [], + "cores": 1.0, + "id": "ID000024", + "category": "samtools_flagstat", + "avgCPU": 118.5, + "bytesRead": 551, + "bytesWritten": 5 + }, + { + "name": "markduplicates_jk2782", + "type": "compute", + "runtime": 22.655, + "command": { + "program": "markduplicates", + "arguments": [ + "mv", + "JK2782_PE.mapped.bam", + "JK2782.bam", + "picard", + "-Xmx4096M", + "MarkDuplicates", + "INPUT=JK2782.bam", + "OUTPUT=JK2782_rmdup.bam", + "REMOVE_DUPLICATES=TRUE", + "AS=TRUE", + "METRICS_FILE=\"JK2782_rmdup.metrics\"", + "VALIDATION_STRINGENCY=SILENT", + "samtools", + "index", + "JK2782_rmdup.bam" + ] + }, + "parents": [ + "samtools_flagstat_jk2782", + "samtools_flagstat_jk2802" + ], + "children": [ + "preseq_jk2782", + "preseq_jk2802" + ], + "files": [], + "cores": 2.0, + "id": "ID000021", + "category": "markduplicates", + "avgCPU": 173.6, + "bytesRead": 24055, + "bytesWritten": 2319, + "memory": 1400048 + }, + { + "name": "markduplicates_jk2802", + "type": "compute", + "runtime": 21.545, + "command": { + "program": "markduplicates", + "arguments": [ + "mv", + "JK2802_SE.mapped.bam", + "JK2802.bam", + "picard", + "-Xmx4096M", + "MarkDuplicates", + "INPUT=JK2802.bam", + "OUTPUT=JK2802_rmdup.bam", + "REMOVE_DUPLICATES=TRUE", + "AS=TRUE", + "METRICS_FILE=\"JK2802_rmdup.metrics\"", + "VALIDATION_STRINGENCY=SILENT", + "samtools", + "index", + "JK2802_rmdup.bam" + ] + }, + "parents": [ + "samtools_flagstat_jk2782", + "samtools_flagstat_jk2802" + ], + "children": [ + "preseq_jk2782", + "preseq_jk2802" + ], + "files": [], + "cores": 2.0, + "id": "ID000020", + "category": "markduplicates", + "avgCPU": 182.6, + "bytesRead": 24242, + "bytesWritten": 2466, + "memory": 1404624 + }, + { + "name": "preseq_jk2782", + "type": "compute", + "runtime": 12.299, + "command": { + "program": "preseq", + "arguments": [ + "preseq", + "c_curve", + "-s", + "1000", + "-o", + "JK2782_PE.mapped.ccurve", + "-B", + "JK2782_PE.mapped.bam" + ] + }, + "parents": [ + "markduplicates_jk2782", + "markduplicates_jk2802" + ], + "children": [ + 
"endorspy_jk2782", + "endorspy_jk2802" + ], + "files": [], + "cores": 1.0, + "id": "ID000030", + "category": "preseq", + "avgCPU": 81.9, + "bytesRead": 473, + "bytesWritten": 4, + "memory": 12032 + }, + { + "name": "preseq_jk2802", + "type": "compute", + "runtime": 10.188, + "command": { + "program": "preseq", + "arguments": [ + "preseq", + "c_curve", + "-s", + "1000", + "-o", + "JK2802_SE.mapped.ccurve", + "-B", + "JK2802_SE.mapped.bam" + ] + }, + "parents": [ + "markduplicates_jk2782", + "markduplicates_jk2802" + ], + "children": [ + "endorspy_jk2782", + "endorspy_jk2802" + ], + "files": [], + "cores": 1.0, + "id": "ID000027", + "category": "preseq", + "avgCPU": 77.6, + "bytesRead": 548, + "bytesWritten": 4, + "memory": 11972 + }, + { + "name": "endorspy_jk2782", + "type": "compute", + "runtime": 7.537, + "command": { + "program": "endorspy", + "arguments": [ + "endorS.py", + "-o", + "json", + "-n", + "JK2782", + "JK2782_flagstat.stats" + ] + }, + "parents": [ + "preseq_jk2782", + "preseq_jk2802" + ], + "children": [ + "damageprofiler_jk2802", + "damageprofiler_jk2782" + ], + "files": [], + "cores": 1.0, + "id": "ID000031", + "category": "endorspy", + "avgCPU": 44.7, + "bytesRead": 623, + "bytesWritten": 4, + "memory": 12264 + }, + { + "name": "endorspy_jk2802", + "type": "compute", + "runtime": 8.0, + "command": { + "program": "endorspy", + "arguments": [ + "endorS.py", + "-o", + "json", + "-n", + "JK2802", + "JK2802_flagstat.stats" + ] + }, + "parents": [ + "preseq_jk2782", + "preseq_jk2802" + ], + "children": [ + "damageprofiler_jk2802", + "damageprofiler_jk2782" + ], + "files": [], + "cores": 1.0, + "id": "ID000032", + "category": "endorspy", + "avgCPU": 54.0, + "bytesRead": 623, + "bytesWritten": 4, + "memory": 12224 + }, + { + "name": "damageprofiler_jk2802", + "type": "compute", + "runtime": 18.596, + "command": { + "program": "damageprofiler", + "arguments": [ + "damageprofiler", + "-Xmx4g", + "-i", + "JK2802_rmdup.bam", + "-r", + "Mammoth_MT_Krause.fasta", + "-l", + "100", + "-t", + "15", + "-o", + ".", + "-yaxis_damageplot", + "0.30" + ] + }, + "parents": [ + "endorspy_jk2782", + "endorspy_jk2802" + ], + "children": [ + "qualimap_jk2802", + "qualimap_jk2782" + ], + "files": [], + "cores": 1.0, + "id": "ID000033", + "category": "damageprofiler", + "avgCPU": 88.6, + "bytesRead": 25744, + "bytesWritten": 391, + "memory": 242940 + }, + { + "name": "damageprofiler_jk2782", + "type": "compute", + "runtime": 16.736, + "command": { + "program": "damageprofiler", + "arguments": [ + "damageprofiler", + "-Xmx4g", + "-i", + "JK2782_rmdup.bam", + "-r", + "Mammoth_MT_Krause.fasta", + "-l", + "100", + "-t", + "15", + "-o", + ".", + "-yaxis_damageplot", + "0.30" + ] + }, + "parents": [ + "endorspy_jk2782", + "endorspy_jk2802" + ], + "children": [ + "qualimap_jk2802", + "qualimap_jk2782" + ], + "files": [], + "cores": 1.0, + "id": "ID000036", + "category": "damageprofiler", + "avgCPU": 88.3, + "bytesRead": 25661, + "bytesWritten": 327, + "memory": 198276 + }, + { + "name": "qualimap_jk2802", + "type": "compute", + "runtime": 15.368, + "command": { + "program": "qualimap", + "arguments": [ + "qualimap", + "bamqc", + "-bam", + "JK2802_rmdup.bam", + "-nt", + "2", + "-outdir", + ".", + "-outformat", + "\"HTML\"", + "--java-mem-size=4G" + ] + }, + "parents": [ + "damageprofiler_jk2802", + "damageprofiler_jk2782" + ], + "children": [ + "multiqc_1" + ], + "files": [], + "cores": 2.0, + "id": "ID000053", + "category": "qualimap", + "avgCPU": 177.7, + "bytesRead": 35038, + "bytesWritten": 1712, + 
"memory": 209440 + }, + { + "name": "qualimap_jk2782", + "type": "compute", + "runtime": 14.223, + "command": { + "program": "qualimap", + "arguments": [ + "qualimap", + "bamqc", + "-bam", + "JK2782_rmdup.bam", + "-nt", + "2", + "-outdir", + ".", + "-outformat", + "\"HTML\"", + "--java-mem-size=4G" + ] + }, + "parents": [ + "damageprofiler_jk2802", + "damageprofiler_jk2782" + ], + "children": [ + "multiqc_1" + ], + "files": [], + "cores": 2.0, + "id": "ID000054", + "category": "qualimap", + "avgCPU": 181.9, + "bytesRead": 34954, + "bytesWritten": 1937, + "memory": 232196 + }, + { + "name": "multiqc_1", + "type": "compute", + "runtime": 46.376, + "command": { + "program": "multiqc", + "arguments": [ + "multiqc", + "-f", + "multiqc_config.yaml", + "." + ] + }, + "parents": [ + "qualimap_jk2802", + "qualimap_jk2782" + ], + "children": [], + "files": [], + "cores": 1.0, + "id": "ID000056", + "category": "multiqc", + "avgCPU": 93.0, + "bytesRead": 1215169, + "bytesWritten": 22599, + "memory": 139496 + } + ] + } +} diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt index be26f540..74202718 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt @@ -35,21 +35,18 @@ internal class WtfTaskTable(private val path: Path) : Table { override val isSynthetic: Boolean = false - override fun isSupported(column: TableColumn<*>): Boolean { - return when (column) { - TASK_ID -> true - TASK_WORKFLOW_ID -> true - TASK_SUBMIT_TIME -> true - TASK_WAIT_TIME -> true - TASK_RUNTIME -> true - TASK_REQ_NCPUS -> true - TASK_PARENTS -> true - TASK_CHILDREN -> true - TASK_GROUP_ID -> true - TASK_USER_ID -> true - else -> false - } - } + override val columns: List<TableColumn<*>> = listOf( + TASK_ID, + TASK_WORKFLOW_ID, + TASK_SUBMIT_TIME, + TASK_WAIT_TIME, + TASK_RUNTIME, + TASK_REQ_NCPUS, + TASK_PARENTS, + TASK_CHILDREN, + TASK_GROUP_ID, + TASK_USER_ID + ) override fun newReader(): TableReader { val reader = LocalParquetReader<GenericRecord>(path.resolve("tasks/schema-1.0")) diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt index b6789542..5e2463f8 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt @@ -25,6 +25,8 @@ package org.opendc.trace.wtf import org.apache.avro.generic.GenericRecord import org.opendc.trace.* import org.opendc.trace.util.parquet.LocalParquetReader +import java.time.Duration +import java.time.Instant /** * A [TableReader] implementation for the WTF format. 
@@ -61,14 +63,14 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Generic @Suppress("UNCHECKED_CAST") val res: Any = when (column) { - TASK_ID -> record["id"] - TASK_WORKFLOW_ID -> record["workflow_id"] - TASK_SUBMIT_TIME -> record["ts_submit"] - TASK_WAIT_TIME -> record["wait_time"] - TASK_RUNTIME -> record["runtime"] + TASK_ID -> (record["id"] as Long).toString() + TASK_WORKFLOW_ID -> (record["workflow_id"] as Long).toString() + TASK_SUBMIT_TIME -> Instant.ofEpochMilli(record["ts_submit"] as Long) + TASK_WAIT_TIME -> Duration.ofMillis(record["wait_time"] as Long) + TASK_RUNTIME -> Duration.ofMillis(record["runtime"] as Long) TASK_REQ_NCPUS -> (record["resource_amount_requested"] as Double).toInt() - TASK_PARENTS -> (record["parents"] as ArrayList<GenericRecord>).map { it["item"] as Long }.toSet() - TASK_CHILDREN -> (record["children"] as ArrayList<GenericRecord>).map { it["item"] as Long }.toSet() + TASK_PARENTS -> (record["parents"] as ArrayList<GenericRecord>).map { it["item"].toString() }.toSet() + TASK_CHILDREN -> (record["children"] as ArrayList<GenericRecord>).map { it["item"].toString() }.toSet() TASK_GROUP_ID -> record["group_id"] TASK_USER_ID -> record["user_id"] else -> throw IllegalArgumentException("Invalid column") @@ -94,16 +96,7 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Generic } override fun getLong(column: TableColumn<Long>): Long { - val record = checkNotNull(record) { "Reader in invalid state" } - - return when (column) { - TASK_ID -> record["id"] as Long - TASK_WORKFLOW_ID -> record["workflow_id"] as Long - TASK_SUBMIT_TIME -> record["ts_submit"] as Long - TASK_WAIT_TIME -> record["wait_time"] as Long - TASK_RUNTIME -> record["runtime"] as Long - else -> throw IllegalArgumentException("Invalid column") - } + throw IllegalArgumentException("Invalid column") } override fun getDouble(column: TableColumn<Double>): Double { diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTrace.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTrace.kt index 7eff0f5a..a755a107 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTrace.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTrace.kt @@ -43,5 +43,5 @@ public class WtfTrace internal constructor(private val path: Path) : Trace { return WtfTaskTable(path) } - override fun toString(): String = "SwfTrace[$path]" + override fun toString(): String = "WtfTrace[$path]" } diff --git a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt index a05a523e..b155f265 100644 --- a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt @@ -28,6 +28,8 @@ import org.junit.jupiter.api.assertThrows import org.opendc.trace.* import java.io.File import java.net.URL +import java.time.Duration +import java.time.Instant /** * Test suite for the [WtfTraceFormat] class. 
@@ -91,20 +93,20 @@ class WtfTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals(362334516345962206, reader.getLong(TASK_ID)) }, - { assertEquals(1078341553348591493, reader.getLong(TASK_WORKFLOW_ID)) }, - { assertEquals(245604, reader.getLong(TASK_SUBMIT_TIME)) }, - { assertEquals(8163, reader.getLong(TASK_RUNTIME)) }, - { assertEquals(setOf(584055316413447529, 133113685133695608, 1008582348422865408), reader.get(TASK_PARENTS)) }, + { assertEquals("362334516345962206", reader.get(TASK_ID)) }, + { assertEquals("1078341553348591493", reader.get(TASK_WORKFLOW_ID)) }, + { assertEquals(Instant.ofEpochMilli(245604), reader.get(TASK_SUBMIT_TIME)) }, + { assertEquals(Duration.ofMillis(8163), reader.get(TASK_RUNTIME)) }, + { assertEquals(setOf("584055316413447529", "133113685133695608", "1008582348422865408"), reader.get(TASK_PARENTS)) }, ) assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals(502010169100446658, reader.getLong(TASK_ID)) }, - { assertEquals(1078341553348591493, reader.getLong(TASK_WORKFLOW_ID)) }, - { assertEquals(251325, reader.getLong(TASK_SUBMIT_TIME)) }, - { assertEquals(8216, reader.getLong(TASK_RUNTIME)) }, - { assertEquals(setOf(584055316413447529, 133113685133695608, 1008582348422865408), reader.get(TASK_PARENTS)) }, + { assertEquals("502010169100446658", reader.get(TASK_ID)) }, + { assertEquals("1078341553348591493", reader.get(TASK_WORKFLOW_ID)) }, + { assertEquals(Instant.ofEpochMilli(251325), reader.get(TASK_SUBMIT_TIME)) }, + { assertEquals(Duration.ofMillis(8216), reader.get(TASK_RUNTIME)) }, + { assertEquals(setOf("584055316413447529", "133113685133695608", "1008582348422865408"), reader.get(TASK_PARENTS)) }, ) reader.close() |
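For orientation, the sketch below shows how a consumer of the updated trace API might read the WfCommons fixture added in this change. It is not part of the patch: the main wrapper and the fixture path are illustrative assumptions, while the column constants, the columns property that replaces isSupported, and the reader calls mirror the tests above.

import org.opendc.trace.*
import org.opendc.trace.wfformat.WfFormatTraceFormat
import java.io.File

fun main() {
    // Illustrative path to the WfCommons fixture shipped with the tests above.
    val url = File("opendc-trace/opendc-trace-wfformat/src/test/resources/trace.json").toURI().toURL()
    val trace = WfFormatTraceFormat().open(url)
    val table = checkNotNull(trace.getTable(TABLE_TASKS)) { "trace has no task table" }

    // Column support is now discovered through the `columns` list rather than
    // the removed `isSupported(column)` call.
    require(TASK_ID in table.columns && TASK_PARENTS in table.columns)

    val reader = table.newReader()
    try {
        while (reader.nextRow()) {
            val id = reader.get(TASK_ID)           // task identifiers are strings since this change
            val runtime = reader.get(TASK_RUNTIME) // java.time.Duration
            val parents = reader.get(TASK_PARENTS) // set of parent task identifiers
            println("$id runs for ${runtime.toMillis()} ms, parents=$parents")
        }
    } finally {
        reader.close()
    }
}

Since columns is an ordinary list, callers can also simply enumerate it to see which fields a given format exposes before opening a reader.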
