| field | value | date |
|---|---|---|
| author | Dante Niewenhuis <d.niewenhuis@hotmail.com> | 2024-04-16 09:29:53 +0200 |
| committer | GitHub <noreply@github.com> | 2024-04-16 09:29:53 +0200 |
| commit | fff89d25bd3c7b874e68261d21695c473c30ed7d (patch) | |
| tree | be368dd745e8119dbdefd9cd0b012c7ff9080a7a /opendc-trace/opendc-trace-opendc/src/main | |
| parent | a7b0afbb5b7059274962ade234a50240677008fd (diff) | |
Revamped the trace system. All TraceFormat files are now in the api module (#216)
* Revamped the trace system. All TraceFormat files are now in the api module. This fixes some problems where certain trace types could not be used.
* Applied spotless formatting.
Diffstat (limited to 'opendc-trace/opendc-trace-opendc/src/main')
16 files changed, 0 insertions, 2201 deletions
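
For reference, here is a minimal sketch of how the pre-revamp API deleted in this commit was typically driven. The classes, table names, and column constants come from the removed files shown in the diff below; the trace directory path is hypothetical, and after this commit the equivalent entry points live in the api module instead.

```kotlin
// Usage sketch of the deleted OpenDC VM trace format (pre-revamp API).
// The trace directory path is hypothetical.
import org.opendc.trace.conv.TABLE_RESOURCES
import org.opendc.trace.conv.resourceID
import org.opendc.trace.opendc.OdcVmTraceFormat
import java.nio.file.Path

fun main() {
    val format = OdcVmTraceFormat()
    val tracePath = Path.of("traces/example") // hypothetical trace directory

    // Open the "resources" table (meta.parquet), projecting only the resource id column.
    val reader = format.newReader(tracePath, TABLE_RESOURCES, projection = listOf(resourceID))

    // Resolve the column index once, then iterate over the rows.
    val idCol = reader.resolve(resourceID)
    while (reader.nextRow()) {
        println(reader.getString(idCol))
    }
    reader.close()
}
```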
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt deleted file mode 100644 index 7bf48f1a..00000000 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt +++ /dev/null @@ -1,225 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.opendc - -import com.fasterxml.jackson.core.JsonParseException -import com.fasterxml.jackson.core.JsonParser -import com.fasterxml.jackson.core.JsonToken -import org.opendc.trace.TableColumnType -import org.opendc.trace.TableReader -import org.opendc.trace.conv.INTERFERENCE_GROUP_MEMBERS -import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE -import org.opendc.trace.conv.INTERFERENCE_GROUP_TARGET -import org.opendc.trace.util.convertTo -import java.time.Duration -import java.time.Instant -import java.util.UUID - -/** - * A [TableReader] implementation for the OpenDC VM interference JSON format. - */ -internal class OdcVmInterferenceJsonTableReader(private val parser: JsonParser) : TableReader { - /** - * A flag to indicate whether a single row has been read already. 
- */ - private var isStarted = false - - override fun nextRow(): Boolean { - if (!isStarted) { - isStarted = true - - parser.nextToken() - - if (!parser.isExpectedStartArrayToken) { - throw JsonParseException(parser, "Expected array at start, but got ${parser.currentToken()}") - } - } - - return if (parser.isClosed || parser.nextToken() == JsonToken.END_ARRAY) { - parser.close() - reset() - false - } else { - parseGroup(parser) - true - } - } - - private val colMembers = 0 - private val colTarget = 1 - private val colScore = 2 - - private val typeMembers = TableColumnType.Set(TableColumnType.String) - - override fun resolve(name: String): Int { - return when (name) { - INTERFERENCE_GROUP_MEMBERS -> colMembers - INTERFERENCE_GROUP_TARGET -> colTarget - INTERFERENCE_GROUP_SCORE -> colScore - else -> -1 - } - } - - override fun isNull(index: Int): Boolean { - return when (index) { - colMembers, colTarget, colScore -> false - else -> throw IllegalArgumentException("Invalid column index $index") - } - } - - override fun getBoolean(index: Int): Boolean { - throw IllegalArgumentException("Invalid column $index") - } - - override fun getInt(index: Int): Int { - throw IllegalArgumentException("Invalid column $index") - } - - override fun getLong(index: Int): Long { - throw IllegalArgumentException("Invalid column $index") - } - - override fun getFloat(index: Int): Float { - throw IllegalArgumentException("Invalid column $index") - } - - override fun getDouble(index: Int): Double { - checkActive() - return when (index) { - colTarget -> targetLoad - colScore -> score - else -> throw IllegalArgumentException("Invalid column $index") - } - } - - override fun getString(index: Int): String? { - throw IllegalArgumentException("Invalid column $index") - } - - override fun getUUID(index: Int): UUID? { - throw IllegalArgumentException("Invalid column $index") - } - - override fun getInstant(index: Int): Instant? { - throw IllegalArgumentException("Invalid column $index") - } - - override fun getDuration(index: Int): Duration? { - throw IllegalArgumentException("Invalid column $index") - } - - override fun <T> getList( - index: Int, - elementType: Class<T>, - ): List<T>? { - throw IllegalArgumentException("Invalid column $index") - } - - override fun <T> getSet( - index: Int, - elementType: Class<T>, - ): Set<T>? { - checkActive() - return when (index) { - colMembers -> typeMembers.convertTo(members, elementType) - else -> throw IllegalArgumentException("Invalid column $index") - } - } - - override fun <K, V> getMap( - index: Int, - keyType: Class<K>, - valueType: Class<V>, - ): Map<K, V>? { - throw IllegalArgumentException("Invalid column $index") - } - - override fun close() { - parser.close() - } - - private var members = emptySet<String>() - private var targetLoad = Double.POSITIVE_INFINITY - private var score = 1.0 - - /** - * Helper method to check if the reader is active. - */ - private fun checkActive() { - check(isStarted && !parser.isClosed) { "No active row. Did you call nextRow()?" } - } - - /** - * Reset the state. - */ - private fun reset() { - members = emptySet() - targetLoad = Double.POSITIVE_INFINITY - score = 1.0 - } - - /** - * Parse a group an interference JSON file. 
- */ - private fun parseGroup(parser: JsonParser) { - var targetLoad = Double.POSITIVE_INFINITY - var score = 1.0 - val members = mutableSetOf<String>() - - if (!parser.isExpectedStartObjectToken) { - throw JsonParseException(parser, "Expected object, but got ${parser.currentToken()}") - } - - while (parser.nextValue() != JsonToken.END_OBJECT) { - when (parser.currentName) { - "vms" -> parseGroupMembers(parser, members) - "minServerLoad" -> targetLoad = parser.doubleValue - "performanceScore" -> score = parser.doubleValue - } - } - - this.members = members - this.targetLoad = targetLoad - this.score = score - } - - /** - * Parse the members of a group. - */ - private fun parseGroupMembers( - parser: JsonParser, - members: MutableSet<String>, - ) { - if (!parser.isExpectedStartArrayToken) { - throw JsonParseException(parser, "Expected array for group members, but got ${parser.currentToken()}") - } - - while (parser.nextValue() != JsonToken.END_ARRAY) { - if (parser.currentToken() != JsonToken.VALUE_STRING) { - throw JsonParseException(parser, "Expected string value for group member") - } - - members.add(parser.text) - } - } -} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt deleted file mode 100644 index 93f5a976..00000000 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt +++ /dev/null @@ -1,192 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.opendc - -import com.fasterxml.jackson.core.JsonGenerator -import org.opendc.trace.TableWriter -import org.opendc.trace.conv.INTERFERENCE_GROUP_MEMBERS -import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE -import org.opendc.trace.conv.INTERFERENCE_GROUP_TARGET -import java.time.Duration -import java.time.Instant -import java.util.UUID - -/** - * A [TableWriter] implementation for the OpenDC VM interference JSON format. - */ -internal class OdcVmInterferenceJsonTableWriter(private val generator: JsonGenerator) : TableWriter { - /** - * A flag to indicate whether a row has been started. 
- */ - private var isRowActive = false - - init { - generator.writeStartArray() - } - - override fun startRow() { - // Reset state - members = emptySet() - targetLoad = Double.POSITIVE_INFINITY - score = 1.0 - - // Mark row as active - isRowActive = true - } - - override fun endRow() { - check(isRowActive) { "No active row" } - - generator.writeStartObject() - generator.writeArrayFieldStart("vms") - for (member in members) { - generator.writeString(member) - } - generator.writeEndArray() - generator.writeNumberField("minServerLoad", targetLoad) - generator.writeNumberField("performanceScore", score) - generator.writeEndObject() - } - - override fun resolve(name: String): Int { - return when (name) { - INTERFERENCE_GROUP_MEMBERS -> colMembers - INTERFERENCE_GROUP_TARGET -> colTarget - INTERFERENCE_GROUP_SCORE -> colScore - else -> -1 - } - } - - override fun setBoolean( - index: Int, - value: Boolean, - ) { - throw IllegalArgumentException("Invalid column $index") - } - - override fun setInt( - index: Int, - value: Int, - ) { - throw IllegalArgumentException("Invalid column $index") - } - - override fun setLong( - index: Int, - value: Long, - ) { - throw IllegalArgumentException("Invalid column $index") - } - - override fun setFloat( - index: Int, - value: Float, - ) { - throw IllegalArgumentException("Invalid column $index") - } - - override fun setDouble( - index: Int, - value: Double, - ) { - check(isRowActive) { "No active row" } - - when (index) { - colTarget -> targetLoad = (value as Number).toDouble() - colScore -> score = (value as Number).toDouble() - else -> throw IllegalArgumentException("Invalid column $index") - } - } - - override fun setString( - index: Int, - value: String, - ) { - throw IllegalArgumentException("Invalid column $index") - } - - override fun setUUID( - index: Int, - value: UUID, - ) { - throw IllegalArgumentException("Invalid column $index") - } - - override fun setInstant( - index: Int, - value: Instant, - ) { - throw IllegalArgumentException("Invalid column $index") - } - - override fun setDuration( - index: Int, - value: Duration, - ) { - throw IllegalArgumentException("Invalid column $index") - } - - override fun <T> setList( - index: Int, - value: List<T>, - ) { - throw IllegalArgumentException("Invalid column $index") - } - - override fun <T> setSet( - index: Int, - value: Set<T>, - ) { - check(isRowActive) { "No active row" } - - @Suppress("UNCHECKED_CAST") - when (index) { - colMembers -> members = value as Set<String> - else -> throw IllegalArgumentException("Invalid column index $index") - } - } - - override fun <K, V> setMap( - index: Int, - value: Map<K, V>, - ) { - throw IllegalArgumentException("Invalid column $index") - } - - override fun flush() { - generator.flush() - } - - override fun close() { - generator.writeEndArray() - generator.close() - } - - private val colMembers = 0 - private val colTarget = 1 - private val colScore = 2 - - private var members = emptySet<String>() - private var targetLoad = Double.POSITIVE_INFINITY - private var score = 1.0 -} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt deleted file mode 100644 index 8e54f2b0..00000000 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby 
granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.opendc - -import org.opendc.trace.TableReader -import org.opendc.trace.conv.resourceCpuCount -import org.opendc.trace.conv.resourceID -import org.opendc.trace.conv.resourceStateCpuUsage -import org.opendc.trace.conv.resourceStateDuration -import org.opendc.trace.conv.resourceStateTimestamp -import org.opendc.trace.opendc.parquet.ResourceState -import org.opendc.trace.util.parquet.LocalParquetReader -import java.time.Duration -import java.time.Instant -import java.util.UUID - -/** - * A [TableReader] implementation for the OpenDC virtual machine trace format. - */ -internal class OdcVmResourceStateTableReader(private val reader: LocalParquetReader<ResourceState>) : TableReader { - /** - * The current record. - */ - private var record: ResourceState? 
= null - - override fun nextRow(): Boolean { - try { - val record = reader.read() - this.record = record - - return record != null - } catch (e: Throwable) { - this.record = null - throw e - } - } - - private val colID = 0 - private val colTimestamp = 1 - private val colDuration = 2 - private val colCpuCount = 3 - private val colCpuUsage = 4 - - override fun resolve(name: String): Int { - return when (name) { - resourceID -> colID - resourceStateTimestamp -> colTimestamp - resourceStateDuration -> colDuration - resourceCpuCount -> colCpuCount - resourceStateCpuUsage -> colCpuUsage - else -> -1 - } - } - - override fun isNull(index: Int): Boolean { - require(index in 0..colCpuUsage) { "Invalid column index" } - return false - } - - override fun getBoolean(index: Int): Boolean { - throw IllegalArgumentException("Invalid column or type [index $index]") - } - - override fun getInt(index: Int): Int { - val record = checkNotNull(record) { "Reader in invalid state" } - return when (index) { - colCpuCount -> record.cpuCount - else -> throw IllegalArgumentException("Invalid column or type [index $index]") - } - } - - override fun getLong(index: Int): Long { - throw IllegalArgumentException("Invalid column or type [index $index]") - } - - override fun getFloat(index: Int): Float { - throw IllegalArgumentException("Invalid column or type [index $index]") - } - - override fun getDouble(index: Int): Double { - val record = checkNotNull(record) { "Reader in invalid state" } - return when (index) { - colCpuUsage -> record.cpuUsage - else -> throw IllegalArgumentException("Invalid column or type [index $index]") - } - } - - override fun getString(index: Int): String { - val record = checkNotNull(record) { "Reader in invalid state" } - - return when (index) { - colID -> record.id - else -> throw IllegalArgumentException("Invalid column index $index") - } - } - - override fun getUUID(index: Int): UUID? { - throw IllegalArgumentException("Invalid column or type [index $index]") - } - - override fun getInstant(index: Int): Instant { - val record = checkNotNull(record) { "Reader in invalid state" } - - return when (index) { - colTimestamp -> record.timestamp - else -> throw IllegalArgumentException("Invalid column index $index") - } - } - - override fun getDuration(index: Int): Duration { - val record = checkNotNull(record) { "Reader in invalid state" } - - return when (index) { - colDuration -> record.duration - else -> throw IllegalArgumentException("Invalid column index $index") - } - } - - override fun <T> getList( - index: Int, - elementType: Class<T>, - ): List<T>? { - throw IllegalArgumentException("Invalid column or type [index $index]") - } - - override fun <T> getSet( - index: Int, - elementType: Class<T>, - ): Set<T>? { - throw IllegalArgumentException("Invalid column or type [index $index]") - } - - override fun <K, V> getMap( - index: Int, - keyType: Class<K>, - valueType: Class<V>, - ): Map<K, V>? 
{ - throw IllegalArgumentException("Invalid column or type [index $index]") - } - - override fun close() { - reader.close() - } - - override fun toString(): String = "OdcVmResourceStateTableReader" -} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt deleted file mode 100644 index 01cd13c8..00000000 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt +++ /dev/null @@ -1,209 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.opendc - -import org.apache.parquet.hadoop.ParquetWriter -import org.opendc.trace.TableWriter -import org.opendc.trace.conv.resourceCpuCount -import org.opendc.trace.conv.resourceID -import org.opendc.trace.conv.resourceStateCpuUsage -import org.opendc.trace.conv.resourceStateDuration -import org.opendc.trace.conv.resourceStateTimestamp -import org.opendc.trace.opendc.parquet.ResourceState -import java.time.Duration -import java.time.Instant -import java.util.UUID - -/** - * A [TableWriter] implementation for the OpenDC virtual machine trace format. - */ -internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter<ResourceState>) : TableWriter { - /** - * The current state for the record that is being written. 
- */ - private var localIsActive = false - private var localID: String = "" - private var localTimestamp: Instant = Instant.MIN - private var localDuration: Duration = Duration.ZERO - private var localCpuCount: Int = 0 - private var localCpuUsage: Double = Double.NaN - - override fun startRow() { - localIsActive = true - localID = "" - localTimestamp = Instant.MIN - localDuration = Duration.ZERO - localCpuCount = 0 - localCpuUsage = Double.NaN - } - - override fun endRow() { - check(localIsActive) { "No active row" } - localIsActive = false - - check(lastId != localID || localTimestamp >= lastTimestamp) { "Records need to be ordered by (id, timestamp)" } - - writer.write(ResourceState(localID, localTimestamp, localDuration, localCpuCount, localCpuUsage)) - - lastId = localID - lastTimestamp = localTimestamp - } - - override fun resolve(name: String): Int { - return when (name) { - resourceID -> colID - resourceStateTimestamp -> colTimestamp - resourceStateDuration -> colDuration - resourceCpuCount -> colCpuCount - resourceStateCpuUsage -> colCpuUsage - else -> -1 - } - } - - override fun setBoolean( - index: Int, - value: Boolean, - ) { - throw IllegalArgumentException("Invalid column or type [index $index]") - } - - override fun setInt( - index: Int, - value: Int, - ) { - check(localIsActive) { "No active row" } - when (index) { - colCpuCount -> localCpuCount = value - else -> throw IllegalArgumentException("Invalid column or type [index $index]") - } - } - - override fun setLong( - index: Int, - value: Long, - ) { - throw IllegalArgumentException("Invalid column or type [index $index]") - } - - override fun setFloat( - index: Int, - value: Float, - ) { - throw IllegalArgumentException("Invalid column or type [index $index]") - } - - override fun setDouble( - index: Int, - value: Double, - ) { - check(localIsActive) { "No active row" } - when (index) { - colCpuUsage -> localCpuUsage = value - else -> throw IllegalArgumentException("Invalid column or type [index $index]") - } - } - - override fun setString( - index: Int, - value: String, - ) { - check(localIsActive) { "No active row" } - - when (index) { - colID -> localID = value - else -> throw IllegalArgumentException("Invalid column or type [index $index]") - } - } - - override fun setUUID( - index: Int, - value: UUID, - ) { - throw IllegalArgumentException("Invalid column or type [index $index]") - } - - override fun setInstant( - index: Int, - value: Instant, - ) { - check(localIsActive) { "No active row" } - - when (index) { - colTimestamp -> localTimestamp = value - else -> throw IllegalArgumentException("Invalid column or type [index $index]") - } - } - - override fun setDuration( - index: Int, - value: Duration, - ) { - check(localIsActive) { "No active row" } - - when (index) { - colDuration -> localDuration = value - else -> throw IllegalArgumentException("Invalid column or type [index $index]") - } - } - - override fun <T> setList( - index: Int, - value: List<T>, - ) { - throw IllegalArgumentException("Invalid column or type [index $index]") - } - - override fun <T> setSet( - index: Int, - value: Set<T>, - ) { - throw IllegalArgumentException("Invalid column or type [index $index]") - } - - override fun <K, V> setMap( - index: Int, - value: Map<K, V>, - ) { - throw IllegalArgumentException("Invalid column or type [index $index]") - } - - override fun flush() { - // Not available - } - - override fun close() { - writer.close() - } - - /** - * Last column values that are used to check for correct partitioning. 
- */ - private var lastId: String? = null - private var lastTimestamp: Instant = Instant.MAX - - private val colID = 0 - private val colTimestamp = 1 - private val colDuration = 2 - private val colCpuCount = 3 - private val colCpuUsage = 4 -} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt deleted file mode 100644 index 195929aa..00000000 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.opendc - -import org.opendc.trace.TableReader -import org.opendc.trace.conv.resourceCpuCapacity -import org.opendc.trace.conv.resourceCpuCount -import org.opendc.trace.conv.resourceID -import org.opendc.trace.conv.resourceMemCapacity -import org.opendc.trace.conv.resourceStartTime -import org.opendc.trace.conv.resourceStopTime -import org.opendc.trace.opendc.parquet.Resource -import org.opendc.trace.util.parquet.LocalParquetReader -import java.time.Duration -import java.time.Instant -import java.util.UUID - -/** - * A [TableReader] implementation for the "resources table" in the OpenDC virtual machine trace format. - */ -internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<Resource>) : TableReader { - /** - * The current record. - */ - private var record: Resource? 
= null - - override fun nextRow(): Boolean { - try { - val record = reader.read() - this.record = record - - return record != null - } catch (e: Throwable) { - this.record = null - throw e - } - } - - private val colID = 0 - private val colStartTime = 1 - private val colStopTime = 2 - private val colCpuCount = 3 - private val colCpuCapacity = 4 - private val colMemCapacity = 5 - - override fun resolve(name: String): Int { - return when (name) { - resourceID -> colID - resourceStartTime -> colStartTime - resourceStopTime -> colStopTime - resourceCpuCount -> colCpuCount - resourceCpuCapacity -> colCpuCapacity - resourceMemCapacity -> colMemCapacity - else -> -1 - } - } - - override fun isNull(index: Int): Boolean { - require(index in 0..colMemCapacity) { "Invalid column index" } - return false - } - - override fun getBoolean(index: Int): Boolean { - throw IllegalArgumentException("Invalid column") - } - - override fun getInt(index: Int): Int { - val record = checkNotNull(record) { "Reader in invalid state" } - - return when (index) { - colCpuCount -> record.cpuCount - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getLong(index: Int): Long { - throw IllegalArgumentException("Invalid column") - } - - override fun getFloat(index: Int): Float { - throw IllegalArgumentException("Invalid column") - } - - override fun getDouble(index: Int): Double { - val record = checkNotNull(record) { "Reader in invalid state" } - - return when (index) { - colCpuCapacity -> record.cpuCapacity - colMemCapacity -> record.memCapacity - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getString(index: Int): String { - val record = checkNotNull(record) { "Reader in invalid state" } - - return when (index) { - colID -> record.id - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getUUID(index: Int): UUID? { - throw IllegalArgumentException("Invalid column") - } - - override fun getInstant(index: Int): Instant { - val record = checkNotNull(record) { "Reader in invalid state" } - - return when (index) { - colStartTime -> record.startTime - colStopTime -> record.stopTime - else -> throw IllegalArgumentException("Invalid column") - } - } - - override fun getDuration(index: Int): Duration? { - throw IllegalArgumentException("Invalid column") - } - - override fun <T> getList( - index: Int, - elementType: Class<T>, - ): List<T>? { - throw IllegalArgumentException("Invalid column") - } - - override fun <T> getSet( - index: Int, - elementType: Class<T>, - ): Set<T>? { - throw IllegalArgumentException("Invalid column") - } - - override fun <K, V> getMap( - index: Int, - keyType: Class<K>, - valueType: Class<V>, - ): Map<K, V>? 
{ - throw IllegalArgumentException("Invalid column") - } - - override fun close() { - reader.close() - } - - override fun toString(): String = "OdcVmResourceTableReader" -} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt deleted file mode 100644 index 5bbc2f3f..00000000 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt +++ /dev/null @@ -1,197 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.opendc - -import org.apache.parquet.hadoop.ParquetWriter -import org.opendc.trace.TableWriter -import org.opendc.trace.conv.resourceCpuCapacity -import org.opendc.trace.conv.resourceCpuCount -import org.opendc.trace.conv.resourceID -import org.opendc.trace.conv.resourceMemCapacity -import org.opendc.trace.conv.resourceStartTime -import org.opendc.trace.conv.resourceStopTime -import org.opendc.trace.opendc.parquet.Resource -import java.time.Duration -import java.time.Instant -import java.util.UUID - -/** - * A [TableWriter] implementation for the OpenDC virtual machine trace format. - */ -internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resource>) : TableWriter { - /** - * The current state for the record that is being written. 
- */ - private var localIsActive = false - private var localId: String = "" - private var localStartTime: Instant = Instant.MIN - private var localStopTime: Instant = Instant.MIN - private var localCpuCount: Int = 0 - private var localCpuCapacity: Double = Double.NaN - private var localMemCapacity: Double = Double.NaN - - override fun startRow() { - localIsActive = true - localId = "" - localStartTime = Instant.MIN - localStopTime = Instant.MIN - localCpuCount = 0 - localCpuCapacity = Double.NaN - localMemCapacity = Double.NaN - } - - override fun endRow() { - check(localIsActive) { "No active row" } - localIsActive = false - writer.write(Resource(localId, localStartTime, localStopTime, localCpuCount, localCpuCapacity, localMemCapacity)) - } - - override fun resolve(name: String): Int { - return when (name) { - resourceID -> colID - resourceStartTime -> colStartTime - resourceStopTime -> colStopTime - resourceCpuCount -> colCpuCount - resourceCpuCapacity -> colCpuCapacity - resourceMemCapacity -> colMemCapacity - else -> -1 - } - } - - override fun setBoolean( - index: Int, - value: Boolean, - ) { - throw IllegalArgumentException("Invalid column or type [index $index]") - } - - override fun setInt( - index: Int, - value: Int, - ) { - check(localIsActive) { "No active row" } - when (index) { - colCpuCount -> localCpuCount = value - else -> throw IllegalArgumentException("Invalid column or type [index $index]") - } - } - - override fun setLong( - index: Int, - value: Long, - ) { - throw IllegalArgumentException("Invalid column or type [index $index]") - } - - override fun setFloat( - index: Int, - value: Float, - ) { - throw IllegalArgumentException("Invalid column or type [index $index]") - } - - override fun setDouble( - index: Int, - value: Double, - ) { - check(localIsActive) { "No active row" } - when (index) { - colCpuCapacity -> localCpuCapacity = value - colMemCapacity -> localMemCapacity = value - else -> throw IllegalArgumentException("Invalid column or type [index $index]") - } - } - - override fun setString( - index: Int, - value: String, - ) { - check(localIsActive) { "No active row" } - when (index) { - colID -> localId = value - else -> throw IllegalArgumentException("Invalid column index $index") - } - } - - override fun setUUID( - index: Int, - value: UUID, - ) { - throw IllegalArgumentException("Invalid column or type [index $index]") - } - - override fun setInstant( - index: Int, - value: Instant, - ) { - check(localIsActive) { "No active row" } - when (index) { - colStartTime -> localStartTime = value - colStopTime -> localStopTime = value - else -> throw IllegalArgumentException("Invalid column index $index") - } - } - - override fun setDuration( - index: Int, - value: Duration, - ) { - throw IllegalArgumentException("Invalid column or type [index $index]") - } - - override fun <T> setList( - index: Int, - value: List<T>, - ) { - throw IllegalArgumentException("Invalid column or type [index $index]") - } - - override fun <T> setSet( - index: Int, - value: Set<T>, - ) { - throw IllegalArgumentException("Invalid column or type [index $index]") - } - - override fun <K, V> setMap( - index: Int, - value: Map<K, V>, - ) { - throw IllegalArgumentException("Invalid column or type [index $index]") - } - - override fun flush() { - // Not available - } - - override fun close() { - writer.close() - } - - private val colID = 0 - private val colStartTime = 1 - private val colStopTime = 2 - private val colCpuCount = 3 - private val colCpuCapacity = 4 - private val colMemCapacity = 5 -} 
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt deleted file mode 100644 index 9abe872f..00000000 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt +++ /dev/null @@ -1,190 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.opendc - -import com.fasterxml.jackson.core.JsonEncoding -import com.fasterxml.jackson.core.JsonFactory -import org.apache.parquet.column.ParquetProperties -import org.apache.parquet.hadoop.ParquetFileWriter -import org.apache.parquet.hadoop.metadata.CompressionCodecName -import org.opendc.trace.TableColumn -import org.opendc.trace.TableColumnType -import org.opendc.trace.TableReader -import org.opendc.trace.TableWriter -import org.opendc.trace.conv.INTERFERENCE_GROUP_MEMBERS -import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE -import org.opendc.trace.conv.INTERFERENCE_GROUP_TARGET -import org.opendc.trace.conv.TABLE_INTERFERENCE_GROUPS -import org.opendc.trace.conv.TABLE_RESOURCES -import org.opendc.trace.conv.TABLE_RESOURCE_STATES -import org.opendc.trace.conv.resourceCpuCapacity -import org.opendc.trace.conv.resourceCpuCount -import org.opendc.trace.conv.resourceID -import org.opendc.trace.conv.resourceMemCapacity -import org.opendc.trace.conv.resourceStartTime -import org.opendc.trace.conv.resourceStateCpuUsage -import org.opendc.trace.conv.resourceStateDuration -import org.opendc.trace.conv.resourceStateTimestamp -import org.opendc.trace.conv.resourceStopTime -import org.opendc.trace.opendc.parquet.ResourceReadSupport -import org.opendc.trace.opendc.parquet.ResourceStateReadSupport -import org.opendc.trace.opendc.parquet.ResourceStateWriteSupport -import org.opendc.trace.opendc.parquet.ResourceWriteSupport -import org.opendc.trace.spi.TableDetails -import org.opendc.trace.spi.TraceFormat -import org.opendc.trace.util.parquet.LocalParquetReader -import org.opendc.trace.util.parquet.LocalParquetWriter -import java.nio.file.Files -import java.nio.file.Path -import kotlin.io.path.exists - -/** - * A [TraceFormat] implementation of the OpenDC virtual machine trace format. - */ -public class OdcVmTraceFormat : TraceFormat { - /** - * A [JsonFactory] that is used to parse the JSON-based interference model. 
- */ - private val jsonFactory = JsonFactory() - - /** - * The name of this trace format. - */ - override val name: String = "opendc-vm" - - override fun create(path: Path) { - // Construct directory containing the trace files - Files.createDirectories(path) - - val tables = getTables(path) - - for (table in tables) { - val writer = newWriter(path, table) - writer.close() - } - } - - override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES, TABLE_INTERFERENCE_GROUPS) - - override fun getDetails( - path: Path, - table: String, - ): TableDetails { - return when (table) { - TABLE_RESOURCES -> - TableDetails( - listOf( - TableColumn(resourceID, TableColumnType.String), - TableColumn(resourceStartTime, TableColumnType.Instant), - TableColumn(resourceStopTime, TableColumnType.Instant), - TableColumn(resourceCpuCount, TableColumnType.Int), - TableColumn(resourceCpuCapacity, TableColumnType.Double), - TableColumn(resourceMemCapacity, TableColumnType.Double), - ), - ) - TABLE_RESOURCE_STATES -> - TableDetails( - listOf( - TableColumn(resourceID, TableColumnType.String), - TableColumn(resourceStateTimestamp, TableColumnType.Instant), - TableColumn(resourceStateDuration, TableColumnType.Duration), - TableColumn(resourceCpuCount, TableColumnType.Int), - TableColumn(resourceStateCpuUsage, TableColumnType.Double), - ), - ) - TABLE_INTERFERENCE_GROUPS -> - TableDetails( - listOf( - TableColumn(INTERFERENCE_GROUP_MEMBERS, TableColumnType.Set(TableColumnType.String)), - TableColumn(INTERFERENCE_GROUP_TARGET, TableColumnType.Double), - TableColumn(INTERFERENCE_GROUP_SCORE, TableColumnType.Double), - ), - ) - else -> throw IllegalArgumentException("Table $table not supported") - } - } - - override fun newReader( - path: Path, - table: String, - projection: List<String>?, - ): TableReader { - return when (table) { - TABLE_RESOURCES -> { - val reader = LocalParquetReader(path.resolve("meta.parquet"), ResourceReadSupport(projection)) - OdcVmResourceTableReader(reader) - } - TABLE_RESOURCE_STATES -> { - val reader = LocalParquetReader(path.resolve("trace.parquet"), ResourceStateReadSupport(projection)) - OdcVmResourceStateTableReader(reader) - } - TABLE_INTERFERENCE_GROUPS -> { - val modelPath = path.resolve("interference-model.json") - val parser = - if (modelPath.exists()) { - jsonFactory.createParser(modelPath.toFile()) - } else { - jsonFactory.createParser("[]") // If model does not exist, return empty model - } - - OdcVmInterferenceJsonTableReader(parser) - } - else -> throw IllegalArgumentException("Table $table not supported") - } - } - - override fun newWriter( - path: Path, - table: String, - ): TableWriter { - return when (table) { - TABLE_RESOURCES -> { - val writer = - LocalParquetWriter.builder(path.resolve("meta.parquet"), ResourceWriteSupport()) - .withCompressionCodec(CompressionCodecName.ZSTD) - .withPageWriteChecksumEnabled(true) - .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0) - .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) - .build() - OdcVmResourceTableWriter(writer) - } - TABLE_RESOURCE_STATES -> { - val writer = - LocalParquetWriter.builder(path.resolve("trace.parquet"), ResourceStateWriteSupport()) - .withCompressionCodec(CompressionCodecName.ZSTD) - .withDictionaryEncoding("id", true) - .withBloomFilterEnabled("id", true) - .withPageWriteChecksumEnabled(true) - .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0) - .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) - .build() - OdcVmResourceStateTableWriter(writer) - } 
- TABLE_INTERFERENCE_GROUPS -> { - val generator = jsonFactory.createGenerator(path.resolve("interference-model.json").toFile(), JsonEncoding.UTF8) - OdcVmInterferenceJsonTableWriter(generator) - } - else -> throw IllegalArgumentException("Table $table not supported") - } - } -} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/Resource.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/Resource.kt deleted file mode 100644 index 13eefe72..00000000 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/Resource.kt +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.opendc.parquet - -import java.time.Instant - -/** - * A description of a resource in a trace. - */ -internal data class Resource( - val id: String, - val startTime: Instant, - val stopTime: Instant, - val cpuCount: Int, - val cpuCapacity: Double, - val memCapacity: Double, -) diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt deleted file mode 100644 index 8bada02e..00000000 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt +++ /dev/null @@ -1,159 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.opendc.parquet - -import org.apache.hadoop.conf.Configuration -import org.apache.parquet.hadoop.api.InitContext -import org.apache.parquet.hadoop.api.ReadSupport -import org.apache.parquet.io.api.RecordMaterializer -import org.apache.parquet.schema.LogicalTypeAnnotation -import org.apache.parquet.schema.MessageType -import org.apache.parquet.schema.PrimitiveType -import org.apache.parquet.schema.Types -import org.opendc.trace.TableColumn -import org.opendc.trace.conv.resourceCpuCapacity -import org.opendc.trace.conv.resourceCpuCount -import org.opendc.trace.conv.resourceID -import org.opendc.trace.conv.resourceMemCapacity -import org.opendc.trace.conv.resourceStartTime -import org.opendc.trace.conv.resourceStopTime - -/** - * A [ReadSupport] instance for [Resource] objects. - */ -internal class ResourceReadSupport(private val projection: List<String>?) : ReadSupport<Resource>() { - /** - * Mapping from field names to [TableColumn]s. - */ - private val fieldMap = - mapOf( - "id" to resourceID, - "submissionTime" to resourceStartTime, - "start_time" to resourceStartTime, - "endTime" to resourceStopTime, - "stop_time" to resourceStopTime, - "maxCores" to resourceCpuCount, - "cpu_count" to resourceCpuCount, - "cpu_capacity" to resourceCpuCapacity, - "requiredMemory" to resourceMemCapacity, - "mem_capacity" to resourceMemCapacity, - ) - - override fun init(context: InitContext): ReadContext { - val projectedSchema = - if (projection != null) { - Types.buildMessage() - .apply { - val projectionSet = projection.toSet() - - for (field in READ_SCHEMA.fields) { - val col = fieldMap[field.name] ?: continue - if (col in projectionSet) { - addField(field) - } - } - } - .named(READ_SCHEMA.name) - } else { - READ_SCHEMA - } - - return ReadContext(projectedSchema) - } - - override fun prepareForRead( - configuration: Configuration, - keyValueMetaData: Map<String, String>, - fileSchema: MessageType, - readContext: ReadContext, - ): RecordMaterializer<Resource> = ResourceRecordMaterializer(readContext.requestedSchema) - - companion object { - /** - * Parquet read schema (version 2.0) for the "resources" table in the trace. - */ - @JvmStatic - val READ_SCHEMA_V2_0: MessageType = - Types.buildMessage() - .addFields( - Types - .required(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("id"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("submissionTime"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("endTime"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("maxCores"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("requiredMemory"), - ) - .named("resource") - - /** - * Parquet read schema (version 2.1) for the "resources" table in the trace. 
- */ - @JvmStatic - val READ_SCHEMA_V2_1: MessageType = - Types.buildMessage() - .addFields( - Types - .required(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("id"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("start_time"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("stop_time"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("cpu_count"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("cpu_capacity"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("mem_capacity"), - ) - .named("resource") - - /** - * Parquet read schema for the "resources" table in the trace. - */ - @JvmStatic - val READ_SCHEMA: MessageType = - READ_SCHEMA_V2_0 - .union(READ_SCHEMA_V2_1) - } -} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceRecordMaterializer.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceRecordMaterializer.kt deleted file mode 100644 index 6e2afa7a..00000000 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceRecordMaterializer.kt +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.opendc.parquet - -import org.apache.parquet.io.api.Binary -import org.apache.parquet.io.api.Converter -import org.apache.parquet.io.api.GroupConverter -import org.apache.parquet.io.api.PrimitiveConverter -import org.apache.parquet.io.api.RecordMaterializer -import org.apache.parquet.schema.MessageType -import java.time.Instant - -/** - * A [RecordMaterializer] for [Resource] records. - */ -internal class ResourceRecordMaterializer(schema: MessageType) : RecordMaterializer<Resource>() { - /** - * State of current record being read. - */ - private var localId = "" - private var localStartTime = Instant.MIN - private var localStopTime = Instant.MIN - private var localCpuCount = 0 - private var localCpuCapacity = 0.0 - private var localMemCapacity = 0.0 - - /** - * Root converter for the record. - */ - private val root = - object : GroupConverter() { - /** - * The converters for the columns of the schema. 
- */ - private val converters = - schema.fields.map { type -> - when (type.name) { - "id" -> - object : PrimitiveConverter() { - override fun addBinary(value: Binary) { - localId = value.toStringUsingUTF8() - } - } - "start_time", "submissionTime" -> - object : PrimitiveConverter() { - override fun addLong(value: Long) { - localStartTime = Instant.ofEpochMilli(value) - } - } - "stop_time", "endTime" -> - object : PrimitiveConverter() { - override fun addLong(value: Long) { - localStopTime = Instant.ofEpochMilli(value) - } - } - "cpu_count", "maxCores" -> - object : PrimitiveConverter() { - override fun addInt(value: Int) { - localCpuCount = value - } - } - "cpu_capacity" -> - object : PrimitiveConverter() { - override fun addDouble(value: Double) { - localCpuCapacity = value - } - } - "mem_capacity", "requiredMemory" -> - object : PrimitiveConverter() { - override fun addDouble(value: Double) { - localMemCapacity = value - } - - override fun addLong(value: Long) { - localMemCapacity = value.toDouble() - } - } - else -> error("Unknown column $type") - } - } - - override fun start() { - localId = "" - localStartTime = Instant.MIN - localStopTime = Instant.MIN - localCpuCount = 0 - localCpuCapacity = 0.0 - localMemCapacity = 0.0 - } - - override fun end() {} - - override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex] - } - - override fun getCurrentRecord(): Resource = - Resource( - localId, - localStartTime, - localStopTime, - localCpuCount, - localCpuCapacity, - localMemCapacity, - ) - - override fun getRootConverter(): GroupConverter = root -} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceState.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceState.kt deleted file mode 100644 index 483f444c..00000000 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceState.kt +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
- */ - -package org.opendc.trace.opendc.parquet - -import java.time.Duration -import java.time.Instant - -internal class ResourceState( - val id: String, - val timestamp: Instant, - val duration: Duration, - val cpuCount: Int, - val cpuUsage: Double, -) diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt deleted file mode 100644 index 21e206a9..00000000 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.opendc.parquet - -import org.apache.hadoop.conf.Configuration -import org.apache.parquet.hadoop.api.InitContext -import org.apache.parquet.hadoop.api.ReadSupport -import org.apache.parquet.io.api.RecordMaterializer -import org.apache.parquet.schema.LogicalTypeAnnotation -import org.apache.parquet.schema.MessageType -import org.apache.parquet.schema.PrimitiveType -import org.apache.parquet.schema.Types -import org.opendc.trace.TableColumn -import org.opendc.trace.conv.resourceCpuCount -import org.opendc.trace.conv.resourceID -import org.opendc.trace.conv.resourceStateCpuUsage -import org.opendc.trace.conv.resourceStateDuration -import org.opendc.trace.conv.resourceStateTimestamp - -/** - * A [ReadSupport] instance for [ResourceState] objects. - */ -internal class ResourceStateReadSupport(private val projection: List<String>?) : ReadSupport<ResourceState>() { - /** - * Mapping from field names to [TableColumn]s. 
- */ - private val fieldMap = - mapOf( - "id" to resourceID, - "time" to resourceStateTimestamp, - "timestamp" to resourceStateTimestamp, - "duration" to resourceStateDuration, - "cores" to resourceCpuCount, - "cpu_count" to resourceCpuCount, - "cpuUsage" to resourceStateCpuUsage, - "cpu_usage" to resourceStateCpuUsage, - ) - - override fun init(context: InitContext): ReadContext { - val projectedSchema = - if (projection != null) { - Types.buildMessage() - .apply { - val projectionSet = projection.toSet() - - for (field in READ_SCHEMA.fields) { - val col = fieldMap[field.name] ?: continue - if (col in projectionSet) { - addField(field) - } - } - } - .named(READ_SCHEMA.name) - } else { - READ_SCHEMA - } - - return ReadContext(projectedSchema) - } - - override fun prepareForRead( - configuration: Configuration, - keyValueMetaData: Map<String, String>, - fileSchema: MessageType, - readContext: ReadContext, - ): RecordMaterializer<ResourceState> = ResourceStateRecordMaterializer(readContext.requestedSchema) - - companion object { - /** - * Parquet read schema (version 2.0) for the "resource states" table in the trace. - */ - @JvmStatic - val READ_SCHEMA_V2_0: MessageType = - Types.buildMessage() - .addFields( - Types - .required(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("id"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("time"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("duration"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("cores"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("cpuUsage"), - ) - .named("resource_state") - - /** - * Parquet read schema (version 2.1) for the "resource states" table in the trace. - */ - @JvmStatic - val READ_SCHEMA_V2_1: MessageType = - Types.buildMessage() - .addFields( - Types - .required(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("id"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("timestamp"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("duration"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("cpu_count"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("cpu_usage"), - ) - .named("resource_state") - - /** - * Parquet read schema for the "resource states" table in the trace. 
- */ - @JvmStatic - val READ_SCHEMA: MessageType = READ_SCHEMA_V2_0.union(READ_SCHEMA_V2_1) - } -} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateRecordMaterializer.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateRecordMaterializer.kt deleted file mode 100644 index 72d24e78..00000000 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateRecordMaterializer.kt +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.opendc.parquet - -import org.apache.parquet.io.api.Binary -import org.apache.parquet.io.api.Converter -import org.apache.parquet.io.api.GroupConverter -import org.apache.parquet.io.api.PrimitiveConverter -import org.apache.parquet.io.api.RecordMaterializer -import org.apache.parquet.schema.MessageType -import java.time.Duration -import java.time.Instant - -/** - * A [RecordMaterializer] for [ResourceState] records. - */ -internal class ResourceStateRecordMaterializer(schema: MessageType) : RecordMaterializer<ResourceState>() { - /** - * State of current record being read. - */ - private var localId = "" - private var localTimestamp = Instant.MIN - private var localDuration = Duration.ZERO - private var localCpuCount = 0 - private var localCpuUsage = 0.0 - - /** - * Root converter for the record. - */ - private val root = - object : GroupConverter() { - /** - * The converters for the columns of the schema. 
- */ - private val converters = - schema.fields.map { type -> - when (type.name) { - "id" -> - object : PrimitiveConverter() { - override fun addBinary(value: Binary) { - localId = value.toStringUsingUTF8() - } - } - "timestamp", "time" -> - object : PrimitiveConverter() { - override fun addLong(value: Long) { - localTimestamp = Instant.ofEpochMilli(value) - } - } - "duration" -> - object : PrimitiveConverter() { - override fun addLong(value: Long) { - localDuration = Duration.ofMillis(value) - } - } - "cpu_count", "cores" -> - object : PrimitiveConverter() { - override fun addInt(value: Int) { - localCpuCount = value - } - } - "cpu_usage", "cpuUsage" -> - object : PrimitiveConverter() { - override fun addDouble(value: Double) { - localCpuUsage = value - } - } - "flops" -> - object : PrimitiveConverter() { - override fun addLong(value: Long) { - // Ignore to support v1 format - } - } - else -> error("Unknown column $type") - } - } - - override fun start() { - localId = "" - localTimestamp = Instant.MIN - localDuration = Duration.ZERO - localCpuCount = 0 - localCpuUsage = 0.0 - } - - override fun end() {} - - override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex] - } - - override fun getCurrentRecord(): ResourceState = ResourceState(localId, localTimestamp, localDuration, localCpuCount, localCpuUsage) - - override fun getRootConverter(): GroupConverter = root -} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateWriteSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateWriteSupport.kt deleted file mode 100644 index 2a6d8c12..00000000 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateWriteSupport.kt +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.opendc.parquet - -import org.apache.hadoop.conf.Configuration -import org.apache.parquet.hadoop.api.WriteSupport -import org.apache.parquet.io.api.Binary -import org.apache.parquet.io.api.RecordConsumer -import org.apache.parquet.schema.LogicalTypeAnnotation -import org.apache.parquet.schema.MessageType -import org.apache.parquet.schema.PrimitiveType -import org.apache.parquet.schema.Types - -/** - * Support for writing [Resource] instances to Parquet format. 
- */ -internal class ResourceStateWriteSupport : WriteSupport<ResourceState>() { - /** - * The current active record consumer. - */ - private lateinit var recordConsumer: RecordConsumer - - override fun init(configuration: Configuration): WriteContext { - return WriteContext(WRITE_SCHEMA, emptyMap()) - } - - override fun prepareForWrite(recordConsumer: RecordConsumer) { - this.recordConsumer = recordConsumer - } - - override fun write(record: ResourceState) { - write(recordConsumer, record) - } - - private fun write( - consumer: RecordConsumer, - record: ResourceState, - ) { - consumer.startMessage() - - consumer.startField("id", 0) - consumer.addBinary(Binary.fromCharSequence(record.id)) - consumer.endField("id", 0) - - consumer.startField("timestamp", 1) - consumer.addLong(record.timestamp.toEpochMilli()) - consumer.endField("timestamp", 1) - - consumer.startField("duration", 2) - consumer.addLong(record.duration.toMillis()) - consumer.endField("duration", 2) - - consumer.startField("cpu_count", 3) - consumer.addInteger(record.cpuCount) - consumer.endField("cpu_count", 3) - - consumer.startField("cpu_usage", 4) - consumer.addDouble(record.cpuUsage) - consumer.endField("cpu_usage", 4) - - consumer.endMessage() - } - - companion object { - /** - * Parquet schema for the "resource states" table in the trace. - */ - @JvmStatic - val WRITE_SCHEMA: MessageType = - Types.buildMessage() - .addFields( - Types - .required(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("id"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("timestamp"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("duration"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("cpu_count"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("cpu_usage"), - ) - .named("resource_state") - } -} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceWriteSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceWriteSupport.kt deleted file mode 100644 index ed62e2ce..00000000 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceWriteSupport.kt +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
- */ - -package org.opendc.trace.opendc.parquet - -import org.apache.hadoop.conf.Configuration -import org.apache.parquet.hadoop.api.WriteSupport -import org.apache.parquet.io.api.Binary -import org.apache.parquet.io.api.RecordConsumer -import org.apache.parquet.schema.LogicalTypeAnnotation -import org.apache.parquet.schema.MessageType -import org.apache.parquet.schema.PrimitiveType -import org.apache.parquet.schema.Types -import kotlin.math.roundToLong - -/** - * Support for writing [Resource] instances to Parquet format. - */ -internal class ResourceWriteSupport : WriteSupport<Resource>() { - /** - * The current active record consumer. - */ - private lateinit var recordConsumer: RecordConsumer - - override fun init(configuration: Configuration): WriteContext { - return WriteContext(WRITE_SCHEMA, emptyMap()) - } - - override fun prepareForWrite(recordConsumer: RecordConsumer) { - this.recordConsumer = recordConsumer - } - - override fun write(record: Resource) { - write(recordConsumer, record) - } - - private fun write( - consumer: RecordConsumer, - record: Resource, - ) { - consumer.startMessage() - - consumer.startField("id", 0) - consumer.addBinary(Binary.fromCharSequence(record.id)) - consumer.endField("id", 0) - - consumer.startField("start_time", 1) - consumer.addLong(record.startTime.toEpochMilli()) - consumer.endField("start_time", 1) - - consumer.startField("stop_time", 2) - consumer.addLong(record.stopTime.toEpochMilli()) - consumer.endField("stop_time", 2) - - consumer.startField("cpu_count", 3) - consumer.addInteger(record.cpuCount) - consumer.endField("cpu_count", 3) - - consumer.startField("cpu_capacity", 4) - consumer.addDouble(record.cpuCapacity) - consumer.endField("cpu_capacity", 4) - - consumer.startField("mem_capacity", 5) - consumer.addLong(record.memCapacity.roundToLong()) - consumer.endField("mem_capacity", 5) - - consumer.endMessage() - } - - companion object { - /** - * Parquet schema for the "resources" table in the trace. - */ - @JvmStatic - val WRITE_SCHEMA: MessageType = - Types.buildMessage() - .addFields( - Types - .required(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("id"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("start_time"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("stop_time"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("cpu_count"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("cpu_capacity"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("mem_capacity"), - ) - .named("resource") - } -} diff --git a/opendc-trace/opendc-trace-opendc/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat b/opendc-trace/opendc-trace-opendc/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat deleted file mode 100644 index 94094af4..00000000 --- a/opendc-trace/opendc-trace-opendc/src/main/resources/META-INF/services/org.opendc.trace.spi.TraceFormat +++ /dev/null @@ -1 +0,0 @@ -org.opendc.trace.opendc.OdcVmTraceFormat |
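Note on the last hunk: the deleted resource file META-INF/services/org.opendc.trace.spi.TraceFormat is the ServiceLoader registration that made org.opendc.trace.opendc.OdcVmTraceFormat discoverable at runtime. Since the commit message states that all TraceFormat implementations now live in the api module, an equivalent registration is expected to exist there. A minimal sketch of how such providers are looked up, assuming only the standard java.util.ServiceLoader contract and the interface name taken from the deleted resource path:

```kotlin
import org.opendc.trace.spi.TraceFormat
import java.util.ServiceLoader

// Sketch: enumerate TraceFormat providers registered via
// META-INF/services/org.opendc.trace.spi.TraceFormat. Before this commit the
// provider registered by this module was org.opendc.trace.opendc.OdcVmTraceFormat;
// after the move, the registration is assumed to live in the api module instead.
fun main() {
    val formats = ServiceLoader.load(TraceFormat::class.java)
    for (format in formats) {
        println("Discovered trace format provider: ${format::class.qualifiedName}")
    }
}
```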
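The deleted ResourceWriteSupport and ResourceStateWriteSupport classes follow the standard parquet-hadoop WriteSupport pattern: init() declares the write schema, prepareForWrite() captures the RecordConsumer, and write() emits one startMessage/endMessage pair per record. A WriteSupport is normally wired to a ParquetWriter through a small Builder subclass; the sketch below shows that wiring under the assumption that the moved ResourceState and ResourceStateWriteSupport classes are visible from the call site (the builder class name here is hypothetical, not part of the original code):

```kotlin
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.api.WriteSupport
import java.time.Duration
import java.time.Instant

// Hypothetical builder wiring the (moved) ResourceStateWriteSupport into a ParquetWriter.
class ResourceStateWriterBuilder(path: Path) :
    ParquetWriter.Builder<ResourceState, ResourceStateWriterBuilder>(path) {
    override fun self(): ResourceStateWriterBuilder = this

    override fun getWriteSupport(conf: Configuration): WriteSupport<ResourceState> = ResourceStateWriteSupport()
}

fun main() {
    // Write a single record using the columns shown in the diff
    // (id, timestamp, duration, cpu_count, cpu_usage).
    ResourceStateWriterBuilder(Path("resource-states.parquet")).build().use { writer ->
        writer.write(
            ResourceState(
                id = "vm-0",
                timestamp = Instant.parse("2022-01-01T00:00:00Z"),
                duration = Duration.ofMinutes(5),
                cpuCount = 4,
                cpuUsage = 1200.0,
            ),
        )
    }
}
```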