From 2358257c1080b7ce78270535f82f0b960d48261a Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 6 Jun 2022 16:21:21 +0200 Subject: refactor(trace/api): Introduce type system for trace API This change updates the trace API by introducing a limited type system for the table columns. Previously, the table columns could have any possible type representable by the JVM. With this change, we limit the available types to a small type system. --- .../src/main/kotlin/org/opendc/trace/Table.kt | 13 +- .../main/kotlin/org/opendc/trace/TableColumn.kt | 50 +---- .../kotlin/org/opendc/trace/TableColumnType.kt | 95 +++++++++ .../main/kotlin/org/opendc/trace/TableReader.kt | 222 ++++++++++++++++----- .../main/kotlin/org/opendc/trace/TableWriter.kt | 197 ++++++++++++++---- .../opendc/trace/conv/InterferenceGroupColumns.kt | 11 +- .../org/opendc/trace/conv/ResourceColumns.kt | 17 +- .../org/opendc/trace/conv/ResourceStateColumns.kt | 28 ++- .../kotlin/org/opendc/trace/conv/TableColumns.kt | 36 ---- .../kotlin/org/opendc/trace/conv/TaskColumns.kt | 40 ++-- .../kotlin/org/opendc/trace/internal/TableImpl.kt | 7 +- .../kotlin/org/opendc/trace/spi/TableDetails.kt | 6 +- .../kotlin/org/opendc/trace/spi/TraceFormat.kt | 5 +- .../org/opendc/trace/util/CompositeTableReader.kt | 53 ++++- .../org/opendc/trace/util/TableColumnConversion.kt | 77 +++++++ .../trace/azure/AzureResourceStateTableReader.kt | 73 +++++-- .../opendc/trace/azure/AzureResourceTableReader.kt | 87 +++++--- .../org/opendc/trace/azure/AzureTraceFormat.kt | 21 +- .../org/opendc/trace/azure/AzureTraceFormatTest.kt | 6 +- .../BitbrainsExResourceStateTableReader.kt | 88 +++++--- .../trace/bitbrains/BitbrainsExTraceFormat.kt | 29 ++- .../bitbrains/BitbrainsResourceStateTableReader.kt | 110 ++++++---- .../bitbrains/BitbrainsResourceTableReader.kt | 57 +++++- .../opendc/trace/bitbrains/BitbrainsTraceFormat.kt | 33 +-- .../trace/bitbrains/BitbrainsExTraceFormatTest.kt | 2 +- .../trace/bitbrains/BitbrainsTraceFormatTest.kt | 4 +- .../opendc/trace/calcite/TraceReaderEnumerator.kt | 45 +++-- .../kotlin/org/opendc/trace/calcite/TraceTable.kt | 59 ++++-- .../kotlin/org/opendc/trace/calcite/CalciteTest.kt | 96 ++++++++- .../org/opendc/trace/gwf/GwfTaskTableReader.kt | 83 +++++--- .../kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt | 19 +- .../org/opendc/trace/gwf/GwfTraceFormatTest.kt | 20 +- .../opendc/OdcVmInterferenceJsonTableReader.kt | 60 ++++-- .../opendc/OdcVmInterferenceJsonTableWriter.kt | 55 +++-- .../trace/opendc/OdcVmResourceStateTableReader.kt | 96 ++++++--- .../trace/opendc/OdcVmResourceStateTableWriter.kt | 73 +++++-- .../trace/opendc/OdcVmResourceTableReader.kt | 97 ++++++--- .../trace/opendc/OdcVmResourceTableWriter.kt | 73 +++++-- .../org/opendc/trace/opendc/OdcVmTraceFormat.kt | 33 ++- .../trace/opendc/parquet/ResourceReadSupport.kt | 4 +- .../opendc/parquet/ResourceStateReadSupport.kt | 4 +- .../opendc/trace/opendc/OdcVmTraceFormatTest.kt | 46 ++--- .../org/opendc/trace/swf/SwfTaskTableReader.kt | 93 ++++++--- .../kotlin/org/opendc/trace/swf/SwfTraceFormat.kt | 25 ++- .../org/opendc/trace/swf/SwfTraceFormatTest.kt | 4 +- .../org/opendc/trace/tools/ConvertCommand.kt | 48 ++--- .../trace/wfformat/WfFormatTaskTableReader.kt | 78 ++++++-- .../opendc/trace/wfformat/WfFormatTraceFormat.kt | 17 +- .../trace/wfformat/WfFormatTaskTableReaderTest.kt | 6 +- .../trace/wfformat/WfFormatTraceFormatTest.kt | 16 +- .../org/opendc/trace/wtf/WtfTaskTableReader.kt | 126 ++++++++---- .../kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt | 25 ++- .../opendc/trace/wtf/parquet/TaskReadSupport.kt | 5 +- .../org/opendc/trace/wtf/WtfTraceFormatTest.kt | 24 +-- 54 files changed, 1844 insertions(+), 853 deletions(-) create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumnType.kt delete mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TableColumns.kt create mode 100644 opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/TableColumnConversion.kt (limited to 'opendc-trace') diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt index 05d0234a..e6e97706 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt @@ -32,21 +32,16 @@ public interface Table { public val name: String /** - * The list of columns supported in this table. + * The columns in this table. */ - public val columns: List> - - /** - * The columns by which the table is partitioned. - */ - public val partitionKeys: List> + public val columns: List /** * Open a [TableReader] for a projection of this table. * - * @param projection The list of columns to fetch from the table or `null` if no projection is performed. + * @param projection The names of the columns to fetch from the table or `null` if no projection is performed. */ - public fun newReader(projection: List>? = null): TableReader + public fun newReader(projection: List? = null): TableReader /** * Open a [TableWriter] for this table. diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt index b77a2982..0f75d890 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt @@ -22,47 +22,15 @@ package org.opendc.trace -import java.util.* - /** - * A column in a trace table. + * A column in a [Table]. * - * @param name The universal name of this column. + * @property name The universal name of this column. + * @property type The type of the column. + * @property isNullable A flag to indicate that the column is nullable. */ -public class TableColumn(public val name: String, type: Class) { - /** - * The type of the column. - */ - public val type: Class<*> = type - - /** - * Determine whether the type of the column is a subtype of [column]. - */ - public fun isAssignableTo(column: TableColumn<*>): Boolean { - return name == column.name && type.isAssignableFrom(column.type) - } - - /** - * Compute a hash code for this column. - */ - public override fun hashCode(): Int = Objects.hash(name, type) - - /** - * Determine whether this column is equal to [other]. - */ - public override fun equals(other: Any?): Boolean { - // Fast-path: reference equality - if (this === other) { - return true - } else if (other == null || other !is TableColumn<*>) { - return false - } - - return name == other.name && type == other.type - } - - /** - * Return a string representation of this column. - */ - public override fun toString(): String = "TableColumn[$name,$type]" -} +public data class TableColumn( + public val name: String, + public val type: TableColumnType, + public val isNullable: Boolean = false +) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumnType.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumnType.kt new file mode 100644 index 00000000..6cae47f9 --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumnType.kt @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace + +/** + * The type of a [TableColumn]. + */ +public sealed class TableColumnType { + /** + * A column of booleans. + */ + public object Boolean : TableColumnType() + + /** + * A column of 32-bit integers. + */ + public object Int : TableColumnType() + + /** + * A column of 64-bit integers. + */ + public object Long : TableColumnType() + + /** + * A column of 32-bit floats. + */ + public object Float : TableColumnType() + + /** + * A column of 64-bit floats. + */ + public object Double : TableColumnType() + + /** + * A column of UUIDs. + */ + public object UUID : TableColumnType() + + /** + * A column of variable-length strings. + */ + public object String : TableColumnType() + + /** + * A column of timestamps, mapping to [java.time.Instant]. + */ + public object Instant : TableColumnType() + + /** + * A column of durations, mapping to [java.time.Duration] + */ + public object Duration : TableColumnType() + + /** + * A column containing embedded lists. + * + * @property elementType The type of the elements in the list. + */ + public data class List(public val elementType: TableColumnType) : TableColumnType() + + /** + * A column containing embedded sets. + * + * @property elementType The type of the elements in the sets. + */ + public data class Set(public val elementType: TableColumnType) : TableColumnType() + + /** + * A column containing embedded maps. + * + * @property keyType The type of the key. + * @property valueType The type of the value. + */ + public data class Map(public val keyType: TableColumnType, public val valueType: TableColumnType) : TableColumnType() +} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt index 8a796e6c..42b1c690 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt @@ -22,8 +22,12 @@ package org.opendc.trace +import java.time.Duration +import java.time.Instant +import java.util.UUID + /** - * Base class for reading entities from a workload trace table in streaming fashion. + * Base class for reading entities from a [Table] in streaming fashion. */ public interface TableReader : AutoCloseable { /** @@ -34,20 +38,15 @@ public interface TableReader : AutoCloseable { public fun nextRow(): Boolean /** - * Resolve the index of the specified [column] for this reader. + * Resolve the index of the column by its [name]. * - * @param column The column to lookup. + * @param name The name of the column to lookup. * @return The zero-based index of the column or a negative value if the column is not present in this table. */ - public fun resolve(column: TableColumn<*>): Int - - /** - * Determine whether the [TableReader] supports the specified [column]. - */ - public fun hasColumn(column: TableColumn<*>): Boolean = resolve(column) >= 0 + public fun resolve(name: String): Int /** - * Determine whether the specified [column] has a `null` value for the current row. + * Determine whether the column with the specified [index] has a `null` value for the current row. * * @param index The zero-based index of the column to check for a null value. * @throws IllegalArgumentException if the column index is not valid for this reader. @@ -55,15 +54,6 @@ public interface TableReader : AutoCloseable { */ public fun isNull(index: Int): Boolean - /** - * Obtain the object value of the column with the specified [index]. - * - * @param index The zero-based index of the column to obtain the value for. - * @throws IllegalArgumentException if the column index is not valid for this reader. - * @return The object value of the column. - */ - public fun get(index: Int): Any? - /** * Obtain the boolean value of the column with the specified [index]. * @@ -74,7 +64,7 @@ public interface TableReader : AutoCloseable { public fun getBoolean(index: Int): Boolean /** - * Obtain the integer value of the column with the specified [index]. + * Obtain the boolean value of the column with the specified [index]. * * @param index The zero-based index of the column to obtain the value for. * @throws IllegalArgumentException if the column index is not valid for this reader. @@ -91,6 +81,15 @@ public interface TableReader : AutoCloseable { */ public fun getLong(index: Int): Long + /** + * Obtain the float value of the column with the specified [index]. + * + * @param index The zero-based index of the column to obtain the value for. + * @throws IllegalArgumentException if the column index is not valid for this reader. + * @return The float value of the column or [Float.NaN] if the column is `null`. + */ + public fun getFloat(index: Int): Float + /** * Obtain the double value of the column with the specified [index]. * @@ -101,62 +100,193 @@ public interface TableReader : AutoCloseable { public fun getDouble(index: Int): Double /** - * Determine whether the specified [column] has a `null` value for the current row. + * Obtain the [String] value of the column with the specified [index]. * - * @param column The column to lookup. - * @throws IllegalArgumentException if the column is not valid for this table. - * @return `true` if the column value for the current value has a `null` value, `false` otherwise. + * @param index The zero-based index of the column to obtain the value for. + * @throws IllegalArgumentException if the column index is not valid for this reader. + * @return The [String] value of the column or `null` if the column is `null`. */ - public fun isNull(column: TableColumn<*>): Boolean = isNull(resolve(column)) + public fun getString(index: Int): String? /** - * Obtain the value of the current column with type [T]. + * Obtain the [UUID] value of the column with the specified [index]. * - * @param column The column to obtain the value for. - * @throws IllegalArgumentException if the column is not valid for this reader. - * @return The object value of the column. + * @param index The zero-based index of the column to obtain the value for. + * @throws IllegalArgumentException if the column index is not valid for this reader. + * @return The [UUID] value of the column or `null` if the column is `null`. + */ + public fun getUUID(index: Int): UUID? + + /** + * Obtain the [Instant] value of the column with the specified [index]. + * + * @param index The zero-based index of the column to obtain the value for. + * @throws IllegalArgumentException if the column index is not valid for this reader. + * @return The [Instant] value of the column or `null` if the column is `null`. + */ + public fun getInstant(index: Int): Instant? + + /** + * Obtain the [Duration] value of the column with the specified [index]. + * + * @param index The zero-based index of the column to obtain the value for. + * @throws IllegalArgumentException if the column index is not valid for this reader. + * @return The [Duration] value of the column or `null` if the column is `null`. + */ + public fun getDuration(index: Int): Duration? + + /** + * Obtain the value of the column with the specified [index] as [List]. + * + * @param index The zero-based index of the column to obtain the value for. + * @param elementType Class representing the Java data type to convert elements of the designated column to. + * @throws IllegalArgumentException if the column index is not valid for this reader or this type. + * @return The value of the column as `List` or `null` if the column is null. + */ + public fun getList(index: Int, elementType: Class): List? + + /** + * Obtain the value of the column with the specified [index] as [Set]. + * + * @param index The zero-based index of the column to obtain the value for. + * @param elementType Class representing the Java data type to convert elements of the designated column to. + * @throws IllegalArgumentException if the column index is not valid for this reader or this type. + * @return The value of the column as `Set` or `null` if the column is null. + */ + public fun getSet(index: Int, elementType: Class): Set? + + /** + * Obtain the value of the column with the specified [index] as [Set]. + * + * @param index The zero-based index of the column to obtain the value for. + * @param keyType Class representing the Java data type to convert the keys of the designated column to. + * @param valueType Class representing the Java data type to convert the values of the designated column to. + * @throws IllegalArgumentException if the column index is not valid for this reader or this type. + * @return The value of the column as `Map` or `null` if the column is null. + */ + public fun getMap(index: Int, keyType: Class, valueType: Class): Map? + + /** + * Determine whether a column named [name] has a `null` value for the current row. + * + * @param name The name of the column to lookup. + * @throws IllegalArgumentException if the column is not valid for this table. + * @return `true` if the column value for the current value has a `null` value, `false` otherwise. */ - public fun get(column: TableColumn): T { - // This cast should always succeed since the resolve the index of the typed column - @Suppress("UNCHECKED_CAST") - return get(resolve(column)) as T - } + public fun isNull(name: String): Boolean = isNull(resolve(name)) /** - * Read the specified [column] as boolean. + * Read the column named [name] as boolean. * - * @param column The column to obtain the value for. + * @param name The name of the column to get the value for. * @throws IllegalArgumentException if the column is not valid for this reader. * @return The boolean value of the column or `false` if the column is `null`. */ - public fun getBoolean(column: TableColumn): Boolean = getBoolean(resolve(column)) + public fun getBoolean(name: String): Boolean = getBoolean(resolve(name)) /** - * Read the specified [column] as integer. + * Read the column named [name] as integer. * - * @param column The column to obtain the value for. + * @param name The name of the column to get the value for. * @throws IllegalArgumentException if the column is not valid for this reader. * @return The integer value of the column or `0` if the column is `null`. */ - public fun getInt(column: TableColumn): Int = getInt(resolve(column)) + public fun getInt(name: String): Int = getInt(resolve(name)) /** - * Read the specified [column] as long. + * Read the column named [name] as long. * - * @param column The column to obtain the value for. + * @param name The name of the column to get the value for * @throws IllegalArgumentException if the column is not valid for this reader. * @return The long value of the column or `0` if the column is `null`. */ - public fun getLong(column: TableColumn): Long = getLong(resolve(column)) + public fun getLong(name: String): Long = getLong(resolve(name)) + + /** + * Read the column named [name] as float. + * + * @param name The name of the column to get the value for. + * @throws IllegalArgumentException if the column is not valid for this reader. + * @return The float value of the column or [Float.NaN] if the column is `null`. + */ + public fun getFloat(name: String): Float = getFloat(resolve(name)) /** - * Read the specified [column] as double. + * Read the column named [name] as double. * - * @param column The column to obtain the value for. + * @param name The name of the column to get the value for. * @throws IllegalArgumentException if the column is not valid for this reader. * @return The double value of the column or [Double.NaN] if the column is `null`. */ - public fun getDouble(column: TableColumn): Double = getDouble(resolve(column)) + public fun getDouble(name: String): Double = getDouble(resolve(name)) + + /** + * Read the column named [name] as [String]. + * + * @param name The name of the column to get the value for. + * @throws IllegalArgumentException if the column is not valid for this reader. + * @return The [String] value of the column or `null` if the column is `null`. + */ + public fun getString(name: String): String? = getString(resolve(name)) + + /** + * Read the column named [name] as [UUID]. + * + * @param name The name of the column to get the value for. + * @throws IllegalArgumentException if the column is not valid for this reader. + * @return The [UUID] value of the column or `null` if the column is `null`. + */ + public fun getUUID(name: String): UUID? = getUUID(resolve(name)) + + /** + * Read the column named [name] as [Instant]. + * + * @param name The name of the column to get the value for. + * @throws IllegalArgumentException if the column is not valid for this reader. + * @return The [Instant] value of the column or `null` if the column is `null`. + */ + public fun getInstant(name: String): Instant? = getInstant(resolve(name)) + + /** + * Read the column named [name] as [Duration]. + * + * @param name The name of the column to get the value for. + * @throws IllegalArgumentException if the column is not valid for this reader. + * @return The [Duration] value of the column or `null` if the column is `null`. + */ + public fun getDuration(name: String): Duration? = getDuration(resolve(name)) + + /** + * Obtain the value of the column named [name] as [List]. + * + * @param name The name of the column to get the value for. + * @param elementType Class representing the Java data type to convert elements of the designated column to. + * @throws IllegalArgumentException if the column index is not valid for this reader or this type. + * @return The value of the column as `List` or `null` if the column is null. + */ + public fun getList(name: String, elementType: Class): List? = getList(resolve(name), elementType) + + /** + * Obtain the value of the column named [name] as [Set]. + * + * @param name The name of the column to get the value for. + * @param elementType Class representing the Java data type to convert elements of the designated column to. + * @throws IllegalArgumentException if the column index is not valid for this reader or this type. + * @return The value of the column as `Set` or `null` if the column is null. + */ + public fun getSet(name: String, elementType: Class): Set? = getSet(resolve(name), elementType) + + /** + * Obtain the value of the column named [name] as [Set]. + * + * @param name The name of the column to get the value for. + * @param keyType Class representing the Java data type to convert the keys of the designated column to. + * @param valueType Class representing the Java data type to convert the values of the designated column to. + * @throws IllegalArgumentException if the column index is not valid for this reader or this type. + * @return The value of the column as `Map` or `null` if the column is null. + */ + public fun getMap(name: String, keyType: Class, valueType: Class): Map? = + getMap(resolve(name), keyType, valueType) /** * Closes the reader so that no further iteration or data access can be made. diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableWriter.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableWriter.kt index 423ce86a..a3122ec9 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableWriter.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableWriter.kt @@ -22,6 +22,10 @@ package org.opendc.trace +import java.time.Duration +import java.time.Instant +import java.util.* + /** * Base class for writing workload traces. */ @@ -37,107 +41,228 @@ public interface TableWriter : AutoCloseable { public fun endRow() /** - * Resolve the index of the specified [column] for this writer. + * Resolve the index of the column named [name] for this writer. * - * @param column The column to lookup. + * @param name The name of the column to lookup. * @return The zero-based index of the column or a negative value if the column is not present in this table. */ - public fun resolve(column: TableColumn<*>): Int + public fun resolve(name: String): Int /** - * Determine whether the [TableReader] supports the specified [column]. + * Set the column with index [index] to boolean [value]. + * + * @param index The zero-based index of the column to set the value for. + * @param value The boolean value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. */ - public fun hasColumn(column: TableColumn<*>): Boolean = resolve(column) >= 0 + public fun setBoolean(index: Int, value: Boolean) /** - * Set [column] to [value]. + * Set the column with index [index] to integer [value]. * * @param index The zero-based index of the column to set the value for. - * @param value The value to set the column to. + * @param value The integer value to set the column to. * @throws IllegalArgumentException if the column is not valid for this method. */ - public fun set(index: Int, value: Any) + public fun setInt(index: Int, value: Int) /** - * Set [column] to boolean [value]. + * Set the column with index [index] to long [value]. * * @param index The zero-based index of the column to set the value for. - * @param value The boolean value to set the column to. + * @param value The long value to set the column to. * @throws IllegalArgumentException if the column is not valid for this method. */ - public fun setBoolean(index: Int, value: Boolean) + public fun setLong(index: Int, value: Long) /** - * Set [column] to integer [value]. + * Set the column with index [index] to float [value]. * * @param index The zero-based index of the column to set the value for. - * @param value The integer value to set the column to. + * @param value The float value to set the column to. * @throws IllegalArgumentException if the column is not valid for this method. */ - public fun setInt(index: Int, value: Int) + public fun setFloat(index: Int, value: Float) /** - * Set [column] to long [value]. + * Set the column with index [index] to double [value]. * * @param index The zero-based index of the column to set the value for. - * @param value The long value to set the column to. + * @param value The double value to set the column to. * @throws IllegalArgumentException if the column is not valid for this method. */ - public fun setLong(index: Int, value: Long) + public fun setDouble(index: Int, value: Double) /** - * Set [column] to double [value]. + * Set the column with index [index] to [String] [value]. * * @param index The zero-based index of the column to set the value for. - * @param value The double value to set the column to. + * @param value The [String] value to set the column to. * @throws IllegalArgumentException if the column is not valid for this method. */ - public fun setDouble(index: Int, value: Double) + public fun setString(index: Int, value: String) + + /** + * Set the column with index [index] to [UUID] [value]. + * + * @param index The zero-based index of the column to set the value for. + * @param value The [UUID] value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setUUID(index: Int, value: UUID) + + /** + * Set the column with index [index] to [Instant] [value]. + * + * @param index The zero-based index of the column to set the value for. + * @param value The [Instant] value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setInstant(index: Int, value: Instant) /** - * Set [column] to [value]. + * Set the column with index [index] to [Duration] [value]. * - * @param column The column to set the value for. - * @param value The value to set the column to. + * @param index The zero-based index of the column to set the value for. + * @param value The [Duration] value to set the column to. * @throws IllegalArgumentException if the column is not valid for this method. */ - public fun set(column: TableColumn, value: T): Unit = set(resolve(column), value) + public fun setDuration(index: Int, value: Duration) /** - * Set [column] to boolean [value]. + * Set the column with index [index] to [List] [value]. * - * @param column The column to set the value for. + * @param index The zero-based index of the column to set the value for. + * @param value The [Map] value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setList(index: Int, value: List) + + /** + * Set the column with index [index] to [Set] [value]. + * + * @param index The zero-based index of the column to set the value for. + * @param value The [Set] value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setSet(index: Int, value: Set) + + /** + * Set the column with index [index] to [Map] [value]. + * + * @param index The zero-based index of the column to set the value for. + * @param value The [Map] value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setMap(index: Int, value: Map) + + /** + * Set the column named [name] to boolean [value]. + * + * @param name The name of the column to set the value for. * @param value The boolean value to set the column to. * @throws IllegalArgumentException if the column is not valid for this method. */ - public fun setBoolean(column: TableColumn, value: Boolean): Unit = setBoolean(resolve(column), value) + public fun setBoolean(name: String, value: Boolean): Unit = setBoolean(resolve(name), value) /** - * Set [column] to integer [value]. + * Set the column named [name] to integer [value]. * - * @param column The column to set the value for. + * @param name The name of the column to set the value for. * @param value The integer value to set the column to. * @throws IllegalArgumentException if the column is not valid for this method. */ - public fun setInt(column: TableColumn, value: Int): Unit = setInt(resolve(column), value) + public fun setInt(name: String, value: Int): Unit = setInt(resolve(name), value) /** - * Set [column] to long [value]. + * Set the column named [name] to long [value]. * - * @param column The column to set the value for. + * @param name The name of the column to set the value for. * @param value The long value to set the column to. * @throws IllegalArgumentException if the column is not valid for this method. */ - public fun setLong(column: TableColumn, value: Long): Unit = setLong(resolve(column), value) + public fun setLong(name: String, value: Long): Unit = setLong(resolve(name), value) + + /** + * Set the column named [name] to float [value]. + * + * @param name The name of the column to set the value for. + * @param value The float value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setFloat(name: String, value: Float): Unit = setFloat(resolve(name), value) /** - * Set [column] to double [value]. + * Set the column named [name] to double [value]. * - * @param column The column to set the value for. + * @param name The name of the column to set the value for. * @param value The double value to set the column to. * @throws IllegalArgumentException if the column is not valid for this method. */ - public fun setDouble(column: TableColumn, value: Double): Unit = setDouble(resolve(column), value) + public fun setDouble(name: String, value: Double): Unit = setDouble(resolve(name), value) + + /** + * Set the column named [name] to [String] [value]. + * + * @param name The name of the column to set the value for. + * @param value The [String] value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setString(name: String, value: String): Unit = setString(resolve(name), value) + + /** + * Set the column named [name] to [UUID] [value]. + * + * @param name The name of the column to set the value for. + * @param value The [UUID] value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setUUID(name: String, value: UUID): Unit = setUUID(resolve(name), value) + + /** + * Set the column named [name] to [Instant] [value]. + * + * @param name The name of the column to set the value for. + * @param value The [Instant] value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setInstant(name: String, value: Instant): Unit = setInstant(resolve(name), value) + + /** + * Set the column named [name] to [Duration] [value]. + * + * @param name The name of the column to set the value for. + * @param value The [Duration] value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setDuration(name: String, value: Duration): Unit = setDuration(resolve(name), value) + + /** + * Set the column named [name] to [List] [value]. + * + * @param name The name of the column to set the value for. + * @param value The [List] value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setList(name: String, value: List): Unit = setList(resolve(name), value) + + /** + * Set the column named [name] to [Set] [value]. + * + * @param name The name of the column to set the value for. + * @param value The [Set] value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setSet(name: String, value: Set): Unit = setSet(resolve(name), value) + + /** + * Set the column named [name] to [Map] [value]. + * + * @param name The name of the column to set the value for. + * @param value The [Map] value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setMap(name: String, value: Map): Unit = setMap(resolve(name), value) /** * Flush any buffered content to the underlying target. diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt index 5e8859e4..2a80687c 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt @@ -23,22 +23,17 @@ @file:JvmName("InterferenceGroupColumns") package org.opendc.trace.conv -import org.opendc.trace.TableColumn - /** * Members of the interference group. */ -@JvmField -public val INTERFERENCE_GROUP_MEMBERS: TableColumn> = column("members") +public const val INTERFERENCE_GROUP_MEMBERS: String = "members" /** * Target load after which the interference occurs. */ -@JvmField -public val INTERFERENCE_GROUP_TARGET: TableColumn = column("target") +public const val INTERFERENCE_GROUP_TARGET: String = "target" /** * Performance score when the interference occurs. */ -@JvmField -public val INTERFERENCE_GROUP_SCORE: TableColumn = column("score") +public const val INTERFERENCE_GROUP_SCORE: String = "score" diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt index e602e534..9b7b8195 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt @@ -23,47 +23,44 @@ @file:JvmName("ResourceColumns") package org.opendc.trace.conv -import org.opendc.trace.TableColumn -import java.time.Instant - /** * Identifier of the resource. */ @JvmField -public val RESOURCE_ID: TableColumn = column("id") +public val RESOURCE_ID: String = "id" /** * The cluster to which the resource belongs. */ @JvmField -public val RESOURCE_CLUSTER_ID: TableColumn = column("cluster_id") +public val RESOURCE_CLUSTER_ID: String = "cluster_id" /** * Start time for the resource. */ @JvmField -public val RESOURCE_START_TIME: TableColumn = column("start_time") +public val RESOURCE_START_TIME: String = "start_time" /** * End time for the resource. */ @JvmField -public val RESOURCE_STOP_TIME: TableColumn = column("stop_time") +public val RESOURCE_STOP_TIME: String = "stop_time" /** * Number of CPUs for the resource. */ @JvmField -public val RESOURCE_CPU_COUNT: TableColumn = column("cpu_count") +public val RESOURCE_CPU_COUNT: String = "cpu_count" /** * Total CPU capacity of the resource in MHz. */ @JvmField -public val RESOURCE_CPU_CAPACITY: TableColumn = column("cpu_capacity") +public val RESOURCE_CPU_CAPACITY: String = "cpu_capacity" /** * Memory capacity for the resource in KB. */ @JvmField -public val RESOURCE_MEM_CAPACITY: TableColumn = column("mem_capacity") +public val RESOURCE_MEM_CAPACITY: String = "mem_capacity" diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt index 3a44f817..e9dd7673 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt @@ -23,78 +23,74 @@ @file:JvmName("ResourceStateColumns") package org.opendc.trace.conv -import org.opendc.trace.TableColumn -import java.time.Duration -import java.time.Instant - /** * The timestamp at which the state was recorded. */ @JvmField -public val RESOURCE_STATE_TIMESTAMP: TableColumn = column("timestamp") +public val RESOURCE_STATE_TIMESTAMP: String = "timestamp" /** * Duration for the state. */ @JvmField -public val RESOURCE_STATE_DURATION: TableColumn = column("duration") +public val RESOURCE_STATE_DURATION: String = "duration" /** * A flag to indicate that the resource is powered on. */ @JvmField -public val RESOURCE_STATE_POWERED_ON: TableColumn = column("powered_on") +public val RESOURCE_STATE_POWERED_ON: String = "powered_on" /** * Total CPU usage of the resource in MHz. */ @JvmField -public val RESOURCE_STATE_CPU_USAGE: TableColumn = column("cpu_usage") +public val RESOURCE_STATE_CPU_USAGE: String = "cpu_usage" /** * Total CPU usage of the resource in percentage. */ @JvmField -public val RESOURCE_STATE_CPU_USAGE_PCT: TableColumn = column("cpu_usage_pct") +public val RESOURCE_STATE_CPU_USAGE_PCT: String = "cpu_usage_pct" /** * Total CPU demand of the resource in MHz. */ @JvmField -public val RESOURCE_STATE_CPU_DEMAND: TableColumn = column("cpu_demand") +public val RESOURCE_STATE_CPU_DEMAND: String = "cpu_demand" /** * CPU ready percentage. */ @JvmField -public val RESOURCE_STATE_CPU_READY_PCT: TableColumn = column("cpu_ready_pct") +public val RESOURCE_STATE_CPU_READY_PCT: String = "cpu_ready_pct" /** * Memory usage of the resource in KB. */ @JvmField -public val RESOURCE_STATE_MEM_USAGE: TableColumn = column("mem_usage") +public val RESOURCE_STATE_MEM_USAGE: String = "mem_usage" /** * Disk read throughput of the resource in KB/s. */ @JvmField -public val RESOURCE_STATE_DISK_READ: TableColumn = column("disk_read") +public val RESOURCE_STATE_DISK_READ: String = "disk_read" /** * Disk write throughput of the resource in KB/s. */ @JvmField -public val RESOURCE_STATE_DISK_WRITE: TableColumn = column("disk_write") +public val RESOURCE_STATE_DISK_WRITE: String = "disk_write" /** * Network receive throughput of the resource in KB/s. */ @JvmField -public val RESOURCE_STATE_NET_RX: TableColumn = column("net_rx") +public val RESOURCE_STATE_NET_RX: String = "net_rx" /** * Network transmit throughput of the resource in KB/s. */ @JvmField -public val RESOURCE_STATE_NET_TX: TableColumn = column("net_tx") +public val RESOURCE_STATE_NET_TX: String = "net_tx" diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TableColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TableColumns.kt deleted file mode 100644 index a58505e9..00000000 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TableColumns.kt +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -@file:JvmName("TableColumns") -package org.opendc.trace.conv - -import org.opendc.trace.TableColumn - -/** - * Construct a [TableColumn] with the specified [name] and type [T]. - */ -public inline fun column(name: String): TableColumn = column(name, T::class.java) - -/** - * Construct a [TableColumn] with the specified [name] and [type]. - */ -public fun column(name: String, type: Class): TableColumn = TableColumn(name, type) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt index e6daafb7..da5c343f 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt @@ -23,78 +23,62 @@ @file:JvmName("TaskColumns") package org.opendc.trace.conv -import org.opendc.trace.TableColumn -import java.time.Duration -import java.time.Instant - /** * A column containing the task identifier. */ -@JvmField -public val TASK_ID: TableColumn = column("id") +public const val TASK_ID: String = "id" /** * A column containing the identifier of the workflow. */ -@JvmField -public val TASK_WORKFLOW_ID: TableColumn = column("workflow_id") +public const val TASK_WORKFLOW_ID: String = "workflow_id" /** * A column containing the submission time of the task. */ -@JvmField -public val TASK_SUBMIT_TIME: TableColumn = column("submit_time") +public const val TASK_SUBMIT_TIME: String = "submit_time" /** * A column containing the wait time of the task. */ -@JvmField -public val TASK_WAIT_TIME: TableColumn = column("wait_time") +public const val TASK_WAIT_TIME: String = "wait_time" /** * A column containing the runtime time of the task. */ -@JvmField -public val TASK_RUNTIME: TableColumn = column("runtime") +public const val TASK_RUNTIME: String = "runtime" /** * A column containing the parents of a task. */ -@JvmField -public val TASK_PARENTS: TableColumn> = column("parents") +public const val TASK_PARENTS: String = "parents" /** * A column containing the children of a task. */ -@JvmField -public val TASK_CHILDREN: TableColumn> = column("children") +public const val TASK_CHILDREN: String = "children" /** * A column containing the requested CPUs of a task. */ -@JvmField -public val TASK_REQ_NCPUS: TableColumn = column("req_ncpus") +public const val TASK_REQ_NCPUS: String = "req_ncpus" /** * A column containing the allocated CPUs of a task. */ -@JvmField -public val TASK_ALLOC_NCPUS: TableColumn = column("alloc_ncpus") +public const val TASK_ALLOC_NCPUS: String = "alloc_ncpus" /** * A column containing the status of a task. */ -@JvmField -public val TASK_STATUS: TableColumn = column("status") +public const val TASK_STATUS: String = "status" /** * A column containing the group id of a task. */ -@JvmField -public val TASK_GROUP_ID: TableColumn = column("group_id") +public const val TASK_GROUP_ID: String = "group_id" /** * A column containing the user id of a task. */ -@JvmField -public val TASK_USER_ID: TableColumn = column("user_id") +public const val TASK_USER_ID: String = "user_id" diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt index b848e19a..1e1bf676 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt @@ -37,13 +37,10 @@ internal class TableImpl(val trace: TraceImpl, override val name: String) : Tabl */ private val details = trace.format.getDetails(trace.path, name) - override val columns: List> + override val columns: List get() = details.columns - override val partitionKeys: List> - get() = details.partitionKeys - - override fun newReader(projection: List>?): TableReader { + override fun newReader(projection: List?): TableReader { return trace.format.newReader(trace.path, name, projection) } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TableDetails.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TableDetails.kt index 1a9b9ee1..5206d02b 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TableDetails.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TableDetails.kt @@ -29,9 +29,5 @@ import org.opendc.trace.TableColumn * A class used by the [TraceFormat] interface for describing the metadata of a [Table]. * * @param columns The available columns in the table. - * @param partitionKeys The table columns that act as partition keys for the table. */ -public data class TableDetails( - val columns: List>, - val partitionKeys: List> = emptyList() -) +public data class TableDetails(val columns: List) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt index 47761e0f..eff6fa83 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt @@ -22,7 +22,6 @@ package org.opendc.trace.spi -import org.opendc.trace.TableColumn import org.opendc.trace.TableReader import org.opendc.trace.TableWriter import java.nio.file.Path @@ -69,11 +68,11 @@ public interface TraceFormat { * * @param path The path to the trace to open. * @param table The name of the table to open a [TableReader] for. - * @param projection The list of [TableColumn]s to project or `null` if no projection is performed. + * @param projection The name of the columns to project or `null` if no projection is performed. * @throws IllegalArgumentException If [table] does not exist. * @return A [TableReader] instance for the table. */ - public fun newReader(path: Path, table: String, projection: List>?): TableReader + public fun newReader(path: Path, table: String, projection: List?): TableReader /** * Open a [TableWriter] for the specified [table]. diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/CompositeTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/CompositeTableReader.kt index 11e032c7..c4854265 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/CompositeTableReader.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/CompositeTableReader.kt @@ -22,8 +22,10 @@ package org.opendc.trace.util -import org.opendc.trace.TableColumn import org.opendc.trace.TableReader +import java.time.Duration +import java.time.Instant +import java.util.* /** * A helper class to chain multiple [TableReader]s. @@ -63,11 +65,11 @@ public abstract class CompositeTableReader : TableReader { return delegate != null } - override fun resolve(column: TableColumn<*>): Int { + override fun resolve(name: String): Int { tryStart() val delegate = delegate - return delegate?.resolve(column) ?: -1 + return delegate?.resolve(name) ?: -1 } override fun isNull(index: Int): Boolean { @@ -75,11 +77,6 @@ public abstract class CompositeTableReader : TableReader { return delegate.isNull(index) } - override fun get(index: Int): Any? { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.get(index) - } - override fun getBoolean(index: Int): Boolean { val delegate = checkNotNull(delegate) { "Invalid reader state" } return delegate.getBoolean(index) @@ -95,11 +92,51 @@ public abstract class CompositeTableReader : TableReader { return delegate.getLong(index) } + override fun getFloat(index: Int): Float { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getFloat(index) + } + override fun getDouble(index: Int): Double { val delegate = checkNotNull(delegate) { "Invalid reader state" } return delegate.getDouble(index) } + override fun getString(index: Int): String? { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getString(index) + } + + override fun getUUID(index: Int): UUID? { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getUUID(index) + } + + override fun getInstant(index: Int): Instant? { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getInstant(index) + } + + override fun getDuration(index: Int): Duration? { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getDuration(index) + } + + override fun getList(index: Int, elementType: Class): List? { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getList(index, elementType) + } + + override fun getSet(index: Int, elementType: Class): Set? { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getSet(index, elementType) + } + + override fun getMap(index: Int, keyType: Class, valueType: Class): Map? { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getMap(index, keyType, valueType) + } + override fun close() { delegate?.close() } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/TableColumnConversion.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/TableColumnConversion.kt new file mode 100644 index 00000000..8ef080b5 --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/TableColumnConversion.kt @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("TableColumnConversions") +package org.opendc.trace.util + +import org.opendc.trace.TableColumnType +import java.time.Duration +import java.time.Instant +import java.util.UUID + +/** + * Helper method to convert a [List] into a [List] with elements of [targetElementType]. + */ +public fun TableColumnType.List.convertTo(value: List<*>?, targetElementType: Class): List? { + require(elementType.isCompatible(targetElementType)) { "Target element type is not compatible with $elementType" } + @Suppress("UNCHECKED_CAST") + return value as List? +} + +/** + * Helper method to convert a [Set] into a [Set] with elements of [targetElementType]. + */ +public fun TableColumnType.Set.convertTo(value: Set<*>?, targetElementType: Class): Set? { + require(elementType.isCompatible(targetElementType)) { "Target element type is not compatible with $elementType" } + @Suppress("UNCHECKED_CAST") + return value as Set? +} + +/** + * Helper method to convert a [Map] into a [Map] with [targetKeyType] keys and [targetValueType] values. + */ +public fun TableColumnType.Map.convertTo(value: Map<*, *>?, targetKeyType: Class, targetValueType: Class): Map? { + require(keyType.isCompatible(targetKeyType)) { "Target key type $targetKeyType is not compatible with $keyType" } + require(valueType.isCompatible(targetValueType)) { "Target value type $targetValueType is not compatible with $valueType" } + @Suppress("UNCHECKED_CAST") + return value as Map? +} + +/** + * Helper method to determine [javaType] is compatible with this [TableColumnType]. + */ +private fun TableColumnType.isCompatible(javaType: Class<*>): Boolean { + return when (this) { + is TableColumnType.Boolean -> javaType.isAssignableFrom(Boolean::class.java) + is TableColumnType.Int -> javaType.isAssignableFrom(Int::class.java) + is TableColumnType.Long -> javaType.isAssignableFrom(Long::class.java) + is TableColumnType.Float -> javaType.isAssignableFrom(Float::class.java) + is TableColumnType.Double -> javaType.isAssignableFrom(Double::class.java) + is TableColumnType.String -> javaType.isAssignableFrom(String::class.java) + is TableColumnType.UUID -> javaType.isAssignableFrom(UUID::class.java) + is TableColumnType.Instant -> javaType.isAssignableFrom(Instant::class.java) + is TableColumnType.Duration -> javaType.isAssignableFrom(Duration::class.java) + is TableColumnType.List -> javaType.isAssignableFrom(List::class.java) + is TableColumnType.Set -> javaType.isAssignableFrom(Set::class.java) + is TableColumnType.Map -> javaType.isAssignableFrom(Map::class.java) + } +} diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt index 3132b1d9..e9017b35 100644 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt @@ -29,7 +29,9 @@ import org.opendc.trace.* import org.opendc.trace.conv.RESOURCE_ID import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE_PCT import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP +import java.time.Duration import java.time.Instant +import java.util.* /** * A [TableReader] for the Azure v1 VM resource state table. @@ -63,20 +65,22 @@ internal class AzureResourceStateTableReader(private val parser: CsvParser) : Ta return true } - override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + private val COL_ID = 0 + private val COL_TIMESTAMP = 1 + private val COL_CPU_USAGE_PCT = 2 - override fun isNull(index: Int): Boolean { - require(index in 0..columns.size) { "Invalid column index" } - return false + override fun resolve(name: String): Int { + return when (name) { + RESOURCE_ID -> COL_ID + RESOURCE_STATE_TIMESTAMP -> COL_TIMESTAMP + RESOURCE_STATE_CPU_USAGE_PCT -> COL_CPU_USAGE_PCT + else -> -1 + } } - override fun get(index: Int): Any? { - return when (index) { - COL_ID -> id - COL_TIMESTAMP -> timestamp - COL_CPU_USAGE_PCT -> cpuUsagePct - else -> throw IllegalArgumentException("Invalid column index") - } + override fun isNull(index: Int): Boolean { + require(index in 0..COL_CPU_USAGE_PCT) { "Invalid column index" } + return false } override fun getBoolean(index: Int): Boolean { @@ -91,6 +95,10 @@ internal class AzureResourceStateTableReader(private val parser: CsvParser) : Ta throw IllegalArgumentException("Invalid column") } + override fun getFloat(index: Int): Float { + throw IllegalArgumentException("Invalid column") + } + override fun getDouble(index: Int): Double { return when (index) { COL_CPU_USAGE_PCT -> cpuUsagePct @@ -98,6 +106,40 @@ internal class AzureResourceStateTableReader(private val parser: CsvParser) : Ta } } + override fun getString(index: Int): String? { + return when (index) { + COL_ID -> id + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getUUID(index: Int): UUID? { + throw IllegalArgumentException("Invalid column") + } + + override fun getInstant(index: Int): Instant? { + return when (index) { + COL_TIMESTAMP -> timestamp + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getDuration(index: Int): Duration? { + throw IllegalArgumentException("Invalid column") + } + + override fun getList(index: Int, elementType: Class): List? { + throw IllegalArgumentException("Invalid column") + } + + override fun getMap(index: Int, keyType: Class, valueType: Class): Map? { + throw IllegalArgumentException("Invalid column") + } + + override fun getSet(index: Int, elementType: Class): Set? { + throw IllegalArgumentException("Invalid column") + } + override fun close() { parser.close() } @@ -131,15 +173,6 @@ internal class AzureResourceStateTableReader(private val parser: CsvParser) : Ta cpuUsagePct = Double.NaN } - private val COL_ID = 0 - private val COL_TIMESTAMP = 1 - private val COL_CPU_USAGE_PCT = 2 - private val columns = mapOf( - RESOURCE_ID to COL_ID, - RESOURCE_STATE_TIMESTAMP to COL_TIMESTAMP, - RESOURCE_STATE_CPU_USAGE_PCT to COL_CPU_USAGE_PCT - ) - companion object { /** * The [CsvSchema] that is used to parse the trace. diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt index 154a37e4..456a2536 100644 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt +++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt @@ -27,7 +27,9 @@ import com.fasterxml.jackson.dataformat.csv.CsvParser import com.fasterxml.jackson.dataformat.csv.CsvSchema import org.opendc.trace.* import org.opendc.trace.conv.* +import java.time.Duration import java.time.Instant +import java.util.* /** * A [TableReader] for the Azure v1 VM resources table. @@ -63,22 +65,26 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe return true } - override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + private val COL_ID = 0 + private val COL_START_TIME = 1 + private val COL_STOP_TIME = 2 + private val COL_CPU_COUNT = 3 + private val COL_MEM_CAPACITY = 4 - override fun isNull(index: Int): Boolean { - require(index in 0..columns.size) { "Invalid column index" } - return false + override fun resolve(name: String): Int { + return when (name) { + RESOURCE_ID -> COL_ID + RESOURCE_START_TIME -> COL_START_TIME + RESOURCE_STOP_TIME -> COL_STOP_TIME + RESOURCE_CPU_COUNT -> COL_CPU_COUNT + RESOURCE_MEM_CAPACITY -> COL_MEM_CAPACITY + else -> -1 + } } - override fun get(index: Int): Any? { - return when (index) { - COL_ID -> id - COL_START_TIME -> startTime - COL_STOP_TIME -> stopTime - COL_CPU_COUNT -> getInt(index) - COL_MEM_CAPACITY -> getDouble(index) - else -> throw IllegalArgumentException("Invalid column") - } + override fun isNull(index: Int): Boolean { + require(index in 0..COL_MEM_CAPACITY) { "Invalid column index" } + return false } override fun getBoolean(index: Int): Boolean { @@ -93,6 +99,13 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe } override fun getLong(index: Int): Long { + return when (index) { + COL_CPU_COUNT -> cpuCores.toLong() + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getFloat(index: Int): Float { throw IllegalArgumentException("Invalid column") } @@ -103,6 +116,41 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe } } + override fun getString(index: Int): String? { + return when (index) { + COL_ID -> id + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getUUID(index: Int): UUID? { + throw IllegalArgumentException("Invalid column") + } + + override fun getInstant(index: Int): Instant? { + return when (index) { + COL_START_TIME -> startTime + COL_STOP_TIME -> stopTime + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getDuration(index: Int): Duration? { + throw IllegalArgumentException("Invalid column") + } + + override fun getList(index: Int, elementType: Class): List? { + throw IllegalArgumentException("Invalid column") + } + + override fun getSet(index: Int, elementType: Class): Set? { + throw IllegalArgumentException("Invalid column") + } + + override fun getMap(index: Int, keyType: Class, valueType: Class): Map? { + throw IllegalArgumentException("Invalid column") + } + override fun close() { parser.close() } @@ -140,19 +188,6 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe memCapacity = Double.NaN } - private val COL_ID = 0 - private val COL_START_TIME = 1 - private val COL_STOP_TIME = 2 - private val COL_CPU_COUNT = 3 - private val COL_MEM_CAPACITY = 4 - private val columns = mapOf( - RESOURCE_ID to COL_ID, - RESOURCE_START_TIME to COL_START_TIME, - RESOURCE_STOP_TIME to COL_STOP_TIME, - RESOURCE_CPU_COUNT to COL_CPU_COUNT, - RESOURCE_MEM_CAPACITY to COL_MEM_CAPACITY - ) - companion object { /** * The [CsvSchema] that is used to parse the trace. diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt index 73978990..2294e4a4 100644 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt +++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt @@ -62,26 +62,25 @@ public class AzureTraceFormat : TraceFormat { return when (table) { TABLE_RESOURCES -> TableDetails( listOf( - RESOURCE_ID, - RESOURCE_START_TIME, - RESOURCE_STOP_TIME, - RESOURCE_CPU_COUNT, - RESOURCE_MEM_CAPACITY + TableColumn(RESOURCE_ID, TableColumnType.String), + TableColumn(RESOURCE_START_TIME, TableColumnType.Instant), + TableColumn(RESOURCE_STOP_TIME, TableColumnType.Instant), + TableColumn(RESOURCE_CPU_COUNT, TableColumnType.Int), + TableColumn(RESOURCE_MEM_CAPACITY, TableColumnType.Double), ) ) TABLE_RESOURCE_STATES -> TableDetails( listOf( - RESOURCE_ID, - RESOURCE_STATE_TIMESTAMP, - RESOURCE_STATE_CPU_USAGE_PCT - ), - listOf(RESOURCE_STATE_TIMESTAMP) + TableColumn(RESOURCE_ID, TableColumnType.String), + TableColumn(RESOURCE_STATE_TIMESTAMP, TableColumnType.Instant), + TableColumn(RESOURCE_STATE_CPU_USAGE_PCT, TableColumnType.Double), + ) ) else -> throw IllegalArgumentException("Table $table not supported") } } - override fun newReader(path: Path, table: String, projection: List>?): TableReader { + override fun newReader(path: Path, table: String, projection: List?): TableReader { return when (table) { TABLE_RESOURCES -> { val stream = GZIPInputStream(path.resolve("vmtable/vmtable.csv.gz").inputStream()) diff --git a/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt b/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt index 263d26ce..932858f8 100644 --- a/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt @@ -60,7 +60,7 @@ class AzureTraceFormatTest { val reader = format.newReader(path, TABLE_RESOURCES, null) assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("x/XsOfHO4ocsV99i4NluqKDuxctW2MMVmwqOPAlg4wp8mqbBOe3wxBlQo0+Qx+uf", reader.get(RESOURCE_ID)) }, + { assertEquals("x/XsOfHO4ocsV99i4NluqKDuxctW2MMVmwqOPAlg4wp8mqbBOe3wxBlQo0+Qx+uf", reader.getString(RESOURCE_ID)) }, { assertEquals(1, reader.getInt(RESOURCE_CPU_COUNT)) }, { assertEquals(1750000.0, reader.getDouble(RESOURCE_MEM_CAPACITY)) }, ) @@ -75,8 +75,8 @@ class AzureTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("+ZcrOp5/c/fJ6mVgP5qMZlOAGDwyjaaDNM0WoWOt2IDb47gT0UwK9lFwkPQv3C7Q", reader.get(RESOURCE_ID)) }, - { assertEquals(0, reader.get(RESOURCE_STATE_TIMESTAMP).epochSecond) }, + { assertEquals("+ZcrOp5/c/fJ6mVgP5qMZlOAGDwyjaaDNM0WoWOt2IDb47gT0UwK9lFwkPQv3C7Q", reader.getString(RESOURCE_ID)) }, + { assertEquals(0, reader.getInstant(RESOURCE_STATE_TIMESTAMP)?.epochSecond) }, { assertEquals(0.0286979, reader.getDouble(RESOURCE_STATE_CPU_USAGE_PCT), 0.01) } ) diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt index 1e1d1a09..f9bd6200 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt @@ -25,7 +25,9 @@ package org.opendc.trace.bitbrains import org.opendc.trace.* import org.opendc.trace.conv.* import java.io.BufferedReader +import java.time.Duration import java.time.Instant +import java.util.* /** * A [TableReader] for the Bitbrains resource state table. @@ -89,25 +91,29 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR return true } - override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + override fun resolve(name: String): Int { + return when (name) { + RESOURCE_ID -> COL_ID + RESOURCE_CLUSTER_ID -> COL_CLUSTER_ID + RESOURCE_STATE_TIMESTAMP -> COL_TIMESTAMP + RESOURCE_CPU_COUNT -> COL_NCPUS + RESOURCE_CPU_CAPACITY -> COL_CPU_CAPACITY + RESOURCE_STATE_CPU_USAGE -> COL_CPU_USAGE + RESOURCE_STATE_CPU_USAGE_PCT -> COL_CPU_USAGE_PCT + RESOURCE_STATE_CPU_DEMAND -> COL_CPU_DEMAND + RESOURCE_STATE_CPU_READY_PCT -> COL_CPU_READY_PCT + RESOURCE_MEM_CAPACITY -> COL_MEM_CAPACITY + RESOURCE_STATE_DISK_READ -> COL_DISK_READ + RESOURCE_STATE_DISK_WRITE -> COL_DISK_WRITE + else -> -1 + } + } override fun isNull(index: Int): Boolean { - require(index in 0..COL_MAX) { "Invalid column index" } + require(index in 0 until COL_MAX) { "Invalid column index" } return false } - override fun get(index: Int): Any? { - return when (index) { - COL_ID -> id - COL_CLUSTER_ID -> cluster - COL_TIMESTAMP -> timestamp - COL_NCPUS -> getInt(index) - COL_POWERED_ON -> getInt(index) - COL_CPU_CAPACITY, COL_CPU_USAGE, COL_CPU_USAGE_PCT, COL_CPU_READY_PCT, COL_CPU_DEMAND, COL_MEM_CAPACITY, COL_DISK_READ, COL_DISK_WRITE -> getDouble(index) - else -> throw IllegalArgumentException("Invalid column") - } - } - override fun getBoolean(index: Int): Boolean { return when (index) { COL_POWERED_ON -> poweredOn @@ -126,6 +132,10 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR throw IllegalArgumentException("Invalid column") } + override fun getFloat(index: Int): Float { + throw IllegalArgumentException("Invalid column") + } + override fun getDouble(index: Int): Double { return when (index) { COL_CPU_CAPACITY -> cpuCapacity @@ -140,6 +150,41 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR } } + override fun getString(index: Int): String? { + return when (index) { + COL_ID -> id + COL_CLUSTER_ID -> cluster + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getUUID(index: Int): UUID? { + throw IllegalArgumentException("Invalid column") + } + + override fun getInstant(index: Int): Instant? { + return when (index) { + COL_TIMESTAMP -> timestamp + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getDuration(index: Int): Duration? { + throw IllegalArgumentException("Invalid column") + } + + override fun getList(index: Int, elementType: Class): List? { + throw IllegalArgumentException("Invalid column") + } + + override fun getSet(index: Int, elementType: Class): Set? { + throw IllegalArgumentException("Invalid column") + } + + override fun getMap(index: Int, keyType: Class, valueType: Class): Map? { + throw IllegalArgumentException("Invalid column") + } + override fun close() { reader.close() } @@ -195,19 +240,4 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR private val COL_MEM_CAPACITY = 20 private val COL_CPU_USAGE_PCT = 21 private val COL_MAX = COL_CPU_USAGE_PCT + 1 - - private val columns = mapOf( - RESOURCE_ID to COL_ID, - RESOURCE_CLUSTER_ID to COL_CLUSTER_ID, - RESOURCE_STATE_TIMESTAMP to COL_TIMESTAMP, - RESOURCE_CPU_COUNT to COL_NCPUS, - RESOURCE_CPU_CAPACITY to COL_CPU_CAPACITY, - RESOURCE_STATE_CPU_USAGE to COL_CPU_USAGE, - RESOURCE_STATE_CPU_USAGE_PCT to COL_CPU_USAGE_PCT, - RESOURCE_STATE_CPU_DEMAND to COL_CPU_DEMAND, - RESOURCE_STATE_CPU_READY_PCT to COL_CPU_READY_PCT, - RESOURCE_MEM_CAPACITY to COL_MEM_CAPACITY, - RESOURCE_STATE_DISK_READ to COL_DISK_READ, - RESOURCE_STATE_DISK_WRITE to COL_DISK_WRITE - ) } diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt index 82e454ad..31c4f1e2 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt @@ -53,26 +53,25 @@ public class BitbrainsExTraceFormat : TraceFormat { return when (table) { TABLE_RESOURCE_STATES -> TableDetails( listOf( - RESOURCE_ID, - RESOURCE_CLUSTER_ID, - RESOURCE_STATE_TIMESTAMP, - RESOURCE_CPU_COUNT, - RESOURCE_CPU_CAPACITY, - RESOURCE_STATE_CPU_USAGE, - RESOURCE_STATE_CPU_USAGE_PCT, - RESOURCE_STATE_CPU_DEMAND, - RESOURCE_STATE_CPU_READY_PCT, - RESOURCE_MEM_CAPACITY, - RESOURCE_STATE_DISK_READ, - RESOURCE_STATE_DISK_WRITE - ), - listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP) + TableColumn(RESOURCE_ID, TableColumnType.String), + TableColumn(RESOURCE_CLUSTER_ID, TableColumnType.String), + TableColumn(RESOURCE_STATE_TIMESTAMP, TableColumnType.Instant), + TableColumn(RESOURCE_CPU_COUNT, TableColumnType.Int), + TableColumn(RESOURCE_CPU_CAPACITY, TableColumnType.Double), + TableColumn(RESOURCE_STATE_CPU_USAGE, TableColumnType.Double), + TableColumn(RESOURCE_STATE_CPU_USAGE_PCT, TableColumnType.Double), + TableColumn(RESOURCE_STATE_CPU_DEMAND, TableColumnType.Double), + TableColumn(RESOURCE_STATE_CPU_READY_PCT, TableColumnType.Double), + TableColumn(RESOURCE_MEM_CAPACITY, TableColumnType.Double), + TableColumn(RESOURCE_STATE_DISK_READ, TableColumnType.Double), + TableColumn(RESOURCE_STATE_DISK_WRITE, TableColumnType.Double), + ) ) else -> throw IllegalArgumentException("Table $table not supported") } } - override fun newReader(path: Path, table: String, projection: List>?): TableReader { + override fun newReader(path: Path, table: String, projection: List?): TableReader { return when (table) { TABLE_RESOURCE_STATES -> newResourceStateReader(path) else -> throw IllegalArgumentException("Table $table not supported") diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt index 214fd749..14c1f801 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt @@ -29,6 +29,7 @@ import com.fasterxml.jackson.dataformat.csv.CsvSchema import org.opendc.trace.* import org.opendc.trace.conv.* import java.text.NumberFormat +import java.time.Duration import java.time.Instant import java.time.LocalDateTime import java.time.ZoneOffset @@ -112,21 +113,40 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, return true } - override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + private val COL_TIMESTAMP = 0 + private val COL_CPU_COUNT = 1 + private val COL_CPU_CAPACITY = 2 + private val COL_CPU_USAGE = 3 + private val COL_CPU_USAGE_PCT = 4 + private val COL_MEM_CAPACITY = 5 + private val COL_MEM_USAGE = 6 + private val COL_DISK_READ = 7 + private val COL_DISK_WRITE = 8 + private val COL_NET_RX = 9 + private val COL_NET_TX = 10 + private val COL_ID = 11 - override fun isNull(index: Int): Boolean { - check(index in 0..columns.size) { "Invalid column index" } - return false + override fun resolve(name: String): Int { + return when (name) { + RESOURCE_ID -> COL_ID + RESOURCE_STATE_TIMESTAMP -> COL_TIMESTAMP + RESOURCE_CPU_COUNT -> COL_CPU_COUNT + RESOURCE_CPU_CAPACITY -> COL_CPU_CAPACITY + RESOURCE_STATE_CPU_USAGE -> COL_CPU_USAGE + RESOURCE_STATE_CPU_USAGE_PCT -> COL_CPU_USAGE_PCT + RESOURCE_MEM_CAPACITY -> COL_MEM_CAPACITY + RESOURCE_STATE_MEM_USAGE -> COL_MEM_USAGE + RESOURCE_STATE_DISK_READ -> COL_DISK_READ + RESOURCE_STATE_DISK_WRITE -> COL_DISK_WRITE + RESOURCE_STATE_NET_RX -> COL_NET_RX + RESOURCE_STATE_NET_TX -> COL_NET_TX + else -> -1 + } } - override fun get(index: Int): Any? { - return when (index) { - COL_ID -> partition - COL_TIMESTAMP -> timestamp - COL_CPU_COUNT -> getInt(index) - COL_CPU_CAPACITY, COL_CPU_USAGE, COL_CPU_USAGE_PCT, COL_MEM_CAPACITY, COL_MEM_USAGE, COL_DISK_READ, COL_DISK_WRITE, COL_NET_RX, COL_NET_TX -> getDouble(index) - else -> throw IllegalArgumentException("Invalid column") - } + override fun isNull(index: Int): Boolean { + check(index in 0..COL_NET_TX) { "Invalid column index" } + return false } override fun getBoolean(index: Int): Boolean { @@ -144,6 +164,10 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, throw IllegalArgumentException("Invalid column") } + override fun getFloat(index: Int): Float { + throw IllegalArgumentException("Invalid column") + } + override fun getDouble(index: Int): Double { return when (index) { COL_CPU_CAPACITY -> cpuCapacity @@ -159,6 +183,40 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, } } + override fun getString(index: Int): String { + return when (index) { + COL_ID -> partition + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getUUID(index: Int): UUID? { + throw IllegalArgumentException("Invalid column") + } + + override fun getInstant(index: Int): Instant? { + return when (index) { + COL_TIMESTAMP -> timestamp + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getDuration(index: Int): Duration? { + throw IllegalArgumentException("Invalid column") + } + + override fun getList(index: Int, elementType: Class): List? { + throw IllegalArgumentException("Invalid column") + } + + override fun getSet(index: Int, elementType: Class): Set? { + throw IllegalArgumentException("Invalid column") + } + + override fun getMap(index: Int, keyType: Class, valueType: Class): Map? { + throw IllegalArgumentException("Invalid column") + } + override fun close() { parser.close() } @@ -228,34 +286,6 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, netTransmitted = Double.NaN } - private val COL_TIMESTAMP = 0 - private val COL_CPU_COUNT = 1 - private val COL_CPU_CAPACITY = 2 - private val COL_CPU_USAGE = 3 - private val COL_CPU_USAGE_PCT = 4 - private val COL_MEM_CAPACITY = 5 - private val COL_MEM_USAGE = 6 - private val COL_DISK_READ = 7 - private val COL_DISK_WRITE = 8 - private val COL_NET_RX = 9 - private val COL_NET_TX = 10 - private val COL_ID = 11 - - private val columns = mapOf( - RESOURCE_ID to COL_ID, - RESOURCE_STATE_TIMESTAMP to COL_TIMESTAMP, - RESOURCE_CPU_COUNT to COL_CPU_COUNT, - RESOURCE_CPU_CAPACITY to COL_CPU_CAPACITY, - RESOURCE_STATE_CPU_USAGE to COL_CPU_USAGE, - RESOURCE_STATE_CPU_USAGE_PCT to COL_CPU_USAGE_PCT, - RESOURCE_MEM_CAPACITY to COL_MEM_CAPACITY, - RESOURCE_STATE_MEM_USAGE to COL_MEM_USAGE, - RESOURCE_STATE_DISK_READ to COL_DISK_READ, - RESOURCE_STATE_DISK_WRITE to COL_DISK_WRITE, - RESOURCE_STATE_NET_RX to COL_NET_RX, - RESOURCE_STATE_NET_TX to COL_NET_TX - ) - /** * The type of the timestamp in the trace. */ diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt index 55f09f43..c57c4cb2 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt @@ -26,6 +26,9 @@ import com.fasterxml.jackson.dataformat.csv.CsvFactory import org.opendc.trace.* import org.opendc.trace.conv.RESOURCE_ID import java.nio.file.Path +import java.time.Duration +import java.time.Instant +import java.util.* /** * A [TableReader] for the Bitbrains resource table. @@ -51,7 +54,7 @@ internal class BitbrainsResourceTableReader(private val factory: CsvFactory, vms continue } - id = reader.get(idCol) as String + id = reader.getString(idCol) return true } finally { reader.close() @@ -61,33 +64,68 @@ internal class BitbrainsResourceTableReader(private val factory: CsvFactory, vms return false } - override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + private val COL_ID = 0 + + override fun resolve(name: String): Int { + return when (name) { + RESOURCE_ID -> COL_ID + else -> -1 + } + } override fun isNull(index: Int): Boolean { - check(index in 0..columns.size) { "Invalid column index" } + check(index in 0..COL_ID) { "Invalid column index" } return false } - override fun get(index: Int): Any? { + override fun getBoolean(index: Int): Boolean { + throw IllegalArgumentException("Invalid column") + } + + override fun getInt(index: Int): Int { + throw IllegalArgumentException("Invalid column") + } + + override fun getLong(index: Int): Long { + throw IllegalArgumentException("Invalid column") + } + + override fun getFloat(index: Int): Float { + throw IllegalArgumentException("Invalid column") + } + + override fun getDouble(index: Int): Double { + throw IllegalArgumentException("Invalid column") + } + + override fun getString(index: Int): String? { return when (index) { COL_ID -> id else -> throw IllegalArgumentException("Invalid column") } } - override fun getBoolean(index: Int): Boolean { + override fun getUUID(index: Int): UUID? { throw IllegalArgumentException("Invalid column") } - override fun getInt(index: Int): Int { + override fun getInstant(index: Int): Instant? { throw IllegalArgumentException("Invalid column") } - override fun getLong(index: Int): Long { + override fun getDuration(index: Int): Duration? { throw IllegalArgumentException("Invalid column") } - override fun getDouble(index: Int): Double { + override fun getList(index: Int, elementType: Class): List? { + throw IllegalArgumentException("Invalid column") + } + + override fun getSet(index: Int, elementType: Class): Set? { + throw IllegalArgumentException("Invalid column") + } + + override fun getMap(index: Int, keyType: Class, valueType: Class): Map? { throw IllegalArgumentException("Invalid column") } @@ -104,7 +142,4 @@ internal class BitbrainsResourceTableReader(private val factory: CsvFactory, vms private fun reset() { id = null } - - private val COL_ID = 0 - private val columns = mapOf(RESOURCE_ID to COL_ID) } diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt index a374e951..f3030893 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt @@ -59,29 +59,32 @@ public class BitbrainsTraceFormat : TraceFormat { override fun getDetails(path: Path, table: String): TableDetails { return when (table) { - TABLE_RESOURCES -> TableDetails(listOf(RESOURCE_ID)) + TABLE_RESOURCES -> TableDetails( + listOf( + TableColumn(RESOURCE_ID, TableColumnType.String) + ) + ) TABLE_RESOURCE_STATES -> TableDetails( listOf( - RESOURCE_ID, - RESOURCE_STATE_TIMESTAMP, - RESOURCE_CPU_COUNT, - RESOURCE_CPU_CAPACITY, - RESOURCE_STATE_CPU_USAGE, - RESOURCE_STATE_CPU_USAGE_PCT, - RESOURCE_MEM_CAPACITY, - RESOURCE_STATE_MEM_USAGE, - RESOURCE_STATE_DISK_READ, - RESOURCE_STATE_DISK_WRITE, - RESOURCE_STATE_NET_RX, - RESOURCE_STATE_NET_TX, + TableColumn(RESOURCE_ID, TableColumnType.String), + TableColumn(RESOURCE_STATE_TIMESTAMP, TableColumnType.Instant), + TableColumn(RESOURCE_CPU_COUNT, TableColumnType.Int), + TableColumn(RESOURCE_CPU_CAPACITY, TableColumnType.Double), + TableColumn(RESOURCE_STATE_CPU_USAGE, TableColumnType.Double), + TableColumn(RESOURCE_STATE_CPU_USAGE_PCT, TableColumnType.Double), + TableColumn(RESOURCE_MEM_CAPACITY, TableColumnType.Double), + TableColumn(RESOURCE_STATE_MEM_USAGE, TableColumnType.Double), + TableColumn(RESOURCE_STATE_DISK_READ, TableColumnType.Double), + TableColumn(RESOURCE_STATE_DISK_WRITE, TableColumnType.Double), + TableColumn(RESOURCE_STATE_NET_RX, TableColumnType.Double), + TableColumn(RESOURCE_STATE_NET_TX, TableColumnType.Double), ), - listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP) ) else -> throw IllegalArgumentException("Table $table not supported") } } - override fun newReader(path: Path, table: String, projection: List>?): TableReader { + override fun newReader(path: Path, table: String, projection: List?): TableReader { return when (table) { TABLE_RESOURCES -> { val vms = Files.walk(path, 1) diff --git a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt index c944cb98..870129e4 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt @@ -63,7 +63,7 @@ internal class BitbrainsExTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals(1631911500, reader.get(RESOURCE_STATE_TIMESTAMP).epochSecond) }, + { assertEquals(1631911500, reader.getInstant(RESOURCE_STATE_TIMESTAMP)?.epochSecond) }, { assertEquals(21.2, reader.getDouble(RESOURCE_STATE_CPU_USAGE), 0.01) } ) diff --git a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt index 841801e6..557f8c21 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt @@ -61,7 +61,7 @@ class BitbrainsTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("bitbrains", reader.get(RESOURCE_ID)) }, + { assertEquals("bitbrains", reader.getString(RESOURCE_ID)) }, { assertFalse(reader.nextRow()) } ) @@ -75,7 +75,7 @@ class BitbrainsTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals(1376314846, reader.get(RESOURCE_STATE_TIMESTAMP).epochSecond) }, + { assertEquals(1376314846, reader.getInstant(RESOURCE_STATE_TIMESTAMP)?.epochSecond) }, { assertEquals(19.066, reader.getDouble(RESOURCE_STATE_CPU_USAGE), 0.01) } ) diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt index 1854f262..74bd188b 100644 --- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt +++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt @@ -24,10 +24,10 @@ package org.opendc.trace.calcite import org.apache.calcite.linq4j.Enumerator import org.opendc.trace.TableColumn +import org.opendc.trace.TableColumnType import org.opendc.trace.TableReader -import java.sql.Timestamp -import java.time.Duration -import java.time.Instant +import java.nio.ByteBuffer +import java.nio.ByteOrder import java.util.concurrent.atomic.AtomicBoolean /** @@ -35,10 +35,10 @@ import java.util.concurrent.atomic.AtomicBoolean */ internal class TraceReaderEnumerator( private val reader: TableReader, - private val columns: List>, + private val columns: List, private val cancelFlag: AtomicBoolean ) : Enumerator { - private val columnIndices = columns.map { reader.resolve(it) }.toIntArray() + private val columnIndices = columns.map { reader.resolve(it.name) }.toIntArray() private var current: E? = null override fun moveNext(): Boolean { @@ -80,14 +80,35 @@ internal class TraceReaderEnumerator( return res } - private fun convertColumn(reader: TableReader, column: TableColumn<*>, columnIndex: Int): Any? { - val value = reader.get(columnIndex) - + private fun convertColumn(reader: TableReader, column: TableColumn, columnIndex: Int): Any? { return when (column.type) { - Instant::class.java -> Timestamp.from(value as Instant) - Duration::class.java -> (value as Duration).toMillis() - Set::class.java -> (value as Set<*>).toTypedArray() - else -> value + is TableColumnType.Boolean -> reader.getBoolean(columnIndex) + is TableColumnType.Int -> reader.getInt(columnIndex) + is TableColumnType.Long -> reader.getLong(columnIndex) + is TableColumnType.Float -> reader.getFloat(columnIndex) + is TableColumnType.Double -> reader.getDouble(columnIndex) + is TableColumnType.String -> reader.getString(columnIndex) + is TableColumnType.UUID -> { + val uuid = reader.getUUID(columnIndex) + + if (uuid != null) { + val uuidBytes = ByteArray(16) + + ByteBuffer.wrap(uuidBytes) + .order(ByteOrder.BIG_ENDIAN) + .putLong(uuid.mostSignificantBits) + .putLong(uuid.leastSignificantBits) + + uuidBytes + } else { + null + } + } + is TableColumnType.Instant -> reader.getInstant(columnIndex)?.toEpochMilli() + is TableColumnType.Duration -> reader.getDuration(columnIndex)?.toMillis() ?: 0 + is TableColumnType.List -> reader.getList(columnIndex, Any::class.java)?.toTypedArray() + is TableColumnType.Set -> reader.getSet(columnIndex, Any::class.java)?.toTypedArray() + is TableColumnType.Map -> reader.getMap(columnIndex, Any::class.java, Any::class.java) } } } diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt index 8c571b82..dfcc22a3 100644 --- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt +++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt @@ -38,8 +38,11 @@ import org.apache.calcite.rex.RexNode import org.apache.calcite.schema.* import org.apache.calcite.schema.impl.AbstractTableQueryable import org.apache.calcite.sql.type.SqlTypeName +import org.opendc.trace.TableColumnType +import java.nio.ByteBuffer import java.time.Duration import java.time.Instant +import java.util.* import java.util.concurrent.atomic.AtomicBoolean /** @@ -70,7 +73,7 @@ internal class TraceTable(private val table: org.opendc.trace.Table) : val cancelFlag = DataContext.Variable.CANCEL_FLAG.get(root) return object : AbstractEnumerable>() { override fun enumerator(): Enumerator> = - TraceReaderEnumerator(table.newReader(projection), projection ?: table.columns, cancelFlag) + TraceReaderEnumerator(table.newReader(projection?.map { it.name }), projection ?: table.columns, cancelFlag) } } @@ -78,7 +81,7 @@ internal class TraceTable(private val table: org.opendc.trace.Table) : val table = table val columns = table.columns val writer = table.newWriter() - val columnIndices = columns.map { writer.resolve(it) }.toIntArray() + val columnIndices = columns.map { writer.resolve(it.name) }.toIntArray() var rowCount = 0L try { @@ -90,16 +93,24 @@ internal class TraceTable(private val table: org.opendc.trace.Table) : continue } val columnType = columns[index].type - - writer.set( - columnIndices[index], - when (columnType) { - Duration::class.java -> Duration.ofMillis(value as Long) - Instant::class.java -> Instant.ofEpochMilli(value as Long) - Set::class.java -> (value as List<*>).toSet() - else -> value + val columnIndex = columnIndices[index] + when (columnType) { + is TableColumnType.Boolean -> writer.setBoolean(columnIndex, value as Boolean) + is TableColumnType.Int -> writer.setInt(columnIndex, value as Int) + is TableColumnType.Long -> writer.setLong(columnIndex, value as Long) + is TableColumnType.Float -> writer.setFloat(columnIndex, value as Float) + is TableColumnType.Double -> writer.setDouble(columnIndex, value as Double) + is TableColumnType.String -> writer.setString(columnIndex, value as String) + is TableColumnType.UUID -> { + val bb = ByteBuffer.wrap(value as ByteArray) + writer.setUUID(columnIndex, UUID(bb.getLong(), bb.getLong())) } - ) + is TableColumnType.Instant -> writer.setInstant(columnIndex, Instant.ofEpochMilli(value as Long)) + is TableColumnType.Duration -> writer.setDuration(columnIndex, Duration.ofMillis(value as Long)) + is TableColumnType.List -> writer.setList(columnIndex, value as List<*>) + is TableColumnType.Set -> writer.setSet(columnIndex, (value as List<*>).toSet()) + is TableColumnType.Map -> writer.setMap(columnIndex, value as Map<*, *>) + } } writer.endRow() @@ -161,16 +172,26 @@ internal class TraceTable(private val table: org.opendc.trace.Table) : for (column in table.columns) { names.add(column.name) - types.add( - when (column.type) { - Instant::class.java -> typeFactory.createSqlType(SqlTypeName.TIMESTAMP) - Duration::class.java -> typeFactory.createSqlType(SqlTypeName.BIGINT) - Set::class.java -> typeFactory.createMultisetType(typeFactory.createSqlType(SqlTypeName.UNKNOWN), -1) - else -> typeFactory.createType(column.type) - } - ) + types.add(mapType(typeFactory, column.type)) } return typeFactory.createStructType(types, names) } + + private fun mapType(typeFactory: JavaTypeFactory, type: TableColumnType): RelDataType { + return when (type) { + is TableColumnType.Boolean -> typeFactory.createSqlType(SqlTypeName.BOOLEAN) + is TableColumnType.Int -> typeFactory.createSqlType(SqlTypeName.INTEGER) + is TableColumnType.Long -> typeFactory.createSqlType(SqlTypeName.BIGINT) + is TableColumnType.Float -> typeFactory.createSqlType(SqlTypeName.FLOAT) + is TableColumnType.Double -> typeFactory.createSqlType(SqlTypeName.DOUBLE) + is TableColumnType.String -> typeFactory.createSqlType(SqlTypeName.VARCHAR) + is TableColumnType.UUID -> typeFactory.createSqlType(SqlTypeName.BINARY, 16) + is TableColumnType.Instant -> typeFactory.createSqlType(SqlTypeName.TIMESTAMP) + is TableColumnType.Duration -> typeFactory.createSqlType(SqlTypeName.BIGINT) + is TableColumnType.List -> typeFactory.createArrayType(mapType(typeFactory, type.elementType), -1) + is TableColumnType.Set -> typeFactory.createMultisetType(mapType(typeFactory, type.elementType), -1) + is TableColumnType.Map -> typeFactory.createMapType(mapType(typeFactory, type.keyType), mapType(typeFactory, type.valueType)) + } + } } diff --git a/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt b/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt index d2877d7c..d8729034 100644 --- a/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt +++ b/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt @@ -22,16 +22,24 @@ package org.opendc.trace.calcite +import io.mockk.every +import io.mockk.mockk import org.apache.calcite.jdbc.CalciteConnection import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test +import org.opendc.trace.TableColumn +import org.opendc.trace.TableColumnType +import org.opendc.trace.TableReader import org.opendc.trace.Trace +import org.opendc.trace.conv.TABLE_RESOURCES import java.nio.file.Files import java.nio.file.Paths import java.sql.DriverManager import java.sql.ResultSet import java.sql.Statement import java.sql.Timestamp +import java.time.Duration +import java.time.Instant import java.util.* /** @@ -41,11 +49,11 @@ class CalciteTest { /** * The trace to experiment with. */ - private val trace = Trace.open(Paths.get("src/test/resources/trace"), format = "opendc-vm") + private val odcTrace = Trace.open(Paths.get("src/test/resources/trace"), format = "opendc-vm") @Test fun testResources() { - runQuery(trace, "SELECT * FROM trace.resources") { rs -> + runQuery(odcTrace, "SELECT * FROM trace.resources") { rs -> assertAll( { assertTrue(rs.next()) }, { assertEquals("1019", rs.getString("id")) }, @@ -65,7 +73,7 @@ class CalciteTest { @Test fun testResourceStates() { - runQuery(trace, "SELECT * FROM trace.resource_states") { rs -> + runQuery(odcTrace, "SELECT * FROM trace.resource_states") { rs -> assertAll( { assertTrue(rs.next()) }, { assertEquals("1019", rs.getString("id")) }, @@ -80,7 +88,7 @@ class CalciteTest { @Test fun testInterferenceGroups() { - runQuery(trace, "SELECT * FROM trace.interference_groups") { rs -> + runQuery(odcTrace, "SELECT * FROM trace.interference_groups") { rs -> assertAll( { assertTrue(rs.next()) }, { assertArrayEquals(arrayOf("1019", "1023", "1052"), rs.getArray("members").array as Array<*>) }, @@ -92,7 +100,7 @@ class CalciteTest { @Test fun testComplexQuery() { - runQuery(trace, "SELECT max(cpu_usage) as max_cpu_usage, avg(cpu_usage) as avg_cpu_usage FROM trace.resource_states") { rs -> + runQuery(odcTrace, "SELECT max(cpu_usage) as max_cpu_usage, avg(cpu_usage) as avg_cpu_usage FROM trace.resource_states") { rs -> assertAll( { assertTrue(rs.next()) }, { assertEquals(249.59993808, rs.getDouble("max_cpu_usage")) }, @@ -128,6 +136,84 @@ class CalciteTest { } } + @Test + fun testUUID() { + val trace = mockk() + every { trace.tables } returns listOf(TABLE_RESOURCES) + every { trace.getTable(TABLE_RESOURCES)!!.columns } returns listOf( + TableColumn("id", TableColumnType.UUID) + ) + every { trace.getTable(TABLE_RESOURCES)!!.newReader() } answers { + object : TableReader { + override fun nextRow(): Boolean = true + + override fun resolve(name: String): Int { + return when (name) { + "id" -> 0 + else -> -1 + } + } + + override fun isNull(index: Int): Boolean = false + + override fun getBoolean(index: Int): Boolean { + TODO("not implemented") + } + + override fun getInt(index: Int): Int { + TODO("not implemented") + } + + override fun getLong(index: Int): Long { + TODO("not implemented") + } + + override fun getFloat(index: Int): Float { + TODO("not implemented") + } + + override fun getDouble(index: Int): Double { + TODO("not implemented") + } + + override fun getString(index: Int): String? { + TODO("not implemented") + } + + override fun getUUID(index: Int): UUID = UUID(1, 2) + + override fun getInstant(index: Int): Instant? { + TODO("not implemented") + } + + override fun getDuration(index: Int): Duration? { + TODO("not implemented") + } + + override fun getList(index: Int, elementType: Class): List? { + TODO("not implemented") + } + + override fun getSet(index: Int, elementType: Class): Set? { + TODO("not implemented") + } + + override fun getMap(index: Int, keyType: Class, valueType: Class): Map? { + TODO("not implemented") + } + + override fun close() {} + } + } + + runQuery(trace, "SELECT id FROM trace.resources") { rs -> + assertAll( + { assertTrue(rs.next()) }, + { assertArrayEquals(byteArrayOf(0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 2), rs.getBytes("id")) }, + ) + } + } + /** * Helper function to run statement for the specified trace. */ diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt index 42a9469e..007ab90a 100644 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt @@ -27,8 +27,10 @@ import com.fasterxml.jackson.dataformat.csv.CsvParser import com.fasterxml.jackson.dataformat.csv.CsvSchema import org.opendc.trace.* import org.opendc.trace.conv.* +import org.opendc.trace.util.convertTo import java.time.Duration import java.time.Instant +import java.util.* import java.util.regex.Pattern /** @@ -68,46 +70,89 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { return true } - override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + override fun resolve(name: String): Int { + return when (name) { + TASK_ID -> COL_JOB_ID + TASK_WORKFLOW_ID -> COL_WORKFLOW_ID + TASK_SUBMIT_TIME -> COL_SUBMIT_TIME + TASK_RUNTIME -> COL_RUNTIME + TASK_ALLOC_NCPUS -> COL_NPROC + TASK_REQ_NCPUS -> COL_REQ_NPROC + TASK_PARENTS -> COL_DEPS + else -> -1 + } + } override fun isNull(index: Int): Boolean { - check(index in 0..columns.size) { "Invalid column" } + check(index in 0..COL_DEPS) { "Invalid column" } return false } - override fun get(index: Int): Any? { + override fun getBoolean(index: Int): Boolean { + throw IllegalArgumentException("Invalid column") + } + + override fun getInt(index: Int): Int { + return when (index) { + COL_REQ_NPROC -> reqNProcs + COL_NPROC -> nProcs + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getLong(index: Int): Long { + throw IllegalArgumentException("Invalid column") + } + + override fun getFloat(index: Int): Float { + throw IllegalArgumentException("Invalid column") + } + + override fun getDouble(index: Int): Double { + throw IllegalArgumentException("Invalid column") + } + + override fun getString(index: Int): String? { return when (index) { COL_JOB_ID -> jobId COL_WORKFLOW_ID -> workflowId - COL_SUBMIT_TIME -> submitTime - COL_RUNTIME -> runtime - COL_REQ_NPROC -> getInt(index) - COL_NPROC -> getInt(index) - COL_DEPS -> dependencies else -> throw IllegalArgumentException("Invalid column") } } - override fun getBoolean(index: Int): Boolean { + override fun getUUID(index: Int): UUID? { throw IllegalArgumentException("Invalid column") } - override fun getInt(index: Int): Int { + override fun getInstant(index: Int): Instant? { return when (index) { - COL_REQ_NPROC -> reqNProcs - COL_NPROC -> nProcs + COL_SUBMIT_TIME -> submitTime else -> throw IllegalArgumentException("Invalid column") } } - override fun getLong(index: Int): Long { + override fun getDuration(index: Int): Duration? { + return when (index) { + COL_RUNTIME -> runtime + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getList(index: Int, elementType: Class): List? { throw IllegalArgumentException("Invalid column") } - override fun getDouble(index: Int): Double { + override fun getMap(index: Int, keyType: Class, valueType: Class): Map? { throw IllegalArgumentException("Invalid column") } + override fun getSet(index: Int, elementType: Class): Set? { + return when (index) { + COL_DEPS -> TYPE_DEPS.convertTo(dependencies, elementType) + else -> throw IllegalArgumentException("Invalid column") + } + } + override fun close() { parser.close() } @@ -180,15 +225,7 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { private val COL_REQ_NPROC = 5 private val COL_DEPS = 6 - private val columns = mapOf( - TASK_ID to COL_JOB_ID, - TASK_WORKFLOW_ID to COL_WORKFLOW_ID, - TASK_SUBMIT_TIME to COL_SUBMIT_TIME, - TASK_RUNTIME to COL_RUNTIME, - TASK_ALLOC_NCPUS to COL_NPROC, - TASK_REQ_NCPUS to COL_REQ_NPROC, - TASK_PARENTS to COL_DEPS - ) + private val TYPE_DEPS = TableColumnType.Set(TableColumnType.String) companion object { /** diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt index 8d9eab82..ca63b624 100644 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt @@ -56,21 +56,20 @@ public class GwfTraceFormat : TraceFormat { return when (table) { TABLE_TASKS -> TableDetails( listOf( - TASK_WORKFLOW_ID, - TASK_ID, - TASK_SUBMIT_TIME, - TASK_RUNTIME, - TASK_REQ_NCPUS, - TASK_ALLOC_NCPUS, - TASK_PARENTS, - ), - listOf(TASK_WORKFLOW_ID) + TableColumn(TASK_WORKFLOW_ID, TableColumnType.String), + TableColumn(TASK_ID, TableColumnType.String), + TableColumn(TASK_SUBMIT_TIME, TableColumnType.Instant), + TableColumn(TASK_RUNTIME, TableColumnType.Duration), + TableColumn(TASK_REQ_NCPUS, TableColumnType.Int), + TableColumn(TASK_ALLOC_NCPUS, TableColumnType.Int), + TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)), + ) ) else -> throw IllegalArgumentException("Table $table not supported") } } - override fun newReader(path: Path, table: String, projection: List>?): TableReader { + override fun newReader(path: Path, table: String, projection: List?): TableReader { return when (table) { TABLE_TASKS -> GwfTaskTableReader(factory.createParser(path.toFile())) else -> throw IllegalArgumentException("Table $table not supported") diff --git a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt index 411d45d0..dd0e6066 100644 --- a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt @@ -62,11 +62,11 @@ internal class GwfTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("0", reader.get(TASK_WORKFLOW_ID)) }, - { assertEquals("1", reader.get(TASK_ID)) }, - { assertEquals(Instant.ofEpochSecond(16), reader.get(TASK_SUBMIT_TIME)) }, - { assertEquals(Duration.ofSeconds(11), reader.get(TASK_RUNTIME)) }, - { assertEquals(emptySet(), reader.get(TASK_PARENTS)) }, + { assertEquals("0", reader.getString(TASK_WORKFLOW_ID)) }, + { assertEquals("1", reader.getString(TASK_ID)) }, + { assertEquals(Instant.ofEpochSecond(16), reader.getInstant(TASK_SUBMIT_TIME)) }, + { assertEquals(Duration.ofSeconds(11), reader.getDuration(TASK_RUNTIME)) }, + { assertEquals(emptySet(), reader.getSet(TASK_PARENTS, String::class.java)) }, ) } @@ -81,11 +81,11 @@ internal class GwfTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("0", reader.get(TASK_WORKFLOW_ID)) }, - { assertEquals("7", reader.get(TASK_ID)) }, - { assertEquals(Instant.ofEpochSecond(87), reader.get(TASK_SUBMIT_TIME)) }, - { assertEquals(Duration.ofSeconds(11), reader.get(TASK_RUNTIME)) }, - { assertEquals(setOf("4", "5", "6"), reader.get(TASK_PARENTS)) }, + { assertEquals("0", reader.getString(TASK_WORKFLOW_ID)) }, + { assertEquals("7", reader.getString(TASK_ID)) }, + { assertEquals(Instant.ofEpochSecond(87), reader.getInstant(TASK_SUBMIT_TIME)) }, + { assertEquals(Duration.ofSeconds(11), reader.getDuration(TASK_RUNTIME)) }, + { assertEquals(setOf("4", "5", "6"), reader.getSet(TASK_PARENTS, String::class.java)) }, ) } } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt index eb91e305..920daeea 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt @@ -26,9 +26,13 @@ import org.opendc.trace.* import org.opendc.trace.conv.INTERFERENCE_GROUP_MEMBERS import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE import org.opendc.trace.conv.INTERFERENCE_GROUP_TARGET +import org.opendc.trace.util.convertTo import shaded.parquet.com.fasterxml.jackson.core.JsonParseException import shaded.parquet.com.fasterxml.jackson.core.JsonParser import shaded.parquet.com.fasterxml.jackson.core.JsonToken +import java.time.Duration +import java.time.Instant +import java.util.* /** * A [TableReader] implementation for the OpenDC VM interference JSON format. @@ -59,8 +63,14 @@ internal class OdcVmInterferenceJsonTableReader(private val parser: JsonParser) } } - override fun resolve(column: TableColumn<*>): Int { - return when (column) { + private val COL_MEMBERS = 0 + private val COL_TARGET = 1 + private val COL_SCORE = 2 + + private val TYPE_MEMBERS = TableColumnType.Set(TableColumnType.String) + + override fun resolve(name: String): Int { + return when (name) { INTERFERENCE_GROUP_MEMBERS -> COL_MEMBERS INTERFERENCE_GROUP_TARGET -> COL_TARGET INTERFERENCE_GROUP_SCORE -> COL_SCORE @@ -75,43 +85,65 @@ internal class OdcVmInterferenceJsonTableReader(private val parser: JsonParser) } } - override fun get(index: Int): Any { + override fun getBoolean(index: Int): Boolean { + throw IllegalArgumentException("Invalid column $index") + } + + override fun getInt(index: Int): Int { + throw IllegalArgumentException("Invalid column $index") + } + + override fun getLong(index: Int): Long { + throw IllegalArgumentException("Invalid column $index") + } + + override fun getFloat(index: Int): Float { + throw IllegalArgumentException("Invalid column $index") + } + + override fun getDouble(index: Int): Double { return when (index) { - COL_MEMBERS -> members COL_TARGET -> targetLoad COL_SCORE -> score else -> throw IllegalArgumentException("Invalid column $index") } } - override fun getBoolean(index: Int): Boolean { + override fun getString(index: Int): String? { throw IllegalArgumentException("Invalid column $index") } - override fun getInt(index: Int): Int { + override fun getUUID(index: Int): UUID? { throw IllegalArgumentException("Invalid column $index") } - override fun getLong(index: Int): Long { + override fun getInstant(index: Int): Instant? { throw IllegalArgumentException("Invalid column $index") } - override fun getDouble(index: Int): Double { + override fun getDuration(index: Int): Duration? { + throw IllegalArgumentException("Invalid column $index") + } + + override fun getList(index: Int, elementType: Class): List? { + throw IllegalArgumentException("Invalid column $index") + } + + override fun getSet(index: Int, elementType: Class): Set? { return when (index) { - COL_TARGET -> targetLoad - COL_SCORE -> score + COL_MEMBERS -> TYPE_MEMBERS.convertTo(members, elementType) else -> throw IllegalArgumentException("Invalid column $index") } } + override fun getMap(index: Int, keyType: Class, valueType: Class): Map? { + throw IllegalArgumentException("Invalid column $index") + } + override fun close() { parser.close() } - private val COL_MEMBERS = 0 - private val COL_TARGET = 1 - private val COL_SCORE = 2 - private var members = emptySet() private var targetLoad = Double.POSITIVE_INFINITY private var score = 1.0 diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt index 64bc4356..d726e890 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt @@ -27,6 +27,9 @@ import org.opendc.trace.conv.INTERFERENCE_GROUP_MEMBERS import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE import org.opendc.trace.conv.INTERFERENCE_GROUP_TARGET import shaded.parquet.com.fasterxml.jackson.core.JsonGenerator +import java.time.Duration +import java.time.Instant +import java.util.* /** * A [TableWriter] implementation for the OpenDC VM interference JSON format. @@ -65,8 +68,8 @@ internal class OdcVmInterferenceJsonTableWriter(private val generator: JsonGener generator.writeEndObject() } - override fun resolve(column: TableColumn<*>): Int { - return when (column) { + override fun resolve(name: String): Int { + return when (name) { INTERFERENCE_GROUP_MEMBERS -> COL_MEMBERS INTERFERENCE_GROUP_TARGET -> COL_TARGET INTERFERENCE_GROUP_SCORE -> COL_SCORE @@ -74,40 +77,66 @@ internal class OdcVmInterferenceJsonTableWriter(private val generator: JsonGener } } - override fun set(index: Int, value: Any) { + override fun setBoolean(index: Int, value: Boolean) { + throw IllegalArgumentException("Invalid column $index") + } + + override fun setInt(index: Int, value: Int) { + throw IllegalArgumentException("Invalid column $index") + } + + override fun setLong(index: Int, value: Long) { + throw IllegalArgumentException("Invalid column $index") + } + + override fun setFloat(index: Int, value: Float) { + throw IllegalArgumentException("Invalid column $index") + } + + override fun setDouble(index: Int, value: Double) { check(isRowActive) { "No active row" } - @Suppress("UNCHECKED_CAST") when (index) { - COL_MEMBERS -> members = value as Set COL_TARGET -> targetLoad = (value as Number).toDouble() COL_SCORE -> score = (value as Number).toDouble() - else -> throw IllegalArgumentException("Invalid column index $index") + else -> throw IllegalArgumentException("Invalid column $index") } } - override fun setBoolean(index: Int, value: Boolean) { + override fun setString(index: Int, value: String) { throw IllegalArgumentException("Invalid column $index") } - override fun setInt(index: Int, value: Int) { + override fun setUUID(index: Int, value: UUID) { throw IllegalArgumentException("Invalid column $index") } - override fun setLong(index: Int, value: Long) { + override fun setInstant(index: Int, value: Instant) { throw IllegalArgumentException("Invalid column $index") } - override fun setDouble(index: Int, value: Double) { + override fun setDuration(index: Int, value: Duration) { + throw IllegalArgumentException("Invalid column $index") + } + + override fun setList(index: Int, value: List) { + throw IllegalArgumentException("Invalid column $index") + } + + override fun setSet(index: Int, value: Set) { check(isRowActive) { "No active row" } + @Suppress("UNCHECKED_CAST") when (index) { - COL_TARGET -> targetLoad = (value as Number).toDouble() - COL_SCORE -> score = (value as Number).toDouble() - else -> throw IllegalArgumentException("Invalid column $index") + COL_MEMBERS -> members = value as Set + else -> throw IllegalArgumentException("Invalid column index $index") } } + override fun setMap(index: Int, value: Map) { + throw IllegalArgumentException("Invalid column $index") + } + override fun flush() { generator.flush() } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt index 7a01b881..599f46f1 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt @@ -26,6 +26,9 @@ import org.opendc.trace.* import org.opendc.trace.conv.* import org.opendc.trace.opendc.parquet.ResourceState import org.opendc.trace.util.parquet.LocalParquetReader +import java.time.Duration +import java.time.Instant +import java.util.* /** * A [TableReader] implementation for the OpenDC virtual machine trace format. @@ -48,24 +51,26 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea } } - override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + private val COL_ID = 0 + private val COL_TIMESTAMP = 1 + private val COL_DURATION = 2 + private val COL_CPU_COUNT = 3 + private val COL_CPU_USAGE = 4 - override fun isNull(index: Int): Boolean { - check(index in 0..columns.size) { "Invalid column index" } - return get(index) == null + override fun resolve(name: String): Int { + return when (name) { + RESOURCE_ID -> COL_ID + RESOURCE_STATE_TIMESTAMP -> COL_TIMESTAMP + RESOURCE_STATE_DURATION -> COL_DURATION + RESOURCE_CPU_COUNT -> COL_CPU_COUNT + RESOURCE_STATE_CPU_USAGE -> COL_CPU_USAGE + else -> -1 + } } - override fun get(index: Int): Any? { - val record = checkNotNull(record) { "Reader in invalid state" } - - return when (index) { - COL_ID -> record.id - COL_TIMESTAMP -> record.timestamp - COL_DURATION -> record.duration - COL_CPU_COUNT -> record.cpuCount - COL_CPU_USAGE -> record.cpuUsage - else -> throw IllegalArgumentException("Invalid column index $index") - } + override fun isNull(index: Int): Boolean { + check(index in 0..COL_CPU_USAGE) { "Invalid column index" } + return false } override fun getBoolean(index: Int): Boolean { @@ -84,6 +89,10 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea throw IllegalArgumentException("Invalid column or type [index $index]") } + override fun getFloat(index: Int): Float { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + override fun getDouble(index: Int): Double { val record = checkNotNull(record) { "Reader in invalid state" } return when (index) { @@ -92,23 +101,52 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea } } + override fun getString(index: Int): String { + val record = checkNotNull(record) { "Reader in invalid state" } + + return when (index) { + COL_ID -> record.id + else -> throw IllegalArgumentException("Invalid column index $index") + } + } + + override fun getUUID(index: Int): UUID? { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun getInstant(index: Int): Instant { + val record = checkNotNull(record) { "Reader in invalid state" } + + return when (index) { + COL_TIMESTAMP -> record.timestamp + else -> throw IllegalArgumentException("Invalid column index $index") + } + } + + override fun getDuration(index: Int): Duration { + val record = checkNotNull(record) { "Reader in invalid state" } + + return when (index) { + COL_DURATION -> record.duration + else -> throw IllegalArgumentException("Invalid column index $index") + } + } + + override fun getList(index: Int, elementType: Class): List? { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun getSet(index: Int, elementType: Class): Set? { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun getMap(index: Int, keyType: Class, valueType: Class): Map? { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + override fun close() { reader.close() } override fun toString(): String = "OdcVmResourceStateTableReader" - - private val COL_ID = 0 - private val COL_TIMESTAMP = 1 - private val COL_DURATION = 2 - private val COL_CPU_COUNT = 3 - private val COL_CPU_USAGE = 4 - - private val columns = mapOf( - RESOURCE_ID to COL_ID, - RESOURCE_STATE_TIMESTAMP to COL_TIMESTAMP, - RESOURCE_STATE_DURATION to COL_DURATION, - RESOURCE_CPU_COUNT to COL_CPU_COUNT, - RESOURCE_STATE_CPU_USAGE to COL_CPU_USAGE, - ) } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt index 97af5b59..f5e8b863 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt @@ -28,6 +28,7 @@ import org.opendc.trace.conv.* import org.opendc.trace.opendc.parquet.ResourceState import java.time.Duration import java.time.Instant +import java.util.* /** * A [TableWriter] implementation for the OpenDC virtual machine trace format. @@ -64,17 +65,14 @@ internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter): Int = columns[column] ?: -1 - - override fun set(index: Int, value: Any) { - check(_isActive) { "No active row" } - - when (index) { - COL_ID -> _id = value as String - COL_TIMESTAMP -> _timestamp = value as Instant - COL_DURATION -> _duration = value as Duration - COL_CPU_COUNT -> _cpuCount = value as Int - COL_CPU_USAGE -> _cpuUsage = value as Double + override fun resolve(name: String): Int { + return when (name) { + RESOURCE_ID -> COL_ID + RESOURCE_STATE_TIMESTAMP -> COL_TIMESTAMP + RESOURCE_STATE_DURATION -> COL_DURATION + RESOURCE_CPU_COUNT -> COL_CPU_COUNT + RESOURCE_STATE_CPU_USAGE -> COL_CPU_USAGE + else -> -1 } } @@ -94,6 +92,10 @@ internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter _id = value + } + } + + override fun setUUID(index: Int, value: UUID) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun setInstant(index: Int, value: Instant) { + check(_isActive) { "No active row" } + + when (index) { + COL_TIMESTAMP -> _timestamp = value + else -> throw IllegalArgumentException("Invalid column or type [index $index]") + } + } + + override fun setDuration(index: Int, value: Duration) { + check(_isActive) { "No active row" } + + when (index) { + COL_DURATION -> _duration = value + } + } + + override fun setList(index: Int, value: List) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun setSet(index: Int, value: Set) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun setMap(index: Int, value: Map) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + override fun flush() { // Not available } @@ -121,12 +164,4 @@ internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter): Int = columns[column] ?: -1 + private val COL_ID = 0 + private val COL_START_TIME = 1 + private val COL_STOP_TIME = 2 + private val COL_CPU_COUNT = 3 + private val COL_CPU_CAPACITY = 4 + private val COL_MEM_CAPACITY = 5 - override fun isNull(index: Int): Boolean { - check(index in 0..columns.size) { "Invalid column index" } - return get(index) == null + override fun resolve(name: String): Int { + return when (name) { + RESOURCE_ID -> COL_ID + RESOURCE_START_TIME -> COL_START_TIME + RESOURCE_STOP_TIME -> COL_STOP_TIME + RESOURCE_CPU_COUNT -> COL_CPU_COUNT + RESOURCE_CPU_CAPACITY -> COL_CPU_CAPACITY + RESOURCE_MEM_CAPACITY -> COL_MEM_CAPACITY + else -> -1 + } } - override fun get(index: Int): Any? { - val record = checkNotNull(record) { "Reader in invalid state" } - - return when (index) { - COL_ID -> record.id - COL_START_TIME -> record.startTime - COL_STOP_TIME -> record.stopTime - COL_CPU_COUNT -> getInt(index) - COL_CPU_CAPACITY -> getDouble(index) - COL_MEM_CAPACITY -> getDouble(index) - else -> throw IllegalArgumentException("Invalid column") - } + override fun isNull(index: Int): Boolean { + check(index in 0..COL_MEM_CAPACITY) { "Invalid column index" } + return false } override fun getBoolean(index: Int): Boolean { @@ -86,6 +92,10 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader record.id + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getUUID(index: Int): UUID? { + throw IllegalArgumentException("Invalid column") + } + + override fun getInstant(index: Int): Instant { + val record = checkNotNull(record) { "Reader in invalid state" } + + return when (index) { + COL_START_TIME -> record.startTime + COL_STOP_TIME -> record.stopTime + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getDuration(index: Int): Duration? { + throw IllegalArgumentException("Invalid column") + } + + override fun getList(index: Int, elementType: Class): List? { + throw IllegalArgumentException("Invalid column") + } + + override fun getSet(index: Int, elementType: Class): Set? { + throw IllegalArgumentException("Invalid column") + } + + override fun getMap(index: Int, keyType: Class, valueType: Class): Map? { + throw IllegalArgumentException("Invalid column") + } + override fun close() { reader.close() } override fun toString(): String = "OdcVmResourceTableReader" - - private val COL_ID = 0 - private val COL_START_TIME = 1 - private val COL_STOP_TIME = 2 - private val COL_CPU_COUNT = 3 - private val COL_CPU_CAPACITY = 4 - private val COL_MEM_CAPACITY = 5 - - private val columns = mapOf( - RESOURCE_ID to COL_ID, - RESOURCE_START_TIME to COL_START_TIME, - RESOURCE_STOP_TIME to COL_STOP_TIME, - RESOURCE_CPU_COUNT to COL_CPU_COUNT, - RESOURCE_CPU_CAPACITY to COL_CPU_CAPACITY, - RESOURCE_MEM_CAPACITY to COL_MEM_CAPACITY, - ) } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt index cae65faa..8117c3cd 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt @@ -26,7 +26,9 @@ import org.apache.parquet.hadoop.ParquetWriter import org.opendc.trace.* import org.opendc.trace.conv.* import org.opendc.trace.opendc.parquet.Resource +import java.time.Duration import java.time.Instant +import java.util.* /** * A [TableWriter] implementation for the OpenDC virtual machine trace format. @@ -59,18 +61,15 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter): Int = columns[column] ?: -1 - - override fun set(index: Int, value: Any) { - check(_isActive) { "No active row" } - when (index) { - COL_ID -> _id = value as String - COL_START_TIME -> _startTime = value as Instant - COL_STOP_TIME -> _stopTime = value as Instant - COL_CPU_COUNT -> _cpuCount = value as Int - COL_CPU_CAPACITY -> _cpuCapacity = value as Double - COL_MEM_CAPACITY -> _memCapacity = value as Double - else -> throw IllegalArgumentException("Invalid column index $index") + override fun resolve(name: String): Int { + return when (name) { + RESOURCE_ID -> COL_ID + RESOURCE_START_TIME -> COL_START_TIME + RESOURCE_STOP_TIME -> COL_STOP_TIME + RESOURCE_CPU_COUNT -> COL_CPU_COUNT + RESOURCE_CPU_CAPACITY -> COL_CPU_CAPACITY + RESOURCE_MEM_CAPACITY -> COL_MEM_CAPACITY + else -> -1 } } @@ -90,6 +89,10 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter _id = value + else -> throw IllegalArgumentException("Invalid column index $index") + } + } + + override fun setUUID(index: Int, value: UUID) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun setInstant(index: Int, value: Instant) { + check(_isActive) { "No active row" } + when (index) { + COL_START_TIME -> _startTime = value + COL_STOP_TIME -> _stopTime = value + else -> throw IllegalArgumentException("Invalid column index $index") + } + } + + override fun setDuration(index: Int, value: Duration) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun setList(index: Int, value: List) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun setSet(index: Int, value: Set) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun setMap(index: Int, value: Map) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + override fun flush() { // Not available } @@ -113,13 +153,4 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter TableDetails( listOf( - RESOURCE_ID, - RESOURCE_START_TIME, - RESOURCE_STOP_TIME, - RESOURCE_CPU_COUNT, - RESOURCE_CPU_CAPACITY, - RESOURCE_MEM_CAPACITY, + TableColumn(RESOURCE_ID, TableColumnType.String), + TableColumn(RESOURCE_START_TIME, TableColumnType.Instant), + TableColumn(RESOURCE_STOP_TIME, TableColumnType.Instant), + TableColumn(RESOURCE_CPU_COUNT, TableColumnType.Int), + TableColumn(RESOURCE_CPU_CAPACITY, TableColumnType.Double), + TableColumn(RESOURCE_MEM_CAPACITY, TableColumnType.Double), ) ) TABLE_RESOURCE_STATES -> TableDetails( listOf( - RESOURCE_ID, - RESOURCE_STATE_TIMESTAMP, - RESOURCE_STATE_DURATION, - RESOURCE_CPU_COUNT, - RESOURCE_STATE_CPU_USAGE, - ), - listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP) + TableColumn(RESOURCE_ID, TableColumnType.String), + TableColumn(RESOURCE_STATE_TIMESTAMP, TableColumnType.Instant), + TableColumn(RESOURCE_STATE_DURATION, TableColumnType.Duration), + TableColumn(RESOURCE_CPU_COUNT, TableColumnType.Int), + TableColumn(RESOURCE_STATE_CPU_USAGE, TableColumnType.Double), + ) ) TABLE_INTERFERENCE_GROUPS -> TableDetails( listOf( - INTERFERENCE_GROUP_MEMBERS, - INTERFERENCE_GROUP_TARGET, - INTERFERENCE_GROUP_SCORE, + TableColumn(INTERFERENCE_GROUP_MEMBERS, TableColumnType.Set(TableColumnType.String)), + TableColumn(INTERFERENCE_GROUP_TARGET, TableColumnType.Double), + TableColumn(INTERFERENCE_GROUP_SCORE, TableColumnType.Double), ) ) else -> throw IllegalArgumentException("Table $table not supported") } } - override fun newReader(path: Path, table: String, projection: List>?): TableReader { + override fun newReader(path: Path, table: String, projection: List?): TableReader { return when (table) { TABLE_RESOURCES -> { val reader = LocalParquetReader(path.resolve("meta.parquet"), ResourceReadSupport(projection)) diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt index 0d70446d..8a8ed790 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt @@ -33,11 +33,11 @@ import org.opendc.trace.conv.* /** * A [ReadSupport] instance for [Resource] objects. */ -internal class ResourceReadSupport(private val projection: List>?) : ReadSupport() { +internal class ResourceReadSupport(private val projection: List?) : ReadSupport() { /** * Mapping from field names to [TableColumn]s. */ - private val fieldMap = mapOf>( + private val fieldMap = mapOf( "id" to RESOURCE_ID, "submissionTime" to RESOURCE_START_TIME, "start_time" to RESOURCE_START_TIME, diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt index 97aa00b2..78adc649 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt @@ -33,11 +33,11 @@ import org.opendc.trace.conv.* /** * A [ReadSupport] instance for [ResourceState] objects. */ -internal class ResourceStateReadSupport(private val projection: List>?) : ReadSupport() { +internal class ResourceStateReadSupport(private val projection: List?) : ReadSupport() { /** * Mapping from field names to [TableColumn]s. */ - private val fieldMap = mapOf>( + private val fieldMap = mapOf( "id" to RESOURCE_ID, "time" to RESOURCE_STATE_TIMESTAMP, "timestamp" to RESOURCE_STATE_TIMESTAMP, diff --git a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt index 1f4f6195..ae6e62d8 100644 --- a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt @@ -67,14 +67,14 @@ internal class OdcVmTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("1019", reader.get(RESOURCE_ID)) }, - { assertEquals(Instant.ofEpochMilli(1376314846000), reader.get(RESOURCE_START_TIME)) }, + { assertEquals("1019", reader.getString(RESOURCE_ID)) }, + { assertEquals(Instant.ofEpochMilli(1376314846000), reader.getInstant(RESOURCE_START_TIME)) }, { assertTrue(reader.nextRow()) }, - { assertEquals("1023", reader.get(RESOURCE_ID)) }, + { assertEquals("1023", reader.getString(RESOURCE_ID)) }, { assertTrue(reader.nextRow()) }, - { assertEquals("1052", reader.get(RESOURCE_ID)) }, + { assertEquals("1052", reader.getString(RESOURCE_ID)) }, { assertTrue(reader.nextRow()) }, - { assertEquals("1073", reader.get(RESOURCE_ID)) }, + { assertEquals("1073", reader.getString(RESOURCE_ID)) }, { assertFalse(reader.nextRow()) } ) @@ -87,9 +87,9 @@ internal class OdcVmTraceFormatTest { val writer = format.newWriter(path, TABLE_RESOURCES) writer.startRow() - writer.set(RESOURCE_ID, "1019") - writer.set(RESOURCE_START_TIME, Instant.EPOCH) - writer.set(RESOURCE_STOP_TIME, Instant.EPOCH) + writer.setString(RESOURCE_ID, "1019") + writer.setInstant(RESOURCE_START_TIME, Instant.EPOCH) + writer.setInstant(RESOURCE_STOP_TIME, Instant.EPOCH) writer.setInt(RESOURCE_CPU_COUNT, 1) writer.setDouble(RESOURCE_CPU_CAPACITY, 1024.0) writer.setDouble(RESOURCE_MEM_CAPACITY, 1024.0) @@ -100,9 +100,9 @@ internal class OdcVmTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("1019", reader.get(RESOURCE_ID)) }, - { assertEquals(Instant.EPOCH, reader.get(RESOURCE_START_TIME)) }, - { assertEquals(Instant.EPOCH, reader.get(RESOURCE_STOP_TIME)) }, + { assertEquals("1019", reader.getString(RESOURCE_ID)) }, + { assertEquals(Instant.EPOCH, reader.getInstant(RESOURCE_START_TIME)) }, + { assertEquals(Instant.EPOCH, reader.getInstant(RESOURCE_STOP_TIME)) }, { assertEquals(1, reader.getInt(RESOURCE_CPU_COUNT)) }, { assertEquals(1024.0, reader.getDouble(RESOURCE_CPU_CAPACITY)) }, { assertEquals(1024.0, reader.getDouble(RESOURCE_MEM_CAPACITY)) }, @@ -124,8 +124,8 @@ internal class OdcVmTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("1019", reader.get(RESOURCE_ID)) }, - { assertEquals(1376314846, reader.get(RESOURCE_STATE_TIMESTAMP).epochSecond) }, + { assertEquals("1019", reader.getString(RESOURCE_ID)) }, + { assertEquals(1376314846, reader.getInstant(RESOURCE_STATE_TIMESTAMP)?.epochSecond) }, { assertEquals(0.0, reader.getDouble(RESOURCE_STATE_CPU_USAGE), 0.01) } ) @@ -138,8 +138,8 @@ internal class OdcVmTraceFormatTest { val writer = format.newWriter(path, TABLE_RESOURCE_STATES) writer.startRow() - writer.set(RESOURCE_ID, "1019") - writer.set(RESOURCE_STATE_TIMESTAMP, Instant.EPOCH) + writer.setString(RESOURCE_ID, "1019") + writer.setInstant(RESOURCE_STATE_TIMESTAMP, Instant.EPOCH) writer.setDouble(RESOURCE_STATE_CPU_USAGE, 23.0) writer.setInt(RESOURCE_CPU_COUNT, 1) writer.endRow() @@ -149,8 +149,8 @@ internal class OdcVmTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("1019", reader.get(RESOURCE_ID)) }, - { assertEquals(Instant.EPOCH, reader.get(RESOURCE_STATE_TIMESTAMP)) }, + { assertEquals("1019", reader.getString(RESOURCE_ID)) }, + { assertEquals(Instant.EPOCH, reader.getInstant(RESOURCE_STATE_TIMESTAMP)) }, { assertEquals(1, reader.getInt(RESOURCE_CPU_COUNT)) }, { assertEquals(23.0, reader.getDouble(RESOURCE_STATE_CPU_USAGE)) }, { assertFalse(reader.nextRow()) }, @@ -170,13 +170,13 @@ internal class OdcVmTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals(setOf("1019", "1023", "1052"), reader.get(INTERFERENCE_GROUP_MEMBERS)) }, - { assertEquals(0.0, reader.get(INTERFERENCE_GROUP_TARGET)) }, - { assertEquals(0.8830158730158756, reader.get(INTERFERENCE_GROUP_SCORE)) }, + { assertEquals(setOf("1019", "1023", "1052"), reader.getSet(INTERFERENCE_GROUP_MEMBERS, String::class.java)) }, + { assertEquals(0.0, reader.getDouble(INTERFERENCE_GROUP_TARGET)) }, + { assertEquals(0.8830158730158756, reader.getDouble(INTERFERENCE_GROUP_SCORE)) }, { assertTrue(reader.nextRow()) }, - { assertEquals(setOf("1023", "1052", "1073"), reader.get(INTERFERENCE_GROUP_MEMBERS)) }, - { assertEquals(0.0, reader.get(INTERFERENCE_GROUP_TARGET)) }, - { assertEquals(0.7133055555552751, reader.get(INTERFERENCE_GROUP_SCORE)) }, + { assertEquals(setOf("1023", "1052", "1073"), reader.getSet(INTERFERENCE_GROUP_MEMBERS, String::class.java)) }, + { assertEquals(0.0, reader.getDouble(INTERFERENCE_GROUP_TARGET)) }, + { assertEquals(0.7133055555552751, reader.getDouble(INTERFERENCE_GROUP_SCORE)) }, { assertFalse(reader.nextRow()) } ) diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt index 40b604c3..4d0a9008 100644 --- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt +++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt @@ -27,6 +27,7 @@ import org.opendc.trace.conv.* import java.io.BufferedReader import java.time.Duration import java.time.Instant +import java.util.* /** * A [TableReader] implementation for the SWF format. @@ -70,43 +71,92 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea return true } - override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + override fun resolve(name: String): Int { + return when (name) { + TASK_ID -> COL_JOB_ID + TASK_SUBMIT_TIME -> COL_SUBMIT_TIME + TASK_WAIT_TIME -> COL_WAIT_TIME + TASK_RUNTIME -> COL_RUN_TIME + TASK_ALLOC_NCPUS -> COL_ALLOC_NCPUS + TASK_REQ_NCPUS -> COL_REQ_NCPUS + TASK_STATUS -> COL_STATUS + TASK_USER_ID -> COL_USER_ID + TASK_GROUP_ID -> COL_GROUP_ID + TASK_PARENTS -> COL_PARENT_JOB + else -> -1 + } + } override fun isNull(index: Int): Boolean { - require(index in columns.values) { "Invalid column index" } + require(index in COL_JOB_ID..COL_PARENT_THINK_TIME) { "Invalid column index" } return false } - override fun get(index: Int): Any? { + override fun getBoolean(index: Int): Boolean { + throw IllegalArgumentException("Invalid column") + } + + override fun getInt(index: Int): Int { + return when (index) { + COL_REQ_NCPUS, COL_ALLOC_NCPUS, COL_STATUS, COL_GROUP_ID, COL_USER_ID -> fields[index].toInt(10) + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getLong(index: Int): Long { + throw IllegalArgumentException("Invalid column") + } + + override fun getFloat(index: Int): Float { + throw IllegalArgumentException("Invalid column") + } + + override fun getDouble(index: Int): Double { + throw IllegalArgumentException("Invalid column") + } + + override fun getString(index: Int): String { return when (index) { COL_JOB_ID -> fields[index] - COL_SUBMIT_TIME -> Instant.ofEpochSecond(fields[index].toLong(10)) - COL_WAIT_TIME, COL_RUN_TIME -> Duration.ofSeconds(fields[index].toLong(10)) - COL_REQ_NCPUS, COL_ALLOC_NCPUS, COL_STATUS, COL_GROUP_ID, COL_USER_ID -> getInt(index) - COL_PARENT_JOB -> { - val parent = fields[index].toLong(10) - if (parent < 0) emptySet() else setOf(parent) - } else -> throw IllegalArgumentException("Invalid column") } } - override fun getBoolean(index: Int): Boolean { + override fun getUUID(index: Int): UUID? { throw IllegalArgumentException("Invalid column") } - override fun getInt(index: Int): Int { + override fun getInstant(index: Int): Instant? { return when (index) { - COL_REQ_NCPUS, COL_ALLOC_NCPUS, COL_STATUS, COL_GROUP_ID, COL_USER_ID -> fields[index].toInt(10) + COL_SUBMIT_TIME -> Instant.ofEpochSecond(fields[index].toLong(10)) else -> throw IllegalArgumentException("Invalid column") } } - override fun getLong(index: Int): Long { + override fun getDuration(index: Int): Duration? { + return when (index) { + COL_WAIT_TIME, COL_RUN_TIME -> Duration.ofSeconds(fields[index].toLong(10)) + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getList(index: Int, elementType: Class): List? { throw IllegalArgumentException("Invalid column") } - override fun getDouble(index: Int): Double { + override fun getSet(index: Int, elementType: Class): Set? { + @Suppress("UNCHECKED_CAST") + return when (index) { + COL_PARENT_JOB -> { + require(elementType.isAssignableFrom(String::class.java)) + val parent = fields[index].toLong(10) + if (parent < 0) emptySet() else setOf(parent) + } + else -> throw IllegalArgumentException("Invalid column") + } as Set? + } + + override fun getMap(index: Int, keyType: Class, valueType: Class): Map? { throw IllegalArgumentException("Invalid column") } @@ -135,17 +185,4 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea private val COL_PART_NUM = 15 private val COL_PARENT_JOB = 16 private val COL_PARENT_THINK_TIME = 17 - - private val columns = mapOf( - TASK_ID to COL_JOB_ID, - TASK_SUBMIT_TIME to COL_SUBMIT_TIME, - TASK_WAIT_TIME to COL_WAIT_TIME, - TASK_RUNTIME to COL_RUN_TIME, - TASK_ALLOC_NCPUS to COL_ALLOC_NCPUS, - TASK_REQ_NCPUS to COL_REQ_NCPUS, - TASK_STATUS to COL_STATUS, - TASK_USER_ID to COL_USER_ID, - TASK_GROUP_ID to COL_GROUP_ID, - TASK_PARENTS to COL_PARENT_JOB - ) } diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt index 916a5eca..40f98a01 100644 --- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt +++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt @@ -47,24 +47,23 @@ public class SwfTraceFormat : TraceFormat { return when (table) { TABLE_TASKS -> TableDetails( listOf( - TASK_ID, - TASK_SUBMIT_TIME, - TASK_WAIT_TIME, - TASK_RUNTIME, - TASK_REQ_NCPUS, - TASK_ALLOC_NCPUS, - TASK_PARENTS, - TASK_STATUS, - TASK_GROUP_ID, - TASK_USER_ID - ), - emptyList() + TableColumn(TASK_ID, TableColumnType.String), + TableColumn(TASK_SUBMIT_TIME, TableColumnType.Instant), + TableColumn(TASK_WAIT_TIME, TableColumnType.Duration), + TableColumn(TASK_RUNTIME, TableColumnType.Duration), + TableColumn(TASK_REQ_NCPUS, TableColumnType.Int), + TableColumn(TASK_ALLOC_NCPUS, TableColumnType.Int), + TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)), + TableColumn(TASK_STATUS, TableColumnType.String), + TableColumn(TASK_GROUP_ID, TableColumnType.Int), + TableColumn(TASK_USER_ID, TableColumnType.Int) + ) ) else -> throw IllegalArgumentException("Table $table not supported") } } - override fun newReader(path: Path, table: String, projection: List>?): TableReader { + override fun newReader(path: Path, table: String, projection: List?): TableReader { return when (table) { TABLE_TASKS -> SwfTaskTableReader(path.bufferedReader()) else -> throw IllegalArgumentException("Table $table not supported") diff --git a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt index c3d644e8..afecdbb9 100644 --- a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt @@ -62,10 +62,10 @@ internal class SwfTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("1", reader.get(TASK_ID)) }, + { assertEquals("1", reader.getString(TASK_ID)) }, { assertEquals(306, reader.getInt(TASK_ALLOC_NCPUS)) }, { assertTrue(reader.nextRow()) }, - { assertEquals("2", reader.get(TASK_ID)) }, + { assertEquals("2", reader.getString(TASK_ID)) }, { assertEquals(17, reader.getInt(TASK_ALLOC_NCPUS)) }, ) diff --git a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/ConvertCommand.kt b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/ConvertCommand.kt index 970de0f4..b6d661e0 100644 --- a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/ConvertCommand.kt +++ b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/ConvertCommand.kt @@ -224,9 +224,9 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b var stopTime = Long.MIN_VALUE do { - id = reader.get(idCol) as String + id = reader.getString(idCol)!! - val timestamp = (reader.get(timestampCol) as Instant).toEpochMilli() + val timestamp = reader.getInstant(timestampCol)!!.toEpochMilli() startTime = min(startTime, timestamp) stopTime = max(stopTime, timestamp) @@ -238,7 +238,7 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b } hasNextRow = reader.nextRow() - } while (hasNextRow && id == reader.get(RESOURCE_ID)) + } while (hasNextRow && id == reader.getString(RESOURCE_ID)) // Sample only a fraction of the VMs if (random != null && random.nextDouble() > samplingFraction) { @@ -255,9 +255,9 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b } writer.startRow() - writer.set(RESOURCE_ID, id) - writer.set(RESOURCE_START_TIME, startInstant) - writer.set(RESOURCE_STOP_TIME, stopInstant) + writer.setString(RESOURCE_ID, id) + writer.setInstant(RESOURCE_START_TIME, startInstant) + writer.setInstant(RESOURCE_STOP_TIME, stopInstant) writer.setInt(RESOURCE_CPU_COUNT, cpuCount) writer.setDouble(RESOURCE_CPU_CAPACITY, cpuCapacity) writer.setDouble(RESOURCE_MEM_CAPACITY, max(memCapacity, memUsage)) @@ -280,7 +280,7 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b var count = 0 while (hasNextRow) { - val id = reader.get(idCol) as String + val id = reader.getString(idCol)!! val resource = selected[id] if (resource == null) { hasNextRow = reader.nextRow() @@ -290,13 +290,13 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b val cpuCount = reader.getInt(cpuCountCol) val cpuUsage = reader.getDouble(cpuUsageCol) - val startTimestamp = (reader.get(timestampCol) as Instant).toEpochMilli() + val startTimestamp = reader.getInstant(timestampCol)!!.toEpochMilli() var timestamp: Long = startTimestamp var duration: Long = sampleInterval // Attempt to cascade further samples into one if they share the same CPU usage while (reader.nextRow().also { hasNextRow = it }) { - val shouldCascade = id == reader.get(idCol) && + val shouldCascade = id == reader.getString(idCol) && abs(cpuUsage - reader.getDouble(cpuUsageCol)) < SAMPLE_CASCADE_DIFF && cpuCount == reader.getInt(cpuCountCol) @@ -308,7 +308,7 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b break } - val nextTimestamp = (reader.get(timestampCol) as Instant).toEpochMilli() + val nextTimestamp = reader.getInstant(timestampCol)!!.toEpochMilli() // Check whether the interval between both samples is not higher than `SAMPLE_INTERVAL` if ((nextTimestamp - timestamp) > sampleInterval) { @@ -320,9 +320,9 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b } writer.startRow() - writer.set(RESOURCE_ID, id) - writer.set(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(timestamp)) - writer.set(RESOURCE_STATE_DURATION, Duration.ofMillis(duration)) + writer.setString(RESOURCE_ID, id) + writer.setInstant(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(timestamp)) + writer.setDuration(RESOURCE_STATE_DURATION, Duration.ofMillis(duration)) writer.setInt(RESOURCE_CPU_COUNT, cpuCount) writer.setDouble(RESOURCE_STATE_CPU_USAGE, cpuUsage) writer.endRow() @@ -377,9 +377,9 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b continue } - val id = reader.get(idCol) as String - val startTime = (reader.get(startTimeCol) as Instant).toEpochMilli() - val stopTime = (reader.get(stopTimeCol) as Instant).toEpochMilli() + val id = reader.getString(idCol)!! + val startTime = reader.getInstant(startTimeCol)!!.toEpochMilli() + val stopTime = reader.getInstant(stopTimeCol)!!.toEpochMilli() val cpuCount = reader.getInt(cpuCountCol) val memCapacity = reader.getDouble(memCapacityCol) @@ -394,9 +394,9 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b } writer.startRow() - writer.set(RESOURCE_ID, id) - writer.set(RESOURCE_START_TIME, startInstant) - writer.set(RESOURCE_STOP_TIME, stopInstant) + writer.setString(RESOURCE_ID, id) + writer.setInstant(RESOURCE_START_TIME, startInstant) + writer.setInstant(RESOURCE_STOP_TIME, stopInstant) writer.setInt(RESOURCE_CPU_COUNT, cpuCount) writer.setDouble(RESOURCE_CPU_CAPACITY, cpuCapacity) writer.setDouble(RESOURCE_MEM_CAPACITY, memCapacity) @@ -418,12 +418,12 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b var count = 0 while (reader.nextRow()) { - val id = reader.get(idCol) as String + val id = reader.getString(idCol)!! val resource = selected[id] ?: continue val cpuUsage = reader.getDouble(cpuUsageCol) * resource.cpuCapacity // MHz val state = states.computeIfAbsent(id) { State(resource, cpuUsage, sampleInterval) } - val timestamp = (reader.get(timestampCol) as Instant).toEpochMilli() + val timestamp = reader.getInstant(timestampCol)!!.toEpochMilli() val delta = (timestamp - state.time) // Check whether the next sample can be cascaded with the current sample: @@ -463,9 +463,9 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b lastWrite = time writer.startRow() - writer.set(RESOURCE_ID, resource.id) - writer.set(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(time)) - writer.set(RESOURCE_STATE_DURATION, Duration.ofMillis(duration)) + writer.setString(RESOURCE_ID, resource.id) + writer.setInstant(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(time)) + writer.setDuration(RESOURCE_STATE_DURATION, Duration.ofMillis(duration)) writer.setDouble(RESOURCE_STATE_CPU_USAGE, cpuUsage) writer.setInt(RESOURCE_CPU_COUNT, resource.cpuCount) writer.endRow() diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt index d8eafa9c..0be9dec6 100644 --- a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt +++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt @@ -27,7 +27,10 @@ import com.fasterxml.jackson.core.JsonParser import com.fasterxml.jackson.core.JsonToken import org.opendc.trace.* import org.opendc.trace.conv.* +import org.opendc.trace.util.convertTo import java.time.Duration +import java.time.Instant +import java.util.* import kotlin.math.roundToInt /** @@ -95,41 +98,82 @@ internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableRe return hasJob } - override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + override fun resolve(name: String): Int { + return when (name) { + TASK_ID -> COL_ID + TASK_WORKFLOW_ID -> COL_WORKFLOW_ID + TASK_RUNTIME -> COL_RUNTIME + TASK_REQ_NCPUS -> COL_NPROC + TASK_PARENTS -> COL_PARENTS + TASK_CHILDREN -> COL_CHILDREN + else -> -1 + } + } override fun isNull(index: Int): Boolean { - check(index in 0..columns.size) { "Invalid column value" } + check(index in 0..COL_CHILDREN) { "Invalid column value" } return false } - override fun get(index: Int): Any? { + override fun getBoolean(index: Int): Boolean { + throw IllegalArgumentException("Invalid column") + } + + override fun getInt(index: Int): Int { + return when (index) { + COL_NPROC -> cores + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getLong(index: Int): Long { + throw IllegalArgumentException("Invalid column") + } + + override fun getFloat(index: Int): Float { + throw IllegalArgumentException("Invalid column") + } + + override fun getDouble(index: Int): Double { + throw IllegalArgumentException("Invalid column") + } + + override fun getString(index: Int): String? { return when (index) { COL_ID -> id COL_WORKFLOW_ID -> workflowId - COL_RUNTIME -> runtime - COL_PARENTS -> parents - COL_CHILDREN -> children - COL_NPROC -> getInt(index) else -> throw IllegalArgumentException("Invalid column") } } - override fun getBoolean(index: Int): Boolean { + override fun getUUID(index: Int): UUID? { throw IllegalArgumentException("Invalid column") } - override fun getInt(index: Int): Int { + override fun getInstant(index: Int): Instant? { + throw IllegalArgumentException("Invalid column") + } + + override fun getDuration(index: Int): Duration? { return when (index) { - COL_NPROC -> cores + COL_RUNTIME -> runtime else -> throw IllegalArgumentException("Invalid column") } } - override fun getLong(index: Int): Long { + override fun getList(index: Int, elementType: Class): List? { throw IllegalArgumentException("Invalid column") } - override fun getDouble(index: Int): Double { + override fun getSet(index: Int, elementType: Class): Set? { + return when (index) { + COL_PARENTS -> TYPE_PARENTS.convertTo(parents, elementType) + COL_CHILDREN -> TYPE_CHILDREN.convertTo(children, elementType) + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getMap(index: Int, keyType: Class, valueType: Class): Map? { throw IllegalArgumentException("Invalid column") } @@ -232,12 +276,6 @@ internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableRe private val COL_PARENTS = 5 private val COL_CHILDREN = 6 - private val columns = mapOf( - TASK_ID to COL_ID, - TASK_WORKFLOW_ID to COL_WORKFLOW_ID, - TASK_RUNTIME to COL_RUNTIME, - TASK_REQ_NCPUS to COL_NPROC, - TASK_PARENTS to COL_PARENTS, - TASK_CHILDREN to COL_CHILDREN, - ) + private val TYPE_PARENTS = TableColumnType.Set(TableColumnType.String) + private val TYPE_CHILDREN = TableColumnType.Set(TableColumnType.String) } diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt index 8db4c169..154fa061 100644 --- a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt +++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt @@ -50,20 +50,19 @@ public class WfFormatTraceFormat : TraceFormat { return when (table) { TABLE_TASKS -> TableDetails( listOf( - TASK_ID, - TASK_WORKFLOW_ID, - TASK_RUNTIME, - TASK_REQ_NCPUS, - TASK_PARENTS, - TASK_CHILDREN - ), - emptyList() + TableColumn(TASK_ID, TableColumnType.String), + TableColumn(TASK_WORKFLOW_ID, TableColumnType.String), + TableColumn(TASK_RUNTIME, TableColumnType.Duration), + TableColumn(TASK_REQ_NCPUS, TableColumnType.Int), + TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)), + TableColumn(TASK_CHILDREN, TableColumnType.Set(TableColumnType.String)) + ) ) else -> throw IllegalArgumentException("Table $table not supported") } } - override fun newReader(path: Path, table: String, projection: List>?): TableReader { + override fun newReader(path: Path, table: String, projection: List?): TableReader { return when (table) { TABLE_TASKS -> WfFormatTaskTableReader(factory.createParser(path.toFile())) else -> throw IllegalArgumentException("Table $table not supported") diff --git a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt index e27bc82c..9d9735b1 100644 --- a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt +++ b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt @@ -210,7 +210,7 @@ internal class WfFormatTaskTableReaderTest { val reader = WfFormatTaskTableReader(parser) assertTrue(reader.nextRow()) - assertEquals("test", reader.get(TASK_ID)) + assertEquals("test", reader.getString(TASK_ID)) assertFalse(reader.nextRow()) reader.close() @@ -281,7 +281,7 @@ internal class WfFormatTaskTableReaderTest { val reader = WfFormatTaskTableReader(parser) assertTrue(reader.nextRow()) - assertEquals(setOf("1"), reader.get(TASK_PARENTS)) + assertEquals(setOf("1"), reader.getSet(TASK_PARENTS, String::class.java)) assertFalse(reader.nextRow()) reader.close() @@ -337,7 +337,7 @@ internal class WfFormatTaskTableReaderTest { assertTrue(reader.nextRow()) assertTrue(reader.nextRow()) - assertEquals("test2", reader.get(TASK_ID)) + assertEquals("test2", reader.getString(TASK_ID)) assertFalse(reader.nextRow()) reader.close() diff --git a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt index 4a8b2792..a460c5f6 100644 --- a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt @@ -66,18 +66,18 @@ class WfFormatTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("makebwaindex_mammoth_mt_krause.fasta", reader.get(TASK_ID)) }, - { assertEquals("eager-nextflow-chameleon", reader.get(TASK_WORKFLOW_ID)) }, - { assertEquals(172000, reader.get(TASK_RUNTIME).toMillis()) }, - { assertEquals(emptySet(), reader.get(TASK_PARENTS)) }, + { assertEquals("makebwaindex_mammoth_mt_krause.fasta", reader.getString(TASK_ID)) }, + { assertEquals("eager-nextflow-chameleon", reader.getString(TASK_WORKFLOW_ID)) }, + { assertEquals(172000, reader.getDuration(TASK_RUNTIME)?.toMillis()) }, + { assertEquals(emptySet(), reader.getSet(TASK_PARENTS, String::class.java)) }, ) assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("makeseqdict_mammoth_mt_krause.fasta", reader.get(TASK_ID)) }, - { assertEquals("eager-nextflow-chameleon", reader.get(TASK_WORKFLOW_ID)) }, - { assertEquals(175000, reader.get(TASK_RUNTIME).toMillis()) }, - { assertEquals(setOf("makebwaindex_mammoth_mt_krause.fasta"), reader.get(TASK_PARENTS)) }, + { assertEquals("makeseqdict_mammoth_mt_krause.fasta", reader.getString(TASK_ID)) }, + { assertEquals("eager-nextflow-chameleon", reader.getString(TASK_WORKFLOW_ID)) }, + { assertEquals(175000, reader.getDuration(TASK_RUNTIME)?.toMillis()) }, + { assertEquals(setOf("makebwaindex_mammoth_mt_krause.fasta"), reader.getSet(TASK_PARENTS, String::class.java)) }, ) reader.close() diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt index f0db78b7..bb5eb668 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt @@ -24,8 +24,12 @@ package org.opendc.trace.wtf import org.opendc.trace.* import org.opendc.trace.conv.* +import org.opendc.trace.util.convertTo import org.opendc.trace.util.parquet.LocalParquetReader import org.opendc.trace.wtf.parquet.Task +import java.time.Duration +import java.time.Instant +import java.util.* /** * A [TableReader] implementation for the WTF format. @@ -48,26 +52,39 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader) } } - override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + private val COL_ID = 0 + private val COL_WORKFLOW_ID = 1 + private val COL_SUBMIT_TIME = 2 + private val COL_WAIT_TIME = 3 + private val COL_RUNTIME = 4 + private val COL_REQ_NCPUS = 5 + private val COL_PARENTS = 6 + private val COL_CHILDREN = 7 + private val COL_GROUP_ID = 8 + private val COL_USER_ID = 9 - override fun isNull(index: Int): Boolean { - check(index in 0..columns.size) { "Invalid column index" } - return get(index) == null + private val TYPE_PARENTS = TableColumnType.Set(TableColumnType.String) + private val TYPE_CHILDREN = TableColumnType.Set(TableColumnType.String) + + override fun resolve(name: String): Int { + return when (name) { + TASK_ID -> COL_ID + TASK_WORKFLOW_ID -> COL_WORKFLOW_ID + TASK_SUBMIT_TIME -> COL_SUBMIT_TIME + TASK_WAIT_TIME -> COL_WAIT_TIME + TASK_RUNTIME -> COL_RUNTIME + TASK_REQ_NCPUS -> COL_REQ_NCPUS + TASK_PARENTS -> COL_PARENTS + TASK_CHILDREN -> COL_CHILDREN + TASK_GROUP_ID -> COL_GROUP_ID + TASK_USER_ID -> COL_USER_ID + else -> -1 + } } - override fun get(index: Int): Any? { - val record = checkNotNull(record) { "Reader in invalid state" } - return when (index) { - COL_ID -> record.id - COL_WORKFLOW_ID -> record.workflowId - COL_SUBMIT_TIME -> record.submitTime - COL_WAIT_TIME -> record.waitTime - COL_RUNTIME -> record.runtime - COL_REQ_NCPUS, COL_GROUP_ID, COL_USER_ID -> getInt(index) - COL_PARENTS -> record.parents - COL_CHILDREN -> record.children - else -> throw IllegalArgumentException("Invalid column") - } + override fun isNull(index: Int): Boolean { + check(index in COL_ID..COL_USER_ID) { "Invalid column index" } + return false } override fun getBoolean(index: Int): Boolean { @@ -89,35 +106,62 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader) throw IllegalArgumentException("Invalid column") } + override fun getFloat(index: Int): Float { + throw IllegalArgumentException("Invalid column") + } + override fun getDouble(index: Int): Double { throw IllegalArgumentException("Invalid column") } - override fun close() { - reader.close() + override fun getString(index: Int): String { + val record = checkNotNull(record) { "Reader in invalid state" } + return when (index) { + COL_ID -> record.id + COL_WORKFLOW_ID -> record.workflowId + else -> throw IllegalArgumentException("Invalid column") + } } - private val COL_ID = 0 - private val COL_WORKFLOW_ID = 1 - private val COL_SUBMIT_TIME = 2 - private val COL_WAIT_TIME = 3 - private val COL_RUNTIME = 4 - private val COL_REQ_NCPUS = 5 - private val COL_PARENTS = 6 - private val COL_CHILDREN = 7 - private val COL_GROUP_ID = 8 - private val COL_USER_ID = 9 + override fun getUUID(index: Int): UUID? { + throw IllegalArgumentException("Invalid column") + } - private val columns = mapOf( - TASK_ID to COL_ID, - TASK_WORKFLOW_ID to COL_WORKFLOW_ID, - TASK_SUBMIT_TIME to COL_SUBMIT_TIME, - TASK_WAIT_TIME to COL_WAIT_TIME, - TASK_RUNTIME to COL_RUNTIME, - TASK_REQ_NCPUS to COL_REQ_NCPUS, - TASK_PARENTS to COL_PARENTS, - TASK_CHILDREN to COL_CHILDREN, - TASK_GROUP_ID to COL_GROUP_ID, - TASK_USER_ID to COL_USER_ID, - ) + override fun getInstant(index: Int): Instant { + val record = checkNotNull(record) { "Reader in invalid state" } + return when (index) { + COL_SUBMIT_TIME -> record.submitTime + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getDuration(index: Int): Duration { + val record = checkNotNull(record) { "Reader in invalid state" } + return when (index) { + COL_WAIT_TIME -> record.waitTime + COL_RUNTIME -> record.runtime + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getList(index: Int, elementType: Class): List? { + throw IllegalArgumentException("Invalid column") + } + + override fun getSet(index: Int, elementType: Class): Set? { + val record = checkNotNull(record) { "Reader in invalid state" } + return when (index) { + COL_PARENTS -> TYPE_PARENTS.convertTo(record.parents, elementType) + COL_CHILDREN -> TYPE_CHILDREN.convertTo(record.children, elementType) + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getMap(index: Int, keyType: Class, valueType: Class): Map? { + throw IllegalArgumentException("Invalid column") + } + + override fun close() { + reader.close() + } } diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt index e71253ac..c8408626 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt @@ -46,24 +46,23 @@ public class WtfTraceFormat : TraceFormat { return when (table) { TABLE_TASKS -> TableDetails( listOf( - TASK_ID, - TASK_WORKFLOW_ID, - TASK_SUBMIT_TIME, - TASK_WAIT_TIME, - TASK_RUNTIME, - TASK_REQ_NCPUS, - TASK_PARENTS, - TASK_CHILDREN, - TASK_GROUP_ID, - TASK_USER_ID - ), - listOf(TASK_SUBMIT_TIME) + TableColumn(TASK_ID, TableColumnType.String), + TableColumn(TASK_WORKFLOW_ID, TableColumnType.String), + TableColumn(TASK_SUBMIT_TIME, TableColumnType.Instant), + TableColumn(TASK_WAIT_TIME, TableColumnType.Duration), + TableColumn(TASK_RUNTIME, TableColumnType.Duration), + TableColumn(TASK_REQ_NCPUS, TableColumnType.Int), + TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)), + TableColumn(TASK_CHILDREN, TableColumnType.Set(TableColumnType.String)), + TableColumn(TASK_GROUP_ID, TableColumnType.Int), + TableColumn(TASK_USER_ID, TableColumnType.Int) + ) ) else -> throw IllegalArgumentException("Table $table not supported") } } - override fun newReader(path: Path, table: String, projection: List>?): TableReader { + override fun newReader(path: Path, table: String, projection: List?): TableReader { return when (table) { TABLE_TASKS -> { val reader = LocalParquetReader(path.resolve("tasks/schema-1.0"), TaskReadSupport(projection)) diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt index 8e7325de..a6087d9f 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt @@ -27,7 +27,6 @@ import org.apache.parquet.hadoop.api.InitContext import org.apache.parquet.hadoop.api.ReadSupport import org.apache.parquet.io.api.RecordMaterializer import org.apache.parquet.schema.* -import org.opendc.trace.TableColumn import org.opendc.trace.conv.* /** @@ -35,11 +34,11 @@ import org.opendc.trace.conv.* * * @param projection The projection of the table to read. */ -internal class TaskReadSupport(private val projection: List>?) : ReadSupport() { +internal class TaskReadSupport(private val projection: List?) : ReadSupport() { /** * Mapping of table columns to their Parquet column names. */ - private val colMap = mapOf, String>( + private val colMap = mapOf( TASK_ID to "id", TASK_WORKFLOW_ID to "workflow_id", TASK_SUBMIT_TIME to "ts_submit", diff --git a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt index c0eb3f08..2312035a 100644 --- a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt @@ -65,32 +65,28 @@ class WtfTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("362334516345962206", reader.get(TASK_ID)) }, - { assertEquals("1078341553348591493", reader.get(TASK_WORKFLOW_ID)) }, - { assertEquals(Instant.ofEpochMilli(245604), reader.get(TASK_SUBMIT_TIME)) }, - { assertEquals(Duration.ofMillis(8163), reader.get(TASK_RUNTIME)) }, + { assertEquals("362334516345962206", reader.getString(TASK_ID)) }, + { assertEquals("1078341553348591493", reader.getString(TASK_WORKFLOW_ID)) }, + { assertEquals(Instant.ofEpochMilli(245604), reader.getInstant(TASK_SUBMIT_TIME)) }, + { assertEquals(Duration.ofMillis(8163), reader.getDuration(TASK_RUNTIME)) }, { assertEquals( setOf("584055316413447529", "133113685133695608", "1008582348422865408"), - reader.get( - TASK_PARENTS - ) + reader.getSet(TASK_PARENTS, String::class.java) ) }, ) assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("502010169100446658", reader.get(TASK_ID)) }, - { assertEquals("1078341553348591493", reader.get(TASK_WORKFLOW_ID)) }, - { assertEquals(Instant.ofEpochMilli(251325), reader.get(TASK_SUBMIT_TIME)) }, - { assertEquals(Duration.ofMillis(8216), reader.get(TASK_RUNTIME)) }, + { assertEquals("502010169100446658", reader.getString(TASK_ID)) }, + { assertEquals("1078341553348591493", reader.getString(TASK_WORKFLOW_ID)) }, + { assertEquals(Instant.ofEpochMilli(251325), reader.getInstant(TASK_SUBMIT_TIME)) }, + { assertEquals(Duration.ofMillis(8216), reader.getDuration(TASK_RUNTIME)) }, { assertEquals( setOf("584055316413447529", "133113685133695608", "1008582348422865408"), - reader.get( - TASK_PARENTS - ) + reader.getSet(TASK_PARENTS, String::class.java) ) }, ) -- cgit v1.2.3