summaryrefslogtreecommitdiff
path: root/opendc-format
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-format')
-rw-r--r--opendc-format/build.gradle.kts2
-rw-r--r--opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt176
2 files changed, 1 insertions, 177 deletions
diff --git a/opendc-format/build.gradle.kts b/opendc-format/build.gradle.kts
index c8e30846..c1258428 100644
--- a/opendc-format/build.gradle.kts
+++ b/opendc-format/build.gradle.kts
@@ -39,7 +39,7 @@ dependencies {
exclude(group = "org.jetbrains.kotlin", module = "kotlin-reflect")
}
implementation(libs.jackson.dataformat.csv)
- implementation(kotlin("reflect"))
+ implementation("org.jetbrains.kotlin:kotlin-reflect:1.5.30")
/* This configuration is necessary for a slim dependency on Apache Parquet */
implementation(libs.parquet) {
diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt
deleted file mode 100644
index e68afeb7..00000000
--- a/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.format.trace.gwf
-
-import org.opendc.format.trace.TraceEntry
-import org.opendc.format.trace.TraceReader
-import org.opendc.simulator.compute.workload.SimFlopsWorkload
-import org.opendc.workflow.api.Job
-import org.opendc.workflow.api.Task
-import org.opendc.workflow.api.WORKFLOW_TASK_CORES
-import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE
-import java.io.BufferedReader
-import java.io.File
-import java.io.InputStream
-import java.util.*
-import kotlin.collections.HashSet
-import kotlin.collections.Iterator
-import kotlin.collections.List
-import kotlin.collections.MutableSet
-import kotlin.collections.component1
-import kotlin.collections.component2
-import kotlin.collections.filter
-import kotlin.collections.forEach
-import kotlin.collections.getOrPut
-import kotlin.collections.map
-import kotlin.collections.mapIndexed
-import kotlin.collections.mapOf
-import kotlin.collections.mutableMapOf
-import kotlin.collections.set
-import kotlin.collections.sortedBy
-import kotlin.collections.toMap
-import kotlin.math.max
-import kotlin.math.min
-
-/**
- * A [TraceReader] for the Grid Workload Format. See the Grid Workloads Archive (http://gwa.ewi.tudelft.nl/) for more
- * information about the format.
- *
- * Be aware that in the Grid Workload Format, workflows are not required to be ordered by submission time and therefore
- * this reader needs to read the whole trace into memory before an entry can be read. Consider converting the trace to a
- * different format for better performance.
- *
- * @param reader The buffered reader to read the trace with.
- */
-public class GwfTraceReader(reader: BufferedReader) : TraceReader<Job> {
- /**
- * The internal iterator to use for this reader.
- */
- private val iterator: Iterator<TraceEntry<Job>>
-
- /**
- * Create a [GwfTraceReader] instance from the specified [File].
- *
- * @param file The file to read from.
- */
- public constructor(file: File) : this(file.bufferedReader())
-
- /**
- * Create a [GwfTraceReader] instance from the specified [InputStream].
- *
- * @param input The input stream to read from.
- */
- public constructor(input: InputStream) : this(input.bufferedReader())
-
- /**
- * Initialize the reader.
- */
- init {
- val workflows = mutableMapOf<Long, Job>()
- val starts = mutableMapOf<Long, Long>()
- val tasks = mutableMapOf<Long, Task>()
- val taskDependencies = mutableMapOf<Task, List<Long>>()
-
- var workflowIdCol = 0
- var taskIdCol = 0
- var submitTimeCol = 0
- var runtimeCol = 0
- var coreCol = 0
- var dependencyCol = 0
-
- try {
- reader.lineSequence()
- .filter { line ->
- // Ignore comments in the trace
- !line.startsWith("#") && line.isNotBlank()
- }
- .forEachIndexed { idx, line ->
- val values = line.split(",")
-
- // Parse GWF header
- if (idx == 0) {
- val header = values.mapIndexed { col, name -> Pair(name.trim(), col) }.toMap()
- workflowIdCol = header["WorkflowID"]!!
- taskIdCol = header["JobID"]!!
- submitTimeCol = header["SubmitTime"]!!
- runtimeCol = header["RunTime"]!!
- coreCol = header["NProcs"]!!
- dependencyCol = header["Dependencies"]!!
- return@forEachIndexed
- }
-
- val workflowId = values[workflowIdCol].trim().toLong()
- val taskId = values[taskIdCol].trim().toLong()
- val submitTime = values[submitTimeCol].trim().toLong() * 1000 // ms
- val runtime = max(0, values[runtimeCol].trim().toLong()) // s
- val cores = values[coreCol].trim().toInt()
- val dependencies = values[dependencyCol].split(" ")
- .filter { it.isNotEmpty() }
- .map { it.trim().toLong() }
-
- val flops: Long = 4000 * runtime * cores
-
- val workflow = workflows.getOrPut(workflowId) {
- Job(UUID(0L, workflowId), "<unnamed>", HashSet())
- }
- val workload = SimFlopsWorkload(flops)
- val task = Task(
- UUID(0L, taskId),
- "<unnamed>",
- HashSet(),
- mapOf(
- "workload" to workload,
- WORKFLOW_TASK_CORES to cores,
- WORKFLOW_TASK_DEADLINE to (runtime * 1000)
- ),
- )
- starts.merge(workflowId, submitTime, ::min)
- (workflow.tasks as MutableSet<Task>).add(task)
- tasks[taskId] = task
- taskDependencies[task] = dependencies
- }
- } finally {
- reader.close()
- }
-
- // Fix dependencies and dependents for all tasks
- taskDependencies.forEach { (task, dependencies) ->
- (task.dependencies as MutableSet<Task>).addAll(
- dependencies.map { taskId ->
- tasks[taskId] ?: throw IllegalArgumentException("Dependency task with id $taskId not found")
- }
- )
- }
-
- // Create the entry iterator
- iterator = workflows.map { (id, job) -> TraceEntry(job.uid, job.name, starts.getValue(id), job, job.metadata) }
- .sortedBy { it.start }
- .iterator()
- }
-
- override fun hasNext(): Boolean = iterator.hasNext()
-
- override fun next(): TraceEntry<Job> = iterator.next()
-
- override fun close() {}
-}