diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-06-09 10:31:41 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2022-06-09 10:31:41 +0200 |
| commit | d146814bbbb86bfcb19ccb94250424703e9179e5 (patch) | |
| tree | bf20f51b434d56e60ad013568ac1a32b912a3b5e /opendc-trace/opendc-trace-opendc/src/main | |
| parent | 61b6550d7a476ab1aae45a5b9385dfd6ca4f6b6f (diff) | |
| parent | 9d759c9bc987965fae8b0c16c000772c546bf3a2 (diff) | |
merge: Introduce schema for trace API (#88)
This pull request updates the OpenDC trace API to support proper specification
of a schema of the tables exposed by the traces. This functionality makes it easier
for the API consumer to understand the types exposed by the API.
## Implementation Notes :hammer_and_pick:
* Introduce type system for trace API
* Add benchmarks for odcvm trace format
* Add benchmarks for Azure trace format
* Add conformance suite for OpenDC trace API
## External Dependencies :four_leaf_clover:
* N/A
## Breaking API Changes :warning:
* Removal of typed `TableColumn`. Instead, `TableColumn` instances are now
used to describe the columns belonging to some table.
* `TableReader` and `TableWriter` do not support accessing arbitrary objects
anymore. Instead, only the types supported by the type system are exposed.
Diffstat (limited to 'opendc-trace/opendc-trace-opendc/src/main')
9 files changed, 362 insertions, 153 deletions
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt index eb91e305..1841c486 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt @@ -26,9 +26,13 @@ import org.opendc.trace.* import org.opendc.trace.conv.INTERFERENCE_GROUP_MEMBERS import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE import org.opendc.trace.conv.INTERFERENCE_GROUP_TARGET +import org.opendc.trace.util.convertTo import shaded.parquet.com.fasterxml.jackson.core.JsonParseException import shaded.parquet.com.fasterxml.jackson.core.JsonParser import shaded.parquet.com.fasterxml.jackson.core.JsonToken +import java.time.Duration +import java.time.Instant +import java.util.* /** * A [TableReader] implementation for the OpenDC VM interference JSON format. @@ -50,17 +54,24 @@ internal class OdcVmInterferenceJsonTableReader(private val parser: JsonParser) } } - return if (parser.nextToken() != JsonToken.END_ARRAY) { - parseGroup(parser) - true - } else { + return if (parser.isClosed || parser.nextToken() == JsonToken.END_ARRAY) { + parser.close() reset() false + } else { + parseGroup(parser) + true } } - override fun resolve(column: TableColumn<*>): Int { - return when (column) { + private val COL_MEMBERS = 0 + private val COL_TARGET = 1 + private val COL_SCORE = 2 + + private val TYPE_MEMBERS = TableColumnType.Set(TableColumnType.String) + + override fun resolve(name: String): Int { + return when (name) { INTERFERENCE_GROUP_MEMBERS -> COL_MEMBERS INTERFERENCE_GROUP_TARGET -> COL_TARGET INTERFERENCE_GROUP_SCORE -> COL_SCORE @@ -75,48 +86,79 @@ internal class OdcVmInterferenceJsonTableReader(private val parser: JsonParser) } } - override fun get(index: Int): Any { + override fun getBoolean(index: Int): Boolean { + throw IllegalArgumentException("Invalid column $index") + } + + override fun getInt(index: Int): Int { + throw IllegalArgumentException("Invalid column $index") + } + + override fun getLong(index: Int): Long { + throw IllegalArgumentException("Invalid column $index") + } + + override fun getFloat(index: Int): Float { + throw IllegalArgumentException("Invalid column $index") + } + + override fun getDouble(index: Int): Double { + checkActive() return when (index) { - COL_MEMBERS -> members COL_TARGET -> targetLoad COL_SCORE -> score else -> throw IllegalArgumentException("Invalid column $index") } } - override fun getBoolean(index: Int): Boolean { + override fun getString(index: Int): String? { throw IllegalArgumentException("Invalid column $index") } - override fun getInt(index: Int): Int { + override fun getUUID(index: Int): UUID? { throw IllegalArgumentException("Invalid column $index") } - override fun getLong(index: Int): Long { + override fun getInstant(index: Int): Instant? { throw IllegalArgumentException("Invalid column $index") } - override fun getDouble(index: Int): Double { + override fun getDuration(index: Int): Duration? { + throw IllegalArgumentException("Invalid column $index") + } + + override fun <T> getList(index: Int, elementType: Class<T>): List<T>? { + throw IllegalArgumentException("Invalid column $index") + } + + override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? { + checkActive() return when (index) { - COL_TARGET -> targetLoad - COL_SCORE -> score + COL_MEMBERS -> TYPE_MEMBERS.convertTo(members, elementType) else -> throw IllegalArgumentException("Invalid column $index") } } + override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? { + throw IllegalArgumentException("Invalid column $index") + } + override fun close() { parser.close() } - private val COL_MEMBERS = 0 - private val COL_TARGET = 1 - private val COL_SCORE = 2 - private var members = emptySet<String>() private var targetLoad = Double.POSITIVE_INFINITY private var score = 1.0 /** + * Helper method to check if the reader is active. + */ + private fun checkActive() { + check(isStarted && !parser.isClosed) { "No active row. Did you call nextRow()?" } + } + + /** * Reset the state. */ private fun reset() { diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt index 64bc4356..d726e890 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt @@ -27,6 +27,9 @@ import org.opendc.trace.conv.INTERFERENCE_GROUP_MEMBERS import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE import org.opendc.trace.conv.INTERFERENCE_GROUP_TARGET import shaded.parquet.com.fasterxml.jackson.core.JsonGenerator +import java.time.Duration +import java.time.Instant +import java.util.* /** * A [TableWriter] implementation for the OpenDC VM interference JSON format. @@ -65,8 +68,8 @@ internal class OdcVmInterferenceJsonTableWriter(private val generator: JsonGener generator.writeEndObject() } - override fun resolve(column: TableColumn<*>): Int { - return when (column) { + override fun resolve(name: String): Int { + return when (name) { INTERFERENCE_GROUP_MEMBERS -> COL_MEMBERS INTERFERENCE_GROUP_TARGET -> COL_TARGET INTERFERENCE_GROUP_SCORE -> COL_SCORE @@ -74,40 +77,66 @@ internal class OdcVmInterferenceJsonTableWriter(private val generator: JsonGener } } - override fun set(index: Int, value: Any) { + override fun setBoolean(index: Int, value: Boolean) { + throw IllegalArgumentException("Invalid column $index") + } + + override fun setInt(index: Int, value: Int) { + throw IllegalArgumentException("Invalid column $index") + } + + override fun setLong(index: Int, value: Long) { + throw IllegalArgumentException("Invalid column $index") + } + + override fun setFloat(index: Int, value: Float) { + throw IllegalArgumentException("Invalid column $index") + } + + override fun setDouble(index: Int, value: Double) { check(isRowActive) { "No active row" } - @Suppress("UNCHECKED_CAST") when (index) { - COL_MEMBERS -> members = value as Set<String> COL_TARGET -> targetLoad = (value as Number).toDouble() COL_SCORE -> score = (value as Number).toDouble() - else -> throw IllegalArgumentException("Invalid column index $index") + else -> throw IllegalArgumentException("Invalid column $index") } } - override fun setBoolean(index: Int, value: Boolean) { + override fun setString(index: Int, value: String) { throw IllegalArgumentException("Invalid column $index") } - override fun setInt(index: Int, value: Int) { + override fun setUUID(index: Int, value: UUID) { throw IllegalArgumentException("Invalid column $index") } - override fun setLong(index: Int, value: Long) { + override fun setInstant(index: Int, value: Instant) { throw IllegalArgumentException("Invalid column $index") } - override fun setDouble(index: Int, value: Double) { + override fun setDuration(index: Int, value: Duration) { + throw IllegalArgumentException("Invalid column $index") + } + + override fun <T> setList(index: Int, value: List<T>) { + throw IllegalArgumentException("Invalid column $index") + } + + override fun <T> setSet(index: Int, value: Set<T>) { check(isRowActive) { "No active row" } + @Suppress("UNCHECKED_CAST") when (index) { - COL_TARGET -> targetLoad = (value as Number).toDouble() - COL_SCORE -> score = (value as Number).toDouble() - else -> throw IllegalArgumentException("Invalid column $index") + COL_MEMBERS -> members = value as Set<String> + else -> throw IllegalArgumentException("Invalid column index $index") } } + override fun <K, V> setMap(index: Int, value: Map<K, V>) { + throw IllegalArgumentException("Invalid column $index") + } + override fun flush() { generator.flush() } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt index 7a01b881..b256047f 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt @@ -26,6 +26,9 @@ import org.opendc.trace.* import org.opendc.trace.conv.* import org.opendc.trace.opendc.parquet.ResourceState import org.opendc.trace.util.parquet.LocalParquetReader +import java.time.Duration +import java.time.Instant +import java.util.* /** * A [TableReader] implementation for the OpenDC virtual machine trace format. @@ -48,24 +51,26 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea } } - override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + private val COL_ID = 0 + private val COL_TIMESTAMP = 1 + private val COL_DURATION = 2 + private val COL_CPU_COUNT = 3 + private val COL_CPU_USAGE = 4 - override fun isNull(index: Int): Boolean { - check(index in 0..columns.size) { "Invalid column index" } - return get(index) == null + override fun resolve(name: String): Int { + return when (name) { + RESOURCE_ID -> COL_ID + RESOURCE_STATE_TIMESTAMP -> COL_TIMESTAMP + RESOURCE_STATE_DURATION -> COL_DURATION + RESOURCE_CPU_COUNT -> COL_CPU_COUNT + RESOURCE_STATE_CPU_USAGE -> COL_CPU_USAGE + else -> -1 + } } - override fun get(index: Int): Any? { - val record = checkNotNull(record) { "Reader in invalid state" } - - return when (index) { - COL_ID -> record.id - COL_TIMESTAMP -> record.timestamp - COL_DURATION -> record.duration - COL_CPU_COUNT -> record.cpuCount - COL_CPU_USAGE -> record.cpuUsage - else -> throw IllegalArgumentException("Invalid column index $index") - } + override fun isNull(index: Int): Boolean { + require(index in 0..COL_CPU_USAGE) { "Invalid column index" } + return false } override fun getBoolean(index: Int): Boolean { @@ -84,6 +89,10 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea throw IllegalArgumentException("Invalid column or type [index $index]") } + override fun getFloat(index: Int): Float { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + override fun getDouble(index: Int): Double { val record = checkNotNull(record) { "Reader in invalid state" } return when (index) { @@ -92,23 +101,52 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea } } + override fun getString(index: Int): String { + val record = checkNotNull(record) { "Reader in invalid state" } + + return when (index) { + COL_ID -> record.id + else -> throw IllegalArgumentException("Invalid column index $index") + } + } + + override fun getUUID(index: Int): UUID? { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun getInstant(index: Int): Instant { + val record = checkNotNull(record) { "Reader in invalid state" } + + return when (index) { + COL_TIMESTAMP -> record.timestamp + else -> throw IllegalArgumentException("Invalid column index $index") + } + } + + override fun getDuration(index: Int): Duration { + val record = checkNotNull(record) { "Reader in invalid state" } + + return when (index) { + COL_DURATION -> record.duration + else -> throw IllegalArgumentException("Invalid column index $index") + } + } + + override fun <T> getList(index: Int, elementType: Class<T>): List<T>? { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + override fun close() { reader.close() } override fun toString(): String = "OdcVmResourceStateTableReader" - - private val COL_ID = 0 - private val COL_TIMESTAMP = 1 - private val COL_DURATION = 2 - private val COL_CPU_COUNT = 3 - private val COL_CPU_USAGE = 4 - - private val columns = mapOf( - RESOURCE_ID to COL_ID, - RESOURCE_STATE_TIMESTAMP to COL_TIMESTAMP, - RESOURCE_STATE_DURATION to COL_DURATION, - RESOURCE_CPU_COUNT to COL_CPU_COUNT, - RESOURCE_STATE_CPU_USAGE to COL_CPU_USAGE, - ) } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt index 97af5b59..30375de0 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt @@ -28,6 +28,7 @@ import org.opendc.trace.conv.* import org.opendc.trace.opendc.parquet.ResourceState import java.time.Duration import java.time.Instant +import java.util.* /** * A [TableWriter] implementation for the OpenDC virtual machine trace format. @@ -64,17 +65,14 @@ internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter<R lastTimestamp = _timestamp } - override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 - - override fun set(index: Int, value: Any) { - check(_isActive) { "No active row" } - - when (index) { - COL_ID -> _id = value as String - COL_TIMESTAMP -> _timestamp = value as Instant - COL_DURATION -> _duration = value as Duration - COL_CPU_COUNT -> _cpuCount = value as Int - COL_CPU_USAGE -> _cpuUsage = value as Double + override fun resolve(name: String): Int { + return when (name) { + RESOURCE_ID -> COL_ID + RESOURCE_STATE_TIMESTAMP -> COL_TIMESTAMP + RESOURCE_STATE_DURATION -> COL_DURATION + RESOURCE_CPU_COUNT -> COL_CPU_COUNT + RESOURCE_STATE_CPU_USAGE -> COL_CPU_USAGE + else -> -1 } } @@ -94,6 +92,10 @@ internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter<R throw IllegalArgumentException("Invalid column or type [index $index]") } + override fun setFloat(index: Int, value: Float) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + override fun setDouble(index: Int, value: Double) { check(_isActive) { "No active row" } when (index) { @@ -102,6 +104,49 @@ internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter<R } } + override fun setString(index: Int, value: String) { + check(_isActive) { "No active row" } + + when (index) { + COL_ID -> _id = value + else -> throw IllegalArgumentException("Invalid column or type [index $index]") + } + } + + override fun setUUID(index: Int, value: UUID) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun setInstant(index: Int, value: Instant) { + check(_isActive) { "No active row" } + + when (index) { + COL_TIMESTAMP -> _timestamp = value + else -> throw IllegalArgumentException("Invalid column or type [index $index]") + } + } + + override fun setDuration(index: Int, value: Duration) { + check(_isActive) { "No active row" } + + when (index) { + COL_DURATION -> _duration = value + else -> throw IllegalArgumentException("Invalid column or type [index $index]") + } + } + + override fun <T> setList(index: Int, value: List<T>) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun <T> setSet(index: Int, value: Set<T>) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun <K, V> setMap(index: Int, value: Map<K, V>) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + override fun flush() { // Not available } @@ -121,12 +166,4 @@ internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter<R private val COL_DURATION = 2 private val COL_CPU_COUNT = 3 private val COL_CPU_USAGE = 4 - - private val columns = mapOf( - RESOURCE_ID to COL_ID, - RESOURCE_STATE_TIMESTAMP to COL_TIMESTAMP, - RESOURCE_STATE_DURATION to COL_DURATION, - RESOURCE_CPU_COUNT to COL_CPU_COUNT, - RESOURCE_STATE_CPU_USAGE to COL_CPU_USAGE, - ) } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt index 6102332f..76fdbca8 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt @@ -26,6 +26,9 @@ import org.opendc.trace.* import org.opendc.trace.conv.* import org.opendc.trace.opendc.parquet.Resource import org.opendc.trace.util.parquet.LocalParquetReader +import java.time.Duration +import java.time.Instant +import java.util.* /** * A [TableReader] implementation for the "resources table" in the OpenDC virtual machine trace format. @@ -48,25 +51,28 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R } } - override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + private val COL_ID = 0 + private val COL_START_TIME = 1 + private val COL_STOP_TIME = 2 + private val COL_CPU_COUNT = 3 + private val COL_CPU_CAPACITY = 4 + private val COL_MEM_CAPACITY = 5 - override fun isNull(index: Int): Boolean { - check(index in 0..columns.size) { "Invalid column index" } - return get(index) == null + override fun resolve(name: String): Int { + return when (name) { + RESOURCE_ID -> COL_ID + RESOURCE_START_TIME -> COL_START_TIME + RESOURCE_STOP_TIME -> COL_STOP_TIME + RESOURCE_CPU_COUNT -> COL_CPU_COUNT + RESOURCE_CPU_CAPACITY -> COL_CPU_CAPACITY + RESOURCE_MEM_CAPACITY -> COL_MEM_CAPACITY + else -> -1 + } } - override fun get(index: Int): Any? { - val record = checkNotNull(record) { "Reader in invalid state" } - - return when (index) { - COL_ID -> record.id - COL_START_TIME -> record.startTime - COL_STOP_TIME -> record.stopTime - COL_CPU_COUNT -> getInt(index) - COL_CPU_CAPACITY -> getDouble(index) - COL_MEM_CAPACITY -> getDouble(index) - else -> throw IllegalArgumentException("Invalid column") - } + override fun isNull(index: Int): Boolean { + require(index in 0..COL_MEM_CAPACITY) { "Invalid column index" } + return false } override fun getBoolean(index: Int): Boolean { @@ -86,6 +92,10 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R throw IllegalArgumentException("Invalid column") } + override fun getFloat(index: Int): Float { + throw IllegalArgumentException("Invalid column") + } + override fun getDouble(index: Int): Double { val record = checkNotNull(record) { "Reader in invalid state" } @@ -96,25 +106,48 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R } } + override fun getString(index: Int): String { + val record = checkNotNull(record) { "Reader in invalid state" } + + return when (index) { + COL_ID -> record.id + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getUUID(index: Int): UUID? { + throw IllegalArgumentException("Invalid column") + } + + override fun getInstant(index: Int): Instant { + val record = checkNotNull(record) { "Reader in invalid state" } + + return when (index) { + COL_START_TIME -> record.startTime + COL_STOP_TIME -> record.stopTime + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getDuration(index: Int): Duration? { + throw IllegalArgumentException("Invalid column") + } + + override fun <T> getList(index: Int, elementType: Class<T>): List<T>? { + throw IllegalArgumentException("Invalid column") + } + + override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? { + throw IllegalArgumentException("Invalid column") + } + + override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? { + throw IllegalArgumentException("Invalid column") + } + override fun close() { reader.close() } override fun toString(): String = "OdcVmResourceTableReader" - - private val COL_ID = 0 - private val COL_START_TIME = 1 - private val COL_STOP_TIME = 2 - private val COL_CPU_COUNT = 3 - private val COL_CPU_CAPACITY = 4 - private val COL_MEM_CAPACITY = 5 - - private val columns = mapOf( - RESOURCE_ID to COL_ID, - RESOURCE_START_TIME to COL_START_TIME, - RESOURCE_STOP_TIME to COL_STOP_TIME, - RESOURCE_CPU_COUNT to COL_CPU_COUNT, - RESOURCE_CPU_CAPACITY to COL_CPU_CAPACITY, - RESOURCE_MEM_CAPACITY to COL_MEM_CAPACITY, - ) } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt index cae65faa..8117c3cd 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt @@ -26,7 +26,9 @@ import org.apache.parquet.hadoop.ParquetWriter import org.opendc.trace.* import org.opendc.trace.conv.* import org.opendc.trace.opendc.parquet.Resource +import java.time.Duration import java.time.Instant +import java.util.* /** * A [TableWriter] implementation for the OpenDC virtual machine trace format. @@ -59,18 +61,15 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour writer.write(Resource(_id, _startTime, _stopTime, _cpuCount, _cpuCapacity, _memCapacity)) } - override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 - - override fun set(index: Int, value: Any) { - check(_isActive) { "No active row" } - when (index) { - COL_ID -> _id = value as String - COL_START_TIME -> _startTime = value as Instant - COL_STOP_TIME -> _stopTime = value as Instant - COL_CPU_COUNT -> _cpuCount = value as Int - COL_CPU_CAPACITY -> _cpuCapacity = value as Double - COL_MEM_CAPACITY -> _memCapacity = value as Double - else -> throw IllegalArgumentException("Invalid column index $index") + override fun resolve(name: String): Int { + return when (name) { + RESOURCE_ID -> COL_ID + RESOURCE_START_TIME -> COL_START_TIME + RESOURCE_STOP_TIME -> COL_STOP_TIME + RESOURCE_CPU_COUNT -> COL_CPU_COUNT + RESOURCE_CPU_CAPACITY -> COL_CPU_CAPACITY + RESOURCE_MEM_CAPACITY -> COL_MEM_CAPACITY + else -> -1 } } @@ -90,6 +89,10 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour throw IllegalArgumentException("Invalid column or type [index $index]") } + override fun setFloat(index: Int, value: Float) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + override fun setDouble(index: Int, value: Double) { check(_isActive) { "No active row" } when (index) { @@ -99,6 +102,43 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour } } + override fun setString(index: Int, value: String) { + check(_isActive) { "No active row" } + when (index) { + COL_ID -> _id = value + else -> throw IllegalArgumentException("Invalid column index $index") + } + } + + override fun setUUID(index: Int, value: UUID) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun setInstant(index: Int, value: Instant) { + check(_isActive) { "No active row" } + when (index) { + COL_START_TIME -> _startTime = value + COL_STOP_TIME -> _stopTime = value + else -> throw IllegalArgumentException("Invalid column index $index") + } + } + + override fun setDuration(index: Int, value: Duration) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun <T> setList(index: Int, value: List<T>) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun <T> setSet(index: Int, value: Set<T>) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun <K, V> setMap(index: Int, value: Map<K, V>) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + override fun flush() { // Not available } @@ -113,13 +153,4 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour private val COL_CPU_COUNT = 3 private val COL_CPU_CAPACITY = 4 private val COL_MEM_CAPACITY = 5 - - private val columns = mapOf( - RESOURCE_ID to COL_ID, - RESOURCE_START_TIME to COL_START_TIME, - RESOURCE_STOP_TIME to COL_STOP_TIME, - RESOURCE_CPU_COUNT to COL_CPU_COUNT, - RESOURCE_CPU_CAPACITY to COL_CPU_CAPACITY, - RESOURCE_MEM_CAPACITY to COL_MEM_CAPACITY, - ) } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt index d45910c6..a9c5b934 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt @@ -73,36 +73,35 @@ public class OdcVmTraceFormat : TraceFormat { return when (table) { TABLE_RESOURCES -> TableDetails( listOf( - RESOURCE_ID, - RESOURCE_START_TIME, - RESOURCE_STOP_TIME, - RESOURCE_CPU_COUNT, - RESOURCE_CPU_CAPACITY, - RESOURCE_MEM_CAPACITY, + TableColumn(RESOURCE_ID, TableColumnType.String), + TableColumn(RESOURCE_START_TIME, TableColumnType.Instant), + TableColumn(RESOURCE_STOP_TIME, TableColumnType.Instant), + TableColumn(RESOURCE_CPU_COUNT, TableColumnType.Int), + TableColumn(RESOURCE_CPU_CAPACITY, TableColumnType.Double), + TableColumn(RESOURCE_MEM_CAPACITY, TableColumnType.Double), ) ) TABLE_RESOURCE_STATES -> TableDetails( listOf( - RESOURCE_ID, - RESOURCE_STATE_TIMESTAMP, - RESOURCE_STATE_DURATION, - RESOURCE_CPU_COUNT, - RESOURCE_STATE_CPU_USAGE, - ), - listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP) + TableColumn(RESOURCE_ID, TableColumnType.String), + TableColumn(RESOURCE_STATE_TIMESTAMP, TableColumnType.Instant), + TableColumn(RESOURCE_STATE_DURATION, TableColumnType.Duration), + TableColumn(RESOURCE_CPU_COUNT, TableColumnType.Int), + TableColumn(RESOURCE_STATE_CPU_USAGE, TableColumnType.Double), + ) ) TABLE_INTERFERENCE_GROUPS -> TableDetails( listOf( - INTERFERENCE_GROUP_MEMBERS, - INTERFERENCE_GROUP_TARGET, - INTERFERENCE_GROUP_SCORE, + TableColumn(INTERFERENCE_GROUP_MEMBERS, TableColumnType.Set(TableColumnType.String)), + TableColumn(INTERFERENCE_GROUP_TARGET, TableColumnType.Double), + TableColumn(INTERFERENCE_GROUP_SCORE, TableColumnType.Double), ) ) else -> throw IllegalArgumentException("Table $table not supported") } } - override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader { + override fun newReader(path: Path, table: String, projection: List<String>?): TableReader { return when (table) { TABLE_RESOURCES -> { val reader = LocalParquetReader(path.resolve("meta.parquet"), ResourceReadSupport(projection)) diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt index 0d70446d..8a8ed790 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt @@ -33,11 +33,11 @@ import org.opendc.trace.conv.* /** * A [ReadSupport] instance for [Resource] objects. */ -internal class ResourceReadSupport(private val projection: List<TableColumn<*>>?) : ReadSupport<Resource>() { +internal class ResourceReadSupport(private val projection: List<String>?) : ReadSupport<Resource>() { /** * Mapping from field names to [TableColumn]s. */ - private val fieldMap = mapOf<String, TableColumn<*>>( + private val fieldMap = mapOf( "id" to RESOURCE_ID, "submissionTime" to RESOURCE_START_TIME, "start_time" to RESOURCE_START_TIME, diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt index 97aa00b2..78adc649 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt @@ -33,11 +33,11 @@ import org.opendc.trace.conv.* /** * A [ReadSupport] instance for [ResourceState] objects. */ -internal class ResourceStateReadSupport(private val projection: List<TableColumn<*>>?) : ReadSupport<ResourceState>() { +internal class ResourceStateReadSupport(private val projection: List<String>?) : ReadSupport<ResourceState>() { /** * Mapping from field names to [TableColumn]s. */ - private val fieldMap = mapOf<String, TableColumn<*>>( + private val fieldMap = mapOf( "id" to RESOURCE_ID, "time" to RESOURCE_STATE_TIMESTAMP, "timestamp" to RESOURCE_STATE_TIMESTAMP, |
