From e8cdfbcec3f75b3f303ce52bac5f5595a94555e4 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 31 Aug 2021 15:14:46 +0200 Subject: refactor(trace): Extract Parquet helpers into separate module This change extracts the Parquet helpers outside format module into a new module, in order to improve re-usability of these helpers. --- .../org/opendc/format/trace/wtf/WtfTraceReader.kt | 2 +- .../org/opendc/format/util/LocalInputFile.kt | 107 ------------------ .../org/opendc/format/util/LocalOutputFile.kt | 95 ---------------- .../org/opendc/format/util/LocalParquetReader.kt | 112 ------------------ .../kotlin/org/opendc/format/util/ParquetTest.kt | 125 --------------------- 5 files changed, 1 insertion(+), 440 deletions(-) delete mode 100644 opendc-format/src/main/kotlin/org/opendc/format/util/LocalInputFile.kt delete mode 100644 opendc-format/src/main/kotlin/org/opendc/format/util/LocalOutputFile.kt delete mode 100644 opendc-format/src/main/kotlin/org/opendc/format/util/LocalParquetReader.kt delete mode 100644 opendc-format/src/test/kotlin/org/opendc/format/util/ParquetTest.kt (limited to 'opendc-format/src') diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt index dde1b340..e8e72f0e 100644 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt +++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt @@ -25,8 +25,8 @@ package org.opendc.format.trace.wtf import org.apache.avro.generic.GenericRecord import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader -import org.opendc.format.util.LocalParquetReader import org.opendc.simulator.compute.workload.SimFlopsWorkload +import org.opendc.trace.util.parquet.LocalParquetReader import org.opendc.workflow.api.Job import org.opendc.workflow.api.Task import org.opendc.workflow.api.WORKFLOW_TASK_CORES diff --git a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalInputFile.kt b/opendc-format/src/main/kotlin/org/opendc/format/util/LocalInputFile.kt deleted file mode 100644 index 92319ace..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalInputFile.kt +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.util - -import org.apache.parquet.io.InputFile -import org.apache.parquet.io.SeekableInputStream -import java.io.EOFException -import java.io.File -import java.nio.ByteBuffer -import java.nio.channels.FileChannel -import java.nio.file.Path -import java.nio.file.StandardOpenOption - -/** - * An [InputFile] on the local filesystem. - */ -public class LocalInputFile(private val path: Path) : InputFile { - /** - * The [FileChannel] used for accessing the input path. - */ - private val channel = FileChannel.open(path, StandardOpenOption.READ) - - /** - * Construct a [LocalInputFile] for the specified [file]. - */ - public constructor(file: File) : this(file.toPath()) - - override fun getLength(): Long = channel.size() - - override fun newStream(): SeekableInputStream = object : SeekableInputStream() { - override fun read(buf: ByteBuffer): Int { - return channel.read(buf) - } - - override fun read(): Int { - val single = ByteBuffer.allocate(1) - var read: Int - - // ReadableByteChannel#read might read zero bytes so continue until we read at least one byte - do { - read = channel.read(single) - } while (read == 0) - - return if (read == -1) { - read - } else { - single.get(0).toInt() and 0xff - } - } - - override fun getPos(): Long { - return channel.position() - } - - override fun seek(newPos: Long) { - channel.position(newPos) - } - - override fun readFully(bytes: ByteArray) { - readFully(ByteBuffer.wrap(bytes)) - } - - override fun readFully(bytes: ByteArray, start: Int, len: Int) { - readFully(ByteBuffer.wrap(bytes, start, len)) - } - - override fun readFully(buf: ByteBuffer) { - var remainder = buf.remaining() - while (remainder > 0) { - val read = channel.read(buf) - remainder -= read - - if (read == -1 && remainder > 0) { - throw EOFException() - } - } - } - - override fun close() { - channel.close() - } - - override fun toString(): String = "NioSeekableInputStream" - } - - override fun toString(): String = "LocalInputFile[path=$path]" -} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalOutputFile.kt b/opendc-format/src/main/kotlin/org/opendc/format/util/LocalOutputFile.kt deleted file mode 100644 index 657bca5a..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalOutputFile.kt +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.util - -import org.apache.parquet.io.OutputFile -import org.apache.parquet.io.PositionOutputStream -import java.io.File -import java.io.OutputStream -import java.nio.file.Files -import java.nio.file.Path -import java.nio.file.StandardOpenOption - -/** - * An [OutputFile] on the local filesystem. - */ -public class LocalOutputFile(private val path: Path) : OutputFile { - /** - * Construct a [LocalOutputFile] from the specified [file] - */ - public constructor(file: File) : this(file.toPath()) - - override fun create(blockSizeHint: Long): PositionOutputStream { - val output = Files.newOutputStream(path, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE) - return NioPositionOutputStream(output) - } - - override fun createOrOverwrite(blockSizeHint: Long): PositionOutputStream { - val output = Files.newOutputStream(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING) - return NioPositionOutputStream(output) - } - - override fun supportsBlockSize(): Boolean = false - - override fun defaultBlockSize(): Long = - throw UnsupportedOperationException("Local filesystem does not have default block size") - - override fun getPath(): String = path.toString() - - /** - * Implementation of [PositionOutputStream] for an [OutputStream]. - */ - private class NioPositionOutputStream(private val output: OutputStream) : PositionOutputStream() { - /** - * The current position in the file. - */ - private var _pos = 0L - - override fun getPos(): Long = _pos - - override fun write(b: Int) { - output.write(b) - _pos++ - } - - override fun write(b: ByteArray) { - output.write(b) - _pos += b.size - } - - override fun write(b: ByteArray, off: Int, len: Int) { - output.write(b, off, len) - _pos += len - } - - override fun flush() { - output.flush() - } - - override fun close() { - output.close() - } - - override fun toString(): String = "NioPositionOutputStream[output=$output]" - } -} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalParquetReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/util/LocalParquetReader.kt deleted file mode 100644 index 5083f3e1..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalParquetReader.kt +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.util - -import org.apache.parquet.avro.AvroParquetReader -import org.apache.parquet.hadoop.ParquetReader -import org.apache.parquet.io.InputFile -import java.io.File -import java.io.IOException -import java.nio.file.Files -import java.nio.file.Path -import kotlin.io.path.isDirectory - -/** - * A helper class to read Parquet files. - * - * @param path The path to the Parquet file or directory to read. - */ -public class LocalParquetReader(path: Path) : AutoCloseable { - /** - * The input files to process. - */ - private val filesIterator = if (path.isDirectory()) - Files.list(path) - .filter { !it.isDirectory() } - .sorted() - .map { LocalInputFile(it) } - .iterator() - else - listOf(LocalInputFile(path)).iterator() - - /** - * The Parquet reader to use. - */ - private var reader: ParquetReader? = null - - /** - * Construct a [LocalParquetReader] for the specified [file]. - */ - public constructor(file: File) : this(file.toPath()) - - /** - * Read a single entry in the Parquet file. - */ - public fun read(): T? { - return try { - val next = reader?.read() - if (next != null) { - next - } else { - initReader() - - if (reader == null) - null - else - read() - } - } catch (e: InterruptedException) { - throw IOException(e) - } - } - - /** - * Close the Parquet reader. - */ - override fun close() { - reader?.close() - } - - /** - * Initialize the next reader. - */ - private fun initReader() { - reader?.close() - - this.reader = if (filesIterator.hasNext()) { - createReader(filesIterator.next()) - } else { - null - } - } - - /** - * Create a Parquet reader for the specified file. - */ - private fun createReader(input: InputFile): ParquetReader { - return AvroParquetReader - .builder(input) - .disableCompatibility() - .build() - } -} diff --git a/opendc-format/src/test/kotlin/org/opendc/format/util/ParquetTest.kt b/opendc-format/src/test/kotlin/org/opendc/format/util/ParquetTest.kt deleted file mode 100644 index e496dd96..00000000 --- a/opendc-format/src/test/kotlin/org/opendc/format/util/ParquetTest.kt +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.util - -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericData -import org.apache.parquet.avro.AvroParquetReader -import org.apache.parquet.avro.AvroParquetWriter -import org.apache.parquet.hadoop.ParquetFileWriter -import org.junit.jupiter.api.* -import org.junit.jupiter.api.Assertions.assertEquals -import java.io.File -import java.nio.file.FileAlreadyExistsException -import java.nio.file.NoSuchFileException - -/** - * Test suite for the Parquet helper classes. - */ -internal class ParquetTest { - private val schema = SchemaBuilder - .record("test") - .namespace("org.opendc.format.util") - .fields() - .name("field").type().intType().noDefault() - .endRecord() - - private lateinit var file: File - - /** - * Setup the test - */ - @BeforeEach - fun setUp() { - file = File.createTempFile("opendc", "parquet") - } - - /** - * Tear down the test. - */ - @AfterEach - fun tearDown() { - file.delete() - } - - /** - * Initial test to verify whether the Parquet writer works. - */ - @Test - fun testSmoke() { - val n = 4 - val writer = AvroParquetWriter.builder(LocalOutputFile(file)) - .withSchema(schema) - .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) - .build() - - try { - repeat(n) { i -> - val record = GenericData.Record(schema) - record.put("field", i) - writer.write(record) - } - } finally { - writer.close() - } - - val reader = AvroParquetReader.builder(LocalInputFile(file)) - .build() - - var counter = 0 - try { - while (true) { - val record = reader.read() ?: break - assertEquals(counter++, record.get("field")) - } - } finally { - reader.close() - } - - assertEquals(n, counter) - } - - /** - * Test if overwriting fails if not specified. - */ - @Test - fun testOverwrite() { - assertThrows { - AvroParquetWriter.builder(LocalOutputFile(file)) - .withSchema(schema) - .build() - } - } - - /** - * Test non-existent file. - */ - @Test - fun testNonExistent() { - file.delete() - assertThrows { - AvroParquetReader.builder(LocalInputFile(file)) - .build() - } - } -} -- cgit v1.2.3