summaryrefslogtreecommitdiff
path: root/opendc-trace/opendc-trace-wtf/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-10 22:10:22 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-11 11:33:25 +0200
commitb7be3400bb4b21d0cd7021e2baf1f6ce43aba189 (patch)
tree7e44a27d5d10e9f0d4b3c0dd3546fbb513175b96 /opendc-trace/opendc-trace-wtf/src
parent9e8ea96270701e643f95b18d2b91583d9fca08d2 (diff)
feat(trace): Add support for WfCommons (WorkflowHub) traces
This change adds support for reading WfCommons workflow traces in OpenDC. This functionality is available in the new `opendc-trace-wfformat` module.
Diffstat (limited to 'opendc-trace/opendc-trace-wtf/src')
-rw-r--r--opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt27
-rw-r--r--opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt22
2 files changed, 22 insertions, 27 deletions
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
index b6789542..5e2463f8 100644
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
@@ -25,6 +25,8 @@ package org.opendc.trace.wtf
import org.apache.avro.generic.GenericRecord
import org.opendc.trace.*
import org.opendc.trace.util.parquet.LocalParquetReader
+import java.time.Duration
+import java.time.Instant
/**
* A [TableReader] implementation for the WTF format.
@@ -61,14 +63,14 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Generic
@Suppress("UNCHECKED_CAST")
val res: Any = when (column) {
- TASK_ID -> record["id"]
- TASK_WORKFLOW_ID -> record["workflow_id"]
- TASK_SUBMIT_TIME -> record["ts_submit"]
- TASK_WAIT_TIME -> record["wait_time"]
- TASK_RUNTIME -> record["runtime"]
+ TASK_ID -> (record["id"] as Long).toString()
+ TASK_WORKFLOW_ID -> (record["workflow_id"] as Long).toString()
+ TASK_SUBMIT_TIME -> Instant.ofEpochMilli(record["ts_submit"] as Long)
+ TASK_WAIT_TIME -> Duration.ofMillis(record["wait_time"] as Long)
+ TASK_RUNTIME -> Duration.ofMillis(record["runtime"] as Long)
TASK_REQ_NCPUS -> (record["resource_amount_requested"] as Double).toInt()
- TASK_PARENTS -> (record["parents"] as ArrayList<GenericRecord>).map { it["item"] as Long }.toSet()
- TASK_CHILDREN -> (record["children"] as ArrayList<GenericRecord>).map { it["item"] as Long }.toSet()
+ TASK_PARENTS -> (record["parents"] as ArrayList<GenericRecord>).map { it["item"].toString() }.toSet()
+ TASK_CHILDREN -> (record["children"] as ArrayList<GenericRecord>).map { it["item"].toString() }.toSet()
TASK_GROUP_ID -> record["group_id"]
TASK_USER_ID -> record["user_id"]
else -> throw IllegalArgumentException("Invalid column")
@@ -94,16 +96,7 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Generic
}
override fun getLong(column: TableColumn<Long>): Long {
- val record = checkNotNull(record) { "Reader in invalid state" }
-
- return when (column) {
- TASK_ID -> record["id"] as Long
- TASK_WORKFLOW_ID -> record["workflow_id"] as Long
- TASK_SUBMIT_TIME -> record["ts_submit"] as Long
- TASK_WAIT_TIME -> record["wait_time"] as Long
- TASK_RUNTIME -> record["runtime"] as Long
- else -> throw IllegalArgumentException("Invalid column")
- }
+ throw IllegalArgumentException("Invalid column")
}
override fun getDouble(column: TableColumn<Double>): Double {
diff --git a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt
index a05a523e..b155f265 100644
--- a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt
@@ -28,6 +28,8 @@ import org.junit.jupiter.api.assertThrows
import org.opendc.trace.*
import java.io.File
import java.net.URL
+import java.time.Duration
+import java.time.Instant
/**
* Test suite for the [WtfTraceFormat] class.
@@ -91,20 +93,20 @@ class WtfTraceFormatTest {
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals(362334516345962206, reader.getLong(TASK_ID)) },
- { assertEquals(1078341553348591493, reader.getLong(TASK_WORKFLOW_ID)) },
- { assertEquals(245604, reader.getLong(TASK_SUBMIT_TIME)) },
- { assertEquals(8163, reader.getLong(TASK_RUNTIME)) },
- { assertEquals(setOf(584055316413447529, 133113685133695608, 1008582348422865408), reader.get(TASK_PARENTS)) },
+ { assertEquals("362334516345962206", reader.get(TASK_ID)) },
+ { assertEquals("1078341553348591493", reader.get(TASK_WORKFLOW_ID)) },
+ { assertEquals(Instant.ofEpochMilli(245604), reader.get(TASK_SUBMIT_TIME)) },
+ { assertEquals(Duration.ofMillis(8163), reader.get(TASK_RUNTIME)) },
+ { assertEquals(setOf("584055316413447529", "133113685133695608", "1008582348422865408"), reader.get(TASK_PARENTS)) },
)
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals(502010169100446658, reader.getLong(TASK_ID)) },
- { assertEquals(1078341553348591493, reader.getLong(TASK_WORKFLOW_ID)) },
- { assertEquals(251325, reader.getLong(TASK_SUBMIT_TIME)) },
- { assertEquals(8216, reader.getLong(TASK_RUNTIME)) },
- { assertEquals(setOf(584055316413447529, 133113685133695608, 1008582348422865408), reader.get(TASK_PARENTS)) },
+ { assertEquals("502010169100446658", reader.get(TASK_ID)) },
+ { assertEquals("1078341553348591493", reader.get(TASK_WORKFLOW_ID)) },
+ { assertEquals(Instant.ofEpochMilli(251325), reader.get(TASK_SUBMIT_TIME)) },
+ { assertEquals(Duration.ofMillis(8216), reader.get(TASK_RUNTIME)) },
+ { assertEquals(setOf("584055316413447529", "133113685133695608", "1008582348422865408"), reader.get(TASK_PARENTS)) },
)
reader.close()