diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-10-01 17:30:54 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-10-01 17:30:54 +0200 |
| commit | dff259fa1a721df3bc2601014d5749f6e620854c (patch) | |
| tree | bb246d18f86e551dd01f92658c6ef2b14a608fc6 /simulator/opendc-format | |
| parent | 5d71ef4bd6ca84cf5f445f7ba4bb5f7a1e181b64 (diff) | |
| parent | ebd375d524714708d7b38c0117a77d50d23043a3 (diff) | |
Merge pull request #43 from atlarge-research/feat/wta-parser
Add parser for WTA traces
Diffstat (limited to 'simulator/opendc-format')
4 files changed, 181 insertions, 0 deletions
diff --git a/simulator/opendc-format/build.gradle.kts b/simulator/opendc-format/build.gradle.kts
index e44d68b9..38fcb329 100644
--- a/simulator/opendc-format/build.gradle.kts
+++ b/simulator/opendc-format/build.gradle.kts
@@ -36,6 +36,12 @@ dependencies {
     }
     implementation(kotlin("reflect"))
 
+    implementation("org.apache.parquet:parquet-avro:1.11.0")
+    implementation("org.apache.hadoop:hadoop-client:3.2.1") {
+        exclude(group = "org.slf4j", module = "slf4j-log4j12")
+        exclude(group = "log4j")
+    }
+
     testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}")
     testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:${Library.JUNIT_JUPITER}")
     testImplementation("org.junit.platform:junit-platform-launcher:${Library.JUNIT_PLATFORM}")
diff --git a/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt
new file mode 100644
index 00000000..12a60aec
--- /dev/null
+++ b/simulator/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt
@@ -0,0 +1,129 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.format.trace.wtf
+
+import org.apache.avro.generic.GenericRecord
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.avro.AvroParquetReader
+import org.opendc.compute.core.image.FlopsApplicationImage
+import org.opendc.core.User
+import org.opendc.format.trace.TraceEntry
+import org.opendc.format.trace.TraceReader
+import org.opendc.workflows.workload.Job
+import org.opendc.workflows.workload.Task
+import org.opendc.workflows.workload.WORKFLOW_TASK_DEADLINE
+import java.util.UUID
+import kotlin.math.min
+
+/**
+ * A [TraceReader] for the Workflow Trace Format (WTF). See the Workflow Trace Archive
+ * (https://wta.atlarge-research.com/) for more information about the format.
+ *
+ * @param path The path to the trace directory; task records are read from `tasks/schema-1.0` below it.
+ */
+class WtfTraceReader(path: String) : TraceReader<Job> {
+    /**
+     * Iterator over the fully parsed trace entries, ordered by ascending submission time.
+     */
+    private val iterator: Iterator<TraceEntry<Job>>
+
+    /**
+     * Eagerly parse every task record, group the tasks into jobs by workflow id and link their dependencies.
+     */
+    init {
+        val entries = mutableMapOf<Long, TraceEntryImpl>()
+        val tasks = mutableMapOf<Long, Task>()
+        val taskDependencies = mutableMapOf<Task, List<Long>>()
+
+        // The trace is read eagerly, so the Parquet reader can be closed as soon as the last
+        // record has been consumed (or parsing fails); close() has nothing left to release.
+        AvroParquetReader.builder<GenericRecord>(Path(path, "tasks/schema-1.0")).build().use { reader ->
+            while (true) {
+                val nextRecord = reader.read() ?: break
+
+                val workflowId = nextRecord.get("workflow_id") as Long
+                val taskId = nextRecord.get("id") as Long
+                val submitTime = nextRecord.get("ts_submit") as Long
+                val runtime = nextRecord.get("runtime") as Long
+                val cores = (nextRecord.get("resource_amount_requested") as Double).toInt()
+                @Suppress("UNCHECKED_CAST")
+                val dependencies = (nextRecord.get("parents") as ArrayList<GenericRecord>).map {
+                    it.get("item") as Long
+                }
+
+                val flops: Long = 4100 * (runtime / 1000) * cores
+
+                val entry = entries.getOrPut(workflowId) {
+                    TraceEntryImpl(submitTime, Job(UUID(0L, taskId), "<unnamed>", UnnamedUser, HashSet()))
+                }
+                val workflow = entry.workload
+                val task = Task(
+                    UUID(0L, taskId),
+                    "<unnamed>",
+                    FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), flops, cores),
+                    HashSet(),
+                    mapOf(WORKFLOW_TASK_DEADLINE to runtime)
+                )
+
+                entry.submissionTime = min(entry.submissionTime, submitTime)
+                (workflow.tasks as MutableSet<Task>).add(task)
+                tasks[taskId] = task
+                taskDependencies[task] = dependencies
+            }
+        }
+
+        // Resolve the recorded parent ids into task references now that every task is known
+        taskDependencies.forEach { (task, dependencies) ->
+            (task.dependencies as MutableSet<Task>).addAll(
+                dependencies.map { taskId ->
+                    tasks[taskId] ?: throw IllegalArgumentException("Dependency task with id $taskId not found")
+                }
+            )
+        }
+
+        // Create the entry iterator
+        iterator = entries.values.sortedBy { it.submissionTime }.iterator()
+    }
+
+    override fun hasNext(): Boolean = iterator.hasNext()
+
+    override fun next(): TraceEntry<Job> = iterator.next()
+
+    override fun close() {}
+
+    /**
+     * An unnamed user.
+     */
+    private object UnnamedUser : User {
+        override val name: String = "<unnamed>"
+        override val uid: UUID = UUID.randomUUID()
+    }
+
+    /**
+     * An entry in the trace.
+     */
+    private data class TraceEntryImpl(
+        override var submissionTime: Long,
+        override val workload: Job
+    ) : TraceEntry<Job>
+}
diff --git a/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt b/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt
new file mode 100644
index 00000000..58d96657
--- /dev/null
+++ b/simulator/opendc-format/src/test/kotlin/org/opendc/format/trace/wtf/WtfTraceReaderTest.kt
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.format.trace.wtf
+
+import org.junit.jupiter.api.Assertions.*
+import org.junit.jupiter.api.Test
+
+/**
+ * Test suite for the [WtfTraceReader] class.
+ */
+class WtfTraceReaderTest {
+    /**
+     * Smoke test for parsing WTF traces.
+     */
+    @Test
+    fun testParseWtf() {
+        val reader = WtfTraceReader("src/test/resources/wtf-trace")
+        var entry = reader.next()
+        assertEquals(0, entry.submissionTime)
+        assertEquals(23, entry.workload.tasks.size)
+
+        entry = reader.next()
+        assertEquals(333387, entry.submissionTime)
+        assertEquals(23, entry.workload.tasks.size)
+    }
+}
diff --git a/simulator/opendc-format/src/test/resources/wtf-trace/tasks/schema-1.0/part.0.parquet b/simulator/opendc-format/src/test/resources/wtf-trace/tasks/schema-1.0/part.0.parquet
Binary files differ
new file mode 100644
index 00000000..d2044038
--- /dev/null
+++ b/simulator/opendc-format/src/test/resources/wtf-trace/tasks/schema-1.0/part.0.parquet
