diff options
Diffstat (limited to 'opendc-format')
3 files changed, 53 insertions, 37 deletions
diff --git a/opendc-format/build.gradle.kts b/opendc-format/build.gradle.kts index c0ffeb3e..d3c1a59a 100644 --- a/opendc-format/build.gradle.kts +++ b/opendc-format/build.gradle.kts @@ -39,8 +39,15 @@ dependencies { exclude(group = "org.jetbrains.kotlin", module = "kotlin-reflect") } implementation(kotlin("reflect")) - implementation(libs.parquet) - implementation(libs.hadoop.client) { + + implementation(libs.parquet) { + exclude(group = "org.apache.hadoop") + } + implementation(libs.hadoop.common) { + exclude(group = "org.slf4j", module = "slf4j-log4j12") + exclude(group = "log4j") + } + implementation(libs.hadoop.mapreduce.client.core) { exclude(group = "org.slf4j", module = "slf4j-log4j12") exclude(group = "log4j") } diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt index feadf61f..dde1b340 100644 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt +++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt @@ -23,15 +23,16 @@ package org.opendc.format.trace.wtf import org.apache.avro.generic.GenericRecord -import org.apache.hadoop.fs.Path -import org.apache.parquet.avro.AvroParquetReader import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader +import org.opendc.format.util.LocalParquetReader import org.opendc.simulator.compute.workload.SimFlopsWorkload import org.opendc.workflow.api.Job import org.opendc.workflow.api.Task import org.opendc.workflow.api.WORKFLOW_TASK_CORES import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE +import java.io.File +import java.nio.file.Path import java.util.UUID import kotlin.math.min @@ -41,13 +42,20 @@ import kotlin.math.min * * @param path The path to the trace. */ -public class WtfTraceReader(path: String) : TraceReader<Job> { +public class WtfTraceReader(path: Path) : TraceReader<Job> { /** * The internal iterator to use for this reader. */ private val iterator: Iterator<TraceEntry<Job>> /** + * Construct a [TraceReader] from the specified [path]. + * + * @param path The path to the trace. + */ + public constructor(path: File) : this(path.toPath()) + + /** * Initialize the reader. */ init { @@ -56,43 +64,43 @@ public class WtfTraceReader(path: String) : TraceReader<Job> { val tasks = mutableMapOf<Long, Task>() val taskDependencies = mutableMapOf<Task, List<Long>>() - @Suppress("DEPRECATION") - val reader = AvroParquetReader.builder<GenericRecord>(Path(path, "tasks/schema-1.0")).build() + LocalParquetReader<GenericRecord>(path.resolve("tasks/schema-1.0")).use { reader -> + while (true) { + val nextRecord = reader.read() ?: break - while (true) { - val nextRecord = reader.read() ?: break + val workflowId = nextRecord.get("workflow_id") as Long + val taskId = nextRecord.get("id") as Long + val submitTime = nextRecord.get("ts_submit") as Long + val runtime = nextRecord.get("runtime") as Long + val cores = (nextRecord.get("resource_amount_requested") as Double).toInt() - val workflowId = nextRecord.get("workflow_id") as Long - val taskId = nextRecord.get("id") as Long - val submitTime = nextRecord.get("ts_submit") as Long - val runtime = nextRecord.get("runtime") as Long - val cores = (nextRecord.get("resource_amount_requested") as Double).toInt() - @Suppress("UNCHECKED_CAST") - val dependencies = (nextRecord.get("parents") as ArrayList<GenericRecord>).map { - it.get("item") as Long - } + @Suppress("UNCHECKED_CAST") + val dependencies = (nextRecord.get("parents") as ArrayList<GenericRecord>).map { + it.get("item") as Long + } - val flops: Long = 4100 * (runtime / 1000) * cores + val flops: Long = 4100 * (runtime / 1000) * cores - val workflow = workflows.getOrPut(workflowId) { - Job(UUID(0L, workflowId), "<unnamed>", HashSet()) - } - val workload = SimFlopsWorkload(flops) - val task = Task( - UUID(0L, taskId), - "<unnamed>", - HashSet(), - mapOf( - "workload" to workload, - WORKFLOW_TASK_CORES to cores, - WORKFLOW_TASK_DEADLINE to runtime + val workflow = workflows.getOrPut(workflowId) { + Job(UUID(0L, workflowId), "<unnamed>", HashSet()) + } + val workload = SimFlopsWorkload(flops) + val task = Task( + UUID(0L, taskId), + "<unnamed>", + HashSet(), + mapOf( + "workload" to workload, + WORKFLOW_TASK_CORES to cores, + WORKFLOW_TASK_DEADLINE to runtime + ) ) - ) - starts.merge(workflowId, submitTime, ::min) - (workflow.tasks as MutableSet<Task>).add(task) - tasks[taskId] = task - taskDependencies[task] = dependencies + starts.merge(workflowId, submitTime, ::min) + (workflow.tasks as MutableSet<Task>).add(task) + tasks[taskId] = task + taskDependencies[task] = dependencies + } } // Fix dependencies and dependents for all tasks diff --git a/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt b/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt index bcfa7553..31ae03e0 100644 --- a/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt +++ b/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt @@ -24,6 +24,7 @@ package org.opendc.format.trace.wtf import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test +import java.io.File /** * Test suite for the [WtfTraceReader] class. @@ -34,7 +35,7 @@ class WtfTraceReaderTest { */ @Test fun testParseWtf() { - val reader = WtfTraceReader("src/test/resources/wtf-trace") + val reader = WtfTraceReader(File("src/test/resources/wtf-trace")) var entry = reader.next() assertEquals(0, entry.start) assertEquals(23, entry.workload.tasks.size) |
