From f0f59a0b98fe474da4411c0d5048ccdf4a2d7c43 Mon Sep 17 00:00:00 2001
From: Fabian Mastenbroek
Date: Wed, 9 Jun 2021 09:48:07 +0200
Subject: exp: Use LocalInputFile for Parquet readers

This change updates the Parquet readers used in the Capelin experiments
to use our InputFile implementation for local files, to reduce our
dependency on Apache Hadoop.
---
 .../experiments/capelin/trace/Sc20ParquetTraceReader.kt |  1 -
 .../capelin/trace/Sc20RawParquetTraceReader.kt          | 14 +++-----------
 .../capelin/trace/Sc20StreamingParquetTraceReader.kt    | 17 ++++++++---------
 3 files changed, 11 insertions(+), 21 deletions(-)

(limited to 'opendc-experiments')

diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt
index a8462a51..7f25137e 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt
@@ -38,7 +38,6 @@ import java.util.TreeSet
  * @param performanceInterferenceModel The performance model covering the workload in the VM trace.
  * @param run The run to which this reader belongs.
  */
-@OptIn(ExperimentalStdlibApi::class)
 public class Sc20ParquetTraceReader(
     rawReaders: List,
     performanceInterferenceModel: Map,
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt
index bd27cf02..54151c9f 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt
@@ -24,10 +24,9 @@ package org.opendc.experiments.capelin.trace
 import mu.KotlinLogging
 import org.apache.avro.generic.GenericData
-import org.apache.hadoop.fs.Path
-import org.apache.parquet.avro.AvroParquetReader
 import org.opendc.format.trace.TraceEntry
 import org.opendc.format.trace.TraceReader
+import org.opendc.format.util.LocalParquetReader
 import org.opendc.simulator.compute.workload.SimTraceWorkload
 import org.opendc.simulator.compute.workload.SimWorkload
 import java.io.File
@@ -40,16 +39,12 @@ private val logger = KotlinLogging.logger {}
  *
  * @param path The directory of the traces.
  */
-@OptIn(ExperimentalStdlibApi::class)
 public class Sc20RawParquetTraceReader(private val path: File) {
     /**
      * Read the fragments into memory.
      */
     private fun parseFragments(path: File): Map> {
-        @Suppress("DEPRECATION")
-        val reader = AvroParquetReader.builder(Path(path.absolutePath, "trace.parquet"))
-            .disableCompatibility()
-            .build()
+        val reader = LocalParquetReader(File(path, "trace.parquet"))
 
         val fragments = mutableMapOf>()
 
@@ -81,10 +76,7 @@ public class Sc20RawParquetTraceReader(private val path: File) {
      * Read the metadata into a workload.
      */
     private fun parseMeta(path: File, fragments: Map>): List> {
-        @Suppress("DEPRECATION")
-        val metaReader = AvroParquetReader.builder(Path(path.absolutePath, "meta.parquet"))
-            .disableCompatibility()
-            .build()
+        val metaReader = LocalParquetReader(File(path, "meta.parquet"))
 
         var counter = 0
         val entries = mutableListOf>()
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt
index c5294b55..6792c2ab 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt
@@ -24,7 +24,6 @@ package org.opendc.experiments.capelin.trace
 import mu.KotlinLogging
 import org.apache.avro.generic.GenericData
-import org.apache.hadoop.fs.Path
 import org.apache.parquet.avro.AvroParquetReader
 import org.apache.parquet.filter2.compat.FilterCompat
 import org.apache.parquet.filter2.predicate.FilterApi
@@ -33,6 +32,7 @@ import org.apache.parquet.filter2.predicate.UserDefinedPredicate
 import org.apache.parquet.io.api.Binary
 import org.opendc.format.trace.TraceEntry
 import org.opendc.format.trace.TraceReader
+import org.opendc.format.util.LocalInputFile
 import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL
 import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
 import org.opendc.simulator.compute.workload.SimTraceWorkload
@@ -54,7 +54,6 @@ private val logger = KotlinLogging.logger {}
  * @param traceFile The directory of the traces.
  * @param performanceInterferenceModel The performance model covering the workload in the VM trace.
  */
-@OptIn(ExperimentalStdlibApi::class)
 public class Sc20StreamingParquetTraceReader(
     traceFile: File,
     performanceInterferenceModel: PerformanceInterferenceModel? = null,
@@ -96,10 +95,10 @@ public class Sc20StreamingParquetTraceReader(
      * The thread to read the records in.
      */
     private val readerThread = thread(start = true, name = "sc20-reader") {
-        @Suppress("DEPRECATION")
-        val reader = AvroParquetReader.builder(Path(traceFile.absolutePath, "trace.parquet"))
+        val reader = AvroParquetReader
+            .builder(LocalInputFile(File(traceFile, "trace.parquet")))
             .disableCompatibility()
-            .run { if (filter != null) withFilter(filter) else this }
+            .withFilter(filter)
             .build()
 
         try {
@@ -164,10 +163,10 @@ public class Sc20StreamingParquetTraceReader(
         val entries = mutableMapOf()
         val buffers = mutableMapOf>>()
 
-        @Suppress("DEPRECATION")
-        val metaReader = AvroParquetReader.builder(Path(traceFile.absolutePath, "meta.parquet"))
+        val metaReader = AvroParquetReader
+            .builder(LocalInputFile(File(traceFile, "meta.parquet")))
             .disableCompatibility()
-            .run { if (filter != null) withFilter(filter) else this }
+            .withFilter(filter)
             .build()
 
         while (true) {
@@ -178,7 +177,7 @@ public class Sc20StreamingParquetTraceReader(
 
         metaReader.close()
 
-        val selection = if (selectedVms.isEmpty()) entries.keys else selectedVms
+        val selection = selectedVms.ifEmpty { entries.keys }
 
         // Create the entry iterator
         iterator = selection.asSequence()
--
cgit v1.2.3


From 77075ff4680667da70bdee00be7fa83539a50439 Mon Sep 17 00:00:00 2001
From: Fabian Mastenbroek
Date: Wed, 9 Jun 2021 12:36:37 +0200
Subject: exp: Use LocalOutputFile for Parquet writers

This change updates the Parquet writers used in the Capelin experiments
to use our OutputFile implementation for local files, to reduce our
dependency on Apache Hadoop.
---
 .../experiments/capelin/telemetry/parquet/ParquetEventWriter.kt | 5 ++---
 .../org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt  | 8 +++-----
 2 files changed, 5 insertions(+), 8 deletions(-)

(limited to 'opendc-experiments')

diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt
index 4fa6ae66..d8f7ff75 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt
@@ -25,10 +25,10 @@ package org.opendc.experiments.capelin.telemetry.parquet
 import mu.KotlinLogging
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericData
-import org.apache.hadoop.fs.Path
 import org.apache.parquet.avro.AvroParquetWriter
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.opendc.experiments.capelin.telemetry.Event
+import org.opendc.format.util.LocalOutputFile
 import java.io.Closeable
 import java.io.File
 import java.util.concurrent.ArrayBlockingQueue
@@ -52,8 +52,7 @@ public open class ParquetEventWriter(
     /**
      * The writer to write the Parquet file.
      */
-    @Suppress("DEPRECATION")
-    private val writer = AvroParquetWriter.builder(Path(path.absolutePath))
+    private val writer = AvroParquetWriter.builder(LocalOutputFile(path))
         .withSchema(schema)
         .withCompressionCodec(CompressionCodecName.SNAPPY)
         .withPageSize(4 * 1024 * 1024) // For compression
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt
index 1f9e289c..d0031a66 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt
@@ -38,11 +38,11 @@ import me.tongfei.progressbar.ProgressBar
 import org.apache.avro.Schema
 import org.apache.avro.SchemaBuilder
 import org.apache.avro.generic.GenericData
-import org.apache.hadoop.fs.Path
 import org.apache.parquet.avro.AvroParquetWriter
 import org.apache.parquet.hadoop.ParquetWriter
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.opendc.format.trace.sc20.Sc20VmPlacementReader
+import org.opendc.format.util.LocalOutputFile
 import java.io.BufferedReader
 import java.io.File
 import java.io.FileReader
@@ -109,16 +109,14 @@ public class TraceConverterCli : CliktCommand(name = "trace-converter") {
             traceParquet.delete()
         }
 
-        @Suppress("DEPRECATION")
-        val metaWriter = AvroParquetWriter.builder(Path(metaParquet.toURI()))
+        val metaWriter = AvroParquetWriter.builder(LocalOutputFile(metaParquet))
             .withSchema(metaSchema)
             .withCompressionCodec(CompressionCodecName.SNAPPY)
             .withPageSize(4 * 1024 * 1024) // For compression
             .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
             .build()
 
-        @Suppress("DEPRECATION")
-        val writer = AvroParquetWriter.builder(Path(traceParquet.toURI()))
+        val writer = AvroParquetWriter.builder(LocalOutputFile(traceParquet))
             .withSchema(schema)
             .withCompressionCodec(CompressionCodecName.SNAPPY)
             .withPageSize(4 * 1024 * 1024) // For compression
--
cgit v1.2.3


From 0eb4fa604efe4e0b84d69749f688a79c2249c8b3 Mon Sep 17 00:00:00 2001
From: Fabian Mastenbroek
Date: Wed, 9 Jun 2021 13:38:56 +0200
Subject: build: Eliminate most Hadoop dependencies

This change eliminates all Hadoop dependencies that are not necessary for
Parquet to work correctly. As a result, the number of dependencies should
now be greatly reduced, which in turn leads to fewer artifacts that need
to be retrieved at build time.
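Note: the Local{Input,Output}File classes used in the previous two commits
live in the opendc-format module and are not shown in these diffs. They
work because Parquet ships its own file abstractions, InputFile and
OutputFile, which replace Hadoop's FileSystem API for local I/O. As a
rough illustration only (a sketch built on java.nio, not the actual
opendc-format implementation), a minimal local InputFile can look like
this:

    import org.apache.parquet.io.DelegatingSeekableInputStream
    import org.apache.parquet.io.InputFile
    import org.apache.parquet.io.SeekableInputStream
    import java.io.File
    import java.nio.channels.Channels
    import java.nio.channels.FileChannel
    import java.nio.file.StandardOpenOption

    /**
     * An [InputFile] that reads a local [File] without going through
     * Hadoop's FileSystem API.
     */
    class LocalInputFile(private val file: File) : InputFile {
        override fun getLength(): Long = file.length()

        override fun newStream(): SeekableInputStream {
            val channel = FileChannel.open(file.toPath(), StandardOpenOption.READ)
            // DelegatingSeekableInputStream implements all read methods on
            // top of the wrapped stream; only positioning is left to us.
            return object : DelegatingSeekableInputStream(Channels.newInputStream(channel)) {
                override fun getPos(): Long = channel.position()

                override fun seek(newPos: Long) {
                    channel.position(newPos)
                }
            }
        }
    }

With such a class, AvroParquetReader.builder(LocalInputFile(file)) targets
the InputFile overload of the builder, which is how the readers in the
first commit avoid the deprecated Hadoop Path overload.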
--- opendc-experiments/opendc-experiments-capelin/build.gradle.kts | 5 ----- opendc-experiments/opendc-experiments-serverless20/build.gradle.kts | 6 ------ opendc-experiments/opendc-experiments-tf20/build.gradle.kts | 5 ----- 3 files changed, 16 deletions(-) (limited to 'opendc-experiments') diff --git a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts index 0dade513..324cae3e 100644 --- a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts @@ -44,10 +44,5 @@ dependencies { implementation(libs.clikt) implementation(libs.parquet) - implementation(libs.hadoop.client) { - exclude(group = "org.slf4j", module = "slf4j-log4j12") - exclude(group = "log4j") - } - testImplementation(libs.log4j.slf4j) } diff --git a/opendc-experiments/opendc-experiments-serverless20/build.gradle.kts b/opendc-experiments/opendc-experiments-serverless20/build.gradle.kts index 88479765..7d68cb3a 100644 --- a/opendc-experiments/opendc-experiments-serverless20/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-serverless20/build.gradle.kts @@ -37,10 +37,4 @@ dependencies { implementation(projects.opendcTelemetry.opendcTelemetrySdk) implementation(libs.kotlin.logging) implementation(libs.config) - - implementation(libs.parquet) - implementation(libs.hadoop.client) { - exclude(group = "org.slf4j", module = "slf4j-log4j12") - exclude(group = "log4j") - } } diff --git a/opendc-experiments/opendc-experiments-tf20/build.gradle.kts b/opendc-experiments/opendc-experiments-tf20/build.gradle.kts index 64483bd4..b088045b 100644 --- a/opendc-experiments/opendc-experiments-tf20/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-tf20/build.gradle.kts @@ -38,9 +38,4 @@ dependencies { implementation(projects.opendcUtils) implementation(libs.kotlin.logging) - implementation(libs.parquet) - implementation(libs.hadoop.client) { - exclude(group = "org.slf4j", module = "slf4j-log4j12") - exclude(group = "log4j") - } } -- cgit v1.2.3
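The writer side follows the same pattern against Parquet's OutputFile
interface. Again as a sketch (a channel-based illustration, not the actual
opendc-format implementation):

    import org.apache.parquet.io.DelegatingPositionOutputStream
    import org.apache.parquet.io.OutputFile
    import org.apache.parquet.io.PositionOutputStream
    import java.io.File
    import java.nio.channels.Channels
    import java.nio.channels.FileChannel
    import java.nio.file.StandardOpenOption

    /**
     * An [OutputFile] that writes a local [File] without going through
     * Hadoop's FileSystem API.
     */
    class LocalOutputFile(private val file: File) : OutputFile {
        override fun create(blockSizeHint: Long): PositionOutputStream =
            open(StandardOpenOption.CREATE_NEW)

        override fun createOrOverwrite(blockSizeHint: Long): PositionOutputStream =
            open(StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING)

        // Local files have no HDFS-style block size, so the hint is unused.
        override fun supportsBlockSize(): Boolean = false

        override fun defaultBlockSize(): Long = 0

        private fun open(vararg options: StandardOpenOption): PositionOutputStream {
            val channel = FileChannel.open(file.toPath(), StandardOpenOption.WRITE, *options)
            return object : DelegatingPositionOutputStream(Channels.newOutputStream(channel)) {
                // Parquet uses the position to record column chunk offsets.
                override fun getPos(): Long = channel.position()
            }
        }
    }

Passing an instance to AvroParquetWriter.builder(LocalOutputFile(path)),
as the second commit does, selects the OutputFile overload of the builder,
which is what allows the explicit hadoop-client dependency and its log4j
exclusions to be dropped from the build files above.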