summaryrefslogtreecommitdiff
path: root/opendc-format
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-08-31 15:14:46 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-02 09:26:42 +0200
commite8cdfbcec3f75b3f303ce52bac5f5595a94555e4 (patch)
treec15b491a10ef4215ac8640c873ed56651971a486 /opendc-format
parent23c1502c2668305fd5f4c38c6c794c985d2037e3 (diff)
refactor(trace): Extract Parquet helpers into separate module
This change extracts the Parquet helpers out of the format module into a new, separate module, in order to improve the re-usability of these helpers.
Diffstat (limited to 'opendc-format')
-rw-r--r--opendc-format/build.gradle.kts24
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt2
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/util/LocalInputFile.kt107
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/util/LocalOutputFile.kt95
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/util/LocalParquetReader.kt112
-rw-r--r--opendc-format/src/test/kotlin/org/opendc/format/util/ParquetTest.kt125
6 files changed, 2 insertions, 463 deletions
diff --git a/opendc-format/build.gradle.kts b/opendc-format/build.gradle.kts
index c1258428..0c7f2a51 100644
--- a/opendc-format/build.gradle.kts
+++ b/opendc-format/build.gradle.kts
@@ -41,29 +41,7 @@ dependencies {
implementation(libs.jackson.dataformat.csv)
implementation("org.jetbrains.kotlin:kotlin-reflect:1.5.30")
- /* This configuration is necessary for a slim dependency on Apache Parquet */
- implementation(libs.parquet) {
- exclude(group = "org.apache.hadoop")
- }
- runtimeOnly(libs.hadoop.common) {
- exclude(group = "org.slf4j", module = "slf4j-log4j12")
- exclude(group = "log4j")
- exclude(group = "org.apache.hadoop")
- exclude(group = "org.apache.curator")
- exclude(group = "org.apache.zookeeper")
- exclude(group = "org.apache.kerby")
- exclude(group = "org.apache.httpcomponents")
- exclude(group = "org.apache.htrace")
- exclude(group = "commons-cli")
- exclude(group = "javax.servlet")
- exclude(group = "org.eclipse.jetty")
- exclude(group = "com.sun.jersey")
- exclude(group = "com.jcraft")
- exclude(group = "dnsjava")
- }
- runtimeOnly(libs.hadoop.mapreduce.client.core) {
- isTransitive = false
- }
+ implementation(projects.opendcTrace.opendcTraceParquet)
testRuntimeOnly(libs.slf4j.simple)
}
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt
index dde1b340..e8e72f0e 100644
--- a/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt
+++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt
@@ -25,8 +25,8 @@ package org.opendc.format.trace.wtf
import org.apache.avro.generic.GenericRecord
import org.opendc.format.trace.TraceEntry
import org.opendc.format.trace.TraceReader
-import org.opendc.format.util.LocalParquetReader
import org.opendc.simulator.compute.workload.SimFlopsWorkload
+import org.opendc.trace.util.parquet.LocalParquetReader
import org.opendc.workflow.api.Job
import org.opendc.workflow.api.Task
import org.opendc.workflow.api.WORKFLOW_TASK_CORES
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalInputFile.kt b/opendc-format/src/main/kotlin/org/opendc/format/util/LocalInputFile.kt
deleted file mode 100644
index 92319ace..00000000
--- a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalInputFile.kt
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.format.util
-
-import org.apache.parquet.io.InputFile
-import org.apache.parquet.io.SeekableInputStream
-import java.io.EOFException
-import java.io.File
-import java.nio.ByteBuffer
-import java.nio.channels.FileChannel
-import java.nio.file.Path
-import java.nio.file.StandardOpenOption
-
-/**
- * An [InputFile] on the local filesystem.
- */
-public class LocalInputFile(private val path: Path) : InputFile {
- /**
- * The [FileChannel] used for accessing the input path.
- */
- private val channel = FileChannel.open(path, StandardOpenOption.READ)
-
- /**
- * Construct a [LocalInputFile] for the specified [file].
- */
- public constructor(file: File) : this(file.toPath())
-
- override fun getLength(): Long = channel.size()
-
- override fun newStream(): SeekableInputStream = object : SeekableInputStream() {
- override fun read(buf: ByteBuffer): Int {
- return channel.read(buf)
- }
-
- override fun read(): Int {
- val single = ByteBuffer.allocate(1)
- var read: Int
-
- // ReadableByteChannel#read might read zero bytes so continue until we read at least one byte
- do {
- read = channel.read(single)
- } while (read == 0)
-
- return if (read == -1) {
- read
- } else {
- single.get(0).toInt() and 0xff
- }
- }
-
- override fun getPos(): Long {
- return channel.position()
- }
-
- override fun seek(newPos: Long) {
- channel.position(newPos)
- }
-
- override fun readFully(bytes: ByteArray) {
- readFully(ByteBuffer.wrap(bytes))
- }
-
- override fun readFully(bytes: ByteArray, start: Int, len: Int) {
- readFully(ByteBuffer.wrap(bytes, start, len))
- }
-
- override fun readFully(buf: ByteBuffer) {
- var remainder = buf.remaining()
- while (remainder > 0) {
- val read = channel.read(buf)
- remainder -= read
-
- if (read == -1 && remainder > 0) {
- throw EOFException()
- }
- }
- }
-
- override fun close() {
- channel.close()
- }
-
- override fun toString(): String = "NioSeekableInputStream"
- }
-
- override fun toString(): String = "LocalInputFile[path=$path]"
-}
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalOutputFile.kt b/opendc-format/src/main/kotlin/org/opendc/format/util/LocalOutputFile.kt
deleted file mode 100644
index 657bca5a..00000000
--- a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalOutputFile.kt
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.format.util
-
-import org.apache.parquet.io.OutputFile
-import org.apache.parquet.io.PositionOutputStream
-import java.io.File
-import java.io.OutputStream
-import java.nio.file.Files
-import java.nio.file.Path
-import java.nio.file.StandardOpenOption
-
-/**
- * An [OutputFile] on the local filesystem.
- */
-public class LocalOutputFile(private val path: Path) : OutputFile {
- /**
- * Construct a [LocalOutputFile] from the specified [file]
- */
- public constructor(file: File) : this(file.toPath())
-
- override fun create(blockSizeHint: Long): PositionOutputStream {
- val output = Files.newOutputStream(path, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)
- return NioPositionOutputStream(output)
- }
-
- override fun createOrOverwrite(blockSizeHint: Long): PositionOutputStream {
- val output = Files.newOutputStream(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)
- return NioPositionOutputStream(output)
- }
-
- override fun supportsBlockSize(): Boolean = false
-
- override fun defaultBlockSize(): Long =
- throw UnsupportedOperationException("Local filesystem does not have default block size")
-
- override fun getPath(): String = path.toString()
-
- /**
- * Implementation of [PositionOutputStream] for an [OutputStream].
- */
- private class NioPositionOutputStream(private val output: OutputStream) : PositionOutputStream() {
- /**
- * The current position in the file.
- */
- private var _pos = 0L
-
- override fun getPos(): Long = _pos
-
- override fun write(b: Int) {
- output.write(b)
- _pos++
- }
-
- override fun write(b: ByteArray) {
- output.write(b)
- _pos += b.size
- }
-
- override fun write(b: ByteArray, off: Int, len: Int) {
- output.write(b, off, len)
- _pos += len
- }
-
- override fun flush() {
- output.flush()
- }
-
- override fun close() {
- output.close()
- }
-
- override fun toString(): String = "NioPositionOutputStream[output=$output]"
- }
-}
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalParquetReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/util/LocalParquetReader.kt
deleted file mode 100644
index 5083f3e1..00000000
--- a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalParquetReader.kt
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.format.util
-
-import org.apache.parquet.avro.AvroParquetReader
-import org.apache.parquet.hadoop.ParquetReader
-import org.apache.parquet.io.InputFile
-import java.io.File
-import java.io.IOException
-import java.nio.file.Files
-import java.nio.file.Path
-import kotlin.io.path.isDirectory
-
-/**
- * A helper class to read Parquet files.
- *
- * @param path The path to the Parquet file or directory to read.
- */
-public class LocalParquetReader<out T>(path: Path) : AutoCloseable {
- /**
- * The input files to process.
- */
- private val filesIterator = if (path.isDirectory())
- Files.list(path)
- .filter { !it.isDirectory() }
- .sorted()
- .map { LocalInputFile(it) }
- .iterator()
- else
- listOf(LocalInputFile(path)).iterator()
-
- /**
- * The Parquet reader to use.
- */
- private var reader: ParquetReader<T>? = null
-
- /**
- * Construct a [LocalParquetReader] for the specified [file].
- */
- public constructor(file: File) : this(file.toPath())
-
- /**
- * Read a single entry in the Parquet file.
- */
- public fun read(): T? {
- return try {
- val next = reader?.read()
- if (next != null) {
- next
- } else {
- initReader()
-
- if (reader == null)
- null
- else
- read()
- }
- } catch (e: InterruptedException) {
- throw IOException(e)
- }
- }
-
- /**
- * Close the Parquet reader.
- */
- override fun close() {
- reader?.close()
- }
-
- /**
- * Initialize the next reader.
- */
- private fun initReader() {
- reader?.close()
-
- this.reader = if (filesIterator.hasNext()) {
- createReader(filesIterator.next())
- } else {
- null
- }
- }
-
- /**
- * Create a Parquet reader for the specified file.
- */
- private fun createReader(input: InputFile): ParquetReader<T> {
- return AvroParquetReader
- .builder<T>(input)
- .disableCompatibility()
- .build()
- }
-}
diff --git a/opendc-format/src/test/kotlin/org/opendc/format/util/ParquetTest.kt b/opendc-format/src/test/kotlin/org/opendc/format/util/ParquetTest.kt
deleted file mode 100644
index e496dd96..00000000
--- a/opendc-format/src/test/kotlin/org/opendc/format/util/ParquetTest.kt
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.format.util
-
-import org.apache.avro.SchemaBuilder
-import org.apache.avro.generic.GenericData
-import org.apache.parquet.avro.AvroParquetReader
-import org.apache.parquet.avro.AvroParquetWriter
-import org.apache.parquet.hadoop.ParquetFileWriter
-import org.junit.jupiter.api.*
-import org.junit.jupiter.api.Assertions.assertEquals
-import java.io.File
-import java.nio.file.FileAlreadyExistsException
-import java.nio.file.NoSuchFileException
-
-/**
- * Test suite for the Parquet helper classes.
- */
-internal class ParquetTest {
- private val schema = SchemaBuilder
- .record("test")
- .namespace("org.opendc.format.util")
- .fields()
- .name("field").type().intType().noDefault()
- .endRecord()
-
- private lateinit var file: File
-
- /**
- * Setup the test
- */
- @BeforeEach
- fun setUp() {
- file = File.createTempFile("opendc", "parquet")
- }
-
- /**
- * Tear down the test.
- */
- @AfterEach
- fun tearDown() {
- file.delete()
- }
-
- /**
- * Initial test to verify whether the Parquet writer works.
- */
- @Test
- fun testSmoke() {
- val n = 4
- val writer = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(file))
- .withSchema(schema)
- .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
- .build()
-
- try {
- repeat(n) { i ->
- val record = GenericData.Record(schema)
- record.put("field", i)
- writer.write(record)
- }
- } finally {
- writer.close()
- }
-
- val reader = AvroParquetReader.builder<GenericData.Record>(LocalInputFile(file))
- .build()
-
- var counter = 0
- try {
- while (true) {
- val record = reader.read() ?: break
- assertEquals(counter++, record.get("field"))
- }
- } finally {
- reader.close()
- }
-
- assertEquals(n, counter)
- }
-
- /**
- * Test if overwriting fails if not specified.
- */
- @Test
- fun testOverwrite() {
- assertThrows<FileAlreadyExistsException> {
- AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(file))
- .withSchema(schema)
- .build()
- }
- }
-
- /**
- * Test non-existent file.
- */
- @Test
- fun testNonExistent() {
- file.delete()
- assertThrows<NoSuchFileException> {
- AvroParquetReader.builder<GenericData.Record>(LocalInputFile(file))
- .build()
- }
- }
-}