summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt6
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt6
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt2
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt2
-rw-r--r--opendc-trace/opendc-trace-parquet/build.gradle.kts2
-rw-r--r--opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/avro/AvroUtils.kt (renamed from opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt)4
-rw-r--r--opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt53
7 files changed, 50 insertions, 25 deletions
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt
index 2b7cac8f..72dbba90 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt
@@ -29,9 +29,9 @@ import org.apache.avro.generic.GenericRecordBuilder
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.opendc.telemetry.compute.table.HostTableReader
-import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA
-import org.opendc.trace.util.parquet.UUID_SCHEMA
-import org.opendc.trace.util.parquet.optional
+import org.opendc.trace.util.avro.TIMESTAMP_SCHEMA
+import org.opendc.trace.util.avro.UUID_SCHEMA
+import org.opendc.trace.util.avro.optional
import java.io.File
/**
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt
index 144b6624..aac6115f 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt
@@ -29,9 +29,9 @@ import org.apache.avro.generic.GenericRecordBuilder
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.opendc.telemetry.compute.table.ServerTableReader
-import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA
-import org.opendc.trace.util.parquet.UUID_SCHEMA
-import org.opendc.trace.util.parquet.optional
+import org.opendc.trace.util.avro.TIMESTAMP_SCHEMA
+import org.opendc.trace.util.avro.UUID_SCHEMA
+import org.opendc.trace.util.avro.optional
import java.io.File
/**
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt
index ec8a2b65..2db30bc4 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt
@@ -26,7 +26,7 @@ import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericRecordBuilder
import org.opendc.telemetry.compute.table.ServiceTableReader
-import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA
+import org.opendc.trace.util.avro.TIMESTAMP_SCHEMA
import java.io.File
/**
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
index 36a1b4a0..1a15c7b3 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
@@ -34,7 +34,7 @@ import org.opendc.trace.spi.TableDetails
import org.opendc.trace.spi.TraceFormat
import org.opendc.trace.util.parquet.LocalOutputFile
import org.opendc.trace.util.parquet.LocalParquetReader
-import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA
+import org.opendc.trace.util.avro.TIMESTAMP_SCHEMA
import shaded.parquet.com.fasterxml.jackson.core.JsonEncoding
import shaded.parquet.com.fasterxml.jackson.core.JsonFactory
import java.nio.file.Files
diff --git a/opendc-trace/opendc-trace-parquet/build.gradle.kts b/opendc-trace/opendc-trace-parquet/build.gradle.kts
index 302c0b14..e6415586 100644
--- a/opendc-trace/opendc-trace-parquet/build.gradle.kts
+++ b/opendc-trace/opendc-trace-parquet/build.gradle.kts
@@ -32,7 +32,7 @@ dependencies {
api(libs.parquet) {
exclude(group = "org.apache.hadoop")
}
- runtimeOnly(libs.hadoop.common) {
+ api(libs.hadoop.common) {
exclude(group = "org.slf4j", module = "slf4j-log4j12")
exclude(group = "log4j")
exclude(group = "org.apache.hadoop")
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/avro/AvroUtils.kt
index 086b900b..a655d39f 100644
--- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/avro/AvroUtils.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021 AtLarge Research
+ * Copyright (c) 2022 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -21,7 +21,7 @@
*/
@file:JvmName("AvroUtils")
-package org.opendc.trace.util.parquet
+package org.opendc.trace.util.avro
import org.apache.avro.LogicalTypes
import org.apache.avro.Schema
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
index ef9eaeb3..bb2bb10d 100644
--- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
@@ -24,6 +24,7 @@ package org.opendc.trace.util.parquet
import org.apache.parquet.avro.AvroParquetReader
import org.apache.parquet.hadoop.ParquetReader
+import org.apache.parquet.hadoop.api.ReadSupport
import org.apache.parquet.io.InputFile
import java.io.File
import java.io.IOException
@@ -32,11 +33,15 @@ import java.nio.file.Path
import kotlin.io.path.isDirectory
/**
- * A helper class to read Parquet files.
+ * A helper class to read Parquet files from the filesystem.
+ *
+ * This class wraps a [ParquetReader] in order to support reading partitioned Parquet datasets.
*
* @param path The path to the Parquet file or directory to read.
+ * @param factory Function to construct a [ParquetReader] for a local [InputFile].
*/
-public class LocalParquetReader<out T>(path: Path) : AutoCloseable {
+public class LocalParquetReader<out T>(path: Path,
+ private val factory: (InputFile) -> ParquetReader<T> = avro()) : AutoCloseable {
/**
* The input files to process.
*/
@@ -93,20 +98,40 @@ public class LocalParquetReader<out T>(path: Path) : AutoCloseable {
private fun initReader() {
reader?.close()
- this.reader = if (filesIterator.hasNext()) {
- createReader(filesIterator.next())
- } else {
- null
+ try {
+ this.reader = if (filesIterator.hasNext()) {
+ factory(filesIterator.next())
+ } else {
+ null
+ }
+ } catch (e: Throwable) {
+ this.reader = null
+ throw e
}
}
- /**
- * Create a Parquet reader for the specified file.
- */
- private fun createReader(input: InputFile): ParquetReader<T> {
- return AvroParquetReader
- .builder<T>(input)
- .disableCompatibility()
- .build()
+ public companion object {
+ /**
+ * A factory for reading Avro Parquet files.
+ */
+ public fun <T> avro(): (InputFile) -> ParquetReader<T> {
+ return { input ->
+ AvroParquetReader
+ .builder<T>(input)
+ .disableCompatibility()
+ .build()
+ }
+ }
+
+ /**
+ * A factory for reading Parquet files with custom [ReadSupport].
+ */
+ public fun <T> custom(readSupport: ReadSupport<T>): (InputFile) -> ParquetReader<T> {
+ return { input ->
+ object : ParquetReader.Builder<T>(input) {
+ override fun getReadSupport(): ReadSupport<T> = readSupport
+ }.build()
+ }
+ }
}
}