From 23c1502c2668305fd5f4c38c6c794c985d2037e3 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 31 Aug 2021 14:56:08 +0200 Subject: refactor(trace): Move GWF trace reader into separate module This change starts the process of moving the different trace formats into separate modules. This change in particular moves the GWF trace format into a new module, opendc-trace-gwf. Furthermore, this change also implements the trace API for the GWF module. --- .../org/opendc/format/trace/gwf/GwfTraceReader.kt | 176 --------------------- 1 file changed, 176 deletions(-) delete mode 100644 opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt (limited to 'opendc-format/src/main') diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt deleted file mode 100644 index e68afeb7..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.trace.gwf - -import org.opendc.format.trace.TraceEntry -import org.opendc.format.trace.TraceReader -import org.opendc.simulator.compute.workload.SimFlopsWorkload -import org.opendc.workflow.api.Job -import org.opendc.workflow.api.Task -import org.opendc.workflow.api.WORKFLOW_TASK_CORES -import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE -import java.io.BufferedReader -import java.io.File -import java.io.InputStream -import java.util.* -import kotlin.collections.HashSet -import kotlin.collections.Iterator -import kotlin.collections.List -import kotlin.collections.MutableSet -import kotlin.collections.component1 -import kotlin.collections.component2 -import kotlin.collections.filter -import kotlin.collections.forEach -import kotlin.collections.getOrPut -import kotlin.collections.map -import kotlin.collections.mapIndexed -import kotlin.collections.mapOf -import kotlin.collections.mutableMapOf -import kotlin.collections.set -import kotlin.collections.sortedBy -import kotlin.collections.toMap -import kotlin.math.max -import kotlin.math.min - -/** - * A [TraceReader] for the Grid Workload Format. See the Grid Workloads Archive (http://gwa.ewi.tudelft.nl/) for more - * information about the format. - * - * Be aware that in the Grid Workload Format, workflows are not required to be ordered by submission time and therefore - * this reader needs to read the whole trace into memory before an entry can be read. Consider converting the trace to a - * different format for better performance. - * - * @param reader The buffered reader to read the trace with. - */ -public class GwfTraceReader(reader: BufferedReader) : TraceReader { - /** - * The internal iterator to use for this reader. - */ - private val iterator: Iterator> - - /** - * Create a [GwfTraceReader] instance from the specified [File]. - * - * @param file The file to read from. - */ - public constructor(file: File) : this(file.bufferedReader()) - - /** - * Create a [GwfTraceReader] instance from the specified [InputStream]. - * - * @param input The input stream to read from. - */ - public constructor(input: InputStream) : this(input.bufferedReader()) - - /** - * Initialize the reader. - */ - init { - val workflows = mutableMapOf() - val starts = mutableMapOf() - val tasks = mutableMapOf() - val taskDependencies = mutableMapOf>() - - var workflowIdCol = 0 - var taskIdCol = 0 - var submitTimeCol = 0 - var runtimeCol = 0 - var coreCol = 0 - var dependencyCol = 0 - - try { - reader.lineSequence() - .filter { line -> - // Ignore comments in the trace - !line.startsWith("#") && line.isNotBlank() - } - .forEachIndexed { idx, line -> - val values = line.split(",") - - // Parse GWF header - if (idx == 0) { - val header = values.mapIndexed { col, name -> Pair(name.trim(), col) }.toMap() - workflowIdCol = header["WorkflowID"]!! - taskIdCol = header["JobID"]!! - submitTimeCol = header["SubmitTime"]!! - runtimeCol = header["RunTime"]!! - coreCol = header["NProcs"]!! - dependencyCol = header["Dependencies"]!! - return@forEachIndexed - } - - val workflowId = values[workflowIdCol].trim().toLong() - val taskId = values[taskIdCol].trim().toLong() - val submitTime = values[submitTimeCol].trim().toLong() * 1000 // ms - val runtime = max(0, values[runtimeCol].trim().toLong()) // s - val cores = values[coreCol].trim().toInt() - val dependencies = values[dependencyCol].split(" ") - .filter { it.isNotEmpty() } - .map { it.trim().toLong() } - - val flops: Long = 4000 * runtime * cores - - val workflow = workflows.getOrPut(workflowId) { - Job(UUID(0L, workflowId), "", HashSet()) - } - val workload = SimFlopsWorkload(flops) - val task = Task( - UUID(0L, taskId), - "", - HashSet(), - mapOf( - "workload" to workload, - WORKFLOW_TASK_CORES to cores, - WORKFLOW_TASK_DEADLINE to (runtime * 1000) - ), - ) - starts.merge(workflowId, submitTime, ::min) - (workflow.tasks as MutableSet).add(task) - tasks[taskId] = task - taskDependencies[task] = dependencies - } - } finally { - reader.close() - } - - // Fix dependencies and dependents for all tasks - taskDependencies.forEach { (task, dependencies) -> - (task.dependencies as MutableSet).addAll( - dependencies.map { taskId -> - tasks[taskId] ?: throw IllegalArgumentException("Dependency task with id $taskId not found") - } - ) - } - - // Create the entry iterator - iterator = workflows.map { (id, job) -> TraceEntry(job.uid, job.name, starts.getValue(id), job, job.metadata) } - .sortedBy { it.start } - .iterator() - } - - override fun hasNext(): Boolean = iterator.hasNext() - - override fun next(): TraceEntry = iterator.next() - - override fun close() {} -} -- cgit v1.2.3 From e8cdfbcec3f75b3f303ce52bac5f5595a94555e4 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 31 Aug 2021 15:14:46 +0200 Subject: refactor(trace): Extract Parquet helpers into separate module This change extracts the Parquet helpers outside format module into a new module, in order to improve re-usability of these helpers. --- .../org/opendc/format/trace/wtf/WtfTraceReader.kt | 2 +- .../org/opendc/format/util/LocalInputFile.kt | 107 -------------------- .../org/opendc/format/util/LocalOutputFile.kt | 95 ----------------- .../org/opendc/format/util/LocalParquetReader.kt | 112 --------------------- 4 files changed, 1 insertion(+), 315 deletions(-) delete mode 100644 opendc-format/src/main/kotlin/org/opendc/format/util/LocalInputFile.kt delete mode 100644 opendc-format/src/main/kotlin/org/opendc/format/util/LocalOutputFile.kt delete mode 100644 opendc-format/src/main/kotlin/org/opendc/format/util/LocalParquetReader.kt (limited to 'opendc-format/src/main') diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt index dde1b340..e8e72f0e 100644 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt +++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt @@ -25,8 +25,8 @@ package org.opendc.format.trace.wtf import org.apache.avro.generic.GenericRecord import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader -import org.opendc.format.util.LocalParquetReader import org.opendc.simulator.compute.workload.SimFlopsWorkload +import org.opendc.trace.util.parquet.LocalParquetReader import org.opendc.workflow.api.Job import org.opendc.workflow.api.Task import org.opendc.workflow.api.WORKFLOW_TASK_CORES diff --git a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalInputFile.kt b/opendc-format/src/main/kotlin/org/opendc/format/util/LocalInputFile.kt deleted file mode 100644 index 92319ace..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalInputFile.kt +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.util - -import org.apache.parquet.io.InputFile -import org.apache.parquet.io.SeekableInputStream -import java.io.EOFException -import java.io.File -import java.nio.ByteBuffer -import java.nio.channels.FileChannel -import java.nio.file.Path -import java.nio.file.StandardOpenOption - -/** - * An [InputFile] on the local filesystem. - */ -public class LocalInputFile(private val path: Path) : InputFile { - /** - * The [FileChannel] used for accessing the input path. - */ - private val channel = FileChannel.open(path, StandardOpenOption.READ) - - /** - * Construct a [LocalInputFile] for the specified [file]. - */ - public constructor(file: File) : this(file.toPath()) - - override fun getLength(): Long = channel.size() - - override fun newStream(): SeekableInputStream = object : SeekableInputStream() { - override fun read(buf: ByteBuffer): Int { - return channel.read(buf) - } - - override fun read(): Int { - val single = ByteBuffer.allocate(1) - var read: Int - - // ReadableByteChannel#read might read zero bytes so continue until we read at least one byte - do { - read = channel.read(single) - } while (read == 0) - - return if (read == -1) { - read - } else { - single.get(0).toInt() and 0xff - } - } - - override fun getPos(): Long { - return channel.position() - } - - override fun seek(newPos: Long) { - channel.position(newPos) - } - - override fun readFully(bytes: ByteArray) { - readFully(ByteBuffer.wrap(bytes)) - } - - override fun readFully(bytes: ByteArray, start: Int, len: Int) { - readFully(ByteBuffer.wrap(bytes, start, len)) - } - - override fun readFully(buf: ByteBuffer) { - var remainder = buf.remaining() - while (remainder > 0) { - val read = channel.read(buf) - remainder -= read - - if (read == -1 && remainder > 0) { - throw EOFException() - } - } - } - - override fun close() { - channel.close() - } - - override fun toString(): String = "NioSeekableInputStream" - } - - override fun toString(): String = "LocalInputFile[path=$path]" -} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalOutputFile.kt b/opendc-format/src/main/kotlin/org/opendc/format/util/LocalOutputFile.kt deleted file mode 100644 index 657bca5a..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalOutputFile.kt +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.util - -import org.apache.parquet.io.OutputFile -import org.apache.parquet.io.PositionOutputStream -import java.io.File -import java.io.OutputStream -import java.nio.file.Files -import java.nio.file.Path -import java.nio.file.StandardOpenOption - -/** - * An [OutputFile] on the local filesystem. - */ -public class LocalOutputFile(private val path: Path) : OutputFile { - /** - * Construct a [LocalOutputFile] from the specified [file] - */ - public constructor(file: File) : this(file.toPath()) - - override fun create(blockSizeHint: Long): PositionOutputStream { - val output = Files.newOutputStream(path, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE) - return NioPositionOutputStream(output) - } - - override fun createOrOverwrite(blockSizeHint: Long): PositionOutputStream { - val output = Files.newOutputStream(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING) - return NioPositionOutputStream(output) - } - - override fun supportsBlockSize(): Boolean = false - - override fun defaultBlockSize(): Long = - throw UnsupportedOperationException("Local filesystem does not have default block size") - - override fun getPath(): String = path.toString() - - /** - * Implementation of [PositionOutputStream] for an [OutputStream]. - */ - private class NioPositionOutputStream(private val output: OutputStream) : PositionOutputStream() { - /** - * The current position in the file. - */ - private var _pos = 0L - - override fun getPos(): Long = _pos - - override fun write(b: Int) { - output.write(b) - _pos++ - } - - override fun write(b: ByteArray) { - output.write(b) - _pos += b.size - } - - override fun write(b: ByteArray, off: Int, len: Int) { - output.write(b, off, len) - _pos += len - } - - override fun flush() { - output.flush() - } - - override fun close() { - output.close() - } - - override fun toString(): String = "NioPositionOutputStream[output=$output]" - } -} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalParquetReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/util/LocalParquetReader.kt deleted file mode 100644 index 5083f3e1..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalParquetReader.kt +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.util - -import org.apache.parquet.avro.AvroParquetReader -import org.apache.parquet.hadoop.ParquetReader -import org.apache.parquet.io.InputFile -import java.io.File -import java.io.IOException -import java.nio.file.Files -import java.nio.file.Path -import kotlin.io.path.isDirectory - -/** - * A helper class to read Parquet files. - * - * @param path The path to the Parquet file or directory to read. - */ -public class LocalParquetReader(path: Path) : AutoCloseable { - /** - * The input files to process. - */ - private val filesIterator = if (path.isDirectory()) - Files.list(path) - .filter { !it.isDirectory() } - .sorted() - .map { LocalInputFile(it) } - .iterator() - else - listOf(LocalInputFile(path)).iterator() - - /** - * The Parquet reader to use. - */ - private var reader: ParquetReader? = null - - /** - * Construct a [LocalParquetReader] for the specified [file]. - */ - public constructor(file: File) : this(file.toPath()) - - /** - * Read a single entry in the Parquet file. - */ - public fun read(): T? { - return try { - val next = reader?.read() - if (next != null) { - next - } else { - initReader() - - if (reader == null) - null - else - read() - } - } catch (e: InterruptedException) { - throw IOException(e) - } - } - - /** - * Close the Parquet reader. - */ - override fun close() { - reader?.close() - } - - /** - * Initialize the next reader. - */ - private fun initReader() { - reader?.close() - - this.reader = if (filesIterator.hasNext()) { - createReader(filesIterator.next()) - } else { - null - } - } - - /** - * Create a Parquet reader for the specified file. - */ - private fun createReader(input: InputFile): ParquetReader { - return AvroParquetReader - .builder(input) - .disableCompatibility() - .build() - } -} -- cgit v1.2.3 From 9fcce6ade8714f7f0a9073fe5b7ddd3f0b35c375 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 31 Aug 2021 15:44:47 +0200 Subject: refactor(format): Remove environment reader from format library This change removes the environment reader from the format library since they are highly specific for the particular experiment. In the future, we hope to have a single format to setup the entire datacenter (perhaps similar to the format used by the web runner). --- .../opendc/format/environment/EnvironmentReader.kt | 35 --------- .../org/opendc/format/environment/MachineDef.kt | 35 --------- .../org/opendc/format/environment/sc18/Model.kt | 66 ---------------- .../environment/sc18/Sc18EnvironmentReader.kt | 89 ---------------------- 4 files changed, 225 deletions(-) delete mode 100644 opendc-format/src/main/kotlin/org/opendc/format/environment/EnvironmentReader.kt delete mode 100644 opendc-format/src/main/kotlin/org/opendc/format/environment/MachineDef.kt delete mode 100644 opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Model.kt delete mode 100644 opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt (limited to 'opendc-format/src/main') diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/EnvironmentReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/EnvironmentReader.kt deleted file mode 100644 index 97d6f239..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/environment/EnvironmentReader.kt +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.environment - -import java.io.Closeable - -/** - * An interface for reading descriptions of topology environments into memory. - */ -public interface EnvironmentReader : Closeable { - /** - * Read the environment into a list. - */ - public fun read(): List -} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/MachineDef.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/MachineDef.kt deleted file mode 100644 index f65c4880..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/environment/MachineDef.kt +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.environment - -import org.opendc.simulator.compute.model.MachineModel -import org.opendc.simulator.compute.power.PowerModel -import java.util.* - -public data class MachineDef( - val uid: UUID, - val name: String, - val meta: Map, - val model: MachineModel, - val powerModel: PowerModel -) diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Model.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Model.kt deleted file mode 100644 index c313467f..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Model.kt +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.environment.sc18 - -import com.fasterxml.jackson.annotation.JsonSubTypes -import com.fasterxml.jackson.annotation.JsonTypeInfo - -/** - * A topology setup. - * - * @property name The name of the setup. - * @property rooms The rooms in the topology. - */ -internal data class Setup(val name: String, val rooms: List) - -/** - * A room in a topology. - * - * @property type The type of room in the topology. - * @property objects The objects in the room. - */ -internal data class Room(val type: String, val objects: List) - -/** - * An object in a [Room]. - * - * @property type The type of the room object. - */ -@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type") -@JsonSubTypes(value = [JsonSubTypes.Type(name = "RACK", value = RoomObject.Rack::class)]) -internal sealed class RoomObject(val type: String) { - /** - * A rack in a server room. - * - * @property machines The machines in the rack. - */ - internal data class Rack(val machines: List) : RoomObject("RACK") -} - -/** - * A machine in the setup that consists of the specified CPU's represented as - * integer identifiers and ethernet speed. - * - * @property cpus The CPUs in the machine represented as integer identifiers. - */ -internal data class Machine(val cpus: List) diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt deleted file mode 100644 index 7780a98e..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.environment.sc18 - -import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper -import com.fasterxml.jackson.module.kotlin.readValue -import org.opendc.format.environment.EnvironmentReader -import org.opendc.format.environment.MachineDef -import org.opendc.simulator.compute.model.MachineModel -import org.opendc.simulator.compute.model.MemoryUnit -import org.opendc.simulator.compute.model.ProcessingNode -import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.compute.power.ConstantPowerModel -import java.io.InputStream -import java.util.* - -/** - * A parser for the JSON experiment setup files used for the SC18 paper: "A Reference Architecture for Topology - * Schedulers". - * - * @param input The input stream to read from. - * @param mapper The Jackson object mapper to use. - */ -public class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonObjectMapper()) : EnvironmentReader { - /** - * The environment that was read from the file. - */ - private val setup: Setup = mapper.readValue(input) - - /** - * Read the environment. - */ - public override fun read(): List { - var counter = 0 - return setup.rooms.flatMap { room -> - room.objects.flatMap { roomObject -> - when (roomObject) { - is RoomObject.Rack -> { - roomObject.machines.map { machine -> - val cores = machine.cpus.flatMap { id -> - when (id) { - 1 -> { - val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 4) - List(node.coreCount) { ProcessingUnit(node, it, 4100.0) } - } - 2 -> { - val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 2) - List(node.coreCount) { ProcessingUnit(node, it, 3500.0) } - } - else -> throw IllegalArgumentException("The cpu id $id is not recognized") - } - } - MachineDef( - UUID(0L, counter++.toLong()), - "node-$counter", - emptyMap(), - MachineModel(cores, listOf(MemoryUnit("", "", 2300.0, 16000))), - ConstantPowerModel(0.0) - ) - } - } - } - } - } - } - - override fun close() {} -} -- cgit v1.2.3 From 214480d154771f0b783829b6e5ec82b837304ad2 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 31 Aug 2021 16:18:56 +0200 Subject: refactor(trace): Move Bitbrains format into separate module This change moves Bitbrains trace support into a separate module and adds support for the new trace api. --- .../trace/bitbrains/BitbrainsRawTraceReader.kt | 100 ---------------- .../format/trace/bitbrains/BitbrainsTraceReader.kt | 127 --------------------- 2 files changed, 227 deletions(-) delete mode 100644 opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsRawTraceReader.kt delete mode 100644 opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt (limited to 'opendc-format/src/main') diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsRawTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsRawTraceReader.kt deleted file mode 100644 index ff6cdd02..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsRawTraceReader.kt +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.trace.bitbrains - -import com.fasterxml.jackson.annotation.JsonProperty -import com.fasterxml.jackson.databind.MappingIterator -import com.fasterxml.jackson.dataformat.csv.CsvMapper -import com.fasterxml.jackson.dataformat.csv.CsvSchema -import java.io.InputStream - -/** - * A trace reader that enables the user to read Bitbrains specific trace data. - */ -public class BitbrainsRawTraceReader(input: InputStream) : Iterator, AutoCloseable { - /** - * The [CsvSchema] that is used to parse the trace. - */ - private val schema = CsvSchema.builder() - .addColumn("Timestamp [ms]", CsvSchema.ColumnType.NUMBER) - .addColumn("CPU cores", CsvSchema.ColumnType.NUMBER) - .addColumn("CPU capacity provisioned [MHZ]", CsvSchema.ColumnType.NUMBER) - .addColumn("CPU usage [MHZ]", CsvSchema.ColumnType.NUMBER) - .addColumn("CPU usage [%]", CsvSchema.ColumnType.NUMBER) - .addColumn("Memory capacity provisioned [KB]", CsvSchema.ColumnType.NUMBER) - .addColumn("Memory usage [KB]", CsvSchema.ColumnType.NUMBER) - .addColumn("Disk read throughput [KB/s]", CsvSchema.ColumnType.NUMBER) - .addColumn("Disk write throughput [KB/s]", CsvSchema.ColumnType.NUMBER) - .addColumn("Network received throughput [KB/s]", CsvSchema.ColumnType.NUMBER) - .addColumn("Network transmitted throughput [KB/s]", CsvSchema.ColumnType.NUMBER) - .setAllowComments(true) - .setUseHeader(true) - .setColumnSeparator(';') - .build() - - /** - * The mapping iterator to use. - */ - private val iterator: MappingIterator = CsvMapper().readerFor(Entry::class.java).with(schema) - .readValues(input) - - override fun hasNext(): Boolean { - return iterator.hasNext() - } - - override fun next(): Entry { - return iterator.next() - } - - override fun close() { - iterator.close() - } - - /** - * A single entry in the trace. - */ - public data class Entry( - @JsonProperty("Timestamp [ms]") - val timestamp: Long, - @JsonProperty("CPU cores") - val cpuCores: Int, - @JsonProperty("CPU capacity provisioned [MHZ]") - val cpuCapacity: Double, - @JsonProperty("CPU usage [MHZ]") - val cpuUsage: Double, - @JsonProperty("CPU usage [%]") - val cpuUsagePct: Double, - @JsonProperty("Memory capacity provisioned [KB]") - val memCapacity: Double, - @JsonProperty("Memory usage [KB]") - val memUsage: Double, - @JsonProperty("Disk read throughput [KB/s]") - val diskRead: Double, - @JsonProperty("Disk write throughput [KB/s]") - val diskWrite: Double, - @JsonProperty("Network received throughput [KB/s]") - val netReceived: Double, - @JsonProperty("Network transmitted throughput [KB/s]") - val netTransmitted: Double - ) -} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt deleted file mode 100644 index 9e4876df..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.trace.bitbrains - -import org.opendc.format.trace.TraceEntry -import org.opendc.format.trace.TraceReader -import org.opendc.simulator.compute.workload.SimTraceWorkload -import org.opendc.simulator.compute.workload.SimWorkload -import java.io.File -import java.io.FileInputStream -import java.util.* -import kotlin.math.max -import kotlin.math.min - -/** - * A [TraceReader] for the public VM workload trace format. - * - * @param traceDirectory The directory of the traces. - */ -public class BitbrainsTraceReader(traceDirectory: File) : TraceReader { - /** - * The internal iterator to use for this reader. - */ - private val iterator: Iterator> - - /** - * Initialize the reader. - */ - init { - val entries = mutableMapOf>() - val traceInterval = 5 * 60 * 1000L - - traceDirectory.walk() - .filterNot { it.isDirectory } - .filter { it.extension == "csv" } - .forEach { vmFile -> - val flopsHistory = mutableListOf() - var vmId = -1L - var maxCores = Int.MIN_VALUE - var requiredMemory = Long.MIN_VALUE - var startTime = Long.MAX_VALUE - var lastTimestamp = Long.MIN_VALUE - - BitbrainsRawTraceReader(FileInputStream(vmFile)).use { reader -> - reader.forEach { entry -> - val timestamp = entry.timestamp * 1000L - val cpuUsage = entry.cpuUsage - vmId = vmFile.nameWithoutExtension.trim().toLong() - val cores = entry.cpuCores - maxCores = max(maxCores, cores) - requiredMemory = max(requiredMemory, (entry.memCapacity / 1000).toLong()) - - if (lastTimestamp < 0) { - lastTimestamp = timestamp - 5 * 60 * 1000L - startTime = min(startTime, lastTimestamp) - } - - if (flopsHistory.isEmpty()) { - flopsHistory.add(SimTraceWorkload.Fragment(lastTimestamp, traceInterval, cpuUsage, cores)) - } else { - val last = flopsHistory.last() - val duration = timestamp - lastTimestamp - // Perform run-length encoding - if (duration == 0L || last.usage == cpuUsage) { - flopsHistory[flopsHistory.size - 1] = last.copy(duration = last.duration + duration) - } else { - flopsHistory.add( - SimTraceWorkload.Fragment( - lastTimestamp, - duration, - cpuUsage, - cores - ) - ) - } - } - - lastTimestamp = timestamp - } - } - - val uuid = UUID(0L, vmId) - - val workload = SimTraceWorkload(flopsHistory.asSequence()) - entries[vmId] = TraceEntry( - uuid, - vmId.toString(), - startTime, - workload, - mapOf( - "cores" to maxCores, - "required-memory" to requiredMemory, - "workload" to workload - ) - ) - } - - // Create the entry iterator - iterator = entries.values.sortedBy { it.start }.iterator() - } - - override fun hasNext(): Boolean = iterator.hasNext() - - override fun next(): TraceEntry = iterator.next() - - override fun close() {} -} -- cgit v1.2.3 From 5c6bf9739aa0ffd9651df4fcb4cd46a8545144f0 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 31 Aug 2021 18:08:14 +0200 Subject: refactor(trace): Implement trace API for SWF reader This change updates the SWF trace reader to support the new streaming trace API. --- .../org/opendc/format/trace/swf/SwfTraceReader.kt | 176 --------------------- 1 file changed, 176 deletions(-) delete mode 100644 opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt (limited to 'opendc-format/src/main') diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt deleted file mode 100644 index bda392a9..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.trace.swf - -import org.opendc.format.trace.TraceEntry -import org.opendc.format.trace.TraceReader -import org.opendc.simulator.compute.workload.SimTraceWorkload -import org.opendc.simulator.compute.workload.SimWorkload -import java.io.BufferedReader -import java.io.File -import java.io.FileReader -import java.util.* - -/** - * A [TraceReader] for reading SWF traces into VM-modeled workloads. - * - * The standard is defined by the PWA, see here: https://www.cse.huji.ac.il/labs/parallel/workload/swf.html - * - * @param file The trace file. - */ -public class SwfTraceReader( - file: File, - maxNumCores: Int = -1 -) : TraceReader { - /** - * The internal iterator to use for this reader. - */ - private val iterator: Iterator> - - /** - * Initialize the reader. - */ - init { - val entries = mutableMapOf>() - - val jobNumberCol = 0 - val submitTimeCol = 1 // seconds (begin of trace is 0) - val waitTimeCol = 2 // seconds - val runTimeCol = 3 // seconds - val numAllocatedCoresCol = 4 // We assume that single-core processors were used at the time - val requestedMemoryCol = 9 // KB per processor/core (-1 if not specified) - - val sliceDuration = 5 * 60L - - var jobNumber: Long - var submitTime: Long - var waitTime: Long - var runTime: Long - var cores: Int - var memory: Long - var slicedWaitTime: Long - var runtimePartialSliceRemainder: Long - - BufferedReader(FileReader(file)).use { reader -> - reader.lineSequence() - .filter { line -> - // Ignore comments in the trace - !line.startsWith(";") && line.isNotBlank() - } - .forEach { line -> - val values = line.trim().split("\\s+".toRegex()) - - jobNumber = values[jobNumberCol].trim().toLong() - submitTime = values[submitTimeCol].trim().toLong() - waitTime = values[waitTimeCol].trim().toLong() - runTime = values[runTimeCol].trim().toLong() - cores = values[numAllocatedCoresCol].trim().toInt() - memory = values[requestedMemoryCol].trim().toLong() - - if (maxNumCores != -1 && cores > maxNumCores) { - println("Skipped a task due to processor count ($cores > $maxNumCores).") - return@forEach - } - - if (memory == -1L) { - memory = 1000L * cores // assume 1GB of memory per processor if not specified - } else { - memory /= 1000 // convert KB to MB - } - - val flopsHistory = mutableListOf() - - // Insert waiting time slices - - // We ignore wait time remainders under one - slicedWaitTime = 0L - if (waitTime >= sliceDuration) { - for (tick in submitTime until (submitTime + waitTime - sliceDuration) step sliceDuration) { - flopsHistory.add( - SimTraceWorkload.Fragment( - tick, - sliceDuration * 1000, - 0.0, - cores - ) - ) - slicedWaitTime += sliceDuration - } - } - - // Insert run time slices - - runtimePartialSliceRemainder = runTime % sliceDuration - - for ( - tick in (submitTime + slicedWaitTime) - until (submitTime + slicedWaitTime + runTime - sliceDuration) - step sliceDuration - ) { - flopsHistory.add( - SimTraceWorkload.Fragment( - tick, - sliceDuration * 1000L, - 1.0, - cores - ) - ) - } - - if (runtimePartialSliceRemainder > 0) { - flopsHistory.add( - SimTraceWorkload.Fragment( - submitTime + slicedWaitTime + runTime, - sliceDuration, - runtimePartialSliceRemainder / sliceDuration.toDouble(), - cores - ) - ) - } - - val uuid = UUID(0L, jobNumber) - val workload = SimTraceWorkload(flopsHistory.asSequence()) - entries[jobNumber] = TraceEntry( - uuid, - jobNumber.toString(), - submitTime, - workload, - mapOf( - "cores" to cores, - "required-memory" to memory, - "workload" to workload - ) - ) - } - } - - // Create the entry iterator - iterator = entries.values.sortedBy { it.start }.iterator() - } - - override fun hasNext(): Boolean = iterator.hasNext() - - override fun next(): TraceEntry = iterator.next() - - override fun close() {} -} -- cgit v1.2.3 From b2308e1077dc60ec6a4dc646613a4be5b59695a6 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 1 Sep 2021 11:29:27 +0200 Subject: refactor(trace): Implement trace API for WTF reader This change updates the WTF trace reader to support the new streaming trace API. --- .../kotlin/org/opendc/format/trace/TraceEntry.kt | 44 ------- .../kotlin/org/opendc/format/trace/TraceReader.kt | 32 ------ .../org/opendc/format/trace/wtf/WtfTraceReader.kt | 126 --------------------- 3 files changed, 202 deletions(-) delete mode 100644 opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt delete mode 100644 opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt delete mode 100644 opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt (limited to 'opendc-format/src/main') diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt deleted file mode 100644 index 3ce79d69..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt +++ /dev/null @@ -1,44 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.trace - -import java.util.UUID - -/** - * An entry in a workload trace. - * - * @param uid The unique identifier of the entry. - * @param name The name of the entry. - * @param start The start time of the workload. - * @param workload The workload of the entry. - * @param meta The meta-data associated with the workload. - */ -public data class TraceEntry( - val uid: UUID, - val name: String, - val start: Long, - val workload: T, - val meta: Map -) diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt deleted file mode 100644 index 797a88d5..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.trace - -/** - * An interface for reading workloads into memory. - * - * This interface must guarantee that the entries are delivered in order of submission time. - * - * @param T The shape of the workloads supported by this reader. - */ -public interface TraceReader : Iterator>, AutoCloseable diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt deleted file mode 100644 index e8e72f0e..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.trace.wtf - -import org.apache.avro.generic.GenericRecord -import org.opendc.format.trace.TraceEntry -import org.opendc.format.trace.TraceReader -import org.opendc.simulator.compute.workload.SimFlopsWorkload -import org.opendc.trace.util.parquet.LocalParquetReader -import org.opendc.workflow.api.Job -import org.opendc.workflow.api.Task -import org.opendc.workflow.api.WORKFLOW_TASK_CORES -import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE -import java.io.File -import java.nio.file.Path -import java.util.UUID -import kotlin.math.min - -/** - * A [TraceReader] for the Workflow Trace Format (WTF). See the Workflow Trace Archive - * (https://wta.atlarge-research.com/) for more information about the format. - * - * @param path The path to the trace. - */ -public class WtfTraceReader(path: Path) : TraceReader { - /** - * The internal iterator to use for this reader. - */ - private val iterator: Iterator> - - /** - * Construct a [TraceReader] from the specified [path]. - * - * @param path The path to the trace. - */ - public constructor(path: File) : this(path.toPath()) - - /** - * Initialize the reader. - */ - init { - val workflows = mutableMapOf() - val starts = mutableMapOf() - val tasks = mutableMapOf() - val taskDependencies = mutableMapOf>() - - LocalParquetReader(path.resolve("tasks/schema-1.0")).use { reader -> - while (true) { - val nextRecord = reader.read() ?: break - - val workflowId = nextRecord.get("workflow_id") as Long - val taskId = nextRecord.get("id") as Long - val submitTime = nextRecord.get("ts_submit") as Long - val runtime = nextRecord.get("runtime") as Long - val cores = (nextRecord.get("resource_amount_requested") as Double).toInt() - - @Suppress("UNCHECKED_CAST") - val dependencies = (nextRecord.get("parents") as ArrayList).map { - it.get("item") as Long - } - - val flops: Long = 4100 * (runtime / 1000) * cores - - val workflow = workflows.getOrPut(workflowId) { - Job(UUID(0L, workflowId), "", HashSet()) - } - val workload = SimFlopsWorkload(flops) - val task = Task( - UUID(0L, taskId), - "", - HashSet(), - mapOf( - "workload" to workload, - WORKFLOW_TASK_CORES to cores, - WORKFLOW_TASK_DEADLINE to runtime - ) - ) - - starts.merge(workflowId, submitTime, ::min) - (workflow.tasks as MutableSet).add(task) - tasks[taskId] = task - taskDependencies[task] = dependencies - } - } - - // Fix dependencies and dependents for all tasks - taskDependencies.forEach { (task, dependencies) -> - (task.dependencies as MutableSet).addAll( - dependencies.map { taskId -> - tasks[taskId] ?: throw IllegalArgumentException("Dependency task with id $taskId not found") - } - ) - } - - // Create the entry iterator - iterator = workflows.map { (id, job) -> TraceEntry(job.uid, job.name, starts.getValue(id), job, job.metadata) } - .sortedBy { it.start } - .iterator() - } - - override fun hasNext(): Boolean = iterator.hasNext() - - override fun next(): TraceEntry = iterator.next() - - override fun close() {} -} -- cgit v1.2.3