summary | refs | log | tree | commit | diff
path: root/opendc-format/src
diff options
context:
space:
mode:
author: Fabian Mastenbroek <mail.fabianm@gmail.com> 2021-09-01 11:29:27 +0200
committer: Fabian Mastenbroek <mail.fabianm@gmail.com> 2021-09-02 11:30:15 +0200
commit: b2308e1077dc60ec6a4dc646613a4be5b59695a6 (patch)
tree: a39aa4b5a41bc4616dbe5cade8522221fa6c10af /opendc-format/src
parent: 5c6bf9739aa0ffd9651df4fcb4cd46a8545144f0 (diff)
refactor(trace): Implement trace API for WTF reader
This change updates the WTF trace reader to support the new streaming trace API.
Diffstat (limited to 'opendc-format/src')
-rw-r--r-- opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt | 44
-rw-r--r-- opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt | 32
-rw-r--r-- opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt | 126
-rw-r--r-- opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt | 47
-rw-r--r-- opendc-format/src/test/resources/wtf-trace/tasks/schema-1.0/part.0.parquet | bin 87475 -> 0 bytes
5 files changed, 0 insertions, 249 deletions
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt
deleted file mode 100644
index 3ce79d69..00000000
--- a/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2019 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.format.trace
-
-import java.util.UUID
-
-/**
- * An entry in a workload trace.
- *
- * @param uid The unique identifier of the entry.
- * @param name The name of the entry.
- * @param start The start time of the workload.
- * @param workload The workload of the entry.
- * @param meta The meta-data associated with the workload.
- */
-public data class TraceEntry<out T>(
- val uid: UUID,
- val name: String,
- val start: Long,
- val workload: T,
- val meta: Map<String, Any>
-)
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt
deleted file mode 100644
index 797a88d5..00000000
--- a/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.format.trace
-
-/**
- * An interface for reading workloads into memory.
- *
- * This interface must guarantee that the entries are delivered in order of submission time.
- *
- * @param T The shape of the workloads supported by this reader.
- */
-public interface TraceReader<T> : Iterator<TraceEntry<T>>, AutoCloseable
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt
deleted file mode 100644
index e8e72f0e..00000000
--- a/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.format.trace.wtf
-
-import org.apache.avro.generic.GenericRecord
-import org.opendc.format.trace.TraceEntry
-import org.opendc.format.trace.TraceReader
-import org.opendc.simulator.compute.workload.SimFlopsWorkload
-import org.opendc.trace.util.parquet.LocalParquetReader
-import org.opendc.workflow.api.Job
-import org.opendc.workflow.api.Task
-import org.opendc.workflow.api.WORKFLOW_TASK_CORES
-import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE
-import java.io.File
-import java.nio.file.Path
-import java.util.UUID
-import kotlin.math.min
-
-/**
- * A [TraceReader] for the Workflow Trace Format (WTF). See the Workflow Trace Archive
- * (https://wta.atlarge-research.com/) for more information about the format.
- *
- * @param path The path to the trace.
- */
-public class WtfTraceReader(path: Path) : TraceReader<Job> {
- /**
- * The internal iterator to use for this reader.
- */
- private val iterator: Iterator<TraceEntry<Job>>
-
- /**
- * Construct a [TraceReader] from the specified [path].
- *
- * @param path The path to the trace.
- */
- public constructor(path: File) : this(path.toPath())
-
- /**
- * Initialize the reader.
- */
- init {
- val workflows = mutableMapOf<Long, Job>()
- val starts = mutableMapOf<Long, Long>()
- val tasks = mutableMapOf<Long, Task>()
- val taskDependencies = mutableMapOf<Task, List<Long>>()
-
- LocalParquetReader<GenericRecord>(path.resolve("tasks/schema-1.0")).use { reader ->
- while (true) {
- val nextRecord = reader.read() ?: break
-
- val workflowId = nextRecord.get("workflow_id") as Long
- val taskId = nextRecord.get("id") as Long
- val submitTime = nextRecord.get("ts_submit") as Long
- val runtime = nextRecord.get("runtime") as Long
- val cores = (nextRecord.get("resource_amount_requested") as Double).toInt()
-
- @Suppress("UNCHECKED_CAST")
- val dependencies = (nextRecord.get("parents") as ArrayList<GenericRecord>).map {
- it.get("item") as Long
- }
-
- val flops: Long = 4100 * (runtime / 1000) * cores
-
- val workflow = workflows.getOrPut(workflowId) {
- Job(UUID(0L, workflowId), "<unnamed>", HashSet())
- }
- val workload = SimFlopsWorkload(flops)
- val task = Task(
- UUID(0L, taskId),
- "<unnamed>",
- HashSet(),
- mapOf(
- "workload" to workload,
- WORKFLOW_TASK_CORES to cores,
- WORKFLOW_TASK_DEADLINE to runtime
- )
- )
-
- starts.merge(workflowId, submitTime, ::min)
- (workflow.tasks as MutableSet<Task>).add(task)
- tasks[taskId] = task
- taskDependencies[task] = dependencies
- }
- }
-
- // Fix dependencies and dependents for all tasks
- taskDependencies.forEach { (task, dependencies) ->
- (task.dependencies as MutableSet<Task>).addAll(
- dependencies.map { taskId ->
- tasks[taskId] ?: throw IllegalArgumentException("Dependency task with id $taskId not found")
- }
- )
- }
-
- // Create the entry iterator
- iterator = workflows.map { (id, job) -> TraceEntry(job.uid, job.name, starts.getValue(id), job, job.metadata) }
- .sortedBy { it.start }
- .iterator()
- }
-
- override fun hasNext(): Boolean = iterator.hasNext()
-
- override fun next(): TraceEntry<Job> = iterator.next()
-
- override fun close() {}
-}
diff --git a/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt b/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt
deleted file mode 100644
index 31ae03e0..00000000
--- a/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.format.trace.wtf
-
-import org.junit.jupiter.api.Assertions.*
-import org.junit.jupiter.api.Test
-import java.io.File
-
-/**
- * Test suite for the [WtfTraceReader] class.
- */
-class WtfTraceReaderTest {
- /**
- * Smoke test for parsing WTF traces.
- */
- @Test
- fun testParseWtf() {
- val reader = WtfTraceReader(File("src/test/resources/wtf-trace"))
- var entry = reader.next()
- assertEquals(0, entry.start)
- assertEquals(23, entry.workload.tasks.size)
-
- entry = reader.next()
- assertEquals(333387, entry.start)
- assertEquals(23, entry.workload.tasks.size)
- }
-}
diff --git a/opendc-format/src/test/resources/wtf-trace/tasks/schema-1.0/part.0.parquet b/opendc-format/src/test/resources/wtf-trace/tasks/schema-1.0/part.0.parquet
deleted file mode 100644
index d2044038..00000000
--- a/opendc-format/src/test/resources/wtf-trace/tasks/schema-1.0/part.0.parquet
+++ /dev/null
Binary files differ