Diffstat (limited to 'opendc-experiments/opendc-experiments-capelin')
3 files changed, 11 insertions, 21 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt
index a8462a51..7f25137e 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt
@@ -38,7 +38,6 @@ import java.util.TreeSet
  * @param performanceInterferenceModel The performance model covering the workload in the VM trace.
  * @param run The run to which this reader belongs.
  */
-@OptIn(ExperimentalStdlibApi::class)
 public class Sc20ParquetTraceReader(
     rawReaders: List<Sc20RawParquetTraceReader>,
     performanceInterferenceModel: Map<String, PerformanceInterferenceModel>,
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt
index bd27cf02..54151c9f 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt
@@ -24,10 +24,9 @@ package org.opendc.experiments.capelin.trace
 
 import mu.KotlinLogging
 import org.apache.avro.generic.GenericData
-import org.apache.hadoop.fs.Path
-import org.apache.parquet.avro.AvroParquetReader
 import org.opendc.format.trace.TraceEntry
 import org.opendc.format.trace.TraceReader
+import org.opendc.format.util.LocalParquetReader
 import org.opendc.simulator.compute.workload.SimTraceWorkload
 import org.opendc.simulator.compute.workload.SimWorkload
 import java.io.File
@@ -40,16 +39,12 @@ private val logger = KotlinLogging.logger {}
  *
  * @param path The directory of the traces.
  */
-@OptIn(ExperimentalStdlibApi::class)
 public class Sc20RawParquetTraceReader(private val path: File) {
     /**
      * Read the fragments into memory.
      */
     private fun parseFragments(path: File): Map<String, List<SimTraceWorkload.Fragment>> {
-        @Suppress("DEPRECATION")
-        val reader = AvroParquetReader.builder<GenericData.Record>(Path(path.absolutePath, "trace.parquet"))
-            .disableCompatibility()
-            .build()
+        val reader = LocalParquetReader<GenericData.Record>(File(path, "trace.parquet"))
 
         val fragments = mutableMapOf<String, MutableList<SimTraceWorkload.Fragment>>()
 
@@ -81,10 +76,7 @@ public class Sc20RawParquetTraceReader(private val path: File) {
      * Read the metadata into a workload.
      */
     private fun parseMeta(path: File, fragments: Map<String, List<SimTraceWorkload.Fragment>>): List<TraceEntry<SimWorkload>> {
-        @Suppress("DEPRECATION")
-        val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(path.absolutePath, "meta.parquet"))
-            .disableCompatibility()
-            .build()
+        val metaReader = LocalParquetReader<GenericData.Record>(File(path, "meta.parquet"))
 
         var counter = 0
         val entries = mutableListOf<TraceEntry<SimWorkload>>()
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt
index c5294b55..6792c2ab 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt
@@ -24,7 +24,6 @@ package org.opendc.experiments.capelin.trace
 
 import mu.KotlinLogging
 import org.apache.avro.generic.GenericData
-import org.apache.hadoop.fs.Path
 import org.apache.parquet.avro.AvroParquetReader
 import org.apache.parquet.filter2.compat.FilterCompat
 import org.apache.parquet.filter2.predicate.FilterApi
@@ -33,6 +32,7 @@ import org.apache.parquet.filter2.predicate.UserDefinedPredicate
 import org.apache.parquet.io.api.Binary
 import org.opendc.format.trace.TraceEntry
 import org.opendc.format.trace.TraceReader
+import org.opendc.format.util.LocalInputFile
 import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL
 import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
 import org.opendc.simulator.compute.workload.SimTraceWorkload
@@ -54,7 +54,6 @@ private val logger = KotlinLogging.logger {}
  * @param traceFile The directory of the traces.
  * @param performanceInterferenceModel The performance model covering the workload in the VM trace.
 */
-@OptIn(ExperimentalStdlibApi::class)
 public class Sc20StreamingParquetTraceReader(
     traceFile: File,
     performanceInterferenceModel: PerformanceInterferenceModel? = null,
@@ -96,10 +95,10 @@ public class Sc20StreamingParquetTraceReader(
      * The thread to read the records in.
      */
     private val readerThread = thread(start = true, name = "sc20-reader") {
-        @Suppress("DEPRECATION")
-        val reader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "trace.parquet"))
+        val reader = AvroParquetReader
+            .builder<GenericData.Record>(LocalInputFile(File(traceFile, "trace.parquet")))
             .disableCompatibility()
-            .run { if (filter != null) withFilter(filter) else this }
+            .withFilter(filter)
             .build()
 
         try {
@@ -164,10 +163,10 @@ public class Sc20StreamingParquetTraceReader(
         val entries = mutableMapOf<String, GenericData.Record>()
         val buffers = mutableMapOf<String, MutableList<MutableList<SimTraceWorkload.Fragment>>>()
 
-        @Suppress("DEPRECATION")
-        val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "meta.parquet"))
+        val metaReader = AvroParquetReader
+            .builder<GenericData.Record>(LocalInputFile(File(traceFile, "meta.parquet")))
             .disableCompatibility()
-            .run { if (filter != null) withFilter(filter) else this }
+            .withFilter(filter)
             .build()
 
         while (true) {
@@ -178,7 +177,7 @@ public class Sc20StreamingParquetTraceReader(
 
         metaReader.close()
 
-        val selection = if (selectedVms.isEmpty()) entries.keys else selectedVms
+        val selection = selectedVms.ifEmpty { entries.keys }
 
         // Create the entry iterator
         iterator = selection.asSequence()
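Note: the LocalParquetReader and LocalInputFile helpers referenced above live in org.opendc.format.util and are not part of this diff. As a rough sketch of the underlying idea, assuming only Parquet's public InputFile and DelegatingSeekableInputStream APIs, a local-file adapter that lets AvroParquetReader read a file without a Hadoop Path could look like the following (the FileInputFile class and the main function are illustrative, not the OpenDC implementation):

// Illustrative sketch only: a Parquet InputFile backed by a local java.io.File,
// similar in spirit to the LocalInputFile used in the diff above.
import org.apache.avro.generic.GenericData
import org.apache.parquet.avro.AvroParquetReader
import org.apache.parquet.io.DelegatingSeekableInputStream
import org.apache.parquet.io.InputFile
import org.apache.parquet.io.SeekableInputStream
import java.io.File
import java.nio.channels.Channels
import java.nio.channels.FileChannel
import java.nio.file.StandardOpenOption

class FileInputFile(private val file: File) : InputFile {
    override fun getLength(): Long = file.length()

    override fun newStream(): SeekableInputStream {
        val channel = FileChannel.open(file.toPath(), StandardOpenOption.READ)
        // Delegate plain reads to a stream view of the channel; only position
        // tracking and seeking have to be provided explicitly.
        return object : DelegatingSeekableInputStream(Channels.newInputStream(channel)) {
            override fun getPos(): Long = channel.position()
            override fun seek(newPos: Long) {
                channel.position(newPos)
            }
        }
    }
}

fun main() {
    // Read Avro records from a local Parquet file without going through a Hadoop Path.
    val reader = AvroParquetReader
        .builder<GenericData.Record>(FileInputFile(File("trace.parquet")))
        .disableCompatibility()
        .build()
    reader.use { r ->
        generateSequence { r.read() }.forEach { record -> println(record) }
    }
}

Reading through an InputFile avoids the deprecated builder(Path) overload and the Hadoop filesystem dependency, which is presumably why the readers in this commit switch to LocalInputFile/LocalParquetReader.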
