diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-06-09 12:36:37 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-06-09 12:43:25 +0200 |
| commit | 77075ff4680667da70bdee00be7fa83539a50439 (patch) | |
| tree | 21413cdcd4df4248cbafe48c366074f32ec9165b /opendc-experiments | |
| parent | 2837ee439aef908b3fe281b9707dbb961e700f1c (diff) | |
exp: Use LocalOutputFile for Parquet writers
This change updates the Parquet writers used in the Capelin experiments
to use our OutputFile implementation for local files, to reduce our
dependency on Apache Hadoop.
Diffstat (limited to 'opendc-experiments')
2 files changed, 5 insertions, 8 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt index 4fa6ae66..d8f7ff75 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt @@ -25,10 +25,10 @@ package org.opendc.experiments.capelin.telemetry.parquet import mu.KotlinLogging import org.apache.avro.Schema import org.apache.avro.generic.GenericData -import org.apache.hadoop.fs.Path import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.opendc.experiments.capelin.telemetry.Event +import org.opendc.format.util.LocalOutputFile import java.io.Closeable import java.io.File import java.util.concurrent.ArrayBlockingQueue @@ -52,8 +52,7 @@ public open class ParquetEventWriter<in T : Event>( /** * The writer to write the Parquet file. */ - @Suppress("DEPRECATION") - private val writer = AvroParquetWriter.builder<GenericData.Record>(Path(path.absolutePath)) + private val writer = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(path)) .withSchema(schema) .withCompressionCodec(CompressionCodecName.SNAPPY) .withPageSize(4 * 1024 * 1024) // For compression diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt index 1f9e289c..d0031a66 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt @@ -38,11 +38,11 @@ import me.tongfei.progressbar.ProgressBar import org.apache.avro.Schema import org.apache.avro.SchemaBuilder import org.apache.avro.generic.GenericData -import org.apache.hadoop.fs.Path import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.ParquetWriter import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.opendc.format.trace.sc20.Sc20VmPlacementReader +import org.opendc.format.util.LocalOutputFile import java.io.BufferedReader import java.io.File import java.io.FileReader @@ -109,16 +109,14 @@ public class TraceConverterCli : CliktCommand(name = "trace-converter") { traceParquet.delete() } - @Suppress("DEPRECATION") - val metaWriter = AvroParquetWriter.builder<GenericData.Record>(Path(metaParquet.toURI())) + val metaWriter = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(metaParquet)) .withSchema(metaSchema) .withCompressionCodec(CompressionCodecName.SNAPPY) .withPageSize(4 * 1024 * 1024) // For compression .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) .build() - @Suppress("DEPRECATION") - val writer = AvroParquetWriter.builder<GenericData.Record>(Path(traceParquet.toURI())) + val writer = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(traceParquet)) .withSchema(schema) .withCompressionCodec(CompressionCodecName.SNAPPY) .withPageSize(4 * 1024 * 1024) // For compression |
