summaryrefslogtreecommitdiff
path: root/opendc-experiments/opendc-experiments-capelin
diff options
context:
space:
mode:
author: Fabian Mastenbroek <mail.fabianm@gmail.com> 2021-06-09 09:48:07 +0200
committer: Fabian Mastenbroek <mail.fabianm@gmail.com> 2021-06-09 12:42:23 +0200
commit f0f59a0b98fe474da4411c0d5048ccdf4a2d7c43 (patch)
tree b9a8de23bac3a764d24a348b8d2d548b0d6b4298 /opendc-experiments/opendc-experiments-capelin
parent 1b52a443e508bc4130071e67a1a8e17a6714c6b8 (diff)
exp: Use LocalInputFile for Parquet readers
This change updates the Parquet readers used in the Capelin experiments to use our InputFile implementation for local files, to reduce our dependency on Apache Hadoop.
Diffstat (limited to 'opendc-experiments/opendc-experiments-capelin')
-rw-r--r-- opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt | 1
-rw-r--r-- opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt | 14
-rw-r--r-- opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt | 17
3 files changed, 11 insertions, 21 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt
index a8462a51..7f25137e 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt
@@ -38,7 +38,6 @@ import java.util.TreeSet
* @param performanceInterferenceModel The performance model covering the workload in the VM trace.
* @param run The run to which this reader belongs.
*/
-@OptIn(ExperimentalStdlibApi::class)
public class Sc20ParquetTraceReader(
rawReaders: List<Sc20RawParquetTraceReader>,
performanceInterferenceModel: Map<String, PerformanceInterferenceModel>,
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt
index bd27cf02..54151c9f 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt
@@ -24,10 +24,9 @@ package org.opendc.experiments.capelin.trace
import mu.KotlinLogging
import org.apache.avro.generic.GenericData
-import org.apache.hadoop.fs.Path
-import org.apache.parquet.avro.AvroParquetReader
import org.opendc.format.trace.TraceEntry
import org.opendc.format.trace.TraceReader
+import org.opendc.format.util.LocalParquetReader
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.compute.workload.SimWorkload
import java.io.File
@@ -40,16 +39,12 @@ private val logger = KotlinLogging.logger {}
*
* @param path The directory of the traces.
*/
-@OptIn(ExperimentalStdlibApi::class)
public class Sc20RawParquetTraceReader(private val path: File) {
/**
* Read the fragments into memory.
*/
private fun parseFragments(path: File): Map<String, List<SimTraceWorkload.Fragment>> {
- @Suppress("DEPRECATION")
- val reader = AvroParquetReader.builder<GenericData.Record>(Path(path.absolutePath, "trace.parquet"))
- .disableCompatibility()
- .build()
+ val reader = LocalParquetReader<GenericData.Record>(File(path, "trace.parquet"))
val fragments = mutableMapOf<String, MutableList<SimTraceWorkload.Fragment>>()
@@ -81,10 +76,7 @@ public class Sc20RawParquetTraceReader(private val path: File) {
* Read the metadata into a workload.
*/
private fun parseMeta(path: File, fragments: Map<String, List<SimTraceWorkload.Fragment>>): List<TraceEntry<SimWorkload>> {
- @Suppress("DEPRECATION")
- val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(path.absolutePath, "meta.parquet"))
- .disableCompatibility()
- .build()
+ val metaReader = LocalParquetReader<GenericData.Record>(File(path, "meta.parquet"))
var counter = 0
val entries = mutableListOf<TraceEntry<SimWorkload>>()
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt
index c5294b55..6792c2ab 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt
@@ -24,7 +24,6 @@ package org.opendc.experiments.capelin.trace
import mu.KotlinLogging
import org.apache.avro.generic.GenericData
-import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetReader
import org.apache.parquet.filter2.compat.FilterCompat
import org.apache.parquet.filter2.predicate.FilterApi
@@ -33,6 +32,7 @@ import org.apache.parquet.filter2.predicate.UserDefinedPredicate
import org.apache.parquet.io.api.Binary
import org.opendc.format.trace.TraceEntry
import org.opendc.format.trace.TraceReader
+import org.opendc.format.util.LocalInputFile
import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL
import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
import org.opendc.simulator.compute.workload.SimTraceWorkload
@@ -54,7 +54,6 @@ private val logger = KotlinLogging.logger {}
* @param traceFile The directory of the traces.
* @param performanceInterferenceModel The performance model covering the workload in the VM trace.
*/
-@OptIn(ExperimentalStdlibApi::class)
public class Sc20StreamingParquetTraceReader(
traceFile: File,
performanceInterferenceModel: PerformanceInterferenceModel? = null,
@@ -96,10 +95,10 @@ public class Sc20StreamingParquetTraceReader(
* The thread to read the records in.
*/
private val readerThread = thread(start = true, name = "sc20-reader") {
- @Suppress("DEPRECATION")
- val reader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "trace.parquet"))
+ val reader = AvroParquetReader
+ .builder<GenericData.Record>(LocalInputFile(File(traceFile, "trace.parquet")))
.disableCompatibility()
- .run { if (filter != null) withFilter(filter) else this }
+ .withFilter(filter)
.build()
try {
@@ -164,10 +163,10 @@ public class Sc20StreamingParquetTraceReader(
val entries = mutableMapOf<String, GenericData.Record>()
val buffers = mutableMapOf<String, MutableList<MutableList<SimTraceWorkload.Fragment>>>()
- @Suppress("DEPRECATION")
- val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "meta.parquet"))
+ val metaReader = AvroParquetReader
+ .builder<GenericData.Record>(LocalInputFile(File(traceFile, "meta.parquet")))
.disableCompatibility()
- .run { if (filter != null) withFilter(filter) else this }
+ .withFilter(filter)
.build()
while (true) {
@@ -178,7 +177,7 @@ public class Sc20StreamingParquetTraceReader(
metaReader.close()
- val selection = if (selectedVms.isEmpty()) entries.keys else selectedVms
+ val selection = selectedVms.ifEmpty { entries.keys }
// Create the entry iterator
iterator = selection.asSequence()