Diffstat (limited to 'opendc-format/src')
-rw-r--r--  opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt     | 76
-rw-r--r--  opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt |  3
2 files changed, 44 insertions(+), 35 deletions(-)
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt
index feadf61f..dde1b340 100644
--- a/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt
+++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt
@@ -23,15 +23,16 @@
 package org.opendc.format.trace.wtf
 
 import org.apache.avro.generic.GenericRecord
-import org.apache.hadoop.fs.Path
-import org.apache.parquet.avro.AvroParquetReader
 import org.opendc.format.trace.TraceEntry
 import org.opendc.format.trace.TraceReader
+import org.opendc.format.util.LocalParquetReader
 import org.opendc.simulator.compute.workload.SimFlopsWorkload
 import org.opendc.workflow.api.Job
 import org.opendc.workflow.api.Task
 import org.opendc.workflow.api.WORKFLOW_TASK_CORES
 import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE
+import java.io.File
+import java.nio.file.Path
 import java.util.UUID
 import kotlin.math.min
@@ -41,13 +42,20 @@ import kotlin.math.min
  *
  * @param path The path to the trace.
  */
-public class WtfTraceReader(path: String) : TraceReader<Job> {
+public class WtfTraceReader(path: Path) : TraceReader<Job> {
     /**
      * The internal iterator to use for this reader.
      */
     private val iterator: Iterator<TraceEntry<Job>>
 
     /**
+     * Construct a [TraceReader] from the specified [path].
+     *
+     * @param path The path to the trace.
+     */
+    public constructor(path: File) : this(path.toPath())
+
+    /**
      * Initialize the reader.
      */
     init {
@@ -56,43 +64,43 @@ public class WtfTraceReader(path: String) : TraceReader<Job> {
         val tasks = mutableMapOf<Long, Task>()
         val taskDependencies = mutableMapOf<Task, List<Long>>()
 
-        @Suppress("DEPRECATION")
-        val reader = AvroParquetReader.builder<GenericRecord>(Path(path, "tasks/schema-1.0")).build()
+        LocalParquetReader<GenericRecord>(path.resolve("tasks/schema-1.0")).use { reader ->
+            while (true) {
+                val nextRecord = reader.read() ?: break
 
-        while (true) {
-            val nextRecord = reader.read() ?: break
+                val workflowId = nextRecord.get("workflow_id") as Long
+                val taskId = nextRecord.get("id") as Long
+                val submitTime = nextRecord.get("ts_submit") as Long
+                val runtime = nextRecord.get("runtime") as Long
+                val cores = (nextRecord.get("resource_amount_requested") as Double).toInt()
 
-            val workflowId = nextRecord.get("workflow_id") as Long
-            val taskId = nextRecord.get("id") as Long
-            val submitTime = nextRecord.get("ts_submit") as Long
-            val runtime = nextRecord.get("runtime") as Long
-            val cores = (nextRecord.get("resource_amount_requested") as Double).toInt()
-
-            @Suppress("UNCHECKED_CAST")
-            val dependencies = (nextRecord.get("parents") as ArrayList<GenericRecord>).map {
-                it.get("item") as Long
-            }
+                @Suppress("UNCHECKED_CAST")
+                val dependencies = (nextRecord.get("parents") as ArrayList<GenericRecord>).map {
+                    it.get("item") as Long
+                }
 
-            val flops: Long = 4100 * (runtime / 1000) * cores
+                val flops: Long = 4100 * (runtime / 1000) * cores
 
-            val workflow = workflows.getOrPut(workflowId) {
-                Job(UUID(0L, workflowId), "<unnamed>", HashSet())
-            }
-            val workload = SimFlopsWorkload(flops)
-            val task = Task(
-                UUID(0L, taskId),
-                "<unnamed>",
-                HashSet(),
-                mapOf(
-                    "workload" to workload,
-                    WORKFLOW_TASK_CORES to cores,
-                    WORKFLOW_TASK_DEADLINE to runtime
+                val workflow = workflows.getOrPut(workflowId) {
+                    Job(UUID(0L, workflowId), "<unnamed>", HashSet())
+                }
+                val workload = SimFlopsWorkload(flops)
+                val task = Task(
+                    UUID(0L, taskId),
+                    "<unnamed>",
+                    HashSet(),
+                    mapOf(
+                        "workload" to workload,
+                        WORKFLOW_TASK_CORES to cores,
+                        WORKFLOW_TASK_DEADLINE to runtime
+                    )
                 )
-            )
 
-            starts.merge(workflowId, submitTime, ::min)
-            (workflow.tasks as MutableSet<Task>).add(task)
-            tasks[taskId] = task
-            taskDependencies[task] = dependencies
+                starts.merge(workflowId, submitTime, ::min)
+                (workflow.tasks as MutableSet<Task>).add(task)
+                tasks[taskId] = task
+                taskDependencies[task] = dependencies
+            }
         }
 
         // Fix dependencies and dependents for all tasks
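
Note: the change above replaces the deprecated Hadoop-backed AvroParquetReader with OpenDC's LocalParquetReader and moves the read loop into Kotlin's use { } block, so the Parquet file is closed even if parsing throws (the old builder-based reader was not explicitly closed in this code). The sketch below is a minimal, self-contained illustration of that resource-safe read pattern; RecordReader is a hypothetical stand-in for LocalParquetReader, whose real API is not part of this diff.

    import java.nio.file.Path
    import java.nio.file.Paths

    // Hypothetical stand-in for LocalParquetReader: an AutoCloseable reader
    // whose read() returns null once the input is exhausted.
    class RecordReader(private val path: Path) : AutoCloseable {
        private val records = iterator { yield("task-1"); yield("task-2") }

        fun read(): String? = if (records.hasNext()) records.next() else null

        // Release the underlying file handle; use { } calls this automatically.
        override fun close() = println("closed $path")
    }

    fun main() {
        // use { } guarantees close() runs even if the loop body throws.
        RecordReader(Paths.get("tasks/schema-1.0")).use { reader ->
            while (true) {
                val record = reader.read() ?: break
                println(record)
            }
        }
    }

The per-task workload estimate itself is unchanged: flops = 4100 * (runtime / 1000) * cores, where the division by 1000 suggests the trace records runtime in milliseconds.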
diff --git a/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt b/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt
index bcfa7553..31ae03e0 100644
--- a/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt
+++ b/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt
@@ -24,6 +24,7 @@ package org.opendc.format.trace.wtf
 import org.junit.jupiter.api.Assertions.*
 import org.junit.jupiter.api.Test
+import java.io.File
 
 /**
  * Test suite for the [WtfTraceReader] class.
@@ -34,7 +35,7 @@ class WtfTraceReaderTest {
      */
     @Test
     fun testParseWtf() {
-        val reader = WtfTraceReader("src/test/resources/wtf-trace")
+        val reader = WtfTraceReader(File("src/test/resources/wtf-trace"))
         var entry = reader.next()
         assertEquals(0, entry.start)
         assertEquals(23, entry.workload.tasks.size)
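
For completeness, a hedged usage sketch of the new entry points, assuming only the two constructors visible in this diff: the primary constructor takes a java.nio.file.Path, and the secondary File overload delegates via File.toPath(), which is what keeps call sites like the test above compiling.

    import org.opendc.format.trace.wtf.WtfTraceReader
    import java.io.File
    import java.nio.file.Paths

    fun main() {
        // Equivalent: the File overload just calls this(path.toPath()).
        val fromFile = WtfTraceReader(File("src/test/resources/wtf-trace"))
        val fromPath = WtfTraceReader(Paths.get("src/test/resources/wtf-trace"))

        // TraceReader iterates over TraceEntry<Job>, as exercised in the test above.
        val entry = fromFile.next()
        println("workflow starts at ${entry.start} with ${entry.workload.tasks.size} tasks")
    }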