From f0f59a0b98fe474da4411c0d5048ccdf4a2d7c43 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 9 Jun 2021 09:48:07 +0200 Subject: exp: Use LocalInputFile for Parquet readers This change updates the Parquet readers used in the Capelin experiments to use our InputFile implementation for local files, to reduce our dependency on Apache Hadoop. --- .../experiments/capelin/trace/Sc20ParquetTraceReader.kt | 1 - .../capelin/trace/Sc20RawParquetTraceReader.kt | 14 +++----------- .../capelin/trace/Sc20StreamingParquetTraceReader.kt | 17 ++++++++--------- 3 files changed, 11 insertions(+), 21 deletions(-) (limited to 'opendc-experiments/opendc-experiments-capelin') diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt index a8462a51..7f25137e 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt @@ -38,7 +38,6 @@ import java.util.TreeSet * @param performanceInterferenceModel The performance model covering the workload in the VM trace. * @param run The run to which this reader belongs. */ -@OptIn(ExperimentalStdlibApi::class) public class Sc20ParquetTraceReader( rawReaders: List, performanceInterferenceModel: Map, diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt index bd27cf02..54151c9f 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt @@ -24,10 +24,9 @@ package org.opendc.experiments.capelin.trace import mu.KotlinLogging import org.apache.avro.generic.GenericData -import org.apache.hadoop.fs.Path -import org.apache.parquet.avro.AvroParquetReader import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader +import org.opendc.format.util.LocalParquetReader import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.compute.workload.SimWorkload import java.io.File @@ -40,16 +39,12 @@ private val logger = KotlinLogging.logger {} * * @param path The directory of the traces. */ -@OptIn(ExperimentalStdlibApi::class) public class Sc20RawParquetTraceReader(private val path: File) { /** * Read the fragments into memory. */ private fun parseFragments(path: File): Map> { - @Suppress("DEPRECATION") - val reader = AvroParquetReader.builder(Path(path.absolutePath, "trace.parquet")) - .disableCompatibility() - .build() + val reader = LocalParquetReader(File(path, "trace.parquet")) val fragments = mutableMapOf>() @@ -81,10 +76,7 @@ public class Sc20RawParquetTraceReader(private val path: File) { * Read the metadata into a workload. */ private fun parseMeta(path: File, fragments: Map>): List> { - @Suppress("DEPRECATION") - val metaReader = AvroParquetReader.builder(Path(path.absolutePath, "meta.parquet")) - .disableCompatibility() - .build() + val metaReader = LocalParquetReader(File(path, "meta.parquet")) var counter = 0 val entries = mutableListOf>() diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt index c5294b55..6792c2ab 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt @@ -24,7 +24,6 @@ package org.opendc.experiments.capelin.trace import mu.KotlinLogging import org.apache.avro.generic.GenericData -import org.apache.hadoop.fs.Path import org.apache.parquet.avro.AvroParquetReader import org.apache.parquet.filter2.compat.FilterCompat import org.apache.parquet.filter2.predicate.FilterApi @@ -33,6 +32,7 @@ import org.apache.parquet.filter2.predicate.UserDefinedPredicate import org.apache.parquet.io.api.Binary import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader +import org.opendc.format.util.LocalInputFile import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL import org.opendc.simulator.compute.interference.PerformanceInterferenceModel import org.opendc.simulator.compute.workload.SimTraceWorkload @@ -54,7 +54,6 @@ private val logger = KotlinLogging.logger {} * @param traceFile The directory of the traces. * @param performanceInterferenceModel The performance model covering the workload in the VM trace. */ -@OptIn(ExperimentalStdlibApi::class) public class Sc20StreamingParquetTraceReader( traceFile: File, performanceInterferenceModel: PerformanceInterferenceModel? = null, @@ -96,10 +95,10 @@ public class Sc20StreamingParquetTraceReader( * The thread to read the records in. */ private val readerThread = thread(start = true, name = "sc20-reader") { - @Suppress("DEPRECATION") - val reader = AvroParquetReader.builder(Path(traceFile.absolutePath, "trace.parquet")) + val reader = AvroParquetReader + .builder(LocalInputFile(File(traceFile, "trace.parquet"))) .disableCompatibility() - .run { if (filter != null) withFilter(filter) else this } + .withFilter(filter) .build() try { @@ -164,10 +163,10 @@ public class Sc20StreamingParquetTraceReader( val entries = mutableMapOf() val buffers = mutableMapOf>>() - @Suppress("DEPRECATION") - val metaReader = AvroParquetReader.builder(Path(traceFile.absolutePath, "meta.parquet")) + val metaReader = AvroParquetReader + .builder(LocalInputFile(File(traceFile, "meta.parquet"))) .disableCompatibility() - .run { if (filter != null) withFilter(filter) else this } + .withFilter(filter) .build() while (true) { @@ -178,7 +177,7 @@ public class Sc20StreamingParquetTraceReader( metaReader.close() - val selection = if (selectedVms.isEmpty()) entries.keys else selectedVms + val selection = selectedVms.ifEmpty { entries.keys } // Create the entry iterator iterator = selection.asSequence() -- cgit v1.2.3