summaryrefslogtreecommitdiff
path: root/opendc-trace/opendc-trace-parquet/src/main
diff options
context:
space:
mode:
authorDante Niewenhuis <d.niewenhuis@hotmail.com>2024-03-05 13:23:57 +0100
committerGitHub <noreply@github.com>2024-03-05 13:23:57 +0100
commit5864cbcbfe2eb8c36ca05c3a39c7e5916aeecaec (patch)
tree5b2773b8dc21c2e1b526fb70f829c376dd80532a /opendc-trace/opendc-trace-parquet/src/main
parentd28002a3c151d198298574312f32f1cb43f3a660 (diff)
Updated package versions, updated web server tests. (#207)
* Updated all package versions including kotlin. Updated all web-server tests to run. * Changed the java version of the tests. OpenDC now only supports java 19. * small update * test update * new update * updated docker version to 19 * updated docker version to 19
Diffstat (limited to 'opendc-trace/opendc-trace-parquet/src/main')
-rw-r--r--opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalInputFile.kt87
-rw-r--r--opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalOutputFile.kt9
-rw-r--r--opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt32
-rw-r--r--opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt8
4 files changed, 74 insertions, 62 deletions
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalInputFile.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalInputFile.kt
index fd2e00cd..a60b426a 100644
--- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalInputFile.kt
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalInputFile.kt
@@ -47,61 +47,66 @@ public class LocalInputFile(private val path: Path) : InputFile {
override fun getLength(): Long = channel.size()
- override fun newStream(): SeekableInputStream = object : SeekableInputStream() {
- override fun read(buf: ByteBuffer): Int {
- return channel.read(buf)
- }
+ override fun newStream(): SeekableInputStream =
+ object : SeekableInputStream() {
+ override fun read(buf: ByteBuffer): Int {
+ return channel.read(buf)
+ }
- override fun read(): Int {
- val single = ByteBuffer.allocate(1)
- var read: Int
+ override fun read(): Int {
+ val single = ByteBuffer.allocate(1)
+ var read: Int
- // ReadableByteChannel#read might read zero bytes so continue until we read at least one byte
- do {
- read = channel.read(single)
- } while (read == 0)
+ // ReadableByteChannel#read might read zero bytes so continue until we read at least one byte
+ do {
+ read = channel.read(single)
+ } while (read == 0)
- return if (read == -1) {
- read
- } else {
- single.get(0).toInt() and 0xff
+ return if (read == -1) {
+ read
+ } else {
+ single.get(0).toInt() and 0xff
+ }
}
- }
- override fun getPos(): Long {
- return channel.position()
- }
+ override fun getPos(): Long {
+ return channel.position()
+ }
- override fun seek(newPos: Long) {
- channel.position(newPos)
- }
+ override fun seek(newPos: Long) {
+ channel.position(newPos)
+ }
- override fun readFully(bytes: ByteArray) {
- readFully(ByteBuffer.wrap(bytes))
- }
+ override fun readFully(bytes: ByteArray) {
+ readFully(ByteBuffer.wrap(bytes))
+ }
- override fun readFully(bytes: ByteArray, start: Int, len: Int) {
- readFully(ByteBuffer.wrap(bytes, start, len))
- }
+ override fun readFully(
+ bytes: ByteArray,
+ start: Int,
+ len: Int,
+ ) {
+ readFully(ByteBuffer.wrap(bytes, start, len))
+ }
- override fun readFully(buf: ByteBuffer) {
- var remainder = buf.remaining()
- while (remainder > 0) {
- val read = channel.read(buf)
- remainder -= read
+ override fun readFully(buf: ByteBuffer) {
+ var remainder = buf.remaining()
+ while (remainder > 0) {
+ val read = channel.read(buf)
+ remainder -= read
- if (read == -1 && remainder > 0) {
- throw EOFException()
+ if (read == -1 && remainder > 0) {
+ throw EOFException()
+ }
}
}
- }
- override fun close() {
- channel.close()
- }
+ override fun close() {
+ channel.close()
+ }
- override fun toString(): String = "NioSeekableInputStream"
- }
+ override fun toString(): String = "NioSeekableInputStream"
+ }
override fun toString(): String = "LocalInputFile[path=$path]"
}
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalOutputFile.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalOutputFile.kt
index 1b17ae5d..24627b45 100644
--- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalOutputFile.kt
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalOutputFile.kt
@@ -51,8 +51,7 @@ public class LocalOutputFile(private val path: Path) : OutputFile {
override fun supportsBlockSize(): Boolean = false
- override fun defaultBlockSize(): Long =
- throw UnsupportedOperationException("Local filesystem does not have default block size")
+ override fun defaultBlockSize(): Long = throw UnsupportedOperationException("Local filesystem does not have default block size")
override fun getPath(): String = path.toString()
@@ -77,7 +76,11 @@ public class LocalOutputFile(private val path: Path) : OutputFile {
_pos += b.size
}
- override fun write(b: ByteArray, off: Int, len: Int) {
+ override fun write(
+ b: ByteArray,
+ off: Int,
+ len: Int,
+ ) {
output.write(b, off, len)
_pos += len
}
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
index de8a56d0..b503254e 100644
--- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
@@ -43,20 +43,21 @@ import kotlin.io.path.isDirectory
public class LocalParquetReader<out T>(
path: Path,
private val readSupport: ReadSupport<T>,
- private val strictTyping: Boolean = true
+ private val strictTyping: Boolean = true,
) : AutoCloseable {
/**
* The input files to process.
*/
- private val filesIterator = if (path.isDirectory()) {
- Files.list(path)
- .filter { !it.isDirectory() }
- .sorted()
- .map { LocalInputFile(it) }
- .iterator()
- } else {
- listOf(LocalInputFile(path)).iterator()
- }
+ private val filesIterator =
+ if (path.isDirectory()) {
+ Files.list(path)
+ .filter { !it.isDirectory() }
+ .sorted()
+ .map { LocalInputFile(it) }
+ .iterator()
+ } else {
+ listOf(LocalInputFile(path)).iterator()
+ }
/**
* The Parquet reader to use.
@@ -104,11 +105,12 @@ public class LocalParquetReader<out T>(
reader?.close()
try {
- this.reader = if (filesIterator.hasNext()) {
- createReader(filesIterator.next())
- } else {
- null
- }
+ this.reader =
+ if (filesIterator.hasNext()) {
+ createReader(filesIterator.next())
+ } else {
+ null
+ }
} catch (e: Throwable) {
this.reader = null
throw e
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt
index b5eb1deb..c7028fc3 100644
--- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt
@@ -37,7 +37,7 @@ public class LocalParquetWriter {
*/
public class Builder<T> internal constructor(
output: OutputFile,
- private val writeSupport: WriteSupport<T>
+ private val writeSupport: WriteSupport<T>,
) : ParquetWriter.Builder<T, Builder<T>>(output) {
override fun self(): Builder<T> = this
@@ -49,7 +49,9 @@ public class LocalParquetWriter {
* Create a [Builder] instance that writes a Parquet file at the specified [path].
*/
@JvmStatic
- public fun <T> builder(path: Path, writeSupport: WriteSupport<T>): Builder<T> =
- Builder(LocalOutputFile(path), writeSupport)
+ public fun <T> builder(
+ path: Path,
+ writeSupport: WriteSupport<T>,
+ ): Builder<T> = Builder(LocalOutputFile(path), writeSupport)
}
}