From 9097811e0ac6872de3e4ff5f521d8859870b1000 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 8 Jun 2021 23:42:48 +0200 Subject: format: Add implementation of local Parquet InputFile This change adds an implementation of Parquet's local InputFile in order to eliminate the dependency on the entire Hadoop system. This implementation allows users to read Parquet files locally without needing a Parquet filesystem implementation. --- opendc-format/build.gradle.kts | 2 +- .../org/opendc/format/util/LocalInputFile.kt | 101 +++++++++++++++++++ .../org/opendc/format/util/LocalParquetReader.kt | 112 +++++++++++++++++++++ 3 files changed, 214 insertions(+), 1 deletion(-) create mode 100644 opendc-format/src/main/kotlin/org/opendc/format/util/LocalInputFile.kt create mode 100644 opendc-format/src/main/kotlin/org/opendc/format/util/LocalParquetReader.kt diff --git a/opendc-format/build.gradle.kts b/opendc-format/build.gradle.kts index 6c87dd99..c0ffeb3e 100644 --- a/opendc-format/build.gradle.kts +++ b/opendc-format/build.gradle.kts @@ -44,6 +44,6 @@ dependencies { exclude(group = "org.slf4j", module = "slf4j-log4j12") exclude(group = "log4j") } - + testRuntimeOnly(libs.slf4j.simple) } diff --git a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalInputFile.kt b/opendc-format/src/main/kotlin/org/opendc/format/util/LocalInputFile.kt new file mode 100644 index 00000000..d8c25a62 --- /dev/null +++ b/opendc-format/src/main/kotlin/org/opendc/format/util/LocalInputFile.kt @@ -0,0 +1,101 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.format.util + +import org.apache.parquet.io.InputFile +import org.apache.parquet.io.SeekableInputStream +import java.io.EOFException +import java.nio.ByteBuffer +import java.nio.channels.FileChannel +import java.nio.file.Path +import java.nio.file.StandardOpenOption + +/** + * An [InputFile] on the local filesystem. + */ +public class LocalInputFile(private val path: Path) : InputFile { + /** + * The [FileChannel] used for accessing the input path. + */ + private val channel = FileChannel.open(path, StandardOpenOption.READ) + + override fun getLength(): Long = channel.size() + + override fun newStream(): SeekableInputStream = object : SeekableInputStream() { + override fun read(buf: ByteBuffer): Int { + return channel.read(buf) + } + + override fun read(): Int { + val single = ByteBuffer.allocate(1) + var read: Int + + // ReadableByteChannel#read might read zero bytes so continue until we read at least one byte + do { + read = channel.read(single) + } while (read == 0) + + return if (read == -1) { + read + } else { + single.get(0).toInt() and 0xff + } + } + + override fun getPos(): Long { + return channel.position() + } + + override fun seek(newPos: Long) { + channel.position(newPos) + } + + override fun readFully(bytes: ByteArray) { + readFully(ByteBuffer.wrap(bytes)) + } + + override fun readFully(bytes: ByteArray, start: Int, len: Int) { + readFully(ByteBuffer.wrap(bytes, start, len)) + } + + override fun readFully(buf: ByteBuffer) { + var remainder = buf.remaining() + while (remainder > 0) { + val read = channel.read(buf) + remainder -= read + + if (read == -1 && remainder > 0) { + throw EOFException() + } + } + } + + override fun close() { + channel.close() + } + + override fun toString(): String = "NioSeekableInputStream" + } + + override fun toString(): String = "LocalInputFile[path=$path]" +} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalParquetReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/util/LocalParquetReader.kt new file mode 100644 index 00000000..5083f3e1 --- /dev/null +++ b/opendc-format/src/main/kotlin/org/opendc/format/util/LocalParquetReader.kt @@ -0,0 +1,112 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.format.util + +import org.apache.parquet.avro.AvroParquetReader +import org.apache.parquet.hadoop.ParquetReader +import org.apache.parquet.io.InputFile +import java.io.File +import java.io.IOException +import java.nio.file.Files +import java.nio.file.Path +import kotlin.io.path.isDirectory + +/** + * A helper class to read Parquet files. + * + * @param path The path to the Parquet file or directory to read. + */ +public class LocalParquetReader(path: Path) : AutoCloseable { + /** + * The input files to process. + */ + private val filesIterator = if (path.isDirectory()) + Files.list(path) + .filter { !it.isDirectory() } + .sorted() + .map { LocalInputFile(it) } + .iterator() + else + listOf(LocalInputFile(path)).iterator() + + /** + * The Parquet reader to use. + */ + private var reader: ParquetReader? = null + + /** + * Construct a [LocalParquetReader] for the specified [file]. + */ + public constructor(file: File) : this(file.toPath()) + + /** + * Read a single entry in the Parquet file. + */ + public fun read(): T? { + return try { + val next = reader?.read() + if (next != null) { + next + } else { + initReader() + + if (reader == null) + null + else + read() + } + } catch (e: InterruptedException) { + throw IOException(e) + } + } + + /** + * Close the Parquet reader. + */ + override fun close() { + reader?.close() + } + + /** + * Initialize the next reader. + */ + private fun initReader() { + reader?.close() + + this.reader = if (filesIterator.hasNext()) { + createReader(filesIterator.next()) + } else { + null + } + } + + /** + * Create a Parquet reader for the specified file. + */ + private fun createReader(input: InputFile): ParquetReader { + return AvroParquetReader + .builder(input) + .disableCompatibility() + .build() + } +} -- cgit v1.2.3