Diffstat (limited to 'opendc-trace/opendc-trace-parquet/src/main')
-rw-r--r--  opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt | 36
-rw-r--r--  opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt (renamed from opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt) | 41
2 files changed, 49 insertions(+), 28 deletions(-)
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
index ef9eaeb3..eef83956 100644
--- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
@@ -22,8 +22,8 @@
package org.opendc.trace.util.parquet
-import org.apache.parquet.avro.AvroParquetReader
import org.apache.parquet.hadoop.ParquetReader
+import org.apache.parquet.hadoop.api.ReadSupport
import org.apache.parquet.io.InputFile
import java.io.File
import java.io.IOException
@@ -32,11 +32,17 @@ import java.nio.file.Path
import kotlin.io.path.isDirectory
/**
- * A helper class to read Parquet files.
+ * A helper class to read Parquet files from the filesystem.
+ *
+ * This class wraps a [ParquetReader] to support reading partitioned Parquet datasets.
*
* @param path The path to the Parquet file or directory to read.
+ * @param readSupport Helper class to perform conversion from Parquet to [T].
*/
-public class LocalParquetReader<out T>(path: Path) : AutoCloseable {
+public class LocalParquetReader<out T>(
+ path: Path,
+ private val readSupport: ReadSupport<T>
+) : AutoCloseable {
/**
* The input files to process.
*/
@@ -57,7 +63,7 @@ public class LocalParquetReader<out T>(path: Path) : AutoCloseable {
/**
* Construct a [LocalParquetReader] for the specified [file].
*/
- public constructor(file: File) : this(file.toPath())
+ public constructor(file: File, readSupport: ReadSupport<T>) : this(file.toPath(), readSupport)
/**
* Read a single entry in the Parquet file.
@@ -93,20 +99,24 @@ public class LocalParquetReader<out T>(path: Path) : AutoCloseable {
private fun initReader() {
reader?.close()
- this.reader = if (filesIterator.hasNext()) {
- createReader(filesIterator.next())
- } else {
- null
+ try {
+ this.reader = if (filesIterator.hasNext()) {
+ createReader(filesIterator.next())
+ } else {
+ null
+ }
+ } catch (e: Throwable) {
+ this.reader = null
+ throw e
}
}
/**
- * Create a Parquet reader for the specified file.
+ * Construct a [ParquetReader] for the specified [input] with a custom [ReadSupport].
*/
private fun createReader(input: InputFile): ParquetReader<T> {
- return AvroParquetReader
- .builder<T>(input)
- .disableCompatibility()
- .build()
+ return object : ParquetReader.Builder<T>(input) {
+ override fun getReadSupport(): ReadSupport<@UnsafeVariance T> = this@LocalParquetReader.readSupport
+ }.build()
}
}
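
For context, a minimal usage sketch of the reader after this change. It uses parquet-mr's example GroupReadSupport and a hypothetical "trace/tasks" path (neither appears in this commit), and it assumes the reader exposes a read() method that returns null once all input files are exhausted, as suggested by the "Read a single entry in the Parquet file" doc comment above:

import org.apache.parquet.example.data.Group
import org.apache.parquet.hadoop.example.GroupReadSupport
import java.nio.file.Paths

// Assumes this snippet lives in (or imports) org.opendc.trace.util.parquet.
fun main() {
    // "trace/tasks" is a hypothetical path; a single Parquet file or a
    // partitioned directory of Parquet files both work with this reader.
    val reader = LocalParquetReader(Paths.get("trace/tasks"), GroupReadSupport())
    try {
        // Assumed API: read() returns the next record, or null at end of input.
        var record: Group? = reader.read()
        while (record != null) {
            println(record)
            record = reader.read()
        }
    } finally {
        reader.close()
    }
}

Because the ReadSupport is now injected, callers can plug in any record model (Avro, Protobuf, or a hand-written converter) instead of being tied to AvroParquetReader.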
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt
index 086b900b..b5eb1deb 100644
--- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021 AtLarge Research
+ * Copyright (c) 2022 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,25 +20,36 @@
* SOFTWARE.
*/
-@file:JvmName("AvroUtils")
package org.opendc.trace.util.parquet
-import org.apache.avro.LogicalTypes
-import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.ParquetWriter
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.io.OutputFile
+import java.nio.file.Path
/**
- * Schema for UUID type.
+ * Helper class for writing Parquet records to local disk.
*/
-public val UUID_SCHEMA: Schema = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING))
+public class LocalParquetWriter {
+ /**
+ * A [ParquetWriter.Builder] that supports custom [OutputFile] and [WriteSupport] implementations.
+ */
+ public class Builder<T> internal constructor(
+ output: OutputFile,
+ private val writeSupport: WriteSupport<T>
+ ) : ParquetWriter.Builder<T, Builder<T>>(output) {
+ override fun self(): Builder<T> = this
-/**
- * Schema for timestamp type.
- */
-public val TIMESTAMP_SCHEMA: Schema = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG))
+ override fun getWriteSupport(conf: Configuration): WriteSupport<T> = writeSupport
+ }
-/**
- * Helper function to make a [Schema] field optional.
- */
-public fun Schema.optional(): Schema {
- return Schema.createUnion(Schema.create(Schema.Type.NULL), this)
+ public companion object {
+ /**
+ * Create a [Builder] instance that writes a Parquet file at the specified [path].
+ */
+ @JvmStatic
+ public fun <T> builder(path: Path, writeSupport: WriteSupport<T>): Builder<T> =
+ Builder(LocalOutputFile(path), writeSupport)
+ }
}
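
And a corresponding writer sketch, again using parquet-mr's example Group model; the schema string, field names, and output path are illustrative only and not part of this commit. GroupWriteSupport reads its schema from the Hadoop Configuration, which is why the schema is registered there and passed to the builder via withConf before build() is called:

import org.apache.hadoop.conf.Configuration
import org.apache.parquet.example.data.simple.SimpleGroupFactory
import org.apache.parquet.hadoop.example.GroupWriteSupport
import org.apache.parquet.schema.MessageTypeParser
import java.nio.file.Paths

// Assumes this snippet lives in (or imports) org.opendc.trace.util.parquet.
fun main() {
    // Illustrative schema: a task id plus a duration in milliseconds.
    val schema = MessageTypeParser.parseMessageType(
        "message task { required binary id (UTF8); required int64 duration_ms; }"
    )

    // GroupWriteSupport initializes itself from the Hadoop Configuration,
    // so the schema must be registered there before build() is called.
    val conf = Configuration()
    GroupWriteSupport.setSchema(schema, conf)

    val writer = LocalParquetWriter.builder(Paths.get("out/tasks.parquet"), GroupWriteSupport())
        .withConf(conf)
        .build()
    try {
        val groups = SimpleGroupFactory(schema)
        writer.write(groups.newGroup().append("id", "task-0").append("duration_ms", 42L))
    } finally {
        writer.close()
    }
}

The design mirrors the reader change: by routing a caller-supplied WriteSupport through getWriteSupport(conf), the writer is decoupled from the Avro record model that the deleted AvroUtils.kt schemas served.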