diff options
69 files changed, 2842 insertions, 866 deletions
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt index 720c7e58..12c2325a 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt @@ -29,8 +29,6 @@ import org.opendc.trace.* import org.opendc.trace.conv.* import java.io.File import java.lang.ref.SoftReference -import java.time.Duration -import java.time.Instant import java.util.* import java.util.concurrent.ConcurrentHashMap import kotlin.math.max @@ -68,9 +66,9 @@ public class ComputeWorkloadLoader(private val baseDir: File) { return try { while (reader.nextRow()) { - val id = reader.get(idCol) as String - val time = reader.get(timestampCol) as Instant - val duration = reader.get(durationCol) as Duration + val id = reader.getString(idCol)!! + val time = reader.getInstant(timestampCol)!! + val duration = reader.getDuration(durationCol)!! val cores = reader.getInt(coresCol) val cpuUsage = reader.getDouble(usageCol) @@ -105,13 +103,13 @@ public class ComputeWorkloadLoader(private val baseDir: File) { return try { while (reader.nextRow()) { - val id = reader.get(idCol) as String + val id = reader.getString(idCol)!! if (!fragments.containsKey(id)) { continue } - val submissionTime = reader.get(startTimeCol) as Instant - val endTime = reader.get(stopTimeCol) as Instant + val submissionTime = reader.getInstant(startTimeCol)!! + val endTime = reader.getInstant(stopTimeCol)!! val cpuCount = reader.getInt(cpuCountCol) val cpuCapacity = reader.getDouble(cpuCapacityCol) val memCapacity = reader.getDouble(memCol) / 1000.0 // Convert from KB to MB @@ -162,7 +160,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) { while (reader.nextRow()) { @Suppress("UNCHECKED_CAST") - val members = reader.get(membersCol) as Set<String> + val members = reader.getSet(membersCol, String::class.java)!! val target = reader.getDouble(targetCol) val score = reader.getDouble(scoreCol) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt index 05d0234a..e6e97706 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt @@ -32,21 +32,16 @@ public interface Table { public val name: String /** - * The list of columns supported in this table. + * The columns in this table. */ - public val columns: List<TableColumn<*>> - - /** - * The columns by which the table is partitioned. - */ - public val partitionKeys: List<TableColumn<*>> + public val columns: List<TableColumn> /** * Open a [TableReader] for a projection of this table. * - * @param projection The list of columns to fetch from the table or `null` if no projection is performed. + * @param projection The names of the columns to fetch from the table or `null` if no projection is performed. */ - public fun newReader(projection: List<TableColumn<*>>? = null): TableReader + public fun newReader(projection: List<String>? = null): TableReader /** * Open a [TableWriter] for this table. diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt index b77a2982..0f75d890 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt @@ -22,47 +22,15 @@ package org.opendc.trace -import java.util.* - /** - * A column in a trace table. + * A column in a [Table]. * - * @param name The universal name of this column. + * @property name The universal name of this column. + * @property type The type of the column. + * @property isNullable A flag to indicate that the column is nullable. */ -public class TableColumn<out T>(public val name: String, type: Class<T>) { - /** - * The type of the column. - */ - public val type: Class<*> = type - - /** - * Determine whether the type of the column is a subtype of [column]. - */ - public fun isAssignableTo(column: TableColumn<*>): Boolean { - return name == column.name && type.isAssignableFrom(column.type) - } - - /** - * Compute a hash code for this column. - */ - public override fun hashCode(): Int = Objects.hash(name, type) - - /** - * Determine whether this column is equal to [other]. - */ - public override fun equals(other: Any?): Boolean { - // Fast-path: reference equality - if (this === other) { - return true - } else if (other == null || other !is TableColumn<*>) { - return false - } - - return name == other.name && type == other.type - } - - /** - * Return a string representation of this column. - */ - public override fun toString(): String = "TableColumn[$name,$type]" -} +public data class TableColumn( + public val name: String, + public val type: TableColumnType, + public val isNullable: Boolean = false +) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumnType.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumnType.kt new file mode 100644 index 00000000..6cae47f9 --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumnType.kt @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace + +/** + * The type of a [TableColumn]. + */ +public sealed class TableColumnType { + /** + * A column of booleans. + */ + public object Boolean : TableColumnType() + + /** + * A column of 32-bit integers. + */ + public object Int : TableColumnType() + + /** + * A column of 64-bit integers. + */ + public object Long : TableColumnType() + + /** + * A column of 32-bit floats. + */ + public object Float : TableColumnType() + + /** + * A column of 64-bit floats. + */ + public object Double : TableColumnType() + + /** + * A column of UUIDs. + */ + public object UUID : TableColumnType() + + /** + * A column of variable-length strings. + */ + public object String : TableColumnType() + + /** + * A column of timestamps, mapping to [java.time.Instant]. + */ + public object Instant : TableColumnType() + + /** + * A column of durations, mapping to [java.time.Duration] + */ + public object Duration : TableColumnType() + + /** + * A column containing embedded lists. + * + * @property elementType The type of the elements in the list. + */ + public data class List(public val elementType: TableColumnType) : TableColumnType() + + /** + * A column containing embedded sets. + * + * @property elementType The type of the elements in the sets. + */ + public data class Set(public val elementType: TableColumnType) : TableColumnType() + + /** + * A column containing embedded maps. + * + * @property keyType The type of the key. + * @property valueType The type of the value. + */ + public data class Map(public val keyType: TableColumnType, public val valueType: TableColumnType) : TableColumnType() +} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt index 8a796e6c..42b1c690 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt @@ -22,8 +22,12 @@ package org.opendc.trace +import java.time.Duration +import java.time.Instant +import java.util.UUID + /** - * Base class for reading entities from a workload trace table in streaming fashion. + * Base class for reading entities from a [Table] in streaming fashion. */ public interface TableReader : AutoCloseable { /** @@ -34,20 +38,15 @@ public interface TableReader : AutoCloseable { public fun nextRow(): Boolean /** - * Resolve the index of the specified [column] for this reader. + * Resolve the index of the column by its [name]. * - * @param column The column to lookup. + * @param name The name of the column to lookup. * @return The zero-based index of the column or a negative value if the column is not present in this table. */ - public fun resolve(column: TableColumn<*>): Int - - /** - * Determine whether the [TableReader] supports the specified [column]. - */ - public fun hasColumn(column: TableColumn<*>): Boolean = resolve(column) >= 0 + public fun resolve(name: String): Int /** - * Determine whether the specified [column] has a `null` value for the current row. + * Determine whether the column with the specified [index] has a `null` value for the current row. * * @param index The zero-based index of the column to check for a null value. * @throws IllegalArgumentException if the column index is not valid for this reader. @@ -56,15 +55,6 @@ public interface TableReader : AutoCloseable { public fun isNull(index: Int): Boolean /** - * Obtain the object value of the column with the specified [index]. - * - * @param index The zero-based index of the column to obtain the value for. - * @throws IllegalArgumentException if the column index is not valid for this reader. - * @return The object value of the column. - */ - public fun get(index: Int): Any? - - /** * Obtain the boolean value of the column with the specified [index]. * * @param index The zero-based index of the column to obtain the value for. @@ -74,7 +64,7 @@ public interface TableReader : AutoCloseable { public fun getBoolean(index: Int): Boolean /** - * Obtain the integer value of the column with the specified [index]. + * Obtain the boolean value of the column with the specified [index]. * * @param index The zero-based index of the column to obtain the value for. * @throws IllegalArgumentException if the column index is not valid for this reader. @@ -92,6 +82,15 @@ public interface TableReader : AutoCloseable { public fun getLong(index: Int): Long /** + * Obtain the float value of the column with the specified [index]. + * + * @param index The zero-based index of the column to obtain the value for. + * @throws IllegalArgumentException if the column index is not valid for this reader. + * @return The float value of the column or [Float.NaN] if the column is `null`. + */ + public fun getFloat(index: Int): Float + + /** * Obtain the double value of the column with the specified [index]. * * @param index The zero-based index of the column to obtain the value for. @@ -101,62 +100,193 @@ public interface TableReader : AutoCloseable { public fun getDouble(index: Int): Double /** - * Determine whether the specified [column] has a `null` value for the current row. + * Obtain the [String] value of the column with the specified [index]. * - * @param column The column to lookup. - * @throws IllegalArgumentException if the column is not valid for this table. - * @return `true` if the column value for the current value has a `null` value, `false` otherwise. + * @param index The zero-based index of the column to obtain the value for. + * @throws IllegalArgumentException if the column index is not valid for this reader. + * @return The [String] value of the column or `null` if the column is `null`. */ - public fun isNull(column: TableColumn<*>): Boolean = isNull(resolve(column)) + public fun getString(index: Int): String? /** - * Obtain the value of the current column with type [T]. + * Obtain the [UUID] value of the column with the specified [index]. * - * @param column The column to obtain the value for. - * @throws IllegalArgumentException if the column is not valid for this reader. - * @return The object value of the column. + * @param index The zero-based index of the column to obtain the value for. + * @throws IllegalArgumentException if the column index is not valid for this reader. + * @return The [UUID] value of the column or `null` if the column is `null`. + */ + public fun getUUID(index: Int): UUID? + + /** + * Obtain the [Instant] value of the column with the specified [index]. + * + * @param index The zero-based index of the column to obtain the value for. + * @throws IllegalArgumentException if the column index is not valid for this reader. + * @return The [Instant] value of the column or `null` if the column is `null`. + */ + public fun getInstant(index: Int): Instant? + + /** + * Obtain the [Duration] value of the column with the specified [index]. + * + * @param index The zero-based index of the column to obtain the value for. + * @throws IllegalArgumentException if the column index is not valid for this reader. + * @return The [Duration] value of the column or `null` if the column is `null`. + */ + public fun getDuration(index: Int): Duration? + + /** + * Obtain the value of the column with the specified [index] as [List]. + * + * @param index The zero-based index of the column to obtain the value for. + * @param elementType Class representing the Java data type to convert elements of the designated column to. + * @throws IllegalArgumentException if the column index is not valid for this reader or this type. + * @return The value of the column as `List` or `null` if the column is null. + */ + public fun <T> getList(index: Int, elementType: Class<T>): List<T>? + + /** + * Obtain the value of the column with the specified [index] as [Set]. + * + * @param index The zero-based index of the column to obtain the value for. + * @param elementType Class representing the Java data type to convert elements of the designated column to. + * @throws IllegalArgumentException if the column index is not valid for this reader or this type. + * @return The value of the column as `Set` or `null` if the column is null. + */ + public fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? + + /** + * Obtain the value of the column with the specified [index] as [Set]. + * + * @param index The zero-based index of the column to obtain the value for. + * @param keyType Class representing the Java data type to convert the keys of the designated column to. + * @param valueType Class representing the Java data type to convert the values of the designated column to. + * @throws IllegalArgumentException if the column index is not valid for this reader or this type. + * @return The value of the column as `Map` or `null` if the column is null. + */ + public fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? + + /** + * Determine whether a column named [name] has a `null` value for the current row. + * + * @param name The name of the column to lookup. + * @throws IllegalArgumentException if the column is not valid for this table. + * @return `true` if the column value for the current value has a `null` value, `false` otherwise. */ - public fun <T> get(column: TableColumn<T>): T { - // This cast should always succeed since the resolve the index of the typed column - @Suppress("UNCHECKED_CAST") - return get(resolve(column)) as T - } + public fun isNull(name: String): Boolean = isNull(resolve(name)) /** - * Read the specified [column] as boolean. + * Read the column named [name] as boolean. * - * @param column The column to obtain the value for. + * @param name The name of the column to get the value for. * @throws IllegalArgumentException if the column is not valid for this reader. * @return The boolean value of the column or `false` if the column is `null`. */ - public fun getBoolean(column: TableColumn<Boolean>): Boolean = getBoolean(resolve(column)) + public fun getBoolean(name: String): Boolean = getBoolean(resolve(name)) /** - * Read the specified [column] as integer. + * Read the column named [name] as integer. * - * @param column The column to obtain the value for. + * @param name The name of the column to get the value for. * @throws IllegalArgumentException if the column is not valid for this reader. * @return The integer value of the column or `0` if the column is `null`. */ - public fun getInt(column: TableColumn<Int>): Int = getInt(resolve(column)) + public fun getInt(name: String): Int = getInt(resolve(name)) /** - * Read the specified [column] as long. + * Read the column named [name] as long. * - * @param column The column to obtain the value for. + * @param name The name of the column to get the value for * @throws IllegalArgumentException if the column is not valid for this reader. * @return The long value of the column or `0` if the column is `null`. */ - public fun getLong(column: TableColumn<Long>): Long = getLong(resolve(column)) + public fun getLong(name: String): Long = getLong(resolve(name)) + + /** + * Read the column named [name] as float. + * + * @param name The name of the column to get the value for. + * @throws IllegalArgumentException if the column is not valid for this reader. + * @return The float value of the column or [Float.NaN] if the column is `null`. + */ + public fun getFloat(name: String): Float = getFloat(resolve(name)) /** - * Read the specified [column] as double. + * Read the column named [name] as double. * - * @param column The column to obtain the value for. + * @param name The name of the column to get the value for. * @throws IllegalArgumentException if the column is not valid for this reader. * @return The double value of the column or [Double.NaN] if the column is `null`. */ - public fun getDouble(column: TableColumn<Double>): Double = getDouble(resolve(column)) + public fun getDouble(name: String): Double = getDouble(resolve(name)) + + /** + * Read the column named [name] as [String]. + * + * @param name The name of the column to get the value for. + * @throws IllegalArgumentException if the column is not valid for this reader. + * @return The [String] value of the column or `null` if the column is `null`. + */ + public fun getString(name: String): String? = getString(resolve(name)) + + /** + * Read the column named [name] as [UUID]. + * + * @param name The name of the column to get the value for. + * @throws IllegalArgumentException if the column is not valid for this reader. + * @return The [UUID] value of the column or `null` if the column is `null`. + */ + public fun getUUID(name: String): UUID? = getUUID(resolve(name)) + + /** + * Read the column named [name] as [Instant]. + * + * @param name The name of the column to get the value for. + * @throws IllegalArgumentException if the column is not valid for this reader. + * @return The [Instant] value of the column or `null` if the column is `null`. + */ + public fun getInstant(name: String): Instant? = getInstant(resolve(name)) + + /** + * Read the column named [name] as [Duration]. + * + * @param name The name of the column to get the value for. + * @throws IllegalArgumentException if the column is not valid for this reader. + * @return The [Duration] value of the column or `null` if the column is `null`. + */ + public fun getDuration(name: String): Duration? = getDuration(resolve(name)) + + /** + * Obtain the value of the column named [name] as [List]. + * + * @param name The name of the column to get the value for. + * @param elementType Class representing the Java data type to convert elements of the designated column to. + * @throws IllegalArgumentException if the column index is not valid for this reader or this type. + * @return The value of the column as `List` or `null` if the column is null. + */ + public fun <T> getList(name: String, elementType: Class<T>): List<T>? = getList(resolve(name), elementType) + + /** + * Obtain the value of the column named [name] as [Set]. + * + * @param name The name of the column to get the value for. + * @param elementType Class representing the Java data type to convert elements of the designated column to. + * @throws IllegalArgumentException if the column index is not valid for this reader or this type. + * @return The value of the column as `Set` or `null` if the column is null. + */ + public fun <T> getSet(name: String, elementType: Class<T>): Set<T>? = getSet(resolve(name), elementType) + + /** + * Obtain the value of the column named [name] as [Set]. + * + * @param name The name of the column to get the value for. + * @param keyType Class representing the Java data type to convert the keys of the designated column to. + * @param valueType Class representing the Java data type to convert the values of the designated column to. + * @throws IllegalArgumentException if the column index is not valid for this reader or this type. + * @return The value of the column as `Map` or `null` if the column is null. + */ + public fun <K, V> getMap(name: String, keyType: Class<K>, valueType: Class<V>): Map<K, V>? = + getMap(resolve(name), keyType, valueType) /** * Closes the reader so that no further iteration or data access can be made. diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableWriter.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableWriter.kt index 423ce86a..a3122ec9 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableWriter.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableWriter.kt @@ -22,6 +22,10 @@ package org.opendc.trace +import java.time.Duration +import java.time.Instant +import java.util.* + /** * Base class for writing workload traces. */ @@ -37,107 +41,228 @@ public interface TableWriter : AutoCloseable { public fun endRow() /** - * Resolve the index of the specified [column] for this writer. + * Resolve the index of the column named [name] for this writer. * - * @param column The column to lookup. + * @param name The name of the column to lookup. * @return The zero-based index of the column or a negative value if the column is not present in this table. */ - public fun resolve(column: TableColumn<*>): Int + public fun resolve(name: String): Int /** - * Determine whether the [TableReader] supports the specified [column]. + * Set the column with index [index] to boolean [value]. + * + * @param index The zero-based index of the column to set the value for. + * @param value The boolean value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. */ - public fun hasColumn(column: TableColumn<*>): Boolean = resolve(column) >= 0 + public fun setBoolean(index: Int, value: Boolean) /** - * Set [column] to [value]. + * Set the column with index [index] to integer [value]. * * @param index The zero-based index of the column to set the value for. - * @param value The value to set the column to. + * @param value The integer value to set the column to. * @throws IllegalArgumentException if the column is not valid for this method. */ - public fun set(index: Int, value: Any) + public fun setInt(index: Int, value: Int) /** - * Set [column] to boolean [value]. + * Set the column with index [index] to long [value]. * * @param index The zero-based index of the column to set the value for. - * @param value The boolean value to set the column to. + * @param value The long value to set the column to. * @throws IllegalArgumentException if the column is not valid for this method. */ - public fun setBoolean(index: Int, value: Boolean) + public fun setLong(index: Int, value: Long) /** - * Set [column] to integer [value]. + * Set the column with index [index] to float [value]. * * @param index The zero-based index of the column to set the value for. - * @param value The integer value to set the column to. + * @param value The float value to set the column to. * @throws IllegalArgumentException if the column is not valid for this method. */ - public fun setInt(index: Int, value: Int) + public fun setFloat(index: Int, value: Float) /** - * Set [column] to long [value]. + * Set the column with index [index] to double [value]. * * @param index The zero-based index of the column to set the value for. - * @param value The long value to set the column to. + * @param value The double value to set the column to. * @throws IllegalArgumentException if the column is not valid for this method. */ - public fun setLong(index: Int, value: Long) + public fun setDouble(index: Int, value: Double) /** - * Set [column] to double [value]. + * Set the column with index [index] to [String] [value]. * * @param index The zero-based index of the column to set the value for. - * @param value The double value to set the column to. + * @param value The [String] value to set the column to. * @throws IllegalArgumentException if the column is not valid for this method. */ - public fun setDouble(index: Int, value: Double) + public fun setString(index: Int, value: String) + + /** + * Set the column with index [index] to [UUID] [value]. + * + * @param index The zero-based index of the column to set the value for. + * @param value The [UUID] value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setUUID(index: Int, value: UUID) + + /** + * Set the column with index [index] to [Instant] [value]. + * + * @param index The zero-based index of the column to set the value for. + * @param value The [Instant] value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setInstant(index: Int, value: Instant) /** - * Set [column] to [value]. + * Set the column with index [index] to [Duration] [value]. * - * @param column The column to set the value for. - * @param value The value to set the column to. + * @param index The zero-based index of the column to set the value for. + * @param value The [Duration] value to set the column to. * @throws IllegalArgumentException if the column is not valid for this method. */ - public fun <T : Any> set(column: TableColumn<T>, value: T): Unit = set(resolve(column), value) + public fun setDuration(index: Int, value: Duration) /** - * Set [column] to boolean [value]. + * Set the column with index [index] to [List] [value]. * - * @param column The column to set the value for. + * @param index The zero-based index of the column to set the value for. + * @param value The [Map] value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun <T> setList(index: Int, value: List<T>) + + /** + * Set the column with index [index] to [Set] [value]. + * + * @param index The zero-based index of the column to set the value for. + * @param value The [Set] value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun <T> setSet(index: Int, value: Set<T>) + + /** + * Set the column with index [index] to [Map] [value]. + * + * @param index The zero-based index of the column to set the value for. + * @param value The [Map] value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun <K, V> setMap(index: Int, value: Map<K, V>) + + /** + * Set the column named [name] to boolean [value]. + * + * @param name The name of the column to set the value for. * @param value The boolean value to set the column to. * @throws IllegalArgumentException if the column is not valid for this method. */ - public fun setBoolean(column: TableColumn<Boolean>, value: Boolean): Unit = setBoolean(resolve(column), value) + public fun setBoolean(name: String, value: Boolean): Unit = setBoolean(resolve(name), value) /** - * Set [column] to integer [value]. + * Set the column named [name] to integer [value]. * - * @param column The column to set the value for. + * @param name The name of the column to set the value for. * @param value The integer value to set the column to. * @throws IllegalArgumentException if the column is not valid for this method. */ - public fun setInt(column: TableColumn<Int>, value: Int): Unit = setInt(resolve(column), value) + public fun setInt(name: String, value: Int): Unit = setInt(resolve(name), value) /** - * Set [column] to long [value]. + * Set the column named [name] to long [value]. * - * @param column The column to set the value for. + * @param name The name of the column to set the value for. * @param value The long value to set the column to. * @throws IllegalArgumentException if the column is not valid for this method. */ - public fun setLong(column: TableColumn<Long>, value: Long): Unit = setLong(resolve(column), value) + public fun setLong(name: String, value: Long): Unit = setLong(resolve(name), value) + + /** + * Set the column named [name] to float [value]. + * + * @param name The name of the column to set the value for. + * @param value The float value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setFloat(name: String, value: Float): Unit = setFloat(resolve(name), value) /** - * Set [column] to double [value]. + * Set the column named [name] to double [value]. * - * @param column The column to set the value for. + * @param name The name of the column to set the value for. * @param value The double value to set the column to. * @throws IllegalArgumentException if the column is not valid for this method. */ - public fun setDouble(column: TableColumn<Double>, value: Double): Unit = setDouble(resolve(column), value) + public fun setDouble(name: String, value: Double): Unit = setDouble(resolve(name), value) + + /** + * Set the column named [name] to [String] [value]. + * + * @param name The name of the column to set the value for. + * @param value The [String] value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setString(name: String, value: String): Unit = setString(resolve(name), value) + + /** + * Set the column named [name] to [UUID] [value]. + * + * @param name The name of the column to set the value for. + * @param value The [UUID] value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setUUID(name: String, value: UUID): Unit = setUUID(resolve(name), value) + + /** + * Set the column named [name] to [Instant] [value]. + * + * @param name The name of the column to set the value for. + * @param value The [Instant] value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setInstant(name: String, value: Instant): Unit = setInstant(resolve(name), value) + + /** + * Set the column named [name] to [Duration] [value]. + * + * @param name The name of the column to set the value for. + * @param value The [Duration] value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun setDuration(name: String, value: Duration): Unit = setDuration(resolve(name), value) + + /** + * Set the column named [name] to [List] [value]. + * + * @param name The name of the column to set the value for. + * @param value The [List] value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun <T> setList(name: String, value: List<T>): Unit = setList(resolve(name), value) + + /** + * Set the column named [name] to [Set] [value]. + * + * @param name The name of the column to set the value for. + * @param value The [Set] value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun <T> setSet(name: String, value: Set<T>): Unit = setSet(resolve(name), value) + + /** + * Set the column named [name] to [Map] [value]. + * + * @param name The name of the column to set the value for. + * @param value The [Map] value to set the column to. + * @throws IllegalArgumentException if the column is not valid for this method. + */ + public fun <K, V> setMap(name: String, value: Map<K, V>): Unit = setMap(resolve(name), value) /** * Flush any buffered content to the underlying target. diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt index 5e8859e4..2a80687c 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt @@ -23,22 +23,17 @@ @file:JvmName("InterferenceGroupColumns") package org.opendc.trace.conv -import org.opendc.trace.TableColumn - /** * Members of the interference group. */ -@JvmField -public val INTERFERENCE_GROUP_MEMBERS: TableColumn<Set<String>> = column("members") +public const val INTERFERENCE_GROUP_MEMBERS: String = "members" /** * Target load after which the interference occurs. */ -@JvmField -public val INTERFERENCE_GROUP_TARGET: TableColumn<Double> = column("target") +public const val INTERFERENCE_GROUP_TARGET: String = "target" /** * Performance score when the interference occurs. */ -@JvmField -public val INTERFERENCE_GROUP_SCORE: TableColumn<Double> = column("score") +public const val INTERFERENCE_GROUP_SCORE: String = "score" diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt index e602e534..9b7b8195 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt @@ -23,47 +23,44 @@ @file:JvmName("ResourceColumns") package org.opendc.trace.conv -import org.opendc.trace.TableColumn -import java.time.Instant - /** * Identifier of the resource. */ @JvmField -public val RESOURCE_ID: TableColumn<String> = column("id") +public val RESOURCE_ID: String = "id" /** * The cluster to which the resource belongs. */ @JvmField -public val RESOURCE_CLUSTER_ID: TableColumn<String> = column("cluster_id") +public val RESOURCE_CLUSTER_ID: String = "cluster_id" /** * Start time for the resource. */ @JvmField -public val RESOURCE_START_TIME: TableColumn<Instant> = column("start_time") +public val RESOURCE_START_TIME: String = "start_time" /** * End time for the resource. */ @JvmField -public val RESOURCE_STOP_TIME: TableColumn<Instant> = column("stop_time") +public val RESOURCE_STOP_TIME: String = "stop_time" /** * Number of CPUs for the resource. */ @JvmField -public val RESOURCE_CPU_COUNT: TableColumn<Int> = column("cpu_count") +public val RESOURCE_CPU_COUNT: String = "cpu_count" /** * Total CPU capacity of the resource in MHz. */ @JvmField -public val RESOURCE_CPU_CAPACITY: TableColumn<Double> = column("cpu_capacity") +public val RESOURCE_CPU_CAPACITY: String = "cpu_capacity" /** * Memory capacity for the resource in KB. */ @JvmField -public val RESOURCE_MEM_CAPACITY: TableColumn<Double> = column("mem_capacity") +public val RESOURCE_MEM_CAPACITY: String = "mem_capacity" diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt index 3a44f817..e9dd7673 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt @@ -23,78 +23,74 @@ @file:JvmName("ResourceStateColumns") package org.opendc.trace.conv -import org.opendc.trace.TableColumn -import java.time.Duration -import java.time.Instant - /** * The timestamp at which the state was recorded. */ @JvmField -public val RESOURCE_STATE_TIMESTAMP: TableColumn<Instant> = column("timestamp") +public val RESOURCE_STATE_TIMESTAMP: String = "timestamp" /** * Duration for the state. */ @JvmField -public val RESOURCE_STATE_DURATION: TableColumn<Duration> = column("duration") +public val RESOURCE_STATE_DURATION: String = "duration" /** * A flag to indicate that the resource is powered on. */ @JvmField -public val RESOURCE_STATE_POWERED_ON: TableColumn<Boolean> = column("powered_on") +public val RESOURCE_STATE_POWERED_ON: String = "powered_on" /** * Total CPU usage of the resource in MHz. */ @JvmField -public val RESOURCE_STATE_CPU_USAGE: TableColumn<Double> = column("cpu_usage") +public val RESOURCE_STATE_CPU_USAGE: String = "cpu_usage" /** * Total CPU usage of the resource in percentage. */ @JvmField -public val RESOURCE_STATE_CPU_USAGE_PCT: TableColumn<Double> = column("cpu_usage_pct") +public val RESOURCE_STATE_CPU_USAGE_PCT: String = "cpu_usage_pct" /** * Total CPU demand of the resource in MHz. */ @JvmField -public val RESOURCE_STATE_CPU_DEMAND: TableColumn<Double> = column("cpu_demand") +public val RESOURCE_STATE_CPU_DEMAND: String = "cpu_demand" /** * CPU ready percentage. */ @JvmField -public val RESOURCE_STATE_CPU_READY_PCT: TableColumn<Double> = column("cpu_ready_pct") +public val RESOURCE_STATE_CPU_READY_PCT: String = "cpu_ready_pct" /** * Memory usage of the resource in KB. */ @JvmField -public val RESOURCE_STATE_MEM_USAGE: TableColumn<Double> = column("mem_usage") +public val RESOURCE_STATE_MEM_USAGE: String = "mem_usage" /** * Disk read throughput of the resource in KB/s. */ @JvmField -public val RESOURCE_STATE_DISK_READ: TableColumn<Double> = column("disk_read") +public val RESOURCE_STATE_DISK_READ: String = "disk_read" /** * Disk write throughput of the resource in KB/s. */ @JvmField -public val RESOURCE_STATE_DISK_WRITE: TableColumn<Double> = column("disk_write") +public val RESOURCE_STATE_DISK_WRITE: String = "disk_write" /** * Network receive throughput of the resource in KB/s. */ @JvmField -public val RESOURCE_STATE_NET_RX: TableColumn<Double> = column("net_rx") +public val RESOURCE_STATE_NET_RX: String = "net_rx" /** * Network transmit throughput of the resource in KB/s. */ @JvmField -public val RESOURCE_STATE_NET_TX: TableColumn<Double> = column("net_tx") +public val RESOURCE_STATE_NET_TX: String = "net_tx" diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt index e6daafb7..da5c343f 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt @@ -23,78 +23,62 @@ @file:JvmName("TaskColumns") package org.opendc.trace.conv -import org.opendc.trace.TableColumn -import java.time.Duration -import java.time.Instant - /** * A column containing the task identifier. */ -@JvmField -public val TASK_ID: TableColumn<String> = column("id") +public const val TASK_ID: String = "id" /** * A column containing the identifier of the workflow. */ -@JvmField -public val TASK_WORKFLOW_ID: TableColumn<String> = column("workflow_id") +public const val TASK_WORKFLOW_ID: String = "workflow_id" /** * A column containing the submission time of the task. */ -@JvmField -public val TASK_SUBMIT_TIME: TableColumn<Instant> = column("submit_time") +public const val TASK_SUBMIT_TIME: String = "submit_time" /** * A column containing the wait time of the task. */ -@JvmField -public val TASK_WAIT_TIME: TableColumn<Instant> = column("wait_time") +public const val TASK_WAIT_TIME: String = "wait_time" /** * A column containing the runtime time of the task. */ -@JvmField -public val TASK_RUNTIME: TableColumn<Duration> = column("runtime") +public const val TASK_RUNTIME: String = "runtime" /** * A column containing the parents of a task. */ -@JvmField -public val TASK_PARENTS: TableColumn<Set<String>> = column("parents") +public const val TASK_PARENTS: String = "parents" /** * A column containing the children of a task. */ -@JvmField -public val TASK_CHILDREN: TableColumn<Set<String>> = column("children") +public const val TASK_CHILDREN: String = "children" /** * A column containing the requested CPUs of a task. */ -@JvmField -public val TASK_REQ_NCPUS: TableColumn<Int> = column("req_ncpus") +public const val TASK_REQ_NCPUS: String = "req_ncpus" /** * A column containing the allocated CPUs of a task. */ -@JvmField -public val TASK_ALLOC_NCPUS: TableColumn<Int> = column("alloc_ncpus") +public const val TASK_ALLOC_NCPUS: String = "alloc_ncpus" /** * A column containing the status of a task. */ -@JvmField -public val TASK_STATUS: TableColumn<Int> = column("status") +public const val TASK_STATUS: String = "status" /** * A column containing the group id of a task. */ -@JvmField -public val TASK_GROUP_ID: TableColumn<Int> = column("group_id") +public const val TASK_GROUP_ID: String = "group_id" /** * A column containing the user id of a task. */ -@JvmField -public val TASK_USER_ID: TableColumn<Int> = column("user_id") +public const val TASK_USER_ID: String = "user_id" diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt index b848e19a..1e1bf676 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt @@ -37,13 +37,10 @@ internal class TableImpl(val trace: TraceImpl, override val name: String) : Tabl */ private val details = trace.format.getDetails(trace.path, name) - override val columns: List<TableColumn<*>> + override val columns: List<TableColumn> get() = details.columns - override val partitionKeys: List<TableColumn<*>> - get() = details.partitionKeys - - override fun newReader(projection: List<TableColumn<*>>?): TableReader { + override fun newReader(projection: List<String>?): TableReader { return trace.format.newReader(trace.path, name, projection) } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TableDetails.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TableDetails.kt index 1a9b9ee1..5206d02b 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TableDetails.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TableDetails.kt @@ -29,9 +29,5 @@ import org.opendc.trace.TableColumn * A class used by the [TraceFormat] interface for describing the metadata of a [Table]. * * @param columns The available columns in the table. - * @param partitionKeys The table columns that act as partition keys for the table. */ -public data class TableDetails( - val columns: List<TableColumn<*>>, - val partitionKeys: List<TableColumn<*>> = emptyList() -) +public data class TableDetails(val columns: List<TableColumn>) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt index 47761e0f..eff6fa83 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt @@ -22,7 +22,6 @@ package org.opendc.trace.spi -import org.opendc.trace.TableColumn import org.opendc.trace.TableReader import org.opendc.trace.TableWriter import java.nio.file.Path @@ -69,11 +68,11 @@ public interface TraceFormat { * * @param path The path to the trace to open. * @param table The name of the table to open a [TableReader] for. - * @param projection The list of [TableColumn]s to project or `null` if no projection is performed. + * @param projection The name of the columns to project or `null` if no projection is performed. * @throws IllegalArgumentException If [table] does not exist. * @return A [TableReader] instance for the table. */ - public fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader + public fun newReader(path: Path, table: String, projection: List<String>?): TableReader /** * Open a [TableWriter] for the specified [table]. diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/CompositeTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/CompositeTableReader.kt index 11e032c7..c4854265 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/CompositeTableReader.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/CompositeTableReader.kt @@ -22,8 +22,10 @@ package org.opendc.trace.util -import org.opendc.trace.TableColumn import org.opendc.trace.TableReader +import java.time.Duration +import java.time.Instant +import java.util.* /** * A helper class to chain multiple [TableReader]s. @@ -63,11 +65,11 @@ public abstract class CompositeTableReader : TableReader { return delegate != null } - override fun resolve(column: TableColumn<*>): Int { + override fun resolve(name: String): Int { tryStart() val delegate = delegate - return delegate?.resolve(column) ?: -1 + return delegate?.resolve(name) ?: -1 } override fun isNull(index: Int): Boolean { @@ -75,11 +77,6 @@ public abstract class CompositeTableReader : TableReader { return delegate.isNull(index) } - override fun get(index: Int): Any? { - val delegate = checkNotNull(delegate) { "Invalid reader state" } - return delegate.get(index) - } - override fun getBoolean(index: Int): Boolean { val delegate = checkNotNull(delegate) { "Invalid reader state" } return delegate.getBoolean(index) @@ -95,11 +92,51 @@ public abstract class CompositeTableReader : TableReader { return delegate.getLong(index) } + override fun getFloat(index: Int): Float { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getFloat(index) + } + override fun getDouble(index: Int): Double { val delegate = checkNotNull(delegate) { "Invalid reader state" } return delegate.getDouble(index) } + override fun getString(index: Int): String? { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getString(index) + } + + override fun getUUID(index: Int): UUID? { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getUUID(index) + } + + override fun getInstant(index: Int): Instant? { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getInstant(index) + } + + override fun getDuration(index: Int): Duration? { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getDuration(index) + } + + override fun <T> getList(index: Int, elementType: Class<T>): List<T>? { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getList(index, elementType) + } + + override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getSet(index, elementType) + } + + override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getMap(index, keyType, valueType) + } + override fun close() { delegate?.close() } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/TableColumnConversion.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/TableColumnConversion.kt new file mode 100644 index 00000000..8ef080b5 --- /dev/null +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/TableColumnConversion.kt @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("TableColumnConversions") +package org.opendc.trace.util + +import org.opendc.trace.TableColumnType +import java.time.Duration +import java.time.Instant +import java.util.UUID + +/** + * Helper method to convert a [List] into a [List] with elements of [targetElementType]. + */ +public fun <T> TableColumnType.List.convertTo(value: List<*>?, targetElementType: Class<T>): List<T>? { + require(elementType.isCompatible(targetElementType)) { "Target element type is not compatible with $elementType" } + @Suppress("UNCHECKED_CAST") + return value as List<T>? +} + +/** + * Helper method to convert a [Set] into a [Set] with elements of [targetElementType]. + */ +public fun <T> TableColumnType.Set.convertTo(value: Set<*>?, targetElementType: Class<T>): Set<T>? { + require(elementType.isCompatible(targetElementType)) { "Target element type is not compatible with $elementType" } + @Suppress("UNCHECKED_CAST") + return value as Set<T>? +} + +/** + * Helper method to convert a [Map] into a [Map] with [targetKeyType] keys and [targetValueType] values. + */ +public fun <K, V> TableColumnType.Map.convertTo(value: Map<*, *>?, targetKeyType: Class<K>, targetValueType: Class<V>): Map<K, V>? { + require(keyType.isCompatible(targetKeyType)) { "Target key type $targetKeyType is not compatible with $keyType" } + require(valueType.isCompatible(targetValueType)) { "Target value type $targetValueType is not compatible with $valueType" } + @Suppress("UNCHECKED_CAST") + return value as Map<K, V>? +} + +/** + * Helper method to determine [javaType] is compatible with this [TableColumnType]. + */ +private fun TableColumnType.isCompatible(javaType: Class<*>): Boolean { + return when (this) { + is TableColumnType.Boolean -> javaType.isAssignableFrom(Boolean::class.java) + is TableColumnType.Int -> javaType.isAssignableFrom(Int::class.java) + is TableColumnType.Long -> javaType.isAssignableFrom(Long::class.java) + is TableColumnType.Float -> javaType.isAssignableFrom(Float::class.java) + is TableColumnType.Double -> javaType.isAssignableFrom(Double::class.java) + is TableColumnType.String -> javaType.isAssignableFrom(String::class.java) + is TableColumnType.UUID -> javaType.isAssignableFrom(UUID::class.java) + is TableColumnType.Instant -> javaType.isAssignableFrom(Instant::class.java) + is TableColumnType.Duration -> javaType.isAssignableFrom(Duration::class.java) + is TableColumnType.List -> javaType.isAssignableFrom(List::class.java) + is TableColumnType.Set -> javaType.isAssignableFrom(Set::class.java) + is TableColumnType.Map -> javaType.isAssignableFrom(Map::class.java) + } +} diff --git a/opendc-trace/opendc-trace-azure/build.gradle.kts b/opendc-trace/opendc-trace-azure/build.gradle.kts index d4fe045e..ee53c583 100644 --- a/opendc-trace/opendc-trace-azure/build.gradle.kts +++ b/opendc-trace/opendc-trace-azure/build.gradle.kts @@ -25,9 +25,12 @@ description = "Support for Azure VM traces in OpenDC" /* Build configuration */ plugins { `kotlin-library-conventions` + `benchmark-conventions` } dependencies { api(projects.opendcTrace.opendcTraceApi) implementation(libs.jackson.dataformat.csv) + + testImplementation(projects.opendcTrace.opendcTraceTestkit) } diff --git a/opendc-trace/opendc-trace-azure/src/jmh/kotlin/org/opendc/trace/azure/AzureTraceBenchmarks.kt b/opendc-trace/opendc-trace-azure/src/jmh/kotlin/org/opendc/trace/azure/AzureTraceBenchmarks.kt new file mode 100644 index 00000000..4fcdce30 --- /dev/null +++ b/opendc-trace/opendc-trace-azure/src/jmh/kotlin/org/opendc/trace/azure/AzureTraceBenchmarks.kt @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.azure + +import org.opendc.trace.conv.* +import org.opendc.trace.spi.TraceFormat +import org.openjdk.jmh.annotations.* +import org.openjdk.jmh.infra.Blackhole +import java.nio.file.Path +import java.util.concurrent.TimeUnit + +/** + * Benchmarks for parsing traces in the Azure VM format. + */ +@State(Scope.Thread) +@Fork(1) +@Warmup(iterations = 2, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS) +class AzureTraceBenchmarks { + private lateinit var path: Path + private lateinit var format: TraceFormat + + @Setup + fun setUp() { + path = Path.of("src/test/resources/trace") + format = AzureTraceFormat() + } + + @Benchmark + fun benchmarkResourcesReader(bh: Blackhole) { + val reader = format.newReader(path, TABLE_RESOURCES, null) + try { + val idColumn = reader.resolve(RESOURCE_ID) + while (reader.nextRow()) { + bh.consume(reader.getString(idColumn)) + } + } finally { + reader.close() + } + } + + @Benchmark + fun benchmarkResourceStatesReader(bh: Blackhole) { + val reader = format.newReader(path, TABLE_RESOURCE_STATES, null) + try { + val idColumn = reader.resolve(RESOURCE_ID) + while (reader.nextRow()) { + bh.consume(reader.getString(idColumn)) + } + } finally { + reader.close() + } + } +} diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt index 3132b1d9..c0c67329 100644 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt @@ -29,17 +29,28 @@ import org.opendc.trace.* import org.opendc.trace.conv.RESOURCE_ID import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE_PCT import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP +import java.time.Duration import java.time.Instant +import java.util.* /** * A [TableReader] for the Azure v1 VM resource state table. */ internal class AzureResourceStateTableReader(private val parser: CsvParser) : TableReader { + /** + * A flag to indicate whether a single row has been read already. + */ + private var isStarted = false + init { parser.schema = schema } override fun nextRow(): Boolean { + if (!isStarted) { + isStarted = true + } + reset() if (!nextStart()) { @@ -63,20 +74,22 @@ internal class AzureResourceStateTableReader(private val parser: CsvParser) : Ta return true } - override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + private val COL_ID = 0 + private val COL_TIMESTAMP = 1 + private val COL_CPU_USAGE_PCT = 2 - override fun isNull(index: Int): Boolean { - require(index in 0..columns.size) { "Invalid column index" } - return false + override fun resolve(name: String): Int { + return when (name) { + RESOURCE_ID -> COL_ID + RESOURCE_STATE_TIMESTAMP -> COL_TIMESTAMP + RESOURCE_STATE_CPU_USAGE_PCT -> COL_CPU_USAGE_PCT + else -> -1 + } } - override fun get(index: Int): Any? { - return when (index) { - COL_ID -> id - COL_TIMESTAMP -> timestamp - COL_CPU_USAGE_PCT -> cpuUsagePct - else -> throw IllegalArgumentException("Invalid column index") - } + override fun isNull(index: Int): Boolean { + require(index in 0..COL_CPU_USAGE_PCT) { "Invalid column index" } + return false } override fun getBoolean(index: Int): Boolean { @@ -91,18 +104,66 @@ internal class AzureResourceStateTableReader(private val parser: CsvParser) : Ta throw IllegalArgumentException("Invalid column") } + override fun getFloat(index: Int): Float { + throw IllegalArgumentException("Invalid column") + } + override fun getDouble(index: Int): Double { + checkActive() return when (index) { COL_CPU_USAGE_PCT -> cpuUsagePct else -> throw IllegalArgumentException("Invalid column") } } + override fun getString(index: Int): String? { + checkActive() + return when (index) { + COL_ID -> id + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getUUID(index: Int): UUID? { + throw IllegalArgumentException("Invalid column") + } + + override fun getInstant(index: Int): Instant? { + checkActive() + return when (index) { + COL_TIMESTAMP -> timestamp + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getDuration(index: Int): Duration? { + throw IllegalArgumentException("Invalid column") + } + + override fun <T> getList(index: Int, elementType: Class<T>): List<T>? { + throw IllegalArgumentException("Invalid column") + } + + override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? { + throw IllegalArgumentException("Invalid column") + } + + override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? { + throw IllegalArgumentException("Invalid column") + } + override fun close() { parser.close() } /** + * Helper method to check if the reader is active. + */ + private fun checkActive() { + check(isStarted && !parser.isClosed) { "No active row. Did you call nextRow()?" } + } + + /** * Advance the parser until the next object start. */ private fun nextStart(): Boolean { @@ -131,15 +192,6 @@ internal class AzureResourceStateTableReader(private val parser: CsvParser) : Ta cpuUsagePct = Double.NaN } - private val COL_ID = 0 - private val COL_TIMESTAMP = 1 - private val COL_CPU_USAGE_PCT = 2 - private val columns = mapOf( - RESOURCE_ID to COL_ID, - RESOURCE_STATE_TIMESTAMP to COL_TIMESTAMP, - RESOURCE_STATE_CPU_USAGE_PCT to COL_CPU_USAGE_PCT - ) - companion object { /** * The [CsvSchema] that is used to parse the trace. diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt index 154a37e4..a8451301 100644 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt +++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt @@ -27,17 +27,28 @@ import com.fasterxml.jackson.dataformat.csv.CsvParser import com.fasterxml.jackson.dataformat.csv.CsvSchema import org.opendc.trace.* import org.opendc.trace.conv.* +import java.time.Duration import java.time.Instant +import java.util.* /** * A [TableReader] for the Azure v1 VM resources table. */ internal class AzureResourceTableReader(private val parser: CsvParser) : TableReader { + /** + * A flag to indicate whether a single row has been read already. + */ + private var isStarted = false + init { parser.schema = schema } override fun nextRow(): Boolean { + if (!isStarted) { + isStarted = true + } + reset() if (!nextStart()) { @@ -63,22 +74,26 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe return true } - override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + private val COL_ID = 0 + private val COL_START_TIME = 1 + private val COL_STOP_TIME = 2 + private val COL_CPU_COUNT = 3 + private val COL_MEM_CAPACITY = 4 - override fun isNull(index: Int): Boolean { - require(index in 0..columns.size) { "Invalid column index" } - return false + override fun resolve(name: String): Int { + return when (name) { + RESOURCE_ID -> COL_ID + RESOURCE_START_TIME -> COL_START_TIME + RESOURCE_STOP_TIME -> COL_STOP_TIME + RESOURCE_CPU_COUNT -> COL_CPU_COUNT + RESOURCE_MEM_CAPACITY -> COL_MEM_CAPACITY + else -> -1 + } } - override fun get(index: Int): Any? { - return when (index) { - COL_ID -> id - COL_START_TIME -> startTime - COL_STOP_TIME -> stopTime - COL_CPU_COUNT -> getInt(index) - COL_MEM_CAPACITY -> getDouble(index) - else -> throw IllegalArgumentException("Invalid column") - } + override fun isNull(index: Int): Boolean { + require(index in 0..COL_MEM_CAPACITY) { "Invalid column index" } + return false } override fun getBoolean(index: Int): Boolean { @@ -86,6 +101,7 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe } override fun getInt(index: Int): Int { + checkActive() return when (index) { COL_CPU_COUNT -> cpuCores else -> throw IllegalArgumentException("Invalid column") @@ -93,21 +109,74 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe } override fun getLong(index: Int): Long { + checkActive() + return when (index) { + COL_CPU_COUNT -> cpuCores.toLong() + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getFloat(index: Int): Float { throw IllegalArgumentException("Invalid column") } override fun getDouble(index: Int): Double { + checkActive() return when (index) { COL_MEM_CAPACITY -> memCapacity else -> throw IllegalArgumentException("Invalid column") } } + override fun getString(index: Int): String? { + checkActive() + return when (index) { + COL_ID -> id + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getUUID(index: Int): UUID? { + throw IllegalArgumentException("Invalid column") + } + + override fun getInstant(index: Int): Instant? { + checkActive() + return when (index) { + COL_START_TIME -> startTime + COL_STOP_TIME -> stopTime + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getDuration(index: Int): Duration? { + throw IllegalArgumentException("Invalid column") + } + + override fun <T> getList(index: Int, elementType: Class<T>): List<T>? { + throw IllegalArgumentException("Invalid column") + } + + override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? { + throw IllegalArgumentException("Invalid column") + } + + override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? { + throw IllegalArgumentException("Invalid column") + } + override fun close() { parser.close() } /** + * Helper method to check if the reader is active. + */ + private fun checkActive() { + check(isStarted && !parser.isClosed) { "No active row. Did you call nextRow()?" } + } + + /** * Advance the parser until the next object start. */ private fun nextStart(): Boolean { @@ -140,19 +209,6 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe memCapacity = Double.NaN } - private val COL_ID = 0 - private val COL_START_TIME = 1 - private val COL_STOP_TIME = 2 - private val COL_CPU_COUNT = 3 - private val COL_MEM_CAPACITY = 4 - private val columns = mapOf( - RESOURCE_ID to COL_ID, - RESOURCE_START_TIME to COL_START_TIME, - RESOURCE_STOP_TIME to COL_STOP_TIME, - RESOURCE_CPU_COUNT to COL_CPU_COUNT, - RESOURCE_MEM_CAPACITY to COL_MEM_CAPACITY - ) - companion object { /** * The [CsvSchema] that is used to parse the trace. diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt index 73978990..2294e4a4 100644 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt +++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt @@ -62,26 +62,25 @@ public class AzureTraceFormat : TraceFormat { return when (table) { TABLE_RESOURCES -> TableDetails( listOf( - RESOURCE_ID, - RESOURCE_START_TIME, - RESOURCE_STOP_TIME, - RESOURCE_CPU_COUNT, - RESOURCE_MEM_CAPACITY + TableColumn(RESOURCE_ID, TableColumnType.String), + TableColumn(RESOURCE_START_TIME, TableColumnType.Instant), + TableColumn(RESOURCE_STOP_TIME, TableColumnType.Instant), + TableColumn(RESOURCE_CPU_COUNT, TableColumnType.Int), + TableColumn(RESOURCE_MEM_CAPACITY, TableColumnType.Double), ) ) TABLE_RESOURCE_STATES -> TableDetails( listOf( - RESOURCE_ID, - RESOURCE_STATE_TIMESTAMP, - RESOURCE_STATE_CPU_USAGE_PCT - ), - listOf(RESOURCE_STATE_TIMESTAMP) + TableColumn(RESOURCE_ID, TableColumnType.String), + TableColumn(RESOURCE_STATE_TIMESTAMP, TableColumnType.Instant), + TableColumn(RESOURCE_STATE_CPU_USAGE_PCT, TableColumnType.Double), + ) ) else -> throw IllegalArgumentException("Table $table not supported") } } - override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader { + override fun newReader(path: Path, table: String, projection: List<String>?): TableReader { return when (table) { TABLE_RESOURCES -> { val stream = GZIPInputStream(path.resolve("vmtable/vmtable.csv.gz").inputStream()) diff --git a/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt b/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt index 263d26ce..06ba047a 100644 --- a/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt @@ -22,15 +22,19 @@ package org.opendc.trace.azure +import org.junit.jupiter.api.* import org.junit.jupiter.api.Assertions.* -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertThrows +import org.junit.jupiter.api.Assertions.assertAll +import org.opendc.trace.TableColumn +import org.opendc.trace.TableReader import org.opendc.trace.conv.* +import org.opendc.trace.testkit.TableReaderTestKit import java.nio.file.Paths /** * Test suite for the [AzureTraceFormat] class. */ +@DisplayName("Azure VM TraceFormat") class AzureTraceFormatTest { private val format = AzureTraceFormat() @@ -60,7 +64,7 @@ class AzureTraceFormatTest { val reader = format.newReader(path, TABLE_RESOURCES, null) assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("x/XsOfHO4ocsV99i4NluqKDuxctW2MMVmwqOPAlg4wp8mqbBOe3wxBlQo0+Qx+uf", reader.get(RESOURCE_ID)) }, + { assertEquals("x/XsOfHO4ocsV99i4NluqKDuxctW2MMVmwqOPAlg4wp8mqbBOe3wxBlQo0+Qx+uf", reader.getString(RESOURCE_ID)) }, { assertEquals(1, reader.getInt(RESOURCE_CPU_COUNT)) }, { assertEquals(1750000.0, reader.getDouble(RESOURCE_MEM_CAPACITY)) }, ) @@ -75,11 +79,41 @@ class AzureTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("+ZcrOp5/c/fJ6mVgP5qMZlOAGDwyjaaDNM0WoWOt2IDb47gT0UwK9lFwkPQv3C7Q", reader.get(RESOURCE_ID)) }, - { assertEquals(0, reader.get(RESOURCE_STATE_TIMESTAMP).epochSecond) }, + { assertEquals("+ZcrOp5/c/fJ6mVgP5qMZlOAGDwyjaaDNM0WoWOt2IDb47gT0UwK9lFwkPQv3C7Q", reader.getString(RESOURCE_ID)) }, + { assertEquals(0, reader.getInstant(RESOURCE_STATE_TIMESTAMP)?.epochSecond) }, { assertEquals(0.0286979, reader.getDouble(RESOURCE_STATE_CPU_USAGE_PCT), 0.01) } ) reader.close() } + + @DisplayName("TableReader for Resources") + @Nested + inner class ResourcesTableReaderTest : TableReaderTestKit() { + override lateinit var reader: TableReader + override lateinit var columns: List<TableColumn> + + @BeforeEach + fun setUp() { + val path = Paths.get("src/test/resources/trace") + + columns = format.getDetails(path, TABLE_RESOURCES).columns + reader = format.newReader(path, TABLE_RESOURCES, null) + } + } + + @DisplayName("TableReader for Resource States") + @Nested + inner class ResourceStatesTableReaderTest : TableReaderTestKit() { + override lateinit var reader: TableReader + override lateinit var columns: List<TableColumn> + + @BeforeEach + fun setUp() { + val path = Paths.get("src/test/resources/trace") + + columns = format.getDetails(path, TABLE_RESOURCE_STATES).columns + reader = format.newReader(path, TABLE_RESOURCE_STATES, null) + } + } } diff --git a/opendc-trace/opendc-trace-bitbrains/build.gradle.kts b/opendc-trace/opendc-trace-bitbrains/build.gradle.kts index 5211ec30..502b052a 100644 --- a/opendc-trace/opendc-trace-bitbrains/build.gradle.kts +++ b/opendc-trace/opendc-trace-bitbrains/build.gradle.kts @@ -30,4 +30,6 @@ plugins { dependencies { api(projects.opendcTrace.opendcTraceApi) implementation(libs.jackson.dataformat.csv) + + testImplementation(projects.opendcTrace.opendcTraceTestkit) } diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt index 1e1d1a09..df7a1c91 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt @@ -25,20 +25,37 @@ package org.opendc.trace.bitbrains import org.opendc.trace.* import org.opendc.trace.conv.* import java.io.BufferedReader +import java.time.Duration import java.time.Instant +import java.util.* /** * A [TableReader] for the Bitbrains resource state table. */ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedReader) : TableReader { + private var state = State.Pending + override fun nextRow(): Boolean { + val state = state + if (state == State.Closed) { + return false + } else if (state == State.Pending) { + this.state = State.Active + } + reset() - var line: String + var line: String? var num = 0 while (true) { - line = reader.readLine() ?: return false + line = reader.readLine() + + if (line == null) { + this.state = State.Closed + return false + } + num++ if (line[0] == '#' || line.isBlank()) { @@ -49,7 +66,7 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR break } - line = line.trim() + line = line!!.trim() val length = line.length var col = 0 @@ -89,26 +106,31 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR return true } - override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + override fun resolve(name: String): Int { + return when (name) { + RESOURCE_ID -> COL_ID + RESOURCE_CLUSTER_ID -> COL_CLUSTER_ID + RESOURCE_STATE_TIMESTAMP -> COL_TIMESTAMP + RESOURCE_CPU_COUNT -> COL_NCPUS + RESOURCE_CPU_CAPACITY -> COL_CPU_CAPACITY + RESOURCE_STATE_CPU_USAGE -> COL_CPU_USAGE + RESOURCE_STATE_CPU_USAGE_PCT -> COL_CPU_USAGE_PCT + RESOURCE_STATE_CPU_DEMAND -> COL_CPU_DEMAND + RESOURCE_STATE_CPU_READY_PCT -> COL_CPU_READY_PCT + RESOURCE_MEM_CAPACITY -> COL_MEM_CAPACITY + RESOURCE_STATE_DISK_READ -> COL_DISK_READ + RESOURCE_STATE_DISK_WRITE -> COL_DISK_WRITE + else -> -1 + } + } override fun isNull(index: Int): Boolean { - require(index in 0..COL_MAX) { "Invalid column index" } + require(index in 0 until COL_MAX) { "Invalid column index" } return false } - override fun get(index: Int): Any? { - return when (index) { - COL_ID -> id - COL_CLUSTER_ID -> cluster - COL_TIMESTAMP -> timestamp - COL_NCPUS -> getInt(index) - COL_POWERED_ON -> getInt(index) - COL_CPU_CAPACITY, COL_CPU_USAGE, COL_CPU_USAGE_PCT, COL_CPU_READY_PCT, COL_CPU_DEMAND, COL_MEM_CAPACITY, COL_DISK_READ, COL_DISK_WRITE -> getDouble(index) - else -> throw IllegalArgumentException("Invalid column") - } - } - override fun getBoolean(index: Int): Boolean { + check(state == State.Active) { "No active row" } return when (index) { COL_POWERED_ON -> poweredOn else -> throw IllegalArgumentException("Invalid column") @@ -116,6 +138,7 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR } override fun getInt(index: Int): Int { + check(state == State.Active) { "No active row" } return when (index) { COL_NCPUS -> cpuCores else -> throw IllegalArgumentException("Invalid column") @@ -126,7 +149,12 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR throw IllegalArgumentException("Invalid column") } + override fun getFloat(index: Int): Float { + throw IllegalArgumentException("Invalid column") + } + override fun getDouble(index: Int): Double { + check(state == State.Active) { "No active row" } return when (index) { COL_CPU_CAPACITY -> cpuCapacity COL_CPU_USAGE -> cpuUsage @@ -140,8 +168,47 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR } } + override fun getString(index: Int): String? { + check(state == State.Active) { "No active row" } + return when (index) { + COL_ID -> id + COL_CLUSTER_ID -> cluster + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getUUID(index: Int): UUID? { + throw IllegalArgumentException("Invalid column") + } + + override fun getInstant(index: Int): Instant? { + check(state == State.Active) { "No active row" } + return when (index) { + COL_TIMESTAMP -> timestamp + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getDuration(index: Int): Duration? { + throw IllegalArgumentException("Invalid column") + } + + override fun <T> getList(index: Int, elementType: Class<T>): List<T>? { + throw IllegalArgumentException("Invalid column") + } + + override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? { + throw IllegalArgumentException("Invalid column") + } + + override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? { + throw IllegalArgumentException("Invalid column") + } + override fun close() { reader.close() + reset() + state = State.Closed } /** @@ -196,18 +263,7 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR private val COL_CPU_USAGE_PCT = 21 private val COL_MAX = COL_CPU_USAGE_PCT + 1 - private val columns = mapOf( - RESOURCE_ID to COL_ID, - RESOURCE_CLUSTER_ID to COL_CLUSTER_ID, - RESOURCE_STATE_TIMESTAMP to COL_TIMESTAMP, - RESOURCE_CPU_COUNT to COL_NCPUS, - RESOURCE_CPU_CAPACITY to COL_CPU_CAPACITY, - RESOURCE_STATE_CPU_USAGE to COL_CPU_USAGE, - RESOURCE_STATE_CPU_USAGE_PCT to COL_CPU_USAGE_PCT, - RESOURCE_STATE_CPU_DEMAND to COL_CPU_DEMAND, - RESOURCE_STATE_CPU_READY_PCT to COL_CPU_READY_PCT, - RESOURCE_MEM_CAPACITY to COL_MEM_CAPACITY, - RESOURCE_STATE_DISK_READ to COL_DISK_READ, - RESOURCE_STATE_DISK_WRITE to COL_DISK_WRITE - ) + private enum class State { + Pending, Active, Closed + } } diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt index 82e454ad..31c4f1e2 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt @@ -53,26 +53,25 @@ public class BitbrainsExTraceFormat : TraceFormat { return when (table) { TABLE_RESOURCE_STATES -> TableDetails( listOf( - RESOURCE_ID, - RESOURCE_CLUSTER_ID, - RESOURCE_STATE_TIMESTAMP, - RESOURCE_CPU_COUNT, - RESOURCE_CPU_CAPACITY, - RESOURCE_STATE_CPU_USAGE, - RESOURCE_STATE_CPU_USAGE_PCT, - RESOURCE_STATE_CPU_DEMAND, - RESOURCE_STATE_CPU_READY_PCT, - RESOURCE_MEM_CAPACITY, - RESOURCE_STATE_DISK_READ, - RESOURCE_STATE_DISK_WRITE - ), - listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP) + TableColumn(RESOURCE_ID, TableColumnType.String), + TableColumn(RESOURCE_CLUSTER_ID, TableColumnType.String), + TableColumn(RESOURCE_STATE_TIMESTAMP, TableColumnType.Instant), + TableColumn(RESOURCE_CPU_COUNT, TableColumnType.Int), + TableColumn(RESOURCE_CPU_CAPACITY, TableColumnType.Double), + TableColumn(RESOURCE_STATE_CPU_USAGE, TableColumnType.Double), + TableColumn(RESOURCE_STATE_CPU_USAGE_PCT, TableColumnType.Double), + TableColumn(RESOURCE_STATE_CPU_DEMAND, TableColumnType.Double), + TableColumn(RESOURCE_STATE_CPU_READY_PCT, TableColumnType.Double), + TableColumn(RESOURCE_MEM_CAPACITY, TableColumnType.Double), + TableColumn(RESOURCE_STATE_DISK_READ, TableColumnType.Double), + TableColumn(RESOURCE_STATE_DISK_WRITE, TableColumnType.Double), + ) ) else -> throw IllegalArgumentException("Table $table not supported") } } - override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader { + override fun newReader(path: Path, table: String, projection: List<String>?): TableReader { return when (table) { TABLE_RESOURCE_STATES -> newResourceStateReader(path) else -> throw IllegalArgumentException("Table $table not supported") diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt index 214fd749..4d8cf758 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt @@ -29,6 +29,7 @@ import com.fasterxml.jackson.dataformat.csv.CsvSchema import org.opendc.trace.* import org.opendc.trace.conv.* import java.text.NumberFormat +import java.time.Duration import java.time.Instant import java.time.LocalDateTime import java.time.ZoneOffset @@ -41,6 +42,11 @@ import java.util.* */ internal class BitbrainsResourceStateTableReader(private val partition: String, private val parser: CsvParser) : TableReader { /** + * A flag to indicate whether a single row has been read already. + */ + private var isStarted = false + + /** * The [DateTimeFormatter] used to parse the timestamps in case of the Materna trace. */ private val formatter = DateTimeFormatter.ofPattern("dd.MM.yyyy HH:mm:ss") @@ -65,6 +71,10 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, } override fun nextRow(): Boolean { + if (!isStarted) { + isStarted = true + } + // Reset the row state reset() @@ -112,21 +122,40 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, return true } - override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + private val COL_TIMESTAMP = 0 + private val COL_CPU_COUNT = 1 + private val COL_CPU_CAPACITY = 2 + private val COL_CPU_USAGE = 3 + private val COL_CPU_USAGE_PCT = 4 + private val COL_MEM_CAPACITY = 5 + private val COL_MEM_USAGE = 6 + private val COL_DISK_READ = 7 + private val COL_DISK_WRITE = 8 + private val COL_NET_RX = 9 + private val COL_NET_TX = 10 + private val COL_ID = 11 - override fun isNull(index: Int): Boolean { - check(index in 0..columns.size) { "Invalid column index" } - return false + override fun resolve(name: String): Int { + return when (name) { + RESOURCE_ID -> COL_ID + RESOURCE_STATE_TIMESTAMP -> COL_TIMESTAMP + RESOURCE_CPU_COUNT -> COL_CPU_COUNT + RESOURCE_CPU_CAPACITY -> COL_CPU_CAPACITY + RESOURCE_STATE_CPU_USAGE -> COL_CPU_USAGE + RESOURCE_STATE_CPU_USAGE_PCT -> COL_CPU_USAGE_PCT + RESOURCE_MEM_CAPACITY -> COL_MEM_CAPACITY + RESOURCE_STATE_MEM_USAGE -> COL_MEM_USAGE + RESOURCE_STATE_DISK_READ -> COL_DISK_READ + RESOURCE_STATE_DISK_WRITE -> COL_DISK_WRITE + RESOURCE_STATE_NET_RX -> COL_NET_RX + RESOURCE_STATE_NET_TX -> COL_NET_TX + else -> -1 + } } - override fun get(index: Int): Any? { - return when (index) { - COL_ID -> partition - COL_TIMESTAMP -> timestamp - COL_CPU_COUNT -> getInt(index) - COL_CPU_CAPACITY, COL_CPU_USAGE, COL_CPU_USAGE_PCT, COL_MEM_CAPACITY, COL_MEM_USAGE, COL_DISK_READ, COL_DISK_WRITE, COL_NET_RX, COL_NET_TX -> getDouble(index) - else -> throw IllegalArgumentException("Invalid column") - } + override fun isNull(index: Int): Boolean { + require(index in 0..COL_ID) { "Invalid column index" } + return false } override fun getBoolean(index: Int): Boolean { @@ -134,6 +163,7 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, } override fun getInt(index: Int): Int { + checkActive() return when (index) { COL_CPU_COUNT -> cpuCores else -> throw IllegalArgumentException("Invalid column") @@ -144,7 +174,12 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, throw IllegalArgumentException("Invalid column") } + override fun getFloat(index: Int): Float { + throw IllegalArgumentException("Invalid column") + } + override fun getDouble(index: Int): Double { + checkActive() return when (index) { COL_CPU_CAPACITY -> cpuCapacity COL_CPU_USAGE -> cpuUsage @@ -159,11 +194,54 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, } } + override fun getString(index: Int): String { + checkActive() + return when (index) { + COL_ID -> partition + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getUUID(index: Int): UUID? { + throw IllegalArgumentException("Invalid column") + } + + override fun getInstant(index: Int): Instant? { + checkActive() + return when (index) { + COL_TIMESTAMP -> timestamp + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getDuration(index: Int): Duration? { + throw IllegalArgumentException("Invalid column") + } + + override fun <T> getList(index: Int, elementType: Class<T>): List<T>? { + throw IllegalArgumentException("Invalid column") + } + + override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? { + throw IllegalArgumentException("Invalid column") + } + + override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? { + throw IllegalArgumentException("Invalid column") + } + override fun close() { parser.close() } /** + * Helper method to check if the reader is active. + */ + private fun checkActive() { + check(isStarted && !parser.isClosed) { "No active row. Did you call nextRow()?" } + } + + /** * Advance the parser until the next object start. */ private fun nextStart(): Boolean { @@ -228,34 +306,6 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, netTransmitted = Double.NaN } - private val COL_TIMESTAMP = 0 - private val COL_CPU_COUNT = 1 - private val COL_CPU_CAPACITY = 2 - private val COL_CPU_USAGE = 3 - private val COL_CPU_USAGE_PCT = 4 - private val COL_MEM_CAPACITY = 5 - private val COL_MEM_USAGE = 6 - private val COL_DISK_READ = 7 - private val COL_DISK_WRITE = 8 - private val COL_NET_RX = 9 - private val COL_NET_TX = 10 - private val COL_ID = 11 - - private val columns = mapOf( - RESOURCE_ID to COL_ID, - RESOURCE_STATE_TIMESTAMP to COL_TIMESTAMP, - RESOURCE_CPU_COUNT to COL_CPU_COUNT, - RESOURCE_CPU_CAPACITY to COL_CPU_CAPACITY, - RESOURCE_STATE_CPU_USAGE to COL_CPU_USAGE, - RESOURCE_STATE_CPU_USAGE_PCT to COL_CPU_USAGE_PCT, - RESOURCE_MEM_CAPACITY to COL_MEM_CAPACITY, - RESOURCE_STATE_MEM_USAGE to COL_MEM_USAGE, - RESOURCE_STATE_DISK_READ to COL_DISK_READ, - RESOURCE_STATE_DISK_WRITE to COL_DISK_WRITE, - RESOURCE_STATE_NET_RX to COL_NET_RX, - RESOURCE_STATE_NET_TX to COL_NET_TX - ) - /** * The type of the timestamp in the trace. */ diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt index 55f09f43..7454c40f 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt @@ -26,6 +26,9 @@ import com.fasterxml.jackson.dataformat.csv.CsvFactory import org.opendc.trace.* import org.opendc.trace.conv.RESOURCE_ID import java.nio.file.Path +import java.time.Duration +import java.time.Instant +import java.util.* /** * A [TableReader] for the Bitbrains resource table. @@ -36,7 +39,16 @@ internal class BitbrainsResourceTableReader(private val factory: CsvFactory, vms */ private val it = vms.iterator() + /** + * The state of the reader. + */ + private var state = State.Pending + override fun nextRow(): Boolean { + if (state == State.Pending) { + state = State.Active + } + reset() while (it.hasNext()) { @@ -51,47 +63,87 @@ internal class BitbrainsResourceTableReader(private val factory: CsvFactory, vms continue } - id = reader.get(idCol) as String + id = reader.getString(idCol) return true } finally { reader.close() } } + state = State.Closed return false } - override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + private val COL_ID = 0 + + override fun resolve(name: String): Int { + return when (name) { + RESOURCE_ID -> COL_ID + else -> -1 + } + } override fun isNull(index: Int): Boolean { - check(index in 0..columns.size) { "Invalid column index" } + require(index in 0..COL_ID) { "Invalid column index" } return false } - override fun get(index: Int): Any? { + override fun getBoolean(index: Int): Boolean { + throw IllegalArgumentException("Invalid column") + } + + override fun getInt(index: Int): Int { + throw IllegalArgumentException("Invalid column") + } + + override fun getLong(index: Int): Long { + throw IllegalArgumentException("Invalid column") + } + + override fun getFloat(index: Int): Float { + throw IllegalArgumentException("Invalid column") + } + + override fun getDouble(index: Int): Double { + throw IllegalArgumentException("Invalid column") + } + + override fun getString(index: Int): String? { + check(state == State.Active) { "No active row" } return when (index) { COL_ID -> id else -> throw IllegalArgumentException("Invalid column") } } - override fun getBoolean(index: Int): Boolean { + override fun getUUID(index: Int): UUID? { throw IllegalArgumentException("Invalid column") } - override fun getInt(index: Int): Int { + override fun getInstant(index: Int): Instant? { throw IllegalArgumentException("Invalid column") } - override fun getLong(index: Int): Long { + override fun getDuration(index: Int): Duration? { throw IllegalArgumentException("Invalid column") } - override fun getDouble(index: Int): Double { + override fun <T> getList(index: Int, elementType: Class<T>): List<T>? { throw IllegalArgumentException("Invalid column") } - override fun close() {} + override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? { + throw IllegalArgumentException("Invalid column") + } + + override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? { + throw IllegalArgumentException("Invalid column") + } + + override fun close() { + reset() + state = State.Closed + } /** * State fields of the reader. @@ -105,6 +157,7 @@ internal class BitbrainsResourceTableReader(private val factory: CsvFactory, vms id = null } - private val COL_ID = 0 - private val columns = mapOf(RESOURCE_ID to COL_ID) + private enum class State { + Pending, Active, Closed + } } diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt index a374e951..f3030893 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt @@ -59,29 +59,32 @@ public class BitbrainsTraceFormat : TraceFormat { override fun getDetails(path: Path, table: String): TableDetails { return when (table) { - TABLE_RESOURCES -> TableDetails(listOf(RESOURCE_ID)) + TABLE_RESOURCES -> TableDetails( + listOf( + TableColumn(RESOURCE_ID, TableColumnType.String) + ) + ) TABLE_RESOURCE_STATES -> TableDetails( listOf( - RESOURCE_ID, - RESOURCE_STATE_TIMESTAMP, - RESOURCE_CPU_COUNT, - RESOURCE_CPU_CAPACITY, - RESOURCE_STATE_CPU_USAGE, - RESOURCE_STATE_CPU_USAGE_PCT, - RESOURCE_MEM_CAPACITY, - RESOURCE_STATE_MEM_USAGE, - RESOURCE_STATE_DISK_READ, - RESOURCE_STATE_DISK_WRITE, - RESOURCE_STATE_NET_RX, - RESOURCE_STATE_NET_TX, + TableColumn(RESOURCE_ID, TableColumnType.String), + TableColumn(RESOURCE_STATE_TIMESTAMP, TableColumnType.Instant), + TableColumn(RESOURCE_CPU_COUNT, TableColumnType.Int), + TableColumn(RESOURCE_CPU_CAPACITY, TableColumnType.Double), + TableColumn(RESOURCE_STATE_CPU_USAGE, TableColumnType.Double), + TableColumn(RESOURCE_STATE_CPU_USAGE_PCT, TableColumnType.Double), + TableColumn(RESOURCE_MEM_CAPACITY, TableColumnType.Double), + TableColumn(RESOURCE_STATE_MEM_USAGE, TableColumnType.Double), + TableColumn(RESOURCE_STATE_DISK_READ, TableColumnType.Double), + TableColumn(RESOURCE_STATE_DISK_WRITE, TableColumnType.Double), + TableColumn(RESOURCE_STATE_NET_RX, TableColumnType.Double), + TableColumn(RESOURCE_STATE_NET_TX, TableColumnType.Double), ), - listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP) ) else -> throw IllegalArgumentException("Table $table not supported") } } - override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader { + override fun newReader(path: Path, table: String, projection: List<String>?): TableReader { return when (table) { TABLE_RESOURCES -> { val vms = Files.walk(path, 1) diff --git a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt index c944cb98..dbb75c50 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt @@ -22,12 +22,15 @@ package org.opendc.trace.bitbrains +import org.junit.jupiter.api.* import org.junit.jupiter.api.Assertions.* -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertThrows +import org.junit.jupiter.api.Assertions.assertAll +import org.opendc.trace.TableColumn +import org.opendc.trace.TableReader import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP import org.opendc.trace.conv.TABLE_RESOURCE_STATES +import org.opendc.trace.testkit.TableReaderTestKit import java.nio.file.Paths /** @@ -63,10 +66,25 @@ internal class BitbrainsExTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals(1631911500, reader.get(RESOURCE_STATE_TIMESTAMP).epochSecond) }, + { assertEquals(1631911500, reader.getInstant(RESOURCE_STATE_TIMESTAMP)?.epochSecond) }, { assertEquals(21.2, reader.getDouble(RESOURCE_STATE_CPU_USAGE), 0.01) } ) reader.close() } + + @DisplayName("TableReader for Resource States") + @Nested + inner class ResourceStatesTableReaderTest : TableReaderTestKit() { + override lateinit var reader: TableReader + override lateinit var columns: List<TableColumn> + + @BeforeEach + fun setUp() { + val path = Paths.get("src/test/resources/vm.txt") + + columns = format.getDetails(path, TABLE_RESOURCE_STATES).columns + reader = format.newReader(path, TABLE_RESOURCE_STATES, null) + } + } } diff --git a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt index 841801e6..712e1fcb 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt @@ -22,10 +22,13 @@ package org.opendc.trace.bitbrains +import org.junit.jupiter.api.* import org.junit.jupiter.api.Assertions.* -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertThrows +import org.junit.jupiter.api.Assertions.assertAll +import org.opendc.trace.TableColumn +import org.opendc.trace.TableReader import org.opendc.trace.conv.* +import org.opendc.trace.testkit.TableReaderTestKit import java.nio.file.Paths /** @@ -61,7 +64,7 @@ class BitbrainsTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("bitbrains", reader.get(RESOURCE_ID)) }, + { assertEquals("bitbrains", reader.getString(RESOURCE_ID)) }, { assertFalse(reader.nextRow()) } ) @@ -75,10 +78,40 @@ class BitbrainsTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals(1376314846, reader.get(RESOURCE_STATE_TIMESTAMP).epochSecond) }, + { assertEquals(1376314846, reader.getInstant(RESOURCE_STATE_TIMESTAMP)?.epochSecond) }, { assertEquals(19.066, reader.getDouble(RESOURCE_STATE_CPU_USAGE), 0.01) } ) reader.close() } + + @DisplayName("TableReader for Resources") + @Nested + inner class ResourcesTableReaderTest : TableReaderTestKit() { + override lateinit var reader: TableReader + override lateinit var columns: List<TableColumn> + + @BeforeEach + fun setUp() { + val path = Paths.get("src/test/resources/bitbrains.csv") + + columns = format.getDetails(path, TABLE_RESOURCES).columns + reader = format.newReader(path, TABLE_RESOURCES, null) + } + } + + @DisplayName("TableReader for Resource States") + @Nested + inner class ResourceStatesTableReaderTest : TableReaderTestKit() { + override lateinit var reader: TableReader + override lateinit var columns: List<TableColumn> + + @BeforeEach + fun setUp() { + val path = Paths.get("src/test/resources/bitbrains.csv") + + columns = format.getDetails(path, TABLE_RESOURCE_STATES).columns + reader = format.newReader(path, TABLE_RESOURCE_STATES, null) + } + } } diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt index 1854f262..74bd188b 100644 --- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt +++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt @@ -24,10 +24,10 @@ package org.opendc.trace.calcite import org.apache.calcite.linq4j.Enumerator import org.opendc.trace.TableColumn +import org.opendc.trace.TableColumnType import org.opendc.trace.TableReader -import java.sql.Timestamp -import java.time.Duration -import java.time.Instant +import java.nio.ByteBuffer +import java.nio.ByteOrder import java.util.concurrent.atomic.AtomicBoolean /** @@ -35,10 +35,10 @@ import java.util.concurrent.atomic.AtomicBoolean */ internal class TraceReaderEnumerator<E>( private val reader: TableReader, - private val columns: List<TableColumn<*>>, + private val columns: List<TableColumn>, private val cancelFlag: AtomicBoolean ) : Enumerator<E> { - private val columnIndices = columns.map { reader.resolve(it) }.toIntArray() + private val columnIndices = columns.map { reader.resolve(it.name) }.toIntArray() private var current: E? = null override fun moveNext(): Boolean { @@ -80,14 +80,35 @@ internal class TraceReaderEnumerator<E>( return res } - private fun convertColumn(reader: TableReader, column: TableColumn<*>, columnIndex: Int): Any? { - val value = reader.get(columnIndex) - + private fun convertColumn(reader: TableReader, column: TableColumn, columnIndex: Int): Any? { return when (column.type) { - Instant::class.java -> Timestamp.from(value as Instant) - Duration::class.java -> (value as Duration).toMillis() - Set::class.java -> (value as Set<*>).toTypedArray() - else -> value + is TableColumnType.Boolean -> reader.getBoolean(columnIndex) + is TableColumnType.Int -> reader.getInt(columnIndex) + is TableColumnType.Long -> reader.getLong(columnIndex) + is TableColumnType.Float -> reader.getFloat(columnIndex) + is TableColumnType.Double -> reader.getDouble(columnIndex) + is TableColumnType.String -> reader.getString(columnIndex) + is TableColumnType.UUID -> { + val uuid = reader.getUUID(columnIndex) + + if (uuid != null) { + val uuidBytes = ByteArray(16) + + ByteBuffer.wrap(uuidBytes) + .order(ByteOrder.BIG_ENDIAN) + .putLong(uuid.mostSignificantBits) + .putLong(uuid.leastSignificantBits) + + uuidBytes + } else { + null + } + } + is TableColumnType.Instant -> reader.getInstant(columnIndex)?.toEpochMilli() + is TableColumnType.Duration -> reader.getDuration(columnIndex)?.toMillis() ?: 0 + is TableColumnType.List -> reader.getList(columnIndex, Any::class.java)?.toTypedArray() + is TableColumnType.Set -> reader.getSet(columnIndex, Any::class.java)?.toTypedArray() + is TableColumnType.Map -> reader.getMap(columnIndex, Any::class.java, Any::class.java) } } } diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt index 8c571b82..dfcc22a3 100644 --- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt +++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt @@ -38,8 +38,11 @@ import org.apache.calcite.rex.RexNode import org.apache.calcite.schema.* import org.apache.calcite.schema.impl.AbstractTableQueryable import org.apache.calcite.sql.type.SqlTypeName +import org.opendc.trace.TableColumnType +import java.nio.ByteBuffer import java.time.Duration import java.time.Instant +import java.util.* import java.util.concurrent.atomic.AtomicBoolean /** @@ -70,7 +73,7 @@ internal class TraceTable(private val table: org.opendc.trace.Table) : val cancelFlag = DataContext.Variable.CANCEL_FLAG.get<AtomicBoolean>(root) return object : AbstractEnumerable<Array<Any?>>() { override fun enumerator(): Enumerator<Array<Any?>> = - TraceReaderEnumerator(table.newReader(projection), projection ?: table.columns, cancelFlag) + TraceReaderEnumerator(table.newReader(projection?.map { it.name }), projection ?: table.columns, cancelFlag) } } @@ -78,7 +81,7 @@ internal class TraceTable(private val table: org.opendc.trace.Table) : val table = table val columns = table.columns val writer = table.newWriter() - val columnIndices = columns.map { writer.resolve(it) }.toIntArray() + val columnIndices = columns.map { writer.resolve(it.name) }.toIntArray() var rowCount = 0L try { @@ -90,16 +93,24 @@ internal class TraceTable(private val table: org.opendc.trace.Table) : continue } val columnType = columns[index].type - - writer.set( - columnIndices[index], - when (columnType) { - Duration::class.java -> Duration.ofMillis(value as Long) - Instant::class.java -> Instant.ofEpochMilli(value as Long) - Set::class.java -> (value as List<*>).toSet() - else -> value + val columnIndex = columnIndices[index] + when (columnType) { + is TableColumnType.Boolean -> writer.setBoolean(columnIndex, value as Boolean) + is TableColumnType.Int -> writer.setInt(columnIndex, value as Int) + is TableColumnType.Long -> writer.setLong(columnIndex, value as Long) + is TableColumnType.Float -> writer.setFloat(columnIndex, value as Float) + is TableColumnType.Double -> writer.setDouble(columnIndex, value as Double) + is TableColumnType.String -> writer.setString(columnIndex, value as String) + is TableColumnType.UUID -> { + val bb = ByteBuffer.wrap(value as ByteArray) + writer.setUUID(columnIndex, UUID(bb.getLong(), bb.getLong())) } - ) + is TableColumnType.Instant -> writer.setInstant(columnIndex, Instant.ofEpochMilli(value as Long)) + is TableColumnType.Duration -> writer.setDuration(columnIndex, Duration.ofMillis(value as Long)) + is TableColumnType.List -> writer.setList(columnIndex, value as List<*>) + is TableColumnType.Set -> writer.setSet(columnIndex, (value as List<*>).toSet()) + is TableColumnType.Map -> writer.setMap(columnIndex, value as Map<*, *>) + } } writer.endRow() @@ -161,16 +172,26 @@ internal class TraceTable(private val table: org.opendc.trace.Table) : for (column in table.columns) { names.add(column.name) - types.add( - when (column.type) { - Instant::class.java -> typeFactory.createSqlType(SqlTypeName.TIMESTAMP) - Duration::class.java -> typeFactory.createSqlType(SqlTypeName.BIGINT) - Set::class.java -> typeFactory.createMultisetType(typeFactory.createSqlType(SqlTypeName.UNKNOWN), -1) - else -> typeFactory.createType(column.type) - } - ) + types.add(mapType(typeFactory, column.type)) } return typeFactory.createStructType(types, names) } + + private fun mapType(typeFactory: JavaTypeFactory, type: TableColumnType): RelDataType { + return when (type) { + is TableColumnType.Boolean -> typeFactory.createSqlType(SqlTypeName.BOOLEAN) + is TableColumnType.Int -> typeFactory.createSqlType(SqlTypeName.INTEGER) + is TableColumnType.Long -> typeFactory.createSqlType(SqlTypeName.BIGINT) + is TableColumnType.Float -> typeFactory.createSqlType(SqlTypeName.FLOAT) + is TableColumnType.Double -> typeFactory.createSqlType(SqlTypeName.DOUBLE) + is TableColumnType.String -> typeFactory.createSqlType(SqlTypeName.VARCHAR) + is TableColumnType.UUID -> typeFactory.createSqlType(SqlTypeName.BINARY, 16) + is TableColumnType.Instant -> typeFactory.createSqlType(SqlTypeName.TIMESTAMP) + is TableColumnType.Duration -> typeFactory.createSqlType(SqlTypeName.BIGINT) + is TableColumnType.List -> typeFactory.createArrayType(mapType(typeFactory, type.elementType), -1) + is TableColumnType.Set -> typeFactory.createMultisetType(mapType(typeFactory, type.elementType), -1) + is TableColumnType.Map -> typeFactory.createMapType(mapType(typeFactory, type.keyType), mapType(typeFactory, type.valueType)) + } + } } diff --git a/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt b/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt index d2877d7c..d8729034 100644 --- a/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt +++ b/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt @@ -22,16 +22,24 @@ package org.opendc.trace.calcite +import io.mockk.every +import io.mockk.mockk import org.apache.calcite.jdbc.CalciteConnection import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test +import org.opendc.trace.TableColumn +import org.opendc.trace.TableColumnType +import org.opendc.trace.TableReader import org.opendc.trace.Trace +import org.opendc.trace.conv.TABLE_RESOURCES import java.nio.file.Files import java.nio.file.Paths import java.sql.DriverManager import java.sql.ResultSet import java.sql.Statement import java.sql.Timestamp +import java.time.Duration +import java.time.Instant import java.util.* /** @@ -41,11 +49,11 @@ class CalciteTest { /** * The trace to experiment with. */ - private val trace = Trace.open(Paths.get("src/test/resources/trace"), format = "opendc-vm") + private val odcTrace = Trace.open(Paths.get("src/test/resources/trace"), format = "opendc-vm") @Test fun testResources() { - runQuery(trace, "SELECT * FROM trace.resources") { rs -> + runQuery(odcTrace, "SELECT * FROM trace.resources") { rs -> assertAll( { assertTrue(rs.next()) }, { assertEquals("1019", rs.getString("id")) }, @@ -65,7 +73,7 @@ class CalciteTest { @Test fun testResourceStates() { - runQuery(trace, "SELECT * FROM trace.resource_states") { rs -> + runQuery(odcTrace, "SELECT * FROM trace.resource_states") { rs -> assertAll( { assertTrue(rs.next()) }, { assertEquals("1019", rs.getString("id")) }, @@ -80,7 +88,7 @@ class CalciteTest { @Test fun testInterferenceGroups() { - runQuery(trace, "SELECT * FROM trace.interference_groups") { rs -> + runQuery(odcTrace, "SELECT * FROM trace.interference_groups") { rs -> assertAll( { assertTrue(rs.next()) }, { assertArrayEquals(arrayOf("1019", "1023", "1052"), rs.getArray("members").array as Array<*>) }, @@ -92,7 +100,7 @@ class CalciteTest { @Test fun testComplexQuery() { - runQuery(trace, "SELECT max(cpu_usage) as max_cpu_usage, avg(cpu_usage) as avg_cpu_usage FROM trace.resource_states") { rs -> + runQuery(odcTrace, "SELECT max(cpu_usage) as max_cpu_usage, avg(cpu_usage) as avg_cpu_usage FROM trace.resource_states") { rs -> assertAll( { assertTrue(rs.next()) }, { assertEquals(249.59993808, rs.getDouble("max_cpu_usage")) }, @@ -128,6 +136,84 @@ class CalciteTest { } } + @Test + fun testUUID() { + val trace = mockk<Trace>() + every { trace.tables } returns listOf(TABLE_RESOURCES) + every { trace.getTable(TABLE_RESOURCES)!!.columns } returns listOf( + TableColumn("id", TableColumnType.UUID) + ) + every { trace.getTable(TABLE_RESOURCES)!!.newReader() } answers { + object : TableReader { + override fun nextRow(): Boolean = true + + override fun resolve(name: String): Int { + return when (name) { + "id" -> 0 + else -> -1 + } + } + + override fun isNull(index: Int): Boolean = false + + override fun getBoolean(index: Int): Boolean { + TODO("not implemented") + } + + override fun getInt(index: Int): Int { + TODO("not implemented") + } + + override fun getLong(index: Int): Long { + TODO("not implemented") + } + + override fun getFloat(index: Int): Float { + TODO("not implemented") + } + + override fun getDouble(index: Int): Double { + TODO("not implemented") + } + + override fun getString(index: Int): String? { + TODO("not implemented") + } + + override fun getUUID(index: Int): UUID = UUID(1, 2) + + override fun getInstant(index: Int): Instant? { + TODO("not implemented") + } + + override fun getDuration(index: Int): Duration? { + TODO("not implemented") + } + + override fun <T> getList(index: Int, elementType: Class<T>): List<T>? { + TODO("not implemented") + } + + override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? { + TODO("not implemented") + } + + override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? { + TODO("not implemented") + } + + override fun close() {} + } + } + + runQuery(trace, "SELECT id FROM trace.resources") { rs -> + assertAll( + { assertTrue(rs.next()) }, + { assertArrayEquals(byteArrayOf(0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 2), rs.getBytes("id")) }, + ) + } + } + /** * Helper function to run statement for the specified trace. */ diff --git a/opendc-trace/opendc-trace-gwf/build.gradle.kts b/opendc-trace/opendc-trace-gwf/build.gradle.kts index 1105a465..0c041439 100644 --- a/opendc-trace/opendc-trace-gwf/build.gradle.kts +++ b/opendc-trace/opendc-trace-gwf/build.gradle.kts @@ -31,4 +31,6 @@ dependencies { api(projects.opendcTrace.opendcTraceApi) implementation(libs.jackson.dataformat.csv) + + testImplementation(projects.opendcTrace.opendcTraceTestkit) } diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt index 42a9469e..f9a171e9 100644 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt @@ -27,23 +27,34 @@ import com.fasterxml.jackson.dataformat.csv.CsvParser import com.fasterxml.jackson.dataformat.csv.CsvSchema import org.opendc.trace.* import org.opendc.trace.conv.* +import org.opendc.trace.util.convertTo import java.time.Duration import java.time.Instant +import java.util.* import java.util.regex.Pattern /** * A [TableReader] implementation for the GWF format. */ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { + /** + * A flag to indicate whether a single row has been read already. + */ + private var isStarted = false + init { parser.schema = schema } override fun nextRow(): Boolean { + if (!isStarted) { + isStarted = true + } + // Reset the row state reset() - if (!nextStart()) { + if (parser.isClosed || !nextStart()) { return false } @@ -68,51 +79,106 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { return true } - override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + override fun resolve(name: String): Int { + return when (name) { + TASK_ID -> COL_JOB_ID + TASK_WORKFLOW_ID -> COL_WORKFLOW_ID + TASK_SUBMIT_TIME -> COL_SUBMIT_TIME + TASK_RUNTIME -> COL_RUNTIME + TASK_ALLOC_NCPUS -> COL_NPROC + TASK_REQ_NCPUS -> COL_REQ_NPROC + TASK_PARENTS -> COL_DEPS + else -> -1 + } + } override fun isNull(index: Int): Boolean { - check(index in 0..columns.size) { "Invalid column" } + require(index in 0..COL_DEPS) { "Invalid column" } return false } - override fun get(index: Int): Any? { + override fun getBoolean(index: Int): Boolean { + throw IllegalArgumentException("Invalid column") + } + + override fun getInt(index: Int): Int { + checkActive() + return when (index) { + COL_REQ_NPROC -> reqNProcs + COL_NPROC -> nProcs + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getLong(index: Int): Long { + throw IllegalArgumentException("Invalid column") + } + + override fun getFloat(index: Int): Float { + throw IllegalArgumentException("Invalid column") + } + + override fun getDouble(index: Int): Double { + throw IllegalArgumentException("Invalid column") + } + + override fun getString(index: Int): String? { + checkActive() return when (index) { COL_JOB_ID -> jobId COL_WORKFLOW_ID -> workflowId - COL_SUBMIT_TIME -> submitTime - COL_RUNTIME -> runtime - COL_REQ_NPROC -> getInt(index) - COL_NPROC -> getInt(index) - COL_DEPS -> dependencies else -> throw IllegalArgumentException("Invalid column") } } - override fun getBoolean(index: Int): Boolean { + override fun getUUID(index: Int): UUID? { throw IllegalArgumentException("Invalid column") } - override fun getInt(index: Int): Int { + override fun getInstant(index: Int): Instant? { + checkActive() return when (index) { - COL_REQ_NPROC -> reqNProcs - COL_NPROC -> nProcs + COL_SUBMIT_TIME -> submitTime else -> throw IllegalArgumentException("Invalid column") } } - override fun getLong(index: Int): Long { + override fun getDuration(index: Int): Duration? { + checkActive() + return when (index) { + COL_RUNTIME -> runtime + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun <T> getList(index: Int, elementType: Class<T>): List<T>? { throw IllegalArgumentException("Invalid column") } - override fun getDouble(index: Int): Double { + override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? { throw IllegalArgumentException("Invalid column") } + override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? { + checkActive() + return when (index) { + COL_DEPS -> TYPE_DEPS.convertTo(dependencies, elementType) + else -> throw IllegalArgumentException("Invalid column") + } + } + override fun close() { parser.close() } /** + * Helper method to check if the reader is active. + */ + private fun checkActive() { + check(isStarted && !parser.isClosed) { "No active row. Did you call nextRow()?" } + } + + /** * The pattern used to parse the parents. */ private val pattern = Pattern.compile("\\s+") @@ -180,15 +246,7 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { private val COL_REQ_NPROC = 5 private val COL_DEPS = 6 - private val columns = mapOf( - TASK_ID to COL_JOB_ID, - TASK_WORKFLOW_ID to COL_WORKFLOW_ID, - TASK_SUBMIT_TIME to COL_SUBMIT_TIME, - TASK_RUNTIME to COL_RUNTIME, - TASK_ALLOC_NCPUS to COL_NPROC, - TASK_REQ_NCPUS to COL_REQ_NPROC, - TASK_PARENTS to COL_DEPS - ) + private val TYPE_DEPS = TableColumnType.Set(TableColumnType.String) companion object { /** diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt index 8d9eab82..ca63b624 100644 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt @@ -56,21 +56,20 @@ public class GwfTraceFormat : TraceFormat { return when (table) { TABLE_TASKS -> TableDetails( listOf( - TASK_WORKFLOW_ID, - TASK_ID, - TASK_SUBMIT_TIME, - TASK_RUNTIME, - TASK_REQ_NCPUS, - TASK_ALLOC_NCPUS, - TASK_PARENTS, - ), - listOf(TASK_WORKFLOW_ID) + TableColumn(TASK_WORKFLOW_ID, TableColumnType.String), + TableColumn(TASK_ID, TableColumnType.String), + TableColumn(TASK_SUBMIT_TIME, TableColumnType.Instant), + TableColumn(TASK_RUNTIME, TableColumnType.Duration), + TableColumn(TASK_REQ_NCPUS, TableColumnType.Int), + TableColumn(TASK_ALLOC_NCPUS, TableColumnType.Int), + TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)), + ) ) else -> throw IllegalArgumentException("Table $table not supported") } } - override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader { + override fun newReader(path: Path, table: String, projection: List<String>?): TableReader { return when (table) { TABLE_TASKS -> GwfTaskTableReader(factory.createParser(path.toFile())) else -> throw IllegalArgumentException("Table $table not supported") diff --git a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt index 411d45d0..a8c3a715 100644 --- a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt @@ -24,7 +24,10 @@ package org.opendc.trace.gwf import org.junit.jupiter.api.* import org.junit.jupiter.api.Assertions.* +import org.opendc.trace.TableColumn +import org.opendc.trace.TableReader import org.opendc.trace.conv.* +import org.opendc.trace.testkit.TableReaderTestKit import java.nio.file.Paths import java.time.Duration import java.time.Instant @@ -32,6 +35,7 @@ import java.time.Instant /** * Test suite for the [GwfTraceFormat] class. */ +@DisplayName("GWF TraceFormat") internal class GwfTraceFormatTest { private val format = GwfTraceFormat() @@ -62,11 +66,11 @@ internal class GwfTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("0", reader.get(TASK_WORKFLOW_ID)) }, - { assertEquals("1", reader.get(TASK_ID)) }, - { assertEquals(Instant.ofEpochSecond(16), reader.get(TASK_SUBMIT_TIME)) }, - { assertEquals(Duration.ofSeconds(11), reader.get(TASK_RUNTIME)) }, - { assertEquals(emptySet<String>(), reader.get(TASK_PARENTS)) }, + { assertEquals("0", reader.getString(TASK_WORKFLOW_ID)) }, + { assertEquals("1", reader.getString(TASK_ID)) }, + { assertEquals(Instant.ofEpochSecond(16), reader.getInstant(TASK_SUBMIT_TIME)) }, + { assertEquals(Duration.ofSeconds(11), reader.getDuration(TASK_RUNTIME)) }, + { assertEquals(emptySet<String>(), reader.getSet(TASK_PARENTS, String::class.java)) }, ) } @@ -81,11 +85,26 @@ internal class GwfTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("0", reader.get(TASK_WORKFLOW_ID)) }, - { assertEquals("7", reader.get(TASK_ID)) }, - { assertEquals(Instant.ofEpochSecond(87), reader.get(TASK_SUBMIT_TIME)) }, - { assertEquals(Duration.ofSeconds(11), reader.get(TASK_RUNTIME)) }, - { assertEquals(setOf("4", "5", "6"), reader.get(TASK_PARENTS)) }, + { assertEquals("0", reader.getString(TASK_WORKFLOW_ID)) }, + { assertEquals("7", reader.getString(TASK_ID)) }, + { assertEquals(Instant.ofEpochSecond(87), reader.getInstant(TASK_SUBMIT_TIME)) }, + { assertEquals(Duration.ofSeconds(11), reader.getDuration(TASK_RUNTIME)) }, + { assertEquals(setOf("4", "5", "6"), reader.getSet(TASK_PARENTS, String::class.java)) }, ) } + + @DisplayName("TableReader for Tasks") + @Nested + inner class TasksTableReaderTest : TableReaderTestKit() { + override lateinit var reader: TableReader + override lateinit var columns: List<TableColumn> + + @BeforeEach + fun setUp() { + val path = Paths.get(checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")).toURI()) + + columns = format.getDetails(path, TABLE_TASKS).columns + reader = format.newReader(path, TABLE_TASKS, null) + } + } } diff --git a/opendc-trace/opendc-trace-opendc/build.gradle.kts b/opendc-trace/opendc-trace-opendc/build.gradle.kts index 67568f48..236bdedc 100644 --- a/opendc-trace/opendc-trace-opendc/build.gradle.kts +++ b/opendc-trace/opendc-trace-opendc/build.gradle.kts @@ -25,6 +25,7 @@ description = "Support for OpenDC-specific trace formats" /* Build configuration */ plugins { `kotlin-library-conventions` + `benchmark-conventions` } dependencies { @@ -32,5 +33,6 @@ dependencies { implementation(projects.opendcTrace.opendcTraceParquet) + testImplementation(projects.opendcTrace.opendcTraceTestkit) testRuntimeOnly(libs.slf4j.simple) } diff --git a/opendc-trace/opendc-trace-opendc/src/jmh/kotlin/org/opendc/trace/opendc/OdcVmTraceBenchmarks.kt b/opendc-trace/opendc-trace-opendc/src/jmh/kotlin/org/opendc/trace/opendc/OdcVmTraceBenchmarks.kt new file mode 100644 index 00000000..b9b22931 --- /dev/null +++ b/opendc-trace/opendc-trace-opendc/src/jmh/kotlin/org/opendc/trace/opendc/OdcVmTraceBenchmarks.kt @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.opendc + +import org.opendc.trace.conv.* +import org.opendc.trace.spi.TraceFormat +import org.openjdk.jmh.annotations.* +import org.openjdk.jmh.infra.Blackhole +import java.nio.file.Path +import java.util.concurrent.TimeUnit + +/** + * Benchmarks for parsing traces in the OpenDC vm format. + */ +@State(Scope.Thread) +@Fork(1) +@Warmup(iterations = 2, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS) +class OdcVmTraceBenchmarks { + private lateinit var path: Path + private lateinit var format: TraceFormat + + @Setup + fun setUp() { + path = Path.of("../../opendc-experiments/opendc-experiments-capelin/src/test/resources/trace/bitbrains-small") + format = OdcVmTraceFormat() + } + + @Benchmark + fun benchmarkResourcesReader(bh: Blackhole) { + val reader = format.newReader(path, TABLE_RESOURCES, null) + try { + val idColumn = reader.resolve(RESOURCE_ID) + while (reader.nextRow()) { + bh.consume(reader.getString(idColumn)) + } + } finally { + reader.close() + } + } + + @Benchmark + fun benchmarkResourceStatesReader(bh: Blackhole) { + val reader = format.newReader(path, TABLE_RESOURCE_STATES, null) + try { + val idColumn = reader.resolve(RESOURCE_ID) + while (reader.nextRow()) { + bh.consume(reader.getString(idColumn)) + } + } finally { + reader.close() + } + } + + @Benchmark + fun benchmarkInterferenceGroupReader(bh: Blackhole) { + val reader = format.newReader(path, TABLE_INTERFERENCE_GROUPS, null) + try { + val scoreColumn = reader.resolve(INTERFERENCE_GROUP_SCORE) + while (reader.nextRow()) { + bh.consume(reader.getDouble(scoreColumn)) + } + } finally { + reader.close() + } + } +} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt index eb91e305..1841c486 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt @@ -26,9 +26,13 @@ import org.opendc.trace.* import org.opendc.trace.conv.INTERFERENCE_GROUP_MEMBERS import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE import org.opendc.trace.conv.INTERFERENCE_GROUP_TARGET +import org.opendc.trace.util.convertTo import shaded.parquet.com.fasterxml.jackson.core.JsonParseException import shaded.parquet.com.fasterxml.jackson.core.JsonParser import shaded.parquet.com.fasterxml.jackson.core.JsonToken +import java.time.Duration +import java.time.Instant +import java.util.* /** * A [TableReader] implementation for the OpenDC VM interference JSON format. @@ -50,17 +54,24 @@ internal class OdcVmInterferenceJsonTableReader(private val parser: JsonParser) } } - return if (parser.nextToken() != JsonToken.END_ARRAY) { - parseGroup(parser) - true - } else { + return if (parser.isClosed || parser.nextToken() == JsonToken.END_ARRAY) { + parser.close() reset() false + } else { + parseGroup(parser) + true } } - override fun resolve(column: TableColumn<*>): Int { - return when (column) { + private val COL_MEMBERS = 0 + private val COL_TARGET = 1 + private val COL_SCORE = 2 + + private val TYPE_MEMBERS = TableColumnType.Set(TableColumnType.String) + + override fun resolve(name: String): Int { + return when (name) { INTERFERENCE_GROUP_MEMBERS -> COL_MEMBERS INTERFERENCE_GROUP_TARGET -> COL_TARGET INTERFERENCE_GROUP_SCORE -> COL_SCORE @@ -75,48 +86,79 @@ internal class OdcVmInterferenceJsonTableReader(private val parser: JsonParser) } } - override fun get(index: Int): Any { + override fun getBoolean(index: Int): Boolean { + throw IllegalArgumentException("Invalid column $index") + } + + override fun getInt(index: Int): Int { + throw IllegalArgumentException("Invalid column $index") + } + + override fun getLong(index: Int): Long { + throw IllegalArgumentException("Invalid column $index") + } + + override fun getFloat(index: Int): Float { + throw IllegalArgumentException("Invalid column $index") + } + + override fun getDouble(index: Int): Double { + checkActive() return when (index) { - COL_MEMBERS -> members COL_TARGET -> targetLoad COL_SCORE -> score else -> throw IllegalArgumentException("Invalid column $index") } } - override fun getBoolean(index: Int): Boolean { + override fun getString(index: Int): String? { throw IllegalArgumentException("Invalid column $index") } - override fun getInt(index: Int): Int { + override fun getUUID(index: Int): UUID? { throw IllegalArgumentException("Invalid column $index") } - override fun getLong(index: Int): Long { + override fun getInstant(index: Int): Instant? { throw IllegalArgumentException("Invalid column $index") } - override fun getDouble(index: Int): Double { + override fun getDuration(index: Int): Duration? { + throw IllegalArgumentException("Invalid column $index") + } + + override fun <T> getList(index: Int, elementType: Class<T>): List<T>? { + throw IllegalArgumentException("Invalid column $index") + } + + override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? { + checkActive() return when (index) { - COL_TARGET -> targetLoad - COL_SCORE -> score + COL_MEMBERS -> TYPE_MEMBERS.convertTo(members, elementType) else -> throw IllegalArgumentException("Invalid column $index") } } + override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? { + throw IllegalArgumentException("Invalid column $index") + } + override fun close() { parser.close() } - private val COL_MEMBERS = 0 - private val COL_TARGET = 1 - private val COL_SCORE = 2 - private var members = emptySet<String>() private var targetLoad = Double.POSITIVE_INFINITY private var score = 1.0 /** + * Helper method to check if the reader is active. + */ + private fun checkActive() { + check(isStarted && !parser.isClosed) { "No active row. Did you call nextRow()?" } + } + + /** * Reset the state. */ private fun reset() { diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt index 64bc4356..d726e890 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt @@ -27,6 +27,9 @@ import org.opendc.trace.conv.INTERFERENCE_GROUP_MEMBERS import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE import org.opendc.trace.conv.INTERFERENCE_GROUP_TARGET import shaded.parquet.com.fasterxml.jackson.core.JsonGenerator +import java.time.Duration +import java.time.Instant +import java.util.* /** * A [TableWriter] implementation for the OpenDC VM interference JSON format. @@ -65,8 +68,8 @@ internal class OdcVmInterferenceJsonTableWriter(private val generator: JsonGener generator.writeEndObject() } - override fun resolve(column: TableColumn<*>): Int { - return when (column) { + override fun resolve(name: String): Int { + return when (name) { INTERFERENCE_GROUP_MEMBERS -> COL_MEMBERS INTERFERENCE_GROUP_TARGET -> COL_TARGET INTERFERENCE_GROUP_SCORE -> COL_SCORE @@ -74,40 +77,66 @@ internal class OdcVmInterferenceJsonTableWriter(private val generator: JsonGener } } - override fun set(index: Int, value: Any) { + override fun setBoolean(index: Int, value: Boolean) { + throw IllegalArgumentException("Invalid column $index") + } + + override fun setInt(index: Int, value: Int) { + throw IllegalArgumentException("Invalid column $index") + } + + override fun setLong(index: Int, value: Long) { + throw IllegalArgumentException("Invalid column $index") + } + + override fun setFloat(index: Int, value: Float) { + throw IllegalArgumentException("Invalid column $index") + } + + override fun setDouble(index: Int, value: Double) { check(isRowActive) { "No active row" } - @Suppress("UNCHECKED_CAST") when (index) { - COL_MEMBERS -> members = value as Set<String> COL_TARGET -> targetLoad = (value as Number).toDouble() COL_SCORE -> score = (value as Number).toDouble() - else -> throw IllegalArgumentException("Invalid column index $index") + else -> throw IllegalArgumentException("Invalid column $index") } } - override fun setBoolean(index: Int, value: Boolean) { + override fun setString(index: Int, value: String) { throw IllegalArgumentException("Invalid column $index") } - override fun setInt(index: Int, value: Int) { + override fun setUUID(index: Int, value: UUID) { throw IllegalArgumentException("Invalid column $index") } - override fun setLong(index: Int, value: Long) { + override fun setInstant(index: Int, value: Instant) { throw IllegalArgumentException("Invalid column $index") } - override fun setDouble(index: Int, value: Double) { + override fun setDuration(index: Int, value: Duration) { + throw IllegalArgumentException("Invalid column $index") + } + + override fun <T> setList(index: Int, value: List<T>) { + throw IllegalArgumentException("Invalid column $index") + } + + override fun <T> setSet(index: Int, value: Set<T>) { check(isRowActive) { "No active row" } + @Suppress("UNCHECKED_CAST") when (index) { - COL_TARGET -> targetLoad = (value as Number).toDouble() - COL_SCORE -> score = (value as Number).toDouble() - else -> throw IllegalArgumentException("Invalid column $index") + COL_MEMBERS -> members = value as Set<String> + else -> throw IllegalArgumentException("Invalid column index $index") } } + override fun <K, V> setMap(index: Int, value: Map<K, V>) { + throw IllegalArgumentException("Invalid column $index") + } + override fun flush() { generator.flush() } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt index 7a01b881..b256047f 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt @@ -26,6 +26,9 @@ import org.opendc.trace.* import org.opendc.trace.conv.* import org.opendc.trace.opendc.parquet.ResourceState import org.opendc.trace.util.parquet.LocalParquetReader +import java.time.Duration +import java.time.Instant +import java.util.* /** * A [TableReader] implementation for the OpenDC virtual machine trace format. @@ -48,24 +51,26 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea } } - override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + private val COL_ID = 0 + private val COL_TIMESTAMP = 1 + private val COL_DURATION = 2 + private val COL_CPU_COUNT = 3 + private val COL_CPU_USAGE = 4 - override fun isNull(index: Int): Boolean { - check(index in 0..columns.size) { "Invalid column index" } - return get(index) == null + override fun resolve(name: String): Int { + return when (name) { + RESOURCE_ID -> COL_ID + RESOURCE_STATE_TIMESTAMP -> COL_TIMESTAMP + RESOURCE_STATE_DURATION -> COL_DURATION + RESOURCE_CPU_COUNT -> COL_CPU_COUNT + RESOURCE_STATE_CPU_USAGE -> COL_CPU_USAGE + else -> -1 + } } - override fun get(index: Int): Any? { - val record = checkNotNull(record) { "Reader in invalid state" } - - return when (index) { - COL_ID -> record.id - COL_TIMESTAMP -> record.timestamp - COL_DURATION -> record.duration - COL_CPU_COUNT -> record.cpuCount - COL_CPU_USAGE -> record.cpuUsage - else -> throw IllegalArgumentException("Invalid column index $index") - } + override fun isNull(index: Int): Boolean { + require(index in 0..COL_CPU_USAGE) { "Invalid column index" } + return false } override fun getBoolean(index: Int): Boolean { @@ -84,6 +89,10 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea throw IllegalArgumentException("Invalid column or type [index $index]") } + override fun getFloat(index: Int): Float { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + override fun getDouble(index: Int): Double { val record = checkNotNull(record) { "Reader in invalid state" } return when (index) { @@ -92,23 +101,52 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea } } + override fun getString(index: Int): String { + val record = checkNotNull(record) { "Reader in invalid state" } + + return when (index) { + COL_ID -> record.id + else -> throw IllegalArgumentException("Invalid column index $index") + } + } + + override fun getUUID(index: Int): UUID? { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun getInstant(index: Int): Instant { + val record = checkNotNull(record) { "Reader in invalid state" } + + return when (index) { + COL_TIMESTAMP -> record.timestamp + else -> throw IllegalArgumentException("Invalid column index $index") + } + } + + override fun getDuration(index: Int): Duration { + val record = checkNotNull(record) { "Reader in invalid state" } + + return when (index) { + COL_DURATION -> record.duration + else -> throw IllegalArgumentException("Invalid column index $index") + } + } + + override fun <T> getList(index: Int, elementType: Class<T>): List<T>? { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + override fun close() { reader.close() } override fun toString(): String = "OdcVmResourceStateTableReader" - - private val COL_ID = 0 - private val COL_TIMESTAMP = 1 - private val COL_DURATION = 2 - private val COL_CPU_COUNT = 3 - private val COL_CPU_USAGE = 4 - - private val columns = mapOf( - RESOURCE_ID to COL_ID, - RESOURCE_STATE_TIMESTAMP to COL_TIMESTAMP, - RESOURCE_STATE_DURATION to COL_DURATION, - RESOURCE_CPU_COUNT to COL_CPU_COUNT, - RESOURCE_STATE_CPU_USAGE to COL_CPU_USAGE, - ) } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt index 97af5b59..30375de0 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt @@ -28,6 +28,7 @@ import org.opendc.trace.conv.* import org.opendc.trace.opendc.parquet.ResourceState import java.time.Duration import java.time.Instant +import java.util.* /** * A [TableWriter] implementation for the OpenDC virtual machine trace format. @@ -64,17 +65,14 @@ internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter<R lastTimestamp = _timestamp } - override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 - - override fun set(index: Int, value: Any) { - check(_isActive) { "No active row" } - - when (index) { - COL_ID -> _id = value as String - COL_TIMESTAMP -> _timestamp = value as Instant - COL_DURATION -> _duration = value as Duration - COL_CPU_COUNT -> _cpuCount = value as Int - COL_CPU_USAGE -> _cpuUsage = value as Double + override fun resolve(name: String): Int { + return when (name) { + RESOURCE_ID -> COL_ID + RESOURCE_STATE_TIMESTAMP -> COL_TIMESTAMP + RESOURCE_STATE_DURATION -> COL_DURATION + RESOURCE_CPU_COUNT -> COL_CPU_COUNT + RESOURCE_STATE_CPU_USAGE -> COL_CPU_USAGE + else -> -1 } } @@ -94,6 +92,10 @@ internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter<R throw IllegalArgumentException("Invalid column or type [index $index]") } + override fun setFloat(index: Int, value: Float) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + override fun setDouble(index: Int, value: Double) { check(_isActive) { "No active row" } when (index) { @@ -102,6 +104,49 @@ internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter<R } } + override fun setString(index: Int, value: String) { + check(_isActive) { "No active row" } + + when (index) { + COL_ID -> _id = value + else -> throw IllegalArgumentException("Invalid column or type [index $index]") + } + } + + override fun setUUID(index: Int, value: UUID) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun setInstant(index: Int, value: Instant) { + check(_isActive) { "No active row" } + + when (index) { + COL_TIMESTAMP -> _timestamp = value + else -> throw IllegalArgumentException("Invalid column or type [index $index]") + } + } + + override fun setDuration(index: Int, value: Duration) { + check(_isActive) { "No active row" } + + when (index) { + COL_DURATION -> _duration = value + else -> throw IllegalArgumentException("Invalid column or type [index $index]") + } + } + + override fun <T> setList(index: Int, value: List<T>) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun <T> setSet(index: Int, value: Set<T>) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun <K, V> setMap(index: Int, value: Map<K, V>) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + override fun flush() { // Not available } @@ -121,12 +166,4 @@ internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter<R private val COL_DURATION = 2 private val COL_CPU_COUNT = 3 private val COL_CPU_USAGE = 4 - - private val columns = mapOf( - RESOURCE_ID to COL_ID, - RESOURCE_STATE_TIMESTAMP to COL_TIMESTAMP, - RESOURCE_STATE_DURATION to COL_DURATION, - RESOURCE_CPU_COUNT to COL_CPU_COUNT, - RESOURCE_STATE_CPU_USAGE to COL_CPU_USAGE, - ) } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt index 6102332f..76fdbca8 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt @@ -26,6 +26,9 @@ import org.opendc.trace.* import org.opendc.trace.conv.* import org.opendc.trace.opendc.parquet.Resource import org.opendc.trace.util.parquet.LocalParquetReader +import java.time.Duration +import java.time.Instant +import java.util.* /** * A [TableReader] implementation for the "resources table" in the OpenDC virtual machine trace format. @@ -48,25 +51,28 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R } } - override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + private val COL_ID = 0 + private val COL_START_TIME = 1 + private val COL_STOP_TIME = 2 + private val COL_CPU_COUNT = 3 + private val COL_CPU_CAPACITY = 4 + private val COL_MEM_CAPACITY = 5 - override fun isNull(index: Int): Boolean { - check(index in 0..columns.size) { "Invalid column index" } - return get(index) == null + override fun resolve(name: String): Int { + return when (name) { + RESOURCE_ID -> COL_ID + RESOURCE_START_TIME -> COL_START_TIME + RESOURCE_STOP_TIME -> COL_STOP_TIME + RESOURCE_CPU_COUNT -> COL_CPU_COUNT + RESOURCE_CPU_CAPACITY -> COL_CPU_CAPACITY + RESOURCE_MEM_CAPACITY -> COL_MEM_CAPACITY + else -> -1 + } } - override fun get(index: Int): Any? { - val record = checkNotNull(record) { "Reader in invalid state" } - - return when (index) { - COL_ID -> record.id - COL_START_TIME -> record.startTime - COL_STOP_TIME -> record.stopTime - COL_CPU_COUNT -> getInt(index) - COL_CPU_CAPACITY -> getDouble(index) - COL_MEM_CAPACITY -> getDouble(index) - else -> throw IllegalArgumentException("Invalid column") - } + override fun isNull(index: Int): Boolean { + require(index in 0..COL_MEM_CAPACITY) { "Invalid column index" } + return false } override fun getBoolean(index: Int): Boolean { @@ -86,6 +92,10 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R throw IllegalArgumentException("Invalid column") } + override fun getFloat(index: Int): Float { + throw IllegalArgumentException("Invalid column") + } + override fun getDouble(index: Int): Double { val record = checkNotNull(record) { "Reader in invalid state" } @@ -96,25 +106,48 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R } } + override fun getString(index: Int): String { + val record = checkNotNull(record) { "Reader in invalid state" } + + return when (index) { + COL_ID -> record.id + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getUUID(index: Int): UUID? { + throw IllegalArgumentException("Invalid column") + } + + override fun getInstant(index: Int): Instant { + val record = checkNotNull(record) { "Reader in invalid state" } + + return when (index) { + COL_START_TIME -> record.startTime + COL_STOP_TIME -> record.stopTime + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getDuration(index: Int): Duration? { + throw IllegalArgumentException("Invalid column") + } + + override fun <T> getList(index: Int, elementType: Class<T>): List<T>? { + throw IllegalArgumentException("Invalid column") + } + + override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? { + throw IllegalArgumentException("Invalid column") + } + + override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? { + throw IllegalArgumentException("Invalid column") + } + override fun close() { reader.close() } override fun toString(): String = "OdcVmResourceTableReader" - - private val COL_ID = 0 - private val COL_START_TIME = 1 - private val COL_STOP_TIME = 2 - private val COL_CPU_COUNT = 3 - private val COL_CPU_CAPACITY = 4 - private val COL_MEM_CAPACITY = 5 - - private val columns = mapOf( - RESOURCE_ID to COL_ID, - RESOURCE_START_TIME to COL_START_TIME, - RESOURCE_STOP_TIME to COL_STOP_TIME, - RESOURCE_CPU_COUNT to COL_CPU_COUNT, - RESOURCE_CPU_CAPACITY to COL_CPU_CAPACITY, - RESOURCE_MEM_CAPACITY to COL_MEM_CAPACITY, - ) } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt index cae65faa..8117c3cd 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt @@ -26,7 +26,9 @@ import org.apache.parquet.hadoop.ParquetWriter import org.opendc.trace.* import org.opendc.trace.conv.* import org.opendc.trace.opendc.parquet.Resource +import java.time.Duration import java.time.Instant +import java.util.* /** * A [TableWriter] implementation for the OpenDC virtual machine trace format. @@ -59,18 +61,15 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour writer.write(Resource(_id, _startTime, _stopTime, _cpuCount, _cpuCapacity, _memCapacity)) } - override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 - - override fun set(index: Int, value: Any) { - check(_isActive) { "No active row" } - when (index) { - COL_ID -> _id = value as String - COL_START_TIME -> _startTime = value as Instant - COL_STOP_TIME -> _stopTime = value as Instant - COL_CPU_COUNT -> _cpuCount = value as Int - COL_CPU_CAPACITY -> _cpuCapacity = value as Double - COL_MEM_CAPACITY -> _memCapacity = value as Double - else -> throw IllegalArgumentException("Invalid column index $index") + override fun resolve(name: String): Int { + return when (name) { + RESOURCE_ID -> COL_ID + RESOURCE_START_TIME -> COL_START_TIME + RESOURCE_STOP_TIME -> COL_STOP_TIME + RESOURCE_CPU_COUNT -> COL_CPU_COUNT + RESOURCE_CPU_CAPACITY -> COL_CPU_CAPACITY + RESOURCE_MEM_CAPACITY -> COL_MEM_CAPACITY + else -> -1 } } @@ -90,6 +89,10 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour throw IllegalArgumentException("Invalid column or type [index $index]") } + override fun setFloat(index: Int, value: Float) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + override fun setDouble(index: Int, value: Double) { check(_isActive) { "No active row" } when (index) { @@ -99,6 +102,43 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour } } + override fun setString(index: Int, value: String) { + check(_isActive) { "No active row" } + when (index) { + COL_ID -> _id = value + else -> throw IllegalArgumentException("Invalid column index $index") + } + } + + override fun setUUID(index: Int, value: UUID) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun setInstant(index: Int, value: Instant) { + check(_isActive) { "No active row" } + when (index) { + COL_START_TIME -> _startTime = value + COL_STOP_TIME -> _stopTime = value + else -> throw IllegalArgumentException("Invalid column index $index") + } + } + + override fun setDuration(index: Int, value: Duration) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun <T> setList(index: Int, value: List<T>) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun <T> setSet(index: Int, value: Set<T>) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + + override fun <K, V> setMap(index: Int, value: Map<K, V>) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } + override fun flush() { // Not available } @@ -113,13 +153,4 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour private val COL_CPU_COUNT = 3 private val COL_CPU_CAPACITY = 4 private val COL_MEM_CAPACITY = 5 - - private val columns = mapOf( - RESOURCE_ID to COL_ID, - RESOURCE_START_TIME to COL_START_TIME, - RESOURCE_STOP_TIME to COL_STOP_TIME, - RESOURCE_CPU_COUNT to COL_CPU_COUNT, - RESOURCE_CPU_CAPACITY to COL_CPU_CAPACITY, - RESOURCE_MEM_CAPACITY to COL_MEM_CAPACITY, - ) } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt index d45910c6..a9c5b934 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt @@ -73,36 +73,35 @@ public class OdcVmTraceFormat : TraceFormat { return when (table) { TABLE_RESOURCES -> TableDetails( listOf( - RESOURCE_ID, - RESOURCE_START_TIME, - RESOURCE_STOP_TIME, - RESOURCE_CPU_COUNT, - RESOURCE_CPU_CAPACITY, - RESOURCE_MEM_CAPACITY, + TableColumn(RESOURCE_ID, TableColumnType.String), + TableColumn(RESOURCE_START_TIME, TableColumnType.Instant), + TableColumn(RESOURCE_STOP_TIME, TableColumnType.Instant), + TableColumn(RESOURCE_CPU_COUNT, TableColumnType.Int), + TableColumn(RESOURCE_CPU_CAPACITY, TableColumnType.Double), + TableColumn(RESOURCE_MEM_CAPACITY, TableColumnType.Double), ) ) TABLE_RESOURCE_STATES -> TableDetails( listOf( - RESOURCE_ID, - RESOURCE_STATE_TIMESTAMP, - RESOURCE_STATE_DURATION, - RESOURCE_CPU_COUNT, - RESOURCE_STATE_CPU_USAGE, - ), - listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP) + TableColumn(RESOURCE_ID, TableColumnType.String), + TableColumn(RESOURCE_STATE_TIMESTAMP, TableColumnType.Instant), + TableColumn(RESOURCE_STATE_DURATION, TableColumnType.Duration), + TableColumn(RESOURCE_CPU_COUNT, TableColumnType.Int), + TableColumn(RESOURCE_STATE_CPU_USAGE, TableColumnType.Double), + ) ) TABLE_INTERFERENCE_GROUPS -> TableDetails( listOf( - INTERFERENCE_GROUP_MEMBERS, - INTERFERENCE_GROUP_TARGET, - INTERFERENCE_GROUP_SCORE, + TableColumn(INTERFERENCE_GROUP_MEMBERS, TableColumnType.Set(TableColumnType.String)), + TableColumn(INTERFERENCE_GROUP_TARGET, TableColumnType.Double), + TableColumn(INTERFERENCE_GROUP_SCORE, TableColumnType.Double), ) ) else -> throw IllegalArgumentException("Table $table not supported") } } - override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader { + override fun newReader(path: Path, table: String, projection: List<String>?): TableReader { return when (table) { TABLE_RESOURCES -> { val reader = LocalParquetReader(path.resolve("meta.parquet"), ResourceReadSupport(projection)) diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt index 0d70446d..8a8ed790 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt @@ -33,11 +33,11 @@ import org.opendc.trace.conv.* /** * A [ReadSupport] instance for [Resource] objects. */ -internal class ResourceReadSupport(private val projection: List<TableColumn<*>>?) : ReadSupport<Resource>() { +internal class ResourceReadSupport(private val projection: List<String>?) : ReadSupport<Resource>() { /** * Mapping from field names to [TableColumn]s. */ - private val fieldMap = mapOf<String, TableColumn<*>>( + private val fieldMap = mapOf( "id" to RESOURCE_ID, "submissionTime" to RESOURCE_START_TIME, "start_time" to RESOURCE_START_TIME, diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt index 97aa00b2..78adc649 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt @@ -33,11 +33,11 @@ import org.opendc.trace.conv.* /** * A [ReadSupport] instance for [ResourceState] objects. */ -internal class ResourceStateReadSupport(private val projection: List<TableColumn<*>>?) : ReadSupport<ResourceState>() { +internal class ResourceStateReadSupport(private val projection: List<String>?) : ReadSupport<ResourceState>() { /** * Mapping from field names to [TableColumn]s. */ - private val fieldMap = mapOf<String, TableColumn<*>>( + private val fieldMap = mapOf( "id" to RESOURCE_ID, "time" to RESOURCE_STATE_TIMESTAMP, "timestamp" to RESOURCE_STATE_TIMESTAMP, diff --git a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt index 1f4f6195..9fdffb2b 100644 --- a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt @@ -23,12 +23,20 @@ package org.opendc.trace.opendc import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.DisplayName +import org.junit.jupiter.api.Nested import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertDoesNotThrow import org.junit.jupiter.api.assertThrows import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource +import org.opendc.trace.TableColumn +import org.opendc.trace.TableReader +import org.opendc.trace.TableWriter import org.opendc.trace.conv.* +import org.opendc.trace.testkit.TableReaderTestKit +import org.opendc.trace.testkit.TableWriterTestKit import java.nio.file.Files import java.nio.file.Paths import java.time.Instant @@ -36,6 +44,7 @@ import java.time.Instant /** * Test suite for the [OdcVmTraceFormat] implementation. */ +@DisplayName("OdcVmTraceFormat") internal class OdcVmTraceFormatTest { private val format = OdcVmTraceFormat() @@ -67,14 +76,14 @@ internal class OdcVmTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("1019", reader.get(RESOURCE_ID)) }, - { assertEquals(Instant.ofEpochMilli(1376314846000), reader.get(RESOURCE_START_TIME)) }, + { assertEquals("1019", reader.getString(RESOURCE_ID)) }, + { assertEquals(Instant.ofEpochMilli(1376314846000), reader.getInstant(RESOURCE_START_TIME)) }, { assertTrue(reader.nextRow()) }, - { assertEquals("1023", reader.get(RESOURCE_ID)) }, + { assertEquals("1023", reader.getString(RESOURCE_ID)) }, { assertTrue(reader.nextRow()) }, - { assertEquals("1052", reader.get(RESOURCE_ID)) }, + { assertEquals("1052", reader.getString(RESOURCE_ID)) }, { assertTrue(reader.nextRow()) }, - { assertEquals("1073", reader.get(RESOURCE_ID)) }, + { assertEquals("1073", reader.getString(RESOURCE_ID)) }, { assertFalse(reader.nextRow()) } ) @@ -87,9 +96,9 @@ internal class OdcVmTraceFormatTest { val writer = format.newWriter(path, TABLE_RESOURCES) writer.startRow() - writer.set(RESOURCE_ID, "1019") - writer.set(RESOURCE_START_TIME, Instant.EPOCH) - writer.set(RESOURCE_STOP_TIME, Instant.EPOCH) + writer.setString(RESOURCE_ID, "1019") + writer.setInstant(RESOURCE_START_TIME, Instant.EPOCH) + writer.setInstant(RESOURCE_STOP_TIME, Instant.EPOCH) writer.setInt(RESOURCE_CPU_COUNT, 1) writer.setDouble(RESOURCE_CPU_CAPACITY, 1024.0) writer.setDouble(RESOURCE_MEM_CAPACITY, 1024.0) @@ -100,9 +109,9 @@ internal class OdcVmTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("1019", reader.get(RESOURCE_ID)) }, - { assertEquals(Instant.EPOCH, reader.get(RESOURCE_START_TIME)) }, - { assertEquals(Instant.EPOCH, reader.get(RESOURCE_STOP_TIME)) }, + { assertEquals("1019", reader.getString(RESOURCE_ID)) }, + { assertEquals(Instant.EPOCH, reader.getInstant(RESOURCE_START_TIME)) }, + { assertEquals(Instant.EPOCH, reader.getInstant(RESOURCE_STOP_TIME)) }, { assertEquals(1, reader.getInt(RESOURCE_CPU_COUNT)) }, { assertEquals(1024.0, reader.getDouble(RESOURCE_CPU_CAPACITY)) }, { assertEquals(1024.0, reader.getDouble(RESOURCE_MEM_CAPACITY)) }, @@ -124,8 +133,8 @@ internal class OdcVmTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("1019", reader.get(RESOURCE_ID)) }, - { assertEquals(1376314846, reader.get(RESOURCE_STATE_TIMESTAMP).epochSecond) }, + { assertEquals("1019", reader.getString(RESOURCE_ID)) }, + { assertEquals(1376314846, reader.getInstant(RESOURCE_STATE_TIMESTAMP)?.epochSecond) }, { assertEquals(0.0, reader.getDouble(RESOURCE_STATE_CPU_USAGE), 0.01) } ) @@ -138,8 +147,8 @@ internal class OdcVmTraceFormatTest { val writer = format.newWriter(path, TABLE_RESOURCE_STATES) writer.startRow() - writer.set(RESOURCE_ID, "1019") - writer.set(RESOURCE_STATE_TIMESTAMP, Instant.EPOCH) + writer.setString(RESOURCE_ID, "1019") + writer.setInstant(RESOURCE_STATE_TIMESTAMP, Instant.EPOCH) writer.setDouble(RESOURCE_STATE_CPU_USAGE, 23.0) writer.setInt(RESOURCE_CPU_COUNT, 1) writer.endRow() @@ -149,8 +158,8 @@ internal class OdcVmTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("1019", reader.get(RESOURCE_ID)) }, - { assertEquals(Instant.EPOCH, reader.get(RESOURCE_STATE_TIMESTAMP)) }, + { assertEquals("1019", reader.getString(RESOURCE_ID)) }, + { assertEquals(Instant.EPOCH, reader.getInstant(RESOURCE_STATE_TIMESTAMP)) }, { assertEquals(1, reader.getInt(RESOURCE_CPU_COUNT)) }, { assertEquals(23.0, reader.getDouble(RESOURCE_STATE_CPU_USAGE)) }, { assertFalse(reader.nextRow()) }, @@ -170,13 +179,13 @@ internal class OdcVmTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals(setOf("1019", "1023", "1052"), reader.get(INTERFERENCE_GROUP_MEMBERS)) }, - { assertEquals(0.0, reader.get(INTERFERENCE_GROUP_TARGET)) }, - { assertEquals(0.8830158730158756, reader.get(INTERFERENCE_GROUP_SCORE)) }, + { assertEquals(setOf("1019", "1023", "1052"), reader.getSet(INTERFERENCE_GROUP_MEMBERS, String::class.java)) }, + { assertEquals(0.0, reader.getDouble(INTERFERENCE_GROUP_TARGET)) }, + { assertEquals(0.8830158730158756, reader.getDouble(INTERFERENCE_GROUP_SCORE)) }, { assertTrue(reader.nextRow()) }, - { assertEquals(setOf("1023", "1052", "1073"), reader.get(INTERFERENCE_GROUP_MEMBERS)) }, - { assertEquals(0.0, reader.get(INTERFERENCE_GROUP_TARGET)) }, - { assertEquals(0.7133055555552751, reader.get(INTERFERENCE_GROUP_SCORE)) }, + { assertEquals(setOf("1023", "1052", "1073"), reader.getSet(INTERFERENCE_GROUP_MEMBERS, String::class.java)) }, + { assertEquals(0.0, reader.getDouble(INTERFERENCE_GROUP_TARGET)) }, + { assertEquals(0.7133055555552751, reader.getDouble(INTERFERENCE_GROUP_SCORE)) }, { assertFalse(reader.nextRow()) } ) @@ -191,4 +200,130 @@ internal class OdcVmTraceFormatTest { assertFalse(reader.nextRow()) reader.close() } + + @Test + fun testInterferenceGroupsWrite() { + val path = Files.createTempDirectory("opendc") + val writer = format.newWriter(path, TABLE_INTERFERENCE_GROUPS) + + writer.startRow() + writer.setSet(INTERFERENCE_GROUP_MEMBERS, setOf("a", "b", "c")) + writer.setDouble(INTERFERENCE_GROUP_TARGET, 0.5) + writer.setDouble(INTERFERENCE_GROUP_SCORE, 0.8) + writer.endRow() + writer.flush() + + writer.startRow() + writer.setSet(INTERFERENCE_GROUP_MEMBERS, setOf("a", "b", "d")) + writer.setDouble(INTERFERENCE_GROUP_TARGET, 0.5) + writer.setDouble(INTERFERENCE_GROUP_SCORE, 0.9) + writer.endRow() + writer.close() + + val reader = format.newReader(path, TABLE_INTERFERENCE_GROUPS, null) + + assertAll( + { assertTrue(reader.nextRow()) }, + { assertEquals(setOf("a", "b", "c"), reader.getSet(INTERFERENCE_GROUP_MEMBERS, String::class.java)) }, + { assertEquals(0.5, reader.getDouble(INTERFERENCE_GROUP_TARGET)) }, + { assertEquals(0.8, reader.getDouble(INTERFERENCE_GROUP_SCORE)) }, + { assertTrue(reader.nextRow()) }, + { assertEquals(setOf("a", "b", "d"), reader.getSet(INTERFERENCE_GROUP_MEMBERS, String::class.java)) }, + { assertEquals(0.5, reader.getDouble(INTERFERENCE_GROUP_TARGET)) }, + { assertEquals(0.9, reader.getDouble(INTERFERENCE_GROUP_SCORE)) }, + { assertFalse(reader.nextRow()) }, + ) + + reader.close() + } + + @DisplayName("TableReader for Resources") + @Nested + inner class ResourcesTableReaderTest : TableReaderTestKit() { + override lateinit var reader: TableReader + override lateinit var columns: List<TableColumn> + + @BeforeEach + fun setUp() { + val path = Paths.get("src/test/resources/trace-v2.1") + + columns = format.getDetails(path, TABLE_RESOURCES).columns + reader = format.newReader(path, TABLE_RESOURCES, null) + } + } + + @DisplayName("TableWriter for Resources") + @Nested + inner class ResourcesTableWriterTest : TableWriterTestKit() { + override lateinit var writer: TableWriter + override lateinit var columns: List<TableColumn> + + @BeforeEach + fun setUp() { + val path = Files.createTempDirectory("opendc") + + columns = format.getDetails(Paths.get("src/test/resources/trace-v2.1"), TABLE_RESOURCES).columns + writer = format.newWriter(path, TABLE_RESOURCES) + } + } + + @DisplayName("TableReader for Resource States") + @Nested + inner class ResourceStatesTableReaderTest : TableReaderTestKit() { + override lateinit var reader: TableReader + override lateinit var columns: List<TableColumn> + + @BeforeEach + fun setUp() { + val path = Paths.get("src/test/resources/trace-v2.1") + + columns = format.getDetails(path, TABLE_RESOURCE_STATES).columns + reader = format.newReader(path, TABLE_RESOURCE_STATES, null) + } + } + + @DisplayName("TableWriter for Resource States") + @Nested + inner class ResourceStatesTableWriterTest : TableWriterTestKit() { + override lateinit var writer: TableWriter + override lateinit var columns: List<TableColumn> + + @BeforeEach + fun setUp() { + val path = Files.createTempDirectory("opendc") + + columns = format.getDetails(Paths.get("src/test/resources/trace-v2.1"), TABLE_RESOURCE_STATES).columns + writer = format.newWriter(path, TABLE_RESOURCE_STATES) + } + } + + @DisplayName("TableReader for Interference Groups") + @Nested + inner class InterferenceGroupsTableReaderTest : TableReaderTestKit() { + override lateinit var reader: TableReader + override lateinit var columns: List<TableColumn> + + @BeforeEach + fun setUp() { + val path = Paths.get("src/test/resources/trace-v2.1") + + columns = format.getDetails(path, TABLE_INTERFERENCE_GROUPS).columns + reader = format.newReader(path, TABLE_INTERFERENCE_GROUPS, null) + } + } + + @DisplayName("TableWriter for Interference Groups") + @Nested + inner class InterferenceGroupsTableWriterTest : TableWriterTestKit() { + override lateinit var writer: TableWriter + override lateinit var columns: List<TableColumn> + + @BeforeEach + fun setUp() { + val path = Files.createTempDirectory("opendc") + + columns = format.getDetails(Paths.get("src/test/resources/trace-v2.1"), TABLE_INTERFERENCE_GROUPS).columns + writer = format.newWriter(path, TABLE_INTERFERENCE_GROUPS) + } + } } diff --git a/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt b/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt index be354319..29fcac96 100644 --- a/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt +++ b/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt @@ -77,6 +77,7 @@ internal class ParquetTest { } private val readSupport = object : ReadSupport<Int>() { + @Suppress("OVERRIDE_DEPRECATION") override fun init( configuration: Configuration, keyValueMetaData: Map<String, String>, diff --git a/opendc-trace/opendc-trace-swf/build.gradle.kts b/opendc-trace/opendc-trace-swf/build.gradle.kts index 20b03c80..d3bc5aa6 100644 --- a/opendc-trace/opendc-trace-swf/build.gradle.kts +++ b/opendc-trace/opendc-trace-swf/build.gradle.kts @@ -29,4 +29,6 @@ plugins { dependencies { api(projects.opendcTrace.opendcTraceApi) + + testImplementation(projects.opendcTrace.opendcTraceTestkit) } diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt index 40b604c3..b2734fe7 100644 --- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt +++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt @@ -27,12 +27,18 @@ import org.opendc.trace.conv.* import java.io.BufferedReader import java.time.Duration import java.time.Instant +import java.util.* /** * A [TableReader] implementation for the SWF format. */ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableReader { /** + * A flag to indicate the state of the reader + */ + private var state = State.Pending + + /** * The current row. */ private var fields = emptyList<String>() @@ -43,11 +49,23 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea private val whitespace = "\\s+".toRegex() override fun nextRow(): Boolean { - var line: String + var line: String? var num = 0 + val state = state + if (state == State.Closed) { + return false + } else if (state == State.Pending) { + this.state = State.Active + } + while (true) { - line = reader.readLine() ?: return false + line = reader.readLine() + + if (line == null) { + this.state = State.Closed + return false + } num++ if (line.isBlank()) { @@ -61,7 +79,7 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea break } - fields = line.trim().split(whitespace) + fields = line!!.trim().split(whitespace) if (fields.size < 18) { throw IllegalArgumentException("Invalid format at line $line") @@ -70,48 +88,103 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea return true } - override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + override fun resolve(name: String): Int { + return when (name) { + TASK_ID -> COL_JOB_ID + TASK_SUBMIT_TIME -> COL_SUBMIT_TIME + TASK_WAIT_TIME -> COL_WAIT_TIME + TASK_RUNTIME -> COL_RUN_TIME + TASK_ALLOC_NCPUS -> COL_ALLOC_NCPUS + TASK_REQ_NCPUS -> COL_REQ_NCPUS + TASK_STATUS -> COL_STATUS + TASK_USER_ID -> COL_USER_ID + TASK_GROUP_ID -> COL_GROUP_ID + TASK_PARENTS -> COL_PARENT_JOB + else -> -1 + } + } override fun isNull(index: Int): Boolean { - require(index in columns.values) { "Invalid column index" } + require(index in COL_JOB_ID..COL_PARENT_THINK_TIME) { "Invalid column index" } return false } - override fun get(index: Int): Any? { + override fun getBoolean(index: Int): Boolean { + throw IllegalArgumentException("Invalid column") + } + + override fun getInt(index: Int): Int { + check(state == State.Active) { "No active row" } + return when (index) { + COL_REQ_NCPUS, COL_ALLOC_NCPUS, COL_STATUS, COL_GROUP_ID, COL_USER_ID -> fields[index].toInt(10) + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getLong(index: Int): Long { + throw IllegalArgumentException("Invalid column") + } + + override fun getFloat(index: Int): Float { + throw IllegalArgumentException("Invalid column") + } + + override fun getDouble(index: Int): Double { + throw IllegalArgumentException("Invalid column") + } + + override fun getString(index: Int): String { + check(state == State.Active) { "No active row" } return when (index) { COL_JOB_ID -> fields[index] - COL_SUBMIT_TIME -> Instant.ofEpochSecond(fields[index].toLong(10)) - COL_WAIT_TIME, COL_RUN_TIME -> Duration.ofSeconds(fields[index].toLong(10)) - COL_REQ_NCPUS, COL_ALLOC_NCPUS, COL_STATUS, COL_GROUP_ID, COL_USER_ID -> getInt(index) - COL_PARENT_JOB -> { - val parent = fields[index].toLong(10) - if (parent < 0) emptySet() else setOf(parent) - } else -> throw IllegalArgumentException("Invalid column") } } - override fun getBoolean(index: Int): Boolean { + override fun getUUID(index: Int): UUID? { throw IllegalArgumentException("Invalid column") } - override fun getInt(index: Int): Int { + override fun getInstant(index: Int): Instant? { + check(state == State.Active) { "No active row" } return when (index) { - COL_REQ_NCPUS, COL_ALLOC_NCPUS, COL_STATUS, COL_GROUP_ID, COL_USER_ID -> fields[index].toInt(10) + COL_SUBMIT_TIME -> Instant.ofEpochSecond(fields[index].toLong(10)) else -> throw IllegalArgumentException("Invalid column") } } - override fun getLong(index: Int): Long { + override fun getDuration(index: Int): Duration? { + check(state == State.Active) { "No active row" } + return when (index) { + COL_WAIT_TIME, COL_RUN_TIME -> Duration.ofSeconds(fields[index].toLong(10)) + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun <T> getList(index: Int, elementType: Class<T>): List<T>? { throw IllegalArgumentException("Invalid column") } - override fun getDouble(index: Int): Double { + override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? { + check(state == State.Active) { "No active row" } + @Suppress("UNCHECKED_CAST") + return when (index) { + COL_PARENT_JOB -> { + require(elementType.isAssignableFrom(String::class.java)) + val parent = fields[index].toLong(10) + if (parent < 0) emptySet() else setOf(parent) + } + else -> throw IllegalArgumentException("Invalid column") + } as Set<T>? + } + + override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? { throw IllegalArgumentException("Invalid column") } override fun close() { reader.close() + state = State.Closed } /** @@ -136,16 +209,7 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea private val COL_PARENT_JOB = 16 private val COL_PARENT_THINK_TIME = 17 - private val columns = mapOf( - TASK_ID to COL_JOB_ID, - TASK_SUBMIT_TIME to COL_SUBMIT_TIME, - TASK_WAIT_TIME to COL_WAIT_TIME, - TASK_RUNTIME to COL_RUN_TIME, - TASK_ALLOC_NCPUS to COL_ALLOC_NCPUS, - TASK_REQ_NCPUS to COL_REQ_NCPUS, - TASK_STATUS to COL_STATUS, - TASK_USER_ID to COL_USER_ID, - TASK_GROUP_ID to COL_GROUP_ID, - TASK_PARENTS to COL_PARENT_JOB - ) + private enum class State { + Pending, Active, Closed + } } diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt index 916a5eca..575a1740 100644 --- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt +++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt @@ -47,24 +47,23 @@ public class SwfTraceFormat : TraceFormat { return when (table) { TABLE_TASKS -> TableDetails( listOf( - TASK_ID, - TASK_SUBMIT_TIME, - TASK_WAIT_TIME, - TASK_RUNTIME, - TASK_REQ_NCPUS, - TASK_ALLOC_NCPUS, - TASK_PARENTS, - TASK_STATUS, - TASK_GROUP_ID, - TASK_USER_ID - ), - emptyList() + TableColumn(TASK_ID, TableColumnType.String), + TableColumn(TASK_SUBMIT_TIME, TableColumnType.Instant), + TableColumn(TASK_WAIT_TIME, TableColumnType.Duration), + TableColumn(TASK_RUNTIME, TableColumnType.Duration), + TableColumn(TASK_REQ_NCPUS, TableColumnType.Int), + TableColumn(TASK_ALLOC_NCPUS, TableColumnType.Int), + TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)), + TableColumn(TASK_STATUS, TableColumnType.Int), + TableColumn(TASK_GROUP_ID, TableColumnType.Int), + TableColumn(TASK_USER_ID, TableColumnType.Int) + ) ) else -> throw IllegalArgumentException("Table $table not supported") } } - override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader { + override fun newReader(path: Path, table: String, projection: List<String>?): TableReader { return when (table) { TABLE_TASKS -> SwfTaskTableReader(path.bufferedReader()) else -> throw IllegalArgumentException("Table $table not supported") diff --git a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt index c3d644e8..06a500d8 100644 --- a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt @@ -24,14 +24,18 @@ package org.opendc.trace.swf import org.junit.jupiter.api.* import org.junit.jupiter.api.Assertions.* +import org.opendc.trace.TableColumn +import org.opendc.trace.TableReader import org.opendc.trace.conv.TABLE_TASKS import org.opendc.trace.conv.TASK_ALLOC_NCPUS import org.opendc.trace.conv.TASK_ID +import org.opendc.trace.testkit.TableReaderTestKit import java.nio.file.Paths /** * Test suite for the [SwfTraceFormat] class. */ +@DisplayName("SWF TraceFormat") internal class SwfTraceFormatTest { private val format = SwfTraceFormat() @@ -62,13 +66,28 @@ internal class SwfTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("1", reader.get(TASK_ID)) }, + { assertEquals("1", reader.getString(TASK_ID)) }, { assertEquals(306, reader.getInt(TASK_ALLOC_NCPUS)) }, { assertTrue(reader.nextRow()) }, - { assertEquals("2", reader.get(TASK_ID)) }, + { assertEquals("2", reader.getString(TASK_ID)) }, { assertEquals(17, reader.getInt(TASK_ALLOC_NCPUS)) }, ) reader.close() } + + @DisplayName("TableReader for Tasks") + @Nested + inner class TasksTableReaderTest : TableReaderTestKit() { + override lateinit var reader: TableReader + override lateinit var columns: List<TableColumn> + + @BeforeEach + fun setUp() { + val path = Paths.get(checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")).toURI()) + + columns = format.getDetails(path, TABLE_TASKS).columns + reader = format.newReader(path, TABLE_TASKS, null) + } + } } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TableColumns.kt b/opendc-trace/opendc-trace-testkit/build.gradle.kts index a58505e9..f6b7222c 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TableColumns.kt +++ b/opendc-trace/opendc-trace-testkit/build.gradle.kts @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 AtLarge Research + * Copyright (c) 2022 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,17 +20,15 @@ * SOFTWARE. */ -@file:JvmName("TableColumns") -package org.opendc.trace.conv +description = "Reusable test suite for implementors" -import org.opendc.trace.TableColumn +/* Build configuration */ +plugins { + `kotlin-library-conventions` +} -/** - * Construct a [TableColumn] with the specified [name] and type [T]. - */ -public inline fun <reified T> column(name: String): TableColumn<T> = column(name, T::class.java) - -/** - * Construct a [TableColumn] with the specified [name] and [type]. - */ -public fun <T> column(name: String, type: Class<T>): TableColumn<T> = TableColumn(name, type) +dependencies { + api(projects.opendcTrace.opendcTraceApi) + implementation(libs.junit.jupiter.api) + implementation(libs.junit.jupiter.params) +} diff --git a/opendc-trace/opendc-trace-testkit/src/main/kotlin/org/opendc/trace/testkit/TableReaderTestKit.kt b/opendc-trace/opendc-trace-testkit/src/main/kotlin/org/opendc/trace/testkit/TableReaderTestKit.kt new file mode 100644 index 00000000..291ca2b1 --- /dev/null +++ b/opendc-trace/opendc-trace-testkit/src/main/kotlin/org/opendc/trace/testkit/TableReaderTestKit.kt @@ -0,0 +1,174 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.testkit + +import org.junit.jupiter.api.* +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Assumptions.assumeTrue +import org.junit.jupiter.api.assertAll +import org.junit.jupiter.api.assertDoesNotThrow +import org.opendc.trace.TableColumn +import org.opendc.trace.TableColumnType +import org.opendc.trace.TableReader + +/** + * A test suite for implementations of the [TableReader] interface. + */ +public abstract class TableReaderTestKit { + /** + * The [TableReader] instance to test. + */ + public abstract val reader: TableReader + + /** + * The columns of the table. + */ + public abstract val columns: List<TableColumn> + + @AfterEach + public fun tearDown() { + reader.close() + } + + /** + * Test that we can resolve the columns of a table successfully. + */ + @Test + public fun testResolve() { + assertAll(columns.map { column -> { assertNotEquals(-1, reader.resolve(column.name)) } }) + } + + /** + * Test that resolving an empty column name fails + */ + @Test + public fun testResolveEmpty() { + assertEquals(-1, reader.resolve("")) + } + + /** + * Test that reading non-existent columns fails. + */ + @Test + public fun testReadNonExistentColumns() { + assumeTrue(reader.nextRow()) + assertAll( + { assertThrows<IllegalArgumentException> { reader.isNull(-1) } }, + { assertThrows<IllegalArgumentException> { reader.getBoolean(-1) } }, + { assertThrows<IllegalArgumentException> { reader.getInt(-1) } }, + { assertThrows<IllegalArgumentException> { reader.getLong(-1) } }, + { assertThrows<IllegalArgumentException> { reader.getFloat(-1) } }, + { assertThrows<IllegalArgumentException> { reader.getDouble(-1) } }, + { assertThrows<IllegalArgumentException> { reader.getString(-1) } }, + { assertThrows<IllegalArgumentException> { reader.getUUID(-1) } }, + { assertThrows<IllegalArgumentException> { reader.getInstant(-1) } }, + { assertThrows<IllegalArgumentException> { reader.getDuration(-1) } }, + { assertThrows<IllegalArgumentException> { reader.getList(-1, Any::class.java) } }, + { assertThrows<IllegalArgumentException> { reader.getSet(-1, Any::class.java) } }, + { assertThrows<IllegalArgumentException> { reader.getMap(-1, Any::class.java, Any::class.java) } }, + ) + } + + /** + * Test that ensures [TableReader.isNull] reports the correct value. + */ + @Test + public fun testVerifyNullColumns() { + while (reader.nextRow()) { + assertAll( + columns.map { column -> + { + when (column.type) { + is TableColumnType.Boolean -> assertFalse(reader.isNull(column.name) && !reader.getBoolean(column.name)) + is TableColumnType.Int -> assertFalse(reader.isNull(column.name) && reader.getInt(column.name) != 0) + is TableColumnType.Long -> assertFalse(reader.isNull(column.name) && reader.getLong(column.name) != 0L) + is TableColumnType.Float -> assertFalse(reader.isNull(column.name) && reader.getFloat(column.name) != 0f) + is TableColumnType.Double -> assertFalse(reader.isNull(column.name) && reader.getDouble(column.name) != 0.0) + is TableColumnType.String -> assertFalse(reader.isNull(column.name) && reader.getString(column.name) != null) + is TableColumnType.UUID -> assertFalse(reader.isNull(column.name) && reader.getUUID(column.name) != null) + is TableColumnType.Instant -> assertFalse(reader.isNull(column.name) && reader.getInstant(column.name) != null) + is TableColumnType.Duration -> assertFalse(reader.isNull(column.name) && reader.getDuration(column.name) != null) + is TableColumnType.List -> assertFalse(reader.isNull(column.name) && reader.getList(column.name, Any::class.java) != null) + is TableColumnType.Set -> assertFalse(reader.isNull(column.name) && reader.getSet(column.name, Any::class.java) != null) + is TableColumnType.Map -> assertFalse(reader.isNull(column.name) && reader.getMap(column.name, Any::class.java, Any::class.java) != null) + } + } + } + ) + } + } + + /** + * Test that we can read the entire table without any issue. + */ + @Test + public fun testReadFully() { + assertDoesNotThrow { + while (reader.nextRow()) { + assertAll(columns.map { column -> { assertDoesNotThrow { reader.get(column) } } }) + } + reader.close() + } + + assertFalse(reader.nextRow()) { "Reader does not reset" } + } + + /** + * Test that the reader throws an exception when the columns are read without a call to [TableReader.nextRow] + */ + @Test + public fun testReadWithoutNextRow() { + assertAll(columns.map { column -> { assertThrows<IllegalStateException> { reader.get(column) } } }) + } + + /** + * Test that the reader throws an exception when the columns are read after the [TableReader] is finished. + */ + @Test + public fun testReadAfterFinish() { + @Suppress("ControlFlowWithEmptyBody") + while (reader.nextRow()) {} + + testReadWithoutNextRow() + } + + /** + * Helper method to map a [TableColumn] to a read. + */ + private fun TableReader.get(column: TableColumn): Any? { + return when (column.type) { + is TableColumnType.Boolean -> getBoolean(column.name) + is TableColumnType.Int -> getInt(column.name) + is TableColumnType.Long -> getLong(column.name) + is TableColumnType.Float -> getFloat(column.name) + is TableColumnType.Double -> getDouble(column.name) + is TableColumnType.String -> getString(column.name) + is TableColumnType.UUID -> getUUID(column.name) + is TableColumnType.Instant -> getInstant(column.name) + is TableColumnType.Duration -> getDuration(column.name) + is TableColumnType.List -> getList(column.name, Any::class.java) + is TableColumnType.Set -> getSet(column.name, Any::class.java) + is TableColumnType.Map -> getMap(column.name, Any::class.java, Any::class.java) + } + } +} diff --git a/opendc-trace/opendc-trace-testkit/src/main/kotlin/org/opendc/trace/testkit/TableWriterTestKit.kt b/opendc-trace/opendc-trace-testkit/src/main/kotlin/org/opendc/trace/testkit/TableWriterTestKit.kt new file mode 100644 index 00000000..fab992f0 --- /dev/null +++ b/opendc-trace/opendc-trace-testkit/src/main/kotlin/org/opendc/trace/testkit/TableWriterTestKit.kt @@ -0,0 +1,130 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.testkit + +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll +import org.junit.jupiter.api.assertThrows +import org.opendc.trace.TableColumn +import org.opendc.trace.TableColumnType +import org.opendc.trace.TableWriter +import java.time.Duration +import java.time.Instant +import java.util.* + +/** + * A test suite for implementations of the [TableWriter] interface. + */ +public abstract class TableWriterTestKit { + /** + * The [TableWriter] instance to test. + */ + public abstract val writer: TableWriter + + /** + * The columns of the table. + */ + public abstract val columns: List<TableColumn> + + @AfterEach + public fun tearDown() { + writer.close() + } + + /** + * Test that we can resolve the columns of a table successfully. + */ + @Test + public fun testResolve() { + assertAll(columns.map { column -> { assertNotEquals(-1, writer.resolve(column.name)) } }) + } + + /** + * Test that resolving an empty column name fails + */ + @Test + public fun testResolveEmpty() { + assertEquals(-1, writer.resolve("")) + } + + /** + * Test that writing non-existent columns fails. + */ + @Test + public fun testWriteNonExistentColumns() { + writer.startRow() + assertAll( + { assertThrows<IllegalArgumentException> { writer.setBoolean(-1, false) } }, + { assertThrows<IllegalArgumentException> { writer.setInt(-1, 1) } }, + { assertThrows<IllegalArgumentException> { writer.setLong(-1, 1) } }, + { assertThrows<IllegalArgumentException> { writer.setFloat(-1, 1f) } }, + { assertThrows<IllegalArgumentException> { writer.setDouble(-1, 1.0) } }, + { assertThrows<IllegalArgumentException> { writer.setString(-1, "test") } }, + { assertThrows<IllegalArgumentException> { writer.setUUID(-1, UUID.randomUUID()) } }, + { assertThrows<IllegalArgumentException> { writer.setInstant(-1, Instant.now()) } }, + { assertThrows<IllegalArgumentException> { writer.setDuration(-1, Duration.ofMinutes(5)) } }, + { assertThrows<IllegalArgumentException> { writer.setList(-1, listOf("test")) } }, + { assertThrows<IllegalArgumentException> { writer.setSet(-1, setOf("test")) } }, + { assertThrows<IllegalArgumentException> { writer.setMap(-1, mapOf("test" to "test")) } }, + ) + } + + /** + * Test that writing columns without a row fails. + */ + @Test + public fun testWriteWithoutRow() { + assertAll( + columns.map { column -> + { + assertThrows<IllegalStateException> { + when (column.type) { + is TableColumnType.Boolean -> writer.setBoolean(column.name, true) + is TableColumnType.Int -> writer.setInt(column.name, 21) + is TableColumnType.Long -> writer.setLong(column.name, 21) + is TableColumnType.Float -> writer.setFloat(column.name, 42f) + is TableColumnType.Double -> writer.setDouble(column.name, 42.0) + is TableColumnType.String -> writer.setString(column.name, "test") + is TableColumnType.UUID -> writer.setUUID(column.name, UUID.randomUUID()) + is TableColumnType.Instant -> writer.setInstant(column.name, Instant.now()) + is TableColumnType.Duration -> writer.setDuration(column.name, Duration.ofMinutes(5)) + is TableColumnType.List -> writer.setList(column.name, emptyList<String>()) + is TableColumnType.Set -> writer.setSet(column.name, emptySet<String>()) + is TableColumnType.Map -> writer.setMap(column.name, emptyMap<String, String>()) + } + } + } + } + ) + } + + /** + * Test to verify we cannot end a row without starting it. + */ + @Test + public fun testEndRowWithoutStart() { + assertThrows<IllegalStateException> { writer.endRow() } + } +} diff --git a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/ConvertCommand.kt b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/ConvertCommand.kt index 970de0f4..b6d661e0 100644 --- a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/ConvertCommand.kt +++ b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/ConvertCommand.kt @@ -224,9 +224,9 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b var stopTime = Long.MIN_VALUE do { - id = reader.get(idCol) as String + id = reader.getString(idCol)!! - val timestamp = (reader.get(timestampCol) as Instant).toEpochMilli() + val timestamp = reader.getInstant(timestampCol)!!.toEpochMilli() startTime = min(startTime, timestamp) stopTime = max(stopTime, timestamp) @@ -238,7 +238,7 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b } hasNextRow = reader.nextRow() - } while (hasNextRow && id == reader.get(RESOURCE_ID)) + } while (hasNextRow && id == reader.getString(RESOURCE_ID)) // Sample only a fraction of the VMs if (random != null && random.nextDouble() > samplingFraction) { @@ -255,9 +255,9 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b } writer.startRow() - writer.set(RESOURCE_ID, id) - writer.set(RESOURCE_START_TIME, startInstant) - writer.set(RESOURCE_STOP_TIME, stopInstant) + writer.setString(RESOURCE_ID, id) + writer.setInstant(RESOURCE_START_TIME, startInstant) + writer.setInstant(RESOURCE_STOP_TIME, stopInstant) writer.setInt(RESOURCE_CPU_COUNT, cpuCount) writer.setDouble(RESOURCE_CPU_CAPACITY, cpuCapacity) writer.setDouble(RESOURCE_MEM_CAPACITY, max(memCapacity, memUsage)) @@ -280,7 +280,7 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b var count = 0 while (hasNextRow) { - val id = reader.get(idCol) as String + val id = reader.getString(idCol)!! val resource = selected[id] if (resource == null) { hasNextRow = reader.nextRow() @@ -290,13 +290,13 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b val cpuCount = reader.getInt(cpuCountCol) val cpuUsage = reader.getDouble(cpuUsageCol) - val startTimestamp = (reader.get(timestampCol) as Instant).toEpochMilli() + val startTimestamp = reader.getInstant(timestampCol)!!.toEpochMilli() var timestamp: Long = startTimestamp var duration: Long = sampleInterval // Attempt to cascade further samples into one if they share the same CPU usage while (reader.nextRow().also { hasNextRow = it }) { - val shouldCascade = id == reader.get(idCol) && + val shouldCascade = id == reader.getString(idCol) && abs(cpuUsage - reader.getDouble(cpuUsageCol)) < SAMPLE_CASCADE_DIFF && cpuCount == reader.getInt(cpuCountCol) @@ -308,7 +308,7 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b break } - val nextTimestamp = (reader.get(timestampCol) as Instant).toEpochMilli() + val nextTimestamp = reader.getInstant(timestampCol)!!.toEpochMilli() // Check whether the interval between both samples is not higher than `SAMPLE_INTERVAL` if ((nextTimestamp - timestamp) > sampleInterval) { @@ -320,9 +320,9 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b } writer.startRow() - writer.set(RESOURCE_ID, id) - writer.set(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(timestamp)) - writer.set(RESOURCE_STATE_DURATION, Duration.ofMillis(duration)) + writer.setString(RESOURCE_ID, id) + writer.setInstant(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(timestamp)) + writer.setDuration(RESOURCE_STATE_DURATION, Duration.ofMillis(duration)) writer.setInt(RESOURCE_CPU_COUNT, cpuCount) writer.setDouble(RESOURCE_STATE_CPU_USAGE, cpuUsage) writer.endRow() @@ -377,9 +377,9 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b continue } - val id = reader.get(idCol) as String - val startTime = (reader.get(startTimeCol) as Instant).toEpochMilli() - val stopTime = (reader.get(stopTimeCol) as Instant).toEpochMilli() + val id = reader.getString(idCol)!! + val startTime = reader.getInstant(startTimeCol)!!.toEpochMilli() + val stopTime = reader.getInstant(stopTimeCol)!!.toEpochMilli() val cpuCount = reader.getInt(cpuCountCol) val memCapacity = reader.getDouble(memCapacityCol) @@ -394,9 +394,9 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b } writer.startRow() - writer.set(RESOURCE_ID, id) - writer.set(RESOURCE_START_TIME, startInstant) - writer.set(RESOURCE_STOP_TIME, stopInstant) + writer.setString(RESOURCE_ID, id) + writer.setInstant(RESOURCE_START_TIME, startInstant) + writer.setInstant(RESOURCE_STOP_TIME, stopInstant) writer.setInt(RESOURCE_CPU_COUNT, cpuCount) writer.setDouble(RESOURCE_CPU_CAPACITY, cpuCapacity) writer.setDouble(RESOURCE_MEM_CAPACITY, memCapacity) @@ -418,12 +418,12 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b var count = 0 while (reader.nextRow()) { - val id = reader.get(idCol) as String + val id = reader.getString(idCol)!! val resource = selected[id] ?: continue val cpuUsage = reader.getDouble(cpuUsageCol) * resource.cpuCapacity // MHz val state = states.computeIfAbsent(id) { State(resource, cpuUsage, sampleInterval) } - val timestamp = (reader.get(timestampCol) as Instant).toEpochMilli() + val timestamp = reader.getInstant(timestampCol)!!.toEpochMilli() val delta = (timestamp - state.time) // Check whether the next sample can be cascaded with the current sample: @@ -463,9 +463,9 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b lastWrite = time writer.startRow() - writer.set(RESOURCE_ID, resource.id) - writer.set(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(time)) - writer.set(RESOURCE_STATE_DURATION, Duration.ofMillis(duration)) + writer.setString(RESOURCE_ID, resource.id) + writer.setInstant(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(time)) + writer.setDuration(RESOURCE_STATE_DURATION, Duration.ofMillis(duration)) writer.setDouble(RESOURCE_STATE_CPU_USAGE, cpuUsage) writer.setInt(RESOURCE_CPU_COUNT, resource.cpuCount) writer.endRow() diff --git a/opendc-trace/opendc-trace-wfformat/build.gradle.kts b/opendc-trace/opendc-trace-wfformat/build.gradle.kts index 875f7915..a0e22b16 100644 --- a/opendc-trace/opendc-trace-wfformat/build.gradle.kts +++ b/opendc-trace/opendc-trace-wfformat/build.gradle.kts @@ -31,4 +31,6 @@ dependencies { api(projects.opendcTrace.opendcTraceApi) implementation(libs.jackson.core) + + testImplementation(projects.opendcTrace.opendcTraceTestkit) } diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt index d8eafa9c..ca1a29d0 100644 --- a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt +++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt @@ -27,7 +27,10 @@ import com.fasterxml.jackson.core.JsonParser import com.fasterxml.jackson.core.JsonToken import org.opendc.trace.* import org.opendc.trace.conv.* +import org.opendc.trace.util.convertTo import java.time.Duration +import java.time.Instant +import java.util.* import kotlin.math.roundToInt /** @@ -51,6 +54,7 @@ internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableRe // Check whether the document is not empty and starts with an object if (token == null) { + parser.close() break } else if (token != JsonToken.START_OBJECT) { throw JsonParseException(parser, "Expected object", parser.currentLocation) @@ -61,6 +65,7 @@ internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableRe ParserLevel.TRACE -> { // Seek for the workflow object in the file if (!seekWorkflow()) { + parser.close() break } else if (!parser.isExpectedStartObjectToken) { throw JsonParseException(parser, "Expected object", parser.currentLocation) @@ -95,41 +100,86 @@ internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableRe return hasJob } - override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + override fun resolve(name: String): Int { + return when (name) { + TASK_ID -> COL_ID + TASK_WORKFLOW_ID -> COL_WORKFLOW_ID + TASK_RUNTIME -> COL_RUNTIME + TASK_REQ_NCPUS -> COL_NPROC + TASK_PARENTS -> COL_PARENTS + TASK_CHILDREN -> COL_CHILDREN + else -> -1 + } + } override fun isNull(index: Int): Boolean { - check(index in 0..columns.size) { "Invalid column value" } + require(index in 0..COL_CHILDREN) { "Invalid column value" } return false } - override fun get(index: Int): Any? { + override fun getBoolean(index: Int): Boolean { + throw IllegalArgumentException("Invalid column") + } + + override fun getInt(index: Int): Int { + checkActive() + return when (index) { + COL_NPROC -> cores + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getLong(index: Int): Long { + throw IllegalArgumentException("Invalid column") + } + + override fun getFloat(index: Int): Float { + throw IllegalArgumentException("Invalid column") + } + + override fun getDouble(index: Int): Double { + throw IllegalArgumentException("Invalid column") + } + + override fun getString(index: Int): String? { + checkActive() return when (index) { COL_ID -> id COL_WORKFLOW_ID -> workflowId - COL_RUNTIME -> runtime - COL_PARENTS -> parents - COL_CHILDREN -> children - COL_NPROC -> getInt(index) else -> throw IllegalArgumentException("Invalid column") } } - override fun getBoolean(index: Int): Boolean { + override fun getUUID(index: Int): UUID? { throw IllegalArgumentException("Invalid column") } - override fun getInt(index: Int): Int { + override fun getInstant(index: Int): Instant? { + throw IllegalArgumentException("Invalid column") + } + + override fun getDuration(index: Int): Duration? { + checkActive() return when (index) { - COL_NPROC -> cores + COL_RUNTIME -> runtime else -> throw IllegalArgumentException("Invalid column") } } - override fun getLong(index: Int): Long { + override fun <T> getList(index: Int, elementType: Class<T>): List<T>? { throw IllegalArgumentException("Invalid column") } - override fun getDouble(index: Int): Double { + override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? { + checkActive() + return when (index) { + COL_PARENTS -> TYPE_PARENTS.convertTo(parents, elementType) + COL_CHILDREN -> TYPE_CHILDREN.convertTo(children, elementType) + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? { throw IllegalArgumentException("Invalid column") } @@ -138,10 +188,17 @@ internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableRe } /** + * Helper method to check if the reader is active. + */ + private fun checkActive() { + check(level != ParserLevel.TOP && !parser.isClosed) { "No active row. Did you call nextRow()?" } + } + + /** * Parse the trace and seek until the workflow description. */ private fun seekWorkflow(): Boolean { - while (parser.nextValue() != JsonToken.END_OBJECT) { + while (parser.nextValue() != JsonToken.END_OBJECT && !parser.isClosed) { when (parser.currentName) { "name" -> workflowId = parser.text "workflow" -> return true @@ -232,12 +289,6 @@ internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableRe private val COL_PARENTS = 5 private val COL_CHILDREN = 6 - private val columns = mapOf( - TASK_ID to COL_ID, - TASK_WORKFLOW_ID to COL_WORKFLOW_ID, - TASK_RUNTIME to COL_RUNTIME, - TASK_REQ_NCPUS to COL_NPROC, - TASK_PARENTS to COL_PARENTS, - TASK_CHILDREN to COL_CHILDREN, - ) + private val TYPE_PARENTS = TableColumnType.Set(TableColumnType.String) + private val TYPE_CHILDREN = TableColumnType.Set(TableColumnType.String) } diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt index 8db4c169..154fa061 100644 --- a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt +++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt @@ -50,20 +50,19 @@ public class WfFormatTraceFormat : TraceFormat { return when (table) { TABLE_TASKS -> TableDetails( listOf( - TASK_ID, - TASK_WORKFLOW_ID, - TASK_RUNTIME, - TASK_REQ_NCPUS, - TASK_PARENTS, - TASK_CHILDREN - ), - emptyList() + TableColumn(TASK_ID, TableColumnType.String), + TableColumn(TASK_WORKFLOW_ID, TableColumnType.String), + TableColumn(TASK_RUNTIME, TableColumnType.Duration), + TableColumn(TASK_REQ_NCPUS, TableColumnType.Int), + TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)), + TableColumn(TASK_CHILDREN, TableColumnType.Set(TableColumnType.String)) + ) ) else -> throw IllegalArgumentException("Table $table not supported") } } - override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader { + override fun newReader(path: Path, table: String, projection: List<String>?): TableReader { return when (table) { TABLE_TASKS -> WfFormatTaskTableReader(factory.createParser(path.toFile())) else -> throw IllegalArgumentException("Table $table not supported") diff --git a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt index e27bc82c..9d9735b1 100644 --- a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt +++ b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt @@ -210,7 +210,7 @@ internal class WfFormatTaskTableReaderTest { val reader = WfFormatTaskTableReader(parser) assertTrue(reader.nextRow()) - assertEquals("test", reader.get(TASK_ID)) + assertEquals("test", reader.getString(TASK_ID)) assertFalse(reader.nextRow()) reader.close() @@ -281,7 +281,7 @@ internal class WfFormatTaskTableReaderTest { val reader = WfFormatTaskTableReader(parser) assertTrue(reader.nextRow()) - assertEquals(setOf("1"), reader.get(TASK_PARENTS)) + assertEquals(setOf("1"), reader.getSet(TASK_PARENTS, String::class.java)) assertFalse(reader.nextRow()) reader.close() @@ -337,7 +337,7 @@ internal class WfFormatTaskTableReaderTest { assertTrue(reader.nextRow()) assertTrue(reader.nextRow()) - assertEquals("test2", reader.get(TASK_ID)) + assertEquals("test2", reader.getString(TASK_ID)) assertFalse(reader.nextRow()) reader.close() diff --git a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt index 4a8b2792..40506d59 100644 --- a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt @@ -22,17 +22,20 @@ package org.opendc.trace.wfformat -import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.* import org.junit.jupiter.api.Assertions.* -import org.junit.jupiter.api.Test +import org.junit.jupiter.api.Assertions.assertAll import org.junit.jupiter.api.assertDoesNotThrow -import org.junit.jupiter.api.assertThrows +import org.opendc.trace.TableColumn +import org.opendc.trace.TableReader import org.opendc.trace.conv.* +import org.opendc.trace.testkit.TableReaderTestKit import java.nio.file.Paths /** * Test suite for the [WfFormatTraceFormat] class. */ +@DisplayName("WfFormat TraceFormat") class WfFormatTraceFormatTest { private val format = WfFormatTraceFormat() @@ -66,18 +69,18 @@ class WfFormatTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("makebwaindex_mammoth_mt_krause.fasta", reader.get(TASK_ID)) }, - { assertEquals("eager-nextflow-chameleon", reader.get(TASK_WORKFLOW_ID)) }, - { assertEquals(172000, reader.get(TASK_RUNTIME).toMillis()) }, - { assertEquals(emptySet<String>(), reader.get(TASK_PARENTS)) }, + { assertEquals("makebwaindex_mammoth_mt_krause.fasta", reader.getString(TASK_ID)) }, + { assertEquals("eager-nextflow-chameleon", reader.getString(TASK_WORKFLOW_ID)) }, + { assertEquals(172000, reader.getDuration(TASK_RUNTIME)?.toMillis()) }, + { assertEquals(emptySet<String>(), reader.getSet(TASK_PARENTS, String::class.java)) }, ) assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("makeseqdict_mammoth_mt_krause.fasta", reader.get(TASK_ID)) }, - { assertEquals("eager-nextflow-chameleon", reader.get(TASK_WORKFLOW_ID)) }, - { assertEquals(175000, reader.get(TASK_RUNTIME).toMillis()) }, - { assertEquals(setOf("makebwaindex_mammoth_mt_krause.fasta"), reader.get(TASK_PARENTS)) }, + { assertEquals("makeseqdict_mammoth_mt_krause.fasta", reader.getString(TASK_ID)) }, + { assertEquals("eager-nextflow-chameleon", reader.getString(TASK_WORKFLOW_ID)) }, + { assertEquals(175000, reader.getDuration(TASK_RUNTIME)?.toMillis()) }, + { assertEquals(setOf("makebwaindex_mammoth_mt_krause.fasta"), reader.getSet(TASK_PARENTS, String::class.java)) }, ) reader.close() @@ -98,4 +101,19 @@ class WfFormatTraceFormatTest { reader.close() } } + + @DisplayName("TableReader for Tasks") + @Nested + inner class TasksTableReaderTest : TableReaderTestKit() { + override lateinit var reader: TableReader + override lateinit var columns: List<TableColumn> + + @BeforeEach + fun setUp() { + val path = Paths.get("src/test/resources/trace.json") + + columns = format.getDetails(path, TABLE_TASKS).columns + reader = format.newReader(path, TABLE_TASKS, null) + } + } } diff --git a/opendc-trace/opendc-trace-wtf/build.gradle.kts b/opendc-trace/opendc-trace-wtf/build.gradle.kts index 35eb32e5..599087e1 100644 --- a/opendc-trace/opendc-trace-wtf/build.gradle.kts +++ b/opendc-trace/opendc-trace-wtf/build.gradle.kts @@ -32,5 +32,6 @@ dependencies { implementation(projects.opendcTrace.opendcTraceParquet) + testImplementation(projects.opendcTrace.opendcTraceTestkit) testRuntimeOnly(libs.slf4j.simple) } diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt index f0db78b7..7d2005b2 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt @@ -24,8 +24,12 @@ package org.opendc.trace.wtf import org.opendc.trace.* import org.opendc.trace.conv.* +import org.opendc.trace.util.convertTo import org.opendc.trace.util.parquet.LocalParquetReader import org.opendc.trace.wtf.parquet.Task +import java.time.Duration +import java.time.Instant +import java.util.* /** * A [TableReader] implementation for the WTF format. @@ -48,26 +52,39 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Task>) } } - override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 + private val COL_ID = 0 + private val COL_WORKFLOW_ID = 1 + private val COL_SUBMIT_TIME = 2 + private val COL_WAIT_TIME = 3 + private val COL_RUNTIME = 4 + private val COL_REQ_NCPUS = 5 + private val COL_PARENTS = 6 + private val COL_CHILDREN = 7 + private val COL_GROUP_ID = 8 + private val COL_USER_ID = 9 - override fun isNull(index: Int): Boolean { - check(index in 0..columns.size) { "Invalid column index" } - return get(index) == null + private val TYPE_PARENTS = TableColumnType.Set(TableColumnType.String) + private val TYPE_CHILDREN = TableColumnType.Set(TableColumnType.String) + + override fun resolve(name: String): Int { + return when (name) { + TASK_ID -> COL_ID + TASK_WORKFLOW_ID -> COL_WORKFLOW_ID + TASK_SUBMIT_TIME -> COL_SUBMIT_TIME + TASK_WAIT_TIME -> COL_WAIT_TIME + TASK_RUNTIME -> COL_RUNTIME + TASK_REQ_NCPUS -> COL_REQ_NCPUS + TASK_PARENTS -> COL_PARENTS + TASK_CHILDREN -> COL_CHILDREN + TASK_GROUP_ID -> COL_GROUP_ID + TASK_USER_ID -> COL_USER_ID + else -> -1 + } } - override fun get(index: Int): Any? { - val record = checkNotNull(record) { "Reader in invalid state" } - return when (index) { - COL_ID -> record.id - COL_WORKFLOW_ID -> record.workflowId - COL_SUBMIT_TIME -> record.submitTime - COL_WAIT_TIME -> record.waitTime - COL_RUNTIME -> record.runtime - COL_REQ_NCPUS, COL_GROUP_ID, COL_USER_ID -> getInt(index) - COL_PARENTS -> record.parents - COL_CHILDREN -> record.children - else -> throw IllegalArgumentException("Invalid column") - } + override fun isNull(index: Int): Boolean { + require(index in COL_ID..COL_USER_ID) { "Invalid column index" } + return false } override fun getBoolean(index: Int): Boolean { @@ -89,35 +106,62 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Task>) throw IllegalArgumentException("Invalid column") } + override fun getFloat(index: Int): Float { + throw IllegalArgumentException("Invalid column") + } + override fun getDouble(index: Int): Double { throw IllegalArgumentException("Invalid column") } - override fun close() { - reader.close() + override fun getString(index: Int): String { + val record = checkNotNull(record) { "Reader in invalid state" } + return when (index) { + COL_ID -> record.id + COL_WORKFLOW_ID -> record.workflowId + else -> throw IllegalArgumentException("Invalid column") + } } - private val COL_ID = 0 - private val COL_WORKFLOW_ID = 1 - private val COL_SUBMIT_TIME = 2 - private val COL_WAIT_TIME = 3 - private val COL_RUNTIME = 4 - private val COL_REQ_NCPUS = 5 - private val COL_PARENTS = 6 - private val COL_CHILDREN = 7 - private val COL_GROUP_ID = 8 - private val COL_USER_ID = 9 + override fun getUUID(index: Int): UUID? { + throw IllegalArgumentException("Invalid column") + } - private val columns = mapOf( - TASK_ID to COL_ID, - TASK_WORKFLOW_ID to COL_WORKFLOW_ID, - TASK_SUBMIT_TIME to COL_SUBMIT_TIME, - TASK_WAIT_TIME to COL_WAIT_TIME, - TASK_RUNTIME to COL_RUNTIME, - TASK_REQ_NCPUS to COL_REQ_NCPUS, - TASK_PARENTS to COL_PARENTS, - TASK_CHILDREN to COL_CHILDREN, - TASK_GROUP_ID to COL_GROUP_ID, - TASK_USER_ID to COL_USER_ID, - ) + override fun getInstant(index: Int): Instant { + val record = checkNotNull(record) { "Reader in invalid state" } + return when (index) { + COL_SUBMIT_TIME -> record.submitTime + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getDuration(index: Int): Duration { + val record = checkNotNull(record) { "Reader in invalid state" } + return when (index) { + COL_WAIT_TIME -> record.waitTime + COL_RUNTIME -> record.runtime + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun <T> getList(index: Int, elementType: Class<T>): List<T>? { + throw IllegalArgumentException("Invalid column") + } + + override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? { + val record = checkNotNull(record) { "Reader in invalid state" } + return when (index) { + COL_PARENTS -> TYPE_PARENTS.convertTo(record.parents, elementType) + COL_CHILDREN -> TYPE_CHILDREN.convertTo(record.children, elementType) + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? { + throw IllegalArgumentException("Invalid column") + } + + override fun close() { + reader.close() + } } diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt index e71253ac..c8408626 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt @@ -46,24 +46,23 @@ public class WtfTraceFormat : TraceFormat { return when (table) { TABLE_TASKS -> TableDetails( listOf( - TASK_ID, - TASK_WORKFLOW_ID, - TASK_SUBMIT_TIME, - TASK_WAIT_TIME, - TASK_RUNTIME, - TASK_REQ_NCPUS, - TASK_PARENTS, - TASK_CHILDREN, - TASK_GROUP_ID, - TASK_USER_ID - ), - listOf(TASK_SUBMIT_TIME) + TableColumn(TASK_ID, TableColumnType.String), + TableColumn(TASK_WORKFLOW_ID, TableColumnType.String), + TableColumn(TASK_SUBMIT_TIME, TableColumnType.Instant), + TableColumn(TASK_WAIT_TIME, TableColumnType.Duration), + TableColumn(TASK_RUNTIME, TableColumnType.Duration), + TableColumn(TASK_REQ_NCPUS, TableColumnType.Int), + TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)), + TableColumn(TASK_CHILDREN, TableColumnType.Set(TableColumnType.String)), + TableColumn(TASK_GROUP_ID, TableColumnType.Int), + TableColumn(TASK_USER_ID, TableColumnType.Int) + ) ) else -> throw IllegalArgumentException("Table $table not supported") } } - override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader { + override fun newReader(path: Path, table: String, projection: List<String>?): TableReader { return when (table) { TABLE_TASKS -> { val reader = LocalParquetReader(path.resolve("tasks/schema-1.0"), TaskReadSupport(projection)) diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt index 8e7325de..a6087d9f 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt @@ -27,7 +27,6 @@ import org.apache.parquet.hadoop.api.InitContext import org.apache.parquet.hadoop.api.ReadSupport import org.apache.parquet.io.api.RecordMaterializer import org.apache.parquet.schema.* -import org.opendc.trace.TableColumn import org.opendc.trace.conv.* /** @@ -35,11 +34,11 @@ import org.opendc.trace.conv.* * * @param projection The projection of the table to read. */ -internal class TaskReadSupport(private val projection: List<TableColumn<*>>?) : ReadSupport<Task>() { +internal class TaskReadSupport(private val projection: List<String>?) : ReadSupport<Task>() { /** * Mapping of table columns to their Parquet column names. */ - private val colMap = mapOf<TableColumn<*>, String>( + private val colMap = mapOf( TASK_ID to "id", TASK_WORKFLOW_ID to "workflow_id", TASK_SUBMIT_TIME to "ts_submit", diff --git a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt index c0eb3f08..f6b821c2 100644 --- a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt @@ -22,10 +22,13 @@ package org.opendc.trace.wtf +import org.junit.jupiter.api.* import org.junit.jupiter.api.Assertions.* -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertThrows +import org.junit.jupiter.api.Assertions.assertAll +import org.opendc.trace.TableColumn +import org.opendc.trace.TableReader import org.opendc.trace.conv.* +import org.opendc.trace.testkit.TableReaderTestKit import java.nio.file.Paths import java.time.Duration import java.time.Instant @@ -33,6 +36,7 @@ import java.time.Instant /** * Test suite for the [WtfTraceFormat] class. */ +@DisplayName("WTF TraceFormat") class WtfTraceFormatTest { private val format = WtfTraceFormat() @@ -65,36 +69,47 @@ class WtfTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("362334516345962206", reader.get(TASK_ID)) }, - { assertEquals("1078341553348591493", reader.get(TASK_WORKFLOW_ID)) }, - { assertEquals(Instant.ofEpochMilli(245604), reader.get(TASK_SUBMIT_TIME)) }, - { assertEquals(Duration.ofMillis(8163), reader.get(TASK_RUNTIME)) }, + { assertEquals("362334516345962206", reader.getString(TASK_ID)) }, + { assertEquals("1078341553348591493", reader.getString(TASK_WORKFLOW_ID)) }, + { assertEquals(Instant.ofEpochMilli(245604), reader.getInstant(TASK_SUBMIT_TIME)) }, + { assertEquals(Duration.ofMillis(8163), reader.getDuration(TASK_RUNTIME)) }, { assertEquals( setOf("584055316413447529", "133113685133695608", "1008582348422865408"), - reader.get( - TASK_PARENTS - ) + reader.getSet(TASK_PARENTS, String::class.java) ) }, ) assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("502010169100446658", reader.get(TASK_ID)) }, - { assertEquals("1078341553348591493", reader.get(TASK_WORKFLOW_ID)) }, - { assertEquals(Instant.ofEpochMilli(251325), reader.get(TASK_SUBMIT_TIME)) }, - { assertEquals(Duration.ofMillis(8216), reader.get(TASK_RUNTIME)) }, + { assertEquals("502010169100446658", reader.getString(TASK_ID)) }, + { assertEquals("1078341553348591493", reader.getString(TASK_WORKFLOW_ID)) }, + { assertEquals(Instant.ofEpochMilli(251325), reader.getInstant(TASK_SUBMIT_TIME)) }, + { assertEquals(Duration.ofMillis(8216), reader.getDuration(TASK_RUNTIME)) }, { assertEquals( setOf("584055316413447529", "133113685133695608", "1008582348422865408"), - reader.get( - TASK_PARENTS - ) + reader.getSet(TASK_PARENTS, String::class.java) ) }, ) reader.close() } + + @DisplayName("TableReader for Tasks") + @Nested + inner class TasksTableReaderTest : TableReaderTestKit() { + override lateinit var reader: TableReader + override lateinit var columns: List<TableColumn> + + @BeforeEach + fun setUp() { + val path = Paths.get("src/test/resources/wtf-trace") + + columns = format.getDetails(path, TABLE_TASKS).columns + reader = format.newReader(path, TABLE_TASKS, null) + } + } } diff --git a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/TraceHelpers.kt b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/TraceHelpers.kt index 3aa4463c..5f57723b 100644 --- a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/TraceHelpers.kt +++ b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/TraceHelpers.kt @@ -49,16 +49,16 @@ public fun Trace.toJobs(): List<Job> { try { while (reader.nextRow()) { // Bag of tasks without workflow ID all share the same workflow - val workflowId = if (reader.hasColumn(TASK_WORKFLOW_ID)) reader.get(TASK_WORKFLOW_ID).toLong() else 0L + val workflowId = if (reader.resolve(TASK_WORKFLOW_ID) != -1) reader.getString(TASK_WORKFLOW_ID)!!.toLong() else 0L val workflow = jobs.computeIfAbsent(workflowId) { id -> Job(UUID(0L, id), "<unnamed>", HashSet(), HashMap()) } - val id = reader.get(TASK_ID).toLong() - val grantedCpus = if (reader.hasColumn(TASK_ALLOC_NCPUS)) + val id = reader.getString(TASK_ID)!!.toLong() + val grantedCpus = if (reader.resolve(TASK_ALLOC_NCPUS) != 0) reader.getInt(TASK_ALLOC_NCPUS) else reader.getInt(TASK_REQ_NCPUS) - val submitTime = reader.get(TASK_SUBMIT_TIME) - val runtime = reader.get(TASK_RUNTIME) + val submitTime = reader.getInstant(TASK_SUBMIT_TIME)!! + val runtime = reader.getDuration(TASK_RUNTIME)!! val flops: Long = 4000 * runtime.seconds * grantedCpus val workload = SimFlopsWorkload(flops) val task = Task( @@ -73,7 +73,7 @@ public fun Trace.toJobs(): List<Job> { ) tasks[id] = task - taskDependencies[task] = reader.get(TASK_PARENTS).map { it.toLong() }.toSet() + taskDependencies[task] = reader.getSet(TASK_PARENTS, String::class.java)!!.map { it.toLong() }.toSet() (workflow.metadata as MutableMap<String, Any>).merge("WORKFLOW_SUBMIT_TIME", submitTime.toEpochMilli()) { a, b -> min(a as Long, b as Long) } (workflow.tasks as MutableSet<Task>).add(task) diff --git a/settings.gradle.kts b/settings.gradle.kts index 88c3ffb6..170267a5 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -50,6 +50,7 @@ include(":opendc-simulator:opendc-simulator-power") include(":opendc-simulator:opendc-simulator-network") include(":opendc-simulator:opendc-simulator-compute") include(":opendc-trace:opendc-trace-api") +include(":opendc-trace:opendc-trace-testkit") include(":opendc-trace:opendc-trace-gwf") include(":opendc-trace:opendc-trace-swf") include(":opendc-trace:opendc-trace-wtf") |
