summaryrefslogtreecommitdiff
path: root/opendc-trace/opendc-trace-wtf/src/main
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-10 22:10:22 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-11 11:33:25 +0200
commitb7be3400bb4b21d0cd7021e2baf1f6ce43aba189 (patch)
tree7e44a27d5d10e9f0d4b3c0dd3546fbb513175b96 /opendc-trace/opendc-trace-wtf/src/main
parent9e8ea96270701e643f95b18d2b91583d9fca08d2 (diff)
feat(trace): Add support for WfCommons (WorkflowHub) traces
This change adds support for reading WfCommons workflow traces in OpenDC. This functionality is available in the new `opendc-trace-wfformat` module.
Diffstat (limited to 'opendc-trace/opendc-trace-wtf/src/main')
-rw-r--r--opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt27
1 files changed, 10 insertions, 17 deletions
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
index b6789542..5e2463f8 100644
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
@@ -25,6 +25,8 @@ package org.opendc.trace.wtf
import org.apache.avro.generic.GenericRecord
import org.opendc.trace.*
import org.opendc.trace.util.parquet.LocalParquetReader
+import java.time.Duration
+import java.time.Instant
/**
* A [TableReader] implementation for the WTF format.
@@ -61,14 +63,14 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Generic
@Suppress("UNCHECKED_CAST")
val res: Any = when (column) {
- TASK_ID -> record["id"]
- TASK_WORKFLOW_ID -> record["workflow_id"]
- TASK_SUBMIT_TIME -> record["ts_submit"]
- TASK_WAIT_TIME -> record["wait_time"]
- TASK_RUNTIME -> record["runtime"]
+ TASK_ID -> (record["id"] as Long).toString()
+ TASK_WORKFLOW_ID -> (record["workflow_id"] as Long).toString()
+ TASK_SUBMIT_TIME -> Instant.ofEpochMilli(record["ts_submit"] as Long)
+ TASK_WAIT_TIME -> Duration.ofMillis(record["wait_time"] as Long)
+ TASK_RUNTIME -> Duration.ofMillis(record["runtime"] as Long)
TASK_REQ_NCPUS -> (record["resource_amount_requested"] as Double).toInt()
- TASK_PARENTS -> (record["parents"] as ArrayList<GenericRecord>).map { it["item"] as Long }.toSet()
- TASK_CHILDREN -> (record["children"] as ArrayList<GenericRecord>).map { it["item"] as Long }.toSet()
+ TASK_PARENTS -> (record["parents"] as ArrayList<GenericRecord>).map { it["item"].toString() }.toSet()
+ TASK_CHILDREN -> (record["children"] as ArrayList<GenericRecord>).map { it["item"].toString() }.toSet()
TASK_GROUP_ID -> record["group_id"]
TASK_USER_ID -> record["user_id"]
else -> throw IllegalArgumentException("Invalid column")
@@ -94,16 +96,7 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Generic
}
override fun getLong(column: TableColumn<Long>): Long {
- val record = checkNotNull(record) { "Reader in invalid state" }
-
- return when (column) {
- TASK_ID -> record["id"] as Long
- TASK_WORKFLOW_ID -> record["workflow_id"] as Long
- TASK_SUBMIT_TIME -> record["ts_submit"] as Long
- TASK_WAIT_TIME -> record["wait_time"] as Long
- TASK_RUNTIME -> record["runtime"] as Long
- else -> throw IllegalArgumentException("Invalid column")
- }
+ throw IllegalArgumentException("Invalid column")
}
override fun getDouble(column: TableColumn<Double>): Double {