From fff89d25bd3c7b874e68261d21695c473c30ed7d Mon Sep 17 00:00:00 2001 From: Dante Niewenhuis Date: Tue, 16 Apr 2024 09:29:53 +0200 Subject: =?UTF-8?q?Revamped=20the=20trace=20system.=20All=20TraceFormat=20?= =?UTF-8?q?files=20are=20now=20in=20the=20api=20m=E2=80=A6=20(#216)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Revamped the trace system. All TraceFormat files are now in the api module. This fixes some problems with not being able to use types of traces * applied spotless --- .../formats/azure/AzureResourceStateTableReader.kt | 219 +++++++++++++ .../formats/azure/AzureResourceTableReader.kt | 246 ++++++++++++++ .../opendc/trace/formats/azure/AzureTraceFormat.kt | 147 +++++++++ .../BitbrainsExResourceStateTableReader.kt | 292 +++++++++++++++++ .../formats/bitbrains/BitbrainsExTraceFormat.kt | 135 ++++++++ .../bitbrains/BitbrainsResourceStateTableReader.kt | 365 +++++++++++++++++++++ .../bitbrains/BitbrainsResourceTableReader.kt | 175 ++++++++++ .../formats/bitbrains/BitbrainsTraceFormat.kt | 159 +++++++++ .../opendc/trace/formats/gwf/GwfTaskTableReader.kt | 286 ++++++++++++++++ .../org/opendc/trace/formats/gwf/GwfTraceFormat.kt | 104 ++++++ .../opendc/OdcVmInterferenceJsonTableReader.kt | 225 +++++++++++++ .../opendc/OdcVmInterferenceJsonTableWriter.kt | 192 +++++++++++ .../opendc/OdcVmResourceStateTableReader.kt | 166 ++++++++++ .../opendc/OdcVmResourceStateTableWriter.kt | 209 ++++++++++++ .../formats/opendc/OdcVmResourceTableReader.kt | 168 ++++++++++ .../formats/opendc/OdcVmResourceTableWriter.kt | 197 +++++++++++ .../trace/formats/opendc/OdcVmTraceFormat.kt | 190 +++++++++++ .../trace/formats/opendc/parquet/Resource.kt | 37 +++ .../formats/opendc/parquet/ResourceReadSupport.kt | 159 +++++++++ .../opendc/parquet/ResourceRecordMaterializer.kt | 127 +++++++ .../trace/formats/opendc/parquet/ResourceState.kt | 34 ++ .../opendc/parquet/ResourceStateReadSupport.kt | 149 +++++++++ 
.../parquet/ResourceStateRecordMaterializer.kt | 114 +++++++ .../opendc/parquet/ResourceStateWriteSupport.kt | 112 +++++++ .../formats/opendc/parquet/ResourceWriteSupport.kt | 121 +++++++ .../opendc/trace/formats/swf/SwfTaskTableReader.kt | 236 +++++++++++++ .../org/opendc/trace/formats/swf/SwfTraceFormat.kt | 100 ++++++ .../formats/wfformat/WfFormatTaskTableReader.kt | 314 ++++++++++++++++++ .../trace/formats/wfformat/WfFormatTraceFormat.kt | 95 ++++++ .../opendc/trace/formats/wtf/WtfTaskTableReader.kt | 187 +++++++++++ .../org/opendc/trace/formats/wtf/WtfTraceFormat.kt | 102 ++++++ .../org/opendc/trace/formats/wtf/parquet/Task.kt | 42 +++ .../trace/formats/wtf/parquet/TaskReadSupport.kt | 148 +++++++++ .../formats/wtf/parquet/TaskRecordMaterializer.kt | 188 +++++++++++ .../kotlin/org/opendc/trace/spi/TraceFormat.kt | 29 +- 35 files changed, 5767 insertions(+), 2 deletions(-) create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureResourceStateTableReader.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureResourceTableReader.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureTraceFormat.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsExResourceStateTableReader.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsExTraceFormat.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsResourceStateTableReader.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsResourceTableReader.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsTraceFormat.kt create mode 100644 
opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/gwf/GwfTaskTableReader.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/gwf/GwfTraceFormat.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmInterferenceJsonTableReader.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmInterferenceJsonTableWriter.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceStateTableReader.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceStateTableWriter.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableReader.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceTableWriter.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmTraceFormat.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/Resource.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceReadSupport.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceRecordMaterializer.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceState.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateReadSupport.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateRecordMaterializer.kt create mode 100644 
opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceStateWriteSupport.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/ResourceWriteSupport.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/swf/SwfTaskTableReader.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/swf/SwfTraceFormat.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wfformat/WfFormatTaskTableReader.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wfformat/WfFormatTraceFormat.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/WtfTaskTableReader.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/WtfTraceFormat.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/Task.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/TaskReadSupport.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/TaskRecordMaterializer.kt (limited to 'opendc-trace/opendc-trace-api/src/main') diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureResourceStateTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureResourceStateTableReader.kt new file mode 100644 index 00000000..bcf6ff52 --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureResourceStateTableReader.kt @@ -0,0 +1,219 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to 
deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.azure
+
+import com.fasterxml.jackson.core.JsonToken
+import com.fasterxml.jackson.dataformat.csv.CsvParser
+import com.fasterxml.jackson.dataformat.csv.CsvSchema
+import org.opendc.trace.TableReader
+import org.opendc.trace.conv.resourceID
+import org.opendc.trace.conv.resourceStateCpuUsagePct
+import org.opendc.trace.conv.resourceStateTimestamp
+import java.time.Duration
+import java.time.Instant
+import java.util.UUID
+
+/**
+ * A [TableReader] for the Azure v1 VM resource state table: one row per CSV record (VM id, timestamp, CPU usage).
+ */
+internal class AzureResourceStateTableReader(private val parser: CsvParser) : TableReader {
+    /**
+     * Set to true by the first [nextRow] call; [checkActive] rejects getter calls made before that.
+     */
+    private var isStarted = false
+
+    init {
+        parser.schema = schema
+    }
+
+    override fun nextRow(): Boolean { // reads the next CSV record; false once no further record start is found
+        if (!isStarted) {
+            isStarted = true
+        }
+
+        reset() // forget the previous row before reading the next record
+
+        if (!nextStart()) {
+            return false
+        }
+
+        while (true) {
+            val token = parser.nextValue()
+
+            if (token == null || token == JsonToken.END_OBJECT) {
+                break
+            }
+
+            when (parser.currentName) { // columns not listed here are skipped
+                "timestamp" -> timestamp = Instant.ofEpochSecond(parser.longValue)
+                "vm id" -> id = parser.text
+                "CPU avg cpu" -> cpuUsagePct = (parser.doubleValue / 100.0) // Convert from % to [0, 1]
+            }
+        }
+
+        return true
+    }
+
+    private val colID = 0 // column indices handed out by resolve()
+    private val colTimestamp = 1
+    private val colCpuUsagePct = 2
+
+    override fun resolve(name: String): Int {
+        return when (name) {
+            resourceID -> colID
+            resourceStateTimestamp -> colTimestamp
+            resourceStateCpuUsagePct -> colCpuUsagePct
+            else -> -1 // unknown column name
+        }
+    }
+
+    override fun isNull(index: Int): Boolean {
+        require(index in 0..colCpuUsagePct) { "Invalid column index" }
+        return false // no column of this table is ever reported as null
+    }
+
+    override fun getBoolean(index: Int): Boolean {
+        throw IllegalArgumentException("Invalid column")
+    }
+
+    override fun getInt(index: Int): Int {
+        throw IllegalArgumentException("Invalid column")
+    }
+
+    override fun getLong(index: Int): Long {
+        throw IllegalArgumentException("Invalid column")
+    }
+
+    override fun getFloat(index: Int): Float {
+        throw IllegalArgumentException("Invalid column")
+    }
+
+    override fun getDouble(index: Int): Double {
+        checkActive()
+        return when (index) {
+            colCpuUsagePct -> cpuUsagePct
+            else -> throw IllegalArgumentException("Invalid column")
+        }
+    }
+
+    override fun getString(index: Int): String? {
+        checkActive()
+        return when (index) {
+            colID -> id
+            else -> throw IllegalArgumentException("Invalid column")
+        }
+    }
+
+    override fun getUUID(index: Int): UUID? {
+        throw IllegalArgumentException("Invalid column")
+    }
+
+    override fun getInstant(index: Int): Instant? {
+        checkActive()
+        return when (index) {
+            colTimestamp -> timestamp
+            else -> throw IllegalArgumentException("Invalid column")
+        }
+    }
+
+    override fun getDuration(index: Int): Duration? {
+        throw IllegalArgumentException("Invalid column")
+    }
+
+    override fun <T> getList(
+        index: Int,
+        elementType: Class<T>,
+    ): List<T>? {
+        throw IllegalArgumentException("Invalid column")
+    }
+
+    override fun <K, V> getMap(
+        index: Int,
+        keyType: Class<K>,
+        valueType: Class<V>,
+    ): Map<K, V>? {
+        throw IllegalArgumentException("Invalid column")
+    }
+
+    override fun <T> getSet(
+        index: Int,
+        elementType: Class<T>,
+    ): Set<T>? {
+        throw IllegalArgumentException("Invalid column")
+    }
+
+    override fun close() {
+        parser.close()
+    }
+
+    /**
+     * Throws [IllegalStateException] unless [nextRow] has been called and the parser is still open.
+     */
+    private fun checkActive() {
+        check(isStarted && !parser.isClosed) { "No active row. Did you call nextRow()?" }
+    }
+
+    /**
+     * Advance the parser until the next object start; returns false when the input ends first.
+     */
+    private fun nextStart(): Boolean {
+        var token = parser.nextValue()
+
+        while (token != null && token != JsonToken.START_OBJECT) {
+            token = parser.nextValue()
+        }
+
+        return token != null
+    }
+
+    /**
+     * State fields of the reader, holding the values of the current row.
+     */
+    private var id: String? = null
+    private var timestamp: Instant? = null
+    private var cpuUsagePct = Double.NaN
+
+    /**
+     * Reset the state fields to their "no value" defaults (null / NaN).
+     */
+    private fun reset() {
+        id = null
+        timestamp = null
+        cpuUsagePct = Double.NaN
+    }
+
+    companion object {
+        /**
+         * The [CsvSchema] that is used to parse the trace; [nextRow] consumes timestamp, vm id and avg cpu only.
+         */
+        private val schema =
+            CsvSchema.builder()
+                .addColumn("timestamp", CsvSchema.ColumnType.NUMBER)
+                .addColumn("vm id", CsvSchema.ColumnType.STRING)
+                .addColumn("CPU min cpu", CsvSchema.ColumnType.NUMBER)
+                .addColumn("CPU max cpu", CsvSchema.ColumnType.NUMBER)
+                .addColumn("CPU avg cpu", CsvSchema.ColumnType.NUMBER)
+                .setAllowComments(true)
+                .build()
+    }
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureResourceTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureResourceTableReader.kt
new file mode 100644
index 00000000..d86a0466
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureResourceTableReader.kt
@@ -0,0 +1,246 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.azure
+
+import com.fasterxml.jackson.core.JsonToken
+import com.fasterxml.jackson.dataformat.csv.CsvParser
+import com.fasterxml.jackson.dataformat.csv.CsvSchema
+import org.opendc.trace.TableReader
+import org.opendc.trace.conv.resourceCpuCount
+import org.opendc.trace.conv.resourceID
+import org.opendc.trace.conv.resourceMemCapacity
+import org.opendc.trace.conv.resourceStartTime
+import org.opendc.trace.conv.resourceStopTime
+import java.time.Duration
+import java.time.Instant
+import java.util.UUID
+
+/**
+ * A [TableReader] for the Azure v1 VM resources table: one row per CSV record (id, start/stop time, cores, memory).
+ */
+internal class AzureResourceTableReader(private val parser: CsvParser) : TableReader {
+    /**
+     * Set to true by the first [nextRow] call; [checkActive] rejects getter calls made before that.
+     */
+    private var isStarted = false
+
+    init {
+        parser.schema = schema
+    }
+
+    override fun nextRow(): Boolean { // reads the next CSV record; false once no further record start is found
+        if (!isStarted) {
+            isStarted = true
+        }
+
+        reset() // forget the previous row before reading the next record
+
+        if (!nextStart()) {
+            return false
+        }
+
+        while (true) {
+            val token = parser.nextValue()
+
+            if (token == null || token == JsonToken.END_OBJECT) {
+                break
+            }
+
+            when (parser.currentName) { // columns not listed here are skipped
+                "vm id" -> id = parser.text
+                "timestamp vm created" -> startTime = Instant.ofEpochSecond(parser.longValue)
+                "timestamp vm deleted" -> stopTime = Instant.ofEpochSecond(parser.longValue)
+                "vm virtual core count" -> cpuCores = parser.intValue
+                "vm memory" -> memCapacity = parser.doubleValue * 1e6 // GB to KB
+            }
+        }
+
+        return true
+    }
+
+    private val colID = 0 // column indices handed out by resolve()
+    private val colStartTime = 1
+    private val colStopTime = 2
+    private val colCpuCount = 3
+    private val colMemCapacity = 4
+
+    override fun resolve(name: String): Int {
+        return when (name) {
+            resourceID -> colID
+            resourceStartTime -> colStartTime
+            resourceStopTime -> colStopTime
+            resourceCpuCount -> colCpuCount
+            resourceMemCapacity -> colMemCapacity
+            else -> -1 // unknown column name
+        }
+    }
+
+    override fun isNull(index: Int): Boolean {
+        require(index in 0..colMemCapacity) { "Invalid column index" }
+        return false // no column of this table is ever reported as null
+    }
+
+    override fun getBoolean(index: Int): Boolean {
+        throw IllegalArgumentException("Invalid column")
+    }
+
+    override fun getInt(index: Int): Int {
+        checkActive()
+        return when (index) {
+            colCpuCount -> cpuCores
+            else -> throw IllegalArgumentException("Invalid column")
+        }
+    }
+
+    override fun getLong(index: Int): Long {
+        checkActive()
+        return when (index) {
+            colCpuCount -> cpuCores.toLong() // the core count is also readable as a Long
+            else -> throw IllegalArgumentException("Invalid column")
+        }
+    }
+
+    override fun getFloat(index: Int): Float {
+        throw IllegalArgumentException("Invalid column")
+    }
+
+    override fun getDouble(index: Int): Double {
+        checkActive()
+        return when (index) {
+            colMemCapacity -> memCapacity
+            else -> throw IllegalArgumentException("Invalid column")
+        }
+    }
+
+    override fun getString(index: Int): String? {
+        checkActive()
+        return when (index) {
+            colID -> id
+            else -> throw IllegalArgumentException("Invalid column")
+        }
+    }
+
+    override fun getUUID(index: Int): UUID? {
+        throw IllegalArgumentException("Invalid column")
+    }
+
+    override fun getInstant(index: Int): Instant? {
+        checkActive()
+        return when (index) {
+            colStartTime -> startTime
+            colStopTime -> stopTime
+            else -> throw IllegalArgumentException("Invalid column")
+        }
+    }
+
+    override fun getDuration(index: Int): Duration? {
+        throw IllegalArgumentException("Invalid column")
+    }
+
+    override fun <T> getList(
+        index: Int,
+        elementType: Class<T>,
+    ): List<T>? {
+        throw IllegalArgumentException("Invalid column")
+    }
+
+    override fun <T> getSet(
+        index: Int,
+        elementType: Class<T>,
+    ): Set<T>? {
+        throw IllegalArgumentException("Invalid column")
+    }
+
+    override fun <K, V> getMap(
+        index: Int,
+        keyType: Class<K>,
+        valueType: Class<V>,
+    ): Map<K, V>? {
+        throw IllegalArgumentException("Invalid column")
+    }
+
+    override fun close() {
+        parser.close()
+    }
+
+    /**
+     * Throws [IllegalStateException] unless [nextRow] has been called and the parser is still open.
+     */
+    private fun checkActive() {
+        check(isStarted && !parser.isClosed) { "No active row. Did you call nextRow()?" }
+    }
+
+    /**
+     * Advance the parser until the next object start; returns false when the input ends first.
+     */
+    private fun nextStart(): Boolean {
+        var token = parser.nextValue()
+
+        while (token != null && token != JsonToken.START_OBJECT) {
+            token = parser.nextValue()
+        }
+
+        return token != null
+    }
+
+    /**
+     * State fields of the reader, holding the values of the current row.
+     */
+    private var id: String? = null
+    private var startTime: Instant? = null
+    private var stopTime: Instant? = null
+    private var cpuCores = -1
+    private var memCapacity = Double.NaN
+
+    /**
+     * Reset the state fields to their "no value" defaults (null / -1 / NaN).
+     */
+    private fun reset() {
+        id = null
+        startTime = null
+        stopTime = null
+        cpuCores = -1
+        memCapacity = Double.NaN
+    }
+
+    companion object {
+        /**
+         * The [CsvSchema] that is used to parse the trace; [nextRow] consumes only five of these columns.
+         */
+        private val schema =
+            CsvSchema.builder()
+                .addColumn("vm id", CsvSchema.ColumnType.NUMBER) // NOTE(review): read via parser.text as a String - confirm type
+                .addColumn("subscription id", CsvSchema.ColumnType.STRING)
+                .addColumn("deployment id", CsvSchema.ColumnType.NUMBER)
+                .addColumn("timestamp vm created", CsvSchema.ColumnType.NUMBER)
+                .addColumn("timestamp vm deleted", CsvSchema.ColumnType.NUMBER)
+                .addColumn("max cpu", CsvSchema.ColumnType.NUMBER)
+                .addColumn("avg cpu", CsvSchema.ColumnType.NUMBER)
+                .addColumn("p95 cpu", CsvSchema.ColumnType.NUMBER)
+                .addColumn("vm category", CsvSchema.ColumnType.NUMBER)
+                .addColumn("vm virtual core count", CsvSchema.ColumnType.NUMBER)
+                .addColumn("vm memory", CsvSchema.ColumnType.NUMBER)
+                .setAllowComments(true)
+                .build()
+    }
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureTraceFormat.kt
new file mode 100644
index 00000000..a75da9d9
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/azure/AzureTraceFormat.kt
@@ -0,0 +1,147 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this
software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.azure
+
+import com.fasterxml.jackson.dataformat.csv.CsvFactory
+import com.fasterxml.jackson.dataformat.csv.CsvParser
+import org.opendc.trace.TableColumn
+import org.opendc.trace.TableColumnType
+import org.opendc.trace.TableReader
+import org.opendc.trace.TableWriter
+import org.opendc.trace.conv.TABLE_RESOURCES
+import org.opendc.trace.conv.TABLE_RESOURCE_STATES
+import org.opendc.trace.conv.resourceCpuCount
+import org.opendc.trace.conv.resourceID
+import org.opendc.trace.conv.resourceMemCapacity
+import org.opendc.trace.conv.resourceStartTime
+import org.opendc.trace.conv.resourceStateCpuUsagePct
+import org.opendc.trace.conv.resourceStateTimestamp
+import org.opendc.trace.conv.resourceStopTime
+import org.opendc.trace.spi.TableDetails
+import org.opendc.trace.spi.TraceFormat
+import org.opendc.trace.util.CompositeTableReader
+import java.nio.file.Files
+import java.nio.file.Path
+import java.util.stream.Collectors
+import java.util.zip.GZIPInputStream
+import kotlin.io.path.inputStream
+import kotlin.io.path.name
+
+/**
+ * A read-only format implementation for the Azure v1 format; [create] and [newWriter] always throw.
+ */
+public class AzureTraceFormat : TraceFormat {
+    /**
+     * The name of this trace format.
+     */
+    override val name: String = "azure"
+
+    /**
+     * The [CsvFactory] used to create the parser (comments allowed, surrounding spaces trimmed).
+     */
+    private val factory =
+        CsvFactory()
+            .enable(CsvParser.Feature.ALLOW_COMMENTS)
+            .enable(CsvParser.Feature.TRIM_SPACES)
+
+    override fun create(path: Path) {
+        throw UnsupportedOperationException("Writing not supported for this format")
+    }
+
+    override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES)
+
+    override fun getDetails(
+        path: Path,
+        table: String,
+    ): TableDetails {
+        return when (table) { // column lists mirror what the two Azure table readers resolve
+            TABLE_RESOURCES ->
+                TableDetails(
+                    listOf(
+                        TableColumn(resourceID, TableColumnType.String),
+                        TableColumn(resourceStartTime, TableColumnType.Instant),
+                        TableColumn(resourceStopTime, TableColumnType.Instant),
+                        TableColumn(resourceCpuCount, TableColumnType.Int),
+                        TableColumn(resourceMemCapacity, TableColumnType.Double),
+                    ),
+                )
+            TABLE_RESOURCE_STATES ->
+                TableDetails(
+                    listOf(
+                        TableColumn(resourceID, TableColumnType.String),
+                        TableColumn(resourceStateTimestamp, TableColumnType.Instant),
+                        TableColumn(resourceStateCpuUsagePct, TableColumnType.Double),
+                    ),
+                )
+            else -> throw IllegalArgumentException("Table $table not supported")
+        }
+    }
+
+    override fun newReader(
+        path: Path,
+        table: String,
+        projection: List<String>?,
+    ): TableReader {
+        return when (table) { // the projection argument is not consulted here
+            TABLE_RESOURCES -> {
+                val stream = GZIPInputStream(path.resolve("vmtable/vmtable.csv.gz").inputStream())
+                AzureResourceTableReader(factory.createParser(stream))
+            }
+            TABLE_RESOURCE_STATES -> newResourceStateReader(path)
+            else -> throw IllegalArgumentException("Table $table not supported")
+        }
+    }
+
+    override fun newWriter(
+        path: Path,
+        table: String,
+    ): TableWriter {
+        throw UnsupportedOperationException("Writing not supported for this format")
+    }
+
+    /**
+     * Construct a [TableReader] over all ".csv.gz" files directly under "vm_cpu_readings", read in file-name order.
+     */
+    private fun newResourceStateReader(path: Path): TableReader {
+        val partitions =
+            Files.walk(path.resolve("vm_cpu_readings"), 1).use { files -> // close the directory stream once collected
+                files.filter { !Files.isDirectory(it) && it.name.endsWith(".csv.gz") }
+                    .collect(Collectors.toMap({ it.name.removeSuffix(".csv.gz") }, { it }))
+            }.toSortedMap()
+        val it = partitions.iterator()
+
+        return object : CompositeTableReader() {
+            override fun nextReader(): TableReader? { // one reader per partition; null when all are consumed
+                return if (it.hasNext()) {
+                    val (_, partPath) = it.next()
+                    val stream = GZIPInputStream(partPath.inputStream())
+                    AzureResourceStateTableReader(factory.createParser(stream))
+                } else {
+                    null
+                }
+            }
+
+            override fun toString(): String = "AzureCompositeTableReader"
+        }
+    }
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsExResourceStateTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsExResourceStateTableReader.kt
new file mode 100644
index 00000000..8387d1ed
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsExResourceStateTableReader.kt
@@ -0,0 +1,292 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.bitbrains
+
+import org.opendc.trace.TableReader
+import org.opendc.trace.conv.resourceClusterID
+import org.opendc.trace.conv.resourceCpuCapacity
+import org.opendc.trace.conv.resourceCpuCount
+import org.opendc.trace.conv.resourceID
+import org.opendc.trace.conv.resourceMemCapacity
+import org.opendc.trace.conv.resourceStateCpuDemand
+import org.opendc.trace.conv.resourceStateCpuReadyPct
+import org.opendc.trace.conv.resourceStateCpuUsage
+import org.opendc.trace.conv.resourceStateCpuUsagePct
+import org.opendc.trace.conv.resourceStateDiskRead
+import org.opendc.trace.conv.resourceStateDiskWrite
+import org.opendc.trace.conv.resourceStateTimestamp
+import java.io.BufferedReader
+import java.time.Duration
+import java.time.Instant
+import java.util.UUID
+
+/**
+ * A [TableReader] for the Bitbrains resource state table; each non-comment line of [reader] is one row.
+ */
+internal class BitbrainsExResourceStateTableReader(private val reader: BufferedReader) : TableReader {
+    private var state = State.Pending
+
+    override fun nextRow(): Boolean { // false once the reader is closed or the input has no further line
+        val state = state
+        if (state == State.Closed) {
+            return false
+        } else if (state == State.Pending) {
+            this.state = State.Active
+        }
+
+        reset()
+
+        var line: String?
+        var num = 0 // line counter; only incremented, never read in this class
+
+        while (true) {
+            line = reader.readLine()
+
+            if (line == null) {
+                this.state = State.Closed
+                return false
+            }
+
+            num++
+
+            if (line.isBlank() || line[0] == '#') { // blank check first: line[0] would throw on an empty line
+                // Ignore empty lines or comments
+                continue
+            }
+
+            break
+        }
+
+        line = line!!.trim()
+
+        val length = line.length
+        var col = 0
+        var start: Int
+        var end = 0
+
+        while (end < length) {
+            // Trim all whitespace before the field
+            start = end
+            while (start < length && line[start].isWhitespace()) {
+                start++
+            }
+
+            end = line.indexOf(' ', start) // fields are separated by spaces
+
+            if (end < 0) {
+                end = length
+            }
+
+            val field = line.substring(start, end)
+            when (col++) { // positional columns; indices without a branch are skipped
+                colTimestamp -> timestamp = Instant.ofEpochSecond(field.toLong(10))
+                colCpuUsage -> cpuUsage = field.toDouble()
+                colCpuDemand -> cpuDemand = field.toDouble()
+                colDiskRead -> diskRead = field.toDouble()
+                colDiskWrite -> diskWrite = field.toDouble()
+                colClusterID -> cluster = field.trim()
+                colNcpus -> cpuCores = field.toInt(10)
+                colCpuReadyPct -> cpuReadyPct = field.toDouble()
+                colPoweredOn -> poweredOn = field.toInt(10) == 1
+                colCpuCapacity -> cpuCapacity = field.toDouble()
+                colID -> id = field.trim()
+                colMemCapacity -> memCapacity = field.toDouble() * 1000 // Convert from MB to KB
+            }
+        }
+
+        return true
+    }
+
+    override fun resolve(name: String): Int {
+        return when (name) {
+            resourceID -> colID
+            resourceClusterID -> colClusterID
+            resourceStateTimestamp -> colTimestamp
+            resourceCpuCount -> colNcpus
+            resourceCpuCapacity -> colCpuCapacity
+            resourceStateCpuUsage -> colCpuUsage
+            resourceStateCpuUsagePct -> colCpuUsagePct
+            resourceStateCpuDemand -> colCpuDemand
+            resourceStateCpuReadyPct -> colCpuReadyPct
+            resourceMemCapacity -> colMemCapacity
+            resourceStateDiskRead -> colDiskRead
+            resourceStateDiskWrite -> colDiskWrite
+            else -> -1 // unknown column name
+        }
+    }
+
+    override fun isNull(index: Int): Boolean {
+        require(index in 0 until colMax) { "Invalid column index" }
+        return false // no column of this table is ever reported as null
+    }
+
+    override fun getBoolean(index: Int): Boolean {
+        check(state == State.Active) { "No active row" }
+        return when (index) {
+            colPoweredOn -> poweredOn
+            else -> throw IllegalArgumentException("Invalid column")
+        }
+    }
+
+    override fun getInt(index: Int): Int {
+        check(state == State.Active) { "No active row" }
+        return when (index) {
+            colNcpus -> cpuCores
+            else -> throw IllegalArgumentException("Invalid column")
+        }
+    }
+
+    override fun getLong(index: Int): Long {
+        throw IllegalArgumentException("Invalid column")
+    }
+
+    override fun getFloat(index: Int): Float {
+        throw IllegalArgumentException("Invalid column")
+    }
+
+    override fun getDouble(index: Int): Double {
+        check(state == State.Active) { "No active row" }
+        return when (index) {
+            colCpuCapacity -> cpuCapacity
+            colCpuUsage -> cpuUsage
+            colCpuUsagePct -> cpuUsage / cpuCapacity // derived column, not parsed from the line
+            colCpuReadyPct -> cpuReadyPct
+            colCpuDemand -> cpuDemand
+            colMemCapacity -> memCapacity
+            colDiskRead -> diskRead
+            colDiskWrite -> diskWrite
+            else -> throw IllegalArgumentException("Invalid column")
+        }
+    }
+
+    override fun getString(index: Int): String? {
+        check(state == State.Active) { "No active row" }
+        return when (index) {
+            colID -> id
+            colClusterID -> cluster
+            else -> throw IllegalArgumentException("Invalid column")
+        }
+    }
+
+    override fun getUUID(index: Int): UUID? {
+        throw IllegalArgumentException("Invalid column")
+    }
+
+    override fun getInstant(index: Int): Instant? {
+        check(state == State.Active) { "No active row" }
+        return when (index) {
+            colTimestamp -> timestamp
+            else -> throw IllegalArgumentException("Invalid column")
+        }
+    }
+
+    override fun getDuration(index: Int): Duration? {
+        throw IllegalArgumentException("Invalid column")
+    }
+
+    override fun <T> getList(
+        index: Int,
+        elementType: Class<T>,
+    ): List<T>? {
+        throw IllegalArgumentException("Invalid column")
+    }
+
+    override fun <T> getSet(
+        index: Int,
+        elementType: Class<T>,
+    ): Set<T>? {
+        throw IllegalArgumentException("Invalid column")
+    }
+
+    override fun <K, V> getMap(
+        index: Int,
+        keyType: Class<K>,
+        valueType: Class<V>,
+    ): Map<K, V>? {
+        throw IllegalArgumentException("Invalid column")
+    }
+
+    override fun close() {
+        reader.close()
+        reset()
+        state = State.Closed // getters and nextRow() see the reader as closed from here on
+    }
+
+    /**
+     * State fields of the reader, holding the values of the current row.
+     */
+    private var id: String? = null
+    private var cluster: String? = null
+    private var timestamp: Instant? = null
+    private var cpuCores = -1
+    private var cpuCapacity = Double.NaN
+    private var cpuUsage = Double.NaN
+    private var cpuDemand = Double.NaN
+    private var cpuReadyPct = Double.NaN
+    private var memCapacity = Double.NaN
+    private var diskRead = Double.NaN
+    private var diskWrite = Double.NaN
+    private var poweredOn: Boolean = false
+
+    /**
+     * Reset the state of the reader to its "no value" defaults (null / -1 / NaN / false).
+     */
+    private fun reset() {
+        id = null
+        timestamp = null
+        cluster = null
+        cpuCores = -1
+        cpuCapacity = Double.NaN
+        cpuUsage = Double.NaN
+        cpuDemand = Double.NaN
+        cpuReadyPct = Double.NaN
+        memCapacity = Double.NaN
+        diskRead = Double.NaN
+        diskWrite = Double.NaN
+        poweredOn = false
+    }
+
+    /**
+     * Default column indices for the extended Bitbrains format; also the positions of the fields within a line.
+     */
+    private val colTimestamp = 0
+    private val colCpuUsage = 1
+    private val colCpuDemand = 2
+    private val colDiskRead = 4
+    private val colDiskWrite = 6
+    private val colClusterID = 10
+    private val colNcpus = 12
+    private val colCpuReadyPct = 13
+    private val colPoweredOn = 14
+    private val colCpuCapacity = 18
+    private val colID = 19
+    private val colMemCapacity = 20
+    private val colCpuUsagePct = 21
+    private val colMax = colCpuUsagePct + 1
+
+    private enum class State {
+        Pending,
+        Active,
+        Closed,
+    }
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsExTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsExTraceFormat.kt
new file mode 100644
index 00000000..6115953f
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsExTraceFormat.kt
@@ -0,0 +1,135 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.bitbrains
+
+import org.opendc.trace.TableColumn
+import org.opendc.trace.TableColumnType
+import org.opendc.trace.TableReader
+import org.opendc.trace.TableWriter
+import org.opendc.trace.conv.TABLE_RESOURCE_STATES
+import org.opendc.trace.conv.resourceClusterID
+import org.opendc.trace.conv.resourceCpuCapacity
+import org.opendc.trace.conv.resourceCpuCount
+import org.opendc.trace.conv.resourceID
+import org.opendc.trace.conv.resourceMemCapacity
+import org.opendc.trace.conv.resourceStateCpuDemand
+import org.opendc.trace.conv.resourceStateCpuReadyPct
+import org.opendc.trace.conv.resourceStateCpuUsage
+import org.opendc.trace.conv.resourceStateCpuUsagePct
+import org.opendc.trace.conv.resourceStateDiskRead
+import org.opendc.trace.conv.resourceStateDiskWrite
+import org.opendc.trace.conv.resourceStateTimestamp
+import org.opendc.trace.spi.TableDetails
+import org.opendc.trace.spi.TraceFormat
+import org.opendc.trace.util.CompositeTableReader
+import java.nio.file.Files
+import java.nio.file.Path
+import java.util.stream.Collectors
+import kotlin.io.path.bufferedReader
+import kotlin.io.path.extension
+import kotlin.io.path.nameWithoutExtension
+
+/**
+ * A format implementation for the extended Bitbrains trace format.
+ *
+ * The format is read-only: [create] and [newWriter] always throw. It exposes a single table,
+ * [TABLE_RESOURCE_STATES], whose rows come from the `*.txt` files found directly inside the trace directory.
+ */
+public class BitbrainsExTraceFormat : TraceFormat {
+    /**
+     * The name of this trace format.
+     */
+    override val name: String = "bitbrains-ex"
+
+    override fun create(path: Path) {
+        throw UnsupportedOperationException("Writing not supported for this format")
+    }
+
+    override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCE_STATES)
+
+    override fun getDetails(
+        path: Path,
+        table: String,
+    ): TableDetails {
+        return when (table) {
+            TABLE_RESOURCE_STATES ->
+                TableDetails(
+                    listOf(
+                        TableColumn(resourceID, TableColumnType.String),
+                        TableColumn(resourceClusterID, TableColumnType.String),
+                        TableColumn(resourceStateTimestamp, TableColumnType.Instant),
+                        TableColumn(resourceCpuCount, TableColumnType.Int),
+                        TableColumn(resourceCpuCapacity, TableColumnType.Double),
+                        TableColumn(resourceStateCpuUsage, TableColumnType.Double),
+                        TableColumn(resourceStateCpuUsagePct, TableColumnType.Double),
+                        TableColumn(resourceStateCpuDemand, TableColumnType.Double),
+                        TableColumn(resourceStateCpuReadyPct, TableColumnType.Double),
+                        TableColumn(resourceMemCapacity, TableColumnType.Double),
+                        TableColumn(resourceStateDiskRead, TableColumnType.Double),
+                        TableColumn(resourceStateDiskWrite, TableColumnType.Double),
+                    ),
+                )
+            else -> throw IllegalArgumentException("Table $table not supported")
+        }
+    }
+
+    override fun newReader(
+        path: Path,
+        table: String,
+        projection: List<String>?,
+    ): TableReader {
+        return when (table) {
+            TABLE_RESOURCE_STATES -> newResourceStateReader(path)
+            else -> throw IllegalArgumentException("Table $table not supported")
+        }
+    }
+
+    override fun newWriter(
+        path: Path,
+        table: String,
+    ): TableWriter {
+        throw UnsupportedOperationException("Writing not supported for this format")
+    }
+
+    /**
+     * Construct a [TableReader] for reading over all resource state partitions.
+     *
+     * Partitions are the `*.txt` files directly under [path]; they are visited in ascending order of their
+     * file name without extension, and each partition file is only opened when the previous one is exhausted.
+     */
+    private fun newResourceStateReader(path: Path): TableReader {
+        // Files.walk holds an open directory handle until the stream is closed, so close it as soon as the
+        // partition listing has been collected.
+        val partitions =
+            Files.walk(path, 1).use { files ->
+                files
+                    .filter { !Files.isDirectory(it) && it.extension == "txt" }
+                    .collect(Collectors.toMap({ it.nameWithoutExtension }, { it }))
+                    .toSortedMap()
+            }
+        val it = partitions.iterator()
+
+        return object : CompositeTableReader() {
+            override fun nextReader(): TableReader? {
+                return if (it.hasNext()) {
+                    val (_, partPath) = it.next()
+                    BitbrainsExResourceStateTableReader(partPath.bufferedReader())
+                } else {
+                    null
+                }
+            }
+
+            override fun toString(): String = "BitbrainsExCompositeTableReader"
+        }
+    }
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsResourceStateTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsResourceStateTableReader.kt
new file mode 100644
index 00000000..e264fccb
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsResourceStateTableReader.kt
@@ -0,0 +1,365 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.bitbrains
+
+import com.fasterxml.jackson.core.JsonParseException
+import com.fasterxml.jackson.core.JsonToken
+import com.fasterxml.jackson.dataformat.csv.CsvParser
+import com.fasterxml.jackson.dataformat.csv.CsvSchema
+import org.opendc.trace.TableReader
+import org.opendc.trace.conv.resourceCpuCapacity
+import org.opendc.trace.conv.resourceCpuCount
+import org.opendc.trace.conv.resourceID
+import org.opendc.trace.conv.resourceMemCapacity
+import org.opendc.trace.conv.resourceStateCpuUsage
+import org.opendc.trace.conv.resourceStateCpuUsagePct
+import org.opendc.trace.conv.resourceStateDiskRead
+import org.opendc.trace.conv.resourceStateDiskWrite
+import org.opendc.trace.conv.resourceStateMemUsage
+import org.opendc.trace.conv.resourceStateNetRx
+import org.opendc.trace.conv.resourceStateNetTx
+import org.opendc.trace.conv.resourceStateTimestamp
+import java.text.NumberFormat
+import java.time.Duration
+import java.time.Instant
+import java.time.LocalDateTime
+import java.time.ZoneOffset
+import java.time.format.DateTimeFormatter
+import java.time.format.DateTimeParseException
+import java.util.Locale
+import java.util.UUID
+
+/**
+ * A [TableReader] for the Bitbrains resource state table.
+ *
+ * Each instance reads the rows of one partition through [parser]; the [partition] name is reported as the
+ * value of the [resourceID] column for every row.
+ */
+internal class BitbrainsResourceStateTableReader(private val partition: String, private val parser: CsvParser) : TableReader {
+    /**
+     * A flag to indicate whether a single row has been read already.
+     */
+    private var isStarted = false
+
+    /**
+     * The [DateTimeFormatter] used to parse the timestamps in case of the Materna trace.
+     */
+    private val formatter = DateTimeFormatter.ofPattern("dd.MM.yyyy HH:mm:ss")
+
+    /**
+     * The type of timestamps in the trace. It is decided while parsing the first timestamp and then reused.
+     */
+    private var timestampType: TimestampType = TimestampType.UNDECIDED
+
+    /**
+     * The [NumberFormat] used to parse doubles containing a comma.
+     */
+    private val nf = NumberFormat.getInstance(Locale.GERMAN)
+
+    /**
+     * A flag to indicate that the trace contains decimals with a comma separator.
+     */
+    private var usesCommaDecimalSeparator = false
+
+    init {
+        parser.schema = schema
+    }
+
+    /**
+     * Advance to the next row of the partition.
+     *
+     * @return `true` if a row was read, `false` if the parser is closed or has no further rows.
+     */
+    override fun nextRow(): Boolean {
+        isStarted = true
+
+        // Reset the row state
+        reset()
+
+        // Do not touch a parser that has already been closed
+        if (parser.isClosed || !nextStart()) {
+            return false
+        }
+
+        while (true) {
+            val token = parser.nextValue()
+
+            if (token == null || token == JsonToken.END_OBJECT) {
+                break
+            }
+
+            when (parser.currentName) {
+                "Timestamp [ms]" -> timestamp = parseTimestamp()
+                "CPU cores" -> cpuCores = parser.intValue
+                "CPU capacity provisioned [MHZ]" -> cpuCapacity = parseSafeDouble()
+                "CPU usage [MHZ]" -> cpuUsage = parseSafeDouble()
+                "CPU usage [%]" -> cpuUsagePct = parseSafeDouble() / 100.0 // Convert to range [0, 1]
+                "Memory capacity provisioned [KB]" -> memCapacity = parseSafeDouble()
+                "Memory usage [KB]" -> memUsage = parseSafeDouble()
+                "Disk read throughput [KB/s]" -> diskRead = parseSafeDouble()
+                "Disk write throughput [KB/s]" -> diskWrite = parseSafeDouble()
+                "Network received throughput [KB/s]" -> netReceived = parseSafeDouble()
+                "Network transmitted throughput [KB/s]" -> netTransmitted = parseSafeDouble()
+            }
+        }
+
+        return true
+    }
+
+    private val colTimestamp = 0
+    private val colCpuCount = 1
+    private val colCpuCapacity = 2
+    private val colCpuUsage = 3
+    private val colCpuUsagePct = 4
+    private val colMemCapacity = 5
+    private val colMemUsage = 6
+    private val colDiskRead = 7
+    private val colDiskWrite = 8
+    private val colNetRx = 9
+    private val colNetTx = 10
+    private val colID = 11
+
+    override fun resolve(name: String): Int {
+        return when (name) {
+            resourceID -> colID
+            resourceStateTimestamp -> colTimestamp
+            resourceCpuCount -> colCpuCount
+            resourceCpuCapacity -> colCpuCapacity
+            resourceStateCpuUsage -> colCpuUsage
+            resourceStateCpuUsagePct -> colCpuUsagePct
+            resourceMemCapacity -> colMemCapacity
+            resourceStateMemUsage -> colMemUsage
+            resourceStateDiskRead -> colDiskRead
+            resourceStateDiskWrite -> colDiskWrite
+            resourceStateNetRx -> colNetRx
+            resourceStateNetTx -> colNetTx
+            else -> -1
+        }
+    }
+
+    override fun isNull(index: Int): Boolean {
+        require(index in 0..colID) { "Invalid column index" }
+        return false
+    }
+
+    override fun getBoolean(index: Int): Boolean {
+        throw IllegalArgumentException("Invalid column")
+    }
+
+    override fun getInt(index: Int): Int {
+        checkActive()
+        return when (index) {
+            colCpuCount -> cpuCores
+            else -> throw IllegalArgumentException("Invalid column")
+        }
+    }
+
+    override fun getLong(index: Int): Long {
+        throw IllegalArgumentException("Invalid column")
+    }
+
+    override fun getFloat(index: Int): Float {
+        throw IllegalArgumentException("Invalid column")
+    }
+
+    override fun getDouble(index: Int): Double {
+        checkActive()
+        return when (index) {
+            colCpuCapacity -> cpuCapacity
+            colCpuUsage -> cpuUsage
+            colCpuUsagePct -> cpuUsagePct
+            colMemCapacity -> memCapacity
+            colMemUsage -> memUsage
+            colDiskRead -> diskRead
+            colDiskWrite -> diskWrite
+            colNetRx -> netReceived
+            colNetTx -> netTransmitted
+            else -> throw IllegalArgumentException("Invalid column")
+        }
+    }
+
+    override fun getString(index: Int): String {
+        checkActive()
+        return when (index) {
+            colID -> partition
+            else -> throw IllegalArgumentException("Invalid column")
+        }
+    }
+
+    override fun getUUID(index: Int): UUID? {
+        throw IllegalArgumentException("Invalid column")
+    }
+
+    override fun getInstant(index: Int): Instant? {
+        checkActive()
+        return when (index) {
+            colTimestamp -> timestamp
+            else -> throw IllegalArgumentException("Invalid column")
+        }
+    }
+
+    override fun getDuration(index: Int): Duration? {
+        throw IllegalArgumentException("Invalid column")
+    }
+
+    override fun <T> getList(
+        index: Int,
+        elementType: Class<T>,
+    ): List<T>? {
+        throw IllegalArgumentException("Invalid column")
+    }
+
+    override fun <T> getSet(
+        index: Int,
+        elementType: Class<T>,
+    ): Set<T>? {
+        throw IllegalArgumentException("Invalid column")
+    }
+
+    override fun <K, V> getMap(
+        index: Int,
+        keyType: Class<K>,
+        valueType: Class<V>,
+    ): Map<K, V>? {
+        throw IllegalArgumentException("Invalid column")
+    }
+
+    override fun close() {
+        parser.close()
+    }
+
+    /**
+     * Helper method to check if the reader is active.
+     */
+    private fun checkActive() {
+        check(isStarted && !parser.isClosed) { "No active row. Did you call nextRow()?" }
+    }
+
+    /**
+     * Advance the parser until the next object start.
+     */
+    private fun nextStart(): Boolean {
+        var token = parser.nextValue()
+
+        while (token != null && token != JsonToken.START_OBJECT) {
+            token = parser.nextValue()
+        }
+
+        return token != null
+    }
+
+    /**
+     * Parse the current value as a timestamp.
+     *
+     * The first value decides the representation: if it matches [formatter] the trace is treated as
+     * date-time based, otherwise as numeric. The decision is cached in [timestampType].
+     *
+     * NOTE(review): numeric values are passed to [Instant.ofEpochSecond] although the column is named
+     * "Timestamp [ms]" and the enum constant is `EPOCH_MILLIS` - confirm the unit against the trace data.
+     */
+    private fun parseTimestamp(): Instant {
+        return when (timestampType) {
+            TimestampType.UNDECIDED -> {
+                try {
+                    val res = LocalDateTime.parse(parser.text, formatter).toInstant(ZoneOffset.UTC)
+                    timestampType = TimestampType.DATE_TIME
+                    res
+                } catch (e: DateTimeParseException) {
+                    timestampType = TimestampType.EPOCH_MILLIS
+                    Instant.ofEpochSecond(parser.longValue)
+                }
+            }
+            TimestampType.DATE_TIME -> LocalDateTime.parse(parser.text, formatter).toInstant(ZoneOffset.UTC)
+            TimestampType.EPOCH_MILLIS -> Instant.ofEpochSecond(parser.longValue)
+        }
+    }
+
+    /**
+     * Try to parse the current value safely as double.
+     *
+     * The parser's own number decoding is used until it fails once; from then on every value is parsed
+     * with [nf] (comma as decimal separator). Blank values yield `0.0`.
+     */
+    private fun parseSafeDouble(): Double {
+        if (!usesCommaDecimalSeparator) {
+            try {
+                return parser.doubleValue
+            } catch (e: JsonParseException) {
+                usesCommaDecimalSeparator = true
+            }
+        }
+
+        val text = parser.text
+        if (text.isBlank()) {
+            return 0.0
+        }
+
+        return nf.parse(text).toDouble()
+    }
+
+    /**
+     * State fields of the reader.
+     */
+    private var timestamp: Instant? = null
+    private var cpuCores = -1
+    private var cpuCapacity = Double.NaN
+    private var cpuUsage = Double.NaN
+    private var cpuUsagePct = Double.NaN
+    private var memCapacity = Double.NaN
+    private var memUsage = Double.NaN
+    private var diskRead = Double.NaN
+    private var diskWrite = Double.NaN
+    private var netReceived = Double.NaN
+    private var netTransmitted = Double.NaN
+
+    /**
+     * Reset the state.
+     */
+    private fun reset() {
+        timestamp = null
+        cpuCores = -1
+        cpuCapacity = Double.NaN
+        cpuUsage = Double.NaN
+        cpuUsagePct = Double.NaN
+        memCapacity = Double.NaN
+        memUsage = Double.NaN
+        diskRead = Double.NaN
+        diskWrite = Double.NaN
+        netReceived = Double.NaN
+        netTransmitted = Double.NaN
+    }
+
+    /**
+     * The type of the timestamp in the trace.
+     */
+    private enum class TimestampType {
+        UNDECIDED,
+        DATE_TIME,
+        EPOCH_MILLIS,
+    }
+
+    companion object {
+        /**
+         * The [CsvSchema] that is used to parse the trace.
+         */
+        private val schema =
+            CsvSchema.builder()
+                .addColumn("Timestamp [ms]", CsvSchema.ColumnType.NUMBER_OR_STRING)
+                .addColumn("CPU cores", CsvSchema.ColumnType.NUMBER)
+                .addColumn("CPU capacity provisioned [MHZ]", CsvSchema.ColumnType.NUMBER)
+                .addColumn("CPU usage [MHZ]", CsvSchema.ColumnType.NUMBER)
+                .addColumn("CPU usage [%]", CsvSchema.ColumnType.NUMBER)
+                .addColumn("Memory capacity provisioned [KB]", CsvSchema.ColumnType.NUMBER)
+                .addColumn("Memory usage [KB]", CsvSchema.ColumnType.NUMBER)
+                .addColumn("Memory usage [%]", CsvSchema.ColumnType.NUMBER)
+                .addColumn("Disk read throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
+                .addColumn("Disk write throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
+                .addColumn("Disk size [GB]", CsvSchema.ColumnType.NUMBER)
+                .addColumn("Network received throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
+                .addColumn("Network transmitted throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
+                .setAllowComments(true)
+                .setUseHeader(true)
+                .setColumnSeparator(';')
+                .build()
+    }
+}
diff --git 
a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsResourceTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsResourceTableReader.kt new file mode 100644 index 00000000..a12785f0 --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsResourceTableReader.kt @@ -0,0 +1,175 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.bitbrains + +import com.fasterxml.jackson.dataformat.csv.CsvFactory +import org.opendc.trace.TableReader +import org.opendc.trace.conv.resourceID +import java.nio.file.Path +import java.time.Duration +import java.time.Instant +import java.util.UUID + +/** + * A [TableReader] for the Bitbrains resource table. 
+ */
+internal class BitbrainsResourceTableReader(private val factory: CsvFactory, vms: Map<String, Path>) : TableReader {
+    /**
+     * An iterator to iterate over the resource entries.
+     */
+    private val it = vms.iterator()
+
+    /**
+     * The state of the reader.
+     */
+    private var state = State.Pending
+
+    /**
+     * Advance to the next resource.
+     *
+     * Each remaining entry is opened with a [BitbrainsResourceStateTableReader]; entries whose reader yields
+     * no row are skipped. The state reader is always closed again before this method returns.
+     *
+     * @return `true` if a resource was found, `false` if the reader is closed or no entries remain.
+     */
+    override fun nextRow(): Boolean {
+        // Once closed (explicitly or by exhaustion) no further files may be opened.
+        if (state == State.Closed) {
+            return false
+        }
+
+        state = State.Active
+        reset()
+
+        while (it.hasNext()) {
+            val (name, path) = it.next()
+
+            val parser = factory.createParser(path.toFile())
+            val reader = BitbrainsResourceStateTableReader(name, parser)
+
+            try {
+                if (!reader.nextRow()) {
+                    continue
+                }
+
+                id = reader.getString(reader.resolve(resourceID))
+                return true
+            } finally {
+                reader.close()
+            }
+        }
+
+        state = State.Closed
+        return false
+    }
+
+    private val colID = 0
+
+    override fun resolve(name: String): Int {
+        return when (name) {
+            resourceID -> colID
+            else -> -1
+        }
+    }
+
+    override fun isNull(index: Int): Boolean {
+        require(index in 0..colID) { "Invalid column index" }
+        return false
+    }
+
+    override fun getBoolean(index: Int): Boolean {
+        throw IllegalArgumentException("Invalid column")
+    }
+
+    override fun getInt(index: Int): Int {
+        throw IllegalArgumentException("Invalid column")
+    }
+
+    override fun getLong(index: Int): Long {
+        throw IllegalArgumentException("Invalid column")
+    }
+
+    override fun getFloat(index: Int): Float {
+        throw IllegalArgumentException("Invalid column")
+    }
+
+    override fun getDouble(index: Int): Double {
+        throw IllegalArgumentException("Invalid column")
+    }
+
+    override fun getString(index: Int): String? {
+        check(state == State.Active) { "No active row" }
+        return when (index) {
+            colID -> id
+            else -> throw IllegalArgumentException("Invalid column")
+        }
+    }
+
+    override fun getUUID(index: Int): UUID? {
+        throw IllegalArgumentException("Invalid column")
+    }
+
+    override fun getInstant(index: Int): Instant? {
+        throw IllegalArgumentException("Invalid column")
+    }
+
+    override fun getDuration(index: Int): Duration? {
+        throw IllegalArgumentException("Invalid column")
+    }
+
+    override fun <T> getList(
+        index: Int,
+        elementType: Class<T>,
+    ): List<T>? {
+        throw IllegalArgumentException("Invalid column")
+    }
+
+    override fun <T> getSet(
+        index: Int,
+        elementType: Class<T>,
+    ): Set<T>? {
+        throw IllegalArgumentException("Invalid column")
+    }
+
+    override fun <K, V> getMap(
+        index: Int,
+        keyType: Class<K>,
+        valueType: Class<V>,
+    ): Map<K, V>? {
+        throw IllegalArgumentException("Invalid column")
+    }
+
+    override fun close() {
+        reset()
+        state = State.Closed
+    }
+
+    /**
+     * State fields of the reader.
+     */
+    private var id: String? = null
+
+    /**
+     * Reset the state of the reader.
+     */
+    private fun reset() {
+        id = null
+    }
+
+    private enum class State {
+        Pending,
+        Active,
+        Closed,
+    }
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsTraceFormat.kt
new file mode 100644
index 00000000..23853077
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/bitbrains/BitbrainsTraceFormat.kt
@@ -0,0 +1,159 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software. 
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.bitbrains
+
+import com.fasterxml.jackson.dataformat.csv.CsvFactory
+import com.fasterxml.jackson.dataformat.csv.CsvParser
+import org.opendc.trace.TableColumn
+import org.opendc.trace.TableColumnType
+import org.opendc.trace.TableReader
+import org.opendc.trace.TableWriter
+import org.opendc.trace.conv.TABLE_RESOURCES
+import org.opendc.trace.conv.TABLE_RESOURCE_STATES
+import org.opendc.trace.conv.resourceCpuCapacity
+import org.opendc.trace.conv.resourceCpuCount
+import org.opendc.trace.conv.resourceID
+import org.opendc.trace.conv.resourceMemCapacity
+import org.opendc.trace.conv.resourceStateCpuUsage
+import org.opendc.trace.conv.resourceStateCpuUsagePct
+import org.opendc.trace.conv.resourceStateDiskRead
+import org.opendc.trace.conv.resourceStateDiskWrite
+import org.opendc.trace.conv.resourceStateMemUsage
+import org.opendc.trace.conv.resourceStateNetRx
+import org.opendc.trace.conv.resourceStateNetTx
+import org.opendc.trace.conv.resourceStateTimestamp
+import org.opendc.trace.spi.TableDetails
+import org.opendc.trace.spi.TraceFormat
+import org.opendc.trace.util.CompositeTableReader
+import java.nio.file.Files
+import java.nio.file.Path
+import java.util.stream.Collectors
+import kotlin.io.path.extension
+import kotlin.io.path.nameWithoutExtension
+
+/**
+ * A format implementation for the Bitbrains trace format.
+ *
+ * The format is read-only: [create] and [newWriter] always throw. Both exposed tables are backed by the
+ * `*.csv` files found directly inside the trace directory, one file per resource.
+ */
+public class BitbrainsTraceFormat : TraceFormat {
+    /**
+     * The name of this trace format.
+     */
+    override val name: String = "bitbrains"
+
+    /**
+     * The [CsvFactory] used to create the parser.
+     */
+    private val factory =
+        CsvFactory()
+            .enable(CsvParser.Feature.ALLOW_COMMENTS)
+            .enable(CsvParser.Feature.TRIM_SPACES)
+
+    override fun create(path: Path) {
+        throw UnsupportedOperationException("Writing not supported for this format")
+    }
+
+    override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES)
+
+    override fun getDetails(
+        path: Path,
+        table: String,
+    ): TableDetails {
+        return when (table) {
+            TABLE_RESOURCES ->
+                TableDetails(
+                    listOf(
+                        TableColumn(resourceID, TableColumnType.String),
+                    ),
+                )
+            TABLE_RESOURCE_STATES ->
+                TableDetails(
+                    listOf(
+                        TableColumn(resourceID, TableColumnType.String),
+                        TableColumn(resourceStateTimestamp, TableColumnType.Instant),
+                        TableColumn(resourceCpuCount, TableColumnType.Int),
+                        TableColumn(resourceCpuCapacity, TableColumnType.Double),
+                        TableColumn(resourceStateCpuUsage, TableColumnType.Double),
+                        TableColumn(resourceStateCpuUsagePct, TableColumnType.Double),
+                        TableColumn(resourceMemCapacity, TableColumnType.Double),
+                        TableColumn(resourceStateMemUsage, TableColumnType.Double),
+                        TableColumn(resourceStateDiskRead, TableColumnType.Double),
+                        TableColumn(resourceStateDiskWrite, TableColumnType.Double),
+                        TableColumn(resourceStateNetRx, TableColumnType.Double),
+                        TableColumn(resourceStateNetTx, TableColumnType.Double),
+                    ),
+                )
+            else -> throw IllegalArgumentException("Table $table not supported")
+        }
+    }
+
+    override fun newReader(
+        path: Path,
+        table: String,
+        projection: List<String>?,
+    ): TableReader {
+        return when (table) {
+            TABLE_RESOURCES -> BitbrainsResourceTableReader(factory, listPartitions(path))
+            TABLE_RESOURCE_STATES -> newResourceStateReader(path)
+            else -> throw IllegalArgumentException("Table $table not supported")
+        }
+    }
+
+    override fun newWriter(
+        path: Path,
+        table: String,
+    ): TableWriter {
+        throw UnsupportedOperationException("Writing not supported for this format")
+    }
+
+    /**
+     * List the `*.csv` files directly under [path], keyed and sorted by file name without extension.
+     */
+    private fun listPartitions(path: Path): Map<String, Path> {
+        // Files.walk holds an open directory handle until the stream is closed, so close it as soon as the
+        // listing has been collected.
+        return Files.walk(path, 1).use { files ->
+            files
+                .filter { !Files.isDirectory(it) && it.extension == "csv" }
+                .collect(Collectors.toMap({ it.nameWithoutExtension }, { it }))
+                .toSortedMap()
+        }
+    }
+
+    /**
+     * Construct a [TableReader] for reading over all resource state partitions.
+     *
+     * Each partition file is only opened when the previous one is exhausted.
+     */
+    private fun newResourceStateReader(path: Path): TableReader {
+        val it = listPartitions(path).iterator()
+
+        return object : CompositeTableReader() {
+            override fun nextReader(): TableReader? {
+                return if (it.hasNext()) {
+                    val (partition, partPath) = it.next()
+                    BitbrainsResourceStateTableReader(partition, factory.createParser(partPath.toFile()))
+                } else {
+                    null
+                }
+            }
+
+            override fun toString(): String = "BitbrainsCompositeTableReader"
+        }
+    }
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/gwf/GwfTaskTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/gwf/GwfTaskTableReader.kt
new file mode 100644
index 00000000..8a2a99cb
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/gwf/GwfTaskTableReader.kt
@@ -0,0 +1,286 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software. 
/**
 * A [TableReader] implementation for the GWF format.
 *
 * Each call to [nextRow] consumes one CSV record from [parser] and caches its fields; the typed getters then serve
 * values from that cache.
 */
internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader {
    /**
     * A flag to indicate whether [nextRow] has been called at least once.
     */
    private var isStarted = false

    init {
        parser.schema = schema
    }

    /**
     * Advance to the next record.
     *
     * @return `true` if a record was read, `false` if the parser is closed or no further record start was found.
     */
    override fun nextRow(): Boolean {
        isStarted = true

        // Clear the previous row first so stale values never leak into this one.
        reset()

        if (parser.isClosed || !nextStart()) {
            return false
        }

        while (true) {
            val token = parser.nextValue()
            if (token == null || token == JsonToken.END_OBJECT) {
                break
            }

            when (parser.currentName) {
                "WorkflowID" -> workflowId = parser.text
                "JobID" -> jobId = parser.text
                "SubmitTime" -> submitTime = Instant.ofEpochSecond(parser.longValue)
                "RunTime" -> runtime = Duration.ofSeconds(parser.longValue)
                "NProcs" -> nProcs = parser.intValue
                "ReqNProcs" -> reqNProcs = parser.intValue
                // valueAsString may be null for a null token; treat that as "no dependencies".
                "Dependencies" -> dependencies = parseParents(parser.valueAsString.orEmpty())
            }
        }

        return true
    }

    /**
     * Map a column name to its index, or `-1` if this table has no such column.
     */
    override fun resolve(name: String): Int {
        return when (name) {
            TASK_ID -> colJobID
            TASK_WORKFLOW_ID -> colWorkflowID
            TASK_SUBMIT_TIME -> colSubmitTime
            TASK_RUNTIME -> colRuntime
            TASK_ALLOC_NCPUS -> colNproc
            TASK_REQ_NCPUS -> colReqNproc
            TASK_PARENTS -> colDeps
            else -> -1
        }
    }

    /**
     * Always `false` for a valid column index.
     *
     * @throws IllegalArgumentException if [index] is outside the column range.
     */
    override fun isNull(index: Int): Boolean {
        require(index in 0..colDeps) { "Invalid column" }
        return false
    }

    override fun getBoolean(index: Int): Boolean {
        throw IllegalArgumentException("Invalid column")
    }

    override fun getInt(index: Int): Int {
        checkActive()
        return when (index) {
            colReqNproc -> reqNProcs
            colNproc -> nProcs
            else -> throw IllegalArgumentException("Invalid column")
        }
    }

    override fun getLong(index: Int): Long {
        throw IllegalArgumentException("Invalid column")
    }

    override fun getFloat(index: Int): Float {
        throw IllegalArgumentException("Invalid column")
    }

    override fun getDouble(index: Int): Double {
        throw IllegalArgumentException("Invalid column")
    }

    override fun getString(index: Int): String? {
        checkActive()
        return when (index) {
            colJobID -> jobId
            colWorkflowID -> workflowId
            else -> throw IllegalArgumentException("Invalid column")
        }
    }

    override fun getUUID(index: Int): UUID? {
        throw IllegalArgumentException("Invalid column")
    }

    override fun getInstant(index: Int): Instant? {
        checkActive()
        return when (index) {
            colSubmitTime -> submitTime
            else -> throw IllegalArgumentException("Invalid column")
        }
    }

    override fun getDuration(index: Int): Duration? {
        checkActive()
        return when (index) {
            colRuntime -> runtime
            else -> throw IllegalArgumentException("Invalid column")
        }
    }

    override fun <T> getList(
        index: Int,
        elementType: Class<T>,
    ): List<T>? {
        throw IllegalArgumentException("Invalid column")
    }

    override fun <K, V> getMap(
        index: Int,
        keyType: Class<K>,
        valueType: Class<V>,
    ): Map<K, V>? {
        throw IllegalArgumentException("Invalid column")
    }

    override fun <T> getSet(
        index: Int,
        elementType: Class<T>,
    ): Set<T>? {
        checkActive()
        return when (index) {
            colDeps -> typeDeps.convertTo(dependencies, elementType)
            else -> throw IllegalArgumentException("Invalid column")
        }
    }

    override fun close() {
        parser.close()
    }

    /**
     * Helper method to check if the reader is active.
     *
     * @throws IllegalStateException if [nextRow] was never called or the parser has been closed.
     */
    private fun checkActive() {
        check(isStarted && !parser.isClosed) { "No active row. Did you call nextRow()?" }
    }

    /**
     * The whitespace pattern that separates the parent identifiers.
     */
    private val pattern = Pattern.compile("\\s+")

    /**
     * Parse the whitespace-separated parents into a set of identifier strings, dropping blank entries.
     */
    private fun parseParents(value: String): Set<String> =
        value.split(pattern).filterTo(mutableSetOf<String>()) { it.isNotBlank() }

    /**
     * Advance the parser until the next object start.
     *
     * @return `false` if the input ended before another object start was found.
     */
    private fun nextStart(): Boolean {
        var token = parser.nextValue()

        while (token != null && token != JsonToken.START_OBJECT) {
            token = parser.nextValue()
        }

        return token != null
    }

    /**
     * Reader state fields holding the values of the current row.
     */
    private var workflowId: String? = null
    private var jobId: String? = null
    private var submitTime: Instant? = null
    private var runtime: Duration? = null
    private var nProcs = -1
    private var reqNProcs = -1
    private var dependencies = emptySet<String>()

    /**
     * Reset the state to the "no row" defaults.
     */
    private fun reset() {
        workflowId = null
        jobId = null
        submitTime = null
        runtime = null
        nProcs = -1
        reqNProcs = -1
        dependencies = emptySet()
    }

    private val colWorkflowID = 0
    private val colJobID = 1
    private val colSubmitTime = 2
    private val colRuntime = 3
    private val colNproc = 4
    private val colReqNproc = 5
    private val colDeps = 6

    private val typeDeps = TableColumnType.Set(TableColumnType.String)

    companion object {
        /**
         * The [CsvSchema] that is used to parse the trace.
         */
        private val schema =
            CsvSchema.builder()
                .addColumn("WorkflowID", CsvSchema.ColumnType.NUMBER)
                .addColumn("JobID", CsvSchema.ColumnType.NUMBER)
                .addColumn("SubmitTime", CsvSchema.ColumnType.NUMBER)
                .addColumn("RunTime", CsvSchema.ColumnType.NUMBER)
                .addColumn("NProcs", CsvSchema.ColumnType.NUMBER)
                .addColumn("ReqNProcs", CsvSchema.ColumnType.NUMBER)
                .addColumn("Dependencies", CsvSchema.ColumnType.STRING)
                .setAllowComments(true)
                .setUseHeader(true)
                .setColumnSeparator(',')
                .build()
    }
}
/**
 * A [TraceFormat] implementation for the GWF trace format.
 *
 * The format is read-only: it exposes a single tasks table and rejects every write operation.
 */
public class GwfTraceFormat : TraceFormat {
    /**
     * The name of this trace format.
     */
    override val name: String = "gwf"

    /**
     * The [CsvFactory] used to create the parser.
     */
    private val factory =
        CsvFactory()
            .enable(CsvParser.Feature.ALLOW_COMMENTS)
            .enable(CsvParser.Feature.TRIM_SPACES)

    /**
     * Not supported for this format.
     *
     * @throws UnsupportedOperationException always.
     */
    override fun create(path: Path) {
        throw UnsupportedOperationException("Writing not supported for this format")
    }

    /**
     * The tables available in a trace of this format: only [TABLE_TASKS].
     */
    override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS)

    /**
     * Describe the columns of [table].
     *
     * @throws IllegalArgumentException if [table] is not [TABLE_TASKS].
     */
    override fun getDetails(
        path: Path,
        table: String,
    ): TableDetails {
        return when (table) {
            TABLE_TASKS ->
                TableDetails(
                    listOf(
                        TableColumn(TASK_WORKFLOW_ID, TableColumnType.String),
                        TableColumn(TASK_ID, TableColumnType.String),
                        TableColumn(TASK_SUBMIT_TIME, TableColumnType.Instant),
                        TableColumn(TASK_RUNTIME, TableColumnType.Duration),
                        TableColumn(TASK_REQ_NCPUS, TableColumnType.Int),
                        TableColumn(TASK_ALLOC_NCPUS, TableColumnType.Int),
                        TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)),
                    ),
                )
            else -> throw IllegalArgumentException("Table $table not supported")
        }
    }

    /**
     * Open a reader over [table], parsing the file at [path] as CSV. The [projection] is not used by this format.
     *
     * @throws IllegalArgumentException if [table] is not [TABLE_TASKS].
     */
    override fun newReader(
        path: Path,
        table: String,
        projection: List<String>?,
    ): TableReader {
        return when (table) {
            TABLE_TASKS -> GwfTaskTableReader(factory.createParser(path.toFile()))
            else -> throw IllegalArgumentException("Table $table not supported")
        }
    }

    /**
     * Not supported for this format.
     *
     * @throws UnsupportedOperationException always.
     */
    override fun newWriter(
        path: Path,
        table: String,
    ): TableWriter {
        throw UnsupportedOperationException("Writing not supported for this format")
    }
}
/**
 * A [TableReader] implementation for the OpenDC VM interference JSON format.
 *
 * The input is expected to be a JSON array of group objects; each call to [nextRow] parses one group.
 */
internal class OdcVmInterferenceJsonTableReader(private val parser: JsonParser) : TableReader {
    /**
     * A flag to indicate whether [nextRow] has been called at least once.
     */
    private var isStarted = false

    /**
     * Advance to the next group.
     *
     * @return `true` if a group was parsed; `false` once the end of the array is reached, at which point the parser
     * is closed and the row state reset.
     * @throws JsonParseException if the document does not start with an array or a group is not an object.
     */
    override fun nextRow(): Boolean {
        if (!isStarted) {
            isStarted = true

            // The very first token of the document must open the array of groups.
            parser.nextToken()

            if (!parser.isExpectedStartArrayToken) {
                throw JsonParseException(parser, "Expected array at start, but got ${parser.currentToken()}")
            }
        }

        return if (parser.isClosed || parser.nextToken() == JsonToken.END_ARRAY) {
            parser.close()
            reset()
            false
        } else {
            parseGroup(parser)
            true
        }
    }

    private val colMembers = 0
    private val colTarget = 1
    private val colScore = 2

    private val typeMembers = TableColumnType.Set(TableColumnType.String)

    /**
     * Map a column name to its index, or `-1` if this table has no such column.
     */
    override fun resolve(name: String): Int {
        return when (name) {
            INTERFERENCE_GROUP_MEMBERS -> colMembers
            INTERFERENCE_GROUP_TARGET -> colTarget
            INTERFERENCE_GROUP_SCORE -> colScore
            else -> -1
        }
    }

    override fun isNull(index: Int): Boolean {
        return when (index) {
            colMembers, colTarget, colScore -> false
            else -> throw IllegalArgumentException("Invalid column index $index")
        }
    }

    override fun getBoolean(index: Int): Boolean {
        throw IllegalArgumentException("Invalid column $index")
    }

    override fun getInt(index: Int): Int {
        throw IllegalArgumentException("Invalid column $index")
    }

    override fun getLong(index: Int): Long {
        throw IllegalArgumentException("Invalid column $index")
    }

    override fun getFloat(index: Int): Float {
        throw IllegalArgumentException("Invalid column $index")
    }

    override fun getDouble(index: Int): Double {
        checkActive()
        return when (index) {
            colTarget -> targetLoad
            colScore -> score
            else -> throw IllegalArgumentException("Invalid column $index")
        }
    }

    override fun getString(index: Int): String? {
        throw IllegalArgumentException("Invalid column $index")
    }

    override fun getUUID(index: Int): UUID? {
        throw IllegalArgumentException("Invalid column $index")
    }

    override fun getInstant(index: Int): Instant? {
        throw IllegalArgumentException("Invalid column $index")
    }

    override fun getDuration(index: Int): Duration? {
        throw IllegalArgumentException("Invalid column $index")
    }

    override fun <T> getList(
        index: Int,
        elementType: Class<T>,
    ): List<T>? {
        throw IllegalArgumentException("Invalid column $index")
    }

    override fun <T> getSet(
        index: Int,
        elementType: Class<T>,
    ): Set<T>? {
        checkActive()
        return when (index) {
            colMembers -> typeMembers.convertTo(members, elementType)
            else -> throw IllegalArgumentException("Invalid column $index")
        }
    }

    override fun <K, V> getMap(
        index: Int,
        keyType: Class<K>,
        valueType: Class<V>,
    ): Map<K, V>? {
        throw IllegalArgumentException("Invalid column $index")
    }

    override fun close() {
        parser.close()
    }

    /**
     * Values of the current group; the initial values double as defaults for fields missing from a group.
     */
    private var members = emptySet<String>()
    private var targetLoad = Double.POSITIVE_INFINITY
    private var score = 1.0

    /**
     * Helper method to check if the reader is active.
     *
     * @throws IllegalStateException if [nextRow] was never called or the parser has been closed.
     */
    private fun checkActive() {
        check(isStarted && !parser.isClosed) { "No active row. Did you call nextRow()?" }
    }

    /**
     * Reset the state to its defaults.
     */
    private fun reset() {
        members = emptySet()
        targetLoad = Double.POSITIVE_INFINITY
        score = 1.0
    }

    /**
     * Parse a single group of an interference JSON file. The parser must be positioned on the group's opening brace.
     *
     * @throws JsonParseException if the current token does not start an object.
     */
    private fun parseGroup(parser: JsonParser) {
        var targetLoad = Double.POSITIVE_INFINITY
        var score = 1.0
        val members = mutableSetOf<String>()

        if (!parser.isExpectedStartObjectToken) {
            throw JsonParseException(parser, "Expected object, but got ${parser.currentToken()}")
        }

        while (parser.nextValue() != JsonToken.END_OBJECT) {
            when (parser.currentName) {
                "vms" -> parseGroupMembers(parser, members)
                "minServerLoad" -> targetLoad = parser.doubleValue
                "performanceScore" -> score = parser.doubleValue
                // Skip unknown nested objects/arrays as a whole; otherwise their closing END_OBJECT would be
                // mistaken for the end of this group. For scalar values this is a no-op.
                else -> parser.skipChildren()
            }
        }

        // Publish the row only after the whole group parsed successfully.
        this.members = members
        this.targetLoad = targetLoad
        this.score = score
    }

    /**
     * Parse the members of a group into [members]. The parser must be positioned on the opening bracket.
     *
     * @throws JsonParseException if the value is not an array or contains a non-string element.
     */
    private fun parseGroupMembers(
        parser: JsonParser,
        members: MutableSet<String>,
    ) {
        if (!parser.isExpectedStartArrayToken) {
            throw JsonParseException(parser, "Expected array for group members, but got ${parser.currentToken()}")
        }

        while (parser.nextValue() != JsonToken.END_ARRAY) {
            if (parser.currentToken() != JsonToken.VALUE_STRING) {
                throw JsonParseException(parser, "Expected string value for group member")
            }

            members.add(parser.text)
        }
    }
}
/**
 * A [TableWriter] implementation for the OpenDC VM interference JSON format.
 *
 * The output is a JSON array with one object per row. The array is opened on construction and closed in [close].
 */
internal class OdcVmInterferenceJsonTableWriter(private val generator: JsonGenerator) : TableWriter {
    /**
     * A flag to indicate whether a row has been started.
     */
    private var isRowActive = false

    init {
        generator.writeStartArray()
    }

    /**
     * Begin a new row with all columns reset to their defaults.
     */
    override fun startRow() {
        // Reset state
        members = emptySet()
        targetLoad = Double.POSITIVE_INFINITY
        score = 1.0

        // Mark row as active
        isRowActive = true
    }

    /**
     * Write the buffered row as one JSON object and mark the row as finished.
     *
     * @throws IllegalStateException if no row is active.
     */
    override fun endRow() {
        check(isRowActive) { "No active row" }
        // Close the row before writing so a repeated endRow() cannot emit a duplicate object.
        isRowActive = false

        generator.writeStartObject()
        generator.writeArrayFieldStart("vms")
        for (member in members) {
            generator.writeString(member)
        }
        generator.writeEndArray()
        generator.writeNumberField("minServerLoad", targetLoad)
        generator.writeNumberField("performanceScore", score)
        generator.writeEndObject()
    }

    /**
     * Map a column name to its index, or `-1` if this table has no such column.
     */
    override fun resolve(name: String): Int {
        return when (name) {
            INTERFERENCE_GROUP_MEMBERS -> colMembers
            INTERFERENCE_GROUP_TARGET -> colTarget
            INTERFERENCE_GROUP_SCORE -> colScore
            else -> -1
        }
    }

    override fun setBoolean(
        index: Int,
        value: Boolean,
    ) {
        throw IllegalArgumentException("Invalid column $index")
    }

    override fun setInt(
        index: Int,
        value: Int,
    ) {
        throw IllegalArgumentException("Invalid column $index")
    }

    override fun setLong(
        index: Int,
        value: Long,
    ) {
        throw IllegalArgumentException("Invalid column $index")
    }

    override fun setFloat(
        index: Int,
        value: Float,
    ) {
        throw IllegalArgumentException("Invalid column $index")
    }

    override fun setDouble(
        index: Int,
        value: Double,
    ) {
        check(isRowActive) { "No active row" }

        when (index) {
            colTarget -> targetLoad = value
            colScore -> score = value
            else -> throw IllegalArgumentException("Invalid column $index")
        }
    }

    override fun setString(
        index: Int,
        value: String,
    ) {
        throw IllegalArgumentException("Invalid column $index")
    }

    override fun setUUID(
        index: Int,
        value: UUID,
    ) {
        throw IllegalArgumentException("Invalid column $index")
    }

    override fun setInstant(
        index: Int,
        value: Instant,
    ) {
        throw IllegalArgumentException("Invalid column $index")
    }

    override fun setDuration(
        index: Int,
        value: Duration,
    ) {
        throw IllegalArgumentException("Invalid column $index")
    }

    override fun setList(
        index: Int,
        value: List<*>,
    ) {
        throw IllegalArgumentException("Invalid column $index")
    }

    override fun setSet(
        index: Int,
        value: Set<*>,
    ) {
        check(isRowActive) { "No active row" }

        // The element type is not checked here; non-string members would only fail when the row is written.
        @Suppress("UNCHECKED_CAST")
        when (index) {
            colMembers -> members = value as Set<String>
            else -> throw IllegalArgumentException("Invalid column index $index")
        }
    }

    override fun setMap(
        index: Int,
        value: Map<*, *>,
    ) {
        throw IllegalArgumentException("Invalid column $index")
    }

    override fun flush() {
        generator.flush()
    }

    /**
     * Close the enclosing JSON array and the underlying generator.
     */
    override fun close() {
        generator.writeEndArray()
        generator.close()
    }

    private val colMembers = 0
    private val colTarget = 1
    private val colScore = 2

    /**
     * Values of the row that is currently being written.
     */
    private var members = emptySet<String>()
    private var targetLoad = Double.POSITIVE_INFINITY
    private var score = 1.0
}
a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceStateTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceStateTableReader.kt new file mode 100644 index 00000000..39475f9f --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/OdcVmResourceStateTableReader.kt @@ -0,0 +1,166 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
/**
 * A [TableReader] implementation for the OpenDC virtual machine trace format.
 *
 * Records are pulled one at a time from [reader]; the getters expose the fields of the most recently read record.
 */
internal class OdcVmResourceStateTableReader(private val reader: LocalParquetReader<ResourceState>) : TableReader {
    /**
     * The current record, or `null` if no record is available.
     */
    private var record: ResourceState? = null

    /**
     * Read the next record.
     *
     * @return `true` if a record was read, `false` if the reader returned `null`.
     */
    override fun nextRow(): Boolean {
        try {
            val next = reader.read()
            record = next
            return next != null
        } catch (e: Throwable) {
            // Never leave a stale record visible after a failed read.
            record = null
            throw e
        }
    }

    private val colID = 0
    private val colTimestamp = 1
    private val colDuration = 2
    private val colCpuCount = 3
    private val colCpuUsage = 4

    /**
     * Map a column name to its index, or `-1` if this table has no such column.
     */
    override fun resolve(name: String): Int {
        return when (name) {
            resourceID -> colID
            resourceStateTimestamp -> colTimestamp
            resourceStateDuration -> colDuration
            resourceCpuCount -> colCpuCount
            resourceStateCpuUsage -> colCpuUsage
            else -> -1
        }
    }

    /**
     * Always `false` for a valid column index.
     *
     * @throws IllegalArgumentException if [index] is outside the column range.
     */
    override fun isNull(index: Int): Boolean {
        require(index in 0..colCpuUsage) { "Invalid column index" }
        return false
    }

    override fun getBoolean(index: Int): Boolean {
        throw IllegalArgumentException("Invalid column or type [index $index]")
    }

    override fun getInt(index: Int): Int {
        val current = currentRecord()
        return when (index) {
            colCpuCount -> current.cpuCount
            else -> throw IllegalArgumentException("Invalid column or type [index $index]")
        }
    }

    override fun getLong(index: Int): Long {
        throw IllegalArgumentException("Invalid column or type [index $index]")
    }

    override fun getFloat(index: Int): Float {
        throw IllegalArgumentException("Invalid column or type [index $index]")
    }

    override fun getDouble(index: Int): Double {
        val current = currentRecord()
        return when (index) {
            colCpuUsage -> current.cpuUsage
            else -> throw IllegalArgumentException("Invalid column or type [index $index]")
        }
    }

    override fun getString(index: Int): String {
        val current = currentRecord()
        return when (index) {
            colID -> current.id
            else -> throw IllegalArgumentException("Invalid column index $index")
        }
    }

    override fun getUUID(index: Int): UUID? {
        throw IllegalArgumentException("Invalid column or type [index $index]")
    }

    override fun getInstant(index: Int): Instant {
        val current = currentRecord()
        return when (index) {
            colTimestamp -> current.timestamp
            else -> throw IllegalArgumentException("Invalid column index $index")
        }
    }

    override fun getDuration(index: Int): Duration {
        val current = currentRecord()
        return when (index) {
            colDuration -> current.duration
            else -> throw IllegalArgumentException("Invalid column index $index")
        }
    }

    override fun <T> getList(
        index: Int,
        elementType: Class<T>,
    ): List<T>? {
        throw IllegalArgumentException("Invalid column or type [index $index]")
    }

    override fun <T> getSet(
        index: Int,
        elementType: Class<T>,
    ): Set<T>? {
        throw IllegalArgumentException("Invalid column or type [index $index]")
    }

    override fun <K, V> getMap(
        index: Int,
        keyType: Class<K>,
        valueType: Class<V>,
    ): Map<K, V>? {
        throw IllegalArgumentException("Invalid column or type [index $index]")
    }

    override fun close() {
        reader.close()
    }

    override fun toString(): String = "OdcVmResourceStateTableReader"

    /**
     * Return the current record.
     *
     * @throws IllegalStateException if no record is available.
     */
    private fun currentRecord(): ResourceState = checkNotNull(record) { "Reader in invalid state" }
}
/**
 * A [TableWriter] implementation for the OpenDC virtual machine trace format.
 *
 * Column values are buffered per row and handed to [writer] as a single [ResourceState] in [endRow].
 */
internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter<ResourceState>) : TableWriter {
    /**
     * The current state for the record that is being written.
     */
    private var localIsActive = false
    private var localID: String = ""
    private var localTimestamp: Instant = Instant.MIN
    private var localDuration: Duration = Duration.ZERO
    private var localCpuCount: Int = 0
    private var localCpuUsage: Double = Double.NaN

    /**
     * Begin a new row with all columns reset to their defaults.
     */
    override fun startRow() {
        localIsActive = true
        localID = ""
        localTimestamp = Instant.MIN
        localDuration = Duration.ZERO
        localCpuCount = 0
        localCpuUsage = Double.NaN
    }

    /**
     * Finish the active row and write it out.
     *
     * @throws IllegalStateException if no row is active, or if the row has the same id as the previous row but an
     * earlier timestamp.
     */
    override fun endRow() {
        check(localIsActive) { "No active row" }
        localIsActive = false

        // Timestamps are only compared against the previous row when both rows share an id.
        check(lastId != localID || localTimestamp >= lastTimestamp) { "Records need to be ordered by (id, timestamp)" }

        writer.write(ResourceState(localID, localTimestamp, localDuration, localCpuCount, localCpuUsage))

        lastId = localID
        lastTimestamp = localTimestamp
    }

    /**
     * Map a column name to its index, or `-1` if this table has no such column.
     */
    override fun resolve(name: String): Int {
        return when (name) {
            resourceID -> colID
            resourceStateTimestamp -> colTimestamp
            resourceStateDuration -> colDuration
            resourceCpuCount -> colCpuCount
            resourceStateCpuUsage -> colCpuUsage
            else -> -1
        }
    }

    override fun setBoolean(
        index: Int,
        value: Boolean,
    ) {
        throw IllegalArgumentException("Invalid column or type [index $index]")
    }

    override fun setInt(
        index: Int,
        value: Int,
    ) {
        check(localIsActive) { "No active row" }
        when (index) {
            colCpuCount -> localCpuCount = value
            else -> throw IllegalArgumentException("Invalid column or type [index $index]")
        }
    }

    override fun setLong(
        index: Int,
        value: Long,
    ) {
        throw IllegalArgumentException("Invalid column or type [index $index]")
    }

    override fun setFloat(
        index: Int,
        value: Float,
    ) {
        throw IllegalArgumentException("Invalid column or type [index $index]")
    }

    override fun setDouble(
        index: Int,
        value: Double,
    ) {
        check(localIsActive) { "No active row" }
        when (index) {
            colCpuUsage -> localCpuUsage = value
            else -> throw IllegalArgumentException("Invalid column or type [index $index]")
        }
    }

    override fun setString(
        index: Int,
        value: String,
    ) {
        check(localIsActive) { "No active row" }

        when (index) {
            colID -> localID = value
            else -> throw IllegalArgumentException("Invalid column or type [index $index]")
        }
    }

    override fun setUUID(
        index: Int,
        value: UUID,
    ) {
        throw IllegalArgumentException("Invalid column or type [index $index]")
    }

    override fun setInstant(
        index: Int,
        value: Instant,
    ) {
        check(localIsActive) { "No active row" }

        when (index) {
            colTimestamp -> localTimestamp = value
            else -> throw IllegalArgumentException("Invalid column or type [index $index]")
        }
    }

    override fun setDuration(
        index: Int,
        value: Duration,
    ) {
        check(localIsActive) { "No active row" }

        when (index) {
            colDuration -> localDuration = value
            else -> throw IllegalArgumentException("Invalid column or type [index $index]")
        }
    }

    override fun setList(
        index: Int,
        value: List<*>,
    ) {
        throw IllegalArgumentException("Invalid column or type [index $index]")
    }

    override fun setSet(
        index: Int,
        value: Set<*>,
    ) {
        throw IllegalArgumentException("Invalid column or type [index $index]")
    }

    override fun setMap(
        index: Int,
        value: Map<*, *>,
    ) {
        throw IllegalArgumentException("Invalid column or type [index $index]")
    }

    override fun flush() {
        // Not available
    }

    override fun close() {
        writer.close()
    }

    /**
     * Last column values that are used to check for correct partitioning.
     */
    private var lastId: String? = null
    private var lastTimestamp: Instant = Instant.MAX

    private val colID = 0
    private val colTimestamp = 1
    private val colDuration = 2
    private val colCpuCount = 3
    private val colCpuUsage = 4
}
/**
 * A [TableReader] implementation for the "resources table" in the OpenDC virtual machine trace format.
 *
 * [nextRow] pulls the next [Resource] from the underlying Parquet reader; the typed getters then expose the fields of
 * that record. A column can only be read through the getter matching its type, every other combination throws an
 * [IllegalArgumentException].
 */
internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<Resource>) : TableReader {
    /**
     * The current record, or `null` when no row is active.
     */
    private var record: Resource? = null

    /**
     * Read the next record from the underlying reader.
     *
     * @return `true` if a record was read, `false` if the underlying reader returned `null`.
     */
    override fun nextRow(): Boolean {
        try {
            val record = reader.read()
            this.record = record

            return record != null
        } catch (e: Throwable) {
            // Drop the previous row so that getters fail instead of serving stale data after a failed read.
            this.record = null
            throw e
        }
    }

    private val colID = 0
    private val colStartTime = 1
    private val colStopTime = 2
    private val colCpuCount = 3
    private val colCpuCapacity = 4
    private val colMemCapacity = 5

    /**
     * Translate a column name into the index used by the getters, or `-1` if the column is not part of this table.
     */
    override fun resolve(name: String): Int {
        return when (name) {
            resourceID -> colID
            resourceStartTime -> colStartTime
            resourceStopTime -> colStopTime
            resourceCpuCount -> colCpuCount
            resourceCpuCapacity -> colCpuCapacity
            resourceMemCapacity -> colMemCapacity
            else -> -1
        }
    }

    /**
     * Always `false` for a valid column index: every field of [Resource] is non-nullable.
     */
    override fun isNull(index: Int): Boolean {
        require(index in 0..colMemCapacity) { "Invalid column index $index" }
        return false
    }

    override fun getBoolean(index: Int): Boolean {
        throw IllegalArgumentException("Invalid column or type [index $index]")
    }

    override fun getInt(index: Int): Int {
        val record = checkNotNull(record) { "Reader in invalid state" }

        return when (index) {
            colCpuCount -> record.cpuCount
            else -> throw IllegalArgumentException("Invalid column or type [index $index]")
        }
    }

    override fun getLong(index: Int): Long {
        throw IllegalArgumentException("Invalid column or type [index $index]")
    }

    override fun getFloat(index: Int): Float {
        throw IllegalArgumentException("Invalid column or type [index $index]")
    }

    override fun getDouble(index: Int): Double {
        val record = checkNotNull(record) { "Reader in invalid state" }

        return when (index) {
            colCpuCapacity -> record.cpuCapacity
            colMemCapacity -> record.memCapacity
            else -> throw IllegalArgumentException("Invalid column or type [index $index]")
        }
    }

    override fun getString(index: Int): String {
        val record = checkNotNull(record) { "Reader in invalid state" }

        return when (index) {
            colID -> record.id
            else -> throw IllegalArgumentException("Invalid column or type [index $index]")
        }
    }

    override fun getUUID(index: Int): UUID? {
        throw IllegalArgumentException("Invalid column or type [index $index]")
    }

    override fun getInstant(index: Int): Instant {
        val record = checkNotNull(record) { "Reader in invalid state" }

        return when (index) {
            colStartTime -> record.startTime
            colStopTime -> record.stopTime
            else -> throw IllegalArgumentException("Invalid column or type [index $index]")
        }
    }

    override fun getDuration(index: Int): Duration? {
        throw IllegalArgumentException("Invalid column or type [index $index]")
    }

    override fun <T> getList(
        index: Int,
        elementType: Class<T>,
    ): List<T>? {
        throw IllegalArgumentException("Invalid column or type [index $index]")
    }

    override fun <T> getSet(
        index: Int,
        elementType: Class<T>,
    ): Set<T>? {
        throw IllegalArgumentException("Invalid column or type [index $index]")
    }

    override fun <K, V> getMap(
        index: Int,
        keyType: Class<K>,
        valueType: Class<V>,
    ): Map<K, V>? {
        throw IllegalArgumentException("Invalid column or type [index $index]")
    }

    override fun close() {
        reader.close()
    }

    override fun toString(): String = "OdcVmResourceTableReader"
}
/**
 * A [TableWriter] implementation for the "resources table" of the OpenDC virtual machine trace format.
 *
 * A row is assembled between [startRow] and [endRow]: the setters buffer the column values and [endRow] hands the
 * resulting [Resource] to the underlying Parquet writer.
 */
internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resource>) : TableWriter {
    /**
     * The current state for the record that is being written.
     */
    private var localIsActive = false
    private var localId: String = ""
    private var localStartTime: Instant = Instant.MIN
    private var localStopTime: Instant = Instant.MIN
    private var localCpuCount: Int = 0
    private var localCpuCapacity: Double = Double.NaN
    private var localMemCapacity: Double = Double.NaN

    /**
     * Begin a new row, resetting every buffered column to its placeholder value.
     */
    override fun startRow() {
        localIsActive = true
        localId = ""
        localStartTime = Instant.MIN
        localStopTime = Instant.MIN
        localCpuCount = 0
        localCpuCapacity = Double.NaN
        localMemCapacity = Double.NaN
    }

    /**
     * Finish the active row and write it out.
     *
     * @throws IllegalStateException if no row was started.
     */
    override fun endRow() {
        check(localIsActive) { "No active row" }
        localIsActive = false
        writer.write(Resource(localId, localStartTime, localStopTime, localCpuCount, localCpuCapacity, localMemCapacity))
    }

    /**
     * Translate a column name into the index used by the setters, or `-1` if the column is not part of this table.
     */
    override fun resolve(name: String): Int {
        return when (name) {
            resourceID -> colID
            resourceStartTime -> colStartTime
            resourceStopTime -> colStopTime
            resourceCpuCount -> colCpuCount
            resourceCpuCapacity -> colCpuCapacity
            resourceMemCapacity -> colMemCapacity
            else -> -1
        }
    }

    override fun setBoolean(
        index: Int,
        value: Boolean,
    ) {
        throw IllegalArgumentException("Invalid column or type [index $index]")
    }

    override fun setInt(
        index: Int,
        value: Int,
    ) {
        check(localIsActive) { "No active row" }
        when (index) {
            colCpuCount -> localCpuCount = value
            else -> throw IllegalArgumentException("Invalid column or type [index $index]")
        }
    }

    override fun setLong(
        index: Int,
        value: Long,
    ) {
        throw IllegalArgumentException("Invalid column or type [index $index]")
    }

    override fun setFloat(
        index: Int,
        value: Float,
    ) {
        throw IllegalArgumentException("Invalid column or type [index $index]")
    }

    override fun setDouble(
        index: Int,
        value: Double,
    ) {
        check(localIsActive) { "No active row" }
        when (index) {
            colCpuCapacity -> localCpuCapacity = value
            colMemCapacity -> localMemCapacity = value
            else -> throw IllegalArgumentException("Invalid column or type [index $index]")
        }
    }

    override fun setString(
        index: Int,
        value: String,
    ) {
        check(localIsActive) { "No active row" }
        when (index) {
            colID -> localId = value
            else -> throw IllegalArgumentException("Invalid column or type [index $index]")
        }
    }

    override fun setUUID(
        index: Int,
        value: UUID,
    ) {
        throw IllegalArgumentException("Invalid column or type [index $index]")
    }

    override fun setInstant(
        index: Int,
        value: Instant,
    ) {
        check(localIsActive) { "No active row" }
        when (index) {
            colStartTime -> localStartTime = value
            colStopTime -> localStopTime = value
            else -> throw IllegalArgumentException("Invalid column or type [index $index]")
        }
    }

    override fun setDuration(
        index: Int,
        value: Duration,
    ) {
        throw IllegalArgumentException("Invalid column or type [index $index]")
    }

    override fun <T> setList(
        index: Int,
        value: List<T>,
    ) {
        throw IllegalArgumentException("Invalid column or type [index $index]")
    }

    override fun <T> setSet(
        index: Int,
        value: Set<T>,
    ) {
        throw IllegalArgumentException("Invalid column or type [index $index]")
    }

    override fun <K, V> setMap(
        index: Int,
        value: Map<K, V>,
    ) {
        throw IllegalArgumentException("Invalid column or type [index $index]")
    }

    override fun flush() {
        // Not available
    }

    override fun close() {
        writer.close()
    }

    private val colID = 0
    private val colStartTime = 1
    private val colStopTime = 2
    private val colCpuCount = 3
    private val colCpuCapacity = 4
    private val colMemCapacity = 5
}
granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package org.opendc.trace.formats.opendc + +import com.fasterxml.jackson.core.JsonEncoding +import com.fasterxml.jackson.core.JsonFactory +import org.apache.parquet.column.ParquetProperties +import org.apache.parquet.hadoop.ParquetFileWriter +import org.apache.parquet.hadoop.metadata.CompressionCodecName +import org.opendc.trace.TableColumn +import org.opendc.trace.TableColumnType +import org.opendc.trace.TableReader +import org.opendc.trace.TableWriter +import org.opendc.trace.conv.INTERFERENCE_GROUP_MEMBERS +import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE +import org.opendc.trace.conv.INTERFERENCE_GROUP_TARGET +import org.opendc.trace.conv.TABLE_INTERFERENCE_GROUPS +import org.opendc.trace.conv.TABLE_RESOURCES +import org.opendc.trace.conv.TABLE_RESOURCE_STATES +import org.opendc.trace.conv.resourceCpuCapacity +import org.opendc.trace.conv.resourceCpuCount +import org.opendc.trace.conv.resourceID +import org.opendc.trace.conv.resourceMemCapacity +import org.opendc.trace.conv.resourceStartTime +import org.opendc.trace.conv.resourceStateCpuUsage +import org.opendc.trace.conv.resourceStateDuration +import org.opendc.trace.conv.resourceStateTimestamp +import org.opendc.trace.conv.resourceStopTime +import org.opendc.trace.formats.opendc.parquet.ResourceReadSupport +import org.opendc.trace.formats.opendc.parquet.ResourceStateReadSupport +import org.opendc.trace.formats.opendc.parquet.ResourceStateWriteSupport +import org.opendc.trace.formats.opendc.parquet.ResourceWriteSupport +import org.opendc.trace.spi.TableDetails +import org.opendc.trace.spi.TraceFormat +import org.opendc.trace.util.parquet.LocalParquetReader +import org.opendc.trace.util.parquet.LocalParquetWriter +import java.nio.file.Files +import java.nio.file.Path +import kotlin.io.path.exists + +/** + * A [TraceFormat] implementation of the OpenDC virtual machine trace format. 
/**
 * A [TraceFormat] implementation of the OpenDC virtual machine trace format.
 *
 * A trace is a directory holding `meta.parquet` (resources table), `trace.parquet` (resource states table) and an
 * optional `interference-model.json` (interference groups table).
 */
public class OdcVmTraceFormat : TraceFormat {
    /**
     * A [JsonFactory] that is used to parse the JSON-based interference model.
     */
    private val jsonFactory = JsonFactory()

    /**
     * The name of this trace format.
     */
    override val name: String = "opendc-vm"

    /**
     * Create the trace directory at [path] and open and immediately close a writer for every supported table.
     */
    override fun create(path: Path) {
        // Construct directory containing the trace files
        Files.createDirectories(path)

        for (table in getTables(path)) {
            newWriter(path, table).close()
        }
    }

    override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES, TABLE_INTERFERENCE_GROUPS)

    /**
     * Describe the columns of [table].
     *
     * @throws IllegalArgumentException if [table] is not one of the tables returned by [getTables].
     */
    override fun getDetails(
        path: Path,
        table: String,
    ): TableDetails {
        return when (table) {
            TABLE_RESOURCES ->
                TableDetails(
                    listOf(
                        TableColumn(resourceID, TableColumnType.String),
                        TableColumn(resourceStartTime, TableColumnType.Instant),
                        TableColumn(resourceStopTime, TableColumnType.Instant),
                        TableColumn(resourceCpuCount, TableColumnType.Int),
                        TableColumn(resourceCpuCapacity, TableColumnType.Double),
                        TableColumn(resourceMemCapacity, TableColumnType.Double),
                    ),
                )
            TABLE_RESOURCE_STATES ->
                TableDetails(
                    listOf(
                        TableColumn(resourceID, TableColumnType.String),
                        TableColumn(resourceStateTimestamp, TableColumnType.Instant),
                        TableColumn(resourceStateDuration, TableColumnType.Duration),
                        TableColumn(resourceCpuCount, TableColumnType.Int),
                        TableColumn(resourceStateCpuUsage, TableColumnType.Double),
                    ),
                )
            TABLE_INTERFERENCE_GROUPS ->
                TableDetails(
                    listOf(
                        TableColumn(INTERFERENCE_GROUP_MEMBERS, TableColumnType.Set(TableColumnType.String)),
                        TableColumn(INTERFERENCE_GROUP_TARGET, TableColumnType.Double),
                        TableColumn(INTERFERENCE_GROUP_SCORE, TableColumnType.Double),
                    ),
                )
            else -> throw IllegalArgumentException("Table $table not supported")
        }
    }

    /**
     * Open a reader for [table] in the trace stored at [path].
     *
     * @param projection The column names to read, or `null` to read all columns. It is only forwarded to the
     * Parquet-backed tables; the interference table reader does not receive it.
     * @throws IllegalArgumentException if [table] is not supported.
     */
    override fun newReader(
        path: Path,
        table: String,
        projection: List<String>?,
    ): TableReader {
        return when (table) {
            TABLE_RESOURCES -> {
                val reader = LocalParquetReader(path.resolve("meta.parquet"), ResourceReadSupport(projection))
                OdcVmResourceTableReader(reader)
            }
            TABLE_RESOURCE_STATES -> {
                val reader = LocalParquetReader(path.resolve("trace.parquet"), ResourceStateReadSupport(projection))
                OdcVmResourceStateTableReader(reader)
            }
            TABLE_INTERFERENCE_GROUPS -> {
                val modelPath = path.resolve("interference-model.json")
                val parser =
                    if (modelPath.exists()) {
                        jsonFactory.createParser(modelPath.toFile())
                    } else {
                        jsonFactory.createParser("[]") // If model does not exist, return empty model
                    }

                OdcVmInterferenceJsonTableReader(parser)
            }
            else -> throw IllegalArgumentException("Table $table not supported")
        }
    }

    /**
     * Open a writer for [table] in the trace stored at [path]. The Parquet writers are built in overwrite mode.
     *
     * @throws IllegalArgumentException if [table] is not supported.
     */
    override fun newWriter(
        path: Path,
        table: String,
    ): TableWriter {
        return when (table) {
            TABLE_RESOURCES -> {
                val writer =
                    LocalParquetWriter.builder(path.resolve("meta.parquet"), ResourceWriteSupport())
                        .withCompressionCodec(CompressionCodecName.ZSTD)
                        .withPageWriteChecksumEnabled(true)
                        .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
                        .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
                        .build()
                OdcVmResourceTableWriter(writer)
            }
            TABLE_RESOURCE_STATES -> {
                val writer =
                    LocalParquetWriter.builder(path.resolve("trace.parquet"), ResourceStateWriteSupport())
                        .withCompressionCodec(CompressionCodecName.ZSTD)
                        .withDictionaryEncoding("id", true)
                        .withBloomFilterEnabled("id", true)
                        .withPageWriteChecksumEnabled(true)
                        .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
                        .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
                        .build()
                OdcVmResourceStateTableWriter(writer)
            }
            TABLE_INTERFERENCE_GROUPS -> {
                val generator = jsonFactory.createGenerator(path.resolve("interference-model.json").toFile(), JsonEncoding.UTF8)
                OdcVmInterferenceJsonTableWriter(generator)
            }
            else -> throw IllegalArgumentException("Table $table not supported")
        }
    }
}
a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/Resource.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/Resource.kt new file mode 100644 index 00000000..e8efe60f --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/opendc/parquet/Resource.kt @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.formats.opendc.parquet + +import java.time.Instant + +/** + * A description of a resource in a trace. 
/**
 * A single row of the "resources" table of a trace.
 *
 * @property id The value of the resource's identifier column.
 * @property startTime The value of the resource's start-time column.
 * @property stopTime The value of the resource's stop-time column.
 * @property cpuCount The value of the resource's CPU-count column.
 * @property cpuCapacity The value of the resource's CPU-capacity column.
 * @property memCapacity The value of the resource's memory-capacity column.
 */
internal data class Resource(
    val id: String,
    val startTime: Instant,
    val stopTime: Instant,
    val cpuCount: Int,
    val cpuCapacity: Double,
    val memCapacity: Double,
)
/**
 * A [ReadSupport] instance for [Resource] objects.
 *
 * @param projection The names of the trace columns to read, or `null` to read every column of [READ_SCHEMA].
 */
internal class ResourceReadSupport(private val projection: List<String>?) : ReadSupport<Resource>() {
    /**
     * Mapping from Parquet field names (both the v2.0 and the v2.1 spelling) to the trace column name they represent.
     */
    private val fieldMap =
        mapOf(
            "id" to resourceID,
            "submissionTime" to resourceStartTime,
            "start_time" to resourceStartTime,
            "endTime" to resourceStopTime,
            "stop_time" to resourceStopTime,
            "maxCores" to resourceCpuCount,
            "cpu_count" to resourceCpuCount,
            "cpu_capacity" to resourceCpuCapacity,
            "requiredMemory" to resourceMemCapacity,
            "mem_capacity" to resourceMemCapacity,
        )

    /**
     * Build the schema to request from the file: [READ_SCHEMA] restricted to the fields whose trace column appears in
     * [projection], or the full [READ_SCHEMA] when no projection was given.
     */
    override fun init(context: InitContext): ReadContext {
        val projectedSchema =
            if (projection != null) {
                Types.buildMessage()
                    .apply {
                        val projectionSet = projection.toSet()

                        for (field in READ_SCHEMA.fields) {
                            // Fields without a known trace column can never be part of a projection.
                            val col = fieldMap[field.name] ?: continue
                            if (col in projectionSet) {
                                addField(field)
                            }
                        }
                    }
                    .named(READ_SCHEMA.name)
            } else {
                READ_SCHEMA
            }

        return ReadContext(projectedSchema)
    }

    override fun prepareForRead(
        configuration: Configuration,
        keyValueMetaData: Map<String, String>,
        fileSchema: MessageType,
        readContext: ReadContext,
    ): RecordMaterializer<Resource> = ResourceRecordMaterializer(readContext.requestedSchema)

    companion object {
        /**
         * Parquet read schema (version 2.0) for the "resources" table in the trace.
         */
        @JvmStatic
        val READ_SCHEMA_V2_0: MessageType =
            Types.buildMessage()
                .addFields(
                    Types
                        .required(PrimitiveType.PrimitiveTypeName.BINARY)
                        .`as`(LogicalTypeAnnotation.stringType())
                        .named("id"),
                    Types
                        .required(PrimitiveType.PrimitiveTypeName.INT64)
                        .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
                        .named("submissionTime"),
                    Types
                        .required(PrimitiveType.PrimitiveTypeName.INT64)
                        .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
                        .named("endTime"),
                    Types
                        .required(PrimitiveType.PrimitiveTypeName.INT32)
                        .named("maxCores"),
                    Types
                        .required(PrimitiveType.PrimitiveTypeName.INT64)
                        .named("requiredMemory"),
                )
                .named("resource")

        /**
         * Parquet read schema (version 2.1) for the "resources" table in the trace.
         */
        @JvmStatic
        val READ_SCHEMA_V2_1: MessageType =
            Types.buildMessage()
                .addFields(
                    Types
                        .required(PrimitiveType.PrimitiveTypeName.BINARY)
                        .`as`(LogicalTypeAnnotation.stringType())
                        .named("id"),
                    Types
                        .required(PrimitiveType.PrimitiveTypeName.INT64)
                        .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
                        .named("start_time"),
                    Types
                        .required(PrimitiveType.PrimitiveTypeName.INT64)
                        .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
                        .named("stop_time"),
                    Types
                        .required(PrimitiveType.PrimitiveTypeName.INT32)
                        .named("cpu_count"),
                    Types
                        .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
                        .named("cpu_capacity"),
                    Types
                        .required(PrimitiveType.PrimitiveTypeName.INT64)
                        .named("mem_capacity"),
                )
                .named("resource")

        /**
         * Parquet read schema for the "resources" table in the trace: the union of the 2.0 and 2.1 schemas.
         */
        @JvmStatic
        val READ_SCHEMA: MessageType =
            READ_SCHEMA_V2_0
                .union(READ_SCHEMA_V2_1)
    }
}
/**
 * A [RecordMaterializer] for [Resource] records.
 *
 * One converter is created per field of the requested [schema]; both the legacy field names (`submissionTime`,
 * `endTime`, `maxCores`, `requiredMemory`) and the current ones are recognized.
 *
 * @param schema The schema whose fields are converted. A field with an unrecognized name causes construction to fail.
 */
internal class ResourceRecordMaterializer(schema: MessageType) : RecordMaterializer<Resource>() {
    /**
     * State of current record being read.
     */
    private var localId = ""
    private var localStartTime = Instant.MIN
    private var localStopTime = Instant.MIN
    private var localCpuCount = 0
    private var localCpuCapacity = 0.0
    private var localMemCapacity = 0.0

    /**
     * Root converter for the record.
     */
    private val root =
        object : GroupConverter() {
            /**
             * The converters for the columns of the schema.
             */
            private val converters =
                schema.fields.map { type ->
                    when (type.name) {
                        "id" ->
                            object : PrimitiveConverter() {
                                override fun addBinary(value: Binary) {
                                    localId = value.toStringUsingUTF8()
                                }
                            }
                        "start_time", "submissionTime" ->
                            object : PrimitiveConverter() {
                                override fun addLong(value: Long) {
                                    // The stored value is interpreted as milliseconds since the epoch.
                                    localStartTime = Instant.ofEpochMilli(value)
                                }
                            }
                        "stop_time", "endTime" ->
                            object : PrimitiveConverter() {
                                override fun addLong(value: Long) {
                                    localStopTime = Instant.ofEpochMilli(value)
                                }
                            }
                        "cpu_count", "maxCores" ->
                            object : PrimitiveConverter() {
                                override fun addInt(value: Int) {
                                    localCpuCount = value
                                }
                            }
                        "cpu_capacity" ->
                            object : PrimitiveConverter() {
                                override fun addDouble(value: Double) {
                                    localCpuCapacity = value
                                }
                            }
                        "mem_capacity", "requiredMemory" ->
                            object : PrimitiveConverter() {
                                // Accept the value either as a double or as a long that is widened to a double.
                                override fun addDouble(value: Double) {
                                    localMemCapacity = value
                                }

                                override fun addLong(value: Long) {
                                    localMemCapacity = value.toDouble()
                                }
                            }
                        else -> error("Unknown column $type")
                    }
                }

            /**
             * Reset the buffered state so that fields absent from the schema keep their default value.
             */
            override fun start() {
                localId = ""
                localStartTime = Instant.MIN
                localStopTime = Instant.MIN
                localCpuCount = 0
                localCpuCapacity = 0.0
                localMemCapacity = 0.0
            }

            override fun end() {}

            override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex]
        }

    /**
     * Build a [Resource] from the values buffered for the current record.
     */
    override fun getCurrentRecord(): Resource =
        Resource(
            localId,
            localStartTime,
            localStopTime,
            localCpuCount,
            localCpuCapacity,
            localMemCapacity,
        )

    override fun getRootConverter(): GroupConverter = root
}
/**
 * A single row of the "resource states" table of a trace.
 *
 * Declared as a data class, like [Resource], so that rows compare structurally and print their contents.
 *
 * @property id The value of the row's resource identifier column.
 * @property timestamp The value of the row's timestamp column.
 * @property duration The value of the row's duration column.
 * @property cpuCount The value of the row's CPU-count column.
 * @property cpuUsage The value of the row's CPU-usage column.
 */
internal data class ResourceState(
    val id: String,
    val timestamp: Instant,
    val duration: Duration,
    val cpuCount: Int,
    val cpuUsage: Double,
)
/**
 * A [ReadSupport] instance for [ResourceState] objects.
 *
 * @param projection The names of the trace columns to read, or `null` to read every field of [READ_SCHEMA].
 */
internal class ResourceStateReadSupport(private val projection: List<String>?) : ReadSupport<ResourceState>() {
    /**
     * Mapping from Parquet field names to trace column names.
     *
     * Both the v2.0 and the v2.1 spelling of a field map to the same column.
     */
    private val fieldMap =
        mapOf(
            "id" to resourceID,
            "time" to resourceStateTimestamp,
            "timestamp" to resourceStateTimestamp,
            "duration" to resourceStateDuration,
            "cores" to resourceCpuCount,
            "cpu_count" to resourceCpuCount,
            "cpuUsage" to resourceStateCpuUsage,
            "cpu_usage" to resourceStateCpuUsage,
        )

    /**
     * Select the part of [READ_SCHEMA] that must be read from the file.
     *
     * Without a projection the full [READ_SCHEMA] is requested. Otherwise only the fields whose column appears in the
     * projection are kept, in the order in which they occur in [READ_SCHEMA].
     */
    override fun init(context: InitContext): ReadContext {
        val requested = projection?.toSet() ?: return ReadContext(READ_SCHEMA)

        // Fields without a column mapping can never be requested and are dropped.
        val selected =
            READ_SCHEMA.fields.filter { field ->
                val column = fieldMap[field.name]
                column != null && column in requested
            }

        val projectedSchema =
            Types.buildMessage()
                .apply { selected.forEach { field -> addField(field) } }
                .named(READ_SCHEMA.name)

        return ReadContext(projectedSchema)
    }

    /**
     * Create the materializer that converts the requested columns into [ResourceState] records.
     */
    override fun prepareForRead(
        configuration: Configuration,
        keyValueMetaData: Map<String, String>,
        fileSchema: MessageType,
        readContext: ReadContext,
    ): RecordMaterializer<ResourceState> = ResourceStateRecordMaterializer(readContext.requestedSchema)

    companion object {
        /**
         * Build a "resource states" schema. The schema versions differ only in the names of three fields, which are
         * therefore passed in; the field types and the field order are shared.
         */
        private fun resourceStateSchema(
            timestampField: String,
            cpuCountField: String,
            cpuUsageField: String,
        ): MessageType =
            Types.buildMessage()
                .addFields(
                    Types
                        .required(PrimitiveType.PrimitiveTypeName.BINARY)
                        .`as`(LogicalTypeAnnotation.stringType())
                        .named("id"),
                    Types
                        .required(PrimitiveType.PrimitiveTypeName.INT64)
                        .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
                        .named(timestampField),
                    Types
                        .required(PrimitiveType.PrimitiveTypeName.INT64)
                        .named("duration"),
                    Types
                        .required(PrimitiveType.PrimitiveTypeName.INT32)
                        .named(cpuCountField),
                    Types
                        .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
                        .named(cpuUsageField),
                )
                .named("resource_state")

        /**
         * Parquet read schema (version 2.0) for the "resource states" table in the trace.
         */
        @JvmStatic
        val READ_SCHEMA_V2_0: MessageType = resourceStateSchema("time", "cores", "cpuUsage")

        /**
         * Parquet read schema (version 2.1) for the "resource states" table in the trace.
         */
        @JvmStatic
        val READ_SCHEMA_V2_1: MessageType = resourceStateSchema("timestamp", "cpu_count", "cpu_usage")

        /**
         * Parquet read schema for the "resource states" table in the trace: the union of the v2.0 and v2.1 schemas.
         */
        @JvmStatic
        val READ_SCHEMA: MessageType = READ_SCHEMA_V2_0.union(READ_SCHEMA_V2_1)
    }
}
/**
 * A [RecordMaterializer] for [ResourceState] records.
 *
 * @param schema The schema of the records to materialize; one converter is created per field, in field order.
 */
internal class ResourceStateRecordMaterializer(schema: MessageType) : RecordMaterializer<ResourceState>() {
    /**
     * State of current record being read. The values are reset to their defaults at the start of every record, so a
     * column that is absent from [schema] keeps its default in the materialized record.
     */
    private var localId: String = ""
    private var localTimestamp: Instant = Instant.MIN
    private var localDuration: Duration = Duration.ZERO
    private var localCpuCount: Int = 0
    private var localCpuUsage: Double = 0.0

    /**
     * Root converter for the record.
     */
    private val root =
        object : GroupConverter() {
            /**
             * The converters for the columns of the schema, indexed by field position. Both the v2.0 and the v2.1
             * field names are accepted.
             */
            private val converters: List<Converter> =
                schema.fields.map { type ->
                    when (type.name) {
                        "id" ->
                            object : PrimitiveConverter() {
                                override fun addBinary(value: Binary) {
                                    localId = value.toStringUsingUTF8()
                                }
                            }
                        "timestamp", "time" ->
                            object : PrimitiveConverter() {
                                override fun addLong(value: Long) {
                                    localTimestamp = Instant.ofEpochMilli(value)
                                }
                            }
                        "duration" ->
                            object : PrimitiveConverter() {
                                override fun addLong(value: Long) {
                                    localDuration = Duration.ofMillis(value)
                                }
                            }
                        "cpu_count", "cores" ->
                            object : PrimitiveConverter() {
                                override fun addInt(value: Int) {
                                    localCpuCount = value
                                }
                            }
                        "cpu_usage", "cpuUsage" ->
                            object : PrimitiveConverter() {
                                override fun addDouble(value: Double) {
                                    localCpuUsage = value
                                }
                            }
                        "flops" ->
                            object : PrimitiveConverter() {
                                override fun addLong(value: Long) {
                                    // Ignore to support v1 format
                                }
                            }
                        else -> error("Unknown column $type")
                    }
                }

            override fun start() {
                // Forget the values of the previous record.
                localId = ""
                localTimestamp = Instant.MIN
                localDuration = Duration.ZERO
                localCpuCount = 0
                localCpuUsage = 0.0
            }

            override fun end() {}

            override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex]
        }

    /**
     * Build a [ResourceState] from the values collected for the record that was read last.
     */
    override fun getCurrentRecord(): ResourceState =
        ResourceState(localId, localTimestamp, localDuration, localCpuCount, localCpuUsage)

    override fun getRootConverter(): GroupConverter = root
}
/**
 * Support for writing [ResourceState] instances to Parquet format.
 */
internal class ResourceStateWriteSupport : WriteSupport<ResourceState>() {
    /**
     * The current active record consumer. It is assigned in [prepareForWrite].
     */
    private lateinit var recordConsumer: RecordConsumer

    override fun init(configuration: Configuration): WriteContext = WriteContext(WRITE_SCHEMA, emptyMap())

    override fun prepareForWrite(recordConsumer: RecordConsumer) {
        this.recordConsumer = recordConsumer
    }

    /**
     * Write [record] as one Parquet message.
     *
     * The field names and indices used here must stay in sync with the field order of [WRITE_SCHEMA].
     */
    override fun write(record: ResourceState) {
        val consumer = recordConsumer

        consumer.startMessage()

        consumer.startField("id", 0)
        consumer.addBinary(Binary.fromCharSequence(record.id))
        consumer.endField("id", 0)

        // Timestamps and durations are stored with millisecond precision.
        consumer.startField("timestamp", 1)
        consumer.addLong(record.timestamp.toEpochMilli())
        consumer.endField("timestamp", 1)

        consumer.startField("duration", 2)
        consumer.addLong(record.duration.toMillis())
        consumer.endField("duration", 2)

        consumer.startField("cpu_count", 3)
        consumer.addInteger(record.cpuCount)
        consumer.endField("cpu_count", 3)

        consumer.startField("cpu_usage", 4)
        consumer.addDouble(record.cpuUsage)
        consumer.endField("cpu_usage", 4)

        consumer.endMessage()
    }

    companion object {
        /**
         * Parquet schema for the "resource states" table in the trace.
         */
        @JvmStatic
        val WRITE_SCHEMA: MessageType =
            Types.buildMessage()
                .addFields(
                    Types
                        .required(PrimitiveType.PrimitiveTypeName.BINARY)
                        .`as`(LogicalTypeAnnotation.stringType())
                        .named("id"),
                    Types
                        .required(PrimitiveType.PrimitiveTypeName.INT64)
                        .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
                        .named("timestamp"),
                    Types
                        .required(PrimitiveType.PrimitiveTypeName.INT64)
                        .named("duration"),
                    Types
                        .required(PrimitiveType.PrimitiveTypeName.INT32)
                        .named("cpu_count"),
                    Types
                        .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
                        .named("cpu_usage"),
                )
                .named("resource_state")
    }
}
/**
 * Support for writing [Resource] instances to Parquet format.
 */
internal class ResourceWriteSupport : WriteSupport<Resource>() {
    /**
     * The current active record consumer. It is assigned in [prepareForWrite].
     */
    private lateinit var recordConsumer: RecordConsumer

    override fun init(configuration: Configuration): WriteContext = WriteContext(WRITE_SCHEMA, emptyMap())

    override fun prepareForWrite(recordConsumer: RecordConsumer) {
        this.recordConsumer = recordConsumer
    }

    /**
     * Write [record] as one Parquet message.
     *
     * The field names and indices used here must stay in sync with the field order of [WRITE_SCHEMA].
     */
    override fun write(record: Resource) {
        val consumer = recordConsumer

        consumer.startMessage()

        consumer.startField("id", 0)
        consumer.addBinary(Binary.fromCharSequence(record.id))
        consumer.endField("id", 0)

        // Start and stop time are stored with millisecond precision.
        consumer.startField("start_time", 1)
        consumer.addLong(record.startTime.toEpochMilli())
        consumer.endField("start_time", 1)

        consumer.startField("stop_time", 2)
        consumer.addLong(record.stopTime.toEpochMilli())
        consumer.endField("stop_time", 2)

        consumer.startField("cpu_count", 3)
        consumer.addInteger(record.cpuCount)
        consumer.endField("cpu_count", 3)

        consumer.startField("cpu_capacity", 4)
        consumer.addDouble(record.cpuCapacity)
        consumer.endField("cpu_capacity", 4)

        // The schema stores the memory capacity as INT64, so the value is rounded to the nearest whole number.
        consumer.startField("mem_capacity", 5)
        consumer.addLong(record.memCapacity.roundToLong())
        consumer.endField("mem_capacity", 5)

        consumer.endMessage()
    }

    companion object {
        /**
         * Parquet schema for the "resources" table in the trace.
         */
        @JvmStatic
        val WRITE_SCHEMA: MessageType =
            Types.buildMessage()
                .addFields(
                    Types
                        .required(PrimitiveType.PrimitiveTypeName.BINARY)
                        .`as`(LogicalTypeAnnotation.stringType())
                        .named("id"),
                    Types
                        .required(PrimitiveType.PrimitiveTypeName.INT64)
                        .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
                        .named("start_time"),
                    Types
                        .required(PrimitiveType.PrimitiveTypeName.INT64)
                        .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
                        .named("stop_time"),
                    Types
                        .required(PrimitiveType.PrimitiveTypeName.INT32)
                        .named("cpu_count"),
                    Types
                        .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
                        .named("cpu_capacity"),
                    Types
                        .required(PrimitiveType.PrimitiveTypeName.INT64)
                        .named("mem_capacity"),
                )
                .named("resource")
    }
}
/**
 * A [TableReader] implementation for the SWF format.
 *
 * Every non-blank line that does not start with `;` is treated as one task row made of whitespace-separated fields.
 *
 * @param reader The reader to read the trace from. It is closed when [close] is called.
 */
internal class SwfTaskTableReader(private val reader: BufferedReader) : TableReader {
    /**
     * A flag to indicate the state of the reader
     */
    private var state = State.Pending

    /**
     * The fields of the current row.
     */
    private var fields = emptyList<String>()

    /**
     * The number of lines consumed from [reader] so far, used to report the position of malformed rows.
     */
    private var lineNumber = 0

    /**
     * A [Regex] object to match whitespace.
     */
    private val whitespace = "\\s+".toRegex()

    /**
     * Advance to the next task row, skipping blank lines and comment lines.
     *
     * @return `true` if a row is available, `false` if the end of the input was reached or the reader is closed.
     * @throws IllegalArgumentException if the row has fewer than 18 fields.
     */
    override fun nextRow(): Boolean {
        if (state == State.Closed) {
            return false
        }
        state = State.Active

        while (true) {
            val line = reader.readLine()
            if (line == null) {
                state = State.Closed
                return false
            }
            lineNumber++

            // Ignore empty lines and comments (lines starting with ';')
            if (line.isBlank() || line.startsWith(";")) {
                continue
            }

            fields = line.trim().split(whitespace)

            if (fields.size < 18) {
                throw IllegalArgumentException(
                    "Invalid format at line $lineNumber: expected at least 18 fields but found ${fields.size}",
                )
            }

            return true
        }
    }

    override fun resolve(name: String): Int {
        return when (name) {
            TASK_ID -> colJobID
            TASK_SUBMIT_TIME -> colSubmitTime
            TASK_WAIT_TIME -> colWaitTime
            TASK_RUNTIME -> colRunTime
            TASK_ALLOC_NCPUS -> colAllocNcpus
            TASK_REQ_NCPUS -> colReqNcpus
            TASK_STATUS -> colStatus
            TASK_USER_ID -> colUserID
            TASK_GROUP_ID -> colGroupID
            TASK_PARENTS -> colParentJob
            else -> -1
        }
    }

    override fun isNull(index: Int): Boolean {
        require(index in colJobID..colParentThinkTime) { "Invalid column index" }
        return false
    }

    override fun getBoolean(index: Int): Boolean {
        throw IllegalArgumentException("Invalid column")
    }

    override fun getInt(index: Int): Int {
        check(state == State.Active) { "No active row" }
        return when (index) {
            colReqNcpus, colAllocNcpus, colStatus, colGroupID, colUserID -> fields[index].toInt(10)
            else -> throw IllegalArgumentException("Invalid column")
        }
    }

    override fun getLong(index: Int): Long {
        throw IllegalArgumentException("Invalid column")
    }

    override fun getFloat(index: Int): Float {
        throw IllegalArgumentException("Invalid column")
    }

    override fun getDouble(index: Int): Double {
        throw IllegalArgumentException("Invalid column")
    }

    override fun getString(index: Int): String {
        check(state == State.Active) { "No active row" }
        return when (index) {
            colJobID -> fields[index]
            else -> throw IllegalArgumentException("Invalid column")
        }
    }

    override fun getUUID(index: Int): UUID? {
        throw IllegalArgumentException("Invalid column")
    }

    override fun getInstant(index: Int): Instant? {
        check(state == State.Active) { "No active row" }
        return when (index) {
            // The submit time is interpreted as seconds since the epoch.
            colSubmitTime -> Instant.ofEpochSecond(fields[index].toLong(10))
            else -> throw IllegalArgumentException("Invalid column")
        }
    }

    override fun getDuration(index: Int): Duration? {
        check(state == State.Active) { "No active row" }
        return when (index) {
            colWaitTime, colRunTime -> Duration.ofSeconds(fields[index].toLong(10))
            else -> throw IllegalArgumentException("Invalid column")
        }
    }

    override fun <T> getList(
        index: Int,
        elementType: Class<T>,
    ): List<T>? {
        throw IllegalArgumentException("Invalid column")
    }

    /**
     * Obtain the parents of the current task.
     *
     * The set holds the id of the parent job as a [String], in the same textual form in which task ids are returned
     * by [getString], or is empty when the parent field is negative.
     */
    override fun <T> getSet(
        index: Int,
        elementType: Class<T>,
    ): Set<T>? {
        check(state == State.Active) { "No active row" }
        if (index != colParentJob) {
            throw IllegalArgumentException("Invalid column")
        }
        require(elementType.isAssignableFrom(String::class.java))

        val parent = fields[index]
        // The field is still parsed as a number: a negative value marks "no parent" and malformed input is rejected.
        val parents: Set<String> = if (parent.toLong(10) < 0) emptySet() else setOf(parent)

        // Safe: the requirement above guarantees that a String can be used as a T.
        @Suppress("UNCHECKED_CAST")
        return parents as Set<T>
    }

    override fun <K, V> getMap(
        index: Int,
        keyType: Class<K>,
        valueType: Class<V>,
    ): Map<K, V>? {
        throw IllegalArgumentException("Invalid column")
    }

    override fun close() {
        reader.close()
        state = State.Closed
    }

    /**
     * Default column indices for the SWF format.
     */
    private val colJobID = 0
    private val colSubmitTime = 1
    private val colWaitTime = 2
    private val colRunTime = 3
    private val colAllocNcpus = 4
    private val colAvgCpuTime = 5
    private val colUsedMem = 6
    private val colReqNcpus = 7
    private val colReqTime = 8
    private val colReqMem = 9
    private val colStatus = 10
    private val colUserID = 11
    private val colGroupID = 12
    private val colExecNum = 13
    private val colQueueNum = 14
    private val colPartNum = 15
    private val colParentJob = 16
    private val colParentThinkTime = 17

    private enum class State {
        Pending,
        Active,
        Closed,
    }
}
/**
 * Support for the Standard Workload Format (SWF) in OpenDC.
 *
 * The standard is defined by the PWA, see here: https://www.cse.huji.ac.il/labs/parallel/workload/swf.html
 *
 * The format is read-only: it exposes a single tasks table and rejects every attempt to create or write a trace.
 */
public class SwfTraceFormat : TraceFormat {
    override val name: String = "swf"

    override fun create(path: Path) {
        throw UnsupportedOperationException("Writing not supported for this format")
    }

    override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS)

    /**
     * Describe the columns of [table].
     *
     * @throws IllegalArgumentException if [table] is not the tasks table.
     */
    override fun getDetails(
        path: Path,
        table: String,
    ): TableDetails {
        if (table != TABLE_TASKS) {
            throw IllegalArgumentException("Table $table not supported")
        }

        return TableDetails(
            listOf(
                TableColumn(TASK_ID, TableColumnType.String),
                TableColumn(TASK_SUBMIT_TIME, TableColumnType.Instant),
                TableColumn(TASK_WAIT_TIME, TableColumnType.Duration),
                TableColumn(TASK_RUNTIME, TableColumnType.Duration),
                TableColumn(TASK_REQ_NCPUS, TableColumnType.Int),
                TableColumn(TASK_ALLOC_NCPUS, TableColumnType.Int),
                TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)),
                TableColumn(TASK_STATUS, TableColumnType.Int),
                TableColumn(TASK_GROUP_ID, TableColumnType.Int),
                TableColumn(TASK_USER_ID, TableColumnType.Int),
            ),
        )
    }

    /**
     * Open a reader for [table] of the trace at [path]. The [projection] is not used by this format.
     *
     * @throws IllegalArgumentException if [table] is not the tasks table.
     */
    override fun newReader(
        path: Path,
        table: String,
        projection: List<String>?,
    ): TableReader {
        return when (table) {
            TABLE_TASKS -> SwfTaskTableReader(path.bufferedReader())
            else -> throw IllegalArgumentException("Table $table not supported")
        }
    }

    override fun newWriter(
        path: Path,
        table: String,
    ): TableWriter {
        throw UnsupportedOperationException("Writing not supported for this format")
    }
}
/**
 * A [TableReader] implementation for the WfCommons workload trace format.
 *
 * The reader walks the JSON document incrementally: it enters the top-level object, seeks its `workflow` member,
 * seeks the `jobs` array inside it and then exposes every object of that array as one row.
 *
 * @param parser The JSON parser to read the trace from. It is closed when the input is exhausted or on [close].
 */
internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableReader {
    /**
     * The current nesting of the parser.
     */
    private var level: ParserLevel = ParserLevel.TOP

    /**
     * Advance to the next job in the trace.
     *
     * @return `true` if a job was parsed, `false` if the document contains no further jobs.
     * @throws JsonParseException if the document does not have the expected structure.
     */
    override fun nextRow(): Boolean {
        reset()

        var hasJob = false

        while (!hasJob) {
            when (level) {
                ParserLevel.TOP -> {
                    val token = parser.nextToken()

                    // Check whether the document is not empty and starts with an object
                    if (token == null) {
                        parser.close()
                        break
                    } else if (token != JsonToken.START_OBJECT) {
                        throw JsonParseException(parser, "Expected object", parser.currentLocation)
                    } else {
                        level = ParserLevel.TRACE
                    }
                }
                ParserLevel.TRACE -> {
                    // Seek for the workflow object in the file
                    if (!seekWorkflow()) {
                        parser.close()
                        break
                    } else if (!parser.isExpectedStartObjectToken) {
                        throw JsonParseException(parser, "Expected object", parser.currentLocation)
                    } else {
                        level = ParserLevel.WORKFLOW
                    }
                }
                ParserLevel.WORKFLOW -> {
                    // Seek for the jobs object in the file; fall back to the trace level if there is none (left)
                    level =
                        if (!seekJobs()) {
                            ParserLevel.TRACE
                        } else if (!parser.isExpectedStartArrayToken) {
                            throw JsonParseException(parser, "Expected array", parser.currentLocation)
                        } else {
                            ParserLevel.JOB
                        }
                }
                ParserLevel.JOB -> {
                    when (parser.nextToken()) {
                        // End of the jobs array: continue scanning the remainder of the workflow object
                        JsonToken.END_ARRAY -> level = ParserLevel.WORKFLOW
                        JsonToken.START_OBJECT -> {
                            parseJob()
                            hasJob = true
                            break
                        }
                        else -> throw JsonParseException(parser, "Unexpected token", parser.currentLocation)
                    }
                }
            }
        }

        return hasJob
    }

    override fun resolve(name: String): Int {
        return when (name) {
            TASK_ID -> colID
            TASK_WORKFLOW_ID -> colWorkflowID
            TASK_RUNTIME -> colRuntime
            TASK_REQ_NCPUS -> colNproc
            TASK_PARENTS -> colParents
            TASK_CHILDREN -> colChildren
            else -> -1
        }
    }

    override fun isNull(index: Int): Boolean {
        // NOTE(review): the accepted range also contains index 2, which is not assigned to any column below.
        require(index in 0..colChildren) { "Invalid column value" }
        return false
    }

    override fun getBoolean(index: Int): Boolean {
        throw IllegalArgumentException("Invalid column")
    }

    override fun getInt(index: Int): Int {
        checkActive()
        return when (index) {
            colNproc -> cores
            else -> throw IllegalArgumentException("Invalid column")
        }
    }

    override fun getLong(index: Int): Long {
        throw IllegalArgumentException("Invalid column")
    }

    override fun getFloat(index: Int): Float {
        throw IllegalArgumentException("Invalid column")
    }

    override fun getDouble(index: Int): Double {
        throw IllegalArgumentException("Invalid column")
    }

    override fun getString(index: Int): String? {
        checkActive()
        return when (index) {
            colID -> id
            colWorkflowID -> workflowId
            else -> throw IllegalArgumentException("Invalid column")
        }
    }

    override fun getUUID(index: Int): UUID? {
        throw IllegalArgumentException("Invalid column")
    }

    override fun getInstant(index: Int): Instant? {
        throw IllegalArgumentException("Invalid column")
    }

    override fun getDuration(index: Int): Duration? {
        checkActive()
        return when (index) {
            colRuntime -> runtime
            else -> throw IllegalArgumentException("Invalid column")
        }
    }

    override fun <T> getList(
        index: Int,
        elementType: Class<T>,
    ): List<T>? {
        throw IllegalArgumentException("Invalid column")
    }

    override fun <T> getSet(
        index: Int,
        elementType: Class<T>,
    ): Set<T>? {
        checkActive()
        return when (index) {
            colParents -> typeParents.convertTo(parents, elementType)
            colChildren -> typeChildren.convertTo(children, elementType)
            else -> throw IllegalArgumentException("Invalid column")
        }
    }

    override fun <K, V> getMap(
        index: Int,
        keyType: Class<K>,
        valueType: Class<V>,
    ): Map<K, V>? {
        throw IllegalArgumentException("Invalid column")
    }

    override fun close() {
        parser.close()
    }

    /**
     * Helper method to check if the reader is active.
     */
    private fun checkActive() {
        check(level != ParserLevel.TOP && !parser.isClosed) { "No active row. Did you call nextRow()?" }
    }

    /**
     * Parse the trace and seek until the workflow description.
     *
     * The `name` member encountered on the way is remembered as the workflow id of the jobs that follow.
     *
     * @return `true` if the parser is positioned on the value of the `workflow` member, `false` otherwise.
     */
    private fun seekWorkflow(): Boolean {
        while (parser.nextValue() != JsonToken.END_OBJECT && !parser.isClosed) {
            when (parser.currentName) {
                "name" -> workflowId = parser.text
                "workflow" -> return true
                else -> parser.skipChildren()
            }
        }

        return false
    }

    /**
     * Parse the workflow description in the file and seek until the first job.
     *
     * @return `true` if the parser is positioned on the value of the `jobs` member, `false` otherwise.
     */
    private fun seekJobs(): Boolean {
        while (parser.nextValue() != JsonToken.END_OBJECT) {
            when (parser.currentName) {
                "jobs" -> return true
                else -> parser.skipChildren()
            }
        }

        return false
    }

    /**
     * Parse a single job in the file. Members other than the ones listed below are skipped.
     */
    private fun parseJob() {
        while (parser.nextValue() != JsonToken.END_OBJECT) {
            when (parser.currentName) {
                "name" -> id = parser.text
                "parents" -> parents = parseIds()
                "children" -> children = parseIds()
                // NOTE(review): toLong() drops any fractional part of the runtime; confirm that this is intended.
                "runtime" -> runtime = Duration.ofSeconds(parser.numberValue.toLong())
                "cores" -> cores = parser.floatValue.roundToInt()
                else -> parser.skipChildren()
            }
        }
    }

    /**
     * Parse the parents/children of a job.
     *
     * @throws JsonParseException if the current value is not an array of strings.
     */
    private fun parseIds(): Set<String> {
        if (!parser.isExpectedStartArrayToken) {
            throw JsonParseException(parser, "Expected array", parser.currentLocation)
        }

        val ids = mutableSetOf<String>()

        while (parser.nextToken() != JsonToken.END_ARRAY) {
            if (parser.currentToken != JsonToken.VALUE_STRING) {
                throw JsonParseException(parser, "Expected string", parser.currentLocation)
            }

            ids.add(parser.valueAsString)
        }

        return ids
    }

    private enum class ParserLevel {
        TOP,
        TRACE,
        WORKFLOW,
        JOB,
    }

    /**
     * State fields for the parser.
     */
    private var id: String? = null
    private var workflowId: String? = null
    private var runtime: Duration? = null
    private var parents: Set<String>? = null
    private var children: Set<String>? = null
    private var cores = -1

    /**
     * Clear the state of the previous job. The workflow id is kept, since it is shared by all jobs of a workflow.
     */
    private fun reset() {
        id = null
        runtime = null
        parents = null
        children = null
        cores = -1
    }

    private val colID = 0
    private val colWorkflowID = 1
    private val colRuntime = 3
    private val colNproc = 4
    private val colParents = 5
    private val colChildren = 6

    private val typeParents = TableColumnType.Set(TableColumnType.String)
    private val typeChildren = TableColumnType.Set(TableColumnType.String)
}
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */

package org.opendc.trace.wfformat

import com.fasterxml.jackson.core.JsonFactory
import org.opendc.trace.TableColumn
import org.opendc.trace.TableColumnType
import org.opendc.trace.TableReader
import org.opendc.trace.TableWriter
import org.opendc.trace.conv.TABLE_TASKS
import org.opendc.trace.conv.TASK_CHILDREN
import org.opendc.trace.conv.TASK_ID
import org.opendc.trace.conv.TASK_PARENTS
import org.opendc.trace.conv.TASK_REQ_NCPUS
import org.opendc.trace.conv.TASK_RUNTIME
import org.opendc.trace.conv.TASK_WORKFLOW_ID
import org.opendc.trace.spi.TableDetails
import org.opendc.trace.spi.TraceFormat
import java.nio.file.Path

/**
 * A [TraceFormat] implementation for the WfCommons workload trace format.
 *
 * The format is read-only: [create] and [newWriter] always throw [UnsupportedOperationException].
 */
public class WfFormatTraceFormat : TraceFormat {
    /**
     * The [JsonFactory] that is used to create JSON parsers.
     */
    private val factory = JsonFactory()

    override val name: String = "wfformat"

    override fun create(path: Path) {
        throw UnsupportedOperationException("Writing not supported for this format")
    }

    /**
     * The only table exposed by this format is [TABLE_TASKS].
     */
    override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS)

    /**
     * Describe the columns of [table].
     *
     * @throws IllegalArgumentException if [table] is not [TABLE_TASKS].
     */
    override fun getDetails(
        path: Path,
        table: String,
    ): TableDetails {
        return when (table) {
            TABLE_TASKS ->
                TableDetails(
                    listOf(
                        TableColumn(TASK_ID, TableColumnType.String),
                        TableColumn(TASK_WORKFLOW_ID, TableColumnType.String),
                        TableColumn(TASK_RUNTIME, TableColumnType.Duration),
                        TableColumn(TASK_REQ_NCPUS, TableColumnType.Int),
                        TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)),
                        TableColumn(TASK_CHILDREN, TableColumnType.Set(TableColumnType.String)),
                    ),
                )
            else -> throw IllegalArgumentException("Table $table not supported")
        }
    }

    /**
     * Open a reader for [table] backed by a JSON parser over the file at [path].
     *
     * The [projection] argument is accepted but not used by this implementation.
     *
     * @throws IllegalArgumentException if [table] is not [TABLE_TASKS].
     */
    override fun newReader(
        path: Path,
        table: String,
        projection: List<String>?,
    ): TableReader {
        return when (table) {
            TABLE_TASKS -> WfFormatTaskTableReader(factory.createParser(path.toFile()))
            else -> throw IllegalArgumentException("Table $table not supported")
        }
    }

    override fun newWriter(
        path: Path,
        table: String,
    ): TableWriter {
        throw UnsupportedOperationException("Writing not supported for this format")
    }
}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/WtfTaskTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/WtfTaskTableReader.kt new file mode 100644 index 00000000..95582388 --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/WtfTaskTableReader.kt @@ -0,0 +1,187 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + *
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */

package org.opendc.trace.wtf

import org.opendc.trace.TableColumnType
import org.opendc.trace.TableReader
import org.opendc.trace.conv.TASK_CHILDREN
import org.opendc.trace.conv.TASK_GROUP_ID
import org.opendc.trace.conv.TASK_ID
import org.opendc.trace.conv.TASK_PARENTS
import org.opendc.trace.conv.TASK_REQ_NCPUS
import org.opendc.trace.conv.TASK_RUNTIME
import org.opendc.trace.conv.TASK_SUBMIT_TIME
import org.opendc.trace.conv.TASK_USER_ID
import org.opendc.trace.conv.TASK_WAIT_TIME
import org.opendc.trace.conv.TASK_WORKFLOW_ID
import org.opendc.trace.util.convertTo
import org.opendc.trace.util.parquet.LocalParquetReader
import org.opendc.trace.wtf.parquet.Task
import java.time.Duration
import java.time.Instant
import java.util.UUID

/**
 * A [TableReader] implementation for the WTF format.
 */
internal class WtfTaskTableReader(private val reader: LocalParquetReader<Task>) : TableReader {
    /**
     * The current record.
     */
    private var record: Task? = null

    /**
     * Read the next record from the underlying reader.
     *
     * @return `true` if a record was read, `false` if the reader returned `null`.
     */
    override fun nextRow(): Boolean {
        try {
            val record = reader.read()
            this.record = record

            return record != null
        } catch (e: Throwable) {
            // Drop the current record so the getters fail instead of serving stale data, then propagate.
            this.record = null
            throw e
        }
    }

    private val colID = 0
    private val colWorkflowID = 1
    private val colSubmitTime = 2
    private val colWaitTime = 3
    private val colRuntime = 4
    private val colReqNcpus = 5
    private val colParents = 6
    private val colChildren = 7
    private val colGroupID = 8
    private val colUserID = 9

    private val typeParents = TableColumnType.Set(TableColumnType.String)
    private val typeChildren = TableColumnType.Set(TableColumnType.String)

    /**
     * Map a column [name] to its index in this reader, or `-1` if the column is not provided.
     */
    override fun resolve(name: String): Int {
        return when (name) {
            TASK_ID -> colID
            TASK_WORKFLOW_ID -> colWorkflowID
            TASK_SUBMIT_TIME -> colSubmitTime
            TASK_WAIT_TIME -> colWaitTime
            TASK_RUNTIME -> colRuntime
            TASK_REQ_NCPUS -> colReqNcpus
            TASK_PARENTS -> colParents
            TASK_CHILDREN -> colChildren
            TASK_GROUP_ID -> colGroupID
            TASK_USER_ID -> colUserID
            else -> -1
        }
    }

    /**
     * Always returns `false` for a valid column index.
     *
     * @throws IllegalArgumentException if [index] is outside `colID..colUserID`.
     */
    override fun isNull(index: Int): Boolean {
        require(index in colID..colUserID) { "Invalid column index" }
        return false
    }

    override fun getBoolean(index: Int): Boolean {
        throw IllegalArgumentException("Invalid column")
    }

    override fun getInt(index: Int): Int {
        val record = checkNotNull(record) { "Reader in invalid state" }

        return when (index) {
            colReqNcpus -> record.requestedCpus
            colGroupID -> record.groupId
            colUserID -> record.userId
            else -> throw IllegalArgumentException("Invalid column")
        }
    }

    override fun getLong(index: Int): Long {
        throw IllegalArgumentException("Invalid column")
    }

    override fun getFloat(index: Int): Float {
        throw IllegalArgumentException("Invalid column")
    }

    override fun getDouble(index: Int): Double {
        throw IllegalArgumentException("Invalid column")
    }

    override fun getString(index: Int): String {
        val record = checkNotNull(record) { "Reader in invalid state" }
        return when (index) {
            colID -> record.id
            colWorkflowID -> record.workflowId
            else -> throw IllegalArgumentException("Invalid column")
        }
    }

    override fun getUUID(index: Int): UUID? {
        throw IllegalArgumentException("Invalid column")
    }

    override fun getInstant(index: Int): Instant {
        val record = checkNotNull(record) { "Reader in invalid state" }
        return when (index) {
            colSubmitTime -> record.submitTime
            else -> throw IllegalArgumentException("Invalid column")
        }
    }

    override fun getDuration(index: Int): Duration {
        val record = checkNotNull(record) { "Reader in invalid state" }
        return when (index) {
            colWaitTime -> record.waitTime
            colRuntime -> record.runtime
            else -> throw IllegalArgumentException("Invalid column")
        }
    }

    override fun <T> getList(
        index: Int,
        elementType: Class<T>,
    ): List<T>? {
        throw IllegalArgumentException("Invalid column")
    }

    override fun <T> getSet(
        index: Int,
        elementType: Class<T>,
    ): Set<T>? {
        val record = checkNotNull(record) { "Reader in invalid state" }
        return when (index) {
            colParents -> typeParents.convertTo(record.parents, elementType)
            colChildren -> typeChildren.convertTo(record.children, elementType)
            else -> throw IllegalArgumentException("Invalid column")
        }
    }

    override fun <K, V> getMap(
        index: Int,
        keyType: Class<K>,
        valueType: Class<V>,
    ): Map<K, V>? {
        throw IllegalArgumentException("Invalid column")
    }

    override fun close() {
        reader.close()
    }
}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/WtfTraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/WtfTraceFormat.kt new file mode 100644 index 00000000..1386d2ef --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/WtfTraceFormat.kt @@ -0,0 +1,102 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE.
+ */

package org.opendc.trace.wtf

import org.opendc.trace.TableColumn
import org.opendc.trace.TableColumnType
import org.opendc.trace.TableReader
import org.opendc.trace.TableWriter
import org.opendc.trace.conv.TABLE_TASKS
import org.opendc.trace.conv.TASK_CHILDREN
import org.opendc.trace.conv.TASK_GROUP_ID
import org.opendc.trace.conv.TASK_ID
import org.opendc.trace.conv.TASK_PARENTS
import org.opendc.trace.conv.TASK_REQ_NCPUS
import org.opendc.trace.conv.TASK_RUNTIME
import org.opendc.trace.conv.TASK_SUBMIT_TIME
import org.opendc.trace.conv.TASK_USER_ID
import org.opendc.trace.conv.TASK_WAIT_TIME
import org.opendc.trace.conv.TASK_WORKFLOW_ID
import org.opendc.trace.spi.TableDetails
import org.opendc.trace.spi.TraceFormat
import org.opendc.trace.util.parquet.LocalParquetReader
import org.opendc.trace.wtf.parquet.TaskReadSupport
import java.nio.file.Path

/**
 * A [TraceFormat] implementation for the Workflow Trace Format (WTF).
 *
 * The format is read-only: [create] and [newWriter] always throw [UnsupportedOperationException].
 */
public class WtfTraceFormat : TraceFormat {
    override val name: String = "wtf"

    override fun create(path: Path) {
        throw UnsupportedOperationException("Writing not supported for this format")
    }

    /**
     * The only table exposed by this format is [TABLE_TASKS].
     */
    override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS)

    /**
     * Describe the columns of [table].
     *
     * @throws IllegalArgumentException if [table] is not [TABLE_TASKS].
     */
    override fun getDetails(
        path: Path,
        table: String,
    ): TableDetails {
        return when (table) {
            TABLE_TASKS ->
                TableDetails(
                    listOf(
                        TableColumn(TASK_ID, TableColumnType.String),
                        TableColumn(TASK_WORKFLOW_ID, TableColumnType.String),
                        TableColumn(TASK_SUBMIT_TIME, TableColumnType.Instant),
                        TableColumn(TASK_WAIT_TIME, TableColumnType.Duration),
                        TableColumn(TASK_RUNTIME, TableColumnType.Duration),
                        TableColumn(TASK_REQ_NCPUS, TableColumnType.Int),
                        TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)),
                        TableColumn(TASK_CHILDREN, TableColumnType.Set(TableColumnType.String)),
                        TableColumn(TASK_GROUP_ID, TableColumnType.Int),
                        TableColumn(TASK_USER_ID, TableColumnType.Int),
                    ),
                )
            else -> throw IllegalArgumentException("Table $table not supported")
        }
    }

    /**
     * Open a reader for [table] over the Parquet data found under `tasks/schema-1.0` relative to [path].
     *
     * @param projection The columns to read, forwarded to [TaskReadSupport]; `null` is passed through unchanged.
     * @throws IllegalArgumentException if [table] is not [TABLE_TASKS].
     */
    override fun newReader(
        path: Path,
        table: String,
        projection: List<String>?,
    ): TableReader {
        return when (table) {
            TABLE_TASKS -> {
                val reader = LocalParquetReader(path.resolve("tasks/schema-1.0"), TaskReadSupport(projection), strictTyping = false)
                WtfTaskTableReader(reader)
            }
            else -> throw IllegalArgumentException("Table $table not supported")
        }
    }

    override fun newWriter(
        path: Path,
        table: String,
    ): TableWriter {
        throw UnsupportedOperationException("Writing not supported for this format")
    }
}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/Task.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/Task.kt new file mode 100644 index 00000000..a1db0cab --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/Task.kt @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */

package org.opendc.trace.wtf.parquet

import java.time.Duration
import java.time.Instant

/**
 * A task in the Workflow Trace Format.
 *
 * @property id The identifier of the task.
 * @property workflowId The identifier of the workflow the task belongs to.
 * @property submitTime The submission time of the task.
 * @property waitTime The wait time of the task.
 * @property runtime The runtime of the task.
 * @property requestedCpus The number of CPUs requested by the task.
 * @property groupId The identifier of the group associated with the task.
 * @property userId The identifier of the user associated with the task.
 * @property parents The identifiers of the parent tasks.
 * @property children The identifiers of the child tasks.
 */
internal data class Task(
    val id: String,
    val workflowId: String,
    val submitTime: Instant,
    val waitTime: Duration,
    val runtime: Duration,
    val requestedCpus: Int,
    val groupId: Int,
    val userId: Int,
    val parents: Set<String>,
    val children: Set<String>,
)
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/TaskReadSupport.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/TaskReadSupport.kt new file mode 100644 index 00000000..1f9c506d --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/TaskReadSupport.kt @@ -0,0 +1,148 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */

package org.opendc.trace.wtf.parquet

import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.api.InitContext
import org.apache.parquet.hadoop.api.ReadSupport
import org.apache.parquet.io.api.RecordMaterializer
import org.apache.parquet.schema.LogicalTypeAnnotation
import org.apache.parquet.schema.MessageType
import org.apache.parquet.schema.PrimitiveType
import org.apache.parquet.schema.Type
import org.apache.parquet.schema.Types
import org.opendc.trace.conv.TASK_CHILDREN
import org.opendc.trace.conv.TASK_GROUP_ID
import org.opendc.trace.conv.TASK_ID
import org.opendc.trace.conv.TASK_PARENTS
import org.opendc.trace.conv.TASK_REQ_NCPUS
import org.opendc.trace.conv.TASK_RUNTIME
import org.opendc.trace.conv.TASK_SUBMIT_TIME
import org.opendc.trace.conv.TASK_USER_ID
import org.opendc.trace.conv.TASK_WAIT_TIME
import org.opendc.trace.conv.TASK_WORKFLOW_ID

/**
 * A [ReadSupport] instance for [Task] objects.
 *
 * @param projection The projection of the table to read.
 */
internal class TaskReadSupport(private val projection: List<String>?) : ReadSupport<Task>() {
    /**
     * Mapping of table columns to their Parquet column names.
     */
    private val colMap =
        mapOf(
            TASK_ID to "id",
            TASK_WORKFLOW_ID to "workflow_id",
            TASK_SUBMIT_TIME to "ts_submit",
            TASK_WAIT_TIME to "wait_time",
            TASK_RUNTIME to "runtime",
            TASK_REQ_NCPUS to "resource_amount_requested",
            TASK_PARENTS to "parents",
            TASK_CHILDREN to "children",
            TASK_GROUP_ID to "group_id",
            TASK_USER_ID to "user_id",
        )

    /**
     * Build the schema to request from the file.
     *
     * Without a projection the full [READ_SCHEMA] is requested. With a projection, only the fields of
     * [READ_SCHEMA] that correspond to the projected columns are requested, in projection order;
     * columns that have no entry in the column mapping are skipped.
     */
    override fun init(context: InitContext): ReadContext {
        val projectedSchema =
            if (projection != null) {
                Types.buildMessage()
                    .apply {
                        val fieldByName = READ_SCHEMA.fields.associateBy { it.name }

                        for (col in projection) {
                            val fieldName = colMap[col] ?: continue
                            addField(fieldByName.getValue(fieldName))
                        }
                    }
                    .named(READ_SCHEMA.name)
            } else {
                READ_SCHEMA
            }
        return ReadContext(projectedSchema)
    }

    /**
     * Create the materializer for the schema that was requested in [init].
     */
    override fun prepareForRead(
        configuration: Configuration,
        keyValueMetaData: Map<String, String>,
        fileSchema: MessageType,
        readContext: ReadContext,
    ): RecordMaterializer<Task> = TaskRecordMaterializer(readContext.requestedSchema)

    companion object {
        /**
         * Parquet read schema for the "tasks" table in the trace.
         */
        @JvmStatic
        val READ_SCHEMA: MessageType =
            Types.buildMessage()
                .addFields(
                    Types
                        .optional(PrimitiveType.PrimitiveTypeName.INT64)
                        .named("id"),
                    Types
                        .optional(PrimitiveType.PrimitiveTypeName.INT64)
                        .named("workflow_id"),
                    Types
                        .optional(PrimitiveType.PrimitiveTypeName.INT64)
                        .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
                        .named("ts_submit"),
                    Types
                        .optional(PrimitiveType.PrimitiveTypeName.INT64)
                        .named("wait_time"),
                    Types
                        .optional(PrimitiveType.PrimitiveTypeName.INT64)
                        .named("runtime"),
                    Types
                        .optional(PrimitiveType.PrimitiveTypeName.DOUBLE)
                        .named("resource_amount_requested"),
                    Types
                        .optional(PrimitiveType.PrimitiveTypeName.INT32)
                        .named("user_id"),
                    Types
                        .optional(PrimitiveType.PrimitiveTypeName.INT32)
                        .named("group_id"),
                    Types
                        .buildGroup(Type.Repetition.OPTIONAL)
                        .addField(
                            Types.repeatedGroup()
                                .addField(Types.optional(PrimitiveType.PrimitiveTypeName.INT64).named("item"))
                                .named("list"),
                        )
                        .`as`(LogicalTypeAnnotation.listType())
                        .named("children"),
                    Types
                        .buildGroup(Type.Repetition.OPTIONAL)
                        .addField(
                            Types.repeatedGroup()
                                .addField(Types.optional(PrimitiveType.PrimitiveTypeName.INT64).named("item"))
                                .named("list"),
                        )
                        .`as`(LogicalTypeAnnotation.listType())
                        .named("parents"),
                )
                .named("task")
    }
}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/TaskRecordMaterializer.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/TaskRecordMaterializer.kt new file mode 100644 index 00000000..412a4f8b --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/formats/wtf/parquet/TaskRecordMaterializer.kt @@ -0,0 +1,188 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */

package org.opendc.trace.wtf.parquet

import org.apache.parquet.io.api.Converter
import org.apache.parquet.io.api.GroupConverter
import org.apache.parquet.io.api.PrimitiveConverter
import org.apache.parquet.io.api.RecordMaterializer
import org.apache.parquet.schema.MessageType
import java.time.Duration
import java.time.Instant
import kotlin.math.roundToInt
import kotlin.math.roundToLong

/**
 * A [RecordMaterializer] for [Task] records.
 */
internal class TaskRecordMaterializer(schema: MessageType) : RecordMaterializer<Task>() {
    /**
     * State of current record being read.
     */
    private var localID = ""
    private var localWorkflowID = ""
    private var localSubmitTime = Instant.MIN
    private var localWaitTime = Duration.ZERO
    private var localRuntime = Duration.ZERO
    private var localRequestedCpus = 0
    private var localGroupId = 0
    private var localUserId = 0

    // The relation sets are captured by the converters below, so they are cleared in place and never reassigned.
    private val localParents = mutableSetOf<String>()
    private val localChildren = mutableSetOf<String>()

    /**
     * Root converter for the record.
     */
    private val root =
        object : GroupConverter() {
            /**
             * The converters for the columns of the schema.
             */
            private val converters =
                schema.fields.map { type ->
                    when (type.name) {
                        "id" ->
                            object : PrimitiveConverter() {
                                override fun addLong(value: Long) {
                                    localID = value.toString()
                                }
                            }
                        "workflow_id" ->
                            object : PrimitiveConverter() {
                                override fun addLong(value: Long) {
                                    localWorkflowID = value.toString()
                                }
                            }
                        "ts_submit" ->
                            object : PrimitiveConverter() {
                                override fun addLong(value: Long) {
                                    localSubmitTime = Instant.ofEpochMilli(value)
                                }
                            }
                        "wait_time" ->
                            object : PrimitiveConverter() {
                                override fun addLong(value: Long) {
                                    localWaitTime = Duration.ofMillis(value)
                                }
                            }
                        "runtime" ->
                            object : PrimitiveConverter() {
                                override fun addLong(value: Long) {
                                    localRuntime = Duration.ofMillis(value)
                                }
                            }
                        "resource_amount_requested" ->
                            object : PrimitiveConverter() {
                                override fun addDouble(value: Double) {
                                    localRequestedCpus = value.roundToInt()
                                }
                            }
                        "group_id" ->
                            object : PrimitiveConverter() {
                                override fun addInt(value: Int) {
                                    localGroupId = value
                                }
                            }
                        "user_id" ->
                            object : PrimitiveConverter() {
                                override fun addInt(value: Int) {
                                    localUserId = value
                                }
                            }
                        "children" -> RelationConverter(localChildren)
                        "parents" -> RelationConverter(localParents)
                        else -> error("Unknown column $type")
                    }
                }

            /**
             * Reset the record state to its defaults before the fields of a new record arrive.
             */
            override fun start() {
                localID = ""
                localWorkflowID = ""
                localSubmitTime = Instant.MIN
                localWaitTime = Duration.ZERO
                localRuntime = Duration.ZERO
                localRequestedCpus = 0
                localGroupId = 0
                localUserId = 0
                localParents.clear()
                localChildren.clear()
            }

            override fun end() {}

            override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex]
        }

    /**
     * Build a [Task] from the current record state; the relation sets are copied via `toSet()`.
     */
    override fun getCurrentRecord(): Task =
        Task(
            localID,
            localWorkflowID,
            localSubmitTime,
            localWaitTime,
            localRuntime,
            localRequestedCpus,
            localGroupId,
            localUserId,
            localParents.toSet(),
            localChildren.toSet(),
        )

    override fun getRootConverter(): GroupConverter = root

    /**
     * Helper class to convert parent and child relations and add them to [relations].
     */
    private class RelationConverter(private val relations: MutableSet<String>) : GroupConverter() {
        private val entryConverter =
            object : PrimitiveConverter() {
                override fun addLong(value: Long) {
                    relations.add(value.toString())
                }

                override fun addDouble(value: Double) {
                    relations.add(value.roundToLong().toString())
                }
            }

        private val listConverter =
            object : GroupConverter() {
                override fun getConverter(fieldIndex: Int): Converter {
                    require(fieldIndex == 0)
                    return entryConverter
                }

                override fun start() {}

                override fun end() {}
            }

        override fun getConverter(fieldIndex: Int): Converter {
            require(fieldIndex == 0)
            return listConverter
        }

        override fun start() {}

        override fun end() {}
    }
}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt index 83537822..89cac608 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt @@ -24,6 +24,13 @@ package org.opendc.trace.spi import org.opendc.trace.TableReader import org.opendc.trace.TableWriter +import org.opendc.trace.azure.AzureTraceFormat +import org.opendc.trace.bitbrains.BitbrainsTraceFormat +import org.opendc.trace.formats.opendc.OdcVmTraceFormat +import org.opendc.trace.gwf.GwfTraceFormat +import org.opendc.trace.swf.SwfTraceFormat +import org.opendc.trace.wfformat.WfFormatTraceFormat +import org.opendc.trace.wtf.WtfTraceFormat import java.nio.file.Path import java.util.ServiceLoader @@ -107,13 +114,31 @@ public interface TraceFormat { return ServiceLoader.load(TraceFormat::class.java) } +// /** +// * Obtain a [TraceFormat]
implementation by [name]. +// */ +// @JvmStatic +// public fun byName(name: String): TraceFormat? { +// +// val loader = ServiceLoader.load(TraceFormat::class.java) +// return loader.find { it.name == name } +// } +
        /**
         * Obtain a [TraceFormat] implementation by [name].
         *
         * The formats listed below are instantiated directly. Any other name is looked up among the
         * providers registered with [ServiceLoader], so formats that are not listed here remain resolvable.
         *
         * @param name The name of the trace format.
         * @return A [TraceFormat] with the given name, or `null` if none is known.
         */
        @JvmStatic
        public fun byName(name: String): TraceFormat? {
            return when (name) {
                "opendc-vm" -> OdcVmTraceFormat()
                "azure" -> AzureTraceFormat()
                "bitbrains" -> BitbrainsTraceFormat()
                "gwf" -> GwfTraceFormat()
                "swf" -> SwfTraceFormat()
                "wfformat" -> WfFormatTraceFormat()
                "wtf" -> WtfTraceFormat()
                // Fall back to service discovery instead of rejecting every name that is not hard-coded above
                else -> ServiceLoader.load(TraceFormat::class.java).find { it.name == name }
            }
        }
 } } -- cgit v1.2.3