diff options
7 files changed, 28 insertions, 3 deletions
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt index 031bad60..de8a56d0 100644 --- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt +++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt @@ -38,10 +38,12 @@ import kotlin.io.path.isDirectory * * @param path The path to the Parquet file or directory to read. * @param readSupport Helper class to perform conversion from Parquet to [T]. + * @param strictTyping A flag to disable strict typing of primitive types. */ public class LocalParquetReader<out T>( path: Path, - private val readSupport: ReadSupport<T> + private val readSupport: ReadSupport<T>, + private val strictTyping: Boolean = true ) : AutoCloseable { /** * The input files to process. @@ -119,6 +121,8 @@ public class LocalParquetReader<out T>( private fun createReader(input: InputFile): ParquetReader<T> { return object : ParquetReader.Builder<T>(input) { override fun getReadSupport(): ReadSupport<@UnsafeVariance T> = this@LocalParquetReader.readSupport - }.build() + } + .set("parquet.strict.typing", strictTyping.toString()) + .build() } } diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt index bf834778..c25b512c 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt @@ -78,7 +78,7 @@ public class WtfTraceFormat : TraceFormat { override fun newReader(path: Path, table: String, projection: List<String>?): TableReader { return when (table) { TABLE_TASKS -> { - val reader = LocalParquetReader(path.resolve("tasks/schema-1.0"), TaskReadSupport(projection)) + val reader = LocalParquetReader(path.resolve("tasks/schema-1.0"), TaskReadSupport(projection), strictTyping = false) WtfTaskTableReader(reader) } else -> throw IllegalArgumentException("Table $table not supported") diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt index f188a3ff..055be0c3 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt @@ -30,6 +30,7 @@ import org.apache.parquet.schema.MessageType import java.time.Duration import java.time.Instant import kotlin.math.roundToInt +import kotlin.math.roundToLong /** * A [RecordMaterializer] for [Task] records. @@ -145,6 +146,10 @@ internal class TaskRecordMaterializer(schema: MessageType) : RecordMaterializer< override fun addLong(value: Long) { relations.add(value.toString()) } + + override fun addDouble(value: Double) { + relations.add(value.roundToLong().toString()) + } } private val listConverter = object : GroupConverter() { diff --git a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt index 9cd9538f..0457098c 100644 --- a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt @@ -123,4 +123,19 @@ class WtfTraceFormatTest { reader = format.newReader(path, TABLE_TASKS, null) } } + + @DisplayName("TableReader for Tasks (Shell trace)") + @Nested + inner class ShellTasksTableReaderTest : TableReaderTestKit() { + override lateinit var reader: TableReader + override lateinit var columns: List<TableColumn> + + @BeforeEach + fun setUp() { + val path = Paths.get("src/test/resources/shell") + + columns = format.getDetails(path, TABLE_TASKS).columns + reader = format.newReader(path, TABLE_TASKS, null) + } + } } diff --git a/opendc-trace/opendc-trace-wtf/src/test/resources/shell/tasks/schema-1.0/part.0.parquet b/opendc-trace/opendc-trace-wtf/src/test/resources/shell/tasks/schema-1.0/part.0.parquet Binary files differnew file mode 100755 index 00000000..31256990 --- /dev/null +++ b/opendc-trace/opendc-trace-wtf/src/test/resources/shell/tasks/schema-1.0/part.0.parquet diff --git a/opendc-trace/opendc-trace-wtf/src/test/resources/shell/workflows/schema-1.0/part.0.parquet b/opendc-trace/opendc-trace-wtf/src/test/resources/shell/workflows/schema-1.0/part.0.parquet Binary files differnew file mode 100755 index 00000000..872469d5 --- /dev/null +++ b/opendc-trace/opendc-trace-wtf/src/test/resources/shell/workflows/schema-1.0/part.0.parquet diff --git a/opendc-trace/opendc-trace-wtf/src/test/resources/shell/workload/schema-1.0/generic_information.json b/opendc-trace/opendc-trace-wtf/src/test/resources/shell/workload/schema-1.0/generic_information.json new file mode 100755 index 00000000..5949ab59 --- /dev/null +++ b/opendc-trace/opendc-trace-wtf/src/test/resources/shell/workload/schema-1.0/generic_information.json @@ -0,0 +1 @@ +{"total_workflows": 3403, "total_tasks": 10208, "domain": "Industrial", "date_start": null, "date_end": null, "num_sites": 3403, "num_resources": 10208.0, "num_users": 1, "num_groups": 1, "total_resource_seconds": 89229.863, "authors": ["Shenjun Ma", "Alexey Ilyushkin", "Alexander Stegehuis", "Alexandru Iosup"], "min_resource_task": 1.0, "max_resource_task": 1.0, "std_resource_task": 0.0, "mean_resource_task": 1.0, "median_resource_task": 1.0, "first_quartile_resource_task": 1.0, "third_quartile_resource_task": 1.0, "cov_resource_task": 0.0, "min_memory": -1, "max_memory": -1, "std_memory": 0.0, "mean_memory": -1.0, "median_memory": -1, "first_quartile_memory": -1, "third_quartile_memory": -1, "cov_memory": -0.0, "min_network_usage": -1, "max_network_usage": -1, "std_network_usage": 0.0, "mean_network_usage": -1.0, "median_network_usage": -1, "first_quartile_network_usage": -1, "third_quartile_network_usage": -1, "cov_network_usage": -0.0, "min_disk_space_usage": -1, "max_disk_space_usage": -1, "std_disk_space_usage": 0.0, "mean_disk_space_usage": -1.0, "median_disk_space_usage": -1, "first_quartile_disk_space_usage": -1, "third_quartile_disk_space_usage": -1, "cov_disk_space_usage": -0.0, "min_energy": -1, "max_energy": -1, "std_energy": 0.0, "mean_energy": -1.0, "median_energy": -1, "first_quartile_energy": -1, "third_quartile_energy": -1, "cov_energy": -0.0, "workload_description": "Chronos is a trace from Shell's Chronos IoT production system. It contains pipelines where sensor data is obtained, checked if values are within range (e.g. temperature, operational status, etc.), and the outcomes are written to persistent storage."}
\ No newline at end of file |
