author    Fabian Mastenbroek <mail.fabianm@gmail.com>  2022-12-13 23:59:47 +0000
committer Fabian Mastenbroek <mail.fabianm@gmail.com>  2022-12-14 00:24:24 +0000
commit    3542350909b1213240e5097a1793a7c0733f6196 (patch)
tree      e602b041f6c054e40e01548ae918cc7faf21e2a7 /opendc-trace/opendc-trace-wtf/src
parent    de739998fde6b856e40f8a98f78efddc0c57f167 (diff)
fix(trace/wtf): Disable Parquet strict typing
This change fixes an issue where some traces from the Workflow Trace Archive would fail to load with the WTF trace format in OpenDC. The failure was caused by one of the fields being stored as a double, while the format expects it to be a long. Parquet does not support unioning primitive types, so we have to disable strict type checking when reading the file. Furthermore, we need to accept double entries for the workflow ids.
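The essence of the change is a Parquet field converter that accepts both physical types for the id column. A minimal sketch of the idea, with illustrative class and variable names rather than the exact OpenDC code:

    import org.apache.parquet.io.api.PrimitiveConverter
    import kotlin.math.roundToLong

    // Sketch: a converter that accepts an id stored either as INT64 or as DOUBLE
    // and normalises both encodings to the same string representation.
    class LenientIdConverter(private val sink: MutableList<String>) : PrimitiveConverter() {
        // Traces that follow the format specification store the id as a long.
        override fun addLong(value: Long) {
            sink.add(value.toString())
        }

        // Some Workflow Trace Archive traces store the id as a double instead;
        // rounding to the nearest long makes both encodings yield the same id.
        override fun addDouble(value: Double) {
            sink.add(value.roundToLong().toString())
        }
    }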
Diffstat (limited to 'opendc-trace/opendc-trace-wtf/src')
-rw-r--r--  opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt                 2
-rw-r--r--  opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt  5
-rw-r--r--  opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt             15
-rwxr-xr-x  opendc-trace/opendc-trace-wtf/src/test/resources/shell/tasks/schema-1.0/part.0.parquet               bin 0 -> 259381 bytes
-rwxr-xr-x  opendc-trace/opendc-trace-wtf/src/test/resources/shell/workflows/schema-1.0/part.0.parquet           bin 0 -> 63858 bytes
-rwxr-xr-x  opendc-trace/opendc-trace-wtf/src/test/resources/shell/workload/schema-1.0/generic_information.json  1
6 files changed, 22 insertions(+), 1 deletion(-)
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
index bf834778..c25b512c 100644
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
@@ -78,7 +78,7 @@ public class WtfTraceFormat : TraceFormat {
override fun newReader(path: Path, table: String, projection: List<String>?): TableReader {
return when (table) {
TABLE_TASKS -> {
- val reader = LocalParquetReader(path.resolve("tasks/schema-1.0"), TaskReadSupport(projection))
+ val reader = LocalParquetReader(path.resolve("tasks/schema-1.0"), TaskReadSupport(projection), strictTyping = false)
WtfTaskTableReader(reader)
}
else -> throw IllegalArgumentException("Table $table not supported")
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt
index f188a3ff..055be0c3 100644
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt
@@ -30,6 +30,7 @@ import org.apache.parquet.schema.MessageType
import java.time.Duration
import java.time.Instant
import kotlin.math.roundToInt
+import kotlin.math.roundToLong
/**
* A [RecordMaterializer] for [Task] records.
@@ -145,6 +146,10 @@ internal class TaskRecordMaterializer(schema: MessageType) : RecordMaterializer<
override fun addLong(value: Long) {
relations.add(value.toString())
}
+
+ override fun addDouble(value: Double) {
+ relations.add(value.roundToLong().toString())
+ }
}
private val listConverter = object : GroupConverter() {
diff --git a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt
index 9cd9538f..0457098c 100644
--- a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt
@@ -123,4 +123,19 @@ class WtfTraceFormatTest {
reader = format.newReader(path, TABLE_TASKS, null)
}
}
+
+ @DisplayName("TableReader for Tasks (Shell trace)")
+ @Nested
+ inner class ShellTasksTableReaderTest : TableReaderTestKit() {
+ override lateinit var reader: TableReader
+ override lateinit var columns: List<TableColumn>
+
+ @BeforeEach
+ fun setUp() {
+ val path = Paths.get("src/test/resources/shell")
+
+ columns = format.getDetails(path, TABLE_TASKS).columns
+ reader = format.newReader(path, TABLE_TASKS, null)
+ }
+ }
}
diff --git a/opendc-trace/opendc-trace-wtf/src/test/resources/shell/tasks/schema-1.0/part.0.parquet b/opendc-trace/opendc-trace-wtf/src/test/resources/shell/tasks/schema-1.0/part.0.parquet
new file mode 100755
index 00000000..31256990
--- /dev/null
+++ b/opendc-trace/opendc-trace-wtf/src/test/resources/shell/tasks/schema-1.0/part.0.parquet
Binary files differ
diff --git a/opendc-trace/opendc-trace-wtf/src/test/resources/shell/workflows/schema-1.0/part.0.parquet b/opendc-trace/opendc-trace-wtf/src/test/resources/shell/workflows/schema-1.0/part.0.parquet
new file mode 100755
index 00000000..872469d5
--- /dev/null
+++ b/opendc-trace/opendc-trace-wtf/src/test/resources/shell/workflows/schema-1.0/part.0.parquet
Binary files differ
diff --git a/opendc-trace/opendc-trace-wtf/src/test/resources/shell/workload/schema-1.0/generic_information.json b/opendc-trace/opendc-trace-wtf/src/test/resources/shell/workload/schema-1.0/generic_information.json
new file mode 100755
index 00000000..5949ab59
--- /dev/null
+++ b/opendc-trace/opendc-trace-wtf/src/test/resources/shell/workload/schema-1.0/generic_information.json
@@ -0,0 +1 @@
+{"total_workflows": 3403, "total_tasks": 10208, "domain": "Industrial", "date_start": null, "date_end": null, "num_sites": 3403, "num_resources": 10208.0, "num_users": 1, "num_groups": 1, "total_resource_seconds": 89229.863, "authors": ["Shenjun Ma", "Alexey Ilyushkin", "Alexander Stegehuis", "Alexandru Iosup"], "min_resource_task": 1.0, "max_resource_task": 1.0, "std_resource_task": 0.0, "mean_resource_task": 1.0, "median_resource_task": 1.0, "first_quartile_resource_task": 1.0, "third_quartile_resource_task": 1.0, "cov_resource_task": 0.0, "min_memory": -1, "max_memory": -1, "std_memory": 0.0, "mean_memory": -1.0, "median_memory": -1, "first_quartile_memory": -1, "third_quartile_memory": -1, "cov_memory": -0.0, "min_network_usage": -1, "max_network_usage": -1, "std_network_usage": 0.0, "mean_network_usage": -1.0, "median_network_usage": -1, "first_quartile_network_usage": -1, "third_quartile_network_usage": -1, "cov_network_usage": -0.0, "min_disk_space_usage": -1, "max_disk_space_usage": -1, "std_disk_space_usage": 0.0, "mean_disk_space_usage": -1.0, "median_disk_space_usage": -1, "first_quartile_disk_space_usage": -1, "third_quartile_disk_space_usage": -1, "cov_disk_space_usage": -0.0, "min_energy": -1, "max_energy": -1, "std_energy": 0.0, "mean_energy": -1.0, "median_energy": -1, "first_quartile_energy": -1, "third_quartile_energy": -1, "cov_energy": -0.0, "workload_description": "Chronos is a trace from Shell's Chronos IoT production system. It contains pipelines where sensor data is obtained, checked if values are within range (e.g. temperature, operational status, etc.), and the outcomes are written to persistent storage."} \ No newline at end of file