diff options
| -rw-r--r-- | opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt | 6 | ||||
| -rw-r--r-- | opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt | 6 | ||||
| -rw-r--r-- | opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt | 2 | ||||
| -rw-r--r-- | opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt | 2 | ||||
| -rw-r--r-- | opendc-trace/opendc-trace-parquet/build.gradle.kts | 2 | ||||
| -rw-r--r-- | opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/avro/AvroUtils.kt (renamed from opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt) | 4 | ||||
| -rw-r--r-- | opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt | 53 |
7 files changed, 50 insertions, 25 deletions
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt index 2b7cac8f..72dbba90 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt @@ -29,9 +29,9 @@ import org.apache.avro.generic.GenericRecordBuilder import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.ParquetWriter import org.opendc.telemetry.compute.table.HostTableReader -import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA -import org.opendc.trace.util.parquet.UUID_SCHEMA -import org.opendc.trace.util.parquet.optional +import org.opendc.trace.util.avro.TIMESTAMP_SCHEMA +import org.opendc.trace.util.avro.UUID_SCHEMA +import org.opendc.trace.util.avro.optional import java.io.File /** diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt index 144b6624..aac6115f 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt @@ -29,9 +29,9 @@ import org.apache.avro.generic.GenericRecordBuilder import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.ParquetWriter import org.opendc.telemetry.compute.table.ServerTableReader -import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA -import org.opendc.trace.util.parquet.UUID_SCHEMA -import org.opendc.trace.util.parquet.optional +import org.opendc.trace.util.avro.TIMESTAMP_SCHEMA +import org.opendc.trace.util.avro.UUID_SCHEMA +import org.opendc.trace.util.avro.optional import java.io.File /** diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt index ec8a2b65..2db30bc4 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt @@ -26,7 +26,7 @@ import org.apache.avro.Schema import org.apache.avro.SchemaBuilder import org.apache.avro.generic.GenericRecordBuilder import org.opendc.telemetry.compute.table.ServiceTableReader -import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA +import org.opendc.trace.util.avro.TIMESTAMP_SCHEMA import java.io.File /** diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt index 36a1b4a0..1a15c7b3 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt @@ -34,7 +34,7 @@ import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat import org.opendc.trace.util.parquet.LocalOutputFile import org.opendc.trace.util.parquet.LocalParquetReader -import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA +import org.opendc.trace.util.avro.TIMESTAMP_SCHEMA import shaded.parquet.com.fasterxml.jackson.core.JsonEncoding import shaded.parquet.com.fasterxml.jackson.core.JsonFactory import java.nio.file.Files diff --git a/opendc-trace/opendc-trace-parquet/build.gradle.kts b/opendc-trace/opendc-trace-parquet/build.gradle.kts index 302c0b14..e6415586 100644 --- a/opendc-trace/opendc-trace-parquet/build.gradle.kts +++ b/opendc-trace/opendc-trace-parquet/build.gradle.kts @@ -32,7 +32,7 @@ dependencies { api(libs.parquet) { exclude(group = "org.apache.hadoop") } - runtimeOnly(libs.hadoop.common) { + api(libs.hadoop.common) { exclude(group = "org.slf4j", module = "slf4j-log4j12") exclude(group = "log4j") exclude(group = "org.apache.hadoop") diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/avro/AvroUtils.kt index 086b900b..a655d39f 100644 --- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt +++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/avro/AvroUtils.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 AtLarge Research + * Copyright (c) 2022 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -21,7 +21,7 @@ */ @file:JvmName("AvroUtils") -package org.opendc.trace.util.parquet +package org.opendc.trace.util.avro import org.apache.avro.LogicalTypes import org.apache.avro.Schema diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt index ef9eaeb3..bb2bb10d 100644 --- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt +++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt @@ -24,6 +24,7 @@ package org.opendc.trace.util.parquet import org.apache.parquet.avro.AvroParquetReader import org.apache.parquet.hadoop.ParquetReader +import org.apache.parquet.hadoop.api.ReadSupport import org.apache.parquet.io.InputFile import java.io.File import java.io.IOException @@ -32,11 +33,15 @@ import java.nio.file.Path import kotlin.io.path.isDirectory /** - * A helper class to read Parquet files. + * A helper class to read Parquet files from the filesystem. + * + * This class wraps a [ParquetReader] in order to support reading partitioned Parquet datasets. * * @param path The path to the Parquet file or directory to read. + * @param factory Function to construct a [ParquetReader] for a local [InputFile]. */ -public class LocalParquetReader<out T>(path: Path) : AutoCloseable { +public class LocalParquetReader<out T>(path: Path, + private val factory: (InputFile) -> ParquetReader<T> = avro()) : AutoCloseable { /** * The input files to process. */ @@ -93,20 +98,40 @@ public class LocalParquetReader<out T>(path: Path) : AutoCloseable { private fun initReader() { reader?.close() - this.reader = if (filesIterator.hasNext()) { - createReader(filesIterator.next()) - } else { - null + try { + this.reader = if (filesIterator.hasNext()) { + factory(filesIterator.next()) + } else { + null + } + } catch (e: Throwable) { + this.reader = null + throw e } } - /** - * Create a Parquet reader for the specified file. - */ - private fun createReader(input: InputFile): ParquetReader<T> { - return AvroParquetReader - .builder<T>(input) - .disableCompatibility() - .build() + public companion object { + /** + * A factory for reading Avro Parquet files. + */ + public fun <T> avro(): (InputFile) -> ParquetReader<T> { + return { input -> + AvroParquetReader + .builder<T>(input) + .disableCompatibility() + .build() + } + } + + /** + * A factory for reading Parquet files with custom [ReadSupport]. + */ + public fun <T> custom(readSupport: ReadSupport<T>): (InputFile) -> ParquetReader<T> { + return { input -> + object : ParquetReader.Builder<T>(input) { + override fun getReadSupport(): ReadSupport<T> = readSupport + }.build() + } + } } } |
