Diffstat (limited to 'opendc-trace/opendc-trace-parquet/src')
-rw-r--r--  opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt |  36
-rw-r--r--  opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt (renamed from opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt) |  41
-rw-r--r--  opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt | 118
3 files changed, 137 insertions, 58 deletions
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
index ef9eaeb3..eef83956 100644
--- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
@@ -22,8 +22,8 @@
 
 package org.opendc.trace.util.parquet
 
-import org.apache.parquet.avro.AvroParquetReader
 import org.apache.parquet.hadoop.ParquetReader
+import org.apache.parquet.hadoop.api.ReadSupport
 import org.apache.parquet.io.InputFile
 import java.io.File
 import java.io.IOException
@@ -32,11 +32,17 @@ import java.nio.file.Path
 import kotlin.io.path.isDirectory
 
 /**
- * A helper class to read Parquet files.
+ * A helper class to read Parquet files from the filesystem.
+ *
+ * This class wraps a [ParquetReader] in order to support reading partitioned Parquet datasets.
  *
  * @param path The path to the Parquet file or directory to read.
+ * @param readSupport Helper class to perform conversion from Parquet to [T].
  */
-public class LocalParquetReader<out T>(path: Path) : AutoCloseable {
+public class LocalParquetReader<out T>(
+    path: Path,
+    private val readSupport: ReadSupport<T>
+) : AutoCloseable {
     /**
      * The input files to process.
      */
@@ -57,7 +63,7 @@ public class LocalParquetReader<out T>(path: Path) : AutoCloseable {
     /**
      * Construct a [LocalParquetReader] for the specified [file].
      */
-    public constructor(file: File) : this(file.toPath())
+    public constructor(file: File, readSupport: ReadSupport<T>) : this(file.toPath(), readSupport)
 
     /**
      * Read a single entry in the Parquet file.
@@ -93,20 +99,24 @@ public class LocalParquetReader<out T>(path: Path) : AutoCloseable {
     private fun initReader() {
         reader?.close()
 
-        this.reader = if (filesIterator.hasNext()) {
-            createReader(filesIterator.next())
-        } else {
-            null
+        try {
+            this.reader = if (filesIterator.hasNext()) {
+                createReader(filesIterator.next())
+            } else {
+                null
+            }
+        } catch (e: Throwable) {
+            this.reader = null
+            throw e
         }
     }
 
     /**
-     * Create a Parquet reader for the specified file.
+     * Construct a [ParquetReader] for the specified [input] with a custom [ReadSupport].
      */
     private fun createReader(input: InputFile): ParquetReader<T> {
-        return AvroParquetReader
-            .builder<T>(input)
-            .disableCompatibility()
-            .build()
+        return object : ParquetReader.Builder<T>(input) {
+            override fun getReadSupport(): ReadSupport<@UnsafeVariance T> = this@LocalParquetReader.readSupport
+        }.build()
     }
 }
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt
index 086b900b..b5eb1deb 100644
--- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2021 AtLarge Research
+ * Copyright (c) 2022 AtLarge Research
  *
  * Permission is hereby granted, free of charge, to any person obtaining a copy
  * of this software and associated documentation files (the "Software"), to deal
@@ -20,25 +20,36 @@
  * SOFTWARE.
  */
 
-@file:JvmName("AvroUtils")
 package org.opendc.trace.util.parquet
 
-import org.apache.avro.LogicalTypes
-import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.ParquetWriter
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.io.OutputFile
+import java.nio.file.Path
 
 /**
- * Schema for UUID type.
+ * Helper class for writing Parquet records to local disk.
  */
-public val UUID_SCHEMA: Schema = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING))
+public class LocalParquetWriter {
+    /**
+     * A [ParquetWriter.Builder] implementation supporting custom [OutputFile]s and [WriteSupport] implementations.
+     */
+    public class Builder<T> internal constructor(
+        output: OutputFile,
+        private val writeSupport: WriteSupport<T>
+    ) : ParquetWriter.Builder<T, Builder<T>>(output) {
+        override fun self(): Builder<T> = this
 
-/**
- * Schema for timestamp type.
- */
-public val TIMESTAMP_SCHEMA: Schema = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG))
+        override fun getWriteSupport(conf: Configuration): WriteSupport<T> = writeSupport
+    }
 
-/**
- * Helper function to make a [Schema] field optional.
- */
-public fun Schema.optional(): Schema {
-    return Schema.createUnion(Schema.create(Schema.Type.NULL), this)
+    public companion object {
+        /**
+         * Create a [Builder] instance that writes a Parquet file at the specified [path].
+         */
+        @JvmStatic
+        public fun <T> builder(path: Path, writeSupport: WriteSupport<T>): Builder<T> =
+            Builder(LocalOutputFile(path), writeSupport)
+    }
 }
diff --git a/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt b/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt
index 8ef4d1fb..be354319 100644
--- a/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt
+++ b/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt
@@ -22,36 +22,81 @@
 
 package org.opendc.trace.util.parquet
 
-import org.apache.avro.SchemaBuilder
-import org.apache.avro.generic.GenericData
-import org.apache.parquet.avro.AvroParquetReader
-import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.hadoop.conf.Configuration
 import org.apache.parquet.hadoop.ParquetFileWriter
+import org.apache.parquet.hadoop.api.ReadSupport
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.io.api.Converter
+import org.apache.parquet.io.api.GroupConverter
+import org.apache.parquet.io.api.PrimitiveConverter
+import org.apache.parquet.io.api.RecordConsumer
+import org.apache.parquet.io.api.RecordMaterializer
+import org.apache.parquet.schema.MessageType
+import org.apache.parquet.schema.PrimitiveType
+import org.apache.parquet.schema.Type
+import org.apache.parquet.schema.Types
 import org.junit.jupiter.api.*
 import org.junit.jupiter.api.Assertions.assertEquals
-import java.io.File
 import java.nio.file.FileAlreadyExistsException
+import java.nio.file.Files
 import java.nio.file.NoSuchFileException
+import java.nio.file.Path
 
 /**
  * Test suite for the Parquet helper classes.
  */
 internal class ParquetTest {
-    private val schema = SchemaBuilder
-        .record("test")
-        .namespace("org.opendc.format.util")
-        .fields()
-        .name("field").type().intType().noDefault()
-        .endRecord()
+    private lateinit var path: Path
 
-    private lateinit var file: File
+    private val schema = Types.buildMessage()
+        .addField(
+            Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.REQUIRED)
+                .named("field")
+        )
+        .named("test")
 
+    private val writeSupport = object : WriteSupport<Int>() {
+        lateinit var recordConsumer: RecordConsumer
+
+        override fun init(configuration: Configuration): WriteContext {
+            return WriteContext(schema, emptyMap())
+        }
+
+        override fun prepareForWrite(recordConsumer: RecordConsumer) {
+            this.recordConsumer = recordConsumer
+        }
+
+        override fun write(record: Int) {
+            val consumer = recordConsumer
+
+            consumer.startMessage()
+            consumer.startField("field", 0)
+            consumer.addInteger(record)
+            consumer.endField("field", 0)
+            consumer.endMessage()
+        }
+    }
+
+    private val readSupport = object : ReadSupport<Int>() {
+        override fun init(
+            configuration: Configuration,
+            keyValueMetaData: Map<String, String>,
+            fileSchema: MessageType
+        ): ReadContext = ReadContext(fileSchema)
+
+        override fun prepareForRead(
+            configuration: Configuration,
+            keyValueMetaData: Map<String, String>,
+            fileSchema: MessageType,
+            readContext: ReadContext
+        ): RecordMaterializer<Int> = TestRecordMaterializer()
+    }
 
     /**
-     * Setup the test
+     * Set up the test
      */
     @BeforeEach
     fun setUp() {
-        file = File.createTempFile("opendc", "parquet")
+        path = Files.createTempFile("opendc", "parquet")
     }
 
     /**
@@ -59,7 +104,7 @@ internal class ParquetTest {
      */
     @AfterEach
     fun tearDown() {
-        file.delete()
+        Files.deleteIfExists(path)
     }
 
     /**
@@ -68,29 +113,24 @@ internal class ParquetTest {
     @Test
     fun testSmoke() {
         val n = 4
-        val writer = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(file))
-            .withSchema(schema)
+        val writer = LocalParquetWriter.builder(path, writeSupport)
             .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
             .build()
 
         try {
             repeat(n) { i ->
-                val record = GenericData.Record(schema)
-                record.put("field", i)
-                writer.write(record)
+                writer.write(i)
             }
         } finally {
             writer.close()
         }
 
-        val reader = AvroParquetReader.builder<GenericData.Record>(LocalInputFile(file))
-            .build()
-
+        val reader = LocalParquetReader(path, readSupport)
         var counter = 0
         try {
             while (true) {
                 val record = reader.read() ?: break
-                assertEquals(counter++, record.get("field"))
+                assertEquals(counter++, record)
             }
         } finally {
             reader.close()
@@ -105,9 +145,7 @@ internal class ParquetTest {
     @Test
     fun testOverwrite() {
         assertThrows<FileAlreadyExistsException> {
-            AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(file))
-                .withSchema(schema)
-                .build()
+            LocalParquetWriter.builder(path, writeSupport).build()
        }
     }
 
     /**
@@ -116,10 +154,30 @@ internal class ParquetTest {
      */
    @Test
     fun testNonExistent() {
-        file.delete()
+        Files.deleteIfExists(path)
         assertThrows<NoSuchFileException> {
-            AvroParquetReader.builder<GenericData.Record>(LocalInputFile(file))
-                .build()
+            LocalParquetReader(path, readSupport)
+        }
+    }
+
+    private class TestRecordMaterializer : RecordMaterializer<Int>() {
+        private var current: Int = 0
+        private val fieldConverter = object : PrimitiveConverter() {
+            override fun addInt(value: Int) {
+                current = value
+            }
+        }
+        private val root = object : GroupConverter() {
+            override fun getConverter(fieldIndex: Int): Converter {
+                require(fieldIndex == 0)
+                return fieldConverter
+            }
+            override fun start() {}
+            override fun end() {}
+        }
+
+        override fun getCurrentRecord(): Int = current
+
+        override fun getRootConverter(): GroupConverter = root
+    }
 }
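
Taken together, the two new helpers form a small write/read API that is no longer tied to Avro: the caller supplies a Parquet WriteSupport/ReadSupport pair and works with plain records of their own type. The sketch below shows a full round trip through LocalParquetWriter and LocalParquetReader. It is a minimal example, not OpenDC code: the single INT32 column schema and the support objects are condensed from the test suite above, and the temp-file name is made up; only the builder/reader calls themselves come from this diff.

import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.ParquetFileWriter
import org.apache.parquet.hadoop.api.ReadSupport
import org.apache.parquet.hadoop.api.WriteSupport
import org.apache.parquet.io.api.Converter
import org.apache.parquet.io.api.GroupConverter
import org.apache.parquet.io.api.PrimitiveConverter
import org.apache.parquet.io.api.RecordConsumer
import org.apache.parquet.io.api.RecordMaterializer
import org.apache.parquet.schema.MessageType
import org.apache.parquet.schema.PrimitiveType
import org.apache.parquet.schema.Type
import org.apache.parquet.schema.Types
import org.opendc.trace.util.parquet.LocalParquetReader
import org.opendc.trace.util.parquet.LocalParquetWriter
import java.nio.file.Files

// Single INT32 column, mirroring the schema used by ParquetTest.
val schema: MessageType = Types.buildMessage()
    .addField(
        Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.REQUIRED)
            .named("field")
    )
    .named("test")

// WriteSupport that emits one INT32 per record, following the test suite's pattern.
val writeSupport = object : WriteSupport<Int>() {
    lateinit var consumer: RecordConsumer

    override fun init(configuration: Configuration): WriteContext = WriteContext(schema, emptyMap())

    override fun prepareForWrite(recordConsumer: RecordConsumer) {
        consumer = recordConsumer
    }

    override fun write(record: Int) {
        consumer.startMessage()
        consumer.startField("field", 0)
        consumer.addInteger(record)
        consumer.endField("field", 0)
        consumer.endMessage()
    }
}

// ReadSupport that materializes the column back into Int values.
val readSupport = object : ReadSupport<Int>() {
    override fun init(
        configuration: Configuration,
        keyValueMetaData: Map<String, String>,
        fileSchema: MessageType
    ): ReadContext = ReadContext(fileSchema)

    override fun prepareForRead(
        configuration: Configuration,
        keyValueMetaData: Map<String, String>,
        fileSchema: MessageType,
        readContext: ReadContext
    ): RecordMaterializer<Int> = object : RecordMaterializer<Int>() {
        private var current = 0
        private val root = object : GroupConverter() {
            override fun getConverter(fieldIndex: Int): Converter = object : PrimitiveConverter() {
                override fun addInt(value: Int) {
                    current = value
                }
            }
            override fun start() {}
            override fun end() {}
        }
        override fun getCurrentRecord(): Int = current
        override fun getRootConverter(): GroupConverter = root
    }
}

fun main() {
    val path = Files.createTempFile("example", ".parquet")

    // Write four records, overwriting the (empty) temp file.
    val writer = LocalParquetWriter.builder(path, writeSupport)
        .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
        .build()
    try {
        repeat(4) { i -> writer.write(i) }
    } finally {
        writer.close()
    }

    // Read the records back; reader.read() returns null when exhausted.
    val reader = LocalParquetReader(path, readSupport)
    try {
        while (true) {
            val record = reader.read() ?: break
            println(record) // prints 0..3
        }
    } finally {
        reader.close()
    }
}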