diff options
Diffstat (limited to 'opendc-format/src/main')
| -rw-r--r-- | opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt | 76 |
1 files changed, 42 insertions, 34 deletions
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt index feadf61f..dde1b340 100644 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt +++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt @@ -23,15 +23,16 @@ package org.opendc.format.trace.wtf import org.apache.avro.generic.GenericRecord -import org.apache.hadoop.fs.Path -import org.apache.parquet.avro.AvroParquetReader import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader +import org.opendc.format.util.LocalParquetReader import org.opendc.simulator.compute.workload.SimFlopsWorkload import org.opendc.workflow.api.Job import org.opendc.workflow.api.Task import org.opendc.workflow.api.WORKFLOW_TASK_CORES import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE +import java.io.File +import java.nio.file.Path import java.util.UUID import kotlin.math.min @@ -41,13 +42,20 @@ import kotlin.math.min * * @param path The path to the trace. */ -public class WtfTraceReader(path: String) : TraceReader<Job> { +public class WtfTraceReader(path: Path) : TraceReader<Job> { /** * The internal iterator to use for this reader. */ private val iterator: Iterator<TraceEntry<Job>> /** + * Construct a [TraceReader] from the specified [path]. + * + * @param path The path to the trace. + */ + public constructor(path: File) : this(path.toPath()) + + /** * Initialize the reader. */ init { @@ -56,43 +64,43 @@ public class WtfTraceReader(path: String) : TraceReader<Job> { val tasks = mutableMapOf<Long, Task>() val taskDependencies = mutableMapOf<Task, List<Long>>() - @Suppress("DEPRECATION") - val reader = AvroParquetReader.builder<GenericRecord>(Path(path, "tasks/schema-1.0")).build() + LocalParquetReader<GenericRecord>(path.resolve("tasks/schema-1.0")).use { reader -> + while (true) { + val nextRecord = reader.read() ?: break - while (true) { - val nextRecord = reader.read() ?: break + val workflowId = nextRecord.get("workflow_id") as Long + val taskId = nextRecord.get("id") as Long + val submitTime = nextRecord.get("ts_submit") as Long + val runtime = nextRecord.get("runtime") as Long + val cores = (nextRecord.get("resource_amount_requested") as Double).toInt() - val workflowId = nextRecord.get("workflow_id") as Long - val taskId = nextRecord.get("id") as Long - val submitTime = nextRecord.get("ts_submit") as Long - val runtime = nextRecord.get("runtime") as Long - val cores = (nextRecord.get("resource_amount_requested") as Double).toInt() - @Suppress("UNCHECKED_CAST") - val dependencies = (nextRecord.get("parents") as ArrayList<GenericRecord>).map { - it.get("item") as Long - } + @Suppress("UNCHECKED_CAST") + val dependencies = (nextRecord.get("parents") as ArrayList<GenericRecord>).map { + it.get("item") as Long + } - val flops: Long = 4100 * (runtime / 1000) * cores + val flops: Long = 4100 * (runtime / 1000) * cores - val workflow = workflows.getOrPut(workflowId) { - Job(UUID(0L, workflowId), "<unnamed>", HashSet()) - } - val workload = SimFlopsWorkload(flops) - val task = Task( - UUID(0L, taskId), - "<unnamed>", - HashSet(), - mapOf( - "workload" to workload, - WORKFLOW_TASK_CORES to cores, - WORKFLOW_TASK_DEADLINE to runtime + val workflow = workflows.getOrPut(workflowId) { + Job(UUID(0L, workflowId), "<unnamed>", HashSet()) + } + val workload = SimFlopsWorkload(flops) + val task = Task( + UUID(0L, taskId), + "<unnamed>", + HashSet(), + mapOf( + "workload" to workload, + WORKFLOW_TASK_CORES to cores, + WORKFLOW_TASK_DEADLINE to runtime + ) ) - ) - starts.merge(workflowId, submitTime, ::min) - (workflow.tasks as MutableSet<Task>).add(task) - tasks[taskId] = task - taskDependencies[task] = dependencies + starts.merge(workflowId, submitTime, ::min) + (workflow.tasks as MutableSet<Task>).add(task) + tasks[taskId] = task + taskDependencies[task] = dependencies + } } // Fix dependencies and dependents for all tasks |
