diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-08-31 15:14:46 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-09-02 09:26:42 +0200 |
| commit | e8cdfbcec3f75b3f303ce52bac5f5595a94555e4 (patch) | |
| tree | c15b491a10ef4215ac8640c873ed56651971a486 /opendc-format/src/main | |
| parent | 23c1502c2668305fd5f4c38c6c794c985d2037e3 (diff) | |
refactor(trace): Extract Parquet helpers into separate module
This change extracts the Parquet helpers from the format module into a
new module, in order to improve the reusability of these helpers.
Diffstat (limited to 'opendc-format/src/main')
4 files changed, 1 insertions, 315 deletions
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt index dde1b340..e8e72f0e 100644 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt +++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt @@ -25,8 +25,8 @@ package org.opendc.format.trace.wtf import org.apache.avro.generic.GenericRecord import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader -import org.opendc.format.util.LocalParquetReader import org.opendc.simulator.compute.workload.SimFlopsWorkload +import org.opendc.trace.util.parquet.LocalParquetReader import org.opendc.workflow.api.Job import org.opendc.workflow.api.Task import org.opendc.workflow.api.WORKFLOW_TASK_CORES diff --git a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalInputFile.kt b/opendc-format/src/main/kotlin/org/opendc/format/util/LocalInputFile.kt deleted file mode 100644 index 92319ace..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalInputFile.kt +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. 
- * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.util - -import org.apache.parquet.io.InputFile -import org.apache.parquet.io.SeekableInputStream -import java.io.EOFException -import java.io.File -import java.nio.ByteBuffer -import java.nio.channels.FileChannel -import java.nio.file.Path -import java.nio.file.StandardOpenOption - -/** - * An [InputFile] on the local filesystem. - */ -public class LocalInputFile(private val path: Path) : InputFile { - /** - * The [FileChannel] used for accessing the input path. - */ - private val channel = FileChannel.open(path, StandardOpenOption.READ) - - /** - * Construct a [LocalInputFile] for the specified [file]. 
- */ - public constructor(file: File) : this(file.toPath()) - - override fun getLength(): Long = channel.size() - - override fun newStream(): SeekableInputStream = object : SeekableInputStream() { - override fun read(buf: ByteBuffer): Int { - return channel.read(buf) - } - - override fun read(): Int { - val single = ByteBuffer.allocate(1) - var read: Int - - // ReadableByteChannel#read might read zero bytes so continue until we read at least one byte - do { - read = channel.read(single) - } while (read == 0) - - return if (read == -1) { - read - } else { - single.get(0).toInt() and 0xff - } - } - - override fun getPos(): Long { - return channel.position() - } - - override fun seek(newPos: Long) { - channel.position(newPos) - } - - override fun readFully(bytes: ByteArray) { - readFully(ByteBuffer.wrap(bytes)) - } - - override fun readFully(bytes: ByteArray, start: Int, len: Int) { - readFully(ByteBuffer.wrap(bytes, start, len)) - } - - override fun readFully(buf: ByteBuffer) { - var remainder = buf.remaining() - while (remainder > 0) { - val read = channel.read(buf) - remainder -= read - - if (read == -1 && remainder > 0) { - throw EOFException() - } - } - } - - override fun close() { - channel.close() - } - - override fun toString(): String = "NioSeekableInputStream" - } - - override fun toString(): String = "LocalInputFile[path=$path]" -} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalOutputFile.kt b/opendc-format/src/main/kotlin/org/opendc/format/util/LocalOutputFile.kt deleted file mode 100644 index 657bca5a..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalOutputFile.kt +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, 
merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.util - -import org.apache.parquet.io.OutputFile -import org.apache.parquet.io.PositionOutputStream -import java.io.File -import java.io.OutputStream -import java.nio.file.Files -import java.nio.file.Path -import java.nio.file.StandardOpenOption - -/** - * An [OutputFile] on the local filesystem. 
- */ -public class LocalOutputFile(private val path: Path) : OutputFile { - /** - * Construct a [LocalOutputFile] from the specified [file] - */ - public constructor(file: File) : this(file.toPath()) - - override fun create(blockSizeHint: Long): PositionOutputStream { - val output = Files.newOutputStream(path, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE) - return NioPositionOutputStream(output) - } - - override fun createOrOverwrite(blockSizeHint: Long): PositionOutputStream { - val output = Files.newOutputStream(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING) - return NioPositionOutputStream(output) - } - - override fun supportsBlockSize(): Boolean = false - - override fun defaultBlockSize(): Long = - throw UnsupportedOperationException("Local filesystem does not have default block size") - - override fun getPath(): String = path.toString() - - /** - * Implementation of [PositionOutputStream] for an [OutputStream]. - */ - private class NioPositionOutputStream(private val output: OutputStream) : PositionOutputStream() { - /** - * The current position in the file. 
- */ - private var _pos = 0L - - override fun getPos(): Long = _pos - - override fun write(b: Int) { - output.write(b) - _pos++ - } - - override fun write(b: ByteArray) { - output.write(b) - _pos += b.size - } - - override fun write(b: ByteArray, off: Int, len: Int) { - output.write(b, off, len) - _pos += len - } - - override fun flush() { - output.flush() - } - - override fun close() { - output.close() - } - - override fun toString(): String = "NioPositionOutputStream[output=$output]" - } -} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalParquetReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/util/LocalParquetReader.kt deleted file mode 100644 index 5083f3e1..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalParquetReader.kt +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
- */ - -package org.opendc.format.util - -import org.apache.parquet.avro.AvroParquetReader -import org.apache.parquet.hadoop.ParquetReader -import org.apache.parquet.io.InputFile -import java.io.File -import java.io.IOException -import java.nio.file.Files -import java.nio.file.Path -import kotlin.io.path.isDirectory - -/** - * A helper class to read Parquet files. - * - * @param path The path to the Parquet file or directory to read. - */ -public class LocalParquetReader<out T>(path: Path) : AutoCloseable { - /** - * The input files to process. - */ - private val filesIterator = if (path.isDirectory()) - Files.list(path) - .filter { !it.isDirectory() } - .sorted() - .map { LocalInputFile(it) } - .iterator() - else - listOf(LocalInputFile(path)).iterator() - - /** - * The Parquet reader to use. - */ - private var reader: ParquetReader<T>? = null - - /** - * Construct a [LocalParquetReader] for the specified [file]. - */ - public constructor(file: File) : this(file.toPath()) - - /** - * Read a single entry in the Parquet file. - */ - public fun read(): T? { - return try { - val next = reader?.read() - if (next != null) { - next - } else { - initReader() - - if (reader == null) - null - else - read() - } - } catch (e: InterruptedException) { - throw IOException(e) - } - } - - /** - * Close the Parquet reader. - */ - override fun close() { - reader?.close() - } - - /** - * Initialize the next reader. - */ - private fun initReader() { - reader?.close() - - this.reader = if (filesIterator.hasNext()) { - createReader(filesIterator.next()) - } else { - null - } - } - - /** - * Create a Parquet reader for the specified file. - */ - private fun createReader(input: InputFile): ParquetReader<T> { - return AvroParquetReader - .builder<T>(input) - .disableCompatibility() - .build() - } -} |
