From 83497bd122983c7fc0d5cbbdc80b98d58c50cd75 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 10 Sep 2021 16:06:53 +0200 Subject: feat(trace): Support Materna traces from GWA This change adds support for the Materna traces from the Grid Workload Trace Archive (GWA). These traces are very similar to the Bitbrains traces, so they share the same base implementation. --- .../trace/bitbrains/BitbrainsResourceStateTable.kt | 1 + .../bitbrains/BitbrainsResourceStateTableReader.kt | 192 ++++++++++++++------- 2 files changed, 129 insertions(+), 64 deletions(-) (limited to 'opendc-trace') diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt index 767ef919..846d5c8a 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt @@ -41,6 +41,7 @@ internal class BitbrainsResourceStateTable(private val factory: CsvFactory, priv Files.walk(path, 1) .filter { !Files.isDirectory(it) && it.extension == "csv" } .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) + .toSortedMap() override val name: String = TABLE_RESOURCE_STATES diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt index 5687ac7f..dab784c2 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt @@ -22,20 +22,42 @@ package org.opendc.trace.bitbrains +import com.fasterxml.jackson.core.JsonParseException import com.fasterxml.jackson.core.JsonToken import com.fasterxml.jackson.dataformat.csv.CsvParser import com.fasterxml.jackson.dataformat.csv.CsvSchema import org.opendc.trace.* +import java.text.NumberFormat import java.time.Instant +import java.time.LocalDateTime +import java.time.ZoneOffset +import java.time.format.DateTimeFormatter +import java.time.format.DateTimeParseException +import java.util.* /** * A [TableReader] for the Bitbrains resource state table. */ internal class BitbrainsResourceStateTableReader(private val partition: String, private val parser: CsvParser) : TableReader { /** - * The current parser state. + * The [DateTimeFormatter] used to parse the timestamps in case of the Materna trace. */ - private val state = RowState() + private val formatter = DateTimeFormatter.ofPattern("dd.MM.yyyy HH:mm:ss") + + /** + * The type of timestamps in the trace. + */ + private var timestampType: TimestampType = TimestampType.UNDECIDED + + /** + * The [NumberFormat] used to parse doubles containing a comma. + */ + private val nf = NumberFormat.getInstance(Locale.GERMAN) + + /** + * A flag to indicate that the trace contains decimals with a comma separator. + */ + private var usesCommaDecimalSeparator = false init { parser.schema = schema @@ -43,7 +65,7 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, override fun nextRow(): Boolean { // Reset the row state - state.reset() + reset() if (!nextStart()) { return false @@ -57,17 +79,32 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, } when (parser.currentName) { - "Timestamp [ms]" -> state.timestamp = Instant.ofEpochSecond(parser.longValue) - "CPU cores" -> state.cpuCores = parser.intValue - "CPU capacity provisioned [MHZ]" -> state.cpuCapacity = parser.doubleValue - "CPU usage [MHZ]" -> state.cpuUsage = parser.doubleValue - "CPU usage [%]" -> state.cpuUsagePct = parser.doubleValue - "Memory capacity provisioned [KB]" -> state.memCapacity = parser.doubleValue - "Memory usage [KB]" -> state.memUsage = parser.doubleValue - "Disk read throughput [KB/s]" -> state.diskRead = parser.doubleValue - "Disk write throughput [KB/s]" -> state.diskWrite = parser.doubleValue - "Network received throughput [KB/s]" -> state.netReceived = parser.doubleValue - "Network transmitted throughput [KB/s]" -> state.netTransmitted = parser.doubleValue + "Timestamp [ms]" -> { + timestamp = when (timestampType) { + TimestampType.UNDECIDED -> { + try { + val res = LocalDateTime.parse(parser.text, formatter).toInstant(ZoneOffset.UTC) + timestampType = TimestampType.DATE_TIME + res + } catch (e: DateTimeParseException) { + timestampType = TimestampType.EPOCH_MILLIS + Instant.ofEpochSecond(parser.longValue) + } + } + TimestampType.DATE_TIME -> LocalDateTime.parse(parser.text, formatter).toInstant(ZoneOffset.UTC) + TimestampType.EPOCH_MILLIS -> Instant.ofEpochSecond(parser.longValue) + } + } + "CPU cores" -> cpuCores = parser.intValue + "CPU capacity provisioned [MHZ]" -> cpuCapacity = parseSafeDouble() + "CPU usage [MHZ]" -> cpuUsage = parseSafeDouble() + "CPU usage [%]" -> cpuUsagePct = parseSafeDouble() / 100.0 // Convert to range [0, 1] + "Memory capacity provisioned [KB]" -> memCapacity = parseSafeDouble() + "Memory usage [KB]" -> memUsage = parseSafeDouble() + "Disk read throughput [KB/s]" -> diskRead = parseSafeDouble() + "Disk write throughput [KB/s]" -> diskWrite = parseSafeDouble() + "Network received throughput [KB/s]" -> netReceived = parseSafeDouble() + "Network transmitted throughput [KB/s]" -> netTransmitted = parseSafeDouble() } } @@ -95,17 +132,17 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, override fun get(column: TableColumn): T { val res: Any? = when (column) { RESOURCE_STATE_ID -> partition - RESOURCE_STATE_TIMESTAMP -> state.timestamp - RESOURCE_STATE_NCPUS -> state.cpuCores - RESOURCE_STATE_CPU_CAPACITY -> state.cpuCapacity - RESOURCE_STATE_CPU_USAGE -> state.cpuUsage - RESOURCE_STATE_CPU_USAGE_PCT -> state.cpuUsagePct - RESOURCE_STATE_MEM_CAPACITY -> state.memCapacity - RESOURCE_STATE_MEM_USAGE -> state.memUsage - RESOURCE_STATE_DISK_READ -> state.diskRead - RESOURCE_STATE_DISK_WRITE -> state.diskWrite - RESOURCE_STATE_NET_RX -> state.netReceived - RESOURCE_STATE_NET_TX -> state.netTransmitted + RESOURCE_STATE_TIMESTAMP -> timestamp + RESOURCE_STATE_NCPUS -> cpuCores + RESOURCE_STATE_CPU_CAPACITY -> cpuCapacity + RESOURCE_STATE_CPU_USAGE -> cpuUsage + RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct + RESOURCE_STATE_MEM_CAPACITY -> memCapacity + RESOURCE_STATE_MEM_USAGE -> memUsage + RESOURCE_STATE_DISK_READ -> diskRead + RESOURCE_STATE_DISK_WRITE -> diskWrite + RESOURCE_STATE_NET_RX -> netReceived + RESOURCE_STATE_NET_TX -> netTransmitted else -> throw IllegalArgumentException("Invalid column") } @@ -119,7 +156,7 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, override fun getInt(column: TableColumn): Int { return when (column) { - RESOURCE_STATE_NCPUS -> state.cpuCores + RESOURCE_STATE_NCPUS -> cpuCores else -> throw IllegalArgumentException("Invalid column") } } @@ -130,15 +167,15 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, override fun getDouble(column: TableColumn): Double { return when (column) { - RESOURCE_STATE_CPU_CAPACITY -> state.cpuCapacity - RESOURCE_STATE_CPU_USAGE -> state.cpuUsage - RESOURCE_STATE_CPU_USAGE_PCT -> state.cpuUsagePct - RESOURCE_STATE_MEM_CAPACITY -> state.memCapacity - RESOURCE_STATE_MEM_USAGE -> state.memUsage - RESOURCE_STATE_DISK_READ -> state.diskRead - RESOURCE_STATE_DISK_WRITE -> state.diskWrite - RESOURCE_STATE_NET_RX -> state.netReceived - RESOURCE_STATE_NET_TX -> state.netTransmitted + RESOURCE_STATE_CPU_CAPACITY -> cpuCapacity + RESOURCE_STATE_CPU_USAGE -> cpuUsage + RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct + RESOURCE_STATE_MEM_CAPACITY -> memCapacity + RESOURCE_STATE_MEM_USAGE -> memUsage + RESOURCE_STATE_DISK_READ -> diskRead + RESOURCE_STATE_DISK_WRITE -> diskWrite + RESOURCE_STATE_NET_RX -> netReceived + RESOURCE_STATE_NET_TX -> netTransmitted else -> throw IllegalArgumentException("Invalid column") } } @@ -161,37 +198,62 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, } /** - * The current row state. + * Try to parse the current value safely as double. */ - private class RowState { - var timestamp: Instant? = null - var cpuCores = -1 - var cpuCapacity = Double.NaN - var cpuUsage = Double.NaN - var cpuUsagePct = Double.NaN - var memCapacity = Double.NaN - var memUsage = Double.NaN - var diskRead = Double.NaN - var diskWrite = Double.NaN - var netReceived = Double.NaN - var netTransmitted = Double.NaN + private fun parseSafeDouble(): Double { + if (!usesCommaDecimalSeparator) { + try { + return parser.doubleValue + } catch (e: JsonParseException) { + usesCommaDecimalSeparator = true + } + } - /** - * Reset the state. - */ - fun reset() { - timestamp = null - cpuCores = -1 - cpuCapacity = Double.NaN - cpuUsage = Double.NaN - cpuUsagePct = Double.NaN - memCapacity = Double.NaN - memUsage = Double.NaN - diskRead = Double.NaN - diskWrite = Double.NaN - netReceived = Double.NaN - netTransmitted = Double.NaN + val text = parser.text + if (text.isBlank()) { + return 0.0 } + + return nf.parse(text).toDouble() + } + + /** + * State fields of the reader. + */ + private var timestamp: Instant? = null + private var cpuCores = -1 + private var cpuCapacity = Double.NaN + private var cpuUsage = Double.NaN + private var cpuUsagePct = Double.NaN + private var memCapacity = Double.NaN + private var memUsage = Double.NaN + private var diskRead = Double.NaN + private var diskWrite = Double.NaN + private var netReceived = Double.NaN + private var netTransmitted = Double.NaN + + /** + * Reset the state. + */ + private fun reset() { + timestamp = null + cpuCores = -1 + cpuCapacity = Double.NaN + cpuUsage = Double.NaN + cpuUsagePct = Double.NaN + memCapacity = Double.NaN + memUsage = Double.NaN + diskRead = Double.NaN + diskWrite = Double.NaN + netReceived = Double.NaN + netTransmitted = Double.NaN + } + + /** + * The type of the timestamp in the trace. + */ + private enum class TimestampType { + UNDECIDED, DATE_TIME, EPOCH_MILLIS } companion object { @@ -199,15 +261,17 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, * The [CsvSchema] that is used to parse the trace. */ private val schema = CsvSchema.builder() - .addColumn("Timestamp [ms]", CsvSchema.ColumnType.NUMBER) + .addColumn("Timestamp [ms]", CsvSchema.ColumnType.NUMBER_OR_STRING) .addColumn("CPU cores", CsvSchema.ColumnType.NUMBER) .addColumn("CPU capacity provisioned [MHZ]", CsvSchema.ColumnType.NUMBER) .addColumn("CPU usage [MHZ]", CsvSchema.ColumnType.NUMBER) .addColumn("CPU usage [%]", CsvSchema.ColumnType.NUMBER) .addColumn("Memory capacity provisioned [KB]", CsvSchema.ColumnType.NUMBER) .addColumn("Memory usage [KB]", CsvSchema.ColumnType.NUMBER) + .addColumn("Memory usage [%]", CsvSchema.ColumnType.NUMBER) .addColumn("Disk read throughput [KB/s]", CsvSchema.ColumnType.NUMBER) .addColumn("Disk write throughput [KB/s]", CsvSchema.ColumnType.NUMBER) + .addColumn("Disk size [GB]", CsvSchema.ColumnType.NUMBER) .addColumn("Network received throughput [KB/s]", CsvSchema.ColumnType.NUMBER) .addColumn("Network transmitted throughput [KB/s]", CsvSchema.ColumnType.NUMBER) .setAllowComments(true) -- cgit v1.2.3 From fa08b63bd749e9fbe1a1d04ef2ebd7a86453fa4b Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sat, 11 Sep 2021 10:52:30 +0200 Subject: perf(trace): Keep reader state in own class This change removes the external class that holds the state of the reader and instead puts the state in the reader implementation. Maintaining a separate class for the state increases the complexity and has worse performance characteristics due to the bytecode produced by Kotlin for property accesses. --- .../kotlin/org/opendc/trace/ResourceColumns.kt | 4 +- .../org/opendc/trace/gwf/GwfTaskTableReader.kt | 85 ++++++++++------------ .../main/kotlin/org/opendc/trace/wtf/WtfTrace.kt | 2 +- 3 files changed, 42 insertions(+), 49 deletions(-) (limited to 'opendc-trace') diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt index 44dec95b..e2e5ea6d 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/ResourceColumns.kt @@ -41,7 +41,7 @@ public val RESOURCE_START_TIME: TableColumn = TableColumn("resource:sta * End time for the resource. */ @JvmField -public val RESOURCE_END_TIME: TableColumn = TableColumn("resource:end_time", Instant::class.java) +public val RESOURCE_STOP_TIME: TableColumn = TableColumn("resource:stop_time", Instant::class.java) /** * Number of CPUs for the resource. @@ -50,7 +50,7 @@ public val RESOURCE_END_TIME: TableColumn = TableColumn("resource:end_t public val RESOURCE_NCPUS: TableColumn = intColumn("resource:num_cpus") /** - * Memory capacity for the resource. + * Memory capacity for the resource in KB. */ @JvmField public val RESOURCE_MEM_CAPACITY: TableColumn = doubleColumn("resource:mem_capacity") diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt index 64b7d465..fb9099bf 100644 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt @@ -32,18 +32,13 @@ import java.util.regex.Pattern * A [TableReader] implementation for the GWF format. */ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { - /** - * The current parser state. - */ - private val state = RowState() - init { parser.schema = schema } override fun nextRow(): Boolean { // Reset the row state - state.reset() + reset() if (!nextStart()) { return false @@ -57,12 +52,12 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { } when (parser.currentName) { - "WorkflowID" -> state.workflowId = parser.longValue - "JobID" -> state.jobId = parser.longValue - "SubmitTime" -> state.submitTime = parser.longValue - "RunTime" -> state.runtime = parser.longValue - "NProcs" -> state.nProcs = parser.intValue - "ReqNProcs" -> state.reqNProcs = parser.intValue + "WorkflowID" -> workflowId = parser.longValue + "JobID" -> jobId = parser.longValue + "SubmitTime" -> submitTime = parser.longValue + "RunTime" -> runtime = parser.longValue + "NProcs" -> nProcs = parser.intValue + "ReqNProcs" -> reqNProcs = parser.intValue "Dependencies" -> parseParents(parser.valueAsString) } } @@ -85,13 +80,13 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { override fun get(column: TableColumn): T { val res: Any = when (column) { - TASK_WORKFLOW_ID -> state.workflowId - TASK_ID -> state.jobId - TASK_SUBMIT_TIME -> state.submitTime - TASK_RUNTIME -> state.runtime - TASK_REQ_NCPUS -> state.nProcs - TASK_ALLOC_NCPUS -> state.reqNProcs - TASK_PARENTS -> state.dependencies + TASK_WORKFLOW_ID -> workflowId + TASK_ID -> jobId + TASK_SUBMIT_TIME -> submitTime + TASK_RUNTIME -> runtime + TASK_REQ_NCPUS -> nProcs + TASK_ALLOC_NCPUS -> reqNProcs + TASK_PARENTS -> dependencies else -> throw IllegalArgumentException("Invalid column") } @@ -105,18 +100,18 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { override fun getInt(column: TableColumn): Int { return when (column) { - TASK_REQ_NCPUS -> state.nProcs - TASK_ALLOC_NCPUS -> state.reqNProcs + TASK_REQ_NCPUS -> nProcs + TASK_ALLOC_NCPUS -> reqNProcs else -> throw IllegalArgumentException("Invalid column") } } override fun getLong(column: TableColumn): Long { return when (column) { - TASK_WORKFLOW_ID -> state.workflowId - TASK_ID -> state.jobId - TASK_SUBMIT_TIME -> state.submitTime - TASK_RUNTIME -> state.runtime + TASK_WORKFLOW_ID -> workflowId + TASK_ID -> jobId + TASK_SUBMIT_TIME -> submitTime + TASK_RUNTIME -> runtime else -> throw IllegalArgumentException("Invalid column") } } @@ -166,29 +161,27 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { } /** - * The current row state. + * Reader state fields. */ - private class RowState { - var workflowId = -1L - var jobId = -1L - var submitTime = -1L - var runtime = -1L - var nProcs = -1 - var reqNProcs = -1 - var dependencies = emptySet() + private var workflowId = -1L + private var jobId = -1L + private var submitTime = -1L + private var runtime = -1L + private var nProcs = -1 + private var reqNProcs = -1 + private var dependencies = emptySet() - /** - * Reset the state. - */ - fun reset() { - workflowId = -1 - jobId = -1 - submitTime = -1 - runtime = -1 - nProcs = -1 - reqNProcs = -1 - dependencies = emptySet() - } + /** + * Reset the state. + */ + private fun reset() { + workflowId = -1 + jobId = -1 + submitTime = -1 + runtime = -1 + nProcs = -1 + reqNProcs = -1 + dependencies = emptySet() } companion object { diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTrace.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTrace.kt index 7eff0f5a..a755a107 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTrace.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTrace.kt @@ -43,5 +43,5 @@ public class WtfTrace internal constructor(private val path: Path) : Trace { return WtfTaskTable(path) } - override fun toString(): String = "SwfTrace[$path]" + override fun toString(): String = "WtfTrace[$path]" } -- cgit v1.2.3 From b7be3400bb4b21d0cd7021e2baf1f6ce43aba189 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 10 Sep 2021 22:10:22 +0200 Subject: feat(trace): Add support for WfCommons (WorkflowHub) traces This change adds support for reading WfCommons workflow traces in OpenDC. This functionality is available in the new `opendc-trace-wfformat` module. --- .../main/kotlin/org/opendc/trace/TaskColumns.kt | 17 +- .../org/opendc/trace/gwf/GwfTaskTableReader.kt | 36 +- .../org/opendc/trace/gwf/GwfTraceFormatTest.kt | 12 +- .../org/opendc/trace/swf/SwfTaskTableReader.kt | 18 +- .../org/opendc/trace/swf/SwfTraceFormatTest.kt | 4 +- .../opendc-trace-wfformat/build.gradle.kts | 37 + .../org/opendc/trace/wfformat/WfFormatTaskTable.kt | 59 + .../trace/wfformat/WfFormatTaskTableReader.kt | 234 ++++ .../org/opendc/trace/wfformat/WfFormatTrace.kt | 47 + .../opendc/trace/wfformat/WfFormatTraceFormat.kt | 47 + .../services/org.opendc.trace.spi.TraceFormat | 1 + .../trace/wfformat/WfFormatTaskTableReaderTest.kt | 345 +++++ .../trace/wfformat/WfFormatTraceFormatTest.kt | 133 ++ .../src/test/resources/trace.json | 1342 ++++++++++++++++++++ .../org/opendc/trace/wtf/WtfTaskTableReader.kt | 27 +- .../org/opendc/trace/wtf/WtfTraceFormatTest.kt | 22 +- 16 files changed, 2309 insertions(+), 72 deletions(-) create mode 100644 opendc-trace/opendc-trace-wfformat/build.gradle.kts create mode 100644 opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTable.kt create mode 100644 opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt create mode 100644 opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTrace.kt create mode 100644 opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt create mode 100644 opendc-trace/opendc-trace-wfformat/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat create mode 100644 opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt create mode 100644 opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt create mode 100644 opendc-trace/opendc-trace-wfformat/src/test/resources/trace.json (limited to 'opendc-trace') diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt index 88bbc623..46920dce 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt @@ -23,49 +23,52 @@ @file:JvmName("TaskColumns") package org.opendc.trace +import java.time.Duration +import java.time.Instant + /** * A column containing the task identifier. */ @JvmField -public val TASK_ID: TableColumn = longColumn("task:id") +public val TASK_ID: TableColumn = stringColumn("task:id") /** * A column containing the identifier of the workflow. */ @JvmField -public val TASK_WORKFLOW_ID: TableColumn = longColumn("task:workflow_id") +public val TASK_WORKFLOW_ID: TableColumn = stringColumn("task:workflow_id") /** * A column containing the submit time of the task. */ @JvmField -public val TASK_SUBMIT_TIME: TableColumn = longColumn("task:submit_time") +public val TASK_SUBMIT_TIME: TableColumn = TableColumn("task:submit_time", type = Instant::class.java) /** * A column containing the wait time of the task. */ @JvmField -public val TASK_WAIT_TIME: TableColumn = longColumn("task:wait_time") +public val TASK_WAIT_TIME: TableColumn = TableColumn("task:wait_time", type = Instant::class.java) /** * A column containing the runtime time of the task. */ @JvmField -public val TASK_RUNTIME: TableColumn = longColumn("task:runtime") +public val TASK_RUNTIME: TableColumn = TableColumn("task:runtime", type = Duration::class.java) /** * A column containing the parents of a task. */ @Suppress("UNCHECKED_CAST") @JvmField -public val TASK_PARENTS: TableColumn> = TableColumn("task:parents", type = Set::class.java as Class>) +public val TASK_PARENTS: TableColumn> = TableColumn("task:parents", type = Set::class.java as Class>) /** * A column containing the children of a task. */ @Suppress("UNCHECKED_CAST") @JvmField -public val TASK_CHILDREN: TableColumn> = TableColumn("task:children", type = Set::class.java as Class>) +public val TASK_CHILDREN: TableColumn> = TableColumn("task:children", type = Set::class.java as Class>) /** * A column containing the requested CPUs of a task. diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt index fb9099bf..39eb5520 100644 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt @@ -26,6 +26,8 @@ import com.fasterxml.jackson.core.JsonToken import com.fasterxml.jackson.dataformat.csv.CsvParser import com.fasterxml.jackson.dataformat.csv.CsvSchema import org.opendc.trace.* +import java.time.Duration +import java.time.Instant import java.util.regex.Pattern /** @@ -52,10 +54,10 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { } when (parser.currentName) { - "WorkflowID" -> workflowId = parser.longValue - "JobID" -> jobId = parser.longValue - "SubmitTime" -> submitTime = parser.longValue - "RunTime" -> runtime = parser.longValue + "WorkflowID" -> workflowId = parser.text + "JobID" -> jobId = parser.text + "SubmitTime" -> submitTime = Instant.ofEpochSecond(parser.longValue) + "RunTime" -> runtime = Duration.ofSeconds(parser.longValue) "NProcs" -> nProcs = parser.intValue "ReqNProcs" -> reqNProcs = parser.intValue "Dependencies" -> parseParents(parser.valueAsString) @@ -79,7 +81,7 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { } override fun get(column: TableColumn): T { - val res: Any = when (column) { + val res: Any? = when (column) { TASK_WORKFLOW_ID -> workflowId TASK_ID -> jobId TASK_SUBMIT_TIME -> submitTime @@ -107,13 +109,7 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { } override fun getLong(column: TableColumn): Long { - return when (column) { - TASK_WORKFLOW_ID -> workflowId - TASK_ID -> jobId - TASK_SUBMIT_TIME -> submitTime - TASK_RUNTIME -> runtime - else -> throw IllegalArgumentException("Invalid column") - } + throw IllegalArgumentException("Invalid column") } override fun getDouble(column: TableColumn): Double { @@ -163,10 +159,10 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { /** * Reader state fields. */ - private var workflowId = -1L - private var jobId = -1L - private var submitTime = -1L - private var runtime = -1L + private var workflowId: String? = null + private var jobId: String? = null + private var submitTime: Instant? = null + private var runtime: Duration? = null private var nProcs = -1 private var reqNProcs = -1 private var dependencies = emptySet() @@ -175,10 +171,10 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { * Reset the state. */ private fun reset() { - workflowId = -1 - jobId = -1 - submitTime = -1 - runtime = -1 + workflowId = null + jobId = null + submitTime = null + runtime = null nProcs = -1 reqNProcs = -1 dependencies = emptySet() diff --git a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt index 6b0568fe..b209b979 100644 --- a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt @@ -29,6 +29,8 @@ import org.junit.jupiter.api.assertDoesNotThrow import org.junit.jupiter.api.assertThrows import org.opendc.trace.* import java.net.URL +import java.time.Duration +import java.time.Instant /** * Test suite for the [GwfTraceFormat] class. @@ -90,11 +92,11 @@ internal class GwfTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals(0L, reader.getLong(TASK_WORKFLOW_ID)) }, - { assertEquals(1L, reader.getLong(TASK_ID)) }, - { assertEquals(16, reader.getLong(TASK_SUBMIT_TIME)) }, - { assertEquals(11, reader.getLong(TASK_RUNTIME)) }, - { assertEquals(setOf(), reader.get(TASK_PARENTS)) }, + { assertEquals("0", reader.get(TASK_WORKFLOW_ID)) }, + { assertEquals("1", reader.get(TASK_ID)) }, + { assertEquals(Instant.ofEpochSecond(16), reader.get(TASK_SUBMIT_TIME)) }, + { assertEquals(Duration.ofSeconds(11), reader.get(TASK_RUNTIME)) }, + { assertEquals(emptySet(), reader.get(TASK_PARENTS)) }, ) } diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt index 5f879a54..3f49c770 100644 --- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt +++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt @@ -24,6 +24,8 @@ package org.opendc.trace.swf import org.opendc.trace.* import java.io.BufferedReader +import java.time.Duration +import java.time.Instant /** * A [TableReader] implementation for the SWF format. @@ -85,10 +87,10 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea override fun get(column: TableColumn): T { val res: Any = when (column) { - TASK_ID -> getLong(TASK_ID) - TASK_SUBMIT_TIME -> getLong(TASK_SUBMIT_TIME) - TASK_WAIT_TIME -> getLong(TASK_WAIT_TIME) - TASK_RUNTIME -> getLong(TASK_RUNTIME) + TASK_ID -> fields[COL_JOB_ID] + TASK_SUBMIT_TIME -> Instant.ofEpochSecond(fields[COL_SUBMIT_TIME].toLong(10)) + TASK_WAIT_TIME -> Duration.ofSeconds(fields[COL_WAIT_TIME].toLong(10)) + TASK_RUNTIME -> Duration.ofSeconds(fields[COL_RUN_TIME].toLong(10)) TASK_REQ_NCPUS -> getInt(TASK_REQ_NCPUS) TASK_ALLOC_NCPUS -> getInt(TASK_ALLOC_NCPUS) TASK_PARENTS -> { @@ -121,13 +123,7 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea } override fun getLong(column: TableColumn): Long { - return when (column) { - TASK_ID -> fields[COL_JOB_ID].toLong(10) - TASK_SUBMIT_TIME -> fields[COL_SUBMIT_TIME].toLong(10) - TASK_WAIT_TIME -> fields[COL_WAIT_TIME].toLong(10) - TASK_RUNTIME -> fields[COL_RUN_TIME].toLong(10) - else -> throw IllegalArgumentException("Invalid column") - } + throw IllegalArgumentException("Invalid column") } override fun getDouble(column: TableColumn): Double { diff --git a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt index 9686891b..828c2bfa 100644 --- a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt @@ -85,10 +85,10 @@ internal class SwfTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals(1, reader.getLong(TASK_ID)) }, + { assertEquals("1", reader.get(TASK_ID)) }, { assertEquals(306, reader.getInt(TASK_ALLOC_NCPUS)) }, { assertTrue(reader.nextRow()) }, - { assertEquals(2, reader.getLong(TASK_ID)) }, + { assertEquals("2", reader.get(TASK_ID)) }, { assertEquals(17, reader.getInt(TASK_ALLOC_NCPUS)) }, ) diff --git a/opendc-trace/opendc-trace-wfformat/build.gradle.kts b/opendc-trace/opendc-trace-wfformat/build.gradle.kts new file mode 100644 index 00000000..2d336d03 --- /dev/null +++ b/opendc-trace/opendc-trace-wfformat/build.gradle.kts @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +description = "Support for WfCommons workload traces in OpenDC" + +/* Build configuration */ +plugins { + `kotlin-library-conventions` + `testing-conventions` + `jacoco-conventions` +} + +dependencies { + api(platform(projects.opendcPlatform)) + api(projects.opendcTrace.opendcTraceApi) + + implementation(libs.jackson.core) +} diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTable.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTable.kt new file mode 100644 index 00000000..907bf7ff --- /dev/null +++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTable.kt @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.wfformat + +import com.fasterxml.jackson.core.JsonFactory +import org.opendc.trace.* +import java.nio.file.Path + +/** + * A [Table] containing the tasks in a WfCommons workload trace. + */ +internal class WfFormatTaskTable(private val factory: JsonFactory, private val path: Path) : Table { + override val name: String = TABLE_TASKS + + override val isSynthetic: Boolean = false + + override fun isSupported(column: TableColumn<*>): Boolean { + return when (column) { + TASK_ID -> true + TASK_WORKFLOW_ID -> true + TASK_RUNTIME -> true + TASK_REQ_NCPUS -> true + TASK_PARENTS -> true + TASK_CHILDREN -> true + else -> false + } + } + + override fun newReader(): TableReader { + val parser = factory.createParser(path.toFile()) + return WfFormatTaskTableReader(parser) + } + + override fun newReader(partition: String): TableReader { + throw IllegalArgumentException("Invalid partition $partition") + } + + override fun toString(): String = "WfFormatTaskTable" +} diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt new file mode 100644 index 00000000..4408ba5c --- /dev/null +++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt @@ -0,0 +1,234 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.wfformat + +import com.fasterxml.jackson.core.JsonParseException +import com.fasterxml.jackson.core.JsonParser +import com.fasterxml.jackson.core.JsonToken +import org.opendc.trace.* +import java.time.Duration +import kotlin.math.roundToInt + +/** + * A [TableReader] implementation for the WfCommons workload trace format. + */ +internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableReader { + /** + * The current nesting of the parser. + */ + private var level: ParserLevel = ParserLevel.TOP + + override fun nextRow(): Boolean { + reset() + + var hasJob = false + + while (!hasJob) { + when (level) { + ParserLevel.TOP -> { + val token = parser.nextToken() + + // Check whether the document is not empty and starts with an object + if (token == null) { + break + } else if (token != JsonToken.START_OBJECT) { + throw JsonParseException(parser, "Expected object", parser.currentLocation) + } else { + level = ParserLevel.TRACE + } + } + ParserLevel.TRACE -> { + // Seek for the workflow object in the file + if (!seekWorkflow()) { + break + } else if (!parser.isExpectedStartObjectToken) { + throw JsonParseException(parser, "Expected object", parser.currentLocation) + } else { + level = ParserLevel.WORKFLOW + } + } + ParserLevel.WORKFLOW -> { + // Seek for the jobs object in the file + level = if (!seekJobs()) { + ParserLevel.TRACE + } else if (!parser.isExpectedStartArrayToken) { + throw JsonParseException(parser, "Expected array", parser.currentLocation) + } else { + ParserLevel.JOB + } + } + ParserLevel.JOB -> { + when (parser.nextToken()) { + JsonToken.END_ARRAY -> level = ParserLevel.WORKFLOW + JsonToken.START_OBJECT -> { + parseJob() + hasJob = true + break + } + else -> throw JsonParseException(parser, "Unexpected token", parser.currentLocation) + } + } + } + } + + return hasJob + } + + override fun hasColumn(column: TableColumn<*>): Boolean { + return when (column) { + TASK_ID -> true + TASK_WORKFLOW_ID -> true + TASK_RUNTIME -> true + TASK_REQ_NCPUS -> true + TASK_PARENTS -> true + TASK_CHILDREN -> true + else -> false + } + } + + override fun get(column: TableColumn): T { + val res: Any? = when (column) { + TASK_ID -> id + TASK_WORKFLOW_ID -> workflowId + TASK_RUNTIME -> runtime + TASK_PARENTS -> parents + TASK_CHILDREN -> children + TASK_REQ_NCPUS -> getInt(TASK_REQ_NCPUS) + else -> throw IllegalArgumentException("Invalid column") + } + + @Suppress("UNCHECKED_CAST") + return res as T + } + + override fun getBoolean(column: TableColumn): Boolean { + throw IllegalArgumentException("Invalid column") + } + + override fun getInt(column: TableColumn): Int { + return when (column) { + TASK_REQ_NCPUS -> cores + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getLong(column: TableColumn): Long { + throw IllegalArgumentException("Invalid column") + } + + override fun getDouble(column: TableColumn): Double { + throw IllegalArgumentException("Invalid column") + } + + override fun close() { + parser.close() + } + + /** + * Parse the trace and seek until the workflow description. + */ + private fun seekWorkflow(): Boolean { + while (parser.nextValue() != JsonToken.END_OBJECT) { + when (parser.currentName) { + "name" -> workflowId = parser.text + "workflow" -> return true + else -> parser.skipChildren() + } + } + + return false + } + + /** + * Parse the workflow description in the file and seek until the first job. + */ + private fun seekJobs(): Boolean { + while (parser.nextValue() != JsonToken.END_OBJECT) { + when (parser.currentName) { + "jobs" -> return true + else -> parser.skipChildren() + } + } + + return false + } + + /** + * Parse a single job in the file. + */ + private fun parseJob() { + while (parser.nextValue() != JsonToken.END_OBJECT) { + when (parser.currentName) { + "name" -> id = parser.text + "parents" -> parents = parseIds() + "children" -> children = parseIds() + "runtime" -> runtime = Duration.ofSeconds(parser.numberValue.toLong()) + "cores" -> cores = parser.floatValue.roundToInt() + else -> parser.skipChildren() + } + } + } + + /** + * Parse the parents/children of a job. + */ + private fun parseIds(): Set { + if (!parser.isExpectedStartArrayToken) { + throw JsonParseException(parser, "Expected array", parser.currentLocation) + } + + val ids = mutableSetOf() + + while (parser.nextToken() != JsonToken.END_ARRAY) { + if (parser.currentToken != JsonToken.VALUE_STRING) { + throw JsonParseException(parser, "Expected token", parser.currentLocation) + } + + ids.add(parser.valueAsString) + } + + return ids + } + + private enum class ParserLevel { + TOP, TRACE, WORKFLOW, JOB + } + + /** + * State fields for the parser. + */ + private var id: String? = null + private var workflowId: String? = null + private var runtime: Duration? = null + private var parents: Set? = null + private var children: Set? = null + private var cores = -1 + + private fun reset() { + id = null + runtime = null + parents = null + children = null + cores = -1 + } +} diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTrace.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTrace.kt new file mode 100644 index 00000000..2d9c79fb --- /dev/null +++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTrace.kt @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.wfformat + +import com.fasterxml.jackson.core.JsonFactory +import org.opendc.trace.TABLE_TASKS +import org.opendc.trace.Table +import org.opendc.trace.Trace +import java.nio.file.Path + +/** + * [Trace] implementation for the WfCommons workload trace format. + */ +public class WfFormatTrace internal constructor(private val factory: JsonFactory, private val path: Path) : Trace { + override val tables: List = listOf(TABLE_TASKS) + + override fun containsTable(name: String): Boolean = TABLE_TASKS == name + + override fun getTable(name: String): Table? { + return when (name) { + TABLE_TASKS -> WfFormatTaskTable(factory, path) + else -> null + } + } + + override fun toString(): String = "WfFormatTrace[$path]" +} diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt new file mode 100644 index 00000000..ff8d054c --- /dev/null +++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.wfformat + +import com.fasterxml.jackson.core.JsonFactory +import org.opendc.trace.spi.TraceFormat +import java.net.URL +import java.nio.file.Paths +import kotlin.io.path.exists + +/** + * A [TraceFormat] implementation for the WfCommons workload trace format. + */ +public class WfFormatTraceFormat : TraceFormat { + /** + * The [JsonFactory] that is used to created JSON parsers. + */ + private val factory = JsonFactory() + + override val name: String = "wfformat" + + override fun open(url: URL): WfFormatTrace { + val path = Paths.get(url.toURI()) + require(path.exists()) { "URL $url does not exist" } + return WfFormatTrace(factory, path) + } +} diff --git a/opendc-trace/opendc-trace-wfformat/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat b/opendc-trace/opendc-trace-wfformat/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat new file mode 100644 index 00000000..ee3aa2f6 --- /dev/null +++ b/opendc-trace/opendc-trace-wfformat/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat @@ -0,0 +1 @@ +org.opendc.trace.wfformat.WfFormatTraceFormat diff --git a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt new file mode 100644 index 00000000..b07f27ed --- /dev/null +++ b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt @@ -0,0 +1,345 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.wfformat + +import com.fasterxml.jackson.core.JsonFactory +import com.fasterxml.jackson.core.JsonParseException +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertDoesNotThrow +import org.junit.jupiter.api.assertThrows +import org.opendc.trace.TASK_ID +import org.opendc.trace.TASK_PARENTS + +/** + * Test suite for the [WfFormatTaskTableReader] class. + */ +internal class WfFormatTaskTableReaderTest { + /** + * The [JsonFactory] used to construct the parser. + */ + private val factory = JsonFactory() + + @Test + fun testEmptyInput() { + val content = "" + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertFalse(reader.nextRow()) + reader.close() + } + + @Test + fun testTopLevelArrayInput() { + val content = "[]" + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertThrows { + while (reader.nextRow()) { + continue + } + } + + reader.close() + } + + @Test + fun testNoWorkflow() { + val content = """ + { + "name": "eager-nextflow-chameleon" + } + """.trimIndent() + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertDoesNotThrow { + while (reader.nextRow()) { + continue + } + } + + reader.close() + } + + @Test + fun testWorkflowArrayType() { + val content = """ + { + "name": "eager-nextflow-chameleon", + "workflow": [] + } + """.trimIndent() + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertThrows { + while (reader.nextRow()) { + continue + } + } + + reader.close() + } + + @Test + fun testWorkflowNullType() { + val content = """ + { + "name": "eager-nextflow-chameleon", + "workflow": null + } + """.trimIndent() + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertThrows { + while (reader.nextRow()) { + continue + } + } + + reader.close() + } + + @Test + fun testNoJobs() { + val content = """ + { + "name": "eager-nextflow-chameleon", + "workflow": { + + } + } + """.trimIndent() + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertDoesNotThrow { reader.nextRow() } + + reader.close() + } + + @Test + fun testJobsObjectType() { + val content = """ + { + "name": "eager-nextflow-chameleon", + "workflow": { "jobs": {} } + } + """.trimIndent() + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertThrows { reader.nextRow() } + + reader.close() + } + + @Test + fun testJobsNullType() { + val content = """ + { + "name": "eager-nextflow-chameleon", + "workflow": { "jobs": null } + } + """.trimIndent() + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertThrows { reader.nextRow() } + + reader.close() + } + + @Test + fun testJobsInvalidChildType() { + val content = """ + { + "name": "eager-nextflow-chameleon", + "workflow": { + "jobs": [1] + } + } + """.trimIndent() + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertThrows { reader.nextRow() } + + reader.close() + } + + @Test + fun testJobsValidChildType() { + val content = """ + { + "name": "eager-nextflow-chameleon", + "workflow": { + "jobs": [ + { + "name": "test" + } + ] + } + } + """.trimIndent() + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertTrue(reader.nextRow()) + assertEquals("test", reader.get(TASK_ID)) + assertFalse(reader.nextRow()) + + reader.close() + } + + @Test + fun testJobsInvalidParents() { + val content = """ + { + "name": "eager-nextflow-chameleon", + "workflow": { + "jobs": [ + { + "name": "test", + "parents": 1, + } + ] + } + } + """.trimIndent() + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertThrows { reader.nextRow() } + + reader.close() + } + + @Test + fun testJobsInvalidParentsItem() { + val content = """ + { + "name": "eager-nextflow-chameleon", + "workflow": { + "jobs": [ + { + "name": "test", + "parents": [1], + } + ] + } + } + """.trimIndent() + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertThrows { reader.nextRow() } + + reader.close() + } + + @Test + fun testJobsValidParents() { + val content = """ + { + "name": "eager-nextflow-chameleon", + "workflow": { + "jobs": [ + { + "name": "test", + "parents": ["1"] + } + ] + } + } + """.trimIndent() + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertTrue(reader.nextRow()) + assertEquals(setOf("1"), reader.get(TASK_PARENTS)) + assertFalse(reader.nextRow()) + + reader.close() + } + + @Test + fun testJobsInvalidSecondEntry() { + val content = """ + { + "workflow": { + "jobs": [ + { + "name": "test", + "parents": ["1"] + }, + "test" + ] + } + } + """.trimIndent() + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertDoesNotThrow { reader.nextRow() } + assertThrows { reader.nextRow() } + + reader.close() + } + + @Test + fun testDuplicateJobsArray() { + val content = """ + { + "name": "eager-nextflow-chameleon", + "workflow": { + "jobs": [ + { + "name": "test", + "parents": ["1"] + } + ], + "jobs": [ + { + "name": "test2", + "parents": ["test"] + } + ] + } + } + """.trimIndent() + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertTrue(reader.nextRow()) + assertTrue(reader.nextRow()) + assertEquals("test2", reader.get(TASK_ID)) + assertFalse(reader.nextRow()) + + reader.close() + } +} diff --git a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt new file mode 100644 index 00000000..0bfc8840 --- /dev/null +++ b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt @@ -0,0 +1,133 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.wfformat + +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertDoesNotThrow +import org.junit.jupiter.api.assertThrows +import org.opendc.trace.* +import java.io.File +import java.net.URL + +/** + * Test suite for the [WfFormatTraceFormat] class. + */ +class WfFormatTraceFormatTest { + @Test + fun testTraceExists() { + val input = File("src/test/resources/trace.json").toURI().toURL() + val format = WfFormatTraceFormat() + assertDoesNotThrow { format.open(input) } + } + + @Test + fun testTraceDoesNotExists() { + val input = File("src/test/resources/trace.json").toURI().toURL() + val format = WfFormatTraceFormat() + assertThrows { format.open(URL(input.toString() + "help")) } + } + + @Test + fun testTables() { + val input = File("src/test/resources/trace.json").toURI().toURL() + val format = WfFormatTraceFormat() + val trace = format.open(input) + + assertEquals(listOf(TABLE_TASKS), trace.tables) + } + + @Test + fun testTableExists() { + val input = File("src/test/resources/trace.json").toURI().toURL() + val format = WfFormatTraceFormat() + val table = format.open(input).getTable(TABLE_TASKS) + + assertNotNull(table) + assertDoesNotThrow { table!!.newReader() } + } + + @Test + fun testTableDoesNotExist() { + val input = File("src/test/resources/trace.json").toURI().toURL() + val format = WfFormatTraceFormat() + val trace = format.open(input) + + assertFalse(trace.containsTable("test")) + assertNull(trace.getTable("test")) + } + + /** + * Smoke test for parsing WfCommons traces. + */ + @Test + fun testTableReader() { + val input = File("src/test/resources/trace.json").toURI().toURL() + val trace = WfFormatTraceFormat().open(input) + val reader = trace.getTable(TABLE_TASKS)!!.newReader() + + assertAll( + { assertTrue(reader.nextRow()) }, + { assertEquals("makebwaindex_mammoth_mt_krause.fasta", reader.get(TASK_ID)) }, + { assertEquals("eager-nextflow-chameleon", reader.get(TASK_WORKFLOW_ID)) }, + { assertEquals(172000, reader.get(TASK_RUNTIME).toMillis()) }, + { assertEquals(emptySet(), reader.get(TASK_PARENTS)) }, + ) + + assertAll( + { assertTrue(reader.nextRow()) }, + { assertEquals("makeseqdict_mammoth_mt_krause.fasta", reader.get(TASK_ID)) }, + { assertEquals("eager-nextflow-chameleon", reader.get(TASK_WORKFLOW_ID)) }, + { assertEquals(175000, reader.get(TASK_RUNTIME).toMillis()) }, + { assertEquals(setOf("makebwaindex_mammoth_mt_krause.fasta"), reader.get(TASK_PARENTS)) }, + ) + + reader.close() + } + + /** + * Test full iteration of the table. + */ + @Test + fun testTableReaderFull() { + val input = File("src/test/resources/trace.json").toURI().toURL() + val trace = WfFormatTraceFormat().open(input) + val reader = trace.getTable(TABLE_TASKS)!!.newReader() + + assertDoesNotThrow { + while (reader.nextRow()) { + // reader.get(TASK_ID) + } + reader.close() + } + } + + @Test + fun testTableReaderPartition() { + val input = File("src/test/resources/trace.json").toURI().toURL() + val format = WfFormatTraceFormat() + val table = format.open(input).getTable(TABLE_TASKS)!! + + assertThrows { table.newReader("test") } + } +} diff --git a/opendc-trace/opendc-trace-wfformat/src/test/resources/trace.json b/opendc-trace/opendc-trace-wfformat/src/test/resources/trace.json new file mode 100644 index 00000000..d21f024d --- /dev/null +++ b/opendc-trace/opendc-trace-wfformat/src/test/resources/trace.json @@ -0,0 +1,1342 @@ +{ + "name": "eager-nextflow-chameleon", + "description": "Instance generated with WfCommons - https://wfcommons.org", + "createdAt": "2021-09-06T03:43:31.762479", + "schemaVersion": "1.2", + "author": { + "name": "cc", + "email": "support@wfcommons.org" + }, + "wms": { + "name": "Nextflow", + "version": "21.04.3", + "url": "https://www.nextflow.io" + }, + "workflow": { + "executedAt": "20210906T034331+0000", + "makespan": 275, + "jobs": [ + { + "name": "makebwaindex_mammoth_mt_krause.fasta", + "type": "compute", + "runtime": 172.182, + "command": { + "program": "makebwaindex", + "arguments": [ + "bwa", + "index", + "Mammoth_MT_Krause.fasta", + "mkdir", + "BWAIndex", + "&&", + "mv", + "Mammoth_MT_Krause.fasta*", + "BWAIndex" + ] + }, + "parents": [], + "children": [ + "makeseqdict_mammoth_mt_krause.fasta" + ], + "files": [], + "cores": 1.0, + "id": "ID000001", + "category": "makebwaindex", + "avgCPU": 5.8, + "bytesRead": 124, + "bytesWritten": 126, + "memory": 4248 + }, + { + "name": "makeseqdict_mammoth_mt_krause.fasta", + "type": "compute", + "runtime": 175.427, + "command": { + "program": "makeseqdict", + "arguments": [ + "picard", + "-Xmx6144M", + "CreateSequenceDictionary", + "R=Mammoth_MT_Krause.fasta", + "O=\"Mammoth_MT_Krause.dict\"" + ] + }, + "parents": [ + "makebwaindex_mammoth_mt_krause.fasta" + ], + "children": [ + "makefastaindex_mammoth_mt_krause.fasta" + ], + "files": [], + "cores": 1.0, + "id": "ID000003", + "category": "makeseqdict", + "avgCPU": 83.5, + "bytesRead": 22728, + "bytesWritten": 1300, + "memory": 104416 + }, + { + "name": "makefastaindex_mammoth_mt_krause.fasta", + "type": "compute", + "runtime": 170.797, + "command": { + "program": "makefastaindex", + "arguments": [ + "samtools", + "faidx", + "Mammoth_MT_Krause.fasta" + ] + }, + "parents": [ + "makeseqdict_mammoth_mt_krause.fasta" + ], + "children": [ + "output_documentation" + ], + "files": [], + "cores": 1.0, + "id": "ID000002", + "category": "makefastaindex", + "avgCPU": 23.8, + "bytesRead": 66, + "bytesWritten": 4, + "memory": 6096 + }, + { + "name": "output_documentation", + "type": "compute", + "runtime": 173.479, + "command": { + "program": "output_documentation", + "arguments": [ + "markdown_to_html.py", + "output.md", + "-o", + "results_description.html" + ] + }, + "parents": [ + "makefastaindex_mammoth_mt_krause.fasta" + ], + "children": [ + "get_software_versions" + ], + "files": [], + "cores": 1.0, + "id": "ID000005", + "category": "output_documentation", + "avgCPU": 84.0, + "bytesRead": 8222, + "bytesWritten": 15165, + "memory": 11488 + }, + { + "name": "get_software_versions", + "type": "compute", + "runtime": 183.445, + "command": { + "program": "get_software_versions", + "arguments": [ + "echo", + "2.3.5", + "&>", + "v_pipeline.txt", + "echo", + "21.04.3", + "&>", + "v_nextflow.txt", + "fastqc", + "--version", + "&>", + "v_fastqc.txt", + "2>&1", + "||", + "true", + "AdapterRemoval", + "--version", + "&>", + "v_adapterremoval.txt", + "2>&1", + "||", + "true", + "fastp", + "--version", + "&>", + "v_fastp.txt", + "2>&1", + "||", + "true", + "bwa", + "&>", + "v_bwa.txt", + "2>&1", + "||", + "true", + "circulargenerator", + "--help", + "|", + "head", + "-n", + "1", + "&>", + "v_circulargenerator.txt", + "2>&1", + "||", + "true", + "samtools", + "--version", + "&>", + "v_samtools.txt", + "2>&1", + "||", + "true", + "dedup", + "-v", + "&>", + "v_dedup.txt", + "2>&1", + "||", + "true", + "##", + "bioconda", + "recipe", + "of", + "picard", + "is", + "incorrectly", + "set", + "up", + "and", + "extra", + "warning", + "made", + "with", + "stderr,", + "this", + "ugly", + "command", + "ensures", + "only", + "version", + "exported", + "(", + "exec", + "7>&1", + "picard", + "MarkDuplicates", + "--version", + "2>&1", + ">&7", + "|", + "grep", + "-v", + "/", + ">&2", + ")", + "2>", + "v_markduplicates.txt", + "||", + "true", + "qualimap", + "--version", + "&>", + "v_qualimap.txt", + "2>&1", + "||", + "true", + "preseq", + "&>", + "v_preseq.txt", + "2>&1", + "||", + "true", + "gatk", + "--version", + "2>&1", + "|", + "head", + "-n", + "1", + ">", + "v_gatk.txt", + "2>&1", + "||", + "true", + "gatk3", + "--version", + "2>&1", + ">", + "v_gatk3.txt", + "2>&1", + "||", + "true", + "freebayes", + "--version", + "&>", + "v_freebayes.txt", + "2>&1", + "||", + "true", + "bedtools", + "--version", + "&>", + "v_bedtools.txt", + "2>&1", + "||", + "true", + "damageprofiler", + "--version", + "&>", + "v_damageprofiler.txt", + "2>&1", + "||", + "true", + "bam", + "--version", + "&>", + "v_bamutil.txt", + "2>&1", + "||", + "true", + "pmdtools", + "--version", + "&>", + "v_pmdtools.txt", + "2>&1", + "||", + "true", + "angsd", + "-h", + "|&", + "head", + "-n", + "1", + "|", + "cut", + "-d", + "-f3-4", + "&>", + "v_angsd.txt", + "2>&1", + "||", + "true", + "multivcfanalyzer", + "--help", + "|", + "head", + "-n", + "1", + "&>", + "v_multivcfanalyzer.txt", + "2>&1", + "||", + "true", + "malt-run", + "--help", + "|&", + "tail", + "-n", + "3", + "|", + "head", + "-n", + "1", + "|", + "cut", + "-f", + "2", + "-d(", + "|", + "cut", + "-f", + "1", + "-d", + ",", + "&>", + "v_malt.txt", + "2>&1", + "||", + "true", + "MaltExtract", + "--help", + "|", + "head", + "-n", + "2", + "|", + "tail", + "-n", + "1", + "&>", + "v_maltextract.txt", + "2>&1", + "||", + "true", + "multiqc", + "--version", + "&>", + "v_multiqc.txt", + "2>&1", + "||", + "true", + "vcf2genome", + "-h", + "|&", + "head", + "-n", + "1", + "&>", + "v_vcf2genome.txt", + "||", + "true", + "mtnucratio", + "--help", + "&>", + "v_mtnucratiocalculator.txt", + "||", + "true", + "sexdeterrmine", + "--version", + "&>", + "v_sexdeterrmine.txt", + "||", + "true", + "kraken2", + "--version", + "|", + "head", + "-n", + "1", + "&>", + "v_kraken.txt", + "||", + "true", + "endorS.py", + "--version", + "&>", + "v_endorSpy.txt", + "||", + "true", + "pileupCaller", + "--version", + "&>", + "v_sequencetools.txt", + "2>&1", + "||", + "true", + "bowtie2", + "--version", + "|", + "grep", + "-a", + "bowtie2-.*", + "-fdebug", + ">", + "v_bowtie2.txt", + "||", + "true", + "eigenstrat_snp_coverage", + "--version", + "|", + "cut", + "-d", + "-f2", + ">v_eigenstrat_snp_coverage.txt", + "||", + "true", + "mapDamage", + "--version", + ">", + "v_mapdamage.txt", + "||", + "true", + "bbduk.sh", + "|", + "grep", + "Last", + "modified", + "|", + "cut", + "-d", + "-f", + "3-99", + ">", + "v_bbduk.txt", + "||", + "true", + "scrape_software_versions.py", + "&>", + "software_versions_mqc.yaml" + ] + }, + "parents": [ + "output_documentation" + ], + "children": [ + "fastqc_jk2782_l1", + "fastqc_jk2802_l2" + ], + "files": [], + "cores": 2.0, + "id": "ID000006", + "category": "get_software_versions", + "avgCPU": 147.8, + "bytesRead": 172760, + "bytesWritten": 1048, + "memory": 387324 + }, + { + "name": "fastqc_jk2782_l1", + "type": "compute", + "runtime": 175.205, + "command": { + "program": "fastqc", + "arguments": [ + "fastqc", + "-t", + "2", + "-q", + "JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq.gz", + "JK2782_TGGCCGATCAACGA_L008_R2_001.fastq.gz.tengrand.fq.gz", + "rename", + "s/_fastqc.zip$/_raw_fastqc.zip/", + "*_fastqc.zip", + "rename", + "s/_fastqc.html$/_raw_fastqc.html/", + "*_fastqc.html" + ] + }, + "parents": [ + "get_software_versions" + ], + "children": [ + "adapter_removal_jk2782_l1", + "adapter_removal_jk2802_l2" + ], + "files": [], + "cores": 2.0, + "id": "ID000007", + "category": "fastqc", + "avgCPU": 161.8, + "bytesRead": 35981, + "bytesWritten": 3967, + "memory": 270124 + }, + { + "name": "adapter_removal_jk2782_l1", + "type": "compute", + "runtime": 172.643, + "command": { + "program": "adapter_removal", + "arguments": [ + "mkdir", + "-p", + "output", + "AdapterRemoval", + "--file1", + "JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq.gz", + "--file2", + "JK2782_TGGCCGATCAACGA_L008_R2_001.fastq.gz.tengrand.fq.gz", + "--basename", + "JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq_L1.pe", + "--gzip", + "--threads", + "2", + "--qualitymax", + "41", + "--collapse", + "--trimns", + "--trimqualities", + "--adapter1", + "AGATCGGAAGAGCACACGTCTGAACTCCAGTCAC", + "--adapter2", + "AGATCGGAAGAGCGTCGTGTAGGGAAAGAGTGTA", + "--minlength", + "30", + "--minquality", + "20", + "--minadapteroverlap", + "1", + "cat", + "*.collapsed.gz", + "*.collapsed.truncated.gz", + "*.singleton.truncated.gz", + "*.pair1.truncated.gz", + "*.pair2.truncated.gz", + ">", + "output/JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq_L1.pe.combined.tmp.fq.gz", + "mv", + "*.settings", + "output/", + "##", + "Add", + "R_", + "and", + "L_", + "for", + "unmerged", + "reads", + "for", + "DeDup", + "compatibility", + "AdapterRemovalFixPrefix", + "-Xmx4g", + "output/JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq_L1.pe.combined.tmp.fq.gz", + "|", + "pigz", + "-p", + "1", + ">", + "output/JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq_L1.pe.combined.fq.gz" + ] + }, + "parents": [ + "fastqc_jk2782_l1", + "fastqc_jk2802_l2" + ], + "children": [ + "fastqc_after_clipping_jk2782_l1", + "fastqc_after_clipping_jk2802_l2" + ], + "files": [], + "cores": 2.0, + "id": "ID000008", + "category": "adapter_removal", + "avgCPU": 160.9, + "bytesRead": 17357, + "bytesWritten": 4405, + "memory": 79308 + }, + { + "name": "fastqc_jk2802_l2", + "type": "compute", + "runtime": 177.338, + "command": { + "program": "fastqc", + "arguments": [ + "fastqc", + "-q", + "JK2802_AGAATAACCTACCA_L008_R1_001.fastq.gz.tengrand.fq.gz", + "rename", + "s/_fastqc.zip$/_raw_fastqc.zip/", + "*_fastqc.zip", + "rename", + "s/_fastqc.html$/_raw_fastqc.html/", + "*_fastqc.html" + ] + }, + "parents": [ + "get_software_versions" + ], + "children": [ + "adapter_removal_jk2782_l1", + "adapter_removal_jk2802_l2" + ], + "files": [], + "cores": 2.0, + "id": "ID000009", + "category": "fastqc", + "avgCPU": 120.1, + "bytesRead": 24457, + "bytesWritten": 2181, + "memory": 181060 + }, + { + "name": "adapter_removal_jk2802_l2", + "type": "compute", + "runtime": 174.313, + "command": { + "program": "adapter_removal", + "arguments": [ + "mkdir", + "-p", + "output", + "AdapterRemoval", + "--file1", + "JK2802_AGAATAACCTACCA_L008_R1_001.fastq.gz.tengrand.fq.gz", + "--basename", + "JK2802_AGAATAACCTACCA_L008_R1_001.fastq.gz.tengrand.fq_L2.se", + "--gzip", + "--threads", + "2", + "--qualitymax", + "41", + "--trimns", + "--trimqualities", + "--adapter1", + "AGATCGGAAGAGCACACGTCTGAACTCCAGTCAC", + "--adapter2", + "AGATCGGAAGAGCGTCGTGTAGGGAAAGAGTGTA", + "--minlength", + "30", + "--minquality", + "20", + "--minadapteroverlap", + "1", + "mv", + "*.settings", + "*.se.truncated.gz", + "output/" + ] + }, + "parents": [ + "fastqc_jk2782_l1", + "fastqc_jk2802_l2" + ], + "children": [ + "fastqc_after_clipping_jk2782_l1", + "fastqc_after_clipping_jk2802_l2" + ], + "files": [], + "cores": 2.0, + "id": "ID000010", + "category": "adapter_removal", + "avgCPU": 106.5, + "bytesRead": 683, + "bytesWritten": 897, + "memory": 12136 + }, + { + "name": "fastqc_after_clipping_jk2782_l1", + "type": "compute", + "runtime": 15.371, + "command": { + "program": "fastqc_after_clipping", + "arguments": [ + "fastqc", + "-t", + "2", + "-q", + "JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq_L1.pe.combined.fq.gz" + ] + }, + "parents": [ + "adapter_removal_jk2782_l1", + "adapter_removal_jk2802_l2" + ], + "children": [ + "bwa_jk2802", + "bwa_jk2782" + ], + "files": [], + "cores": 2.0, + "id": "ID000013", + "category": "fastqc_after_clipping", + "avgCPU": 133.3, + "bytesRead": 23788, + "bytesWritten": 1998, + "memory": 215020 + }, + { + "name": "fastqc_after_clipping_jk2802_l2", + "type": "compute", + "runtime": 15.272, + "command": { + "program": "fastqc_after_clipping", + "arguments": [ + "fastqc", + "-t", + "2", + "-q", + "JK2802_AGAATAACCTACCA_L008_R1_001.fastq.gz.tengrand.fq_L2.se.truncated.gz" + ] + }, + "parents": [ + "adapter_removal_jk2782_l1", + "adapter_removal_jk2802_l2" + ], + "children": [ + "bwa_jk2802", + "bwa_jk2782" + ], + "files": [], + "cores": 2.0, + "id": "ID000014", + "category": "fastqc_after_clipping", + "avgCPU": 124.1, + "bytesRead": 23882, + "bytesWritten": 2143, + "memory": 213064 + }, + { + "name": "bwa_jk2802", + "type": "compute", + "runtime": 9.566, + "command": { + "program": "bwa", + "arguments": [ + "bwa", + "aln", + "-t", + "2", + "BWAIndex/Mammoth_MT_Krause.fasta", + "JK2802_AGAATAACCTACCA_L008_R1_001.fastq.gz.tengrand.fq_L2.se.truncated.gz", + "-n", + "0.04", + "-l", + "1024", + "-k", + "2", + "-o", + "1", + "-f", + "JK2802.sai", + "bwa", + "samse", + "-r", + "\"@RGtID:ILLUMINA-JK2802tSM:JK2802tPL:illuminatPU:ILLUMINA-JK2802-SE\"", + "BWAIndex/Mammoth_MT_Krause.fasta", + "JK2802.sai", + "JK2802_AGAATAACCTACCA_L008_R1_001.fastq.gz.tengrand.fq_L2.se.truncated.gz", + "|", + "samtools", + "sort", + "-@", + "1", + "-O", + "bam", + "-", + ">", + "\"JK2802\"_\"SE\".mapped.bam", + "samtools", + "index", + "\"JK2802\"_\"SE\".mapped.bam" + ] + }, + "parents": [ + "fastqc_after_clipping_jk2782_l1", + "fastqc_after_clipping_jk2802_l2" + ], + "children": [ + "samtools_flagstat_jk2782", + "samtools_flagstat_jk2802" + ], + "files": [], + "cores": 2.0, + "id": "ID000016", + "category": "bwa", + "avgCPU": 15.7, + "bytesRead": 3774, + "bytesWritten": 3367, + "memory": 10628 + }, + { + "name": "bwa_jk2782", + "type": "compute", + "runtime": 9.652, + "command": { + "program": "bwa", + "arguments": [ + "bwa", + "aln", + "-t", + "2", + "BWAIndex/Mammoth_MT_Krause.fasta", + "JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq_L1.pe.combined.fq.gz", + "-n", + "0.04", + "-l", + "1024", + "-k", + "2", + "-o", + "1", + "-f", + "JK2782.sai", + "bwa", + "samse", + "-r", + "\"@RGtID:ILLUMINA-JK2782tSM:JK2782tPL:illuminatPU:ILLUMINA-JK2782-PE\"", + "BWAIndex/Mammoth_MT_Krause.fasta", + "JK2782.sai", + "JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq_L1.pe.combined.fq.gz", + "|", + "samtools", + "sort", + "-@", + "1", + "-O", + "bam", + "-", + ">", + "\"JK2782\"_\"PE\".mapped.bam", + "samtools", + "index", + "\"JK2782\"_\"PE\".mapped.bam" + ] + }, + "parents": [ + "fastqc_after_clipping_jk2782_l1", + "fastqc_after_clipping_jk2802_l2" + ], + "children": [ + "samtools_flagstat_jk2782", + "samtools_flagstat_jk2802" + ], + "files": [], + "cores": 2.0, + "id": "ID000015", + "category": "bwa", + "avgCPU": 69.8, + "bytesRead": 3705, + "bytesWritten": 3355, + "memory": 12876 + }, + { + "name": "samtools_flagstat_jk2782", + "type": "compute", + "runtime": 13.011, + "command": { + "program": "samtools_flagstat", + "arguments": [ + "samtools", + "flagstat", + "JK2782_PE.mapped.bam", + ">", + "JK2782_flagstat.stats" + ] + }, + "parents": [ + "bwa_jk2802", + "bwa_jk2782" + ], + "children": [ + "markduplicates_jk2782", + "markduplicates_jk2802" + ], + "files": [], + "cores": 1.0, + "id": "ID000026", + "category": "samtools_flagstat", + "avgCPU": 30.1, + "bytesRead": 478, + "bytesWritten": 5, + "memory": 6468 + }, + { + "name": "samtools_flagstat_jk2802", + "type": "compute", + "runtime": 13.129, + "command": { + "program": "samtools_flagstat", + "arguments": [ + "samtools", + "flagstat", + "JK2802_SE.mapped.bam", + ">", + "JK2802_flagstat.stats" + ] + }, + "parents": [ + "bwa_jk2802", + "bwa_jk2782" + ], + "children": [ + "markduplicates_jk2782", + "markduplicates_jk2802" + ], + "files": [], + "cores": 1.0, + "id": "ID000024", + "category": "samtools_flagstat", + "avgCPU": 118.5, + "bytesRead": 551, + "bytesWritten": 5 + }, + { + "name": "markduplicates_jk2782", + "type": "compute", + "runtime": 22.655, + "command": { + "program": "markduplicates", + "arguments": [ + "mv", + "JK2782_PE.mapped.bam", + "JK2782.bam", + "picard", + "-Xmx4096M", + "MarkDuplicates", + "INPUT=JK2782.bam", + "OUTPUT=JK2782_rmdup.bam", + "REMOVE_DUPLICATES=TRUE", + "AS=TRUE", + "METRICS_FILE=\"JK2782_rmdup.metrics\"", + "VALIDATION_STRINGENCY=SILENT", + "samtools", + "index", + "JK2782_rmdup.bam" + ] + }, + "parents": [ + "samtools_flagstat_jk2782", + "samtools_flagstat_jk2802" + ], + "children": [ + "preseq_jk2782", + "preseq_jk2802" + ], + "files": [], + "cores": 2.0, + "id": "ID000021", + "category": "markduplicates", + "avgCPU": 173.6, + "bytesRead": 24055, + "bytesWritten": 2319, + "memory": 1400048 + }, + { + "name": "markduplicates_jk2802", + "type": "compute", + "runtime": 21.545, + "command": { + "program": "markduplicates", + "arguments": [ + "mv", + "JK2802_SE.mapped.bam", + "JK2802.bam", + "picard", + "-Xmx4096M", + "MarkDuplicates", + "INPUT=JK2802.bam", + "OUTPUT=JK2802_rmdup.bam", + "REMOVE_DUPLICATES=TRUE", + "AS=TRUE", + "METRICS_FILE=\"JK2802_rmdup.metrics\"", + "VALIDATION_STRINGENCY=SILENT", + "samtools", + "index", + "JK2802_rmdup.bam" + ] + }, + "parents": [ + "samtools_flagstat_jk2782", + "samtools_flagstat_jk2802" + ], + "children": [ + "preseq_jk2782", + "preseq_jk2802" + ], + "files": [], + "cores": 2.0, + "id": "ID000020", + "category": "markduplicates", + "avgCPU": 182.6, + "bytesRead": 24242, + "bytesWritten": 2466, + "memory": 1404624 + }, + { + "name": "preseq_jk2782", + "type": "compute", + "runtime": 12.299, + "command": { + "program": "preseq", + "arguments": [ + "preseq", + "c_curve", + "-s", + "1000", + "-o", + "JK2782_PE.mapped.ccurve", + "-B", + "JK2782_PE.mapped.bam" + ] + }, + "parents": [ + "markduplicates_jk2782", + "markduplicates_jk2802" + ], + "children": [ + "endorspy_jk2782", + "endorspy_jk2802" + ], + "files": [], + "cores": 1.0, + "id": "ID000030", + "category": "preseq", + "avgCPU": 81.9, + "bytesRead": 473, + "bytesWritten": 4, + "memory": 12032 + }, + { + "name": "preseq_jk2802", + "type": "compute", + "runtime": 10.188, + "command": { + "program": "preseq", + "arguments": [ + "preseq", + "c_curve", + "-s", + "1000", + "-o", + "JK2802_SE.mapped.ccurve", + "-B", + "JK2802_SE.mapped.bam" + ] + }, + "parents": [ + "markduplicates_jk2782", + "markduplicates_jk2802" + ], + "children": [ + "endorspy_jk2782", + "endorspy_jk2802" + ], + "files": [], + "cores": 1.0, + "id": "ID000027", + "category": "preseq", + "avgCPU": 77.6, + "bytesRead": 548, + "bytesWritten": 4, + "memory": 11972 + }, + { + "name": "endorspy_jk2782", + "type": "compute", + "runtime": 7.537, + "command": { + "program": "endorspy", + "arguments": [ + "endorS.py", + "-o", + "json", + "-n", + "JK2782", + "JK2782_flagstat.stats" + ] + }, + "parents": [ + "preseq_jk2782", + "preseq_jk2802" + ], + "children": [ + "damageprofiler_jk2802", + "damageprofiler_jk2782" + ], + "files": [], + "cores": 1.0, + "id": "ID000031", + "category": "endorspy", + "avgCPU": 44.7, + "bytesRead": 623, + "bytesWritten": 4, + "memory": 12264 + }, + { + "name": "endorspy_jk2802", + "type": "compute", + "runtime": 8.0, + "command": { + "program": "endorspy", + "arguments": [ + "endorS.py", + "-o", + "json", + "-n", + "JK2802", + "JK2802_flagstat.stats" + ] + }, + "parents": [ + "preseq_jk2782", + "preseq_jk2802" + ], + "children": [ + "damageprofiler_jk2802", + "damageprofiler_jk2782" + ], + "files": [], + "cores": 1.0, + "id": "ID000032", + "category": "endorspy", + "avgCPU": 54.0, + "bytesRead": 623, + "bytesWritten": 4, + "memory": 12224 + }, + { + "name": "damageprofiler_jk2802", + "type": "compute", + "runtime": 18.596, + "command": { + "program": "damageprofiler", + "arguments": [ + "damageprofiler", + "-Xmx4g", + "-i", + "JK2802_rmdup.bam", + "-r", + "Mammoth_MT_Krause.fasta", + "-l", + "100", + "-t", + "15", + "-o", + ".", + "-yaxis_damageplot", + "0.30" + ] + }, + "parents": [ + "endorspy_jk2782", + "endorspy_jk2802" + ], + "children": [ + "qualimap_jk2802", + "qualimap_jk2782" + ], + "files": [], + "cores": 1.0, + "id": "ID000033", + "category": "damageprofiler", + "avgCPU": 88.6, + "bytesRead": 25744, + "bytesWritten": 391, + "memory": 242940 + }, + { + "name": "damageprofiler_jk2782", + "type": "compute", + "runtime": 16.736, + "command": { + "program": "damageprofiler", + "arguments": [ + "damageprofiler", + "-Xmx4g", + "-i", + "JK2782_rmdup.bam", + "-r", + "Mammoth_MT_Krause.fasta", + "-l", + "100", + "-t", + "15", + "-o", + ".", + "-yaxis_damageplot", + "0.30" + ] + }, + "parents": [ + "endorspy_jk2782", + "endorspy_jk2802" + ], + "children": [ + "qualimap_jk2802", + "qualimap_jk2782" + ], + "files": [], + "cores": 1.0, + "id": "ID000036", + "category": "damageprofiler", + "avgCPU": 88.3, + "bytesRead": 25661, + "bytesWritten": 327, + "memory": 198276 + }, + { + "name": "qualimap_jk2802", + "type": "compute", + "runtime": 15.368, + "command": { + "program": "qualimap", + "arguments": [ + "qualimap", + "bamqc", + "-bam", + "JK2802_rmdup.bam", + "-nt", + "2", + "-outdir", + ".", + "-outformat", + "\"HTML\"", + "--java-mem-size=4G" + ] + }, + "parents": [ + "damageprofiler_jk2802", + "damageprofiler_jk2782" + ], + "children": [ + "multiqc_1" + ], + "files": [], + "cores": 2.0, + "id": "ID000053", + "category": "qualimap", + "avgCPU": 177.7, + "bytesRead": 35038, + "bytesWritten": 1712, + "memory": 209440 + }, + { + "name": "qualimap_jk2782", + "type": "compute", + "runtime": 14.223, + "command": { + "program": "qualimap", + "arguments": [ + "qualimap", + "bamqc", + "-bam", + "JK2782_rmdup.bam", + "-nt", + "2", + "-outdir", + ".", + "-outformat", + "\"HTML\"", + "--java-mem-size=4G" + ] + }, + "parents": [ + "damageprofiler_jk2802", + "damageprofiler_jk2782" + ], + "children": [ + "multiqc_1" + ], + "files": [], + "cores": 2.0, + "id": "ID000054", + "category": "qualimap", + "avgCPU": 181.9, + "bytesRead": 34954, + "bytesWritten": 1937, + "memory": 232196 + }, + { + "name": "multiqc_1", + "type": "compute", + "runtime": 46.376, + "command": { + "program": "multiqc", + "arguments": [ + "multiqc", + "-f", + "multiqc_config.yaml", + "." + ] + }, + "parents": [ + "qualimap_jk2802", + "qualimap_jk2782" + ], + "children": [], + "files": [], + "cores": 1.0, + "id": "ID000056", + "category": "multiqc", + "avgCPU": 93.0, + "bytesRead": 1215169, + "bytesWritten": 22599, + "memory": 139496 + } + ] + } +} diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt index b6789542..5e2463f8 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt @@ -25,6 +25,8 @@ package org.opendc.trace.wtf import org.apache.avro.generic.GenericRecord import org.opendc.trace.* import org.opendc.trace.util.parquet.LocalParquetReader +import java.time.Duration +import java.time.Instant /** * A [TableReader] implementation for the WTF format. @@ -61,14 +63,14 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader record["id"] - TASK_WORKFLOW_ID -> record["workflow_id"] - TASK_SUBMIT_TIME -> record["ts_submit"] - TASK_WAIT_TIME -> record["wait_time"] - TASK_RUNTIME -> record["runtime"] + TASK_ID -> (record["id"] as Long).toString() + TASK_WORKFLOW_ID -> (record["workflow_id"] as Long).toString() + TASK_SUBMIT_TIME -> Instant.ofEpochMilli(record["ts_submit"] as Long) + TASK_WAIT_TIME -> Duration.ofMillis(record["wait_time"] as Long) + TASK_RUNTIME -> Duration.ofMillis(record["runtime"] as Long) TASK_REQ_NCPUS -> (record["resource_amount_requested"] as Double).toInt() - TASK_PARENTS -> (record["parents"] as ArrayList).map { it["item"] as Long }.toSet() - TASK_CHILDREN -> (record["children"] as ArrayList).map { it["item"] as Long }.toSet() + TASK_PARENTS -> (record["parents"] as ArrayList).map { it["item"].toString() }.toSet() + TASK_CHILDREN -> (record["children"] as ArrayList).map { it["item"].toString() }.toSet() TASK_GROUP_ID -> record["group_id"] TASK_USER_ID -> record["user_id"] else -> throw IllegalArgumentException("Invalid column") @@ -94,16 +96,7 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader): Long { - val record = checkNotNull(record) { "Reader in invalid state" } - - return when (column) { - TASK_ID -> record["id"] as Long - TASK_WORKFLOW_ID -> record["workflow_id"] as Long - TASK_SUBMIT_TIME -> record["ts_submit"] as Long - TASK_WAIT_TIME -> record["wait_time"] as Long - TASK_RUNTIME -> record["runtime"] as Long - else -> throw IllegalArgumentException("Invalid column") - } + throw IllegalArgumentException("Invalid column") } override fun getDouble(column: TableColumn): Double { diff --git a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt index a05a523e..b155f265 100644 --- a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt @@ -28,6 +28,8 @@ import org.junit.jupiter.api.assertThrows import org.opendc.trace.* import java.io.File import java.net.URL +import java.time.Duration +import java.time.Instant /** * Test suite for the [WtfTraceFormat] class. @@ -91,20 +93,20 @@ class WtfTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals(362334516345962206, reader.getLong(TASK_ID)) }, - { assertEquals(1078341553348591493, reader.getLong(TASK_WORKFLOW_ID)) }, - { assertEquals(245604, reader.getLong(TASK_SUBMIT_TIME)) }, - { assertEquals(8163, reader.getLong(TASK_RUNTIME)) }, - { assertEquals(setOf(584055316413447529, 133113685133695608, 1008582348422865408), reader.get(TASK_PARENTS)) }, + { assertEquals("362334516345962206", reader.get(TASK_ID)) }, + { assertEquals("1078341553348591493", reader.get(TASK_WORKFLOW_ID)) }, + { assertEquals(Instant.ofEpochMilli(245604), reader.get(TASK_SUBMIT_TIME)) }, + { assertEquals(Duration.ofMillis(8163), reader.get(TASK_RUNTIME)) }, + { assertEquals(setOf("584055316413447529", "133113685133695608", "1008582348422865408"), reader.get(TASK_PARENTS)) }, ) assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals(502010169100446658, reader.getLong(TASK_ID)) }, - { assertEquals(1078341553348591493, reader.getLong(TASK_WORKFLOW_ID)) }, - { assertEquals(251325, reader.getLong(TASK_SUBMIT_TIME)) }, - { assertEquals(8216, reader.getLong(TASK_RUNTIME)) }, - { assertEquals(setOf(584055316413447529, 133113685133695608, 1008582348422865408), reader.get(TASK_PARENTS)) }, + { assertEquals("502010169100446658", reader.get(TASK_ID)) }, + { assertEquals("1078341553348591493", reader.get(TASK_WORKFLOW_ID)) }, + { assertEquals(Instant.ofEpochMilli(251325), reader.get(TASK_SUBMIT_TIME)) }, + { assertEquals(Duration.ofMillis(8216), reader.get(TASK_RUNTIME)) }, + { assertEquals(setOf("584055316413447529", "133113685133695608", "1008582348422865408"), reader.get(TASK_PARENTS)) }, ) reader.close() -- cgit v1.2.3 From 49dd8377c8bfde1e64e411c6a6f921c768b9b53b Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sun, 12 Sep 2021 11:22:07 +0200 Subject: refactor(trace): Add API for accessing available table columns This change adds a new API to the Table interface for accessing the table columns that the table supports. This does not necessarily mean that the column will have a value for every row, but that the table format has defined this particular column. --- .../src/main/kotlin/org/opendc/trace/Table.kt | 4 +-- .../trace/bitbrains/BitbrainsResourceStateTable.kt | 31 ++++++++++------------ .../kotlin/org/opendc/trace/gwf/GwfTaskTable.kt | 21 +++++++-------- .../kotlin/org/opendc/trace/swf/SwfTaskTable.kt | 27 +++++++++---------- .../org/opendc/trace/wfformat/WfFormatTaskTable.kt | 19 ++++++------- .../kotlin/org/opendc/trace/wtf/WtfTaskTable.kt | 27 +++++++++---------- 6 files changed, 57 insertions(+), 72 deletions(-) (limited to 'opendc-trace') diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt index 11e5d6b7..6aca2051 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt @@ -37,9 +37,9 @@ public interface Table { public val isSynthetic: Boolean /** - * Determine whether the specified [column] is supported by this table. + * The list of columns supported in this table. */ - public fun isSupported(column: TableColumn<*>): Boolean + public val columns: List> /** * Open a [TableReader] for this table. diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt index 846d5c8a..883bf8f4 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt @@ -47,23 +47,20 @@ internal class BitbrainsResourceStateTable(private val factory: CsvFactory, priv override val isSynthetic: Boolean = false - override fun isSupported(column: TableColumn<*>): Boolean { - return when (column) { - RESOURCE_STATE_ID -> true - RESOURCE_STATE_TIMESTAMP -> true - RESOURCE_STATE_NCPUS -> true - RESOURCE_STATE_CPU_CAPACITY -> true - RESOURCE_STATE_CPU_USAGE -> true - RESOURCE_STATE_CPU_USAGE_PCT -> true - RESOURCE_STATE_MEM_CAPACITY -> true - RESOURCE_STATE_MEM_USAGE -> true - RESOURCE_STATE_DISK_READ -> true - RESOURCE_STATE_DISK_WRITE -> true - RESOURCE_STATE_NET_RX -> true - RESOURCE_STATE_NET_TX -> true - else -> false - } - } + override val columns: List> = listOf( + RESOURCE_STATE_ID, + RESOURCE_STATE_TIMESTAMP, + RESOURCE_STATE_NCPUS, + RESOURCE_STATE_CPU_CAPACITY, + RESOURCE_STATE_CPU_USAGE, + RESOURCE_STATE_CPU_USAGE_PCT, + RESOURCE_STATE_MEM_CAPACITY, + RESOURCE_STATE_MEM_USAGE, + RESOURCE_STATE_DISK_READ, + RESOURCE_STATE_DISK_WRITE, + RESOURCE_STATE_NET_RX, + RESOURCE_STATE_NET_TX, + ) override fun newReader(): TableReader { val it = partitions.iterator() diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt index 80a99d10..fd7bd068 100644 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTable.kt @@ -34,18 +34,15 @@ internal class GwfTaskTable(private val factory: CsvFactory, private val url: UR override val isSynthetic: Boolean = false - override fun isSupported(column: TableColumn<*>): Boolean { - return when (column) { - TASK_WORKFLOW_ID -> true - TASK_ID -> true - TASK_SUBMIT_TIME -> true - TASK_RUNTIME -> true - TASK_REQ_NCPUS -> true - TASK_ALLOC_NCPUS -> true - TASK_PARENTS -> true - else -> false - } - } + override val columns: List> = listOf( + TASK_WORKFLOW_ID, + TASK_ID, + TASK_SUBMIT_TIME, + TASK_RUNTIME, + TASK_REQ_NCPUS, + TASK_ALLOC_NCPUS, + TASK_PARENTS + ) override fun newReader(): TableReader { return GwfTaskTableReader(factory.createParser(url)) diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt index 12a51a2f..7ec0d607 100644 --- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt +++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTable.kt @@ -34,21 +34,18 @@ internal class SwfTaskTable(private val path: Path) : Table { override val isSynthetic: Boolean = false - override fun isSupported(column: TableColumn<*>): Boolean { - return when (column) { - TASK_ID -> true - TASK_SUBMIT_TIME -> true - TASK_WAIT_TIME -> true - TASK_RUNTIME -> true - TASK_REQ_NCPUS -> true - TASK_ALLOC_NCPUS -> true - TASK_PARENTS -> true - TASK_STATUS -> true - TASK_GROUP_ID -> true - TASK_USER_ID -> true - else -> false - } - } + override val columns: List> = listOf( + TASK_ID, + TASK_SUBMIT_TIME, + TASK_WAIT_TIME, + TASK_RUNTIME, + TASK_REQ_NCPUS, + TASK_ALLOC_NCPUS, + TASK_PARENTS, + TASK_STATUS, + TASK_GROUP_ID, + TASK_USER_ID + ) override fun newReader(): TableReader { val reader = path.bufferedReader() diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTable.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTable.kt index 907bf7ff..7b7f979f 100644 --- a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTable.kt +++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTable.kt @@ -34,17 +34,14 @@ internal class WfFormatTaskTable(private val factory: JsonFactory, private val p override val isSynthetic: Boolean = false - override fun isSupported(column: TableColumn<*>): Boolean { - return when (column) { - TASK_ID -> true - TASK_WORKFLOW_ID -> true - TASK_RUNTIME -> true - TASK_REQ_NCPUS -> true - TASK_PARENTS -> true - TASK_CHILDREN -> true - else -> false - } - } + override val columns: List> = listOf( + TASK_ID, + TASK_WORKFLOW_ID, + TASK_RUNTIME, + TASK_REQ_NCPUS, + TASK_PARENTS, + TASK_CHILDREN + ) override fun newReader(): TableReader { val parser = factory.createParser(path.toFile()) diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt index be26f540..74202718 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt @@ -35,21 +35,18 @@ internal class WtfTaskTable(private val path: Path) : Table { override val isSynthetic: Boolean = false - override fun isSupported(column: TableColumn<*>): Boolean { - return when (column) { - TASK_ID -> true - TASK_WORKFLOW_ID -> true - TASK_SUBMIT_TIME -> true - TASK_WAIT_TIME -> true - TASK_RUNTIME -> true - TASK_REQ_NCPUS -> true - TASK_PARENTS -> true - TASK_CHILDREN -> true - TASK_GROUP_ID -> true - TASK_USER_ID -> true - else -> false - } - } + override val columns: List> = listOf( + TASK_ID, + TASK_WORKFLOW_ID, + TASK_SUBMIT_TIME, + TASK_WAIT_TIME, + TASK_RUNTIME, + TASK_REQ_NCPUS, + TASK_PARENTS, + TASK_CHILDREN, + TASK_GROUP_ID, + TASK_USER_ID + ) override fun newReader(): TableReader { val reader = LocalParquetReader(path.resolve("tasks/schema-1.0")) -- cgit v1.2.3 From 3fb1eac8290181638a6571e4d7a49e53b7f3d7d1 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sun, 12 Sep 2021 11:40:15 +0200 Subject: feat(trace): Add synthetic resource table for Bitbrains format This change adds a synthetic resource table for the Bitbrains format, which can be used to list the available partitions in the trace. --- .../trace/bitbrains/BitbrainsResourceStateTable.kt | 2 +- .../trace/bitbrains/BitbrainsResourceTable.kt | 61 ++++++++++++ .../bitbrains/BitbrainsResourceTableReader.kt | 108 +++++++++++++++++++++ .../org/opendc/trace/bitbrains/BitbrainsTrace.kt | 12 +-- .../trace/bitbrains/BitbrainsTraceFormatTest.kt | 23 ++++- 5 files changed, 195 insertions(+), 11 deletions(-) create mode 100644 opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTable.kt create mode 100644 opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt (limited to 'opendc-trace') diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt index 883bf8f4..c9e5954d 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTable.kt @@ -33,7 +33,7 @@ import kotlin.io.path.nameWithoutExtension /** * The resource state [Table] in the Bitbrains format. */ -internal class BitbrainsResourceStateTable(private val factory: CsvFactory, private val path: Path) : Table { +internal class BitbrainsResourceStateTable(private val factory: CsvFactory, path: Path) : Table { /** * The partitions that belong to the table. */ diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTable.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTable.kt new file mode 100644 index 00000000..bc4f0b7d --- /dev/null +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTable.kt @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.bitbrains + +import com.fasterxml.jackson.dataformat.csv.CsvFactory +import org.opendc.trace.* +import java.nio.file.Files +import java.nio.file.Path +import java.util.stream.Collectors +import kotlin.io.path.extension +import kotlin.io.path.nameWithoutExtension + +/** + * The resources [Table] in the Bitbrains format. + */ +internal class BitbrainsResourceTable(private val factory: CsvFactory, path: Path) : Table { + /** + * The VMs that belong to the table. + */ + private val vms = + Files.walk(path, 1) + .filter { !Files.isDirectory(it) && it.extension == "csv" } + .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) + .toSortedMap() + + override val name: String = TABLE_RESOURCES + + override val isSynthetic: Boolean = true + + override val columns: List> = listOf(RESOURCE_ID) + + override fun newReader(): TableReader { + return BitbrainsResourceTableReader(factory, vms) + } + + override fun newReader(partition: String): TableReader { + throw IllegalArgumentException("Unknown partition $partition") + } + + override fun toString(): String = "BitbrainsResourceTable" +} diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt new file mode 100644 index 00000000..c02dc5ae --- /dev/null +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt @@ -0,0 +1,108 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.bitbrains + +import com.fasterxml.jackson.dataformat.csv.CsvFactory +import org.opendc.trace.* +import java.nio.file.Path + +/** + * A [TableReader] for the Bitbrains resource table. + */ +internal class BitbrainsResourceTableReader(private val factory: CsvFactory, vms: Map) : TableReader { + /** + * An iterator to iterate over the resource entries. + */ + private val it = vms.iterator() + + override fun nextRow(): Boolean { + reset() + + while (it.hasNext()) { + val (name, path) = it.next() + + val parser = factory.createParser(path.toFile()) + val reader = BitbrainsResourceStateTableReader(name, parser) + + try { + if (!reader.nextRow()) { + continue + } + + id = reader.get(RESOURCE_STATE_ID) + return true + } finally { + reader.close() + } + } + + return false + } + + override fun hasColumn(column: TableColumn<*>): Boolean { + return when (column) { + RESOURCE_ID -> true + else -> false + } + } + + override fun get(column: TableColumn): T { + val res: Any? = when (column) { + RESOURCE_ID -> id + else -> throw IllegalArgumentException("Invalid column") + } + + @Suppress("UNCHECKED_CAST") + return res as T + } + + override fun getBoolean(column: TableColumn): Boolean { + throw IllegalArgumentException("Invalid column") + } + + override fun getInt(column: TableColumn): Int { + throw IllegalArgumentException("Invalid column") + } + + override fun getLong(column: TableColumn): Long { + throw IllegalArgumentException("Invalid column") + } + + override fun getDouble(column: TableColumn): Double { + throw IllegalArgumentException("Invalid column") + } + + override fun close() {} + + /** + * State fields of the reader. + */ + private var id: String? = null + + /** + * Reset the state of the reader. + */ + private fun reset() { + id = null + } +} diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt index 5a2d4243..bcd8dd52 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTrace.kt @@ -30,16 +30,16 @@ import java.nio.file.Path * [Trace] implementation for the Bitbrains format. */ public class BitbrainsTrace internal constructor(private val factory: CsvFactory, private val path: Path) : Trace { - override val tables: List = listOf(TABLE_RESOURCE_STATES) + override val tables: List = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) - override fun containsTable(name: String): Boolean = TABLE_RESOURCE_STATES == name + override fun containsTable(name: String): Boolean = tables.contains(name) override fun getTable(name: String): Table? { - if (!containsTable(name)) { - return null + return when (name) { + TABLE_RESOURCES -> BitbrainsResourceTable(factory, path) + TABLE_RESOURCE_STATES -> BitbrainsResourceStateTable(factory, path) + else -> null } - - return BitbrainsResourceStateTable(factory, path) } override fun toString(): String = "BitbrainsTrace[$path]" diff --git a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt index 550805d3..ff4a33f8 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt @@ -25,9 +25,7 @@ package org.opendc.trace.bitbrains import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows -import org.opendc.trace.RESOURCE_STATE_CPU_USAGE -import org.opendc.trace.RESOURCE_STATE_TIMESTAMP -import org.opendc.trace.TABLE_RESOURCE_STATES +import org.opendc.trace.* import java.net.URL /** @@ -58,7 +56,7 @@ class BitbrainsTraceFormatTest { val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv")) val trace = format.open(url) - assertEquals(listOf(TABLE_RESOURCE_STATES), trace.tables) + assertEquals(listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES), trace.tables) } @Test @@ -81,6 +79,23 @@ class BitbrainsTraceFormatTest { assertNull(trace.getTable("test")) } + @Test + fun testResources() { + val format = BitbrainsTraceFormat() + val url = checkNotNull(BitbrainsTraceFormatTest::class.java.getResource("/bitbrains.csv")) + val trace = format.open(url) + + val reader = trace.getTable(TABLE_RESOURCES)!!.newReader() + + assertAll( + { assertTrue(reader.nextRow()) }, + { assertEquals("bitbrains", reader.get(RESOURCE_ID)) }, + { assertFalse(reader.nextRow()) } + ) + + reader.close() + } + @Test fun testSmoke() { val format = BitbrainsTraceFormat() -- cgit v1.2.3 From 992b65396f55c0e12b36823d191dea8e03dd45ba Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sun, 12 Sep 2021 11:46:03 +0200 Subject: feat(trace): Support dynamic resolving of trace formats This change enables users to open traces of various trace formats by dynamically specifying the format name. The trace API will use the service loader to resolve the available trace formats on the classpath. --- .../src/main/kotlin/org/opendc/trace/Trace.kt | 35 ++++++++++++++++++++++ 1 file changed, 35 insertions(+) (limited to 'opendc-trace') diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt index 36e93b52..0ae45e86 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt @@ -22,6 +22,11 @@ package org.opendc.trace +import org.opendc.trace.spi.TraceFormat +import java.io.File +import java.net.URL +import java.nio.file.Path + /** * A trace is a collection of related tables that characterize a workload. */ @@ -40,4 +45,34 @@ public interface Trace { * Obtain a [Table] with the specified [name]. */ public fun getTable(name: String): Table? + + public companion object { + /** + * Open a [Trace] at the specified [url] in the given [format]. + * + * @throws IllegalArgumentException if [format] is not supported. + */ + public fun open(url: URL, format: String): Trace { + val provider = requireNotNull(TraceFormat.byName(format)) { "Unknown format $format" } + return provider.open(url) + } + + /** + * Open a [Trace] at the specified [path] in the given [format]. + * + * @throws IllegalArgumentException if [format] is not supported. + */ + public fun open(path: File, format: String): Trace { + return open(path.toURI().toURL(), format) + } + + /** + * Open a [Trace] at the specified [path] in the given [format]. + * + * @throws IllegalArgumentException if [format] is not supported. + */ + public fun open(path: Path, format: String): Trace { + return open(path.toUri().toURL(), format) + } + } } -- cgit v1.2.3