summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-06-09 12:36:37 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-06-09 12:43:25 +0200
commit77075ff4680667da70bdee00be7fa83539a50439 (patch)
tree21413cdcd4df4248cbafe48c366074f32ec9165b
parent2837ee439aef908b3fe281b9707dbb961e700f1c (diff)
exp: Use LocalOutputFile for Parquet writers
This change updates the Parquet writers used in the Capelin experiments to use our OutputFile implementation for local files, to reduce our dependency on Apache Hadoop.
-rw-r--r-- opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt | 5
-rw-r--r-- opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt | 8
2 files changed, 5 insertions, 8 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt
index 4fa6ae66..d8f7ff75 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt
@@ -25,10 +25,10 @@ package org.opendc.experiments.capelin.telemetry.parquet
import mu.KotlinLogging
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
-import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.opendc.experiments.capelin.telemetry.Event
+import org.opendc.format.util.LocalOutputFile
import java.io.Closeable
import java.io.File
import java.util.concurrent.ArrayBlockingQueue
@@ -52,8 +52,7 @@ public open class ParquetEventWriter<in T : Event>(
/**
* The writer to write the Parquet file.
*/
- @Suppress("DEPRECATION")
- private val writer = AvroParquetWriter.builder<GenericData.Record>(Path(path.absolutePath))
+ private val writer = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(path))
.withSchema(schema)
.withCompressionCodec(CompressionCodecName.SNAPPY)
.withPageSize(4 * 1024 * 1024) // For compression
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt
index 1f9e289c..d0031a66 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt
@@ -38,11 +38,11 @@ import me.tongfei.progressbar.ProgressBar
import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericData
-import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.opendc.format.trace.sc20.Sc20VmPlacementReader
+import org.opendc.format.util.LocalOutputFile
import java.io.BufferedReader
import java.io.File
import java.io.FileReader
@@ -109,16 +109,14 @@ public class TraceConverterCli : CliktCommand(name = "trace-converter") {
traceParquet.delete()
}
- @Suppress("DEPRECATION")
- val metaWriter = AvroParquetWriter.builder<GenericData.Record>(Path(metaParquet.toURI()))
+ val metaWriter = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(metaParquet))
.withSchema(metaSchema)
.withCompressionCodec(CompressionCodecName.SNAPPY)
.withPageSize(4 * 1024 * 1024) // For compression
.withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
.build()
- @Suppress("DEPRECATION")
- val writer = AvroParquetWriter.builder<GenericData.Record>(Path(traceParquet.toURI()))
+ val writer = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(traceParquet))
.withSchema(schema)
.withCompressionCodec(CompressionCodecName.SNAPPY)
.withPageSize(4 * 1024 * 1024) // For compression