| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-06-08 23:46:07 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-06-09 12:39:05 +0200 |
| commit | 1b52a443e508bc4130071e67a1a8e17a6714c6b8 (patch) | |
| tree | 573071cbee5abe0a232cf1a61bfb60a23f5c72cb | |
| parent | 9097811e0ac6872de3e4ff5f521d8859870b1000 (diff) | |
format: Use LocalInputFile for Parquet reader
This change updates the format implementations that use Parquet by
switching to our InputFile implementation for local files, which
eliminates the need for Hadoop's filesystem support.
4 files changed, 57 insertions, 38 deletions
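The `LocalParquetReader` (and the `LocalInputFile` it builds on) live in `org.opendc.format.util` and are not part of this patch. As a rough sketch of the approach described above, and not the actual OpenDC implementation, a local-file reader can be written directly against Parquet's `InputFile` abstraction so that `org.apache.hadoop.fs.Path` is no longer needed; the class name `LocalInputFileSketch` and the example path are hypothetical:

```kotlin
import org.apache.avro.generic.GenericRecord
import org.apache.parquet.avro.AvroParquetReader
import org.apache.parquet.io.DelegatingSeekableInputStream
import org.apache.parquet.io.InputFile
import org.apache.parquet.io.SeekableInputStream
import java.io.RandomAccessFile
import java.nio.channels.Channels
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.Paths

/**
 * A Parquet [InputFile] backed by a plain local file, so no Hadoop FileSystem
 * is involved. Hypothetical sketch, not the org.opendc.format.util implementation.
 */
class LocalInputFileSketch(private val path: Path) : InputFile {
    override fun getLength(): Long = Files.size(path)

    override fun newStream(): SeekableInputStream {
        // Read through a FileChannel so the stream can seek to arbitrary offsets.
        val file = RandomAccessFile(path.toFile(), "r")
        val stream = Channels.newInputStream(file.channel)
        return object : DelegatingSeekableInputStream(stream) {
            override fun getPos(): Long = file.channel.position()
            override fun seek(newPos: Long) {
                file.channel.position(newPos)
            }
        }
    }
}

fun main() {
    // Hypothetical usage: iterate over the Avro records of a local Parquet file.
    val input = LocalInputFileSketch(Paths.get("trace/tasks.parquet"))
    AvroParquetReader.builder<GenericRecord>(input).build().use { reader ->
        var record = reader.read()
        while (record != null) {
            println(record)
            record = reader.read()
        }
    }
}
```

`AvroParquetReader.builder` accepts an `InputFile` directly, which fits the dependency changes below: Hadoop is excluded from the Parquet dependency and the broad `hadoop-client` is replaced by the narrower `hadoop-common` and `hadoop-mapreduce-client-core`, which parquet-mr apparently still needs on the classpath.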
```diff
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 1d7fdd3e..4faa0476 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -5,6 +5,7 @@ slf4j = "1.7.30"
 log4j = "2.14.1"
 opentelemetry-main = "1.2.0"
 opentelemetry-metrics = "1.2.0-alpha"
+hadoop = "3.3.0"
 
 [libraries]
 kotlinx-coroutines = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-core", version = "1.5.0" }
@@ -45,4 +46,6 @@ kotlinx-benchmark-runtime-jvm = { module = "org.jetbrains.kotlinx:kotlinx-benchm
 # Other
 mongodb = { module = "org.mongodb:mongodb-driver-sync", version = "4.2.3" }
 classgraph = { module = "io.github.classgraph:classgraph", version = "4.8.105" }
-hadoop-client = { module = "org.apache.hadoop:hadoop-client", version = "3.3.0" }
+hadoop-client = { module = "org.apache.hadoop:hadoop-client", version.ref = "hadoop" }
+hadoop-common = { module = "org.apache.hadoop:hadoop-common", version.ref = "hadoop" }
+hadoop-mapreduce-client-core = { module = "org.apache.hadoop:hadoop-mapreduce-client-core", version.ref = "hadoop" }
diff --git a/opendc-format/build.gradle.kts b/opendc-format/build.gradle.kts
index c0ffeb3e..d3c1a59a 100644
--- a/opendc-format/build.gradle.kts
+++ b/opendc-format/build.gradle.kts
@@ -39,8 +39,15 @@ dependencies {
         exclude(group = "org.jetbrains.kotlin", module = "kotlin-reflect")
     }
     implementation(kotlin("reflect"))
-    implementation(libs.parquet)
-    implementation(libs.hadoop.client) {
+
+    implementation(libs.parquet) {
+        exclude(group = "org.apache.hadoop")
+    }
+    implementation(libs.hadoop.common) {
+        exclude(group = "org.slf4j", module = "slf4j-log4j12")
+        exclude(group = "log4j")
+    }
+    implementation(libs.hadoop.mapreduce.client.core) {
         exclude(group = "org.slf4j", module = "slf4j-log4j12")
         exclude(group = "log4j")
     }
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt
index feadf61f..dde1b340 100644
--- a/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt
+++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt
@@ -23,15 +23,16 @@
 package org.opendc.format.trace.wtf
 
 import org.apache.avro.generic.GenericRecord
-import org.apache.hadoop.fs.Path
-import org.apache.parquet.avro.AvroParquetReader
 import org.opendc.format.trace.TraceEntry
 import org.opendc.format.trace.TraceReader
+import org.opendc.format.util.LocalParquetReader
 import org.opendc.simulator.compute.workload.SimFlopsWorkload
 import org.opendc.workflow.api.Job
 import org.opendc.workflow.api.Task
 import org.opendc.workflow.api.WORKFLOW_TASK_CORES
 import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE
+import java.io.File
+import java.nio.file.Path
 import java.util.UUID
 import kotlin.math.min
 
@@ -41,13 +42,20 @@ import kotlin.math.min
  *
  * @param path The path to the trace.
  */
-public class WtfTraceReader(path: String) : TraceReader<Job> {
+public class WtfTraceReader(path: Path) : TraceReader<Job> {
     /**
      * The internal iterator to use for this reader.
      */
     private val iterator: Iterator<TraceEntry<Job>>
 
     /**
+     * Construct a [TraceReader] from the specified [path].
+     *
+     * @param path The path to the trace.
+     */
+    public constructor(path: File) : this(path.toPath())
+
+    /**
      * Initialize the reader.
      */
     init {
@@ -56,43 +64,43 @@ public class WtfTraceReader(path: Path) : TraceReader<Job> {
         val tasks = mutableMapOf<Long, Task>()
         val taskDependencies = mutableMapOf<Task, List<Long>>()
 
-        @Suppress("DEPRECATION")
-        val reader = AvroParquetReader.builder<GenericRecord>(Path(path, "tasks/schema-1.0")).build()
+        LocalParquetReader<GenericRecord>(path.resolve("tasks/schema-1.0")).use { reader ->
+            while (true) {
+                val nextRecord = reader.read() ?: break
 
-        while (true) {
-            val nextRecord = reader.read() ?: break
+                val workflowId = nextRecord.get("workflow_id") as Long
+                val taskId = nextRecord.get("id") as Long
+                val submitTime = nextRecord.get("ts_submit") as Long
+                val runtime = nextRecord.get("runtime") as Long
+                val cores = (nextRecord.get("resource_amount_requested") as Double).toInt()
 
-            val workflowId = nextRecord.get("workflow_id") as Long
-            val taskId = nextRecord.get("id") as Long
-            val submitTime = nextRecord.get("ts_submit") as Long
-            val runtime = nextRecord.get("runtime") as Long
-            val cores = (nextRecord.get("resource_amount_requested") as Double).toInt()
-            @Suppress("UNCHECKED_CAST")
-            val dependencies = (nextRecord.get("parents") as ArrayList<GenericRecord>).map {
-                it.get("item") as Long
-            }
+                @Suppress("UNCHECKED_CAST")
+                val dependencies = (nextRecord.get("parents") as ArrayList<GenericRecord>).map {
+                    it.get("item") as Long
+                }
 
-            val flops: Long = 4100 * (runtime / 1000) * cores
+                val flops: Long = 4100 * (runtime / 1000) * cores
 
-            val workflow = workflows.getOrPut(workflowId) {
-                Job(UUID(0L, workflowId), "<unnamed>", HashSet())
-            }
-            val workload = SimFlopsWorkload(flops)
-            val task = Task(
-                UUID(0L, taskId),
-                "<unnamed>",
-                HashSet(),
-                mapOf(
-                    "workload" to workload,
-                    WORKFLOW_TASK_CORES to cores,
-                    WORKFLOW_TASK_DEADLINE to runtime
+                val workflow = workflows.getOrPut(workflowId) {
+                    Job(UUID(0L, workflowId), "<unnamed>", HashSet())
+                }
+                val workload = SimFlopsWorkload(flops)
+                val task = Task(
+                    UUID(0L, taskId),
+                    "<unnamed>",
+                    HashSet(),
+                    mapOf(
+                        "workload" to workload,
+                        WORKFLOW_TASK_CORES to cores,
+                        WORKFLOW_TASK_DEADLINE to runtime
+                    )
                 )
-            )
 
-            starts.merge(workflowId, submitTime, ::min)
-            (workflow.tasks as MutableSet<Task>).add(task)
-            tasks[taskId] = task
-            taskDependencies[task] = dependencies
+                starts.merge(workflowId, submitTime, ::min)
+                (workflow.tasks as MutableSet<Task>).add(task)
+                tasks[taskId] = task
+                taskDependencies[task] = dependencies
+            }
         }
 
         // Fix dependencies and dependents for all tasks
diff --git a/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt b/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt
index bcfa7553..31ae03e0 100644
--- a/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt
+++ b/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt
@@ -24,6 +24,7 @@ package org.opendc.format.trace.wtf
 
 import org.junit.jupiter.api.Assertions.*
 import org.junit.jupiter.api.Test
+import java.io.File
 
 /**
  * Test suite for the [WtfTraceReader] class.
@@ -34,7 +35,7 @@ class WtfTraceReaderTest {
      */
     @Test
     fun testParseWtf() {
-        val reader = WtfTraceReader("src/test/resources/wtf-trace")
+        val reader = WtfTraceReader(File("src/test/resources/wtf-trace"))
         var entry = reader.next()
         assertEquals(0, entry.start)
         assertEquals(23, entry.workload.tasks.size)
```
