summaryrefslogtreecommitdiff
path: root/opendc-trace/opendc-trace-parquet
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-05-01 21:16:43 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2022-05-01 21:19:54 +0200
commitee057033b4c534fdd3e8a9d2320d75035d30f27a (patch)
treed266630cdfe6dcf515424b14ed7153e1547ce14f /opendc-trace/opendc-trace-parquet
parent5c1b52bc771cddafed26da3c26612aeb115a3c0e (diff)
refactor(trace/parquet): Support custom ReadSupport implementations
This change updates the `LocalParquetReader` implementation to support custom `ReadSupport` implementations, so we do not have to rely on the Avro implementation necessarily.
Diffstat (limited to 'opendc-trace/opendc-trace-parquet')
-rw-r--r--opendc-trace/opendc-trace-parquet/build.gradle.kts2
-rw-r--r--opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/avro/AvroUtils.kt (renamed from opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt)4
-rw-r--r--opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt53
3 files changed, 42 insertions, 17 deletions
diff --git a/opendc-trace/opendc-trace-parquet/build.gradle.kts b/opendc-trace/opendc-trace-parquet/build.gradle.kts
index 302c0b14..e6415586 100644
--- a/opendc-trace/opendc-trace-parquet/build.gradle.kts
+++ b/opendc-trace/opendc-trace-parquet/build.gradle.kts
@@ -32,7 +32,7 @@ dependencies {
api(libs.parquet) {
exclude(group = "org.apache.hadoop")
}
- runtimeOnly(libs.hadoop.common) {
+ api(libs.hadoop.common) {
exclude(group = "org.slf4j", module = "slf4j-log4j12")
exclude(group = "log4j")
exclude(group = "org.apache.hadoop")
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/avro/AvroUtils.kt
index 086b900b..a655d39f 100644
--- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/avro/AvroUtils.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021 AtLarge Research
+ * Copyright (c) 2022 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -21,7 +21,7 @@
*/
@file:JvmName("AvroUtils")
-package org.opendc.trace.util.parquet
+package org.opendc.trace.util.avro
import org.apache.avro.LogicalTypes
import org.apache.avro.Schema
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
index ef9eaeb3..bb2bb10d 100644
--- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
@@ -24,6 +24,7 @@ package org.opendc.trace.util.parquet
import org.apache.parquet.avro.AvroParquetReader
import org.apache.parquet.hadoop.ParquetReader
+import org.apache.parquet.hadoop.api.ReadSupport
import org.apache.parquet.io.InputFile
import java.io.File
import java.io.IOException
@@ -32,11 +33,15 @@ import java.nio.file.Path
import kotlin.io.path.isDirectory
/**
- * A helper class to read Parquet files.
+ * A helper class to read Parquet files from the filesystem.
+ *
+ * This class wraps a [ParquetReader] in order to support reading partitioned Parquet datasets.
*
* @param path The path to the Parquet file or directory to read.
+ * @param factory Function to construct a [ParquetReader] for a local [InputFile].
*/
-public class LocalParquetReader<out T>(path: Path) : AutoCloseable {
+public class LocalParquetReader<out T>(path: Path,
+ private val factory: (InputFile) -> ParquetReader<T> = avro()) : AutoCloseable {
/**
* The input files to process.
*/
@@ -93,20 +98,40 @@ public class LocalParquetReader<out T>(path: Path) : AutoCloseable {
private fun initReader() {
reader?.close()
- this.reader = if (filesIterator.hasNext()) {
- createReader(filesIterator.next())
- } else {
- null
+ try {
+ this.reader = if (filesIterator.hasNext()) {
+ factory(filesIterator.next())
+ } else {
+ null
+ }
+ } catch (e: Throwable) {
+ this.reader = null
+ throw e
}
}
- /**
- * Create a Parquet reader for the specified file.
- */
- private fun createReader(input: InputFile): ParquetReader<T> {
- return AvroParquetReader
- .builder<T>(input)
- .disableCompatibility()
- .build()
+ public companion object {
+ /**
+ * A factory for reading Avro Parquet files.
+ */
+ public fun <T> avro(): (InputFile) -> ParquetReader<T> {
+ return { input ->
+ AvroParquetReader
+ .builder<T>(input)
+ .disableCompatibility()
+ .build()
+ }
+ }
+
+ /**
+ * A factory for reading Parquet files with custom [ReadSupport].
+ */
+ public fun <T> custom(readSupport: ReadSupport<T>): (InputFile) -> ParquetReader<T> {
+ return { input ->
+ object : ParquetReader.Builder<T>(input) {
+ override fun getReadSupport(): ReadSupport<T> = readSupport
+ }.build()
+ }
+ }
}
}