| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-06-08 23:46:07 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-06-09 12:39:05 +0200 |
| commit | 1b52a443e508bc4130071e67a1a8e17a6714c6b8 (patch) | |
| tree | 573071cbee5abe0a232cf1a61bfb60a23f5c72cb /opendc-format/src/main | |
| parent | 9097811e0ac6872de3e4ff5f521d8859870b1000 (diff) | |
format: Use LocalInputFile for Parquet reader
This change updates the format implementations that use Parquet by
switching to our InputFile implementation for local files, which
eliminates the need for Hadoop's filesystem support.
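For context, Parquet can read local files without Hadoop's FileSystem layer as long as it is handed an implementation of `org.apache.parquet.io.InputFile`. The `LocalInputFile` named in the subject is not part of the diff shown on this page; the following is a minimal sketch of what such a class could look like, assuming Parquet's `DelegatingSeekableInputStream` helper and `java.nio` file channels (names and placement are illustrative, not the actual OpenDC source):

```kotlin
import org.apache.parquet.io.DelegatingSeekableInputStream
import org.apache.parquet.io.InputFile
import org.apache.parquet.io.SeekableInputStream
import java.nio.channels.Channels
import java.nio.channels.FileChannel
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardOpenOption

/**
 * Sketch of an [InputFile] backed by a local file, letting Parquet read it
 * without Hadoop's FileSystem abstraction. The real class in opendc-format
 * may differ.
 */
class LocalInputFile(private val path: Path) : InputFile {
    override fun getLength(): Long = Files.size(path)

    override fun newStream(): SeekableInputStream {
        // Each stream gets its own channel so multiple readers stay independent.
        val channel = FileChannel.open(path, StandardOpenOption.READ)
        return object : DelegatingSeekableInputStream(Channels.newInputStream(channel)) {
            // Reads through the wrapped stream advance the channel,
            // so the channel position is the stream position.
            override fun getPos(): Long = channel.position()

            override fun seek(newPos: Long) {
                channel.position(newPos)
            }
        }
    }
}
```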
Diffstat (limited to 'opendc-format/src/main')
| -rw-r--r-- | opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt | 76 |
|---|---|---|

1 file changed, 42 insertions(+), 34 deletions(-)
```diff
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt
index feadf61f..dde1b340 100644
--- a/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt
+++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt
@@ -23,15 +23,16 @@ package org.opendc.format.trace.wtf
 import org.apache.avro.generic.GenericRecord
-import org.apache.hadoop.fs.Path
-import org.apache.parquet.avro.AvroParquetReader
 import org.opendc.format.trace.TraceEntry
 import org.opendc.format.trace.TraceReader
+import org.opendc.format.util.LocalParquetReader
 import org.opendc.simulator.compute.workload.SimFlopsWorkload
 import org.opendc.workflow.api.Job
 import org.opendc.workflow.api.Task
 import org.opendc.workflow.api.WORKFLOW_TASK_CORES
 import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE
+import java.io.File
+import java.nio.file.Path
 import java.util.UUID
 import kotlin.math.min
@@ -41,13 +42,20 @@ import kotlin.math.min
  *
  * @param path The path to the trace.
  */
-public class WtfTraceReader(path: String) : TraceReader<Job> {
+public class WtfTraceReader(path: Path) : TraceReader<Job> {
     /**
      * The internal iterator to use for this reader.
      */
     private val iterator: Iterator<TraceEntry<Job>>
 
     /**
+     * Construct a [TraceReader] from the specified [path].
+     *
+     * @param path The path to the trace.
+     */
+    public constructor(path: File) : this(path.toPath())
+
+    /**
      * Initialize the reader.
      */
     init {
@@ -56,43 +64,43 @@ public class WtfTraceReader(path: String) : TraceReader<Job> {
         val tasks = mutableMapOf<Long, Task>()
         val taskDependencies = mutableMapOf<Task, List<Long>>()
 
-        @Suppress("DEPRECATION")
-        val reader = AvroParquetReader.builder<GenericRecord>(Path(path, "tasks/schema-1.0")).build()
+        LocalParquetReader<GenericRecord>(path.resolve("tasks/schema-1.0")).use { reader ->
+            while (true) {
+                val nextRecord = reader.read() ?: break
 
-        while (true) {
-            val nextRecord = reader.read() ?: break
+                val workflowId = nextRecord.get("workflow_id") as Long
+                val taskId = nextRecord.get("id") as Long
+                val submitTime = nextRecord.get("ts_submit") as Long
+                val runtime = nextRecord.get("runtime") as Long
+                val cores = (nextRecord.get("resource_amount_requested") as Double).toInt()
 
-            val workflowId = nextRecord.get("workflow_id") as Long
-            val taskId = nextRecord.get("id") as Long
-            val submitTime = nextRecord.get("ts_submit") as Long
-            val runtime = nextRecord.get("runtime") as Long
-            val cores = (nextRecord.get("resource_amount_requested") as Double).toInt()
-            @Suppress("UNCHECKED_CAST")
-            val dependencies = (nextRecord.get("parents") as ArrayList<GenericRecord>).map {
-                it.get("item") as Long
-            }
+                @Suppress("UNCHECKED_CAST")
+                val dependencies = (nextRecord.get("parents") as ArrayList<GenericRecord>).map {
+                    it.get("item") as Long
+                }
 
-            val flops: Long = 4100 * (runtime / 1000) * cores
+                val flops: Long = 4100 * (runtime / 1000) * cores
 
-            val workflow = workflows.getOrPut(workflowId) {
-                Job(UUID(0L, workflowId), "<unnamed>", HashSet())
-            }
-            val workload = SimFlopsWorkload(flops)
-            val task = Task(
-                UUID(0L, taskId),
-                "<unnamed>",
-                HashSet(),
-                mapOf(
-                    "workload" to workload,
-                    WORKFLOW_TASK_CORES to cores,
-                    WORKFLOW_TASK_DEADLINE to runtime
+                val workflow = workflows.getOrPut(workflowId) {
+                    Job(UUID(0L, workflowId), "<unnamed>", HashSet())
+                }
+                val workload = SimFlopsWorkload(flops)
+                val task = Task(
+                    UUID(0L, taskId),
+                    "<unnamed>",
+                    HashSet(),
+                    mapOf(
+                        "workload" to workload,
+                        WORKFLOW_TASK_CORES to cores,
+                        WORKFLOW_TASK_DEADLINE to runtime
+                    )
                 )
-            )
-            starts.merge(workflowId, submitTime, ::min)
-            (workflow.tasks as MutableSet<Task>).add(task)
-            tasks[taskId] = task
-            taskDependencies[task] = dependencies
+                starts.merge(workflowId, submitTime, ::min)
+                (workflow.tasks as MutableSet<Task>).add(task)
+                tasks[taskId] = task
+                taskDependencies[task] = dependencies
+            }
         }
 
         // Fix dependencies and dependents for all tasks
```
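The diff relies on `LocalParquetReader` from `org.opendc.format.util`, which is outside this page's diffstat. A minimal sketch of such a wrapper, assuming it feeds the `LocalInputFile` sketched above into Parquet's Avro reader builder (`AvroParquetReader.builder(InputFile)` requires Parquet 1.11+; the real class would also need to handle paths like `tasks/schema-1.0` that point to a directory of Parquet files rather than a single file):

```kotlin
import org.apache.avro.generic.GenericRecord
import org.apache.parquet.avro.AvroParquetReader
import org.apache.parquet.hadoop.ParquetReader
import java.nio.file.Path

/**
 * Sketch of a LocalParquetReader: an AutoCloseable wrapper around
 * AvroParquetReader backed by a LocalInputFile instead of a Hadoop Path.
 * Covers only the single-file case.
 */
class LocalParquetReader<T>(path: Path) : AutoCloseable {
    private val delegate: ParquetReader<T> =
        AvroParquetReader.builder<T>(LocalInputFile(path)).build()

    /** Returns the next record, or null once the file is exhausted. */
    fun read(): T? = delegate.read()

    override fun close() = delegate.close()
}
```

Making the wrapper `AutoCloseable` is what lets the `.use { reader -> ... }` block in the diff close the underlying file even if parsing throws part-way through.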
