diff options
Diffstat (limited to 'opendc-trace/opendc-trace-parquet/src/main')
| -rw-r--r-- | opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt | 36 | ||||
| -rw-r--r-- | opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt (renamed from opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt) | 41 |
2 files changed, 49 insertions, 28 deletions
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
index ef9eaeb3..eef83956 100644
--- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
@@ -22,8 +22,8 @@
 
 package org.opendc.trace.util.parquet
 
-import org.apache.parquet.avro.AvroParquetReader
 import org.apache.parquet.hadoop.ParquetReader
+import org.apache.parquet.hadoop.api.ReadSupport
 import org.apache.parquet.io.InputFile
 import java.io.File
 import java.io.IOException
@@ -32,11 +32,17 @@ import java.nio.file.Path
 import kotlin.io.path.isDirectory
 
 /**
- * A helper class to read Parquet files.
+ * A helper class to read Parquet files from the filesystem.
+ *
+ * This class wraps a [ParquetReader] in order to support reading partitioned Parquet datasets.
  *
  * @param path The path to the Parquet file or directory to read.
+ * @param readSupport Helper class to perform conversion from Parquet to [T].
  */
-public class LocalParquetReader<out T>(path: Path) : AutoCloseable {
+public class LocalParquetReader<out T>(
+    path: Path,
+    private val readSupport: ReadSupport<T>
+) : AutoCloseable {
     /**
      * The input files to process.
      */
@@ -57,7 +63,7 @@ public class LocalParquetReader<out T>(path: Path) : AutoCloseable {
     /**
      * Construct a [LocalParquetReader] for the specified [file].
      */
-    public constructor(file: File) : this(file.toPath())
+    public constructor(file: File, readSupport: ReadSupport<T>) : this(file.toPath(), readSupport)
 
     /**
      * Read a single entry in the Parquet file.
@@ -93,20 +99,24 @@ public class LocalParquetReader<out T>(path: Path) : AutoCloseable {
     private fun initReader() {
         reader?.close()
 
-        this.reader = if (filesIterator.hasNext()) {
-            createReader(filesIterator.next())
-        } else {
-            null
+        try {
+            this.reader = if (filesIterator.hasNext()) {
+                createReader(filesIterator.next())
+            } else {
+                null
+            }
+        } catch (e: Throwable) {
+            this.reader = null
+            throw e
         }
     }
 
     /**
-     * Create a Parquet reader for the specified file.
+     * Construct a [ParquetReader] for the specified [input] with a custom [ReadSupport].
      */
     private fun createReader(input: InputFile): ParquetReader<T> {
-        return AvroParquetReader
-            .builder<T>(input)
-            .disableCompatibility()
-            .build()
+        return object : ParquetReader.Builder<T>(input) {
+            override fun getReadSupport(): ReadSupport<@UnsafeVariance T> = this@LocalParquetReader.readSupport
+        }.build()
     }
 }
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt
index 086b900b..b5eb1deb 100644
--- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2021 AtLarge Research
+ * Copyright (c) 2022 AtLarge Research
  *
  * Permission is hereby granted, free of charge, to any person obtaining a copy
  * of this software and associated documentation files (the "Software"), to deal
@@ -20,25 +20,36 @@
  * SOFTWARE.
  */
 
-@file:JvmName("AvroUtils")
 package org.opendc.trace.util.parquet
 
-import org.apache.avro.LogicalTypes
-import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.ParquetWriter
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.io.OutputFile
+import java.nio.file.Path
 
 /**
- * Schema for UUID type.
+ * Helper class for writing Parquet records to local disk.
  */
-public val UUID_SCHEMA: Schema = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING))
+public class LocalParquetWriter {
+    /**
+     * A [ParquetWriter.Builder] implementation supporting custom [OutputFile]s and [WriteSupport] implementations.
+     */
+    public class Builder<T> internal constructor(
+        output: OutputFile,
+        private val writeSupport: WriteSupport<T>
+    ) : ParquetWriter.Builder<T, Builder<T>>(output) {
+        override fun self(): Builder<T> = this
 
-/**
- * Schema for timestamp type.
- */
-public val TIMESTAMP_SCHEMA: Schema = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG))
+        override fun getWriteSupport(conf: Configuration): WriteSupport<T> = writeSupport
+    }
 
-/**
- * Helper function to make a [Schema] field optional.
- */
-public fun Schema.optional(): Schema {
-    return Schema.createUnion(Schema.create(Schema.Type.NULL), this)
+    public companion object {
+        /**
+         * Create a [Builder] instance that writes a Parquet file at the specified [path].
+         */
+        @JvmStatic
+        public fun <T> builder(path: Path, writeSupport: WriteSupport<T>): Builder<T> =
+            Builder(LocalOutputFile(path), writeSupport)
+    }
 }
