From cc9310efad6177909ff2f7415384d7c393383106 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 5 May 2021 23:14:56 +0200 Subject: chore: Address deprecations due to Kotlin 1.5 This change addresses the deprecations that were caused by the migration to Kotlin 1.5. --- .../src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt | 4 ---- 1 file changed, 4 deletions(-) (limited to 'opendc-format/src/main') diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt index 0d1f3cea..50ab652e 100644 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt +++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt @@ -69,8 +69,6 @@ public class SwfTraceReader( var cores: Int var memory: Long var slicedWaitTime: Long - var flopsPerSecond: Long - var flopsPartialSlice: Long var runtimePartialSliceRemainder: Long BufferedReader(FileReader(file)).use { reader -> @@ -121,9 +119,7 @@ public class SwfTraceReader( // Insert run time slices - flopsPerSecond = 4_000L * cores runtimePartialSliceRemainder = runTime % sliceDuration - flopsPartialSlice = flopsPerSecond * runtimePartialSliceRemainder for ( tick in (submitTime + slicedWaitTime) -- cgit v1.2.3 From 9097811e0ac6872de3e4ff5f521d8859870b1000 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 8 Jun 2021 23:42:48 +0200 Subject: format: Add implementation of local Parquet InputFile This change adds an implementation of Parquet's local InputFile in order to eliminate the dependency on the entire Hadoop system. This implementation allows users to read Parquet files locally without needing a Parquet filesystem implementation. --- .../org/opendc/format/util/LocalInputFile.kt | 101 +++++++++++++++++++ .../org/opendc/format/util/LocalParquetReader.kt | 112 +++++++++++++++++++++ 2 files changed, 213 insertions(+) create mode 100644 opendc-format/src/main/kotlin/org/opendc/format/util/LocalInputFile.kt create mode 100644 opendc-format/src/main/kotlin/org/opendc/format/util/LocalParquetReader.kt (limited to 'opendc-format/src/main') diff --git a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalInputFile.kt b/opendc-format/src/main/kotlin/org/opendc/format/util/LocalInputFile.kt new file mode 100644 index 00000000..d8c25a62 --- /dev/null +++ b/opendc-format/src/main/kotlin/org/opendc/format/util/LocalInputFile.kt @@ -0,0 +1,101 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.format.util + +import org.apache.parquet.io.InputFile +import org.apache.parquet.io.SeekableInputStream +import java.io.EOFException +import java.nio.ByteBuffer +import java.nio.channels.FileChannel +import java.nio.file.Path +import java.nio.file.StandardOpenOption + +/** + * An [InputFile] on the local filesystem. + */ +public class LocalInputFile(private val path: Path) : InputFile { + /** + * The [FileChannel] used for accessing the input path. + */ + private val channel = FileChannel.open(path, StandardOpenOption.READ) + + override fun getLength(): Long = channel.size() + + override fun newStream(): SeekableInputStream = object : SeekableInputStream() { + override fun read(buf: ByteBuffer): Int { + return channel.read(buf) + } + + override fun read(): Int { + val single = ByteBuffer.allocate(1) + var read: Int + + // ReadableByteChannel#read might read zero bytes so continue until we read at least one byte + do { + read = channel.read(single) + } while (read == 0) + + return if (read == -1) { + read + } else { + single.get(0).toInt() and 0xff + } + } + + override fun getPos(): Long { + return channel.position() + } + + override fun seek(newPos: Long) { + channel.position(newPos) + } + + override fun readFully(bytes: ByteArray) { + readFully(ByteBuffer.wrap(bytes)) + } + + override fun readFully(bytes: ByteArray, start: Int, len: Int) { + readFully(ByteBuffer.wrap(bytes, start, len)) + } + + override fun readFully(buf: ByteBuffer) { + var remainder = buf.remaining() + while (remainder > 0) { + val read = channel.read(buf) + remainder -= read + + if (read == -1 && remainder > 0) { + throw EOFException() + } + } + } + + override fun close() { + channel.close() + } + + override fun toString(): String = "NioSeekableInputStream" + } + + override fun toString(): String = "LocalInputFile[path=$path]" +} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalParquetReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/util/LocalParquetReader.kt new file mode 100644 index 00000000..5083f3e1 --- /dev/null +++ b/opendc-format/src/main/kotlin/org/opendc/format/util/LocalParquetReader.kt @@ -0,0 +1,112 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.format.util + +import org.apache.parquet.avro.AvroParquetReader +import org.apache.parquet.hadoop.ParquetReader +import org.apache.parquet.io.InputFile +import java.io.File +import java.io.IOException +import java.nio.file.Files +import java.nio.file.Path +import kotlin.io.path.isDirectory + +/** + * A helper class to read Parquet files. + * + * @param path The path to the Parquet file or directory to read. + */ +public class LocalParquetReader(path: Path) : AutoCloseable { + /** + * The input files to process. + */ + private val filesIterator = if (path.isDirectory()) + Files.list(path) + .filter { !it.isDirectory() } + .sorted() + .map { LocalInputFile(it) } + .iterator() + else + listOf(LocalInputFile(path)).iterator() + + /** + * The Parquet reader to use. + */ + private var reader: ParquetReader? = null + + /** + * Construct a [LocalParquetReader] for the specified [file]. + */ + public constructor(file: File) : this(file.toPath()) + + /** + * Read a single entry in the Parquet file. + */ + public fun read(): T? { + return try { + val next = reader?.read() + if (next != null) { + next + } else { + initReader() + + if (reader == null) + null + else + read() + } + } catch (e: InterruptedException) { + throw IOException(e) + } + } + + /** + * Close the Parquet reader. + */ + override fun close() { + reader?.close() + } + + /** + * Initialize the next reader. + */ + private fun initReader() { + reader?.close() + + this.reader = if (filesIterator.hasNext()) { + createReader(filesIterator.next()) + } else { + null + } + } + + /** + * Create a Parquet reader for the specified file. + */ + private fun createReader(input: InputFile): ParquetReader { + return AvroParquetReader + .builder(input) + .disableCompatibility() + .build() + } +} -- cgit v1.2.3 From 1b52a443e508bc4130071e67a1a8e17a6714c6b8 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 8 Jun 2021 23:46:07 +0200 Subject: format: Use LocalInputFile for Parquet reader This change updates the format implementations that use Parquet by switching to our InputFile implementation for local files, which eliminates the need for Hadoop's filesystem support. --- .../org/opendc/format/trace/wtf/WtfTraceReader.kt | 76 ++++++++++++---------- 1 file changed, 42 insertions(+), 34 deletions(-) (limited to 'opendc-format/src/main') diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt index feadf61f..dde1b340 100644 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt +++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt @@ -23,15 +23,16 @@ package org.opendc.format.trace.wtf import org.apache.avro.generic.GenericRecord -import org.apache.hadoop.fs.Path -import org.apache.parquet.avro.AvroParquetReader import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader +import org.opendc.format.util.LocalParquetReader import org.opendc.simulator.compute.workload.SimFlopsWorkload import org.opendc.workflow.api.Job import org.opendc.workflow.api.Task import org.opendc.workflow.api.WORKFLOW_TASK_CORES import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE +import java.io.File +import java.nio.file.Path import java.util.UUID import kotlin.math.min @@ -41,12 +42,19 @@ import kotlin.math.min * * @param path The path to the trace. */ -public class WtfTraceReader(path: String) : TraceReader { +public class WtfTraceReader(path: Path) : TraceReader { /** * The internal iterator to use for this reader. */ private val iterator: Iterator> + /** + * Construct a [TraceReader] from the specified [path]. + * + * @param path The path to the trace. + */ + public constructor(path: File) : this(path.toPath()) + /** * Initialize the reader. */ @@ -56,43 +64,43 @@ public class WtfTraceReader(path: String) : TraceReader { val tasks = mutableMapOf() val taskDependencies = mutableMapOf>() - @Suppress("DEPRECATION") - val reader = AvroParquetReader.builder(Path(path, "tasks/schema-1.0")).build() + LocalParquetReader(path.resolve("tasks/schema-1.0")).use { reader -> + while (true) { + val nextRecord = reader.read() ?: break - while (true) { - val nextRecord = reader.read() ?: break + val workflowId = nextRecord.get("workflow_id") as Long + val taskId = nextRecord.get("id") as Long + val submitTime = nextRecord.get("ts_submit") as Long + val runtime = nextRecord.get("runtime") as Long + val cores = (nextRecord.get("resource_amount_requested") as Double).toInt() - val workflowId = nextRecord.get("workflow_id") as Long - val taskId = nextRecord.get("id") as Long - val submitTime = nextRecord.get("ts_submit") as Long - val runtime = nextRecord.get("runtime") as Long - val cores = (nextRecord.get("resource_amount_requested") as Double).toInt() - @Suppress("UNCHECKED_CAST") - val dependencies = (nextRecord.get("parents") as ArrayList).map { - it.get("item") as Long - } + @Suppress("UNCHECKED_CAST") + val dependencies = (nextRecord.get("parents") as ArrayList).map { + it.get("item") as Long + } - val flops: Long = 4100 * (runtime / 1000) * cores + val flops: Long = 4100 * (runtime / 1000) * cores - val workflow = workflows.getOrPut(workflowId) { - Job(UUID(0L, workflowId), "", HashSet()) - } - val workload = SimFlopsWorkload(flops) - val task = Task( - UUID(0L, taskId), - "", - HashSet(), - mapOf( - "workload" to workload, - WORKFLOW_TASK_CORES to cores, - WORKFLOW_TASK_DEADLINE to runtime + val workflow = workflows.getOrPut(workflowId) { + Job(UUID(0L, workflowId), "", HashSet()) + } + val workload = SimFlopsWorkload(flops) + val task = Task( + UUID(0L, taskId), + "", + HashSet(), + mapOf( + "workload" to workload, + WORKFLOW_TASK_CORES to cores, + WORKFLOW_TASK_DEADLINE to runtime + ) ) - ) - starts.merge(workflowId, submitTime, ::min) - (workflow.tasks as MutableSet).add(task) - tasks[taskId] = task - taskDependencies[task] = dependencies + starts.merge(workflowId, submitTime, ::min) + (workflow.tasks as MutableSet).add(task) + tasks[taskId] = task + taskDependencies[task] = dependencies + } } // Fix dependencies and dependents for all tasks -- cgit v1.2.3 From f0f59a0b98fe474da4411c0d5048ccdf4a2d7c43 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 9 Jun 2021 09:48:07 +0200 Subject: exp: Use LocalInputFile for Parquet readers This change updates the Parquet readers used in the Capelin experiments to use our InputFile implementation for local files, to reduce our dependency on Apache Hadoop. --- .../src/main/kotlin/org/opendc/format/util/LocalInputFile.kt | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'opendc-format/src/main') diff --git a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalInputFile.kt b/opendc-format/src/main/kotlin/org/opendc/format/util/LocalInputFile.kt index d8c25a62..92319ace 100644 --- a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalInputFile.kt +++ b/opendc-format/src/main/kotlin/org/opendc/format/util/LocalInputFile.kt @@ -25,6 +25,7 @@ package org.opendc.format.util import org.apache.parquet.io.InputFile import org.apache.parquet.io.SeekableInputStream import java.io.EOFException +import java.io.File import java.nio.ByteBuffer import java.nio.channels.FileChannel import java.nio.file.Path @@ -39,6 +40,11 @@ public class LocalInputFile(private val path: Path) : InputFile { */ private val channel = FileChannel.open(path, StandardOpenOption.READ) + /** + * Construct a [LocalInputFile] for the specified [file]. + */ + public constructor(file: File) : this(file.toPath()) + override fun getLength(): Long = channel.size() override fun newStream(): SeekableInputStream = object : SeekableInputStream() { -- cgit v1.2.3 From 2837ee439aef908b3fe281b9707dbb961e700f1c Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 9 Jun 2021 12:35:44 +0200 Subject: format: Add implementation of local Parquet OutputFile This change adds an implementation of Parquet OutputFile for local files in order to eliminate the dependency on the entire Hadoop system. This implementation allows users to read Parquet files locally without needing a Parquet filesystem implementation. --- .../org/opendc/format/util/LocalOutputFile.kt | 95 ++++++++++++++++++++++ 1 file changed, 95 insertions(+) create mode 100644 opendc-format/src/main/kotlin/org/opendc/format/util/LocalOutputFile.kt (limited to 'opendc-format/src/main') diff --git a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalOutputFile.kt b/opendc-format/src/main/kotlin/org/opendc/format/util/LocalOutputFile.kt new file mode 100644 index 00000000..657bca5a --- /dev/null +++ b/opendc-format/src/main/kotlin/org/opendc/format/util/LocalOutputFile.kt @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.format.util + +import org.apache.parquet.io.OutputFile +import org.apache.parquet.io.PositionOutputStream +import java.io.File +import java.io.OutputStream +import java.nio.file.Files +import java.nio.file.Path +import java.nio.file.StandardOpenOption + +/** + * An [OutputFile] on the local filesystem. + */ +public class LocalOutputFile(private val path: Path) : OutputFile { + /** + * Construct a [LocalOutputFile] from the specified [file] + */ + public constructor(file: File) : this(file.toPath()) + + override fun create(blockSizeHint: Long): PositionOutputStream { + val output = Files.newOutputStream(path, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE) + return NioPositionOutputStream(output) + } + + override fun createOrOverwrite(blockSizeHint: Long): PositionOutputStream { + val output = Files.newOutputStream(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING) + return NioPositionOutputStream(output) + } + + override fun supportsBlockSize(): Boolean = false + + override fun defaultBlockSize(): Long = + throw UnsupportedOperationException("Local filesystem does not have default block size") + + override fun getPath(): String = path.toString() + + /** + * Implementation of [PositionOutputStream] for an [OutputStream]. + */ + private class NioPositionOutputStream(private val output: OutputStream) : PositionOutputStream() { + /** + * The current position in the file. + */ + private var _pos = 0L + + override fun getPos(): Long = _pos + + override fun write(b: Int) { + output.write(b) + _pos++ + } + + override fun write(b: ByteArray) { + output.write(b) + _pos += b.size + } + + override fun write(b: ByteArray, off: Int, len: Int) { + output.write(b, off, len) + _pos += len + } + + override fun flush() { + output.flush() + } + + override fun close() { + output.close() + } + + override fun toString(): String = "NioPositionOutputStream[output=$output]" + } +} -- cgit v1.2.3 From 81c3b51169cc5dfafb80abb0cf55abb49646a72a Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 16 Jun 2021 16:14:39 +0200 Subject: exp: Fix power tracking for energy experiments This change fixes an issue where the power in the energy experiments is always reported as zero due to the changes in commit 652b869. --- .../org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt | 2 +- .../kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'opendc-format/src/main') diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt index cf90da68..bef6b742 100644 --- a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt +++ b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt @@ -106,7 +106,7 @@ public class Sc20ClusterEnvironmentReader( // For now we assume a simple linear load model with an idle draw of ~200W and a maximum // power draw of 350W. // Source: https://stackoverflow.com/questions/6128960 - LinearPowerModel(350.0, 200 / 350.0) + LinearPowerModel(350.0, idlePower = 200.0) ) ) } diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt index c6a19430..2fa1caab 100644 --- a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt +++ b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt @@ -84,7 +84,7 @@ public class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = ja // For now we assume a simple linear load model with an idle draw of ~200W and a maximum // power draw of 350W. // Source: https://stackoverflow.com/questions/6128960 - LinearPowerModel(350.0, 200 / 350.0) + LinearPowerModel(350.0, idlePower = 200.0) ) } } -- cgit v1.2.3 From b29f90e5ad5bcac29cde86e56c06e0b65a52cedc Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 21 Jun 2021 20:57:06 +0200 Subject: simulator: Re-organize compute simulator module This change re-organizes the classes of the compute simulator module to make a clearer distinction between the hardware, firmware and software interfaces in this module. --- .../src/main/kotlin/org/opendc/format/environment/MachineDef.kt | 4 ++-- .../org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt | 4 ++-- .../opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt | 4 ++-- .../org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt | 4 ++-- 4 files changed, 8 insertions(+), 8 deletions(-) (limited to 'opendc-format/src/main') diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/MachineDef.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/MachineDef.kt index 9b799cc2..f65c4880 100644 --- a/opendc-format/src/main/kotlin/org/opendc/format/environment/MachineDef.kt +++ b/opendc-format/src/main/kotlin/org/opendc/format/environment/MachineDef.kt @@ -22,7 +22,7 @@ package org.opendc.format.environment -import org.opendc.simulator.compute.SimMachineModel +import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.power.PowerModel import java.util.* @@ -30,6 +30,6 @@ public data class MachineDef( val uid: UUID, val name: String, val meta: Map, - val model: SimMachineModel, + val model: MachineModel, val powerModel: PowerModel ) diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt index 1f080c2d..7780a98e 100644 --- a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt +++ b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt @@ -27,7 +27,7 @@ import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue import org.opendc.format.environment.EnvironmentReader import org.opendc.format.environment.MachineDef -import org.opendc.simulator.compute.SimMachineModel +import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit @@ -75,7 +75,7 @@ public class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = ja UUID(0L, counter++.toLong()), "node-$counter", emptyMap(), - SimMachineModel(cores, listOf(MemoryUnit("", "", 2300.0, 16000))), + MachineModel(cores, listOf(MemoryUnit("", "", 2300.0, 16000))), ConstantPowerModel(0.0) ) } diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt index bef6b742..1efd2ddf 100644 --- a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt +++ b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt @@ -24,7 +24,7 @@ package org.opendc.format.environment.sc20 import org.opendc.format.environment.EnvironmentReader import org.opendc.format.environment.MachineDef -import org.opendc.simulator.compute.SimMachineModel +import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit @@ -97,7 +97,7 @@ public class Sc20ClusterEnvironmentReader( UUID(random.nextLong(), random.nextLong()), "node-$clusterId-$it", mapOf("cluster" to clusterId), - SimMachineModel( + MachineModel( List(coresPerHost) { coreId -> ProcessingUnit(unknownProcessingNode, coreId, speed) }, diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt index 2fa1caab..9b77702e 100644 --- a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt +++ b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt @@ -27,7 +27,7 @@ import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue import org.opendc.format.environment.EnvironmentReader import org.opendc.format.environment.MachineDef -import org.opendc.simulator.compute.SimMachineModel +import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit @@ -80,7 +80,7 @@ public class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = ja UUID(0L, counter++.toLong()), "node-$counter", emptyMap(), - SimMachineModel(cores, memories), + MachineModel(cores, memories), // For now we assume a simple linear load model with an idle draw of ~200W and a maximum // power draw of 350W. // Source: https://stackoverflow.com/questions/6128960 -- cgit v1.2.3 From be34a55c2c2fe94a6883c6b97d2abe4c43288e8a Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 23 Jun 2021 16:54:31 +0200 Subject: format: Remove performance interference from trace readers This change updates the trace reader implementation to remove their dependency on the performance interference model. In a future commit, we will instead pass the performance interference model via the host/hypervisor. --- .../org/opendc/format/environment/sc20/Model.kt | 67 -------- .../sc20/Sc20ClusterEnvironmentReader.kt | 120 -------------- .../environment/sc20/Sc20EnvironmentReader.kt | 97 ----------- .../trace/PerformanceInterferenceModelReader.kt | 37 ----- .../kotlin/org/opendc/format/trace/TraceReader.kt | 4 +- .../org/opendc/format/trace/VmPlacementReader.kt | 37 ----- .../trace/sc20/PerformanceInterferenceEntry.kt | 7 - .../sc20/Sc20PerformanceInterferenceReader.kt | 65 -------- .../opendc/format/trace/sc20/Sc20TraceReader.kt | 181 --------------------- .../format/trace/sc20/Sc20VmPlacementReader.kt | 51 ------ 10 files changed, 1 insertion(+), 665 deletions(-) delete mode 100644 opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Model.kt delete mode 100644 opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt delete mode 100644 opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt delete mode 100644 opendc-format/src/main/kotlin/org/opendc/format/trace/PerformanceInterferenceModelReader.kt delete mode 100644 opendc-format/src/main/kotlin/org/opendc/format/trace/VmPlacementReader.kt delete mode 100644 opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/PerformanceInterferenceEntry.kt delete mode 100644 opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt delete mode 100644 opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt delete mode 100644 opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20VmPlacementReader.kt (limited to 'opendc-format/src/main') diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Model.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Model.kt deleted file mode 100644 index 58af8453..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Model.kt +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.environment.sc20 - -import com.fasterxml.jackson.annotation.JsonSubTypes -import com.fasterxml.jackson.annotation.JsonTypeInfo - -/** - * A topology setup. - * - * @property name The name of the setup. - * @property rooms The rooms in the topology. - */ -internal data class Setup(val name: String, val rooms: List) - -/** - * A room in a topology. - * - * @property type The type of room in the topology. - * @property objects The objects in the room. - */ -internal data class Room(val type: String, val objects: List) - -/** - * An object in a [Room]. - * - * @property type The type of the room object. - */ -@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type") -@JsonSubTypes(value = [JsonSubTypes.Type(name = "RACK", value = RoomObject.Rack::class)]) -internal sealed class RoomObject(val type: String) { - /** - * A rack in a server room. - * - * @property machines The machines in the rack. - */ - internal data class Rack(val machines: List) : RoomObject("RACK") -} - -/** - * A machine in the setup that consists of the specified CPU's represented as - * integer identifiers and ethernet speed. - * - * @property cpus The CPUs in the machine represented as integer identifiers. - * @property memories The memories in the machine represented as integer identifiers. - */ -internal data class Machine(val cpus: List, val memories: List) diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt deleted file mode 100644 index 1efd2ddf..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.environment.sc20 - -import org.opendc.format.environment.EnvironmentReader -import org.opendc.format.environment.MachineDef -import org.opendc.simulator.compute.model.MachineModel -import org.opendc.simulator.compute.model.MemoryUnit -import org.opendc.simulator.compute.model.ProcessingNode -import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.compute.power.LinearPowerModel -import java.io.File -import java.io.FileInputStream -import java.io.InputStream -import java.util.* - -/** - * A [EnvironmentReader] for the internal environment format. - * - * @param environmentFile The file describing the physical cluster. - */ -public class Sc20ClusterEnvironmentReader( - private val input: InputStream -) : EnvironmentReader { - - public constructor(file: File) : this(FileInputStream(file)) - - public override fun read(): List { - var clusterIdCol = 0 - var speedCol = 0 - var numberOfHostsCol = 0 - var memoryPerHostCol = 0 - var coresPerHostCol = 0 - - var clusterIdx: Int = 0 - var clusterId: String - var speed: Double - var numberOfHosts: Int - var memoryPerHost: Long - var coresPerHost: Int - - val nodes = mutableListOf() - val random = Random(0) - - input.bufferedReader().use { reader -> - reader.lineSequence() - .filter { line -> - // Ignore comments in the file - !line.startsWith("#") && line.isNotBlank() - } - .forEachIndexed { idx, line -> - val values = line.split(";") - - if (idx == 0) { - val header = values.mapIndexed { col, name -> Pair(name.trim(), col) }.toMap() - clusterIdCol = header["ClusterID"]!! - speedCol = header["Speed"]!! - numberOfHostsCol = header["numberOfHosts"]!! - memoryPerHostCol = header["memoryCapacityPerHost"]!! - coresPerHostCol = header["coreCountPerHost"]!! - return@forEachIndexed - } - - clusterIdx++ - clusterId = values[clusterIdCol].trim() - speed = values[speedCol].trim().toDouble() * 1000.0 - numberOfHosts = values[numberOfHostsCol].trim().toInt() - memoryPerHost = values[memoryPerHostCol].trim().toLong() * 1000L - coresPerHost = values[coresPerHostCol].trim().toInt() - - val unknownProcessingNode = ProcessingNode("unknown", "unknown", "unknown", coresPerHost) - val unknownMemoryUnit = MemoryUnit("unknown", "unknown", -1.0, memoryPerHost) - - repeat(numberOfHosts) { - nodes.add( - MachineDef( - UUID(random.nextLong(), random.nextLong()), - "node-$clusterId-$it", - mapOf("cluster" to clusterId), - MachineModel( - List(coresPerHost) { coreId -> - ProcessingUnit(unknownProcessingNode, coreId, speed) - }, - listOf(unknownMemoryUnit) - ), - // For now we assume a simple linear load model with an idle draw of ~200W and a maximum - // power draw of 350W. - // Source: https://stackoverflow.com/questions/6128960 - LinearPowerModel(350.0, idlePower = 200.0) - ) - ) - } - } - } - - return nodes - } - - override fun close() {} -} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt deleted file mode 100644 index 9b77702e..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc20/Sc20EnvironmentReader.kt +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.environment.sc20 - -import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper -import com.fasterxml.jackson.module.kotlin.readValue -import org.opendc.format.environment.EnvironmentReader -import org.opendc.format.environment.MachineDef -import org.opendc.simulator.compute.model.MachineModel -import org.opendc.simulator.compute.model.MemoryUnit -import org.opendc.simulator.compute.model.ProcessingNode -import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.compute.power.LinearPowerModel -import java.io.InputStream -import java.util.* - -/** - * A parser for the JSON experiment setup files used for the SC20 paper. - * - * @param input The input stream to read from. - * @param mapper The Jackson object mapper to use. - */ -public class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonObjectMapper()) : EnvironmentReader { - /** - * The environment that was read from the file. - */ - private val setup: Setup = mapper.readValue(input) - - /** - * Read the environment. - */ - public override fun read(): List { - var counter = 0 - return setup.rooms.flatMap { room -> - room.objects.flatMap { roomObject -> - when (roomObject) { - is RoomObject.Rack -> { - roomObject.machines.map { machine -> - val cores = machine.cpus.flatMap { id -> - when (id) { - 1 -> { - val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 4) - List(node.coreCount) { ProcessingUnit(node, it, 4100.0) } - } - 2 -> { - val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 2) - List(node.coreCount) { ProcessingUnit(node, it, 3500.0) } - } - else -> throw IllegalArgumentException("The cpu id $id is not recognized") - } - } - val memories = machine.memories.map { id -> - when (id) { - 1 -> MemoryUnit("Samsung", "PC DRAM K4A4G045WD", 1600.0, 4_000L) - else -> throw IllegalArgumentException("The cpu id $id is not recognized") - } - } - MachineDef( - UUID(0L, counter++.toLong()), - "node-$counter", - emptyMap(), - MachineModel(cores, memories), - // For now we assume a simple linear load model with an idle draw of ~200W and a maximum - // power draw of 350W. - // Source: https://stackoverflow.com/questions/6128960 - LinearPowerModel(350.0, idlePower = 200.0) - ) - } - } - } - } - } - } - - override fun close() {} -} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/PerformanceInterferenceModelReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/PerformanceInterferenceModelReader.kt deleted file mode 100644 index f30e64cf..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/PerformanceInterferenceModelReader.kt +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.trace - -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel -import java.io.Closeable -import kotlin.random.Random - -/** - * An interface for reading descriptions of performance interference models into memory. - */ -public interface PerformanceInterferenceModelReader : Closeable { - /** - * Construct a [PerformanceInterferenceModel]. - */ - public fun construct(random: Random): Map -} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt index 7df1acd3..797a88d5 100644 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt +++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt @@ -22,8 +22,6 @@ package org.opendc.format.trace -import java.io.Closeable - /** * An interface for reading workloads into memory. * @@ -31,4 +29,4 @@ import java.io.Closeable * * @param T The shape of the workloads supported by this reader. */ -public interface TraceReader : Iterator>, Closeable +public interface TraceReader : Iterator>, AutoCloseable diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/VmPlacementReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/VmPlacementReader.kt deleted file mode 100644 index 6861affe..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/VmPlacementReader.kt +++ /dev/null @@ -1,37 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.trace - -import java.io.Closeable - -/** - * An interface for reading VM placement data into memory. - */ -public interface VmPlacementReader : Closeable { - /** - * Construct a map of VMs to clusters. - */ - public fun construct(): Map -} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/PerformanceInterferenceEntry.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/PerformanceInterferenceEntry.kt deleted file mode 100644 index 0da1f7c2..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/PerformanceInterferenceEntry.kt +++ /dev/null @@ -1,7 +0,0 @@ -package org.opendc.format.trace.sc20 - -internal data class PerformanceInterferenceEntry( - val vms: List, - val minServerLoad: Double, - val performanceScore: Double -) diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt deleted file mode 100644 index 4267737d..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20PerformanceInterferenceReader.kt +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.trace.sc20 - -import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper -import com.fasterxml.jackson.module.kotlin.readValue -import org.opendc.format.trace.PerformanceInterferenceModelReader -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel -import java.io.InputStream -import java.util.* -import kotlin.random.Random - -/** - * A parser for the JSON performance interference setup files used for the SC20 paper. - * - * @param input The input stream to read from. - * @param mapper The Jackson object mapper to use. - */ -public class Sc20PerformanceInterferenceReader(input: InputStream, mapper: ObjectMapper = jacksonObjectMapper()) : - PerformanceInterferenceModelReader { - /** - * The computed value from the file. - */ - private val items: Map> - - init { - val entries: List = mapper.readValue(input) - val res = mutableMapOf>() - for (entry in entries) { - val item = PerformanceInterferenceModel.Item(TreeSet(entry.vms), entry.minServerLoad, entry.performanceScore) - for (workload in entry.vms) { - res.computeIfAbsent(workload) { TreeSet() }.add(item) - } - } - - items = res - } - - override fun construct(random: Random): Map { - return items.mapValues { PerformanceInterferenceModel(it.value, Random(random.nextInt())) } - } - - override fun close() {} -} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt deleted file mode 100644 index 1eb4bac2..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20TraceReader.kt +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.trace.sc20 - -import org.opendc.format.trace.TraceEntry -import org.opendc.format.trace.TraceReader -import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel -import org.opendc.simulator.compute.workload.SimTraceWorkload -import org.opendc.simulator.compute.workload.SimWorkload -import java.io.BufferedReader -import java.io.File -import java.io.FileReader -import java.util.* -import kotlin.math.max -import kotlin.math.min -import kotlin.random.Random - -/** - * A [TraceReader] for the internal VM workload trace format. - * - * @param traceDirectory The directory of the traces. - * @param performanceInterferenceModel The performance model covering the workload in the VM trace. - */ -public class Sc20TraceReader( - traceDirectory: File, - performanceInterferenceModel: PerformanceInterferenceModel, - selectedVms: List, - random: Random -) : TraceReader { - /** - * The internal iterator to use for this reader. - */ - private val iterator: Iterator> - - /** - * Initialize the reader. - */ - init { - val entries = mutableMapOf>() - - val timestampCol = 0 - val cpuUsageCol = 1 - val coreCol = 12 - val provisionedMemoryCol = 20 - val traceInterval = 5 * 60 * 1000L - - val vms = if (selectedVms.isEmpty()) { - traceDirectory.walk() - .filterNot { it.isDirectory } - .filter { it.extension == "csv" || it.extension == "txt" } - .toList() - } else { - selectedVms.map { - File(traceDirectory, it) - } - } - - vms - .forEachIndexed { idx, vmFile -> - println(vmFile) - - var vmId = "" - var maxCores = -1 - var requiredMemory = -1L - var timestamp: Long - var cores = -1 - var minTime = Long.MAX_VALUE - - BufferedReader(FileReader(vmFile)).use { reader -> - reader.lineSequence() - .filter { line -> - // Ignore comments in the trace - !line.startsWith("#") && line.isNotBlank() - } - .forEach { line -> - val values = line.split(" ") - - vmId = vmFile.name - timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L - cores = values[coreCol].trim().toInt() - requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) - maxCores = max(maxCores, cores) - minTime = min(minTime, timestamp) - } - } - - val flopsFragments = sequence { - var last: SimTraceWorkload.Fragment? = null - - BufferedReader(FileReader(vmFile)).use { reader -> - reader.lineSequence() - .chunked(128) - .forEach { lines -> - for (line in lines) { - // Ignore comments in the trace - if (line.startsWith("#") || line.isBlank()) { - continue - } - - val values = line.split(" ") - val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz - requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) - maxCores = max(maxCores, cores) - - last = if (last != null && last!!.usage == 0.0 && cpuUsage == 0.0) { - val oldFragment = last!! - SimTraceWorkload.Fragment( - oldFragment.duration + traceInterval, - cpuUsage, - cores - ) - } else { - val fragment = - SimTraceWorkload.Fragment(traceInterval, cpuUsage, cores) - if (last != null) { - yield(last!!) - } - fragment - } - } - } - - if (last != null) { - yield(last!!) - } - } - } - - val uuid = UUID(0, idx.toLong()) - - val relevantPerformanceInterferenceModelItems = - PerformanceInterferenceModel( - performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId) }.toSortedSet(), - Random(random.nextInt()) - ) - val workload = SimTraceWorkload(flopsFragments.asSequence()) - entries[uuid] = TraceEntry( - uuid, - vmId, - minTime, - workload, - mapOf( - IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems, - "cores" to cores, - "required-memory" to requiredMemory, - "workload" to workload - ) - ) - } - - // Create the entry iterator - iterator = entries.values.sortedBy { it.start }.iterator() - } - - override fun hasNext(): Boolean = iterator.hasNext() - - override fun next(): TraceEntry = iterator.next() - - override fun close() {} -} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20VmPlacementReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20VmPlacementReader.kt deleted file mode 100644 index 61bdea60..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/sc20/Sc20VmPlacementReader.kt +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.trace.sc20 - -import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper -import com.fasterxml.jackson.module.kotlin.readValue -import org.opendc.format.trace.VmPlacementReader -import java.io.InputStream - -/** - * A parser for the JSON VM placement data files used for the SC20 paper. - * - * @param input The input stream to read from. - * @param mapper The Jackson object mapper to use. - */ -public class Sc20VmPlacementReader(input: InputStream, mapper: ObjectMapper = jacksonObjectMapper()) : - VmPlacementReader { - /** - * The environment that was read from the file. - */ - private val placements = mapper.readValue>(input) - - override fun construct(): Map { - return placements - .mapKeys { "vm__workload__${it.key}.txt" } - .mapValues { it.value.split("/")[1] } // Clusters have format XX0 / X00 - } - - override fun close() {} -} -- cgit v1.2.3 From e56967a29ac2b2d26cc085b1f3e27096dad6a170 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 24 Jun 2021 12:54:52 +0200 Subject: simulator: Re-implement performance interference model This change updates reimplements the performance interference model to work on top of the universal resource model in `opendc-simulator-resources`. This enables us to model interference and performance variability of other resources such as disk or network in the future. --- .../opendc/format/trace/bitbrains/BitbrainsTraceReader.kt | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) (limited to 'opendc-format/src/main') diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt index 769b2b13..aaf8a240 100644 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt +++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt @@ -24,8 +24,6 @@ package org.opendc.format.trace.bitbrains import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader -import org.opendc.simulator.compute.interference.IMAGE_PERF_INTERFERENCE_MODEL -import org.opendc.simulator.compute.interference.PerformanceInterferenceModel import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.compute.workload.SimWorkload import java.io.BufferedReader @@ -38,12 +36,8 @@ import kotlin.math.min * A [TraceReader] for the public VM workload trace format. * * @param traceDirectory The directory of the traces. - * @param performanceInterferenceModel The performance model covering the workload in the VM trace. */ -public class BitbrainsTraceReader( - traceDirectory: File, - performanceInterferenceModel: PerformanceInterferenceModel -) : TraceReader { +public class BitbrainsTraceReader(traceDirectory: File) : TraceReader { /** * The internal iterator to use for this reader. */ @@ -123,12 +117,6 @@ public class BitbrainsTraceReader( val uuid = UUID(0L, vmId) - val relevantPerformanceInterferenceModelItems = - PerformanceInterferenceModel( - performanceInterferenceModel.items.filter { it.workloadNames.contains(vmId.toString()) } - .toSortedSet() - ) - val workload = SimTraceWorkload(flopsHistory.asSequence()) entries[vmId] = TraceEntry( uuid, @@ -136,7 +124,6 @@ public class BitbrainsTraceReader( startTime, workload, mapOf( - IMAGE_PERF_INTERFERENCE_MODEL to relevantPerformanceInterferenceModelItems, "cores" to cores, "required-memory" to requiredMemory, "workload" to workload -- cgit v1.2.3 From 31a1f298c71cd3203fdcd57bd39ba8813009dd5b Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 17 Aug 2021 19:25:11 +0200 Subject: refactor(simulator): Execute traces based on timestamps This change refactors the trace workload in the OpenDC simulator to track execute a fragment based on the fragment's timestamp. This makes sure that the trace is replayed identically to the original execution. --- .../org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt | 7 +++++-- .../src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt | 5 ++++- 2 files changed, 9 insertions(+), 3 deletions(-) (limited to 'opendc-format/src/main') diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt index aaf8a240..cd8021fe 100644 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt +++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt @@ -85,17 +85,19 @@ public class BitbrainsTraceReader(traceDirectory: File) : TraceReader 0) { flopsHistory.add( SimTraceWorkload.Fragment( + submitTime + slicedWaitTime + runTime, sliceDuration, runtimePartialSliceRemainder / sliceDuration.toDouble(), cores -- cgit v1.2.3 From a23ad09d5a1c4033781bd5403ad766cae83a2beb Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 18 Aug 2021 12:11:22 +0200 Subject: refactor(format): Clean up Bitbrains trace reader to enable re-use This change updates the code for the Bitbrains trace reader and upgrades the TraceConverter to re-use existing code of the Bitbrains trace reader. --- .../trace/bitbrains/BitbrainsRawTraceReader.kt | 100 +++++++++++++++++++++ .../format/trace/bitbrains/BitbrainsTraceReader.kt | 97 ++++++++------------ 2 files changed, 139 insertions(+), 58 deletions(-) create mode 100644 opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsRawTraceReader.kt (limited to 'opendc-format/src/main') diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsRawTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsRawTraceReader.kt new file mode 100644 index 00000000..ff6cdd02 --- /dev/null +++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsRawTraceReader.kt @@ -0,0 +1,100 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.format.trace.bitbrains + +import com.fasterxml.jackson.annotation.JsonProperty +import com.fasterxml.jackson.databind.MappingIterator +import com.fasterxml.jackson.dataformat.csv.CsvMapper +import com.fasterxml.jackson.dataformat.csv.CsvSchema +import java.io.InputStream + +/** + * A trace reader that enables the user to read Bitbrains specific trace data. + */ +public class BitbrainsRawTraceReader(input: InputStream) : Iterator, AutoCloseable { + /** + * The [CsvSchema] that is used to parse the trace. + */ + private val schema = CsvSchema.builder() + .addColumn("Timestamp [ms]", CsvSchema.ColumnType.NUMBER) + .addColumn("CPU cores", CsvSchema.ColumnType.NUMBER) + .addColumn("CPU capacity provisioned [MHZ]", CsvSchema.ColumnType.NUMBER) + .addColumn("CPU usage [MHZ]", CsvSchema.ColumnType.NUMBER) + .addColumn("CPU usage [%]", CsvSchema.ColumnType.NUMBER) + .addColumn("Memory capacity provisioned [KB]", CsvSchema.ColumnType.NUMBER) + .addColumn("Memory usage [KB]", CsvSchema.ColumnType.NUMBER) + .addColumn("Disk read throughput [KB/s]", CsvSchema.ColumnType.NUMBER) + .addColumn("Disk write throughput [KB/s]", CsvSchema.ColumnType.NUMBER) + .addColumn("Network received throughput [KB/s]", CsvSchema.ColumnType.NUMBER) + .addColumn("Network transmitted throughput [KB/s]", CsvSchema.ColumnType.NUMBER) + .setAllowComments(true) + .setUseHeader(true) + .setColumnSeparator(';') + .build() + + /** + * The mapping iterator to use. + */ + private val iterator: MappingIterator = CsvMapper().readerFor(Entry::class.java).with(schema) + .readValues(input) + + override fun hasNext(): Boolean { + return iterator.hasNext() + } + + override fun next(): Entry { + return iterator.next() + } + + override fun close() { + iterator.close() + } + + /** + * A single entry in the trace. + */ + public data class Entry( + @JsonProperty("Timestamp [ms]") + val timestamp: Long, + @JsonProperty("CPU cores") + val cpuCores: Int, + @JsonProperty("CPU capacity provisioned [MHZ]") + val cpuCapacity: Double, + @JsonProperty("CPU usage [MHZ]") + val cpuUsage: Double, + @JsonProperty("CPU usage [%]") + val cpuUsagePct: Double, + @JsonProperty("Memory capacity provisioned [KB]") + val memCapacity: Double, + @JsonProperty("Memory usage [KB]") + val memUsage: Double, + @JsonProperty("Disk read throughput [KB/s]") + val diskRead: Double, + @JsonProperty("Disk write throughput [KB/s]") + val diskWrite: Double, + @JsonProperty("Network received throughput [KB/s]") + val netReceived: Double, + @JsonProperty("Network transmitted throughput [KB/s]") + val netTransmitted: Double + ) +} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt index cd8021fe..9e4876df 100644 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt +++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt @@ -26,10 +26,10 @@ import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.compute.workload.SimWorkload -import java.io.BufferedReader import java.io.File -import java.io.FileReader +import java.io.FileInputStream import java.util.* +import kotlin.math.max import kotlin.math.min /** @@ -48,74 +48,55 @@ public class BitbrainsTraceReader(traceDirectory: File) : TraceReader>() - - var timestampCol = 0 - var coreCol = 0 - var cpuUsageCol = 0 - var provisionedMemoryCol = 0 val traceInterval = 5 * 60 * 1000L traceDirectory.walk() .filterNot { it.isDirectory } + .filter { it.extension == "csv" } .forEach { vmFile -> - println(vmFile) val flopsHistory = mutableListOf() var vmId = -1L - var cores = -1 - var requiredMemory = -1L - var startTime = -1L - - BufferedReader(FileReader(vmFile)).use { reader -> - reader.lineSequence() - .filter { line -> - // Ignore comments in the trace - !line.startsWith("#") && line.isNotBlank() + var maxCores = Int.MIN_VALUE + var requiredMemory = Long.MIN_VALUE + var startTime = Long.MAX_VALUE + var lastTimestamp = Long.MIN_VALUE + + BitbrainsRawTraceReader(FileInputStream(vmFile)).use { reader -> + reader.forEach { entry -> + val timestamp = entry.timestamp * 1000L + val cpuUsage = entry.cpuUsage + vmId = vmFile.nameWithoutExtension.trim().toLong() + val cores = entry.cpuCores + maxCores = max(maxCores, cores) + requiredMemory = max(requiredMemory, (entry.memCapacity / 1000).toLong()) + + if (lastTimestamp < 0) { + lastTimestamp = timestamp - 5 * 60 * 1000L + startTime = min(startTime, lastTimestamp) } - .forEachIndexed { idx, line -> - val values = line.split(";\t") - - // Parse GWF header - if (idx == 0) { - val header = values.mapIndexed { col, name -> Pair(name.trim(), col) }.toMap() - timestampCol = header["Timestamp [ms]"]!! - coreCol = header["CPU cores"]!! - cpuUsageCol = header["CPU usage [MHZ]"]!! - provisionedMemoryCol = header["Memory capacity provisioned [KB]"]!! - return@forEachIndexed - } - vmId = vmFile.nameWithoutExtension.trim().toLong() - val timestamp = values[timestampCol].trim().toLong() - 5 * 60 - startTime = min(startTime, timestamp) - cores = values[coreCol].trim().toInt() - val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz - requiredMemory = (values[provisionedMemoryCol].trim().toDouble() / 1000).toLong() - - if (flopsHistory.isEmpty()) { - flopsHistory.add(SimTraceWorkload.Fragment(timestamp, traceInterval, cpuUsage, cores)) + if (flopsHistory.isEmpty()) { + flopsHistory.add(SimTraceWorkload.Fragment(lastTimestamp, traceInterval, cpuUsage, cores)) + } else { + val last = flopsHistory.last() + val duration = timestamp - lastTimestamp + // Perform run-length encoding + if (duration == 0L || last.usage == cpuUsage) { + flopsHistory[flopsHistory.size - 1] = last.copy(duration = last.duration + duration) } else { - if (flopsHistory.last().usage != cpuUsage) { - flopsHistory.add( - SimTraceWorkload.Fragment( - timestamp, - traceInterval, - cpuUsage, - cores - ) - ) - } else { - val oldFragment = flopsHistory.removeAt(flopsHistory.size - 1) - flopsHistory.add( - SimTraceWorkload.Fragment( - oldFragment.timestamp, - oldFragment.duration + traceInterval, - cpuUsage, - cores - ) + flopsHistory.add( + SimTraceWorkload.Fragment( + lastTimestamp, + duration, + cpuUsage, + cores ) - } + ) } } + + lastTimestamp = timestamp + } } val uuid = UUID(0L, vmId) @@ -127,7 +108,7 @@ public class BitbrainsTraceReader(traceDirectory: File) : TraceReader Date: Tue, 31 Aug 2021 14:56:08 +0200 Subject: refactor(trace): Move GWF trace reader into separate module This change starts the process of moving the different trace formats into separate modules. This change in particular moves the GWF trace format into a new module, opendc-trace-gwf. Furthermore, this change also implements the trace API for the GWF module. --- .../org/opendc/format/trace/gwf/GwfTraceReader.kt | 176 --------------------- 1 file changed, 176 deletions(-) delete mode 100644 opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt (limited to 'opendc-format/src/main') diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt deleted file mode 100644 index e68afeb7..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/gwf/GwfTraceReader.kt +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.trace.gwf - -import org.opendc.format.trace.TraceEntry -import org.opendc.format.trace.TraceReader -import org.opendc.simulator.compute.workload.SimFlopsWorkload -import org.opendc.workflow.api.Job -import org.opendc.workflow.api.Task -import org.opendc.workflow.api.WORKFLOW_TASK_CORES -import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE -import java.io.BufferedReader -import java.io.File -import java.io.InputStream -import java.util.* -import kotlin.collections.HashSet -import kotlin.collections.Iterator -import kotlin.collections.List -import kotlin.collections.MutableSet -import kotlin.collections.component1 -import kotlin.collections.component2 -import kotlin.collections.filter -import kotlin.collections.forEach -import kotlin.collections.getOrPut -import kotlin.collections.map -import kotlin.collections.mapIndexed -import kotlin.collections.mapOf -import kotlin.collections.mutableMapOf -import kotlin.collections.set -import kotlin.collections.sortedBy -import kotlin.collections.toMap -import kotlin.math.max -import kotlin.math.min - -/** - * A [TraceReader] for the Grid Workload Format. See the Grid Workloads Archive (http://gwa.ewi.tudelft.nl/) for more - * information about the format. - * - * Be aware that in the Grid Workload Format, workflows are not required to be ordered by submission time and therefore - * this reader needs to read the whole trace into memory before an entry can be read. Consider converting the trace to a - * different format for better performance. - * - * @param reader The buffered reader to read the trace with. - */ -public class GwfTraceReader(reader: BufferedReader) : TraceReader { - /** - * The internal iterator to use for this reader. - */ - private val iterator: Iterator> - - /** - * Create a [GwfTraceReader] instance from the specified [File]. - * - * @param file The file to read from. - */ - public constructor(file: File) : this(file.bufferedReader()) - - /** - * Create a [GwfTraceReader] instance from the specified [InputStream]. - * - * @param input The input stream to read from. - */ - public constructor(input: InputStream) : this(input.bufferedReader()) - - /** - * Initialize the reader. - */ - init { - val workflows = mutableMapOf() - val starts = mutableMapOf() - val tasks = mutableMapOf() - val taskDependencies = mutableMapOf>() - - var workflowIdCol = 0 - var taskIdCol = 0 - var submitTimeCol = 0 - var runtimeCol = 0 - var coreCol = 0 - var dependencyCol = 0 - - try { - reader.lineSequence() - .filter { line -> - // Ignore comments in the trace - !line.startsWith("#") && line.isNotBlank() - } - .forEachIndexed { idx, line -> - val values = line.split(",") - - // Parse GWF header - if (idx == 0) { - val header = values.mapIndexed { col, name -> Pair(name.trim(), col) }.toMap() - workflowIdCol = header["WorkflowID"]!! - taskIdCol = header["JobID"]!! - submitTimeCol = header["SubmitTime"]!! - runtimeCol = header["RunTime"]!! - coreCol = header["NProcs"]!! - dependencyCol = header["Dependencies"]!! - return@forEachIndexed - } - - val workflowId = values[workflowIdCol].trim().toLong() - val taskId = values[taskIdCol].trim().toLong() - val submitTime = values[submitTimeCol].trim().toLong() * 1000 // ms - val runtime = max(0, values[runtimeCol].trim().toLong()) // s - val cores = values[coreCol].trim().toInt() - val dependencies = values[dependencyCol].split(" ") - .filter { it.isNotEmpty() } - .map { it.trim().toLong() } - - val flops: Long = 4000 * runtime * cores - - val workflow = workflows.getOrPut(workflowId) { - Job(UUID(0L, workflowId), "", HashSet()) - } - val workload = SimFlopsWorkload(flops) - val task = Task( - UUID(0L, taskId), - "", - HashSet(), - mapOf( - "workload" to workload, - WORKFLOW_TASK_CORES to cores, - WORKFLOW_TASK_DEADLINE to (runtime * 1000) - ), - ) - starts.merge(workflowId, submitTime, ::min) - (workflow.tasks as MutableSet).add(task) - tasks[taskId] = task - taskDependencies[task] = dependencies - } - } finally { - reader.close() - } - - // Fix dependencies and dependents for all tasks - taskDependencies.forEach { (task, dependencies) -> - (task.dependencies as MutableSet).addAll( - dependencies.map { taskId -> - tasks[taskId] ?: throw IllegalArgumentException("Dependency task with id $taskId not found") - } - ) - } - - // Create the entry iterator - iterator = workflows.map { (id, job) -> TraceEntry(job.uid, job.name, starts.getValue(id), job, job.metadata) } - .sortedBy { it.start } - .iterator() - } - - override fun hasNext(): Boolean = iterator.hasNext() - - override fun next(): TraceEntry = iterator.next() - - override fun close() {} -} -- cgit v1.2.3 From e8cdfbcec3f75b3f303ce52bac5f5595a94555e4 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 31 Aug 2021 15:14:46 +0200 Subject: refactor(trace): Extract Parquet helpers into separate module This change extracts the Parquet helpers outside format module into a new module, in order to improve re-usability of these helpers. --- .../org/opendc/format/trace/wtf/WtfTraceReader.kt | 2 +- .../org/opendc/format/util/LocalInputFile.kt | 107 -------------------- .../org/opendc/format/util/LocalOutputFile.kt | 95 ----------------- .../org/opendc/format/util/LocalParquetReader.kt | 112 --------------------- 4 files changed, 1 insertion(+), 315 deletions(-) delete mode 100644 opendc-format/src/main/kotlin/org/opendc/format/util/LocalInputFile.kt delete mode 100644 opendc-format/src/main/kotlin/org/opendc/format/util/LocalOutputFile.kt delete mode 100644 opendc-format/src/main/kotlin/org/opendc/format/util/LocalParquetReader.kt (limited to 'opendc-format/src/main') diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt index dde1b340..e8e72f0e 100644 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt +++ b/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt @@ -25,8 +25,8 @@ package org.opendc.format.trace.wtf import org.apache.avro.generic.GenericRecord import org.opendc.format.trace.TraceEntry import org.opendc.format.trace.TraceReader -import org.opendc.format.util.LocalParquetReader import org.opendc.simulator.compute.workload.SimFlopsWorkload +import org.opendc.trace.util.parquet.LocalParquetReader import org.opendc.workflow.api.Job import org.opendc.workflow.api.Task import org.opendc.workflow.api.WORKFLOW_TASK_CORES diff --git a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalInputFile.kt b/opendc-format/src/main/kotlin/org/opendc/format/util/LocalInputFile.kt deleted file mode 100644 index 92319ace..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalInputFile.kt +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.util - -import org.apache.parquet.io.InputFile -import org.apache.parquet.io.SeekableInputStream -import java.io.EOFException -import java.io.File -import java.nio.ByteBuffer -import java.nio.channels.FileChannel -import java.nio.file.Path -import java.nio.file.StandardOpenOption - -/** - * An [InputFile] on the local filesystem. - */ -public class LocalInputFile(private val path: Path) : InputFile { - /** - * The [FileChannel] used for accessing the input path. - */ - private val channel = FileChannel.open(path, StandardOpenOption.READ) - - /** - * Construct a [LocalInputFile] for the specified [file]. - */ - public constructor(file: File) : this(file.toPath()) - - override fun getLength(): Long = channel.size() - - override fun newStream(): SeekableInputStream = object : SeekableInputStream() { - override fun read(buf: ByteBuffer): Int { - return channel.read(buf) - } - - override fun read(): Int { - val single = ByteBuffer.allocate(1) - var read: Int - - // ReadableByteChannel#read might read zero bytes so continue until we read at least one byte - do { - read = channel.read(single) - } while (read == 0) - - return if (read == -1) { - read - } else { - single.get(0).toInt() and 0xff - } - } - - override fun getPos(): Long { - return channel.position() - } - - override fun seek(newPos: Long) { - channel.position(newPos) - } - - override fun readFully(bytes: ByteArray) { - readFully(ByteBuffer.wrap(bytes)) - } - - override fun readFully(bytes: ByteArray, start: Int, len: Int) { - readFully(ByteBuffer.wrap(bytes, start, len)) - } - - override fun readFully(buf: ByteBuffer) { - var remainder = buf.remaining() - while (remainder > 0) { - val read = channel.read(buf) - remainder -= read - - if (read == -1 && remainder > 0) { - throw EOFException() - } - } - } - - override fun close() { - channel.close() - } - - override fun toString(): String = "NioSeekableInputStream" - } - - override fun toString(): String = "LocalInputFile[path=$path]" -} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalOutputFile.kt b/opendc-format/src/main/kotlin/org/opendc/format/util/LocalOutputFile.kt deleted file mode 100644 index 657bca5a..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalOutputFile.kt +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.util - -import org.apache.parquet.io.OutputFile -import org.apache.parquet.io.PositionOutputStream -import java.io.File -import java.io.OutputStream -import java.nio.file.Files -import java.nio.file.Path -import java.nio.file.StandardOpenOption - -/** - * An [OutputFile] on the local filesystem. - */ -public class LocalOutputFile(private val path: Path) : OutputFile { - /** - * Construct a [LocalOutputFile] from the specified [file] - */ - public constructor(file: File) : this(file.toPath()) - - override fun create(blockSizeHint: Long): PositionOutputStream { - val output = Files.newOutputStream(path, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE) - return NioPositionOutputStream(output) - } - - override fun createOrOverwrite(blockSizeHint: Long): PositionOutputStream { - val output = Files.newOutputStream(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING) - return NioPositionOutputStream(output) - } - - override fun supportsBlockSize(): Boolean = false - - override fun defaultBlockSize(): Long = - throw UnsupportedOperationException("Local filesystem does not have default block size") - - override fun getPath(): String = path.toString() - - /** - * Implementation of [PositionOutputStream] for an [OutputStream]. - */ - private class NioPositionOutputStream(private val output: OutputStream) : PositionOutputStream() { - /** - * The current position in the file. - */ - private var _pos = 0L - - override fun getPos(): Long = _pos - - override fun write(b: Int) { - output.write(b) - _pos++ - } - - override fun write(b: ByteArray) { - output.write(b) - _pos += b.size - } - - override fun write(b: ByteArray, off: Int, len: Int) { - output.write(b, off, len) - _pos += len - } - - override fun flush() { - output.flush() - } - - override fun close() { - output.close() - } - - override fun toString(): String = "NioPositionOutputStream[output=$output]" - } -} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalParquetReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/util/LocalParquetReader.kt deleted file mode 100644 index 5083f3e1..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/util/LocalParquetReader.kt +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.util - -import org.apache.parquet.avro.AvroParquetReader -import org.apache.parquet.hadoop.ParquetReader -import org.apache.parquet.io.InputFile -import java.io.File -import java.io.IOException -import java.nio.file.Files -import java.nio.file.Path -import kotlin.io.path.isDirectory - -/** - * A helper class to read Parquet files. - * - * @param path The path to the Parquet file or directory to read. - */ -public class LocalParquetReader(path: Path) : AutoCloseable { - /** - * The input files to process. - */ - private val filesIterator = if (path.isDirectory()) - Files.list(path) - .filter { !it.isDirectory() } - .sorted() - .map { LocalInputFile(it) } - .iterator() - else - listOf(LocalInputFile(path)).iterator() - - /** - * The Parquet reader to use. - */ - private var reader: ParquetReader? = null - - /** - * Construct a [LocalParquetReader] for the specified [file]. - */ - public constructor(file: File) : this(file.toPath()) - - /** - * Read a single entry in the Parquet file. - */ - public fun read(): T? { - return try { - val next = reader?.read() - if (next != null) { - next - } else { - initReader() - - if (reader == null) - null - else - read() - } - } catch (e: InterruptedException) { - throw IOException(e) - } - } - - /** - * Close the Parquet reader. - */ - override fun close() { - reader?.close() - } - - /** - * Initialize the next reader. - */ - private fun initReader() { - reader?.close() - - this.reader = if (filesIterator.hasNext()) { - createReader(filesIterator.next()) - } else { - null - } - } - - /** - * Create a Parquet reader for the specified file. - */ - private fun createReader(input: InputFile): ParquetReader { - return AvroParquetReader - .builder(input) - .disableCompatibility() - .build() - } -} -- cgit v1.2.3 From 9fcce6ade8714f7f0a9073fe5b7ddd3f0b35c375 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 31 Aug 2021 15:44:47 +0200 Subject: refactor(format): Remove environment reader from format library This change removes the environment reader from the format library since they are highly specific for the particular experiment. In the future, we hope to have a single format to setup the entire datacenter (perhaps similar to the format used by the web runner). --- .../opendc/format/environment/EnvironmentReader.kt | 35 --------- .../org/opendc/format/environment/MachineDef.kt | 35 --------- .../org/opendc/format/environment/sc18/Model.kt | 66 ---------------- .../environment/sc18/Sc18EnvironmentReader.kt | 89 ---------------------- 4 files changed, 225 deletions(-) delete mode 100644 opendc-format/src/main/kotlin/org/opendc/format/environment/EnvironmentReader.kt delete mode 100644 opendc-format/src/main/kotlin/org/opendc/format/environment/MachineDef.kt delete mode 100644 opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Model.kt delete mode 100644 opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt (limited to 'opendc-format/src/main') diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/EnvironmentReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/EnvironmentReader.kt deleted file mode 100644 index 97d6f239..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/environment/EnvironmentReader.kt +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.environment - -import java.io.Closeable - -/** - * An interface for reading descriptions of topology environments into memory. - */ -public interface EnvironmentReader : Closeable { - /** - * Read the environment into a list. - */ - public fun read(): List -} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/MachineDef.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/MachineDef.kt deleted file mode 100644 index f65c4880..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/environment/MachineDef.kt +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.environment - -import org.opendc.simulator.compute.model.MachineModel -import org.opendc.simulator.compute.power.PowerModel -import java.util.* - -public data class MachineDef( - val uid: UUID, - val name: String, - val meta: Map, - val model: MachineModel, - val powerModel: PowerModel -) diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Model.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Model.kt deleted file mode 100644 index c313467f..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Model.kt +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.environment.sc18 - -import com.fasterxml.jackson.annotation.JsonSubTypes -import com.fasterxml.jackson.annotation.JsonTypeInfo - -/** - * A topology setup. - * - * @property name The name of the setup. - * @property rooms The rooms in the topology. - */ -internal data class Setup(val name: String, val rooms: List) - -/** - * A room in a topology. - * - * @property type The type of room in the topology. - * @property objects The objects in the room. - */ -internal data class Room(val type: String, val objects: List) - -/** - * An object in a [Room]. - * - * @property type The type of the room object. - */ -@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type") -@JsonSubTypes(value = [JsonSubTypes.Type(name = "RACK", value = RoomObject.Rack::class)]) -internal sealed class RoomObject(val type: String) { - /** - * A rack in a server room. - * - * @property machines The machines in the rack. - */ - internal data class Rack(val machines: List) : RoomObject("RACK") -} - -/** - * A machine in the setup that consists of the specified CPU's represented as - * integer identifiers and ethernet speed. - * - * @property cpus The CPUs in the machine represented as integer identifiers. - */ -internal data class Machine(val cpus: List) diff --git a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt deleted file mode 100644 index 7780a98e..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/environment/sc18/Sc18EnvironmentReader.kt +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.environment.sc18 - -import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper -import com.fasterxml.jackson.module.kotlin.readValue -import org.opendc.format.environment.EnvironmentReader -import org.opendc.format.environment.MachineDef -import org.opendc.simulator.compute.model.MachineModel -import org.opendc.simulator.compute.model.MemoryUnit -import org.opendc.simulator.compute.model.ProcessingNode -import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.compute.power.ConstantPowerModel -import java.io.InputStream -import java.util.* - -/** - * A parser for the JSON experiment setup files used for the SC18 paper: "A Reference Architecture for Topology - * Schedulers". - * - * @param input The input stream to read from. - * @param mapper The Jackson object mapper to use. - */ -public class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonObjectMapper()) : EnvironmentReader { - /** - * The environment that was read from the file. - */ - private val setup: Setup = mapper.readValue(input) - - /** - * Read the environment. - */ - public override fun read(): List { - var counter = 0 - return setup.rooms.flatMap { room -> - room.objects.flatMap { roomObject -> - when (roomObject) { - is RoomObject.Rack -> { - roomObject.machines.map { machine -> - val cores = machine.cpus.flatMap { id -> - when (id) { - 1 -> { - val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 4) - List(node.coreCount) { ProcessingUnit(node, it, 4100.0) } - } - 2 -> { - val node = ProcessingNode("Intel", "Core(TM) i7-6920HQ", "amd64", 2) - List(node.coreCount) { ProcessingUnit(node, it, 3500.0) } - } - else -> throw IllegalArgumentException("The cpu id $id is not recognized") - } - } - MachineDef( - UUID(0L, counter++.toLong()), - "node-$counter", - emptyMap(), - MachineModel(cores, listOf(MemoryUnit("", "", 2300.0, 16000))), - ConstantPowerModel(0.0) - ) - } - } - } - } - } - } - - override fun close() {} -} -- cgit v1.2.3 From 214480d154771f0b783829b6e5ec82b837304ad2 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 31 Aug 2021 16:18:56 +0200 Subject: refactor(trace): Move Bitbrains format into separate module This change moves Bitbrains trace support into a separate module and adds support for the new trace api. --- .../trace/bitbrains/BitbrainsRawTraceReader.kt | 100 ---------------- .../format/trace/bitbrains/BitbrainsTraceReader.kt | 127 --------------------- 2 files changed, 227 deletions(-) delete mode 100644 opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsRawTraceReader.kt delete mode 100644 opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt (limited to 'opendc-format/src/main') diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsRawTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsRawTraceReader.kt deleted file mode 100644 index ff6cdd02..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsRawTraceReader.kt +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.trace.bitbrains - -import com.fasterxml.jackson.annotation.JsonProperty -import com.fasterxml.jackson.databind.MappingIterator -import com.fasterxml.jackson.dataformat.csv.CsvMapper -import com.fasterxml.jackson.dataformat.csv.CsvSchema -import java.io.InputStream - -/** - * A trace reader that enables the user to read Bitbrains specific trace data. - */ -public class BitbrainsRawTraceReader(input: InputStream) : Iterator, AutoCloseable { - /** - * The [CsvSchema] that is used to parse the trace. - */ - private val schema = CsvSchema.builder() - .addColumn("Timestamp [ms]", CsvSchema.ColumnType.NUMBER) - .addColumn("CPU cores", CsvSchema.ColumnType.NUMBER) - .addColumn("CPU capacity provisioned [MHZ]", CsvSchema.ColumnType.NUMBER) - .addColumn("CPU usage [MHZ]", CsvSchema.ColumnType.NUMBER) - .addColumn("CPU usage [%]", CsvSchema.ColumnType.NUMBER) - .addColumn("Memory capacity provisioned [KB]", CsvSchema.ColumnType.NUMBER) - .addColumn("Memory usage [KB]", CsvSchema.ColumnType.NUMBER) - .addColumn("Disk read throughput [KB/s]", CsvSchema.ColumnType.NUMBER) - .addColumn("Disk write throughput [KB/s]", CsvSchema.ColumnType.NUMBER) - .addColumn("Network received throughput [KB/s]", CsvSchema.ColumnType.NUMBER) - .addColumn("Network transmitted throughput [KB/s]", CsvSchema.ColumnType.NUMBER) - .setAllowComments(true) - .setUseHeader(true) - .setColumnSeparator(';') - .build() - - /** - * The mapping iterator to use. - */ - private val iterator: MappingIterator = CsvMapper().readerFor(Entry::class.java).with(schema) - .readValues(input) - - override fun hasNext(): Boolean { - return iterator.hasNext() - } - - override fun next(): Entry { - return iterator.next() - } - - override fun close() { - iterator.close() - } - - /** - * A single entry in the trace. - */ - public data class Entry( - @JsonProperty("Timestamp [ms]") - val timestamp: Long, - @JsonProperty("CPU cores") - val cpuCores: Int, - @JsonProperty("CPU capacity provisioned [MHZ]") - val cpuCapacity: Double, - @JsonProperty("CPU usage [MHZ]") - val cpuUsage: Double, - @JsonProperty("CPU usage [%]") - val cpuUsagePct: Double, - @JsonProperty("Memory capacity provisioned [KB]") - val memCapacity: Double, - @JsonProperty("Memory usage [KB]") - val memUsage: Double, - @JsonProperty("Disk read throughput [KB/s]") - val diskRead: Double, - @JsonProperty("Disk write throughput [KB/s]") - val diskWrite: Double, - @JsonProperty("Network received throughput [KB/s]") - val netReceived: Double, - @JsonProperty("Network transmitted throughput [KB/s]") - val netTransmitted: Double - ) -} diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt deleted file mode 100644 index 9e4876df..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/bitbrains/BitbrainsTraceReader.kt +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.trace.bitbrains - -import org.opendc.format.trace.TraceEntry -import org.opendc.format.trace.TraceReader -import org.opendc.simulator.compute.workload.SimTraceWorkload -import org.opendc.simulator.compute.workload.SimWorkload -import java.io.File -import java.io.FileInputStream -import java.util.* -import kotlin.math.max -import kotlin.math.min - -/** - * A [TraceReader] for the public VM workload trace format. - * - * @param traceDirectory The directory of the traces. - */ -public class BitbrainsTraceReader(traceDirectory: File) : TraceReader { - /** - * The internal iterator to use for this reader. - */ - private val iterator: Iterator> - - /** - * Initialize the reader. - */ - init { - val entries = mutableMapOf>() - val traceInterval = 5 * 60 * 1000L - - traceDirectory.walk() - .filterNot { it.isDirectory } - .filter { it.extension == "csv" } - .forEach { vmFile -> - val flopsHistory = mutableListOf() - var vmId = -1L - var maxCores = Int.MIN_VALUE - var requiredMemory = Long.MIN_VALUE - var startTime = Long.MAX_VALUE - var lastTimestamp = Long.MIN_VALUE - - BitbrainsRawTraceReader(FileInputStream(vmFile)).use { reader -> - reader.forEach { entry -> - val timestamp = entry.timestamp * 1000L - val cpuUsage = entry.cpuUsage - vmId = vmFile.nameWithoutExtension.trim().toLong() - val cores = entry.cpuCores - maxCores = max(maxCores, cores) - requiredMemory = max(requiredMemory, (entry.memCapacity / 1000).toLong()) - - if (lastTimestamp < 0) { - lastTimestamp = timestamp - 5 * 60 * 1000L - startTime = min(startTime, lastTimestamp) - } - - if (flopsHistory.isEmpty()) { - flopsHistory.add(SimTraceWorkload.Fragment(lastTimestamp, traceInterval, cpuUsage, cores)) - } else { - val last = flopsHistory.last() - val duration = timestamp - lastTimestamp - // Perform run-length encoding - if (duration == 0L || last.usage == cpuUsage) { - flopsHistory[flopsHistory.size - 1] = last.copy(duration = last.duration + duration) - } else { - flopsHistory.add( - SimTraceWorkload.Fragment( - lastTimestamp, - duration, - cpuUsage, - cores - ) - ) - } - } - - lastTimestamp = timestamp - } - } - - val uuid = UUID(0L, vmId) - - val workload = SimTraceWorkload(flopsHistory.asSequence()) - entries[vmId] = TraceEntry( - uuid, - vmId.toString(), - startTime, - workload, - mapOf( - "cores" to maxCores, - "required-memory" to requiredMemory, - "workload" to workload - ) - ) - } - - // Create the entry iterator - iterator = entries.values.sortedBy { it.start }.iterator() - } - - override fun hasNext(): Boolean = iterator.hasNext() - - override fun next(): TraceEntry = iterator.next() - - override fun close() {} -} -- cgit v1.2.3 From 5c6bf9739aa0ffd9651df4fcb4cd46a8545144f0 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 31 Aug 2021 18:08:14 +0200 Subject: refactor(trace): Implement trace API for SWF reader This change updates the SWF trace reader to support the new streaming trace API. --- .../org/opendc/format/trace/swf/SwfTraceReader.kt | 176 --------------------- 1 file changed, 176 deletions(-) delete mode 100644 opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt (limited to 'opendc-format/src/main') diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt deleted file mode 100644 index bda392a9..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/swf/SwfTraceReader.kt +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.trace.swf - -import org.opendc.format.trace.TraceEntry -import org.opendc.format.trace.TraceReader -import org.opendc.simulator.compute.workload.SimTraceWorkload -import org.opendc.simulator.compute.workload.SimWorkload -import java.io.BufferedReader -import java.io.File -import java.io.FileReader -import java.util.* - -/** - * A [TraceReader] for reading SWF traces into VM-modeled workloads. - * - * The standard is defined by the PWA, see here: https://www.cse.huji.ac.il/labs/parallel/workload/swf.html - * - * @param file The trace file. - */ -public class SwfTraceReader( - file: File, - maxNumCores: Int = -1 -) : TraceReader { - /** - * The internal iterator to use for this reader. - */ - private val iterator: Iterator> - - /** - * Initialize the reader. - */ - init { - val entries = mutableMapOf>() - - val jobNumberCol = 0 - val submitTimeCol = 1 // seconds (begin of trace is 0) - val waitTimeCol = 2 // seconds - val runTimeCol = 3 // seconds - val numAllocatedCoresCol = 4 // We assume that single-core processors were used at the time - val requestedMemoryCol = 9 // KB per processor/core (-1 if not specified) - - val sliceDuration = 5 * 60L - - var jobNumber: Long - var submitTime: Long - var waitTime: Long - var runTime: Long - var cores: Int - var memory: Long - var slicedWaitTime: Long - var runtimePartialSliceRemainder: Long - - BufferedReader(FileReader(file)).use { reader -> - reader.lineSequence() - .filter { line -> - // Ignore comments in the trace - !line.startsWith(";") && line.isNotBlank() - } - .forEach { line -> - val values = line.trim().split("\\s+".toRegex()) - - jobNumber = values[jobNumberCol].trim().toLong() - submitTime = values[submitTimeCol].trim().toLong() - waitTime = values[waitTimeCol].trim().toLong() - runTime = values[runTimeCol].trim().toLong() - cores = values[numAllocatedCoresCol].trim().toInt() - memory = values[requestedMemoryCol].trim().toLong() - - if (maxNumCores != -1 && cores > maxNumCores) { - println("Skipped a task due to processor count ($cores > $maxNumCores).") - return@forEach - } - - if (memory == -1L) { - memory = 1000L * cores // assume 1GB of memory per processor if not specified - } else { - memory /= 1000 // convert KB to MB - } - - val flopsHistory = mutableListOf() - - // Insert waiting time slices - - // We ignore wait time remainders under one - slicedWaitTime = 0L - if (waitTime >= sliceDuration) { - for (tick in submitTime until (submitTime + waitTime - sliceDuration) step sliceDuration) { - flopsHistory.add( - SimTraceWorkload.Fragment( - tick, - sliceDuration * 1000, - 0.0, - cores - ) - ) - slicedWaitTime += sliceDuration - } - } - - // Insert run time slices - - runtimePartialSliceRemainder = runTime % sliceDuration - - for ( - tick in (submitTime + slicedWaitTime) - until (submitTime + slicedWaitTime + runTime - sliceDuration) - step sliceDuration - ) { - flopsHistory.add( - SimTraceWorkload.Fragment( - tick, - sliceDuration * 1000L, - 1.0, - cores - ) - ) - } - - if (runtimePartialSliceRemainder > 0) { - flopsHistory.add( - SimTraceWorkload.Fragment( - submitTime + slicedWaitTime + runTime, - sliceDuration, - runtimePartialSliceRemainder / sliceDuration.toDouble(), - cores - ) - ) - } - - val uuid = UUID(0L, jobNumber) - val workload = SimTraceWorkload(flopsHistory.asSequence()) - entries[jobNumber] = TraceEntry( - uuid, - jobNumber.toString(), - submitTime, - workload, - mapOf( - "cores" to cores, - "required-memory" to memory, - "workload" to workload - ) - ) - } - } - - // Create the entry iterator - iterator = entries.values.sortedBy { it.start }.iterator() - } - - override fun hasNext(): Boolean = iterator.hasNext() - - override fun next(): TraceEntry = iterator.next() - - override fun close() {} -} -- cgit v1.2.3 From b2308e1077dc60ec6a4dc646613a4be5b59695a6 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 1 Sep 2021 11:29:27 +0200 Subject: refactor(trace): Implement trace API for WTF reader This change updates the WTF trace reader to support the new streaming trace API. --- .../kotlin/org/opendc/format/trace/TraceEntry.kt | 44 ------- .../kotlin/org/opendc/format/trace/TraceReader.kt | 32 ------ .../org/opendc/format/trace/wtf/WtfTraceReader.kt | 126 --------------------- 3 files changed, 202 deletions(-) delete mode 100644 opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt delete mode 100644 opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt delete mode 100644 opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt (limited to 'opendc-format/src/main') diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt deleted file mode 100644 index 3ce79d69..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceEntry.kt +++ /dev/null @@ -1,44 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.trace - -import java.util.UUID - -/** - * An entry in a workload trace. - * - * @param uid The unique identifier of the entry. - * @param name The name of the entry. - * @param start The start time of the workload. - * @param workload The workload of the entry. - * @param meta The meta-data associated with the workload. - */ -public data class TraceEntry( - val uid: UUID, - val name: String, - val start: Long, - val workload: T, - val meta: Map -) diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt deleted file mode 100644 index 797a88d5..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/TraceReader.kt +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.trace - -/** - * An interface for reading workloads into memory. - * - * This interface must guarantee that the entries are delivered in order of submission time. - * - * @param T The shape of the workloads supported by this reader. - */ -public interface TraceReader : Iterator>, AutoCloseable diff --git a/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt b/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt deleted file mode 100644 index e8e72f0e..00000000 --- a/opendc-format/src/main/kotlin/org/opendc/format/trace/wtf/WtfTraceReader.kt +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.format.trace.wtf - -import org.apache.avro.generic.GenericRecord -import org.opendc.format.trace.TraceEntry -import org.opendc.format.trace.TraceReader -import org.opendc.simulator.compute.workload.SimFlopsWorkload -import org.opendc.trace.util.parquet.LocalParquetReader -import org.opendc.workflow.api.Job -import org.opendc.workflow.api.Task -import org.opendc.workflow.api.WORKFLOW_TASK_CORES -import org.opendc.workflow.api.WORKFLOW_TASK_DEADLINE -import java.io.File -import java.nio.file.Path -import java.util.UUID -import kotlin.math.min - -/** - * A [TraceReader] for the Workflow Trace Format (WTF). See the Workflow Trace Archive - * (https://wta.atlarge-research.com/) for more information about the format. - * - * @param path The path to the trace. - */ -public class WtfTraceReader(path: Path) : TraceReader { - /** - * The internal iterator to use for this reader. - */ - private val iterator: Iterator> - - /** - * Construct a [TraceReader] from the specified [path]. - * - * @param path The path to the trace. - */ - public constructor(path: File) : this(path.toPath()) - - /** - * Initialize the reader. - */ - init { - val workflows = mutableMapOf() - val starts = mutableMapOf() - val tasks = mutableMapOf() - val taskDependencies = mutableMapOf>() - - LocalParquetReader(path.resolve("tasks/schema-1.0")).use { reader -> - while (true) { - val nextRecord = reader.read() ?: break - - val workflowId = nextRecord.get("workflow_id") as Long - val taskId = nextRecord.get("id") as Long - val submitTime = nextRecord.get("ts_submit") as Long - val runtime = nextRecord.get("runtime") as Long - val cores = (nextRecord.get("resource_amount_requested") as Double).toInt() - - @Suppress("UNCHECKED_CAST") - val dependencies = (nextRecord.get("parents") as ArrayList).map { - it.get("item") as Long - } - - val flops: Long = 4100 * (runtime / 1000) * cores - - val workflow = workflows.getOrPut(workflowId) { - Job(UUID(0L, workflowId), "", HashSet()) - } - val workload = SimFlopsWorkload(flops) - val task = Task( - UUID(0L, taskId), - "", - HashSet(), - mapOf( - "workload" to workload, - WORKFLOW_TASK_CORES to cores, - WORKFLOW_TASK_DEADLINE to runtime - ) - ) - - starts.merge(workflowId, submitTime, ::min) - (workflow.tasks as MutableSet).add(task) - tasks[taskId] = task - taskDependencies[task] = dependencies - } - } - - // Fix dependencies and dependents for all tasks - taskDependencies.forEach { (task, dependencies) -> - (task.dependencies as MutableSet).addAll( - dependencies.map { taskId -> - tasks[taskId] ?: throw IllegalArgumentException("Dependency task with id $taskId not found") - } - ) - } - - // Create the entry iterator - iterator = workflows.map { (id, job) -> TraceEntry(job.uid, job.name, starts.getValue(id), job, job.metadata) } - .sortedBy { it.start } - .iterator() - } - - override fun hasNext(): Boolean = iterator.hasNext() - - override fun next(): TraceEntry = iterator.next() - - override fun close() {} -} -- cgit v1.2.3