diff options
19 files changed, 2319 insertions, 80 deletions
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 4e2fc777..ddede2e8 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -52,6 +52,7 @@ clikt = { module = "com.github.ajalt.clikt:clikt", version.ref = "clikt" } progressbar = { module = "me.tongfei:progressbar", version.ref = "progressbar" } # Format +jackson-core = { module = "com.fasterxml.jackson.core:jackson-core", version.ref = "jackson" } jackson-module-kotlin = { module = "com.fasterxml.jackson.module:jackson-module-kotlin", version.ref = "jackson" } jackson-datatype-jsr310 = { module = "com.fasterxml.jackson.datatype:jackson-datatype-jsr310", version.ref = "jackson" } jackson-dataformat-csv = { module = "com.fasterxml.jackson.dataformat:jackson-dataformat-csv", version.ref = "jackson" } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt index 88bbc623..46920dce 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TaskColumns.kt @@ -23,49 +23,52 @@ @file:JvmName("TaskColumns") package org.opendc.trace +import java.time.Duration +import java.time.Instant + /** * A column containing the task identifier. */ @JvmField -public val TASK_ID: TableColumn<Long> = longColumn("task:id") +public val TASK_ID: TableColumn<String> = stringColumn("task:id") /** * A column containing the identifier of the workflow. */ @JvmField -public val TASK_WORKFLOW_ID: TableColumn<Long> = longColumn("task:workflow_id") +public val TASK_WORKFLOW_ID: TableColumn<String> = stringColumn("task:workflow_id") /** * A column containing the submit time of the task. 
*/ @JvmField -public val TASK_SUBMIT_TIME: TableColumn<Long> = longColumn("task:submit_time") +public val TASK_SUBMIT_TIME: TableColumn<Instant> = TableColumn("task:submit_time", type = Instant::class.java) /** * A column containing the wait time of the task. */ @JvmField -public val TASK_WAIT_TIME: TableColumn<Long> = longColumn("task:wait_time") +public val TASK_WAIT_TIME: TableColumn<Duration> = TableColumn("task:wait_time", type = Duration::class.java) /** * A column containing the runtime time of the task. */ @JvmField -public val TASK_RUNTIME: TableColumn<Long> = longColumn("task:runtime") +public val TASK_RUNTIME: TableColumn<Duration> = TableColumn("task:runtime", type = Duration::class.java) /** * A column containing the parents of a task. */ @Suppress("UNCHECKED_CAST") @JvmField -public val TASK_PARENTS: TableColumn<Set<Long>> = TableColumn("task:parents", type = Set::class.java as Class<Set<Long>>) +public val TASK_PARENTS: TableColumn<Set<String>> = TableColumn("task:parents", type = Set::class.java as Class<Set<String>>) /** * A column containing the children of a task. */ @Suppress("UNCHECKED_CAST") @JvmField -public val TASK_CHILDREN: TableColumn<Set<Long>> = TableColumn("task:children", type = Set::class.java as Class<Set<Long>>) +public val TASK_CHILDREN: TableColumn<Set<String>> = TableColumn("task:children", type = Set::class.java as Class<Set<String>>) /** * A column containing the requested CPUs of a task. 
diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt index fb9099bf..39eb5520 100644 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt @@ -26,6 +26,8 @@ import com.fasterxml.jackson.core.JsonToken import com.fasterxml.jackson.dataformat.csv.CsvParser import com.fasterxml.jackson.dataformat.csv.CsvSchema import org.opendc.trace.* +import java.time.Duration +import java.time.Instant import java.util.regex.Pattern /** @@ -52,10 +54,10 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { } when (parser.currentName) { - "WorkflowID" -> workflowId = parser.longValue - "JobID" -> jobId = parser.longValue - "SubmitTime" -> submitTime = parser.longValue - "RunTime" -> runtime = parser.longValue + "WorkflowID" -> workflowId = parser.text + "JobID" -> jobId = parser.text + "SubmitTime" -> submitTime = Instant.ofEpochSecond(parser.longValue) + "RunTime" -> runtime = Duration.ofSeconds(parser.longValue) "NProcs" -> nProcs = parser.intValue "ReqNProcs" -> reqNProcs = parser.intValue "Dependencies" -> parseParents(parser.valueAsString) @@ -79,7 +81,7 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { } override fun <T> get(column: TableColumn<T>): T { - val res: Any = when (column) { + val res: Any? 
= when (column) { TASK_WORKFLOW_ID -> workflowId TASK_ID -> jobId TASK_SUBMIT_TIME -> submitTime @@ -107,13 +109,7 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { } override fun getLong(column: TableColumn<Long>): Long { - return when (column) { - TASK_WORKFLOW_ID -> workflowId - TASK_ID -> jobId - TASK_SUBMIT_TIME -> submitTime - TASK_RUNTIME -> runtime - else -> throw IllegalArgumentException("Invalid column") - } + throw IllegalArgumentException("Invalid column") } override fun getDouble(column: TableColumn<Double>): Double { @@ -163,10 +159,10 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { /** * Reader state fields. */ - private var workflowId = -1L - private var jobId = -1L - private var submitTime = -1L - private var runtime = -1L + private var workflowId: String? = null + private var jobId: String? = null + private var submitTime: Instant? = null + private var runtime: Duration? = null private var nProcs = -1 private var reqNProcs = -1 private var dependencies = emptySet<Long>() @@ -175,10 +171,10 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { * Reset the state. 
*/ private fun reset() { - workflowId = -1 - jobId = -1 - submitTime = -1 - runtime = -1 + workflowId = null + jobId = null + submitTime = null + runtime = null nProcs = -1 reqNProcs = -1 dependencies = emptySet() diff --git a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt index 6b0568fe..b209b979 100644 --- a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt @@ -29,6 +29,8 @@ import org.junit.jupiter.api.assertDoesNotThrow import org.junit.jupiter.api.assertThrows import org.opendc.trace.* import java.net.URL +import java.time.Duration +import java.time.Instant /** * Test suite for the [GwfTraceFormat] class. @@ -90,11 +92,11 @@ internal class GwfTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals(0L, reader.getLong(TASK_WORKFLOW_ID)) }, - { assertEquals(1L, reader.getLong(TASK_ID)) }, - { assertEquals(16, reader.getLong(TASK_SUBMIT_TIME)) }, - { assertEquals(11, reader.getLong(TASK_RUNTIME)) }, - { assertEquals(setOf<Long>(), reader.get(TASK_PARENTS)) }, + { assertEquals("0", reader.get(TASK_WORKFLOW_ID)) }, + { assertEquals("1", reader.get(TASK_ID)) }, + { assertEquals(Instant.ofEpochSecond(16), reader.get(TASK_SUBMIT_TIME)) }, + { assertEquals(Duration.ofSeconds(11), reader.get(TASK_RUNTIME)) }, + { assertEquals(emptySet<String>(), reader.get(TASK_PARENTS)) }, ) } diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt index 5f879a54..3f49c770 100644 --- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt +++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt 
@@ -24,6 +24,8 @@ package org.opendc.trace.swf import org.opendc.trace.* import java.io.BufferedReader +import java.time.Duration +import java.time.Instant /** * A [TableReader] implementation for the SWF format. @@ -85,10 +87,10 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea override fun <T> get(column: TableColumn<T>): T { val res: Any = when (column) { - TASK_ID -> getLong(TASK_ID) - TASK_SUBMIT_TIME -> getLong(TASK_SUBMIT_TIME) - TASK_WAIT_TIME -> getLong(TASK_WAIT_TIME) - TASK_RUNTIME -> getLong(TASK_RUNTIME) + TASK_ID -> fields[COL_JOB_ID] + TASK_SUBMIT_TIME -> Instant.ofEpochSecond(fields[COL_SUBMIT_TIME].toLong(10)) + TASK_WAIT_TIME -> Duration.ofSeconds(fields[COL_WAIT_TIME].toLong(10)) + TASK_RUNTIME -> Duration.ofSeconds(fields[COL_RUN_TIME].toLong(10)) TASK_REQ_NCPUS -> getInt(TASK_REQ_NCPUS) TASK_ALLOC_NCPUS -> getInt(TASK_ALLOC_NCPUS) TASK_PARENTS -> { @@ -121,13 +123,7 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea } override fun getLong(column: TableColumn<Long>): Long { - return when (column) { - TASK_ID -> fields[COL_JOB_ID].toLong(10) - TASK_SUBMIT_TIME -> fields[COL_SUBMIT_TIME].toLong(10) - TASK_WAIT_TIME -> fields[COL_WAIT_TIME].toLong(10) - TASK_RUNTIME -> fields[COL_RUN_TIME].toLong(10) - else -> throw IllegalArgumentException("Invalid column") - } + throw IllegalArgumentException("Invalid column") } override fun getDouble(column: TableColumn<Double>): Double { diff --git a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt index 9686891b..828c2bfa 100644 --- a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt @@ -85,10 +85,10 @@ internal class SwfTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - 
{ assertEquals(1, reader.getLong(TASK_ID)) }, + { assertEquals("1", reader.get(TASK_ID)) }, { assertEquals(306, reader.getInt(TASK_ALLOC_NCPUS)) }, { assertTrue(reader.nextRow()) }, - { assertEquals(2, reader.getLong(TASK_ID)) }, + { assertEquals("2", reader.get(TASK_ID)) }, { assertEquals(17, reader.getInt(TASK_ALLOC_NCPUS)) }, ) diff --git a/opendc-trace/opendc-trace-wfformat/build.gradle.kts b/opendc-trace/opendc-trace-wfformat/build.gradle.kts new file mode 100644 index 00000000..2d336d03 --- /dev/null +++ b/opendc-trace/opendc-trace-wfformat/build.gradle.kts @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +description = "Support for WfCommons workload traces in OpenDC" + +/* Build configuration */ +plugins { + `kotlin-library-conventions` + `testing-conventions` + `jacoco-conventions` +} + +dependencies { + api(platform(projects.opendcPlatform)) + api(projects.opendcTrace.opendcTraceApi) + + implementation(libs.jackson.core) +} diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTable.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTable.kt new file mode 100644 index 00000000..907bf7ff --- /dev/null +++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTable.kt @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package org.opendc.trace.wfformat + +import com.fasterxml.jackson.core.JsonFactory +import org.opendc.trace.* +import java.nio.file.Path + +/** + * A [Table] containing the tasks in a WfCommons workload trace. + */ +internal class WfFormatTaskTable(private val factory: JsonFactory, private val path: Path) : Table { + override val name: String = TABLE_TASKS + + override val isSynthetic: Boolean = false + + override fun isSupported(column: TableColumn<*>): Boolean { + return when (column) { + TASK_ID -> true + TASK_WORKFLOW_ID -> true + TASK_RUNTIME -> true + TASK_REQ_NCPUS -> true + TASK_PARENTS -> true + TASK_CHILDREN -> true + else -> false + } + } + + override fun newReader(): TableReader { + val parser = factory.createParser(path.toFile()) + return WfFormatTaskTableReader(parser) + } + + override fun newReader(partition: String): TableReader { + throw IllegalArgumentException("Invalid partition $partition") + } + + override fun toString(): String = "WfFormatTaskTable" +} diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt new file mode 100644 index 00000000..4408ba5c --- /dev/null +++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt @@ -0,0 +1,234 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all 
+ * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.wfformat + +import com.fasterxml.jackson.core.JsonParseException +import com.fasterxml.jackson.core.JsonParser +import com.fasterxml.jackson.core.JsonToken +import org.opendc.trace.* +import java.time.Duration +import kotlin.math.roundToInt + +/** + * A [TableReader] implementation for the WfCommons workload trace format. + */ +internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableReader { + /** + * The current nesting of the parser. 
+ */ + private var level: ParserLevel = ParserLevel.TOP + + override fun nextRow(): Boolean { + reset() + + var hasJob = false + + while (!hasJob) { + when (level) { + ParserLevel.TOP -> { + val token = parser.nextToken() + + // Check whether the document is not empty and starts with an object + if (token == null) { + break + } else if (token != JsonToken.START_OBJECT) { + throw JsonParseException(parser, "Expected object", parser.currentLocation) + } else { + level = ParserLevel.TRACE + } + } + ParserLevel.TRACE -> { + // Seek for the workflow object in the file + if (!seekWorkflow()) { + break + } else if (!parser.isExpectedStartObjectToken) { + throw JsonParseException(parser, "Expected object", parser.currentLocation) + } else { + level = ParserLevel.WORKFLOW + } + } + ParserLevel.WORKFLOW -> { + // Seek for the jobs object in the file + level = if (!seekJobs()) { + ParserLevel.TRACE + } else if (!parser.isExpectedStartArrayToken) { + throw JsonParseException(parser, "Expected array", parser.currentLocation) + } else { + ParserLevel.JOB + } + } + ParserLevel.JOB -> { + when (parser.nextToken()) { + JsonToken.END_ARRAY -> level = ParserLevel.WORKFLOW + JsonToken.START_OBJECT -> { + parseJob() + hasJob = true + break + } + else -> throw JsonParseException(parser, "Unexpected token", parser.currentLocation) + } + } + } + } + + return hasJob + } + + override fun hasColumn(column: TableColumn<*>): Boolean { + return when (column) { + TASK_ID -> true + TASK_WORKFLOW_ID -> true + TASK_RUNTIME -> true + TASK_REQ_NCPUS -> true + TASK_PARENTS -> true + TASK_CHILDREN -> true + else -> false + } + } + + override fun <T> get(column: TableColumn<T>): T { + val res: Any? 
= when (column) { + TASK_ID -> id + TASK_WORKFLOW_ID -> workflowId + TASK_RUNTIME -> runtime + TASK_PARENTS -> parents + TASK_CHILDREN -> children + TASK_REQ_NCPUS -> getInt(TASK_REQ_NCPUS) + else -> throw IllegalArgumentException("Invalid column") + } + + @Suppress("UNCHECKED_CAST") + return res as T + } + + override fun getBoolean(column: TableColumn<Boolean>): Boolean { + throw IllegalArgumentException("Invalid column") + } + + override fun getInt(column: TableColumn<Int>): Int { + return when (column) { + TASK_REQ_NCPUS -> cores + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getLong(column: TableColumn<Long>): Long { + throw IllegalArgumentException("Invalid column") + } + + override fun getDouble(column: TableColumn<Double>): Double { + throw IllegalArgumentException("Invalid column") + } + + override fun close() { + parser.close() + } + + /** + * Parse the trace and seek until the workflow description. + */ + private fun seekWorkflow(): Boolean { + while (parser.nextValue() != JsonToken.END_OBJECT) { + when (parser.currentName) { + "name" -> workflowId = parser.text + "workflow" -> return true + else -> parser.skipChildren() + } + } + + return false + } + + /** + * Parse the workflow description in the file and seek until the first job. + */ + private fun seekJobs(): Boolean { + while (parser.nextValue() != JsonToken.END_OBJECT) { + when (parser.currentName) { + "jobs" -> return true + else -> parser.skipChildren() + } + } + + return false + } + + /** + * Parse a single job in the file. + */ + private fun parseJob() { + while (parser.nextValue() != JsonToken.END_OBJECT) { + when (parser.currentName) { + "name" -> id = parser.text + "parents" -> parents = parseIds() + "children" -> children = parseIds() + "runtime" -> runtime = Duration.ofSeconds(parser.numberValue.toLong()) + "cores" -> cores = parser.floatValue.roundToInt() + else -> parser.skipChildren() + } + } + } + + /** + * Parse the parents/children of a job. 
+ */ + private fun parseIds(): Set<String> { + if (!parser.isExpectedStartArrayToken) { + throw JsonParseException(parser, "Expected array", parser.currentLocation) + } + + val ids = mutableSetOf<String>() + + while (parser.nextToken() != JsonToken.END_ARRAY) { + if (parser.currentToken != JsonToken.VALUE_STRING) { + throw JsonParseException(parser, "Expected token", parser.currentLocation) + } + + ids.add(parser.valueAsString) + } + + return ids + } + + private enum class ParserLevel { + TOP, TRACE, WORKFLOW, JOB + } + + /** + * State fields for the parser. + */ + private var id: String? = null + private var workflowId: String? = null + private var runtime: Duration? = null + private var parents: Set<String>? = null + private var children: Set<String>? = null + private var cores = -1 + + private fun reset() { + id = null + runtime = null + parents = null + children = null + cores = -1 + } +} diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTrace.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTrace.kt new file mode 100644 index 00000000..2d9c79fb --- /dev/null +++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTrace.kt @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.wfformat + +import com.fasterxml.jackson.core.JsonFactory +import org.opendc.trace.TABLE_TASKS +import org.opendc.trace.Table +import org.opendc.trace.Trace +import java.nio.file.Path + +/** + * [Trace] implementation for the WfCommons workload trace format. + */ +public class WfFormatTrace internal constructor(private val factory: JsonFactory, private val path: Path) : Trace { + override val tables: List<String> = listOf(TABLE_TASKS) + + override fun containsTable(name: String): Boolean = TABLE_TASKS == name + + override fun getTable(name: String): Table? 
{ + return when (name) { + TABLE_TASKS -> WfFormatTaskTable(factory, path) + else -> null + } + } + + override fun toString(): String = "WfFormatTrace[$path]" +} diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt new file mode 100644 index 00000000..ff8d054c --- /dev/null +++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.wfformat + +import com.fasterxml.jackson.core.JsonFactory +import org.opendc.trace.spi.TraceFormat +import java.net.URL +import java.nio.file.Paths +import kotlin.io.path.exists + +/** + * A [TraceFormat] implementation for the WfCommons workload trace format. 
+ */ +public class WfFormatTraceFormat : TraceFormat { + /** + * The [JsonFactory] that is used to create JSON parsers. + */ + private val factory = JsonFactory() + + override val name: String = "wfformat" + + override fun open(url: URL): WfFormatTrace { + val path = Paths.get(url.toURI()) + require(path.exists()) { "URL $url does not exist" } + return WfFormatTrace(factory, path) + } +} diff --git a/opendc-trace/opendc-trace-wfformat/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat b/opendc-trace/opendc-trace-wfformat/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat new file mode 100644 index 00000000..ee3aa2f6 --- /dev/null +++ b/opendc-trace/opendc-trace-wfformat/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat @@ -0,0 +1 @@ +org.opendc.trace.wfformat.WfFormatTraceFormat diff --git a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt new file mode 100644 index 00000000..b07f27ed --- /dev/null +++ b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt @@ -0,0 +1,345 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.wfformat + +import com.fasterxml.jackson.core.JsonFactory +import com.fasterxml.jackson.core.JsonParseException +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertDoesNotThrow +import org.junit.jupiter.api.assertThrows +import org.opendc.trace.TASK_ID +import org.opendc.trace.TASK_PARENTS + +/** + * Test suite for the [WfFormatTaskTableReader] class. + */ +internal class WfFormatTaskTableReaderTest { + /** + * The [JsonFactory] used to construct the parser. 
+ */ + private val factory = JsonFactory() + + @Test + fun testEmptyInput() { + val content = "" + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertFalse(reader.nextRow()) + reader.close() + } + + @Test + fun testTopLevelArrayInput() { + val content = "[]" + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertThrows<JsonParseException> { + while (reader.nextRow()) { + continue + } + } + + reader.close() + } + + @Test + fun testNoWorkflow() { + val content = """ + { + "name": "eager-nextflow-chameleon" + } + """.trimIndent() + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertDoesNotThrow { + while (reader.nextRow()) { + continue + } + } + + reader.close() + } + + @Test + fun testWorkflowArrayType() { + val content = """ + { + "name": "eager-nextflow-chameleon", + "workflow": [] + } + """.trimIndent() + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertThrows<JsonParseException> { + while (reader.nextRow()) { + continue + } + } + + reader.close() + } + + @Test + fun testWorkflowNullType() { + val content = """ + { + "name": "eager-nextflow-chameleon", + "workflow": null + } + """.trimIndent() + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertThrows<JsonParseException> { + while (reader.nextRow()) { + continue + } + } + + reader.close() + } + + @Test + fun testNoJobs() { + val content = """ + { + "name": "eager-nextflow-chameleon", + "workflow": { + + } + } + """.trimIndent() + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertDoesNotThrow { reader.nextRow() } + + reader.close() + } + + @Test + fun testJobsObjectType() { + val content = """ + { + "name": "eager-nextflow-chameleon", + "workflow": { "jobs": {} } + } + """.trimIndent() + val parser = 
factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertThrows<JsonParseException> { reader.nextRow() } + + reader.close() + } + + @Test + fun testJobsNullType() { + val content = """ + { + "name": "eager-nextflow-chameleon", + "workflow": { "jobs": null } + } + """.trimIndent() + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertThrows<JsonParseException> { reader.nextRow() } + + reader.close() + } + + @Test + fun testJobsInvalidChildType() { + val content = """ + { + "name": "eager-nextflow-chameleon", + "workflow": { + "jobs": [1] + } + } + """.trimIndent() + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertThrows<JsonParseException> { reader.nextRow() } + + reader.close() + } + + @Test + fun testJobsValidChildType() { + val content = """ + { + "name": "eager-nextflow-chameleon", + "workflow": { + "jobs": [ + { + "name": "test" + } + ] + } + } + """.trimIndent() + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertTrue(reader.nextRow()) + assertEquals("test", reader.get(TASK_ID)) + assertFalse(reader.nextRow()) + + reader.close() + } + + @Test + fun testJobsInvalidParents() { + val content = """ + { + "name": "eager-nextflow-chameleon", + "workflow": { + "jobs": [ + { + "name": "test", + "parents": 1, + } + ] + } + } + """.trimIndent() + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertThrows<JsonParseException> { reader.nextRow() } + + reader.close() + } + + @Test + fun testJobsInvalidParentsItem() { + val content = """ + { + "name": "eager-nextflow-chameleon", + "workflow": { + "jobs": [ + { + "name": "test", + "parents": [1], + } + ] + } + } + """.trimIndent() + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertThrows<JsonParseException> { reader.nextRow() } + + reader.close() + } + 
+ @Test + fun testJobsValidParents() { + val content = """ + { + "name": "eager-nextflow-chameleon", + "workflow": { + "jobs": [ + { + "name": "test", + "parents": ["1"] + } + ] + } + } + """.trimIndent() + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertTrue(reader.nextRow()) + assertEquals(setOf("1"), reader.get(TASK_PARENTS)) + assertFalse(reader.nextRow()) + + reader.close() + } + + @Test + fun testJobsInvalidSecondEntry() { + val content = """ + { + "workflow": { + "jobs": [ + { + "name": "test", + "parents": ["1"] + }, + "test" + ] + } + } + """.trimIndent() + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertDoesNotThrow { reader.nextRow() } + assertThrows<JsonParseException> { reader.nextRow() } + + reader.close() + } + + @Test + fun testDuplicateJobsArray() { + val content = """ + { + "name": "eager-nextflow-chameleon", + "workflow": { + "jobs": [ + { + "name": "test", + "parents": ["1"] + } + ], + "jobs": [ + { + "name": "test2", + "parents": ["test"] + } + ] + } + } + """.trimIndent() + val parser = factory.createParser(content) + val reader = WfFormatTaskTableReader(parser) + + assertTrue(reader.nextRow()) + assertTrue(reader.nextRow()) + assertEquals("test2", reader.get(TASK_ID)) + assertFalse(reader.nextRow()) + + reader.close() + } +} diff --git a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt new file mode 100644 index 00000000..0bfc8840 --- /dev/null +++ b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt @@ -0,0 +1,133 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * 
in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.wfformat + +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertDoesNotThrow +import org.junit.jupiter.api.assertThrows +import org.opendc.trace.* +import java.io.File +import java.net.URL + +/** + * Test suite for the [WfFormatTraceFormat] class. 
+ */ +class WfFormatTraceFormatTest { + @Test + fun testTraceExists() { + val input = File("src/test/resources/trace.json").toURI().toURL() + val format = WfFormatTraceFormat() + assertDoesNotThrow { format.open(input) } + } + + @Test + fun testTraceDoesNotExists() { + val input = File("src/test/resources/trace.json").toURI().toURL() + val format = WfFormatTraceFormat() + assertThrows<IllegalArgumentException> { format.open(URL(input.toString() + "help")) } + } + + @Test + fun testTables() { + val input = File("src/test/resources/trace.json").toURI().toURL() + val format = WfFormatTraceFormat() + val trace = format.open(input) + + assertEquals(listOf(TABLE_TASKS), trace.tables) + } + + @Test + fun testTableExists() { + val input = File("src/test/resources/trace.json").toURI().toURL() + val format = WfFormatTraceFormat() + val table = format.open(input).getTable(TABLE_TASKS) + + assertNotNull(table) + assertDoesNotThrow { table!!.newReader() } + } + + @Test + fun testTableDoesNotExist() { + val input = File("src/test/resources/trace.json").toURI().toURL() + val format = WfFormatTraceFormat() + val trace = format.open(input) + + assertFalse(trace.containsTable("test")) + assertNull(trace.getTable("test")) + } + + /** + * Smoke test for parsing WfCommons traces. 
+ */ + @Test + fun testTableReader() { + val input = File("src/test/resources/trace.json").toURI().toURL() + val trace = WfFormatTraceFormat().open(input) + val reader = trace.getTable(TABLE_TASKS)!!.newReader() + + assertAll( + { assertTrue(reader.nextRow()) }, + { assertEquals("makebwaindex_mammoth_mt_krause.fasta", reader.get(TASK_ID)) }, + { assertEquals("eager-nextflow-chameleon", reader.get(TASK_WORKFLOW_ID)) }, + { assertEquals(172000, reader.get(TASK_RUNTIME).toMillis()) }, + { assertEquals(emptySet<String>(), reader.get(TASK_PARENTS)) }, + ) + + assertAll( + { assertTrue(reader.nextRow()) }, + { assertEquals("makeseqdict_mammoth_mt_krause.fasta", reader.get(TASK_ID)) }, + { assertEquals("eager-nextflow-chameleon", reader.get(TASK_WORKFLOW_ID)) }, + { assertEquals(175000, reader.get(TASK_RUNTIME).toMillis()) }, + { assertEquals(setOf("makebwaindex_mammoth_mt_krause.fasta"), reader.get(TASK_PARENTS)) }, + ) + + reader.close() + } + + /** + * Test full iteration of the table. + */ + @Test + fun testTableReaderFull() { + val input = File("src/test/resources/trace.json").toURI().toURL() + val trace = WfFormatTraceFormat().open(input) + val reader = trace.getTable(TABLE_TASKS)!!.newReader() + + assertDoesNotThrow { + while (reader.nextRow()) { + // reader.get(TASK_ID) + } + reader.close() + } + } + + @Test + fun testTableReaderPartition() { + val input = File("src/test/resources/trace.json").toURI().toURL() + val format = WfFormatTraceFormat() + val table = format.open(input).getTable(TABLE_TASKS)!! 
+ + assertThrows<IllegalArgumentException> { table.newReader("test") } + } +} diff --git a/opendc-trace/opendc-trace-wfformat/src/test/resources/trace.json b/opendc-trace/opendc-trace-wfformat/src/test/resources/trace.json new file mode 100644 index 00000000..d21f024d --- /dev/null +++ b/opendc-trace/opendc-trace-wfformat/src/test/resources/trace.json @@ -0,0 +1,1342 @@ +{ + "name": "eager-nextflow-chameleon", + "description": "Instance generated with WfCommons - https://wfcommons.org", + "createdAt": "2021-09-06T03:43:31.762479", + "schemaVersion": "1.2", + "author": { + "name": "cc", + "email": "support@wfcommons.org" + }, + "wms": { + "name": "Nextflow", + "version": "21.04.3", + "url": "https://www.nextflow.io" + }, + "workflow": { + "executedAt": "20210906T034331+0000", + "makespan": 275, + "jobs": [ + { + "name": "makebwaindex_mammoth_mt_krause.fasta", + "type": "compute", + "runtime": 172.182, + "command": { + "program": "makebwaindex", + "arguments": [ + "bwa", + "index", + "Mammoth_MT_Krause.fasta", + "mkdir", + "BWAIndex", + "&&", + "mv", + "Mammoth_MT_Krause.fasta*", + "BWAIndex" + ] + }, + "parents": [], + "children": [ + "makeseqdict_mammoth_mt_krause.fasta" + ], + "files": [], + "cores": 1.0, + "id": "ID000001", + "category": "makebwaindex", + "avgCPU": 5.8, + "bytesRead": 124, + "bytesWritten": 126, + "memory": 4248 + }, + { + "name": "makeseqdict_mammoth_mt_krause.fasta", + "type": "compute", + "runtime": 175.427, + "command": { + "program": "makeseqdict", + "arguments": [ + "picard", + "-Xmx6144M", + "CreateSequenceDictionary", + "R=Mammoth_MT_Krause.fasta", + "O=\"Mammoth_MT_Krause.dict\"" + ] + }, + "parents": [ + "makebwaindex_mammoth_mt_krause.fasta" + ], + "children": [ + "makefastaindex_mammoth_mt_krause.fasta" + ], + "files": [], + "cores": 1.0, + "id": "ID000003", + "category": "makeseqdict", + "avgCPU": 83.5, + "bytesRead": 22728, + "bytesWritten": 1300, + "memory": 104416 + }, + { + "name": "makefastaindex_mammoth_mt_krause.fasta", + 
"type": "compute", + "runtime": 170.797, + "command": { + "program": "makefastaindex", + "arguments": [ + "samtools", + "faidx", + "Mammoth_MT_Krause.fasta" + ] + }, + "parents": [ + "makeseqdict_mammoth_mt_krause.fasta" + ], + "children": [ + "output_documentation" + ], + "files": [], + "cores": 1.0, + "id": "ID000002", + "category": "makefastaindex", + "avgCPU": 23.8, + "bytesRead": 66, + "bytesWritten": 4, + "memory": 6096 + }, + { + "name": "output_documentation", + "type": "compute", + "runtime": 173.479, + "command": { + "program": "output_documentation", + "arguments": [ + "markdown_to_html.py", + "output.md", + "-o", + "results_description.html" + ] + }, + "parents": [ + "makefastaindex_mammoth_mt_krause.fasta" + ], + "children": [ + "get_software_versions" + ], + "files": [], + "cores": 1.0, + "id": "ID000005", + "category": "output_documentation", + "avgCPU": 84.0, + "bytesRead": 8222, + "bytesWritten": 15165, + "memory": 11488 + }, + { + "name": "get_software_versions", + "type": "compute", + "runtime": 183.445, + "command": { + "program": "get_software_versions", + "arguments": [ + "echo", + "2.3.5", + "&>", + "v_pipeline.txt", + "echo", + "21.04.3", + "&>", + "v_nextflow.txt", + "fastqc", + "--version", + "&>", + "v_fastqc.txt", + "2>&1", + "||", + "true", + "AdapterRemoval", + "--version", + "&>", + "v_adapterremoval.txt", + "2>&1", + "||", + "true", + "fastp", + "--version", + "&>", + "v_fastp.txt", + "2>&1", + "||", + "true", + "bwa", + "&>", + "v_bwa.txt", + "2>&1", + "||", + "true", + "circulargenerator", + "--help", + "|", + "head", + "-n", + "1", + "&>", + "v_circulargenerator.txt", + "2>&1", + "||", + "true", + "samtools", + "--version", + "&>", + "v_samtools.txt", + "2>&1", + "||", + "true", + "dedup", + "-v", + "&>", + "v_dedup.txt", + "2>&1", + "||", + "true", + "##", + "bioconda", + "recipe", + "of", + "picard", + "is", + "incorrectly", + "set", + "up", + "and", + "extra", + "warning", + "made", + "with", + "stderr,", + "this", + "ugly", + 
"command", + "ensures", + "only", + "version", + "exported", + "(", + "exec", + "7>&1", + "picard", + "MarkDuplicates", + "--version", + "2>&1", + ">&7", + "|", + "grep", + "-v", + "/", + ">&2", + ")", + "2>", + "v_markduplicates.txt", + "||", + "true", + "qualimap", + "--version", + "&>", + "v_qualimap.txt", + "2>&1", + "||", + "true", + "preseq", + "&>", + "v_preseq.txt", + "2>&1", + "||", + "true", + "gatk", + "--version", + "2>&1", + "|", + "head", + "-n", + "1", + ">", + "v_gatk.txt", + "2>&1", + "||", + "true", + "gatk3", + "--version", + "2>&1", + ">", + "v_gatk3.txt", + "2>&1", + "||", + "true", + "freebayes", + "--version", + "&>", + "v_freebayes.txt", + "2>&1", + "||", + "true", + "bedtools", + "--version", + "&>", + "v_bedtools.txt", + "2>&1", + "||", + "true", + "damageprofiler", + "--version", + "&>", + "v_damageprofiler.txt", + "2>&1", + "||", + "true", + "bam", + "--version", + "&>", + "v_bamutil.txt", + "2>&1", + "||", + "true", + "pmdtools", + "--version", + "&>", + "v_pmdtools.txt", + "2>&1", + "||", + "true", + "angsd", + "-h", + "|&", + "head", + "-n", + "1", + "|", + "cut", + "-d", + "-f3-4", + "&>", + "v_angsd.txt", + "2>&1", + "||", + "true", + "multivcfanalyzer", + "--help", + "|", + "head", + "-n", + "1", + "&>", + "v_multivcfanalyzer.txt", + "2>&1", + "||", + "true", + "malt-run", + "--help", + "|&", + "tail", + "-n", + "3", + "|", + "head", + "-n", + "1", + "|", + "cut", + "-f", + "2", + "-d(", + "|", + "cut", + "-f", + "1", + "-d", + ",", + "&>", + "v_malt.txt", + "2>&1", + "||", + "true", + "MaltExtract", + "--help", + "|", + "head", + "-n", + "2", + "|", + "tail", + "-n", + "1", + "&>", + "v_maltextract.txt", + "2>&1", + "||", + "true", + "multiqc", + "--version", + "&>", + "v_multiqc.txt", + "2>&1", + "||", + "true", + "vcf2genome", + "-h", + "|&", + "head", + "-n", + "1", + "&>", + "v_vcf2genome.txt", + "||", + "true", + "mtnucratio", + "--help", + "&>", + "v_mtnucratiocalculator.txt", + "||", + "true", + "sexdeterrmine", + 
"--version", + "&>", + "v_sexdeterrmine.txt", + "||", + "true", + "kraken2", + "--version", + "|", + "head", + "-n", + "1", + "&>", + "v_kraken.txt", + "||", + "true", + "endorS.py", + "--version", + "&>", + "v_endorSpy.txt", + "||", + "true", + "pileupCaller", + "--version", + "&>", + "v_sequencetools.txt", + "2>&1", + "||", + "true", + "bowtie2", + "--version", + "|", + "grep", + "-a", + "bowtie2-.*", + "-fdebug", + ">", + "v_bowtie2.txt", + "||", + "true", + "eigenstrat_snp_coverage", + "--version", + "|", + "cut", + "-d", + "-f2", + ">v_eigenstrat_snp_coverage.txt", + "||", + "true", + "mapDamage", + "--version", + ">", + "v_mapdamage.txt", + "||", + "true", + "bbduk.sh", + "|", + "grep", + "Last", + "modified", + "|", + "cut", + "-d", + "-f", + "3-99", + ">", + "v_bbduk.txt", + "||", + "true", + "scrape_software_versions.py", + "&>", + "software_versions_mqc.yaml" + ] + }, + "parents": [ + "output_documentation" + ], + "children": [ + "fastqc_jk2782_l1", + "fastqc_jk2802_l2" + ], + "files": [], + "cores": 2.0, + "id": "ID000006", + "category": "get_software_versions", + "avgCPU": 147.8, + "bytesRead": 172760, + "bytesWritten": 1048, + "memory": 387324 + }, + { + "name": "fastqc_jk2782_l1", + "type": "compute", + "runtime": 175.205, + "command": { + "program": "fastqc", + "arguments": [ + "fastqc", + "-t", + "2", + "-q", + "JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq.gz", + "JK2782_TGGCCGATCAACGA_L008_R2_001.fastq.gz.tengrand.fq.gz", + "rename", + "s/_fastqc.zip$/_raw_fastqc.zip/", + "*_fastqc.zip", + "rename", + "s/_fastqc.html$/_raw_fastqc.html/", + "*_fastqc.html" + ] + }, + "parents": [ + "get_software_versions" + ], + "children": [ + "adapter_removal_jk2782_l1", + "adapter_removal_jk2802_l2" + ], + "files": [], + "cores": 2.0, + "id": "ID000007", + "category": "fastqc", + "avgCPU": 161.8, + "bytesRead": 35981, + "bytesWritten": 3967, + "memory": 270124 + }, + { + "name": "adapter_removal_jk2782_l1", + "type": "compute", + "runtime": 172.643, + 
"command": { + "program": "adapter_removal", + "arguments": [ + "mkdir", + "-p", + "output", + "AdapterRemoval", + "--file1", + "JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq.gz", + "--file2", + "JK2782_TGGCCGATCAACGA_L008_R2_001.fastq.gz.tengrand.fq.gz", + "--basename", + "JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq_L1.pe", + "--gzip", + "--threads", + "2", + "--qualitymax", + "41", + "--collapse", + "--trimns", + "--trimqualities", + "--adapter1", + "AGATCGGAAGAGCACACGTCTGAACTCCAGTCAC", + "--adapter2", + "AGATCGGAAGAGCGTCGTGTAGGGAAAGAGTGTA", + "--minlength", + "30", + "--minquality", + "20", + "--minadapteroverlap", + "1", + "cat", + "*.collapsed.gz", + "*.collapsed.truncated.gz", + "*.singleton.truncated.gz", + "*.pair1.truncated.gz", + "*.pair2.truncated.gz", + ">", + "output/JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq_L1.pe.combined.tmp.fq.gz", + "mv", + "*.settings", + "output/", + "##", + "Add", + "R_", + "and", + "L_", + "for", + "unmerged", + "reads", + "for", + "DeDup", + "compatibility", + "AdapterRemovalFixPrefix", + "-Xmx4g", + "output/JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq_L1.pe.combined.tmp.fq.gz", + "|", + "pigz", + "-p", + "1", + ">", + "output/JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq_L1.pe.combined.fq.gz" + ] + }, + "parents": [ + "fastqc_jk2782_l1", + "fastqc_jk2802_l2" + ], + "children": [ + "fastqc_after_clipping_jk2782_l1", + "fastqc_after_clipping_jk2802_l2" + ], + "files": [], + "cores": 2.0, + "id": "ID000008", + "category": "adapter_removal", + "avgCPU": 160.9, + "bytesRead": 17357, + "bytesWritten": 4405, + "memory": 79308 + }, + { + "name": "fastqc_jk2802_l2", + "type": "compute", + "runtime": 177.338, + "command": { + "program": "fastqc", + "arguments": [ + "fastqc", + "-q", + "JK2802_AGAATAACCTACCA_L008_R1_001.fastq.gz.tengrand.fq.gz", + "rename", + "s/_fastqc.zip$/_raw_fastqc.zip/", + "*_fastqc.zip", + "rename", + "s/_fastqc.html$/_raw_fastqc.html/", + "*_fastqc.html" + ] + 
}, + "parents": [ + "get_software_versions" + ], + "children": [ + "adapter_removal_jk2782_l1", + "adapter_removal_jk2802_l2" + ], + "files": [], + "cores": 2.0, + "id": "ID000009", + "category": "fastqc", + "avgCPU": 120.1, + "bytesRead": 24457, + "bytesWritten": 2181, + "memory": 181060 + }, + { + "name": "adapter_removal_jk2802_l2", + "type": "compute", + "runtime": 174.313, + "command": { + "program": "adapter_removal", + "arguments": [ + "mkdir", + "-p", + "output", + "AdapterRemoval", + "--file1", + "JK2802_AGAATAACCTACCA_L008_R1_001.fastq.gz.tengrand.fq.gz", + "--basename", + "JK2802_AGAATAACCTACCA_L008_R1_001.fastq.gz.tengrand.fq_L2.se", + "--gzip", + "--threads", + "2", + "--qualitymax", + "41", + "--trimns", + "--trimqualities", + "--adapter1", + "AGATCGGAAGAGCACACGTCTGAACTCCAGTCAC", + "--adapter2", + "AGATCGGAAGAGCGTCGTGTAGGGAAAGAGTGTA", + "--minlength", + "30", + "--minquality", + "20", + "--minadapteroverlap", + "1", + "mv", + "*.settings", + "*.se.truncated.gz", + "output/" + ] + }, + "parents": [ + "fastqc_jk2782_l1", + "fastqc_jk2802_l2" + ], + "children": [ + "fastqc_after_clipping_jk2782_l1", + "fastqc_after_clipping_jk2802_l2" + ], + "files": [], + "cores": 2.0, + "id": "ID000010", + "category": "adapter_removal", + "avgCPU": 106.5, + "bytesRead": 683, + "bytesWritten": 897, + "memory": 12136 + }, + { + "name": "fastqc_after_clipping_jk2782_l1", + "type": "compute", + "runtime": 15.371, + "command": { + "program": "fastqc_after_clipping", + "arguments": [ + "fastqc", + "-t", + "2", + "-q", + "JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq_L1.pe.combined.fq.gz" + ] + }, + "parents": [ + "adapter_removal_jk2782_l1", + "adapter_removal_jk2802_l2" + ], + "children": [ + "bwa_jk2802", + "bwa_jk2782" + ], + "files": [], + "cores": 2.0, + "id": "ID000013", + "category": "fastqc_after_clipping", + "avgCPU": 133.3, + "bytesRead": 23788, + "bytesWritten": 1998, + "memory": 215020 + }, + { + "name": "fastqc_after_clipping_jk2802_l2", + "type": 
"compute", + "runtime": 15.272, + "command": { + "program": "fastqc_after_clipping", + "arguments": [ + "fastqc", + "-t", + "2", + "-q", + "JK2802_AGAATAACCTACCA_L008_R1_001.fastq.gz.tengrand.fq_L2.se.truncated.gz" + ] + }, + "parents": [ + "adapter_removal_jk2782_l1", + "adapter_removal_jk2802_l2" + ], + "children": [ + "bwa_jk2802", + "bwa_jk2782" + ], + "files": [], + "cores": 2.0, + "id": "ID000014", + "category": "fastqc_after_clipping", + "avgCPU": 124.1, + "bytesRead": 23882, + "bytesWritten": 2143, + "memory": 213064 + }, + { + "name": "bwa_jk2802", + "type": "compute", + "runtime": 9.566, + "command": { + "program": "bwa", + "arguments": [ + "bwa", + "aln", + "-t", + "2", + "BWAIndex/Mammoth_MT_Krause.fasta", + "JK2802_AGAATAACCTACCA_L008_R1_001.fastq.gz.tengrand.fq_L2.se.truncated.gz", + "-n", + "0.04", + "-l", + "1024", + "-k", + "2", + "-o", + "1", + "-f", + "JK2802.sai", + "bwa", + "samse", + "-r", + "\"@RGtID:ILLUMINA-JK2802tSM:JK2802tPL:illuminatPU:ILLUMINA-JK2802-SE\"", + "BWAIndex/Mammoth_MT_Krause.fasta", + "JK2802.sai", + "JK2802_AGAATAACCTACCA_L008_R1_001.fastq.gz.tengrand.fq_L2.se.truncated.gz", + "|", + "samtools", + "sort", + "-@", + "1", + "-O", + "bam", + "-", + ">", + "\"JK2802\"_\"SE\".mapped.bam", + "samtools", + "index", + "\"JK2802\"_\"SE\".mapped.bam" + ] + }, + "parents": [ + "fastqc_after_clipping_jk2782_l1", + "fastqc_after_clipping_jk2802_l2" + ], + "children": [ + "samtools_flagstat_jk2782", + "samtools_flagstat_jk2802" + ], + "files": [], + "cores": 2.0, + "id": "ID000016", + "category": "bwa", + "avgCPU": 15.7, + "bytesRead": 3774, + "bytesWritten": 3367, + "memory": 10628 + }, + { + "name": "bwa_jk2782", + "type": "compute", + "runtime": 9.652, + "command": { + "program": "bwa", + "arguments": [ + "bwa", + "aln", + "-t", + "2", + "BWAIndex/Mammoth_MT_Krause.fasta", + "JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq_L1.pe.combined.fq.gz", + "-n", + "0.04", + "-l", + "1024", + "-k", + "2", + "-o", + "1", + "-f", + 
"JK2782.sai", + "bwa", + "samse", + "-r", + "\"@RGtID:ILLUMINA-JK2782tSM:JK2782tPL:illuminatPU:ILLUMINA-JK2782-PE\"", + "BWAIndex/Mammoth_MT_Krause.fasta", + "JK2782.sai", + "JK2782_TGGCCGATCAACGA_L008_R1_001.fastq.gz.tengrand.fq_L1.pe.combined.fq.gz", + "|", + "samtools", + "sort", + "-@", + "1", + "-O", + "bam", + "-", + ">", + "\"JK2782\"_\"PE\".mapped.bam", + "samtools", + "index", + "\"JK2782\"_\"PE\".mapped.bam" + ] + }, + "parents": [ + "fastqc_after_clipping_jk2782_l1", + "fastqc_after_clipping_jk2802_l2" + ], + "children": [ + "samtools_flagstat_jk2782", + "samtools_flagstat_jk2802" + ], + "files": [], + "cores": 2.0, + "id": "ID000015", + "category": "bwa", + "avgCPU": 69.8, + "bytesRead": 3705, + "bytesWritten": 3355, + "memory": 12876 + }, + { + "name": "samtools_flagstat_jk2782", + "type": "compute", + "runtime": 13.011, + "command": { + "program": "samtools_flagstat", + "arguments": [ + "samtools", + "flagstat", + "JK2782_PE.mapped.bam", + ">", + "JK2782_flagstat.stats" + ] + }, + "parents": [ + "bwa_jk2802", + "bwa_jk2782" + ], + "children": [ + "markduplicates_jk2782", + "markduplicates_jk2802" + ], + "files": [], + "cores": 1.0, + "id": "ID000026", + "category": "samtools_flagstat", + "avgCPU": 30.1, + "bytesRead": 478, + "bytesWritten": 5, + "memory": 6468 + }, + { + "name": "samtools_flagstat_jk2802", + "type": "compute", + "runtime": 13.129, + "command": { + "program": "samtools_flagstat", + "arguments": [ + "samtools", + "flagstat", + "JK2802_SE.mapped.bam", + ">", + "JK2802_flagstat.stats" + ] + }, + "parents": [ + "bwa_jk2802", + "bwa_jk2782" + ], + "children": [ + "markduplicates_jk2782", + "markduplicates_jk2802" + ], + "files": [], + "cores": 1.0, + "id": "ID000024", + "category": "samtools_flagstat", + "avgCPU": 118.5, + "bytesRead": 551, + "bytesWritten": 5 + }, + { + "name": "markduplicates_jk2782", + "type": "compute", + "runtime": 22.655, + "command": { + "program": "markduplicates", + "arguments": [ + "mv", + "JK2782_PE.mapped.bam", 
+ "JK2782.bam", + "picard", + "-Xmx4096M", + "MarkDuplicates", + "INPUT=JK2782.bam", + "OUTPUT=JK2782_rmdup.bam", + "REMOVE_DUPLICATES=TRUE", + "AS=TRUE", + "METRICS_FILE=\"JK2782_rmdup.metrics\"", + "VALIDATION_STRINGENCY=SILENT", + "samtools", + "index", + "JK2782_rmdup.bam" + ] + }, + "parents": [ + "samtools_flagstat_jk2782", + "samtools_flagstat_jk2802" + ], + "children": [ + "preseq_jk2782", + "preseq_jk2802" + ], + "files": [], + "cores": 2.0, + "id": "ID000021", + "category": "markduplicates", + "avgCPU": 173.6, + "bytesRead": 24055, + "bytesWritten": 2319, + "memory": 1400048 + }, + { + "name": "markduplicates_jk2802", + "type": "compute", + "runtime": 21.545, + "command": { + "program": "markduplicates", + "arguments": [ + "mv", + "JK2802_SE.mapped.bam", + "JK2802.bam", + "picard", + "-Xmx4096M", + "MarkDuplicates", + "INPUT=JK2802.bam", + "OUTPUT=JK2802_rmdup.bam", + "REMOVE_DUPLICATES=TRUE", + "AS=TRUE", + "METRICS_FILE=\"JK2802_rmdup.metrics\"", + "VALIDATION_STRINGENCY=SILENT", + "samtools", + "index", + "JK2802_rmdup.bam" + ] + }, + "parents": [ + "samtools_flagstat_jk2782", + "samtools_flagstat_jk2802" + ], + "children": [ + "preseq_jk2782", + "preseq_jk2802" + ], + "files": [], + "cores": 2.0, + "id": "ID000020", + "category": "markduplicates", + "avgCPU": 182.6, + "bytesRead": 24242, + "bytesWritten": 2466, + "memory": 1404624 + }, + { + "name": "preseq_jk2782", + "type": "compute", + "runtime": 12.299, + "command": { + "program": "preseq", + "arguments": [ + "preseq", + "c_curve", + "-s", + "1000", + "-o", + "JK2782_PE.mapped.ccurve", + "-B", + "JK2782_PE.mapped.bam" + ] + }, + "parents": [ + "markduplicates_jk2782", + "markduplicates_jk2802" + ], + "children": [ + "endorspy_jk2782", + "endorspy_jk2802" + ], + "files": [], + "cores": 1.0, + "id": "ID000030", + "category": "preseq", + "avgCPU": 81.9, + "bytesRead": 473, + "bytesWritten": 4, + "memory": 12032 + }, + { + "name": "preseq_jk2802", + "type": "compute", + "runtime": 10.188, + "command": 
{ + "program": "preseq", + "arguments": [ + "preseq", + "c_curve", + "-s", + "1000", + "-o", + "JK2802_SE.mapped.ccurve", + "-B", + "JK2802_SE.mapped.bam" + ] + }, + "parents": [ + "markduplicates_jk2782", + "markduplicates_jk2802" + ], + "children": [ + "endorspy_jk2782", + "endorspy_jk2802" + ], + "files": [], + "cores": 1.0, + "id": "ID000027", + "category": "preseq", + "avgCPU": 77.6, + "bytesRead": 548, + "bytesWritten": 4, + "memory": 11972 + }, + { + "name": "endorspy_jk2782", + "type": "compute", + "runtime": 7.537, + "command": { + "program": "endorspy", + "arguments": [ + "endorS.py", + "-o", + "json", + "-n", + "JK2782", + "JK2782_flagstat.stats" + ] + }, + "parents": [ + "preseq_jk2782", + "preseq_jk2802" + ], + "children": [ + "damageprofiler_jk2802", + "damageprofiler_jk2782" + ], + "files": [], + "cores": 1.0, + "id": "ID000031", + "category": "endorspy", + "avgCPU": 44.7, + "bytesRead": 623, + "bytesWritten": 4, + "memory": 12264 + }, + { + "name": "endorspy_jk2802", + "type": "compute", + "runtime": 8.0, + "command": { + "program": "endorspy", + "arguments": [ + "endorS.py", + "-o", + "json", + "-n", + "JK2802", + "JK2802_flagstat.stats" + ] + }, + "parents": [ + "preseq_jk2782", + "preseq_jk2802" + ], + "children": [ + "damageprofiler_jk2802", + "damageprofiler_jk2782" + ], + "files": [], + "cores": 1.0, + "id": "ID000032", + "category": "endorspy", + "avgCPU": 54.0, + "bytesRead": 623, + "bytesWritten": 4, + "memory": 12224 + }, + { + "name": "damageprofiler_jk2802", + "type": "compute", + "runtime": 18.596, + "command": { + "program": "damageprofiler", + "arguments": [ + "damageprofiler", + "-Xmx4g", + "-i", + "JK2802_rmdup.bam", + "-r", + "Mammoth_MT_Krause.fasta", + "-l", + "100", + "-t", + "15", + "-o", + ".", + "-yaxis_damageplot", + "0.30" + ] + }, + "parents": [ + "endorspy_jk2782", + "endorspy_jk2802" + ], + "children": [ + "qualimap_jk2802", + "qualimap_jk2782" + ], + "files": [], + "cores": 1.0, + "id": "ID000033", + "category": 
"damageprofiler", + "avgCPU": 88.6, + "bytesRead": 25744, + "bytesWritten": 391, + "memory": 242940 + }, + { + "name": "damageprofiler_jk2782", + "type": "compute", + "runtime": 16.736, + "command": { + "program": "damageprofiler", + "arguments": [ + "damageprofiler", + "-Xmx4g", + "-i", + "JK2782_rmdup.bam", + "-r", + "Mammoth_MT_Krause.fasta", + "-l", + "100", + "-t", + "15", + "-o", + ".", + "-yaxis_damageplot", + "0.30" + ] + }, + "parents": [ + "endorspy_jk2782", + "endorspy_jk2802" + ], + "children": [ + "qualimap_jk2802", + "qualimap_jk2782" + ], + "files": [], + "cores": 1.0, + "id": "ID000036", + "category": "damageprofiler", + "avgCPU": 88.3, + "bytesRead": 25661, + "bytesWritten": 327, + "memory": 198276 + }, + { + "name": "qualimap_jk2802", + "type": "compute", + "runtime": 15.368, + "command": { + "program": "qualimap", + "arguments": [ + "qualimap", + "bamqc", + "-bam", + "JK2802_rmdup.bam", + "-nt", + "2", + "-outdir", + ".", + "-outformat", + "\"HTML\"", + "--java-mem-size=4G" + ] + }, + "parents": [ + "damageprofiler_jk2802", + "damageprofiler_jk2782" + ], + "children": [ + "multiqc_1" + ], + "files": [], + "cores": 2.0, + "id": "ID000053", + "category": "qualimap", + "avgCPU": 177.7, + "bytesRead": 35038, + "bytesWritten": 1712, + "memory": 209440 + }, + { + "name": "qualimap_jk2782", + "type": "compute", + "runtime": 14.223, + "command": { + "program": "qualimap", + "arguments": [ + "qualimap", + "bamqc", + "-bam", + "JK2782_rmdup.bam", + "-nt", + "2", + "-outdir", + ".", + "-outformat", + "\"HTML\"", + "--java-mem-size=4G" + ] + }, + "parents": [ + "damageprofiler_jk2802", + "damageprofiler_jk2782" + ], + "children": [ + "multiqc_1" + ], + "files": [], + "cores": 2.0, + "id": "ID000054", + "category": "qualimap", + "avgCPU": 181.9, + "bytesRead": 34954, + "bytesWritten": 1937, + "memory": 232196 + }, + { + "name": "multiqc_1", + "type": "compute", + "runtime": 46.376, + "command": { + "program": "multiqc", + "arguments": [ + "multiqc", + "-f", + 
"multiqc_config.yaml", + "." + ] + }, + "parents": [ + "qualimap_jk2802", + "qualimap_jk2782" + ], + "children": [], + "files": [], + "cores": 1.0, + "id": "ID000056", + "category": "multiqc", + "avgCPU": 93.0, + "bytesRead": 1215169, + "bytesWritten": 22599, + "memory": 139496 + } + ] + } +} diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt index b6789542..5e2463f8 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt @@ -25,6 +25,8 @@ package org.opendc.trace.wtf import org.apache.avro.generic.GenericRecord import org.opendc.trace.* import org.opendc.trace.util.parquet.LocalParquetReader +import java.time.Duration +import java.time.Instant /** * A [TableReader] implementation for the WTF format. @@ -61,14 +63,14 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Generic @Suppress("UNCHECKED_CAST") val res: Any = when (column) { - TASK_ID -> record["id"] - TASK_WORKFLOW_ID -> record["workflow_id"] - TASK_SUBMIT_TIME -> record["ts_submit"] - TASK_WAIT_TIME -> record["wait_time"] - TASK_RUNTIME -> record["runtime"] + TASK_ID -> (record["id"] as Long).toString() + TASK_WORKFLOW_ID -> (record["workflow_id"] as Long).toString() + TASK_SUBMIT_TIME -> Instant.ofEpochMilli(record["ts_submit"] as Long) + TASK_WAIT_TIME -> Duration.ofMillis(record["wait_time"] as Long) + TASK_RUNTIME -> Duration.ofMillis(record["runtime"] as Long) TASK_REQ_NCPUS -> (record["resource_amount_requested"] as Double).toInt() - TASK_PARENTS -> (record["parents"] as ArrayList<GenericRecord>).map { it["item"] as Long }.toSet() - TASK_CHILDREN -> (record["children"] as ArrayList<GenericRecord>).map { it["item"] as Long }.toSet() + TASK_PARENTS -> (record["parents"] as 
ArrayList<GenericRecord>).map { it["item"].toString() }.toSet() + TASK_CHILDREN -> (record["children"] as ArrayList<GenericRecord>).map { it["item"].toString() }.toSet() TASK_GROUP_ID -> record["group_id"] TASK_USER_ID -> record["user_id"] else -> throw IllegalArgumentException("Invalid column") @@ -94,16 +96,7 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Generic } override fun getLong(column: TableColumn<Long>): Long { - val record = checkNotNull(record) { "Reader in invalid state" } - - return when (column) { - TASK_ID -> record["id"] as Long - TASK_WORKFLOW_ID -> record["workflow_id"] as Long - TASK_SUBMIT_TIME -> record["ts_submit"] as Long - TASK_WAIT_TIME -> record["wait_time"] as Long - TASK_RUNTIME -> record["runtime"] as Long - else -> throw IllegalArgumentException("Invalid column") - } + throw IllegalArgumentException("Invalid column") } override fun getDouble(column: TableColumn<Double>): Double { diff --git a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt index a05a523e..b155f265 100644 --- a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt @@ -28,6 +28,8 @@ import org.junit.jupiter.api.assertThrows import org.opendc.trace.* import java.io.File import java.net.URL +import java.time.Duration +import java.time.Instant /** * Test suite for the [WtfTraceFormat] class. 
@@ -91,20 +93,20 @@ class WtfTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals(362334516345962206, reader.getLong(TASK_ID)) }, - { assertEquals(1078341553348591493, reader.getLong(TASK_WORKFLOW_ID)) }, - { assertEquals(245604, reader.getLong(TASK_SUBMIT_TIME)) }, - { assertEquals(8163, reader.getLong(TASK_RUNTIME)) }, - { assertEquals(setOf(584055316413447529, 133113685133695608, 1008582348422865408), reader.get(TASK_PARENTS)) }, + { assertEquals("362334516345962206", reader.get(TASK_ID)) }, + { assertEquals("1078341553348591493", reader.get(TASK_WORKFLOW_ID)) }, + { assertEquals(Instant.ofEpochMilli(245604), reader.get(TASK_SUBMIT_TIME)) }, + { assertEquals(Duration.ofMillis(8163), reader.get(TASK_RUNTIME)) }, + { assertEquals(setOf("584055316413447529", "133113685133695608", "1008582348422865408"), reader.get(TASK_PARENTS)) }, ) assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals(502010169100446658, reader.getLong(TASK_ID)) }, - { assertEquals(1078341553348591493, reader.getLong(TASK_WORKFLOW_ID)) }, - { assertEquals(251325, reader.getLong(TASK_SUBMIT_TIME)) }, - { assertEquals(8216, reader.getLong(TASK_RUNTIME)) }, - { assertEquals(setOf(584055316413447529, 133113685133695608, 1008582348422865408), reader.get(TASK_PARENTS)) }, + { assertEquals("502010169100446658", reader.get(TASK_ID)) }, + { assertEquals("1078341553348591493", reader.get(TASK_WORKFLOW_ID)) }, + { assertEquals(Instant.ofEpochMilli(251325), reader.get(TASK_SUBMIT_TIME)) }, + { assertEquals(Duration.ofMillis(8216), reader.get(TASK_RUNTIME)) }, + { assertEquals(setOf("584055316413447529", "133113685133695608", "1008582348422865408"), reader.get(TASK_PARENTS)) }, ) reader.close() diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt index a390fe08..9ee3736e 100644 --- 
a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/TraceReplayer.kt @@ -81,17 +81,17 @@ internal class TraceReplayer(private val trace: Trace) { try { while (reader.nextRow()) { // Bag of tasks without workflow ID all share the same workflow - val workflowId = if (reader.hasColumn(TASK_WORKFLOW_ID)) reader.getLong(TASK_WORKFLOW_ID) else 0L + val workflowId = if (reader.hasColumn(TASK_WORKFLOW_ID)) reader.get(TASK_WORKFLOW_ID).toLong() else 0L val workflow = jobs.computeIfAbsent(workflowId) { id -> Job(UUID(0L, id), "<unnamed>", HashSet(), HashMap()) } - val id = reader.getLong(TASK_ID) + val id = reader.get(TASK_ID).toLong() val grantedCpus = if (reader.hasColumn(TASK_ALLOC_NCPUS)) reader.getInt(TASK_ALLOC_NCPUS) else reader.getInt(TASK_REQ_NCPUS) - val submitTime = reader.getLong(TASK_SUBMIT_TIME) - val runtime = reader.getLong(TASK_RUNTIME) - val flops: Long = 4000 * runtime * grantedCpus + val submitTime = reader.get(TASK_SUBMIT_TIME) + val runtime = reader.get(TASK_RUNTIME) + val flops: Long = 4000 * runtime.seconds * grantedCpus val workload = SimFlopsWorkload(flops) val task = Task( UUID(0L, id), @@ -100,14 +100,14 @@ internal class TraceReplayer(private val trace: Trace) { mapOf( "workload" to workload, WORKFLOW_TASK_CORES to grantedCpus, - WORKFLOW_TASK_DEADLINE to (runtime * 1000) + WORKFLOW_TASK_DEADLINE to runtime.toMillis() ), ) tasks[id] = task - taskDependencies[task] = reader.get(TASK_PARENTS) + taskDependencies[task] = reader.get(TASK_PARENTS).map { it.toLong() }.toSet() - (workflow.metadata as MutableMap<String, Any>).merge("WORKFLOW_SUBMIT_TIME", submitTime) { a, b -> min(a as Long, b as Long) } + (workflow.metadata as MutableMap<String, Any>).merge("WORKFLOW_SUBMIT_TIME", submitTime.toEpochMilli()) { a, b -> min(a as Long, b as Long) } (workflow.tasks as MutableSet<Task>).add(task) } diff --git 
a/settings.gradle.kts b/settings.gradle.kts index 427cdb52..60e67e2b 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -49,6 +49,7 @@ include(":opendc-trace:opendc-trace-api") include(":opendc-trace:opendc-trace-gwf") include(":opendc-trace:opendc-trace-swf") include(":opendc-trace:opendc-trace-wtf") +include(":opendc-trace:opendc-trace-wfformat") include(":opendc-trace:opendc-trace-bitbrains") include(":opendc-trace:opendc-trace-parquet") include(":opendc-harness:opendc-harness-api") |
