From fa08b63bd749e9fbe1a1d04ef2ebd7a86453fa4b Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sat, 11 Sep 2021 10:52:30 +0200 Subject: perf(trace): Keep reader state in own class This change removes the external class that holds the state of the reader and instead puts the state in the reader implementation. Maintaining a separate class for the state increases the complexity and has worse performance characteristics due to the bytecode produced by Kotlin for property accesses. --- .../opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTrace.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'opendc-trace/opendc-trace-wtf/src') diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTrace.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTrace.kt index 7eff0f5a..a755a107 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTrace.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTrace.kt @@ -43,5 +43,5 @@ public class WtfTrace internal constructor(private val path: Path) : Trace { return WtfTaskTable(path) } - override fun toString(): String = "SwfTrace[$path]" + override fun toString(): String = "WtfTrace[$path]" } -- cgit v1.2.3 From b7be3400bb4b21d0cd7021e2baf1f6ce43aba189 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 10 Sep 2021 22:10:22 +0200 Subject: feat(trace): Add support for WfCommons (WorkflowHub) traces This change adds support for reading WfCommons workflow traces in OpenDC. This functionality is available in the new `opendc-trace-wfformat` module. --- .../org/opendc/trace/wtf/WtfTaskTableReader.kt | 27 ++++++++-------------- .../org/opendc/trace/wtf/WtfTraceFormatTest.kt | 22 ++++++++++-------- 2 files changed, 22 insertions(+), 27 deletions(-) (limited to 'opendc-trace/opendc-trace-wtf/src') diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt index b6789542..5e2463f8 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt @@ -25,6 +25,8 @@ package org.opendc.trace.wtf import org.apache.avro.generic.GenericRecord import org.opendc.trace.* import org.opendc.trace.util.parquet.LocalParquetReader +import java.time.Duration +import java.time.Instant /** * A [TableReader] implementation for the WTF format. @@ -61,14 +63,14 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader record["id"] - TASK_WORKFLOW_ID -> record["workflow_id"] - TASK_SUBMIT_TIME -> record["ts_submit"] - TASK_WAIT_TIME -> record["wait_time"] - TASK_RUNTIME -> record["runtime"] + TASK_ID -> (record["id"] as Long).toString() + TASK_WORKFLOW_ID -> (record["workflow_id"] as Long).toString() + TASK_SUBMIT_TIME -> Instant.ofEpochMilli(record["ts_submit"] as Long) + TASK_WAIT_TIME -> Duration.ofMillis(record["wait_time"] as Long) + TASK_RUNTIME -> Duration.ofMillis(record["runtime"] as Long) TASK_REQ_NCPUS -> (record["resource_amount_requested"] as Double).toInt() - TASK_PARENTS -> (record["parents"] as ArrayList).map { it["item"] as Long }.toSet() - TASK_CHILDREN -> (record["children"] as ArrayList).map { it["item"] as Long }.toSet() + TASK_PARENTS -> (record["parents"] as ArrayList).map { it["item"].toString() }.toSet() + TASK_CHILDREN -> (record["children"] as ArrayList).map { it["item"].toString() }.toSet() TASK_GROUP_ID -> record["group_id"] TASK_USER_ID -> record["user_id"] else -> throw IllegalArgumentException("Invalid column") @@ -94,16 +96,7 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader): Long { - val record = checkNotNull(record) { "Reader in invalid state" } - - return when (column) { - TASK_ID -> record["id"] as Long - TASK_WORKFLOW_ID -> record["workflow_id"] as Long - TASK_SUBMIT_TIME -> record["ts_submit"] as Long - TASK_WAIT_TIME -> record["wait_time"] as Long - TASK_RUNTIME -> record["runtime"] as Long - else -> throw IllegalArgumentException("Invalid column") - } + throw IllegalArgumentException("Invalid column") } override fun getDouble(column: TableColumn): Double { diff --git a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt index a05a523e..b155f265 100644 --- a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt @@ -28,6 +28,8 @@ import org.junit.jupiter.api.assertThrows import org.opendc.trace.* import java.io.File import java.net.URL +import java.time.Duration +import java.time.Instant /** * Test suite for the [WtfTraceFormat] class. @@ -91,20 +93,20 @@ class WtfTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals(362334516345962206, reader.getLong(TASK_ID)) }, - { assertEquals(1078341553348591493, reader.getLong(TASK_WORKFLOW_ID)) }, - { assertEquals(245604, reader.getLong(TASK_SUBMIT_TIME)) }, - { assertEquals(8163, reader.getLong(TASK_RUNTIME)) }, - { assertEquals(setOf(584055316413447529, 133113685133695608, 1008582348422865408), reader.get(TASK_PARENTS)) }, + { assertEquals("362334516345962206", reader.get(TASK_ID)) }, + { assertEquals("1078341553348591493", reader.get(TASK_WORKFLOW_ID)) }, + { assertEquals(Instant.ofEpochMilli(245604), reader.get(TASK_SUBMIT_TIME)) }, + { assertEquals(Duration.ofMillis(8163), reader.get(TASK_RUNTIME)) }, + { assertEquals(setOf("584055316413447529", "133113685133695608", "1008582348422865408"), reader.get(TASK_PARENTS)) }, ) assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals(502010169100446658, reader.getLong(TASK_ID)) }, - { assertEquals(1078341553348591493, reader.getLong(TASK_WORKFLOW_ID)) }, - { assertEquals(251325, reader.getLong(TASK_SUBMIT_TIME)) }, - { assertEquals(8216, reader.getLong(TASK_RUNTIME)) }, - { assertEquals(setOf(584055316413447529, 133113685133695608, 1008582348422865408), reader.get(TASK_PARENTS)) }, + { assertEquals("502010169100446658", reader.get(TASK_ID)) }, + { assertEquals("1078341553348591493", reader.get(TASK_WORKFLOW_ID)) }, + { assertEquals(Instant.ofEpochMilli(251325), reader.get(TASK_SUBMIT_TIME)) }, + { assertEquals(Duration.ofMillis(8216), reader.get(TASK_RUNTIME)) }, + { assertEquals(setOf("584055316413447529", "133113685133695608", "1008582348422865408"), reader.get(TASK_PARENTS)) }, ) reader.close() -- cgit v1.2.3 From 49dd8377c8bfde1e64e411c6a6f921c768b9b53b Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sun, 12 Sep 2021 11:22:07 +0200 Subject: refactor(trace): Add API for accessing available table columns This change adds a new API to the Table interface for accessing the table columns that the table supports. This does not necessarily mean that the column will have a value for every row, but that the table format has defined this particular column. --- .../kotlin/org/opendc/trace/wtf/WtfTaskTable.kt | 27 ++++++++++------------ 1 file changed, 12 insertions(+), 15 deletions(-) (limited to 'opendc-trace/opendc-trace-wtf/src') diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt index be26f540..74202718 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTable.kt @@ -35,21 +35,18 @@ internal class WtfTaskTable(private val path: Path) : Table { override val isSynthetic: Boolean = false - override fun isSupported(column: TableColumn<*>): Boolean { - return when (column) { - TASK_ID -> true - TASK_WORKFLOW_ID -> true - TASK_SUBMIT_TIME -> true - TASK_WAIT_TIME -> true - TASK_RUNTIME -> true - TASK_REQ_NCPUS -> true - TASK_PARENTS -> true - TASK_CHILDREN -> true - TASK_GROUP_ID -> true - TASK_USER_ID -> true - else -> false - } - } + override val columns: List> = listOf( + TASK_ID, + TASK_WORKFLOW_ID, + TASK_SUBMIT_TIME, + TASK_WAIT_TIME, + TASK_RUNTIME, + TASK_REQ_NCPUS, + TASK_PARENTS, + TASK_CHILDREN, + TASK_GROUP_ID, + TASK_USER_ID + ) override fun newReader(): TableReader { val reader = LocalParquetReader(path.resolve("tasks/schema-1.0")) -- cgit v1.2.3