summaryrefslogtreecommitdiff
path: root/opendc-format/src/main
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-08-31 15:14:46 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-02 09:26:42 +0200
commite8cdfbcec3f75b3f303ce52bac5f5595a94555e4 (patch)
treec15b491a10ef4215ac8640c873ed56651971a486 /opendc-format/src/main
parent23c1502c2668305fd5f4c38c6c794c985d2037e3 (diff)
refactor(trace): Extract Parquet helpers into separate module
This change extracts the Parquet helpers outside format module into a new module, in order to improve re-usability of these helpers.
Diffstat (limited to 'opendc-format/src/main')
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt2
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/util/LocalInputFile.kt107
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/util/LocalOutputFile.kt95
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/util/LocalParquetReader.kt112
4 files changed, 1 insertions, 315 deletions
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt
index dde1b340..e8e72f0e 100644
--- a/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt
+++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt
@@ -25,8 +25,8 @@ package org.opendc.format.trace.wtf
import org.apache.avro.generic.GenericRecord
import org.opendc.format.trace.TraceEntry
import org.opendc.format.trace.TraceReader
-import org.opendc.format.util.LocalParquetReader
import org.opendc.simulator.compute.workload.SimFlopsWorkload
+import org.opendc.trace.util.parquet.LocalParquetReader
import org.opendc.workflow.api.Job
import org.opendc.workflow.api.Task
import org.opendc.workflow.api.WORKFLOW_TASK_CORES
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalInputFile.kt b/opendc-format/src/main/kotlin/org/opendc/format/util/LocalInputFile.kt
deleted file mode 100644
index 92319ace..00000000
--- a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalInputFile.kt
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.format.util
-
-import org.apache.parquet.io.InputFile
-import org.apache.parquet.io.SeekableInputStream
-import java.io.EOFException
-import java.io.File
-import java.nio.ByteBuffer
-import java.nio.channels.FileChannel
-import java.nio.file.Path
-import java.nio.file.StandardOpenOption
-
-/**
- * An [InputFile] on the local filesystem.
- */
-public class LocalInputFile(private val path: Path) : InputFile {
- /**
- * The [FileChannel] used for accessing the input path.
- */
- private val channel = FileChannel.open(path, StandardOpenOption.READ)
-
- /**
- * Construct a [LocalInputFile] for the specified [file].
- */
- public constructor(file: File) : this(file.toPath())
-
- override fun getLength(): Long = channel.size()
-
- override fun newStream(): SeekableInputStream = object : SeekableInputStream() {
- override fun read(buf: ByteBuffer): Int {
- return channel.read(buf)
- }
-
- override fun read(): Int {
- val single = ByteBuffer.allocate(1)
- var read: Int
-
- // ReadableByteChannel#read might read zero bytes so continue until we read at least one byte
- do {
- read = channel.read(single)
- } while (read == 0)
-
- return if (read == -1) {
- read
- } else {
- single.get(0).toInt() and 0xff
- }
- }
-
- override fun getPos(): Long {
- return channel.position()
- }
-
- override fun seek(newPos: Long) {
- channel.position(newPos)
- }
-
- override fun readFully(bytes: ByteArray) {
- readFully(ByteBuffer.wrap(bytes))
- }
-
- override fun readFully(bytes: ByteArray, start: Int, len: Int) {
- readFully(ByteBuffer.wrap(bytes, start, len))
- }
-
- override fun readFully(buf: ByteBuffer) {
- var remainder = buf.remaining()
- while (remainder > 0) {
- val read = channel.read(buf)
- remainder -= read
-
- if (read == -1 && remainder > 0) {
- throw EOFException()
- }
- }
- }
-
- override fun close() {
- channel.close()
- }
-
- override fun toString(): String = "NioSeekableInputStream"
- }
-
- override fun toString(): String = "LocalInputFile[path=$path]"
-}
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalOutputFile.kt b/opendc-format/src/main/kotlin/org/opendc/format/util/LocalOutputFile.kt
deleted file mode 100644
index 657bca5a..00000000
--- a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalOutputFile.kt
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.format.util
-
-import org.apache.parquet.io.OutputFile
-import org.apache.parquet.io.PositionOutputStream
-import java.io.File
-import java.io.OutputStream
-import java.nio.file.Files
-import java.nio.file.Path
-import java.nio.file.StandardOpenOption
-
-/**
- * An [OutputFile] on the local filesystem.
- */
-public class LocalOutputFile(private val path: Path) : OutputFile {
- /**
- * Construct a [LocalOutputFile] from the specified [file]
- */
- public constructor(file: File) : this(file.toPath())
-
- override fun create(blockSizeHint: Long): PositionOutputStream {
- val output = Files.newOutputStream(path, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)
- return NioPositionOutputStream(output)
- }
-
- override fun createOrOverwrite(blockSizeHint: Long): PositionOutputStream {
- val output = Files.newOutputStream(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)
- return NioPositionOutputStream(output)
- }
-
- override fun supportsBlockSize(): Boolean = false
-
- override fun defaultBlockSize(): Long =
- throw UnsupportedOperationException("Local filesystem does not have default block size")
-
- override fun getPath(): String = path.toString()
-
- /**
- * Implementation of [PositionOutputStream] for an [OutputStream].
- */
- private class NioPositionOutputStream(private val output: OutputStream) : PositionOutputStream() {
- /**
- * The current position in the file.
- */
- private var _pos = 0L
-
- override fun getPos(): Long = _pos
-
- override fun write(b: Int) {
- output.write(b)
- _pos++
- }
-
- override fun write(b: ByteArray) {
- output.write(b)
- _pos += b.size
- }
-
- override fun write(b: ByteArray, off: Int, len: Int) {
- output.write(b, off, len)
- _pos += len
- }
-
- override fun flush() {
- output.flush()
- }
-
- override fun close() {
- output.close()
- }
-
- override fun toString(): String = "NioPositionOutputStream[output=$output]"
- }
-}
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalParquetReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/util/LocalParquetReader.kt
deleted file mode 100644
index 5083f3e1..00000000
--- a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalParquetReader.kt
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.format.util
-
-import org.apache.parquet.avro.AvroParquetReader
-import org.apache.parquet.hadoop.ParquetReader
-import org.apache.parquet.io.InputFile
-import java.io.File
-import java.io.IOException
-import java.nio.file.Files
-import java.nio.file.Path
-import kotlin.io.path.isDirectory
-
-/**
- * A helper class to read Parquet files.
- *
- * @param path The path to the Parquet file or directory to read.
- */
-public class LocalParquetReader<out T>(path: Path) : AutoCloseable {
- /**
- * The input files to process.
- */
- private val filesIterator = if (path.isDirectory())
- Files.list(path)
- .filter { !it.isDirectory() }
- .sorted()
- .map { LocalInputFile(it) }
- .iterator()
- else
- listOf(LocalInputFile(path)).iterator()
-
- /**
- * The Parquet reader to use.
- */
- private var reader: ParquetReader<T>? = null
-
- /**
- * Construct a [LocalParquetReader] for the specified [file].
- */
- public constructor(file: File) : this(file.toPath())
-
- /**
- * Read a single entry in the Parquet file.
- */
- public fun read(): T? {
- return try {
- val next = reader?.read()
- if (next != null) {
- next
- } else {
- initReader()
-
- if (reader == null)
- null
- else
- read()
- }
- } catch (e: InterruptedException) {
- throw IOException(e)
- }
- }
-
- /**
- * Close the Parquet reader.
- */
- override fun close() {
- reader?.close()
- }
-
- /**
- * Initialize the next reader.
- */
- private fun initReader() {
- reader?.close()
-
- this.reader = if (filesIterator.hasNext()) {
- createReader(filesIterator.next())
- } else {
- null
- }
- }
-
- /**
- * Create a Parquet reader for the specified file.
- */
- private fun createReader(input: InputFile): ParquetReader<T> {
- return AvroParquetReader
- .builder<T>(input)
- .disableCompatibility()
- .build()
- }
-}