diff options
Diffstat (limited to 'opendc-trace/opendc-trace-parquet')
6 files changed, 136 insertions, 116 deletions
diff --git a/opendc-trace/opendc-trace-parquet/build.gradle.kts b/opendc-trace/opendc-trace-parquet/build.gradle.kts index 2217a017..4cdd4350 100644 --- a/opendc-trace/opendc-trace-parquet/build.gradle.kts +++ b/opendc-trace/opendc-trace-parquet/build.gradle.kts @@ -22,13 +22,13 @@ description = "Parquet helpers for traces in OpenDC" -/* Build configuration */ +// Build configuration plugins { `kotlin-library-conventions` } dependencies { - /* This configuration is necessary for a slim dependency on Apache Parquet */ + // This configuration is necessary for a slim dependency on Apache Parquet api(libs.parquet) { exclude(group = "org.apache.hadoop") } diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalInputFile.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalInputFile.kt index fd2e00cd..a60b426a 100644 --- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalInputFile.kt +++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalInputFile.kt @@ -47,61 +47,66 @@ public class LocalInputFile(private val path: Path) : InputFile { override fun getLength(): Long = channel.size() - override fun newStream(): SeekableInputStream = object : SeekableInputStream() { - override fun read(buf: ByteBuffer): Int { - return channel.read(buf) - } + override fun newStream(): SeekableInputStream = + object : SeekableInputStream() { + override fun read(buf: ByteBuffer): Int { + return channel.read(buf) + } - override fun read(): Int { - val single = ByteBuffer.allocate(1) - var read: Int + override fun read(): Int { + val single = ByteBuffer.allocate(1) + var read: Int - // ReadableByteChannel#read might read zero bytes so continue until we read at least one byte - do { - read = channel.read(single) - } while (read == 0) + // ReadableByteChannel#read might read zero bytes so continue until we read at least one byte + do { + read = channel.read(single) + } while (read == 0) - return if (read == -1) { - read - } else { - single.get(0).toInt() and 0xff + return if (read == -1) { + read + } else { + single.get(0).toInt() and 0xff + } } - } - override fun getPos(): Long { - return channel.position() - } + override fun getPos(): Long { + return channel.position() + } - override fun seek(newPos: Long) { - channel.position(newPos) - } + override fun seek(newPos: Long) { + channel.position(newPos) + } - override fun readFully(bytes: ByteArray) { - readFully(ByteBuffer.wrap(bytes)) - } + override fun readFully(bytes: ByteArray) { + readFully(ByteBuffer.wrap(bytes)) + } - override fun readFully(bytes: ByteArray, start: Int, len: Int) { - readFully(ByteBuffer.wrap(bytes, start, len)) - } + override fun readFully( + bytes: ByteArray, + start: Int, + len: Int, + ) { + readFully(ByteBuffer.wrap(bytes, start, len)) + } - override fun readFully(buf: ByteBuffer) { - var remainder = buf.remaining() - while (remainder > 0) { - val read = channel.read(buf) - remainder -= read + override fun readFully(buf: ByteBuffer) { + var remainder = buf.remaining() + while (remainder > 0) { + val read = channel.read(buf) + remainder -= read - if (read == -1 && remainder > 0) { - throw EOFException() + if (read == -1 && remainder > 0) { + throw EOFException() + } } } - } - override fun close() { - channel.close() - } + override fun close() { + channel.close() + } - override fun toString(): String = "NioSeekableInputStream" - } + override fun toString(): String = "NioSeekableInputStream" + } override fun toString(): String = "LocalInputFile[path=$path]" } diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalOutputFile.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalOutputFile.kt index 1b17ae5d..24627b45 100644 --- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalOutputFile.kt +++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalOutputFile.kt @@ -51,8 +51,7 @@ public class LocalOutputFile(private val path: Path) : OutputFile { override fun supportsBlockSize(): Boolean = false - override fun defaultBlockSize(): Long = - throw UnsupportedOperationException("Local filesystem does not have default block size") + override fun defaultBlockSize(): Long = throw UnsupportedOperationException("Local filesystem does not have default block size") override fun getPath(): String = path.toString() @@ -77,7 +76,11 @@ public class LocalOutputFile(private val path: Path) : OutputFile { _pos += b.size } - override fun write(b: ByteArray, off: Int, len: Int) { + override fun write( + b: ByteArray, + off: Int, + len: Int, + ) { output.write(b, off, len) _pos += len } diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt index de8a56d0..b503254e 100644 --- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt +++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt @@ -43,20 +43,21 @@ import kotlin.io.path.isDirectory public class LocalParquetReader<out T>( path: Path, private val readSupport: ReadSupport<T>, - private val strictTyping: Boolean = true + private val strictTyping: Boolean = true, ) : AutoCloseable { /** * The input files to process. */ - private val filesIterator = if (path.isDirectory()) { - Files.list(path) - .filter { !it.isDirectory() } - .sorted() - .map { LocalInputFile(it) } - .iterator() - } else { - listOf(LocalInputFile(path)).iterator() - } + private val filesIterator = + if (path.isDirectory()) { + Files.list(path) + .filter { !it.isDirectory() } + .sorted() + .map { LocalInputFile(it) } + .iterator() + } else { + listOf(LocalInputFile(path)).iterator() + } /** * The Parquet reader to use. @@ -104,11 +105,12 @@ public class LocalParquetReader<out T>( reader?.close() try { - this.reader = if (filesIterator.hasNext()) { - createReader(filesIterator.next()) - } else { - null - } + this.reader = + if (filesIterator.hasNext()) { + createReader(filesIterator.next()) + } else { + null + } } catch (e: Throwable) { this.reader = null throw e diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt index b5eb1deb..c7028fc3 100644 --- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt +++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt @@ -37,7 +37,7 @@ public class LocalParquetWriter { */ public class Builder<T> internal constructor( output: OutputFile, - private val writeSupport: WriteSupport<T> + private val writeSupport: WriteSupport<T>, ) : ParquetWriter.Builder<T, Builder<T>>(output) { override fun self(): Builder<T> = this @@ -49,7 +49,9 @@ public class LocalParquetWriter { * Create a [Builder] instance that writes a Parquet file at the specified [path]. */ @JvmStatic - public fun <T> builder(path: Path, writeSupport: WriteSupport<T>): Builder<T> = - Builder(LocalOutputFile(path), writeSupport) + public fun <T> builder( + path: Path, + writeSupport: WriteSupport<T>, + ): Builder<T> = Builder(LocalOutputFile(path), writeSupport) } } diff --git a/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt b/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt index b6c5a423..fc90aded 100644 --- a/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt +++ b/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt @@ -51,49 +51,52 @@ import java.nio.file.Path internal class ParquetTest { private lateinit var path: Path - private val schema = Types.buildMessage() - .addField( - Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.REQUIRED) - .named("field") - ) - .named("test") - private val writeSupport = object : WriteSupport<Int>() { - lateinit var recordConsumer: RecordConsumer - - override fun init(configuration: Configuration): WriteContext { - return WriteContext(schema, emptyMap()) - } + private val schema = + Types.buildMessage() + .addField( + Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.REQUIRED) + .named("field"), + ) + .named("test") + private val writeSupport = + object : WriteSupport<Int>() { + lateinit var recordConsumer: RecordConsumer + + override fun init(configuration: Configuration): WriteContext { + return WriteContext(schema, emptyMap()) + } - override fun prepareForWrite(recordConsumer: RecordConsumer) { - this.recordConsumer = recordConsumer - } + override fun prepareForWrite(recordConsumer: RecordConsumer) { + this.recordConsumer = recordConsumer + } - override fun write(record: Int) { - val consumer = recordConsumer + override fun write(record: Int) { + val consumer = recordConsumer - consumer.startMessage() - consumer.startField("field", 0) - consumer.addInteger(record) - consumer.endField("field", 0) - consumer.endMessage() + consumer.startMessage() + consumer.startField("field", 0) + consumer.addInteger(record) + consumer.endField("field", 0) + consumer.endMessage() + } } - } - private val readSupport = object : ReadSupport<Int>() { - @Suppress("OVERRIDE_DEPRECATION") - override fun init( - configuration: Configuration, - keyValueMetaData: Map<String, String>, - fileSchema: MessageType - ): ReadContext = ReadContext(fileSchema) - - override fun prepareForRead( - configuration: Configuration, - keyValueMetaData: Map<String, String>, - fileSchema: MessageType, - readContext: ReadContext - ): RecordMaterializer<Int> = TestRecordMaterializer() - } + private val readSupport = + object : ReadSupport<Int>() { + @Suppress("OVERRIDE_DEPRECATION") + override fun init( + configuration: Configuration, + keyValueMetaData: Map<String, String>, + fileSchema: MessageType, + ): ReadContext = ReadContext(fileSchema) + + override fun prepareForRead( + configuration: Configuration, + keyValueMetaData: Map<String, String>, + fileSchema: MessageType, + readContext: ReadContext, + ): RecordMaterializer<Int> = TestRecordMaterializer() + } /** * Set up the test @@ -117,9 +120,10 @@ internal class ParquetTest { @Test fun testSmoke() { val n = 4 - val writer = LocalParquetWriter.builder(path, writeSupport) - .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) - .build() + val writer = + LocalParquetWriter.builder(path, writeSupport) + .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) + .build() try { repeat(n) { i -> @@ -166,19 +170,23 @@ internal class ParquetTest { private class TestRecordMaterializer : RecordMaterializer<Int>() { private var current: Int = 0 - private val fieldConverter = object : PrimitiveConverter() { - override fun addInt(value: Int) { - current = value + private val fieldConverter = + object : PrimitiveConverter() { + override fun addInt(value: Int) { + current = value + } } - } - private val root = object : GroupConverter() { - override fun getConverter(fieldIndex: Int): Converter { - require(fieldIndex == 0) - return fieldConverter + private val root = + object : GroupConverter() { + override fun getConverter(fieldIndex: Int): Converter { + require(fieldIndex == 0) + return fieldConverter + } + + override fun start() {} + + override fun end() {} } - override fun start() {} - override fun end() {} - } override fun getCurrentRecord(): Int = current |
