summaryrefslogtreecommitdiff
path: root/opendc-trace/opendc-trace-parquet/src/main/kotlin
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-05-02 11:44:48 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2022-05-02 15:37:03 +0200
commit9411845b3f26536a1e6ea40504e396f19d25a09a (patch)
treeeccf44c25f0074a40b42797c2a3147f213ddab86 /opendc-trace/opendc-trace-parquet/src/main/kotlin
parentae0b12987dca93c05e44341963511ac8cf802793 (diff)
refactor(trace/parquet): Drop dependency on Avro
This change updates the Parquet support library in OpenDC to not rely on Avro, but instead interface directly with Parquet's reading and writing functionality, providing less overhead.
Diffstat (limited to 'opendc-trace/opendc-trace-parquet/src/main/kotlin')
-rw-r--r--opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/avro/AvroUtils.kt44
-rw-r--r--opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt39
2 files changed, 11 insertions, 72 deletions
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/avro/AvroUtils.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/avro/AvroUtils.kt
deleted file mode 100644
index a655d39f..00000000
--- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/avro/AvroUtils.kt
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-@file:JvmName("AvroUtils")
-package org.opendc.trace.util.avro
-
-import org.apache.avro.LogicalTypes
-import org.apache.avro.Schema
-
-/**
- * Schema for UUID type.
- */
-public val UUID_SCHEMA: Schema = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING))
-
-/**
- * Schema for timestamp type.
- */
-public val TIMESTAMP_SCHEMA: Schema = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG))
-
-/**
- * Helper function to make a [Schema] field optional.
- */
-public fun Schema.optional(): Schema {
- return Schema.createUnion(Schema.create(Schema.Type.NULL), this)
-}
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
index 3e6f19a2..eef83956 100644
--- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
@@ -22,7 +22,6 @@
package org.opendc.trace.util.parquet
-import org.apache.parquet.avro.AvroParquetReader
import org.apache.parquet.hadoop.ParquetReader
import org.apache.parquet.hadoop.api.ReadSupport
import org.apache.parquet.io.InputFile
@@ -38,11 +37,11 @@ import kotlin.io.path.isDirectory
* This class wraps a [ParquetReader] in order to support reading partitioned Parquet datasets.
*
* @param path The path to the Parquet file or directory to read.
- * @param factory Function to construct a [ParquetReader] for a local [InputFile].
+ * @param readSupport Helper class to perform conversion from Parquet to [T].
*/
public class LocalParquetReader<out T>(
path: Path,
- private val factory: (InputFile) -> ParquetReader<T> = avro()
+ private val readSupport: ReadSupport<T>
) : AutoCloseable {
/**
* The input files to process.
@@ -64,7 +63,7 @@ public class LocalParquetReader<out T>(
/**
* Construct a [LocalParquetReader] for the specified [file].
*/
- public constructor(file: File) : this(file.toPath())
+ public constructor(file: File, readSupport: ReadSupport<T>) : this(file.toPath(), readSupport)
/**
* Read a single entry in the Parquet file.
@@ -102,7 +101,7 @@ public class LocalParquetReader<out T>(
try {
this.reader = if (filesIterator.hasNext()) {
- factory(filesIterator.next())
+ createReader(filesIterator.next())
} else {
null
}
@@ -112,28 +111,12 @@ public class LocalParquetReader<out T>(
}
}
- public companion object {
- /**
- * A factory for reading Avro Parquet files.
- */
- public fun <T> avro(): (InputFile) -> ParquetReader<T> {
- return { input ->
- AvroParquetReader
- .builder<T>(input)
- .disableCompatibility()
- .build()
- }
- }
-
- /**
- * A factory for reading Parquet files with custom [ReadSupport].
- */
- public fun <T> custom(readSupport: ReadSupport<T>): (InputFile) -> ParquetReader<T> {
- return { input ->
- object : ParquetReader.Builder<T>(input) {
- override fun getReadSupport(): ReadSupport<T> = readSupport
- }.build()
- }
- }
+ /**
+ * Construct a [ParquetReader] for the specified [input] with a custom [ReadSupport].
+ */
+ private fun createReader(input: InputFile): ParquetReader<T> {
+ return object : ParquetReader.Builder<T>(input) {
+ override fun getReadSupport(): ReadSupport<@UnsafeVariance T> = this@LocalParquetReader.readSupport
+ }.build()
}
}