summary refs log tree commit diff
path: root/opendc-format
diff options
context:
space:
mode:
author Fabian Mastenbroek <mail.fabianm@gmail.com> 2021-06-08 23:46:07 +0200
committer Fabian Mastenbroek <mail.fabianm@gmail.com> 2021-06-09 12:39:05 +0200
commit 1b52a443e508bc4130071e67a1a8e17a6714c6b8 (patch)
tree 573071cbee5abe0a232cf1a61bfb60a23f5c72cb /opendc-format
parent 9097811e0ac6872de3e4ff5f521d8859870b1000 (diff)
format: Use LocalInputFile for Parquet reader
This change updates the format implementations that use Parquet by switching to our InputFile implementation for local files, which eliminates the need for Hadoop's filesystem support.
Diffstat (limited to 'opendc-format')
-rw-r--r--opendc-format/build.gradle.kts11
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt76
-rw-r--r--opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt3
3 files changed, 53 insertions, 37 deletions
diff --git a/opendc-format/build.gradle.kts b/opendc-format/build.gradle.kts
index c0ffeb3e..d3c1a59a 100644
--- a/opendc-format/build.gradle.kts
+++ b/opendc-format/build.gradle.kts
@@ -39,8 +39,15 @@ dependencies {
exclude(group = "org.jetbrains.kotlin", module = "kotlin-reflect")
}
implementation(kotlin("reflect"))
- implementation(libs.parquet)
- implementation(libs.hadoop.client) {
+
+ implementation(libs.parquet) {
+ exclude(group = "org.apache.hadoop")
+ }
+ implementation(libs.hadoop.common) {
+ exclude(group = "org.slf4j", module = "slf4j-log4j12")
+ exclude(group = "log4j")
+ }
+ implementation(libs.hadoop.mapreduce.client.core) {
exclude(group = "org.slf4j", module = "slf4j-log4j12")
exclude(group = "log4j")
}
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt
index feadf61f..dde1b340 100644
--- a/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt
+++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt
@@ -23,15 +23,16 @@
package org.opendc.format.trace.wtf
import org.apache.avro.generic.GenericRecord
-import org.apache.hadoop.fs.Path
-import org.apache.parquet.avro.AvroParquetReader
import org.opendc.format.trace.TraceEntry
import org.opendc.format.trace.TraceReader
+import org.opendc.format.util.LocalParquetReader
import org.opendc.simulator.compute.workload.SimFlopsWorkload
import org.opendc.workflow.api.Job
import org.opendc.workflow.api.Task
import org.opendc.workflow.api.WORKFLOW_TASK_CORES
import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE
+import java.io.File
+import java.nio.file.Path
import java.util.UUID
import kotlin.math.min
@@ -41,13 +42,20 @@ import kotlin.math.min
*
* @param path The path to the trace.
*/
-public class WtfTraceReader(path: String) : TraceReader<Job> {
+public class WtfTraceReader(path: Path) : TraceReader<Job> {
/**
* The internal iterator to use for this reader.
*/
private val iterator: Iterator<TraceEntry<Job>>
/**
+ * Construct a [TraceReader] from the specified [path].
+ *
+ * @param path The path to the trace.
+ */
+ public constructor(path: File) : this(path.toPath())
+
+ /**
* Initialize the reader.
*/
init {
@@ -56,43 +64,43 @@ public class WtfTraceReader(path: String) : TraceReader<Job> {
val tasks = mutableMapOf<Long, Task>()
val taskDependencies = mutableMapOf<Task, List<Long>>()
- @Suppress("DEPRECATION")
- val reader = AvroParquetReader.builder<GenericRecord>(Path(path, "tasks/schema-1.0")).build()
+ LocalParquetReader<GenericRecord>(path.resolve("tasks/schema-1.0")).use { reader ->
+ while (true) {
+ val nextRecord = reader.read() ?: break
- while (true) {
- val nextRecord = reader.read() ?: break
+ val workflowId = nextRecord.get("workflow_id") as Long
+ val taskId = nextRecord.get("id") as Long
+ val submitTime = nextRecord.get("ts_submit") as Long
+ val runtime = nextRecord.get("runtime") as Long
+ val cores = (nextRecord.get("resource_amount_requested") as Double).toInt()
- val workflowId = nextRecord.get("workflow_id") as Long
- val taskId = nextRecord.get("id") as Long
- val submitTime = nextRecord.get("ts_submit") as Long
- val runtime = nextRecord.get("runtime") as Long
- val cores = (nextRecord.get("resource_amount_requested") as Double).toInt()
- @Suppress("UNCHECKED_CAST")
- val dependencies = (nextRecord.get("parents") as ArrayList<GenericRecord>).map {
- it.get("item") as Long
- }
+ @Suppress("UNCHECKED_CAST")
+ val dependencies = (nextRecord.get("parents") as ArrayList<GenericRecord>).map {
+ it.get("item") as Long
+ }
- val flops: Long = 4100 * (runtime / 1000) * cores
+ val flops: Long = 4100 * (runtime / 1000) * cores
- val workflow = workflows.getOrPut(workflowId) {
- Job(UUID(0L, workflowId), "<unnamed>", HashSet())
- }
- val workload = SimFlopsWorkload(flops)
- val task = Task(
- UUID(0L, taskId),
- "<unnamed>",
- HashSet(),
- mapOf(
- "workload" to workload,
- WORKFLOW_TASK_CORES to cores,
- WORKFLOW_TASK_DEADLINE to runtime
+ val workflow = workflows.getOrPut(workflowId) {
+ Job(UUID(0L, workflowId), "<unnamed>", HashSet())
+ }
+ val workload = SimFlopsWorkload(flops)
+ val task = Task(
+ UUID(0L, taskId),
+ "<unnamed>",
+ HashSet(),
+ mapOf(
+ "workload" to workload,
+ WORKFLOW_TASK_CORES to cores,
+ WORKFLOW_TASK_DEADLINE to runtime
+ )
)
- )
- starts.merge(workflowId, submitTime, ::min)
- (workflow.tasks as MutableSet<Task>).add(task)
- tasks[taskId] = task
- taskDependencies[task] = dependencies
+ starts.merge(workflowId, submitTime, ::min)
+ (workflow.tasks as MutableSet<Task>).add(task)
+ tasks[taskId] = task
+ taskDependencies[task] = dependencies
+ }
}
// Fix dependencies and dependents for all tasks
diff --git a/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt b/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt
index bcfa7553..31ae03e0 100644
--- a/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt
+++ b/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt
@@ -24,6 +24,7 @@ package org.opendc.format.trace.wtf
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test
+import java.io.File
/**
* Test suite for the [WtfTraceReader] class.
@@ -34,7 +35,7 @@ class WtfTraceReaderTest {
*/
@Test
fun testParseWtf() {
- val reader = WtfTraceReader("src/test/resources/wtf-trace")
+ val reader = WtfTraceReader(File("src/test/resources/wtf-trace"))
var entry = reader.next()
assertEquals(0, entry.start)
assertEquals(23, entry.workload.tasks.size)