author    Fabian Mastenbroek <mail.fabianm@gmail.com>  2021-06-09 14:16:10 +0200
committer GitHub <noreply@github.com>                  2021-06-09 14:16:10 +0200
commit    e2f6cd904ccd8018b65ff897181388ae3f02ae6f (patch)
tree      96efef3c0aa511093c793dae6ea648448510bb68 /opendc-experiments
parent    38ef2cabbecf694f66fa3bd5e69b9431c56a3f8d (diff)
parent    0eb4fa604efe4e0b84d69749f688a79c2249c8b3 (diff)
Remove dependency on Hadoop
This pull request removes the simulator's dependency on Apache Hadoop, which is pulled in as a consequence of using parquet-mr. The reason for removal is that Apache Hadoop does not work natively on Windows without user intervention, which makes adoption of OpenDC on that platform more difficult.

* Add Windows as a CI target for the OpenDC simulator
* Use `LocalInputFile` for Parquet reader usages
* Use `LocalOutputFile` for Parquet writer usages
* Remove Apache Hadoop as a dependency of OpenDC
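For background, parquet-mr does not strictly need Hadoop for local I/O: readers accept any `org.apache.parquet.io.InputFile` and writers any `org.apache.parquet.io.OutputFile`. The sketch below illustrates what local-filesystem adapters of this kind can look like; it is a minimal sketch against the parquet-mr interfaces, not necessarily the actual implementation of `org.opendc.format.util.LocalInputFile` and `LocalOutputFile` referenced in the hunks below.

```kotlin
import org.apache.parquet.io.DelegatingPositionOutputStream
import org.apache.parquet.io.DelegatingSeekableInputStream
import org.apache.parquet.io.InputFile
import org.apache.parquet.io.OutputFile
import org.apache.parquet.io.PositionOutputStream
import org.apache.parquet.io.SeekableInputStream
import java.io.File
import java.io.FileOutputStream
import java.nio.channels.Channels
import java.nio.channels.FileChannel
import java.nio.file.FileAlreadyExistsException
import java.nio.file.StandardOpenOption

/* Hypothetical adapter: lets Parquet read a local file through java.nio. */
class LocalInputFile(private val file: File) : InputFile {
    override fun getLength(): Long = file.length()

    override fun newStream(): SeekableInputStream {
        val channel = FileChannel.open(file.toPath(), StandardOpenOption.READ)
        // DelegatingSeekableInputStream supplies all read methods; only the
        // position bookkeeping has to be mapped onto the channel.
        return object : DelegatingSeekableInputStream(Channels.newInputStream(channel)) {
            override fun getPos(): Long = channel.position()
            override fun seek(newPos: Long) { channel.position(newPos) }
        }
    }
}

/* Hypothetical adapter: lets Parquet write a local file through java.io. */
class LocalOutputFile(private val file: File) : OutputFile {
    override fun create(blockSizeHint: Long): PositionOutputStream {
        if (file.exists()) throw FileAlreadyExistsException(file.path)
        return createOrOverwrite(blockSizeHint)
    }

    override fun createOrOverwrite(blockSizeHint: Long): PositionOutputStream {
        val out = FileOutputStream(file)
        // Parquet only needs the current write position, e.g. for the footer.
        return object : DelegatingPositionOutputStream(out) {
            override fun getPos(): Long = out.channel.position()
        }
    }

    // Block sizes are an HDFS concept; a local file does not have one.
    override fun supportsBlockSize(): Boolean = false
    override fun defaultBlockSize(): Long = 0
}
```

With adapters like these, the Hadoop `Path`-based builder calls in the hunks below become e.g. `AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(path))`, and no Hadoop classes are needed at runtime.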
Diffstat (limited to 'opendc-experiments')
-rw-r--r-- opendc-experiments/opendc-experiments-capelin/build.gradle.kts | 5
-rw-r--r-- opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt | 5
-rw-r--r-- opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt | 1
-rw-r--r-- opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt | 14
-rw-r--r-- opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt | 17
-rw-r--r-- opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt | 8
-rw-r--r-- opendc-experiments/opendc-experiments-serverless20/build.gradle.kts | 6
-rw-r--r-- opendc-experiments/opendc-experiments-tf20/build.gradle.kts | 5
8 files changed, 16 insertions(+), 45 deletions(-)
diff --git a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
index 0dade513..324cae3e 100644
--- a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
+++ b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
@@ -44,10 +44,5 @@ dependencies {
implementation(libs.clikt)
implementation(libs.parquet)
- implementation(libs.hadoop.client) {
- exclude(group = "org.slf4j", module = "slf4j-log4j12")
- exclude(group = "log4j")
- }
-
testImplementation(libs.log4j.slf4j)
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt
index 4fa6ae66..d8f7ff75 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/telemetry/parquet/ParquetEventWriter.kt
@@ -25,10 +25,10 @@ package org.opendc.experiments.capelin.telemetry.parquet
import mu.KotlinLogging
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
-import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.opendc.experiments.capelin.telemetry.Event
+import org.opendc.format.util.LocalOutputFile
import java.io.Closeable
import java.io.File
import java.util.concurrent.ArrayBlockingQueue
@@ -52,8 +52,7 @@ public open class ParquetEventWriter<in T : Event>(
/**
* The writer to write the Parquet file.
*/
- @Suppress("DEPRECATION")
- private val writer = AvroParquetWriter.builder<GenericData.Record>(Path(path.absolutePath))
+ private val writer = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(path))
.withSchema(schema)
.withCompressionCodec(CompressionCodecName.SNAPPY)
.withPageSize(4 * 1024 * 1024) // For compression
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt
index a8462a51..7f25137e 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20ParquetTraceReader.kt
@@ -38,7 +38,6 @@ import java.util.TreeSet
* @param performanceInterferenceModel The performance model covering the workload in the VM trace.
* @param run The run to which this reader belongs.
*/
-@OptIn(ExperimentalStdlibApi::class)
public class Sc20ParquetTraceReader(
rawReaders: List<Sc20RawParquetTraceReader>,
performanceInterferenceModel: Map<String, PerformanceInterferenceModel>,
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt
index bd27cf02..54151c9f 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20RawParquetTraceReader.kt
@@ -24,10 +24,9 @@ package org.opendc.experiments.capelin.trace
import mu.KotlinLogging
import org.apache.avro.generic.GenericData
-import org.apache.hadoop.fs.Path
-import org.apache.parquet.avro.AvroParquetReader
import org.opendc.format.trace.TraceEntry
import org.opendc.format.trace.TraceReader
+import org.opendc.format.util.LocalParquetReader
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.compute.workload.SimWorkload
import java.io.File
@@ -40,16 +39,12 @@ private val logger = KotlinLogging.logger {}
*
* @param path The directory of the traces.
*/
-@OptIn(ExperimentalStdlibApi::class)
public class Sc20RawParquetTraceReader(private val path: File) {
/**
* Read the fragments into memory.
*/
private fun parseFragments(path: File): Map<String, List<SimTraceWorkload.Fragment>> {
- @Suppress("DEPRECATION")
- val reader = AvroParquetReader.builder<GenericData.Record>(Path(path.absolutePath, "trace.parquet"))
- .disableCompatibility()
- .build()
+ val reader = LocalParquetReader<GenericData.Record>(File(path, "trace.parquet"))
val fragments = mutableMapOf<String, MutableList<SimTraceWorkload.Fragment>>()
@@ -81,10 +76,7 @@ public class Sc20RawParquetTraceReader(private val path: File) {
* Read the metadata into a workload.
*/
private fun parseMeta(path: File, fragments: Map<String, List<SimTraceWorkload.Fragment>>): List<TraceEntry<SimWorkload>> {
- @Suppress("DEPRECATION")
- val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(path.absolutePath, "meta.parquet"))
- .disableCompatibility()
- .build()
+ val metaReader = LocalParquetReader<GenericData.Record>(File(path, "meta.parquet"))
var counter = 0
val entries = mutableListOf<TraceEntry<SimWorkload>>()
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt
index c5294b55..6792c2ab 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20StreamingParquetTraceReader.kt
@@ -24,7 +24,6 @@ package org.opendc.experiments.capelin.trace
import mu.KotlinLogging
import org.apache.avro.generic.GenericData
-import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetReader
import org.apache.parquet.filter2.compat.FilterCompat
import org.apache.parquet.filter2.predicate.FilterApi
@@ -33,6 +32,7 @@ import org.apache.parquet.filter2.predicate.UserDefinedPredicate
import org.apache.parquet.io.api.Binary
import org.opendc.format.trace.TraceEntry
import org.opendc.format.trace.TraceReader
+import org.opendc.format.util.LocalInputFile
import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL
import org.opendc.simulator.compute.interference.PerformanceInterferenceModel
import org.opendc.simulator.compute.workload.SimTraceWorkload
@@ -54,7 +54,6 @@ private val logger = KotlinLogging.logger {}
* @param traceFile The directory of the traces.
* @param performanceInterferenceModel The performance model covering the workload in the VM trace.
*/
-@OptIn(ExperimentalStdlibApi::class)
public class Sc20StreamingParquetTraceReader(
traceFile: File,
performanceInterferenceModel: PerformanceInterferenceModel? = null,
@@ -96,10 +95,10 @@ public class Sc20StreamingParquetTraceReader(
* The thread to read the records in.
*/
private val readerThread = thread(start = true, name = "sc20-reader") {
- @Suppress("DEPRECATION")
- val reader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "trace.parquet"))
+ val reader = AvroParquetReader
+ .builder<GenericData.Record>(LocalInputFile(File(traceFile, "trace.parquet")))
.disableCompatibility()
- .run { if (filter != null) withFilter(filter) else this }
+ .withFilter(filter)
.build()
try {
@@ -164,10 +163,10 @@ public class Sc20StreamingParquetTraceReader(
val entries = mutableMapOf<String, GenericData.Record>()
val buffers = mutableMapOf<String, MutableList<MutableList<SimTraceWorkload.Fragment>>>()
- @Suppress("DEPRECATION")
- val metaReader = AvroParquetReader.builder<GenericData.Record>(Path(traceFile.absolutePath, "meta.parquet"))
+ val metaReader = AvroParquetReader
+ .builder<GenericData.Record>(LocalInputFile(File(traceFile, "meta.parquet")))
.disableCompatibility()
- .run { if (filter != null) withFilter(filter) else this }
+ .withFilter(filter)
.build()
while (true) {
@@ -178,7 +177,7 @@ public class Sc20StreamingParquetTraceReader(
metaReader.close()
- val selection = if (selectedVms.isEmpty()) entries.keys else selectedVms
+ val selection = selectedVms.ifEmpty { entries.keys }
// Create the entry iterator
iterator = selection.asSequence()
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt
index 1f9e289c..d0031a66 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/Sc20TraceConverter.kt
@@ -38,11 +38,11 @@ import me.tongfei.progressbar.ProgressBar
import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericData
-import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.opendc.format.trace.sc20.Sc20VmPlacementReader
+import org.opendc.format.util.LocalOutputFile
import java.io.BufferedReader
import java.io.File
import java.io.FileReader
@@ -109,16 +109,14 @@ public class TraceConverterCli : CliktCommand(name = "trace-converter") {
traceParquet.delete()
}
- @Suppress("DEPRECATION")
- val metaWriter = AvroParquetWriter.builder<GenericData.Record>(Path(metaParquet.toURI()))
+ val metaWriter = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(metaParquet))
.withSchema(metaSchema)
.withCompressionCodec(CompressionCodecName.SNAPPY)
.withPageSize(4 * 1024 * 1024) // For compression
.withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
.build()
- @Suppress("DEPRECATION")
- val writer = AvroParquetWriter.builder<GenericData.Record>(Path(traceParquet.toURI()))
+ val writer = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(traceParquet))
.withSchema(schema)
.withCompressionCodec(CompressionCodecName.SNAPPY)
.withPageSize(4 * 1024 * 1024) // For compression
diff --git a/opendc-experiments/opendc-experiments-serverless20/build.gradle.kts b/opendc-experiments/opendc-experiments-serverless20/build.gradle.kts
index 88479765..7d68cb3a 100644
--- a/opendc-experiments/opendc-experiments-serverless20/build.gradle.kts
+++ b/opendc-experiments/opendc-experiments-serverless20/build.gradle.kts
@@ -37,10 +37,4 @@ dependencies {
implementation(projects.opendcTelemetry.opendcTelemetrySdk)
implementation(libs.kotlin.logging)
implementation(libs.config)
-
- implementation(libs.parquet)
- implementation(libs.hadoop.client) {
- exclude(group = "org.slf4j", module = "slf4j-log4j12")
- exclude(group = "log4j")
- }
}
diff --git a/opendc-experiments/opendc-experiments-tf20/build.gradle.kts b/opendc-experiments/opendc-experiments-tf20/build.gradle.kts
index 64483bd4..b088045b 100644
--- a/opendc-experiments/opendc-experiments-tf20/build.gradle.kts
+++ b/opendc-experiments/opendc-experiments-tf20/build.gradle.kts
@@ -38,9 +38,4 @@ dependencies {
implementation(projects.opendcUtils)
implementation(libs.kotlin.logging)
- implementation(libs.parquet)
- implementation(libs.hadoop.client) {
- exclude(group = "org.slf4j", module = "slf4j-log4j12")
- exclude(group = "log4j")
- }
}