diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-05-01 21:16:43 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-05-01 21:19:54 +0200 |
| commit | ee057033b4c534fdd3e8a9d2320d75035d30f27a (patch) | |
| tree | d266630cdfe6dcf515424b14ed7153e1547ce14f /opendc-trace/opendc-trace-parquet | |
| parent | 5c1b52bc771cddafed26da3c26612aeb115a3c0e (diff) | |
refactor(trace/parquet): Support custom ReadSupport implementations
This change updates the `LocalParquetReader` implementation to support
custom `ReadSupport` implementations, so we do not have to rely on the
Avro implementation necessarily.
Diffstat (limited to 'opendc-trace/opendc-trace-parquet')
| -rw-r--r-- | opendc-trace/opendc-trace-parquet/build.gradle.kts | 2 | ||||
| -rw-r--r-- | opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/avro/AvroUtils.kt (renamed from opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt) | 4 | ||||
| -rw-r--r-- | opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt | 53 |
3 files changed, 42 insertions, 17 deletions
diff --git a/opendc-trace/opendc-trace-parquet/build.gradle.kts b/opendc-trace/opendc-trace-parquet/build.gradle.kts index 302c0b14..e6415586 100644 --- a/opendc-trace/opendc-trace-parquet/build.gradle.kts +++ b/opendc-trace/opendc-trace-parquet/build.gradle.kts @@ -32,7 +32,7 @@ dependencies { api(libs.parquet) { exclude(group = "org.apache.hadoop") } - runtimeOnly(libs.hadoop.common) { + api(libs.hadoop.common) { exclude(group = "org.slf4j", module = "slf4j-log4j12") exclude(group = "log4j") exclude(group = "org.apache.hadoop") diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/avro/AvroUtils.kt index 086b900b..a655d39f 100644 --- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt +++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/avro/AvroUtils.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 AtLarge Research + * Copyright (c) 2022 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -21,7 +21,7 @@ */ @file:JvmName("AvroUtils") -package org.opendc.trace.util.parquet +package org.opendc.trace.util.avro import org.apache.avro.LogicalTypes import org.apache.avro.Schema diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt index ef9eaeb3..bb2bb10d 100644 --- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt +++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt @@ -24,6 +24,7 @@ package org.opendc.trace.util.parquet import org.apache.parquet.avro.AvroParquetReader import org.apache.parquet.hadoop.ParquetReader +import org.apache.parquet.hadoop.api.ReadSupport import org.apache.parquet.io.InputFile import java.io.File import java.io.IOException @@ -32,11 +33,15 @@ import java.nio.file.Path import kotlin.io.path.isDirectory /** - * A helper class to read Parquet files. + * A helper class to read Parquet files from the filesystem. + * + * This class wraps a [ParquetReader] in order to support reading partitioned Parquet datasets. * * @param path The path to the Parquet file or directory to read. + * @param factory Function to construct a [ParquetReader] for a local [InputFile]. */ -public class LocalParquetReader<out T>(path: Path) : AutoCloseable { +public class LocalParquetReader<out T>(path: Path, + private val factory: (InputFile) -> ParquetReader<T> = avro()) : AutoCloseable { /** * The input files to process. */ @@ -93,20 +98,40 @@ public class LocalParquetReader<out T>(path: Path) : AutoCloseable { private fun initReader() { reader?.close() - this.reader = if (filesIterator.hasNext()) { - createReader(filesIterator.next()) - } else { - null + try { + this.reader = if (filesIterator.hasNext()) { + factory(filesIterator.next()) + } else { + null + } + } catch (e: Throwable) { + this.reader = null + throw e } } - /** - * Create a Parquet reader for the specified file. - */ - private fun createReader(input: InputFile): ParquetReader<T> { - return AvroParquetReader - .builder<T>(input) - .disableCompatibility() - .build() + public companion object { + /** + * A factory for reading Avro Parquet files. + */ + public fun <T> avro(): (InputFile) -> ParquetReader<T> { + return { input -> + AvroParquetReader + .builder<T>(input) + .disableCompatibility() + .build() + } + } + + /** + * A factory for reading Parquet files with custom [ReadSupport]. + */ + public fun <T> custom(readSupport: ReadSupport<T>): (InputFile) -> ParquetReader<T> { + return { input -> + object : ParquetReader.Builder<T>(input) { + override fun getReadSupport(): ReadSupport<T> = readSupport + }.build() + } + } } } |
