From 77075ff4680667da70bdee00be7fa83539a50439 Mon Sep 17 00:00:00 2001
From: Fabian Mastenbroek
Date: Wed, 9 Jun 2021 12:36:37 +0200
Subject: exp: Use LocalOutputFile for Parquet writers

This change updates the Parquet writers used in the Capelin experiments
to use our OutputFile implementation for local files, to reduce our
dependency on Apache Hadoop.
---
 .../experiments/capelin/telemetry/parquet/ParquetEventWriter.kt | 5 ++---
 .../org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt  | 8 +++-----
 2 files changed, 5 insertions(+), 8 deletions(-)

diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt
index 4fa6ae66..d8f7ff75 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt
@@ -25,10 +25,10 @@ package org.opendc.experiments.capelin.telemetry.parquet
 import mu.KotlinLogging
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericData
-import org.apache.hadoop.fs.Path
 import org.apache.parquet.avro.AvroParquetWriter
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.opendc.experiments.capelin.telemetry.Event
+import org.opendc.format.util.LocalOutputFile
 import java.io.Closeable
 import java.io.File
 import java.util.concurrent.ArrayBlockingQueue
@@ -52,8 +52,7 @@ public open class ParquetEventWriter(
     /**
      * The writer to write the Parquet file.
      */
-    @Suppress("DEPRECATION")
-    private val writer = AvroParquetWriter.builder(Path(path.absolutePath))
+    private val writer = AvroParquetWriter.builder(LocalOutputFile(path))
         .withSchema(schema)
         .withCompressionCodec(CompressionCodecName.SNAPPY)
         .withPageSize(4 * 1024 * 1024) // For compression
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt
index 1f9e289c..d0031a66 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt
@@ -38,11 +38,11 @@ import me.tongfei.progressbar.ProgressBar
 import org.apache.avro.Schema
 import org.apache.avro.SchemaBuilder
 import org.apache.avro.generic.GenericData
-import org.apache.hadoop.fs.Path
 import org.apache.parquet.avro.AvroParquetWriter
 import org.apache.parquet.hadoop.ParquetWriter
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.opendc.format.trace.sc20.Sc20VmPlacementReader
+import org.opendc.format.util.LocalOutputFile
 import java.io.BufferedReader
 import java.io.File
 import java.io.FileReader
@@ -109,16 +109,14 @@ public class TraceConverterCli : CliktCommand(name = "trace-converter") {
             traceParquet.delete()
         }

-        @Suppress("DEPRECATION")
-        val metaWriter = AvroParquetWriter.builder(Path(metaParquet.toURI()))
+        val metaWriter = AvroParquetWriter.builder(LocalOutputFile(metaParquet))
             .withSchema(metaSchema)
             .withCompressionCodec(CompressionCodecName.SNAPPY)
             .withPageSize(4 * 1024 * 1024) // For compression
             .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
             .build()

-        @Suppress("DEPRECATION")
-        val writer = AvroParquetWriter.builder(Path(traceParquet.toURI()))
+        val writer = AvroParquetWriter.builder(LocalOutputFile(traceParquet))
             .withSchema(schema)
             .withCompressionCodec(CompressionCodecName.SNAPPY)
             .withPageSize(4 * 1024 * 1024) // For compression
-- 
cgit v1.2.3
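
Note: the LocalOutputFile referenced by this patch lives in the opendc-format
module and its implementation is not part of this diff. For context, a minimal,
hypothetical sketch of such a class (not the actual OpenDC implementation)
could implement Parquet's org.apache.parquet.io.OutputFile interface directly
on top of java.nio, which is what lets the deprecated builder(Path) overload
and the @Suppress("DEPRECATION") annotations above be dropped:

import org.apache.parquet.io.OutputFile
import org.apache.parquet.io.PositionOutputStream
import java.io.File
import java.nio.file.Files
import java.nio.file.StandardOpenOption

/**
 * Sketch of an [OutputFile] backed by a local [File], so that Parquet writers
 * do not need a Hadoop [org.apache.hadoop.fs.Path]. Hypothetical illustration;
 * the real implementation resides in org.opendc.format.util.
 */
class LocalOutputFile(private val file: File) : OutputFile {
    override fun create(blockSizeHint: Long): PositionOutputStream =
        open(StandardOpenOption.CREATE_NEW) // Fail if the file already exists

    override fun createOrOverwrite(blockSizeHint: Long): PositionOutputStream =
        open(StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING)

    // Local files have no notion of HDFS-style block sizes.
    override fun supportsBlockSize(): Boolean = false

    override fun defaultBlockSize(): Long = 0

    private fun open(vararg options: StandardOpenOption): PositionOutputStream {
        val output = Files.newOutputStream(file.toPath(), StandardOpenOption.WRITE, *options)
        return object : PositionOutputStream() {
            // Track the write position manually, as Parquet needs it for
            // recording column chunk offsets in the file footer.
            private var position = 0L

            override fun getPos(): Long = position

            override fun write(b: Int) {
                output.write(b)
                position++
            }

            override fun write(b: ByteArray, off: Int, len: Int) {
                output.write(b, off, len)
                position += len
            }

            override fun flush() = output.flush()

            override fun close() = output.close()
        }
    }
}

With an OutputFile in hand, the writers above can pass a plain java.io.File
via LocalOutputFile(path) instead of wrapping it in a Hadoop Path, reducing
(though not entirely removing) the dependency on Apache Hadoop.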