From 1b52a443e508bc4130071e67a1a8e17a6714c6b8 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 8 Jun 2021 23:46:07 +0200 Subject: format: Use LocalInputFile for Parquet reader This change updates the format implementations that use Parquet by switching to our InputFile implementation for local files, which eliminates the need for Hadoop's filesystem support. --- opendc-format/build.gradle.kts | 11 +++- .../org/opendc/format/trace/wtf/WtfTraceReader.kt | 76 ++++++++++++---------- .../opendc/format/trace/wtf/WtfTraceReaderTest.kt | 3 +- 3 files changed, 53 insertions(+), 37 deletions(-) (limited to 'opendc-format') diff --git a/opendc-format/build.gradle.kts b/opendc-format/build.gradle.kts index c0ffeb3e..d3c1a59a 100644 --- a/opendc-format/build.gradle.kts +++ b/opendc-format/build.gradle.kts @@ -39,8 +39,15 @@ dependencies { exclude(group = "org.jetbrains.kotlin", module = "kotlin-reflect") } implementation(kotlin("reflect")) - implementation(libs.parquet) - implementation(libs.hadoop.client) { + + implementation(libs.parquet) { + exclude(group = "org.apache.hadoop") + } + implementation(libs.hadoop.common) { + exclude(group = "org.slf4j", module = "slf4j-log4j12") + exclude(group = "log4j") + } + implementation(libs.hadoop.mapreduce.client.core) { exclude(group = "org.slf4j", module = "slf4j-log4j12") exclude(group = "log4j") } diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt index feadf61f..dde1b340 100644 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt +++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt @@ -23,15 +23,16 @@ package org.opendc.format.trace.wtf import org.apache.avro.generic.GenericRecord -import org.apache.hadoop.fs.Path -import org.apache.parquet.avro.AvroParquetReader import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader +import org.opendc.format.util.LocalParquetReader import org.opendc.simulator.compute.workload.SimFlopsWorkload import org.opendc.workflow.api.Job import org.opendc.workflow.api.Task import org.opendc.workflow.api.WORKFLOW_TASK_CORES import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE +import java.io.File +import java.nio.file.Path import java.util.UUID import kotlin.math.min @@ -41,12 +42,19 @@ import kotlin.math.min * * @param path The path to the trace. */ -public class WtfTraceReader(path: String) : TraceReader { +public class WtfTraceReader(path: Path) : TraceReader { /** * The internal iterator to use for this reader. */ private val iterator: Iterator> + /** + * Construct a [TraceReader] from the specified [path]. + * + * @param path The path to the trace. + */ + public constructor(path: File) : this(path.toPath()) + /** * Initialize the reader. */ @@ -56,43 +64,43 @@ public class WtfTraceReader(path: String) : TraceReader { val tasks = mutableMapOf() val taskDependencies = mutableMapOf>() - @Suppress("DEPRECATION") - val reader = AvroParquetReader.builder(Path(path, "tasks/schema-1.0")).build() + LocalParquetReader(path.resolve("tasks/schema-1.0")).use { reader -> + while (true) { + val nextRecord = reader.read() ?: break - while (true) { - val nextRecord = reader.read() ?: break + val workflowId = nextRecord.get("workflow_id") as Long + val taskId = nextRecord.get("id") as Long + val submitTime = nextRecord.get("ts_submit") as Long + val runtime = nextRecord.get("runtime") as Long + val cores = (nextRecord.get("resource_amount_requested") as Double).toInt() - val workflowId = nextRecord.get("workflow_id") as Long - val taskId = nextRecord.get("id") as Long - val submitTime = nextRecord.get("ts_submit") as Long - val runtime = nextRecord.get("runtime") as Long - val cores = (nextRecord.get("resource_amount_requested") as Double).toInt() - @Suppress("UNCHECKED_CAST") - val dependencies = (nextRecord.get("parents") as ArrayList).map { - it.get("item") as Long - } + @Suppress("UNCHECKED_CAST") + val dependencies = (nextRecord.get("parents") as ArrayList).map { + it.get("item") as Long + } - val flops: Long = 4100 * (runtime / 1000) * cores + val flops: Long = 4100 * (runtime / 1000) * cores - val workflow = workflows.getOrPut(workflowId) { - Job(UUID(0L, workflowId), "", HashSet()) - } - val workload = SimFlopsWorkload(flops) - val task = Task( - UUID(0L, taskId), - "", - HashSet(), - mapOf( - "workload" to workload, - WORKFLOW_TASK_CORES to cores, - WORKFLOW_TASK_DEADLINE to runtime + val workflow = workflows.getOrPut(workflowId) { + Job(UUID(0L, workflowId), "", HashSet()) + } + val workload = SimFlopsWorkload(flops) + val task = Task( + UUID(0L, taskId), + "", + HashSet(), + mapOf( + "workload" to workload, + WORKFLOW_TASK_CORES to cores, + WORKFLOW_TASK_DEADLINE to runtime + ) ) - ) - starts.merge(workflowId, submitTime, ::min) - (workflow.tasks as MutableSet).add(task) - tasks[taskId] = task - taskDependencies[task] = dependencies + starts.merge(workflowId, submitTime, ::min) + (workflow.tasks as MutableSet).add(task) + tasks[taskId] = task + taskDependencies[task] = dependencies + } } // Fix dependencies and dependents for all tasks diff --git a/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt b/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt index bcfa7553..31ae03e0 100644 --- a/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt +++ b/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt @@ -24,6 +24,7 @@ package org.opendc.format.trace.wtf import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test +import java.io.File /** * Test suite for the [WtfTraceReader] class. @@ -34,7 +35,7 @@ class WtfTraceReaderTest { */ @Test fun testParseWtf() { - val reader = WtfTraceReader("src/test/resources/wtf-trace") + val reader = WtfTraceReader(File("src/test/resources/wtf-trace")) var entry = reader.next() assertEquals(0, entry.start) assertEquals(23, entry.workload.tasks.size) -- cgit v1.2.3