diff options
Diffstat (limited to 'opendc-trace')
| -rw-r--r-- | opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt | 2 | ||||
| -rw-r--r-- | opendc-trace/opendc-trace-parquet/build.gradle.kts | 2 | ||||
| -rw-r--r-- | opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/avro/AvroUtils.kt (renamed from opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt) | 4 | ||||
| -rw-r--r-- | opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt | 53 |
4 files changed, 43 insertions, 18 deletions
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt index 36a1b4a0..1a15c7b3 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt @@ -34,7 +34,7 @@ import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat import org.opendc.trace.util.parquet.LocalOutputFile import org.opendc.trace.util.parquet.LocalParquetReader -import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA +import org.opendc.trace.util.avro.TIMESTAMP_SCHEMA import shaded.parquet.com.fasterxml.jackson.core.JsonEncoding import shaded.parquet.com.fasterxml.jackson.core.JsonFactory import java.nio.file.Files diff --git a/opendc-trace/opendc-trace-parquet/build.gradle.kts b/opendc-trace/opendc-trace-parquet/build.gradle.kts index 302c0b14..e6415586 100644 --- a/opendc-trace/opendc-trace-parquet/build.gradle.kts +++ b/opendc-trace/opendc-trace-parquet/build.gradle.kts @@ -32,7 +32,7 @@ dependencies { api(libs.parquet) { exclude(group = "org.apache.hadoop") } - runtimeOnly(libs.hadoop.common) { + api(libs.hadoop.common) { exclude(group = "org.slf4j", module = "slf4j-log4j12") exclude(group = "log4j") exclude(group = "org.apache.hadoop") diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/avro/AvroUtils.kt index 086b900b..a655d39f 100644 --- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt +++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/avro/AvroUtils.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 AtLarge Research + * Copyright (c) 2022 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -21,7 +21,7 @@ */ @file:JvmName("AvroUtils") -package org.opendc.trace.util.parquet +package org.opendc.trace.util.avro import org.apache.avro.LogicalTypes import org.apache.avro.Schema diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt index ef9eaeb3..bb2bb10d 100644 --- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt +++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt @@ -24,6 +24,7 @@ package org.opendc.trace.util.parquet import org.apache.parquet.avro.AvroParquetReader import org.apache.parquet.hadoop.ParquetReader +import org.apache.parquet.hadoop.api.ReadSupport import org.apache.parquet.io.InputFile import java.io.File import java.io.IOException @@ -32,11 +33,15 @@ import java.nio.file.Path import kotlin.io.path.isDirectory /** - * A helper class to read Parquet files. + * A helper class to read Parquet files from the filesystem. + * + * This class wraps a [ParquetReader] in order to support reading partitioned Parquet datasets. * * @param path The path to the Parquet file or directory to read. + * @param factory Function to construct a [ParquetReader] for a local [InputFile]. */ -public class LocalParquetReader<out T>(path: Path) : AutoCloseable { +public class LocalParquetReader<out T>(path: Path, + private val factory: (InputFile) -> ParquetReader<T> = avro()) : AutoCloseable { /** * The input files to process. */ @@ -93,20 +98,40 @@ public class LocalParquetReader<out T>(path: Path) : AutoCloseable { private fun initReader() { reader?.close() - this.reader = if (filesIterator.hasNext()) { - createReader(filesIterator.next()) - } else { - null + try { + this.reader = if (filesIterator.hasNext()) { + factory(filesIterator.next()) + } else { + null + } + } catch (e: Throwable) { + this.reader = null + throw e } } - /** - * Create a Parquet reader for the specified file. - */ - private fun createReader(input: InputFile): ParquetReader<T> { - return AvroParquetReader - .builder<T>(input) - .disableCompatibility() - .build() + public companion object { + /** + * A factory for reading Avro Parquet files. + */ + public fun <T> avro(): (InputFile) -> ParquetReader<T> { + return { input -> + AvroParquetReader + .builder<T>(input) + .disableCompatibility() + .build() + } + } + + /** + * A factory for reading Parquet files with custom [ReadSupport]. + */ + public fun <T> custom(readSupport: ReadSupport<T>): (InputFile) -> ParquetReader<T> { + return { input -> + object : ParquetReader.Builder<T>(input) { + override fun getReadSupport(): ReadSupport<T> = readSupport + }.build() + } + } } } |
