summaryrefslogtreecommitdiff
path: root/opendc-experiments/opendc-experiments-capelin
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-experiments/opendc-experiments-capelin')
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt1
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt14
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt17
3 files changed, 11 insertions, 21 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt
index a8462a51..7f25137e 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt
@@ -38,7 +38,6 @@ import java.util.TreeSet
* @param performanceInterferenceModel The performance model covering the workload in the VM trace.
* @param run The run to which this reader belongs.
*/
-@OptIn(ExperimentalStdlibApi::class)
public class Sc20ParquetTraceReader(
rawReaders: List<Sc20RawParquetTraceReader>,
performanceInterferenceModel: Map<String, PerformanceInterferenceModel>,
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt
index bd27cf02..54151c9f 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt
@@ -24,10 +24,9 @@ package org.opendc.experiments.capelin.trace
import mu.KotlinLogging
import org.apache.avro.generic.GenericData
-import org.apache.hadoop.fs.Path
-import org.apache.parquet.avro.AvroParquetReader
import org.opendc.format.trace.TraceEntry
import org.opendc.format.trace.TraceReader
+import org.opendc.format.util.LocalParquetReader
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.compute.workload.SimWorkload
import java.io.File
@@ -40,16 +39,12 @@ private val logger = KotlinLogging.logger {}
*
* @param path The directory of the traces.
*/
-@OptIn(ExperimentalStdlibApi::class)
public class Sc20RawParquetTraceReader(private val path: File) {
/**
* Read the fragments into memory.
*/
private fun parseFragments(path: File): Map<String, List<SimTraceWorkload.Fragment>> {
- @Suppress("DEPRECATION")
- val reader = AvroParquetReader.builder<GenericData.Record>(Path(path.absolutePath, "trace.parquet"))
- .disableCompatibility()
- .build()
+ val reader = LocalParquetReader<GenericData.Record>(File(path, "trace.parquet"))
val fragments = mutableMapOf<String, MutableList<SimTraceWorkload.Fragment>>()
@@ -81,10 +76,7 @@ public class Sc20RawParquetTraceReader(private val path: File) {
* Read the metadata into a workload.
*/
private fun parseMeta(path: File, fragments: Map<String, List<SimTraceWorkload.Fragment>>): List<TraceEntry<SimWorkload>> {
- @Suppress("DEPRECATION")
- val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(path.absolutePath, "meta.parquet"))
- .disableCompatibility()
- .build()
+ val metaReader = LocalParquetReader<GenericData.Record>(File(path, "meta.parquet"))
var counter = 0
val entries = mutableListOf<TraceEntry<SimWorkload>>()
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt
index c5294b55..6792c2ab 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt
@@ -24,7 +24,6 @@ package org.opendc.experiments.capelin.trace
import mu.KotlinLogging
import org.apache.avro.generic.GenericData
-import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetReader
import org.apache.parquet.filter2.compat.FilterCompat
import org.apache.parquet.filter2.predicate.FilterApi
@@ -33,6 +32,7 @@ import org.apache.parquet.filter2.predicate.UserDefinedPredicate
import org.apache.parquet.io.api.Binary
import org.opendc.format.trace.TraceEntry
import org.opendc.format.trace.TraceReader
+import org.opendc.format.util.LocalInputFile
import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL
import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
import org.opendc.simulator.compute.workload.SimTraceWorkload
@@ -54,7 +54,6 @@ private val logger = KotlinLogging.logger {}
* @param traceFile The directory of the traces.
* @param performanceInterferenceModel The performance model covering the workload in the VM trace.
*/
-@OptIn(ExperimentalStdlibApi::class)
public class Sc20StreamingParquetTraceReader(
traceFile: File,
performanceInterferenceModel: PerformanceInterferenceModel? = null,
@@ -96,10 +95,10 @@ public class Sc20StreamingParquetTraceReader(
* The thread to read the records in.
*/
private val readerThread = thread(start = true, name = "sc20-reader") {
- @Suppress("DEPRECATION")
- val reader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "trace.parquet"))
+ val reader = AvroParquetReader
+ .builder<GenericData.Record>(LocalInputFile(File(traceFile, "trace.parquet")))
.disableCompatibility()
- .run { if (filter != null) withFilter(filter) else this }
+ .withFilter(filter)
.build()
try {
@@ -164,10 +163,10 @@ public class Sc20StreamingParquetTraceReader(
val entries = mutableMapOf<String, GenericData.Record>()
val buffers = mutableMapOf<String, MutableList<MutableList<SimTraceWorkload.Fragment>>>()
- @Suppress("DEPRECATION")
- val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "meta.parquet"))
+ val metaReader = AvroParquetReader
+ .builder<GenericData.Record>(LocalInputFile(File(traceFile, "meta.parquet")))
.disableCompatibility()
- .run { if (filter != null) withFilter(filter) else this }
+ .withFilter(filter)
.build()
while (true) {
@@ -178,7 +177,7 @@ public class Sc20StreamingParquetTraceReader(
metaReader.close()
- val selection = if (selectedVms.isEmpty()) entries.keys else selectedVms
+ val selection = selectedVms.ifEmpty { entries.keys }
// Create the entry iterator
iterator = selection.asSequence()