summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--opendc-format/build.gradle.kts2
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/util/LocalInputFile.kt101
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/util/LocalParquetReader.kt112
3 files changed, 214 insertions, 1 deletions
diff --git a/opendc-format/build.gradle.kts b/opendc-format/build.gradle.kts
index 6c87dd99..c0ffeb3e 100644
--- a/opendc-format/build.gradle.kts
+++ b/opendc-format/build.gradle.kts
@@ -44,6 +44,6 @@ dependencies {
exclude(group = "org.slf4j", module = "slf4j-log4j12")
exclude(group = "log4j")
}
-
+
testRuntimeOnly(libs.slf4j.simple)
}
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalInputFile.kt b/opendc-format/src/main/kotlin/org/opendc/format/util/LocalInputFile.kt
new file mode 100644
index 00000000..d8c25a62
--- /dev/null
+++ b/opendc-format/src/main/kotlin/org/opendc/format/util/LocalInputFile.kt
@@ -0,0 +1,101 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.format.util
+
+import org.apache.parquet.io.InputFile
+import org.apache.parquet.io.SeekableInputStream
+import java.io.EOFException
+import java.nio.ByteBuffer
+import java.nio.channels.FileChannel
+import java.nio.file.Path
+import java.nio.file.StandardOpenOption
+
+/**
+ * An [InputFile] on the local filesystem.
+ */
+public class LocalInputFile(private val path: Path) : InputFile {
+ /**
+ * The [FileChannel] used for accessing the input path.
+ */
+ private val channel = FileChannel.open(path, StandardOpenOption.READ)
+
+ override fun getLength(): Long = channel.size()
+
+ override fun newStream(): SeekableInputStream = object : SeekableInputStream() {
+ override fun read(buf: ByteBuffer): Int {
+ return channel.read(buf)
+ }
+
+ override fun read(): Int {
+ val single = ByteBuffer.allocate(1)
+ var read: Int
+
+ // ReadableByteChannel#read might read zero bytes so continue until we read at least one byte
+ do {
+ read = channel.read(single)
+ } while (read == 0)
+
+ return if (read == -1) {
+ read
+ } else {
+ single.get(0).toInt() and 0xff
+ }
+ }
+
+ override fun getPos(): Long {
+ return channel.position()
+ }
+
+ override fun seek(newPos: Long) {
+ channel.position(newPos)
+ }
+
+ override fun readFully(bytes: ByteArray) {
+ readFully(ByteBuffer.wrap(bytes))
+ }
+
+ override fun readFully(bytes: ByteArray, start: Int, len: Int) {
+ readFully(ByteBuffer.wrap(bytes, start, len))
+ }
+
+ override fun readFully(buf: ByteBuffer) {
+ var remainder = buf.remaining()
+ while (remainder > 0) {
+ val read = channel.read(buf)
+ remainder -= read
+
+ if (read == -1 && remainder > 0) {
+ throw EOFException()
+ }
+ }
+ }
+
+ override fun close() {
+ channel.close()
+ }
+
+ override fun toString(): String = "NioSeekableInputStream"
+ }
+
+ override fun toString(): String = "LocalInputFile[path=$path]"
+}
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalParquetReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/util/LocalParquetReader.kt
new file mode 100644
index 00000000..5083f3e1
--- /dev/null
+++ b/opendc-format/src/main/kotlin/org/opendc/format/util/LocalParquetReader.kt
@@ -0,0 +1,112 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.format.util
+
+import org.apache.parquet.avro.AvroParquetReader
+import org.apache.parquet.hadoop.ParquetReader
+import org.apache.parquet.io.InputFile
+import java.io.File
+import java.io.IOException
+import java.nio.file.Files
+import java.nio.file.Path
+import kotlin.io.path.isDirectory
+
+/**
+ * A helper class to read Parquet files.
+ *
+ * @param path The path to the Parquet file or directory to read.
+ */
+public class LocalParquetReader<out T>(path: Path) : AutoCloseable {
+ /**
+ * The input files to process.
+ */
+ private val filesIterator = if (path.isDirectory())
+ Files.list(path)
+ .filter { !it.isDirectory() }
+ .sorted()
+ .map { LocalInputFile(it) }
+ .iterator()
+ else
+ listOf(LocalInputFile(path)).iterator()
+
+ /**
+ * The Parquet reader to use.
+ */
+ private var reader: ParquetReader<T>? = null
+
+ /**
+ * Construct a [LocalParquetReader] for the specified [file].
+ */
+ public constructor(file: File) : this(file.toPath())
+
+ /**
+ * Read a single entry in the Parquet file.
+ */
+ public fun read(): T? {
+ return try {
+ val next = reader?.read()
+ if (next != null) {
+ next
+ } else {
+ initReader()
+
+ if (reader == null)
+ null
+ else
+ read()
+ }
+ } catch (e: InterruptedException) {
+ throw IOException(e)
+ }
+ }
+
+ /**
+ * Close the Parquet reader.
+ */
+ override fun close() {
+ reader?.close()
+ }
+
+ /**
+ * Initialize the next reader.
+ */
+ private fun initReader() {
+ reader?.close()
+
+ this.reader = if (filesIterator.hasNext()) {
+ createReader(filesIterator.next())
+ } else {
+ null
+ }
+ }
+
+ /**
+ * Create a Parquet reader for the specified file.
+ */
+ private fun createReader(input: InputFile): ParquetReader<T> {
+ return AvroParquetReader
+ .builder<T>(input)
+ .disableCompatibility()
+ .build()
+ }
+}