author    Dante Niewenhuis <d.niewenhuis@hotmail.com>    2024-03-05 13:23:57 +0100
committer GitHub <noreply@github.com>                    2024-03-05 13:23:57 +0100
commit    5864cbcbfe2eb8c36ca05c3a39c7e5916aeecaec (patch)
tree      5b2773b8dc21c2e1b526fb70f829c376dd80532a /opendc-trace/opendc-trace-parquet
parent    d28002a3c151d198298574312f32f1cb43f3a660 (diff)
Updated package versions, updated web server tests. (#207)
* Updated all package versions, including Kotlin, and updated all web-server tests to run.
* Changed the Java version of the tests; OpenDC now only supports Java 19.
* Small update
* Test update
* New update
* Updated Docker version to 19
Diffstat (limited to 'opendc-trace/opendc-trace-parquet')
-rw-r--r--  opendc-trace/opendc-trace-parquet/build.gradle.kts                                                    |   4
-rw-r--r--  opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalInputFile.kt     |  87
-rw-r--r--  opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalOutputFile.kt    |   9
-rw-r--r--  opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt |  32
-rw-r--r--  opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt |   8
-rw-r--r--  opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt        | 112
6 files changed, 136 insertions, 116 deletions
diff --git a/opendc-trace/opendc-trace-parquet/build.gradle.kts b/opendc-trace/opendc-trace-parquet/build.gradle.kts
index 2217a017..4cdd4350 100644
--- a/opendc-trace/opendc-trace-parquet/build.gradle.kts
+++ b/opendc-trace/opendc-trace-parquet/build.gradle.kts
@@ -22,13 +22,13 @@
description = "Parquet helpers for traces in OpenDC"
-/* Build configuration */
+// Build configuration
plugins {
`kotlin-library-conventions`
}
dependencies {
- /* This configuration is necessary for a slim dependency on Apache Parquet */
+ // This configuration is necessary for a slim dependency on Apache Parquet
api(libs.parquet) {
exclude(group = "org.apache.hadoop")
}
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalInputFile.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalInputFile.kt
index fd2e00cd..a60b426a 100644
--- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalInputFile.kt
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalInputFile.kt
@@ -47,61 +47,66 @@ public class LocalInputFile(private val path: Path) : InputFile {
override fun getLength(): Long = channel.size()
- override fun newStream(): SeekableInputStream = object : SeekableInputStream() {
- override fun read(buf: ByteBuffer): Int {
- return channel.read(buf)
- }
+ override fun newStream(): SeekableInputStream =
+ object : SeekableInputStream() {
+ override fun read(buf: ByteBuffer): Int {
+ return channel.read(buf)
+ }
- override fun read(): Int {
- val single = ByteBuffer.allocate(1)
- var read: Int
+ override fun read(): Int {
+ val single = ByteBuffer.allocate(1)
+ var read: Int
- // ReadableByteChannel#read might read zero bytes so continue until we read at least one byte
- do {
- read = channel.read(single)
- } while (read == 0)
+ // ReadableByteChannel#read might read zero bytes so continue until we read at least one byte
+ do {
+ read = channel.read(single)
+ } while (read == 0)
- return if (read == -1) {
- read
- } else {
- single.get(0).toInt() and 0xff
+ return if (read == -1) {
+ read
+ } else {
+ single.get(0).toInt() and 0xff
+ }
}
- }
- override fun getPos(): Long {
- return channel.position()
- }
+ override fun getPos(): Long {
+ return channel.position()
+ }
- override fun seek(newPos: Long) {
- channel.position(newPos)
- }
+ override fun seek(newPos: Long) {
+ channel.position(newPos)
+ }
- override fun readFully(bytes: ByteArray) {
- readFully(ByteBuffer.wrap(bytes))
- }
+ override fun readFully(bytes: ByteArray) {
+ readFully(ByteBuffer.wrap(bytes))
+ }
- override fun readFully(bytes: ByteArray, start: Int, len: Int) {
- readFully(ByteBuffer.wrap(bytes, start, len))
- }
+ override fun readFully(
+ bytes: ByteArray,
+ start: Int,
+ len: Int,
+ ) {
+ readFully(ByteBuffer.wrap(bytes, start, len))
+ }
- override fun readFully(buf: ByteBuffer) {
- var remainder = buf.remaining()
- while (remainder > 0) {
- val read = channel.read(buf)
- remainder -= read
+ override fun readFully(buf: ByteBuffer) {
+ var remainder = buf.remaining()
+ while (remainder > 0) {
+ val read = channel.read(buf)
+ remainder -= read
- if (read == -1 && remainder > 0) {
- throw EOFException()
+ if (read == -1 && remainder > 0) {
+ throw EOFException()
+ }
}
}
- }
- override fun close() {
- channel.close()
- }
+ override fun close() {
+ channel.close()
+ }
- override fun toString(): String = "NioSeekableInputStream"
- }
+ override fun toString(): String = "NioSeekableInputStream"
+ }
override fun toString(): String = "LocalInputFile[path=$path]"
}
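A note on the restyled stream above: FileChannel.read may legitimately transfer zero bytes, hence the retry loop, and a single-byte read() must return an unsigned value in 0..255 (or -1 at EOF), which is why the result is masked with 0xff. A minimal usage sketch, assuming a hypothetical trace.parquet on disk:

    import java.nio.file.Path

    fun main() {
        val input = LocalInputFile(Path.of("trace.parquet")) // hypothetical file
        input.newStream().use { stream ->
            val first = stream.read() // 0..255; the 0xff mask keeps byte 0xFF from reading as -1 (EOF)
            stream.seek(0) // SeekableInputStream supports rewinding
            println("first byte: $first, pos after seek: ${stream.pos}")
        }
    }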
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalOutputFile.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalOutputFile.kt
index 1b17ae5d..24627b45 100644
--- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalOutputFile.kt
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalOutputFile.kt
@@ -51,8 +51,7 @@ public class LocalOutputFile(private val path: Path) : OutputFile {
override fun supportsBlockSize(): Boolean = false
- override fun defaultBlockSize(): Long =
- throw UnsupportedOperationException("Local filesystem does not have default block size")
+ override fun defaultBlockSize(): Long = throw UnsupportedOperationException("Local filesystem does not have default block size")
override fun getPath(): String = path.toString()
@@ -77,7 +76,11 @@ public class LocalOutputFile(private val path: Path) : OutputFile {
_pos += b.size
}
- override fun write(b: ByteArray, off: Int, len: Int) {
+ override fun write(
+ b: ByteArray,
+ off: Int,
+ len: Int,
+ ) {
output.write(b, off, len)
_pos += len
}
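The restyled write(b, off, len) keeps the manual position bookkeeping (_pos += len) that Parquet's PositionOutputStream contract requires, and supportsBlockSize() returning false means defaultBlockSize() is never a valid call for local files. A short sketch of that contract, with a hypothetical output path and purely illustrative bytes:

    import java.nio.file.Path

    fun main() {
        val output = LocalOutputFile(Path.of("out.parquet")) // hypothetical path
        check(!output.supportsBlockSize()) // local files have no HDFS-style block size
        val stream = output.createOrOverwrite(0L) // block-size hint; assumed unused for local files
        stream.write(byteArrayOf(0x50, 0x41, 0x52, 0x31)) // "PAR1", illustrative only
        println(stream.pos) // 4: tracked by _pos in the stream itself
        stream.close()
    }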
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
index de8a56d0..b503254e 100644
--- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
@@ -43,20 +43,21 @@ import kotlin.io.path.isDirectory
public class LocalParquetReader<out T>(
path: Path,
private val readSupport: ReadSupport<T>,
- private val strictTyping: Boolean = true
+ private val strictTyping: Boolean = true,
) : AutoCloseable {
/**
* The input files to process.
*/
- private val filesIterator = if (path.isDirectory()) {
- Files.list(path)
- .filter { !it.isDirectory() }
- .sorted()
- .map { LocalInputFile(it) }
- .iterator()
- } else {
- listOf(LocalInputFile(path)).iterator()
- }
+ private val filesIterator =
+ if (path.isDirectory()) {
+ Files.list(path)
+ .filter { !it.isDirectory() }
+ .sorted()
+ .map { LocalInputFile(it) }
+ .iterator()
+ } else {
+ listOf(LocalInputFile(path)).iterator()
+ }
/**
* The Parquet reader to use.
@@ -104,11 +105,12 @@ public class LocalParquetReader<out T>(
reader?.close()
try {
- this.reader = if (filesIterator.hasNext()) {
- createReader(filesIterator.next())
- } else {
- null
- }
+ this.reader =
+ if (filesIterator.hasNext()) {
+ createReader(filesIterator.next())
+ } else {
+ null
+ }
} catch (e: Throwable) {
this.reader = null
throw e
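The restyled filesIterator makes the two input modes explicit: a directory expands to its non-directory children in sorted order, while a single file becomes a one-element iterator; the nextReader() logic shown above then advances across files transparently. A usage sketch, assuming a hypothetical directory of Parquet fragments, an existing ReadSupport<T>, and that the class exposes a read(): T? returning null once all files are exhausted (the usual ParquetReader convention):

    import java.nio.file.Path
    import org.apache.parquet.hadoop.api.ReadSupport

    fun <T> readAll(readSupport: ReadSupport<T>): List<T> {
        val records = mutableListOf<T>()
        // Hypothetical directory; fragments are consumed in sorted file order.
        LocalParquetReader(Path.of("trace-dir"), readSupport).use { reader ->
            while (true) {
                records += reader.read() ?: break
            }
        }
        return records
    }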
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt
index b5eb1deb..c7028fc3 100644
--- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt
@@ -37,7 +37,7 @@ public class LocalParquetWriter {
*/
public class Builder<T> internal constructor(
output: OutputFile,
- private val writeSupport: WriteSupport<T>
+ private val writeSupport: WriteSupport<T>,
) : ParquetWriter.Builder<T, Builder<T>>(output) {
override fun self(): Builder<T> = this
@@ -49,7 +49,9 @@ public class LocalParquetWriter {
* Create a [Builder] instance that writes a Parquet file at the specified [path].
*/
@JvmStatic
- public fun <T> builder(path: Path, writeSupport: WriteSupport<T>): Builder<T> =
- Builder(LocalOutputFile(path), writeSupport)
+ public fun <T> builder(
+ path: Path,
+ writeSupport: WriteSupport<T>,
+ ): Builder<T> = Builder(LocalOutputFile(path), writeSupport)
}
}
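The trailing-comma restyle leaves the API unchanged: Builder<T> inherits the standard ParquetWriter.Builder configuration methods, so callers chain options such as the write mode and then build(). A minimal sketch, with a hypothetical output path and an existing WriteSupport<T>:

    import java.nio.file.Path
    import org.apache.parquet.hadoop.ParquetFileWriter
    import org.apache.parquet.hadoop.api.WriteSupport

    fun <T> writeAll(writeSupport: WriteSupport<T>, records: Iterable<T>) {
        val writer =
            LocalParquetWriter.builder(Path.of("out.parquet"), writeSupport) // hypothetical path
                .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) // replace any existing file
                .build()
        try {
            records.forEach(writer::write)
        } finally {
            writer.close()
        }
    }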
diff --git a/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt b/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt
index b6c5a423..fc90aded 100644
--- a/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt
+++ b/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt
@@ -51,49 +51,52 @@ import java.nio.file.Path
internal class ParquetTest {
private lateinit var path: Path
- private val schema = Types.buildMessage()
- .addField(
- Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.REQUIRED)
- .named("field")
- )
- .named("test")
- private val writeSupport = object : WriteSupport<Int>() {
- lateinit var recordConsumer: RecordConsumer
-
- override fun init(configuration: Configuration): WriteContext {
- return WriteContext(schema, emptyMap())
- }
+ private val schema =
+ Types.buildMessage()
+ .addField(
+ Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.REQUIRED)
+ .named("field"),
+ )
+ .named("test")
+ private val writeSupport =
+ object : WriteSupport<Int>() {
+ lateinit var recordConsumer: RecordConsumer
+
+ override fun init(configuration: Configuration): WriteContext {
+ return WriteContext(schema, emptyMap())
+ }
- override fun prepareForWrite(recordConsumer: RecordConsumer) {
- this.recordConsumer = recordConsumer
- }
+ override fun prepareForWrite(recordConsumer: RecordConsumer) {
+ this.recordConsumer = recordConsumer
+ }
- override fun write(record: Int) {
- val consumer = recordConsumer
+ override fun write(record: Int) {
+ val consumer = recordConsumer
- consumer.startMessage()
- consumer.startField("field", 0)
- consumer.addInteger(record)
- consumer.endField("field", 0)
- consumer.endMessage()
+ consumer.startMessage()
+ consumer.startField("field", 0)
+ consumer.addInteger(record)
+ consumer.endField("field", 0)
+ consumer.endMessage()
+ }
}
- }
- private val readSupport = object : ReadSupport<Int>() {
- @Suppress("OVERRIDE_DEPRECATION")
- override fun init(
- configuration: Configuration,
- keyValueMetaData: Map<String, String>,
- fileSchema: MessageType
- ): ReadContext = ReadContext(fileSchema)
-
- override fun prepareForRead(
- configuration: Configuration,
- keyValueMetaData: Map<String, String>,
- fileSchema: MessageType,
- readContext: ReadContext
- ): RecordMaterializer<Int> = TestRecordMaterializer()
- }
+ private val readSupport =
+ object : ReadSupport<Int>() {
+ @Suppress("OVERRIDE_DEPRECATION")
+ override fun init(
+ configuration: Configuration,
+ keyValueMetaData: Map<String, String>,
+ fileSchema: MessageType,
+ ): ReadContext = ReadContext(fileSchema)
+
+ override fun prepareForRead(
+ configuration: Configuration,
+ keyValueMetaData: Map<String, String>,
+ fileSchema: MessageType,
+ readContext: ReadContext,
+ ): RecordMaterializer<Int> = TestRecordMaterializer()
+ }
/**
* Set up the test
@@ -117,9 +120,10 @@ internal class ParquetTest {
@Test
fun testSmoke() {
val n = 4
- val writer = LocalParquetWriter.builder(path, writeSupport)
- .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
- .build()
+ val writer =
+ LocalParquetWriter.builder(path, writeSupport)
+ .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+ .build()
try {
repeat(n) { i ->
@@ -166,19 +170,23 @@ internal class ParquetTest {
private class TestRecordMaterializer : RecordMaterializer<Int>() {
private var current: Int = 0
- private val fieldConverter = object : PrimitiveConverter() {
- override fun addInt(value: Int) {
- current = value
+ private val fieldConverter =
+ object : PrimitiveConverter() {
+ override fun addInt(value: Int) {
+ current = value
+ }
}
- }
- private val root = object : GroupConverter() {
- override fun getConverter(fieldIndex: Int): Converter {
- require(fieldIndex == 0)
- return fieldConverter
+ private val root =
+ object : GroupConverter() {
+ override fun getConverter(fieldIndex: Int): Converter {
+ require(fieldIndex == 0)
+ return fieldConverter
+ }
+
+ override fun start() {}
+
+ override fun end() {}
}
- override fun start() {}
- override fun end() {}
- }
override fun getCurrentRecord(): Int = current
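Taken together, the smoke test's round trip reduces to the following sketch (the names path, writeSupport, and readSupport refer to the test fixtures above; as before, it assumes LocalParquetReader exposes a null-terminated read()):

    val n = 4
    val writer =
        LocalParquetWriter.builder(path, writeSupport)
            .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
            .build()
    try {
        repeat(n) { i -> writer.write(i) }
    } finally {
        writer.close()
    }

    LocalParquetReader(path, readSupport).use { reader ->
        repeat(n) { i ->
            check(reader.read() == i) // TestRecordMaterializer surfaces the INT32 "field"
        }
    }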