summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt16
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt13
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt50
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumnType.kt95
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt222
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableWriter.kt197
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt11
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt17
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt28
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TableColumns.kt36
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt40
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt7
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TableDetails.kt6
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt5
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/CompositeTableReader.kt53
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/TableColumnConversion.kt77
-rw-r--r--opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt73
-rw-r--r--opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt87
-rw-r--r--opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt21
-rw-r--r--opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt6
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt88
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt29
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt110
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt57
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt33
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt2
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt4
-rw-r--r--opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt45
-rw-r--r--opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt59
-rw-r--r--opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt96
-rw-r--r--opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt83
-rw-r--r--opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt19
-rw-r--r--opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt20
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt60
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt55
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt96
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt73
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt97
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt73
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt33
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt4
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt4
-rw-r--r--opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt46
-rw-r--r--opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt93
-rw-r--r--opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt25
-rw-r--r--opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt4
-rw-r--r--opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/ConvertCommand.kt48
-rw-r--r--opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt78
-rw-r--r--opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt17
-rw-r--r--opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt6
-rw-r--r--opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt16
-rw-r--r--opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt126
-rw-r--r--opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt25
-rw-r--r--opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt5
-rw-r--r--opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt24
-rw-r--r--opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/TraceHelpers.kt12
56 files changed, 1857 insertions, 868 deletions
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
index 720c7e58..12c2325a 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt
@@ -29,8 +29,6 @@ import org.opendc.trace.*
import org.opendc.trace.conv.*
import java.io.File
import java.lang.ref.SoftReference
-import java.time.Duration
-import java.time.Instant
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import kotlin.math.max
@@ -68,9 +66,9 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
return try {
while (reader.nextRow()) {
- val id = reader.get(idCol) as String
- val time = reader.get(timestampCol) as Instant
- val duration = reader.get(durationCol) as Duration
+ val id = reader.getString(idCol)!!
+ val time = reader.getInstant(timestampCol)!!
+ val duration = reader.getDuration(durationCol)!!
val cores = reader.getInt(coresCol)
val cpuUsage = reader.getDouble(usageCol)
@@ -105,13 +103,13 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
return try {
while (reader.nextRow()) {
- val id = reader.get(idCol) as String
+ val id = reader.getString(idCol)!!
if (!fragments.containsKey(id)) {
continue
}
- val submissionTime = reader.get(startTimeCol) as Instant
- val endTime = reader.get(stopTimeCol) as Instant
+ val submissionTime = reader.getInstant(startTimeCol)!!
+ val endTime = reader.getInstant(stopTimeCol)!!
val cpuCount = reader.getInt(cpuCountCol)
val cpuCapacity = reader.getDouble(cpuCapacityCol)
val memCapacity = reader.getDouble(memCol) / 1000.0 // Convert from KB to MB
@@ -162,7 +160,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
while (reader.nextRow()) {
@Suppress("UNCHECKED_CAST")
- val members = reader.get(membersCol) as Set<String>
+ val members = reader.getSet(membersCol, String::class.java)!!
val target = reader.getDouble(targetCol)
val score = reader.getDouble(scoreCol)
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt
index 05d0234a..e6e97706 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt
@@ -32,21 +32,16 @@ public interface Table {
public val name: String
/**
- * The list of columns supported in this table.
+ * The columns in this table.
*/
- public val columns: List<TableColumn<*>>
-
- /**
- * The columns by which the table is partitioned.
- */
- public val partitionKeys: List<TableColumn<*>>
+ public val columns: List<TableColumn>
/**
* Open a [TableReader] for a projection of this table.
*
- * @param projection The list of columns to fetch from the table or `null` if no projection is performed.
+ * @param projection The names of the columns to fetch from the table or `null` if no projection is performed.
*/
- public fun newReader(projection: List<TableColumn<*>>? = null): TableReader
+ public fun newReader(projection: List<String>? = null): TableReader
/**
* Open a [TableWriter] for this table.
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt
index b77a2982..0f75d890 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt
@@ -22,47 +22,15 @@
package org.opendc.trace
-import java.util.*
-
/**
- * A column in a trace table.
+ * A column in a [Table].
*
- * @param name The universal name of this column.
+ * @property name The universal name of this column.
+ * @property type The type of the column.
+ * @property isNullable A flag to indicate that the column is nullable.
*/
-public class TableColumn<out T>(public val name: String, type: Class<T>) {
- /**
- * The type of the column.
- */
- public val type: Class<*> = type
-
- /**
- * Determine whether the type of the column is a subtype of [column].
- */
- public fun isAssignableTo(column: TableColumn<*>): Boolean {
- return name == column.name && type.isAssignableFrom(column.type)
- }
-
- /**
- * Compute a hash code for this column.
- */
- public override fun hashCode(): Int = Objects.hash(name, type)
-
- /**
- * Determine whether this column is equal to [other].
- */
- public override fun equals(other: Any?): Boolean {
- // Fast-path: reference equality
- if (this === other) {
- return true
- } else if (other == null || other !is TableColumn<*>) {
- return false
- }
-
- return name == other.name && type == other.type
- }
-
- /**
- * Return a string representation of this column.
- */
- public override fun toString(): String = "TableColumn[$name,$type]"
-}
+public data class TableColumn(
+ public val name: String,
+ public val type: TableColumnType,
+ public val isNullable: Boolean = false
+)
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumnType.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumnType.kt
new file mode 100644
index 00000000..6cae47f9
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumnType.kt
@@ -0,0 +1,95 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace
+
+/**
+ * The type of a [TableColumn].
+ */
+public sealed class TableColumnType {
+ /**
+ * A column of booleans.
+ */
+ public object Boolean : TableColumnType()
+
+ /**
+ * A column of 32-bit integers.
+ */
+ public object Int : TableColumnType()
+
+ /**
+ * A column of 64-bit integers.
+ */
+ public object Long : TableColumnType()
+
+ /**
+ * A column of 32-bit floats.
+ */
+ public object Float : TableColumnType()
+
+ /**
+ * A column of 64-bit floats.
+ */
+ public object Double : TableColumnType()
+
+ /**
+ * A column of UUIDs.
+ */
+ public object UUID : TableColumnType()
+
+ /**
+ * A column of variable-length strings.
+ */
+ public object String : TableColumnType()
+
+ /**
+ * A column of timestamps, mapping to [java.time.Instant].
+ */
+ public object Instant : TableColumnType()
+
+ /**
+ * A column of durations, mapping to [java.time.Duration]
+ */
+ public object Duration : TableColumnType()
+
+ /**
+ * A column containing embedded lists.
+ *
+ * @property elementType The type of the elements in the list.
+ */
+ public data class List(public val elementType: TableColumnType) : TableColumnType()
+
+ /**
+ * A column containing embedded sets.
+ *
+ * @property elementType The type of the elements in the sets.
+ */
+ public data class Set(public val elementType: TableColumnType) : TableColumnType()
+
+ /**
+ * A column containing embedded maps.
+ *
+ * @property keyType The type of the key.
+ * @property valueType The type of the value.
+ */
+ public data class Map(public val keyType: TableColumnType, public val valueType: TableColumnType) : TableColumnType()
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt
index 8a796e6c..42b1c690 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt
@@ -22,8 +22,12 @@
package org.opendc.trace
+import java.time.Duration
+import java.time.Instant
+import java.util.UUID
+
/**
- * Base class for reading entities from a workload trace table in streaming fashion.
+ * Base class for reading entities from a [Table] in streaming fashion.
*/
public interface TableReader : AutoCloseable {
/**
@@ -34,20 +38,15 @@ public interface TableReader : AutoCloseable {
public fun nextRow(): Boolean
/**
- * Resolve the index of the specified [column] for this reader.
+ * Resolve the index of the column by its [name].
*
- * @param column The column to lookup.
+ * @param name The name of the column to lookup.
* @return The zero-based index of the column or a negative value if the column is not present in this table.
*/
- public fun resolve(column: TableColumn<*>): Int
-
- /**
- * Determine whether the [TableReader] supports the specified [column].
- */
- public fun hasColumn(column: TableColumn<*>): Boolean = resolve(column) >= 0
+ public fun resolve(name: String): Int
/**
- * Determine whether the specified [column] has a `null` value for the current row.
+ * Determine whether the column with the specified [index] has a `null` value for the current row.
*
* @param index The zero-based index of the column to check for a null value.
* @throws IllegalArgumentException if the column index is not valid for this reader.
@@ -56,15 +55,6 @@ public interface TableReader : AutoCloseable {
public fun isNull(index: Int): Boolean
/**
- * Obtain the object value of the column with the specified [index].
- *
- * @param index The zero-based index of the column to obtain the value for.
- * @throws IllegalArgumentException if the column index is not valid for this reader.
- * @return The object value of the column.
- */
- public fun get(index: Int): Any?
-
- /**
* Obtain the boolean value of the column with the specified [index].
*
* @param index The zero-based index of the column to obtain the value for.
@@ -74,7 +64,7 @@ public interface TableReader : AutoCloseable {
public fun getBoolean(index: Int): Boolean
/**
- * Obtain the integer value of the column with the specified [index].
+ * Obtain the boolean value of the column with the specified [index].
*
* @param index The zero-based index of the column to obtain the value for.
* @throws IllegalArgumentException if the column index is not valid for this reader.
@@ -92,6 +82,15 @@ public interface TableReader : AutoCloseable {
public fun getLong(index: Int): Long
/**
+ * Obtain the float value of the column with the specified [index].
+ *
+ * @param index The zero-based index of the column to obtain the value for.
+ * @throws IllegalArgumentException if the column index is not valid for this reader.
+ * @return The float value of the column or [Float.NaN] if the column is `null`.
+ */
+ public fun getFloat(index: Int): Float
+
+ /**
* Obtain the double value of the column with the specified [index].
*
* @param index The zero-based index of the column to obtain the value for.
@@ -101,62 +100,193 @@ public interface TableReader : AutoCloseable {
public fun getDouble(index: Int): Double
/**
- * Determine whether the specified [column] has a `null` value for the current row.
+ * Obtain the [String] value of the column with the specified [index].
*
- * @param column The column to lookup.
- * @throws IllegalArgumentException if the column is not valid for this table.
- * @return `true` if the column value for the current value has a `null` value, `false` otherwise.
+ * @param index The zero-based index of the column to obtain the value for.
+ * @throws IllegalArgumentException if the column index is not valid for this reader.
+ * @return The [String] value of the column or `null` if the column is `null`.
*/
- public fun isNull(column: TableColumn<*>): Boolean = isNull(resolve(column))
+ public fun getString(index: Int): String?
/**
- * Obtain the value of the current column with type [T].
+ * Obtain the [UUID] value of the column with the specified [index].
*
- * @param column The column to obtain the value for.
- * @throws IllegalArgumentException if the column is not valid for this reader.
- * @return The object value of the column.
+ * @param index The zero-based index of the column to obtain the value for.
+ * @throws IllegalArgumentException if the column index is not valid for this reader.
+ * @return The [UUID] value of the column or `null` if the column is `null`.
+ */
+ public fun getUUID(index: Int): UUID?
+
+ /**
+ * Obtain the [Instant] value of the column with the specified [index].
+ *
+ * @param index The zero-based index of the column to obtain the value for.
+ * @throws IllegalArgumentException if the column index is not valid for this reader.
+ * @return The [Instant] value of the column or `null` if the column is `null`.
+ */
+ public fun getInstant(index: Int): Instant?
+
+ /**
+ * Obtain the [Duration] value of the column with the specified [index].
+ *
+ * @param index The zero-based index of the column to obtain the value for.
+ * @throws IllegalArgumentException if the column index is not valid for this reader.
+ * @return The [Duration] value of the column or `null` if the column is `null`.
+ */
+ public fun getDuration(index: Int): Duration?
+
+ /**
+ * Obtain the value of the column with the specified [index] as [List].
+ *
+ * @param index The zero-based index of the column to obtain the value for.
+ * @param elementType Class representing the Java data type to convert elements of the designated column to.
+ * @throws IllegalArgumentException if the column index is not valid for this reader or this type.
+ * @return The value of the column as `List` or `null` if the column is null.
+ */
+ public fun <T> getList(index: Int, elementType: Class<T>): List<T>?
+
+ /**
+ * Obtain the value of the column with the specified [index] as [Set].
+ *
+ * @param index The zero-based index of the column to obtain the value for.
+ * @param elementType Class representing the Java data type to convert elements of the designated column to.
+ * @throws IllegalArgumentException if the column index is not valid for this reader or this type.
+ * @return The value of the column as `Set` or `null` if the column is null.
+ */
+ public fun <T> getSet(index: Int, elementType: Class<T>): Set<T>?
+
+ /**
+ * Obtain the value of the column with the specified [index] as [Set].
+ *
+ * @param index The zero-based index of the column to obtain the value for.
+ * @param keyType Class representing the Java data type to convert the keys of the designated column to.
+ * @param valueType Class representing the Java data type to convert the values of the designated column to.
+ * @throws IllegalArgumentException if the column index is not valid for this reader or this type.
+ * @return The value of the column as `Map` or `null` if the column is null.
+ */
+ public fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>?
+
+ /**
+ * Determine whether a column named [name] has a `null` value for the current row.
+ *
+ * @param name The name of the column to lookup.
+ * @throws IllegalArgumentException if the column is not valid for this table.
+ * @return `true` if the column value for the current value has a `null` value, `false` otherwise.
*/
- public fun <T> get(column: TableColumn<T>): T {
- // This cast should always succeed since the resolve the index of the typed column
- @Suppress("UNCHECKED_CAST")
- return get(resolve(column)) as T
- }
+ public fun isNull(name: String): Boolean = isNull(resolve(name))
/**
- * Read the specified [column] as boolean.
+ * Read the column named [name] as boolean.
*
- * @param column The column to obtain the value for.
+ * @param name The name of the column to get the value for.
* @throws IllegalArgumentException if the column is not valid for this reader.
* @return The boolean value of the column or `false` if the column is `null`.
*/
- public fun getBoolean(column: TableColumn<Boolean>): Boolean = getBoolean(resolve(column))
+ public fun getBoolean(name: String): Boolean = getBoolean(resolve(name))
/**
- * Read the specified [column] as integer.
+ * Read the column named [name] as integer.
*
- * @param column The column to obtain the value for.
+ * @param name The name of the column to get the value for.
* @throws IllegalArgumentException if the column is not valid for this reader.
* @return The integer value of the column or `0` if the column is `null`.
*/
- public fun getInt(column: TableColumn<Int>): Int = getInt(resolve(column))
+ public fun getInt(name: String): Int = getInt(resolve(name))
/**
- * Read the specified [column] as long.
+ * Read the column named [name] as long.
*
- * @param column The column to obtain the value for.
+ * @param name The name of the column to get the value for
* @throws IllegalArgumentException if the column is not valid for this reader.
* @return The long value of the column or `0` if the column is `null`.
*/
- public fun getLong(column: TableColumn<Long>): Long = getLong(resolve(column))
+ public fun getLong(name: String): Long = getLong(resolve(name))
+
+ /**
+ * Read the column named [name] as float.
+ *
+ * @param name The name of the column to get the value for.
+ * @throws IllegalArgumentException if the column is not valid for this reader.
+ * @return The float value of the column or [Float.NaN] if the column is `null`.
+ */
+ public fun getFloat(name: String): Float = getFloat(resolve(name))
/**
- * Read the specified [column] as double.
+ * Read the column named [name] as double.
*
- * @param column The column to obtain the value for.
+ * @param name The name of the column to get the value for.
* @throws IllegalArgumentException if the column is not valid for this reader.
* @return The double value of the column or [Double.NaN] if the column is `null`.
*/
- public fun getDouble(column: TableColumn<Double>): Double = getDouble(resolve(column))
+ public fun getDouble(name: String): Double = getDouble(resolve(name))
+
+ /**
+ * Read the column named [name] as [String].
+ *
+ * @param name The name of the column to get the value for.
+ * @throws IllegalArgumentException if the column is not valid for this reader.
+ * @return The [String] value of the column or `null` if the column is `null`.
+ */
+ public fun getString(name: String): String? = getString(resolve(name))
+
+ /**
+ * Read the column named [name] as [UUID].
+ *
+ * @param name The name of the column to get the value for.
+ * @throws IllegalArgumentException if the column is not valid for this reader.
+ * @return The [UUID] value of the column or `null` if the column is `null`.
+ */
+ public fun getUUID(name: String): UUID? = getUUID(resolve(name))
+
+ /**
+ * Read the column named [name] as [Instant].
+ *
+ * @param name The name of the column to get the value for.
+ * @throws IllegalArgumentException if the column is not valid for this reader.
+ * @return The [Instant] value of the column or `null` if the column is `null`.
+ */
+ public fun getInstant(name: String): Instant? = getInstant(resolve(name))
+
+ /**
+ * Read the column named [name] as [Duration].
+ *
+ * @param name The name of the column to get the value for.
+ * @throws IllegalArgumentException if the column is not valid for this reader.
+ * @return The [Duration] value of the column or `null` if the column is `null`.
+ */
+ public fun getDuration(name: String): Duration? = getDuration(resolve(name))
+
+ /**
+ * Obtain the value of the column named [name] as [List].
+ *
+ * @param name The name of the column to get the value for.
+ * @param elementType Class representing the Java data type to convert elements of the designated column to.
+ * @throws IllegalArgumentException if the column index is not valid for this reader or this type.
+ * @return The value of the column as `List` or `null` if the column is null.
+ */
+ public fun <T> getList(name: String, elementType: Class<T>): List<T>? = getList(resolve(name), elementType)
+
+ /**
+ * Obtain the value of the column named [name] as [Set].
+ *
+ * @param name The name of the column to get the value for.
+ * @param elementType Class representing the Java data type to convert elements of the designated column to.
+ * @throws IllegalArgumentException if the column index is not valid for this reader or this type.
+ * @return The value of the column as `Set` or `null` if the column is null.
+ */
+ public fun <T> getSet(name: String, elementType: Class<T>): Set<T>? = getSet(resolve(name), elementType)
+
+ /**
+ * Obtain the value of the column named [name] as [Set].
+ *
+ * @param name The name of the column to get the value for.
+ * @param keyType Class representing the Java data type to convert the keys of the designated column to.
+ * @param valueType Class representing the Java data type to convert the values of the designated column to.
+ * @throws IllegalArgumentException if the column index is not valid for this reader or this type.
+ * @return The value of the column as `Map` or `null` if the column is null.
+ */
+ public fun <K, V> getMap(name: String, keyType: Class<K>, valueType: Class<V>): Map<K, V>? =
+ getMap(resolve(name), keyType, valueType)
/**
* Closes the reader so that no further iteration or data access can be made.
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableWriter.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableWriter.kt
index 423ce86a..a3122ec9 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableWriter.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableWriter.kt
@@ -22,6 +22,10 @@
package org.opendc.trace
+import java.time.Duration
+import java.time.Instant
+import java.util.*
+
/**
* Base class for writing workload traces.
*/
@@ -37,107 +41,228 @@ public interface TableWriter : AutoCloseable {
public fun endRow()
/**
- * Resolve the index of the specified [column] for this writer.
+ * Resolve the index of the column named [name] for this writer.
*
- * @param column The column to lookup.
+ * @param name The name of the column to lookup.
* @return The zero-based index of the column or a negative value if the column is not present in this table.
*/
- public fun resolve(column: TableColumn<*>): Int
+ public fun resolve(name: String): Int
/**
- * Determine whether the [TableReader] supports the specified [column].
+ * Set the column with index [index] to boolean [value].
+ *
+ * @param index The zero-based index of the column to set the value for.
+ * @param value The boolean value to set the column to.
+ * @throws IllegalArgumentException if the column is not valid for this method.
*/
- public fun hasColumn(column: TableColumn<*>): Boolean = resolve(column) >= 0
+ public fun setBoolean(index: Int, value: Boolean)
/**
- * Set [column] to [value].
+ * Set the column with index [index] to integer [value].
*
* @param index The zero-based index of the column to set the value for.
- * @param value The value to set the column to.
+ * @param value The integer value to set the column to.
* @throws IllegalArgumentException if the column is not valid for this method.
*/
- public fun set(index: Int, value: Any)
+ public fun setInt(index: Int, value: Int)
/**
- * Set [column] to boolean [value].
+ * Set the column with index [index] to long [value].
*
* @param index The zero-based index of the column to set the value for.
- * @param value The boolean value to set the column to.
+ * @param value The long value to set the column to.
* @throws IllegalArgumentException if the column is not valid for this method.
*/
- public fun setBoolean(index: Int, value: Boolean)
+ public fun setLong(index: Int, value: Long)
/**
- * Set [column] to integer [value].
+ * Set the column with index [index] to float [value].
*
* @param index The zero-based index of the column to set the value for.
- * @param value The integer value to set the column to.
+ * @param value The float value to set the column to.
* @throws IllegalArgumentException if the column is not valid for this method.
*/
- public fun setInt(index: Int, value: Int)
+ public fun setFloat(index: Int, value: Float)
/**
- * Set [column] to long [value].
+ * Set the column with index [index] to double [value].
*
* @param index The zero-based index of the column to set the value for.
- * @param value The long value to set the column to.
+ * @param value The double value to set the column to.
* @throws IllegalArgumentException if the column is not valid for this method.
*/
- public fun setLong(index: Int, value: Long)
+ public fun setDouble(index: Int, value: Double)
/**
- * Set [column] to double [value].
+ * Set the column with index [index] to [String] [value].
*
* @param index The zero-based index of the column to set the value for.
- * @param value The double value to set the column to.
+ * @param value The [String] value to set the column to.
* @throws IllegalArgumentException if the column is not valid for this method.
*/
- public fun setDouble(index: Int, value: Double)
+ public fun setString(index: Int, value: String)
+
+ /**
+ * Set the column with index [index] to [UUID] [value].
+ *
+ * @param index The zero-based index of the column to set the value for.
+ * @param value The [UUID] value to set the column to.
+ * @throws IllegalArgumentException if the column is not valid for this method.
+ */
+ public fun setUUID(index: Int, value: UUID)
+
+ /**
+ * Set the column with index [index] to [Instant] [value].
+ *
+ * @param index The zero-based index of the column to set the value for.
+ * @param value The [Instant] value to set the column to.
+ * @throws IllegalArgumentException if the column is not valid for this method.
+ */
+ public fun setInstant(index: Int, value: Instant)
/**
- * Set [column] to [value].
+ * Set the column with index [index] to [Duration] [value].
*
- * @param column The column to set the value for.
- * @param value The value to set the column to.
+ * @param index The zero-based index of the column to set the value for.
+ * @param value The [Duration] value to set the column to.
* @throws IllegalArgumentException if the column is not valid for this method.
*/
- public fun <T : Any> set(column: TableColumn<T>, value: T): Unit = set(resolve(column), value)
+ public fun setDuration(index: Int, value: Duration)
/**
- * Set [column] to boolean [value].
+ * Set the column with index [index] to [List] [value].
*
- * @param column The column to set the value for.
+ * @param index The zero-based index of the column to set the value for.
+ * @param value The [Map] value to set the column to.
+ * @throws IllegalArgumentException if the column is not valid for this method.
+ */
+ public fun <T> setList(index: Int, value: List<T>)
+
+ /**
+ * Set the column with index [index] to [Set] [value].
+ *
+ * @param index The zero-based index of the column to set the value for.
+ * @param value The [Set] value to set the column to.
+ * @throws IllegalArgumentException if the column is not valid for this method.
+ */
+ public fun <T> setSet(index: Int, value: Set<T>)
+
+ /**
+ * Set the column with index [index] to [Map] [value].
+ *
+ * @param index The zero-based index of the column to set the value for.
+ * @param value The [Map] value to set the column to.
+ * @throws IllegalArgumentException if the column is not valid for this method.
+ */
+ public fun <K, V> setMap(index: Int, value: Map<K, V>)
+
+ /**
+ * Set the column named [name] to boolean [value].
+ *
+ * @param name The name of the column to set the value for.
* @param value The boolean value to set the column to.
* @throws IllegalArgumentException if the column is not valid for this method.
*/
- public fun setBoolean(column: TableColumn<Boolean>, value: Boolean): Unit = setBoolean(resolve(column), value)
+ public fun setBoolean(name: String, value: Boolean): Unit = setBoolean(resolve(name), value)
/**
- * Set [column] to integer [value].
+ * Set the column named [name] to integer [value].
*
- * @param column The column to set the value for.
+ * @param name The name of the column to set the value for.
* @param value The integer value to set the column to.
* @throws IllegalArgumentException if the column is not valid for this method.
*/
- public fun setInt(column: TableColumn<Int>, value: Int): Unit = setInt(resolve(column), value)
+ public fun setInt(name: String, value: Int): Unit = setInt(resolve(name), value)
/**
- * Set [column] to long [value].
+ * Set the column named [name] to long [value].
*
- * @param column The column to set the value for.
+ * @param name The name of the column to set the value for.
* @param value The long value to set the column to.
* @throws IllegalArgumentException if the column is not valid for this method.
*/
- public fun setLong(column: TableColumn<Long>, value: Long): Unit = setLong(resolve(column), value)
+ public fun setLong(name: String, value: Long): Unit = setLong(resolve(name), value)
+
+ /**
+ * Set the column named [name] to float [value].
+ *
+ * @param name The name of the column to set the value for.
+ * @param value The float value to set the column to.
+ * @throws IllegalArgumentException if the column is not valid for this method.
+ */
+ public fun setFloat(name: String, value: Float): Unit = setFloat(resolve(name), value)
/**
- * Set [column] to double [value].
+ * Set the column named [name] to double [value].
*
- * @param column The column to set the value for.
+ * @param name The name of the column to set the value for.
* @param value The double value to set the column to.
* @throws IllegalArgumentException if the column is not valid for this method.
*/
- public fun setDouble(column: TableColumn<Double>, value: Double): Unit = setDouble(resolve(column), value)
+ public fun setDouble(name: String, value: Double): Unit = setDouble(resolve(name), value)
+
+ /**
+ * Set the column named [name] to [String] [value].
+ *
+ * @param name The name of the column to set the value for.
+ * @param value The [String] value to set the column to.
+ * @throws IllegalArgumentException if the column is not valid for this method.
+ */
+ public fun setString(name: String, value: String): Unit = setString(resolve(name), value)
+
+ /**
+ * Set the column named [name] to [UUID] [value].
+ *
+ * @param name The name of the column to set the value for.
+ * @param value The [UUID] value to set the column to.
+ * @throws IllegalArgumentException if the column is not valid for this method.
+ */
+ public fun setUUID(name: String, value: UUID): Unit = setUUID(resolve(name), value)
+
+ /**
+ * Set the column named [name] to [Instant] [value].
+ *
+ * @param name The name of the column to set the value for.
+ * @param value The [Instant] value to set the column to.
+ * @throws IllegalArgumentException if the column is not valid for this method.
+ */
+ public fun setInstant(name: String, value: Instant): Unit = setInstant(resolve(name), value)
+
+ /**
+ * Set the column named [name] to [Duration] [value].
+ *
+ * @param name The name of the column to set the value for.
+ * @param value The [Duration] value to set the column to.
+ * @throws IllegalArgumentException if the column is not valid for this method.
+ */
+ public fun setDuration(name: String, value: Duration): Unit = setDuration(resolve(name), value)
+
+ /**
+ * Set the column named [name] to [List] [value].
+ *
+ * @param name The name of the column to set the value for.
+ * @param value The [List] value to set the column to.
+ * @throws IllegalArgumentException if the column is not valid for this method.
+ */
+ public fun <T> setList(name: String, value: List<T>): Unit = setList(resolve(name), value)
+
+ /**
+ * Set the column named [name] to [Set] [value].
+ *
+ * @param name The name of the column to set the value for.
+ * @param value The [Set] value to set the column to.
+ * @throws IllegalArgumentException if the column is not valid for this method.
+ */
+ public fun <T> setSet(name: String, value: Set<T>): Unit = setSet(resolve(name), value)
+
+ /**
+ * Set the column named [name] to [Map] [value].
+ *
+ * @param name The name of the column to set the value for.
+ * @param value The [Map] value to set the column to.
+ * @throws IllegalArgumentException if the column is not valid for this method.
+ */
+ public fun <K, V> setMap(name: String, value: Map<K, V>): Unit = setMap(resolve(name), value)
/**
* Flush any buffered content to the underlying target.
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt
index 5e8859e4..2a80687c 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt
@@ -23,22 +23,17 @@
@file:JvmName("InterferenceGroupColumns")
package org.opendc.trace.conv
-import org.opendc.trace.TableColumn
-
/**
* Members of the interference group.
*/
-@JvmField
-public val INTERFERENCE_GROUP_MEMBERS: TableColumn<Set<String>> = column("members")
+public const val INTERFERENCE_GROUP_MEMBERS: String = "members"
/**
* Target load after which the interference occurs.
*/
-@JvmField
-public val INTERFERENCE_GROUP_TARGET: TableColumn<Double> = column("target")
+public const val INTERFERENCE_GROUP_TARGET: String = "target"
/**
* Performance score when the interference occurs.
*/
-@JvmField
-public val INTERFERENCE_GROUP_SCORE: TableColumn<Double> = column("score")
+public const val INTERFERENCE_GROUP_SCORE: String = "score"
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt
index e602e534..9b7b8195 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt
@@ -23,47 +23,44 @@
@file:JvmName("ResourceColumns")
package org.opendc.trace.conv
-import org.opendc.trace.TableColumn
-import java.time.Instant
-
/**
* Identifier of the resource.
*/
@JvmField
-public val RESOURCE_ID: TableColumn<String> = column("id")
+public val RESOURCE_ID: String = "id"
/**
* The cluster to which the resource belongs.
*/
@JvmField
-public val RESOURCE_CLUSTER_ID: TableColumn<String> = column("cluster_id")
+public val RESOURCE_CLUSTER_ID: String = "cluster_id"
/**
* Start time for the resource.
*/
@JvmField
-public val RESOURCE_START_TIME: TableColumn<Instant> = column("start_time")
+public val RESOURCE_START_TIME: String = "start_time"
/**
* End time for the resource.
*/
@JvmField
-public val RESOURCE_STOP_TIME: TableColumn<Instant> = column("stop_time")
+public val RESOURCE_STOP_TIME: String = "stop_time"
/**
* Number of CPUs for the resource.
*/
@JvmField
-public val RESOURCE_CPU_COUNT: TableColumn<Int> = column("cpu_count")
+public val RESOURCE_CPU_COUNT: String = "cpu_count"
/**
* Total CPU capacity of the resource in MHz.
*/
@JvmField
-public val RESOURCE_CPU_CAPACITY: TableColumn<Double> = column("cpu_capacity")
+public val RESOURCE_CPU_CAPACITY: String = "cpu_capacity"
/**
* Memory capacity for the resource in KB.
*/
@JvmField
-public val RESOURCE_MEM_CAPACITY: TableColumn<Double> = column("mem_capacity")
+public val RESOURCE_MEM_CAPACITY: String = "mem_capacity"
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt
index 3a44f817..e9dd7673 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt
@@ -23,78 +23,74 @@
@file:JvmName("ResourceStateColumns")
package org.opendc.trace.conv
-import org.opendc.trace.TableColumn
-import java.time.Duration
-import java.time.Instant
-
/**
* The timestamp at which the state was recorded.
*/
@JvmField
-public val RESOURCE_STATE_TIMESTAMP: TableColumn<Instant> = column("timestamp")
+public val RESOURCE_STATE_TIMESTAMP: String = "timestamp"
/**
* Duration for the state.
*/
@JvmField
-public val RESOURCE_STATE_DURATION: TableColumn<Duration> = column("duration")
+public val RESOURCE_STATE_DURATION: String = "duration"
/**
* A flag to indicate that the resource is powered on.
*/
@JvmField
-public val RESOURCE_STATE_POWERED_ON: TableColumn<Boolean> = column("powered_on")
+public val RESOURCE_STATE_POWERED_ON: String = "powered_on"
/**
* Total CPU usage of the resource in MHz.
*/
@JvmField
-public val RESOURCE_STATE_CPU_USAGE: TableColumn<Double> = column("cpu_usage")
+public val RESOURCE_STATE_CPU_USAGE: String = "cpu_usage"
/**
* Total CPU usage of the resource in percentage.
*/
@JvmField
-public val RESOURCE_STATE_CPU_USAGE_PCT: TableColumn<Double> = column("cpu_usage_pct")
+public val RESOURCE_STATE_CPU_USAGE_PCT: String = "cpu_usage_pct"
/**
* Total CPU demand of the resource in MHz.
*/
@JvmField
-public val RESOURCE_STATE_CPU_DEMAND: TableColumn<Double> = column("cpu_demand")
+public val RESOURCE_STATE_CPU_DEMAND: String = "cpu_demand"
/**
* CPU ready percentage.
*/
@JvmField
-public val RESOURCE_STATE_CPU_READY_PCT: TableColumn<Double> = column("cpu_ready_pct")
+public val RESOURCE_STATE_CPU_READY_PCT: String = "cpu_ready_pct"
/**
* Memory usage of the resource in KB.
*/
@JvmField
-public val RESOURCE_STATE_MEM_USAGE: TableColumn<Double> = column("mem_usage")
+public val RESOURCE_STATE_MEM_USAGE: String = "mem_usage"
/**
* Disk read throughput of the resource in KB/s.
*/
@JvmField
-public val RESOURCE_STATE_DISK_READ: TableColumn<Double> = column("disk_read")
+public val RESOURCE_STATE_DISK_READ: String = "disk_read"
/**
* Disk write throughput of the resource in KB/s.
*/
@JvmField
-public val RESOURCE_STATE_DISK_WRITE: TableColumn<Double> = column("disk_write")
+public val RESOURCE_STATE_DISK_WRITE: String = "disk_write"
/**
* Network receive throughput of the resource in KB/s.
*/
@JvmField
-public val RESOURCE_STATE_NET_RX: TableColumn<Double> = column("net_rx")
+public val RESOURCE_STATE_NET_RX: String = "net_rx"
/**
* Network transmit throughput of the resource in KB/s.
*/
@JvmField
-public val RESOURCE_STATE_NET_TX: TableColumn<Double> = column("net_tx")
+public val RESOURCE_STATE_NET_TX: String = "net_tx"
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TableColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TableColumns.kt
deleted file mode 100644
index a58505e9..00000000
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TableColumns.kt
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-@file:JvmName("TableColumns")
-package org.opendc.trace.conv
-
-import org.opendc.trace.TableColumn
-
-/**
- * Construct a [TableColumn] with the specified [name] and type [T].
- */
-public inline fun <reified T> column(name: String): TableColumn<T> = column(name, T::class.java)
-
-/**
- * Construct a [TableColumn] with the specified [name] and [type].
- */
-public fun <T> column(name: String, type: Class<T>): TableColumn<T> = TableColumn(name, type)
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt
index e6daafb7..da5c343f 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt
@@ -23,78 +23,62 @@
@file:JvmName("TaskColumns")
package org.opendc.trace.conv
-import org.opendc.trace.TableColumn
-import java.time.Duration
-import java.time.Instant
-
/**
* A column containing the task identifier.
*/
-@JvmField
-public val TASK_ID: TableColumn<String> = column("id")
+public const val TASK_ID: String = "id"
/**
* A column containing the identifier of the workflow.
*/
-@JvmField
-public val TASK_WORKFLOW_ID: TableColumn<String> = column("workflow_id")
+public const val TASK_WORKFLOW_ID: String = "workflow_id"
/**
* A column containing the submission time of the task.
*/
-@JvmField
-public val TASK_SUBMIT_TIME: TableColumn<Instant> = column("submit_time")
+public const val TASK_SUBMIT_TIME: String = "submit_time"
/**
* A column containing the wait time of the task.
*/
-@JvmField
-public val TASK_WAIT_TIME: TableColumn<Instant> = column("wait_time")
+public const val TASK_WAIT_TIME: String = "wait_time"
/**
* A column containing the runtime time of the task.
*/
-@JvmField
-public val TASK_RUNTIME: TableColumn<Duration> = column("runtime")
+public const val TASK_RUNTIME: String = "runtime"
/**
* A column containing the parents of a task.
*/
-@JvmField
-public val TASK_PARENTS: TableColumn<Set<String>> = column("parents")
+public const val TASK_PARENTS: String = "parents"
/**
* A column containing the children of a task.
*/
-@JvmField
-public val TASK_CHILDREN: TableColumn<Set<String>> = column("children")
+public const val TASK_CHILDREN: String = "children"
/**
* A column containing the requested CPUs of a task.
*/
-@JvmField
-public val TASK_REQ_NCPUS: TableColumn<Int> = column("req_ncpus")
+public const val TASK_REQ_NCPUS: String = "req_ncpus"
/**
* A column containing the allocated CPUs of a task.
*/
-@JvmField
-public val TASK_ALLOC_NCPUS: TableColumn<Int> = column("alloc_ncpus")
+public const val TASK_ALLOC_NCPUS: String = "alloc_ncpus"
/**
* A column containing the status of a task.
*/
-@JvmField
-public val TASK_STATUS: TableColumn<Int> = column("status")
+public const val TASK_STATUS: String = "status"
/**
* A column containing the group id of a task.
*/
-@JvmField
-public val TASK_GROUP_ID: TableColumn<Int> = column("group_id")
+public const val TASK_GROUP_ID: String = "group_id"
/**
* A column containing the user id of a task.
*/
-@JvmField
-public val TASK_USER_ID: TableColumn<Int> = column("user_id")
+public const val TASK_USER_ID: String = "user_id"
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt
index b848e19a..1e1bf676 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt
@@ -37,13 +37,10 @@ internal class TableImpl(val trace: TraceImpl, override val name: String) : Tabl
*/
private val details = trace.format.getDetails(trace.path, name)
- override val columns: List<TableColumn<*>>
+ override val columns: List<TableColumn>
get() = details.columns
- override val partitionKeys: List<TableColumn<*>>
- get() = details.partitionKeys
-
- override fun newReader(projection: List<TableColumn<*>>?): TableReader {
+ override fun newReader(projection: List<String>?): TableReader {
return trace.format.newReader(trace.path, name, projection)
}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TableDetails.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TableDetails.kt
index 1a9b9ee1..5206d02b 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TableDetails.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TableDetails.kt
@@ -29,9 +29,5 @@ import org.opendc.trace.TableColumn
* A class used by the [TraceFormat] interface for describing the metadata of a [Table].
*
* @param columns The available columns in the table.
- * @param partitionKeys The table columns that act as partition keys for the table.
*/
-public data class TableDetails(
- val columns: List<TableColumn<*>>,
- val partitionKeys: List<TableColumn<*>> = emptyList()
-)
+public data class TableDetails(val columns: List<TableColumn>)
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt
index 47761e0f..eff6fa83 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt
@@ -22,7 +22,6 @@
package org.opendc.trace.spi
-import org.opendc.trace.TableColumn
import org.opendc.trace.TableReader
import org.opendc.trace.TableWriter
import java.nio.file.Path
@@ -69,11 +68,11 @@ public interface TraceFormat {
*
* @param path The path to the trace to open.
* @param table The name of the table to open a [TableReader] for.
- * @param projection The list of [TableColumn]s to project or `null` if no projection is performed.
+ * @param projection The name of the columns to project or `null` if no projection is performed.
* @throws IllegalArgumentException If [table] does not exist.
* @return A [TableReader] instance for the table.
*/
- public fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader
+ public fun newReader(path: Path, table: String, projection: List<String>?): TableReader
/**
* Open a [TableWriter] for the specified [table].
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/CompositeTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/CompositeTableReader.kt
index 11e032c7..c4854265 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/CompositeTableReader.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/CompositeTableReader.kt
@@ -22,8 +22,10 @@
package org.opendc.trace.util
-import org.opendc.trace.TableColumn
import org.opendc.trace.TableReader
+import java.time.Duration
+import java.time.Instant
+import java.util.*
/**
* A helper class to chain multiple [TableReader]s.
@@ -63,11 +65,11 @@ public abstract class CompositeTableReader : TableReader {
return delegate != null
}
- override fun resolve(column: TableColumn<*>): Int {
+ override fun resolve(name: String): Int {
tryStart()
val delegate = delegate
- return delegate?.resolve(column) ?: -1
+ return delegate?.resolve(name) ?: -1
}
override fun isNull(index: Int): Boolean {
@@ -75,11 +77,6 @@ public abstract class CompositeTableReader : TableReader {
return delegate.isNull(index)
}
- override fun get(index: Int): Any? {
- val delegate = checkNotNull(delegate) { "Invalid reader state" }
- return delegate.get(index)
- }
-
override fun getBoolean(index: Int): Boolean {
val delegate = checkNotNull(delegate) { "Invalid reader state" }
return delegate.getBoolean(index)
@@ -95,11 +92,51 @@ public abstract class CompositeTableReader : TableReader {
return delegate.getLong(index)
}
+ override fun getFloat(index: Int): Float {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.getFloat(index)
+ }
+
override fun getDouble(index: Int): Double {
val delegate = checkNotNull(delegate) { "Invalid reader state" }
return delegate.getDouble(index)
}
+ override fun getString(index: Int): String? {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.getString(index)
+ }
+
+ override fun getUUID(index: Int): UUID? {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.getUUID(index)
+ }
+
+ override fun getInstant(index: Int): Instant? {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.getInstant(index)
+ }
+
+ override fun getDuration(index: Int): Duration? {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.getDuration(index)
+ }
+
+ override fun <T> getList(index: Int, elementType: Class<T>): List<T>? {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.getList(index, elementType)
+ }
+
+ override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.getSet(index, elementType)
+ }
+
+ override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? {
+ val delegate = checkNotNull(delegate) { "Invalid reader state" }
+ return delegate.getMap(index, keyType, valueType)
+ }
+
override fun close() {
delegate?.close()
}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/TableColumnConversion.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/TableColumnConversion.kt
new file mode 100644
index 00000000..8ef080b5
--- /dev/null
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/TableColumnConversion.kt
@@ -0,0 +1,77 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+@file:JvmName("TableColumnConversions")
+package org.opendc.trace.util
+
+import org.opendc.trace.TableColumnType
+import java.time.Duration
+import java.time.Instant
+import java.util.UUID
+
+/**
+ * Helper method to convert a [List] into a [List] with elements of [targetElementType].
+ */
+public fun <T> TableColumnType.List.convertTo(value: List<*>?, targetElementType: Class<T>): List<T>? {
+ require(elementType.isCompatible(targetElementType)) { "Target element type is not compatible with $elementType" }
+ @Suppress("UNCHECKED_CAST")
+ return value as List<T>?
+}
+
+/**
+ * Helper method to convert a [Set] into a [Set] with elements of [targetElementType].
+ */
+public fun <T> TableColumnType.Set.convertTo(value: Set<*>?, targetElementType: Class<T>): Set<T>? {
+ require(elementType.isCompatible(targetElementType)) { "Target element type is not compatible with $elementType" }
+ @Suppress("UNCHECKED_CAST")
+ return value as Set<T>?
+}
+
+/**
+ * Helper method to convert a [Map] into a [Map] with [targetKeyType] keys and [targetValueType] values.
+ */
+public fun <K, V> TableColumnType.Map.convertTo(value: Map<*, *>?, targetKeyType: Class<K>, targetValueType: Class<V>): Map<K, V>? {
+ require(keyType.isCompatible(targetKeyType)) { "Target key type $targetKeyType is not compatible with $keyType" }
+ require(valueType.isCompatible(targetValueType)) { "Target value type $targetValueType is not compatible with $valueType" }
+ @Suppress("UNCHECKED_CAST")
+ return value as Map<K, V>?
+}
+
+/**
+ * Helper method to determine [javaType] is compatible with this [TableColumnType].
+ */
+private fun TableColumnType.isCompatible(javaType: Class<*>): Boolean {
+ return when (this) {
+ is TableColumnType.Boolean -> javaType.isAssignableFrom(Boolean::class.java)
+ is TableColumnType.Int -> javaType.isAssignableFrom(Int::class.java)
+ is TableColumnType.Long -> javaType.isAssignableFrom(Long::class.java)
+ is TableColumnType.Float -> javaType.isAssignableFrom(Float::class.java)
+ is TableColumnType.Double -> javaType.isAssignableFrom(Double::class.java)
+ is TableColumnType.String -> javaType.isAssignableFrom(String::class.java)
+ is TableColumnType.UUID -> javaType.isAssignableFrom(UUID::class.java)
+ is TableColumnType.Instant -> javaType.isAssignableFrom(Instant::class.java)
+ is TableColumnType.Duration -> javaType.isAssignableFrom(Duration::class.java)
+ is TableColumnType.List -> javaType.isAssignableFrom(List::class.java)
+ is TableColumnType.Set -> javaType.isAssignableFrom(Set::class.java)
+ is TableColumnType.Map -> javaType.isAssignableFrom(Map::class.java)
+ }
+}
diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt
index 3132b1d9..e9017b35 100644
--- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt
+++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt
@@ -29,7 +29,9 @@ import org.opendc.trace.*
import org.opendc.trace.conv.RESOURCE_ID
import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE_PCT
import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP
+import java.time.Duration
import java.time.Instant
+import java.util.*
/**
* A [TableReader] for the Azure v1 VM resource state table.
@@ -63,20 +65,22 @@ internal class AzureResourceStateTableReader(private val parser: CsvParser) : Ta
return true
}
- override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
+ private val COL_ID = 0
+ private val COL_TIMESTAMP = 1
+ private val COL_CPU_USAGE_PCT = 2
- override fun isNull(index: Int): Boolean {
- require(index in 0..columns.size) { "Invalid column index" }
- return false
+ override fun resolve(name: String): Int {
+ return when (name) {
+ RESOURCE_ID -> COL_ID
+ RESOURCE_STATE_TIMESTAMP -> COL_TIMESTAMP
+ RESOURCE_STATE_CPU_USAGE_PCT -> COL_CPU_USAGE_PCT
+ else -> -1
+ }
}
- override fun get(index: Int): Any? {
- return when (index) {
- COL_ID -> id
- COL_TIMESTAMP -> timestamp
- COL_CPU_USAGE_PCT -> cpuUsagePct
- else -> throw IllegalArgumentException("Invalid column index")
- }
+ override fun isNull(index: Int): Boolean {
+ require(index in 0..COL_CPU_USAGE_PCT) { "Invalid column index" }
+ return false
}
override fun getBoolean(index: Int): Boolean {
@@ -91,6 +95,10 @@ internal class AzureResourceStateTableReader(private val parser: CsvParser) : Ta
throw IllegalArgumentException("Invalid column")
}
+ override fun getFloat(index: Int): Float {
+ throw IllegalArgumentException("Invalid column")
+ }
+
override fun getDouble(index: Int): Double {
return when (index) {
COL_CPU_USAGE_PCT -> cpuUsagePct
@@ -98,6 +106,40 @@ internal class AzureResourceStateTableReader(private val parser: CsvParser) : Ta
}
}
+ override fun getString(index: Int): String? {
+ return when (index) {
+ COL_ID -> id
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getUUID(index: Int): UUID? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInstant(index: Int): Instant? {
+ return when (index) {
+ COL_TIMESTAMP -> timestamp
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getDuration(index: Int): Duration? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <T> getList(index: Int, elementType: Class<T>): List<T>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
override fun close() {
parser.close()
}
@@ -131,15 +173,6 @@ internal class AzureResourceStateTableReader(private val parser: CsvParser) : Ta
cpuUsagePct = Double.NaN
}
- private val COL_ID = 0
- private val COL_TIMESTAMP = 1
- private val COL_CPU_USAGE_PCT = 2
- private val columns = mapOf(
- RESOURCE_ID to COL_ID,
- RESOURCE_STATE_TIMESTAMP to COL_TIMESTAMP,
- RESOURCE_STATE_CPU_USAGE_PCT to COL_CPU_USAGE_PCT
- )
-
companion object {
/**
* The [CsvSchema] that is used to parse the trace.
diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt
index 154a37e4..456a2536 100644
--- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt
+++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt
@@ -27,7 +27,9 @@ import com.fasterxml.jackson.dataformat.csv.CsvParser
import com.fasterxml.jackson.dataformat.csv.CsvSchema
import org.opendc.trace.*
import org.opendc.trace.conv.*
+import java.time.Duration
import java.time.Instant
+import java.util.*
/**
* A [TableReader] for the Azure v1 VM resources table.
@@ -63,22 +65,26 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe
return true
}
- override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
+ private val COL_ID = 0
+ private val COL_START_TIME = 1
+ private val COL_STOP_TIME = 2
+ private val COL_CPU_COUNT = 3
+ private val COL_MEM_CAPACITY = 4
- override fun isNull(index: Int): Boolean {
- require(index in 0..columns.size) { "Invalid column index" }
- return false
+ override fun resolve(name: String): Int {
+ return when (name) {
+ RESOURCE_ID -> COL_ID
+ RESOURCE_START_TIME -> COL_START_TIME
+ RESOURCE_STOP_TIME -> COL_STOP_TIME
+ RESOURCE_CPU_COUNT -> COL_CPU_COUNT
+ RESOURCE_MEM_CAPACITY -> COL_MEM_CAPACITY
+ else -> -1
+ }
}
- override fun get(index: Int): Any? {
- return when (index) {
- COL_ID -> id
- COL_START_TIME -> startTime
- COL_STOP_TIME -> stopTime
- COL_CPU_COUNT -> getInt(index)
- COL_MEM_CAPACITY -> getDouble(index)
- else -> throw IllegalArgumentException("Invalid column")
- }
+ override fun isNull(index: Int): Boolean {
+ require(index in 0..COL_MEM_CAPACITY) { "Invalid column index" }
+ return false
}
override fun getBoolean(index: Int): Boolean {
@@ -93,6 +99,13 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe
}
override fun getLong(index: Int): Long {
+ return when (index) {
+ COL_CPU_COUNT -> cpuCores.toLong()
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getFloat(index: Int): Float {
throw IllegalArgumentException("Invalid column")
}
@@ -103,6 +116,41 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe
}
}
+ override fun getString(index: Int): String? {
+ return when (index) {
+ COL_ID -> id
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getUUID(index: Int): UUID? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInstant(index: Int): Instant? {
+ return when (index) {
+ COL_START_TIME -> startTime
+ COL_STOP_TIME -> stopTime
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getDuration(index: Int): Duration? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <T> getList(index: Int, elementType: Class<T>): List<T>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
override fun close() {
parser.close()
}
@@ -140,19 +188,6 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe
memCapacity = Double.NaN
}
- private val COL_ID = 0
- private val COL_START_TIME = 1
- private val COL_STOP_TIME = 2
- private val COL_CPU_COUNT = 3
- private val COL_MEM_CAPACITY = 4
- private val columns = mapOf(
- RESOURCE_ID to COL_ID,
- RESOURCE_START_TIME to COL_START_TIME,
- RESOURCE_STOP_TIME to COL_STOP_TIME,
- RESOURCE_CPU_COUNT to COL_CPU_COUNT,
- RESOURCE_MEM_CAPACITY to COL_MEM_CAPACITY
- )
-
companion object {
/**
* The [CsvSchema] that is used to parse the trace.
diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt
index 73978990..2294e4a4 100644
--- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt
+++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt
@@ -62,26 +62,25 @@ public class AzureTraceFormat : TraceFormat {
return when (table) {
TABLE_RESOURCES -> TableDetails(
listOf(
- RESOURCE_ID,
- RESOURCE_START_TIME,
- RESOURCE_STOP_TIME,
- RESOURCE_CPU_COUNT,
- RESOURCE_MEM_CAPACITY
+ TableColumn(RESOURCE_ID, TableColumnType.String),
+ TableColumn(RESOURCE_START_TIME, TableColumnType.Instant),
+ TableColumn(RESOURCE_STOP_TIME, TableColumnType.Instant),
+ TableColumn(RESOURCE_CPU_COUNT, TableColumnType.Int),
+ TableColumn(RESOURCE_MEM_CAPACITY, TableColumnType.Double),
)
)
TABLE_RESOURCE_STATES -> TableDetails(
listOf(
- RESOURCE_ID,
- RESOURCE_STATE_TIMESTAMP,
- RESOURCE_STATE_CPU_USAGE_PCT
- ),
- listOf(RESOURCE_STATE_TIMESTAMP)
+ TableColumn(RESOURCE_ID, TableColumnType.String),
+ TableColumn(RESOURCE_STATE_TIMESTAMP, TableColumnType.Instant),
+ TableColumn(RESOURCE_STATE_CPU_USAGE_PCT, TableColumnType.Double),
+ )
)
else -> throw IllegalArgumentException("Table $table not supported")
}
}
- override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader {
+ override fun newReader(path: Path, table: String, projection: List<String>?): TableReader {
return when (table) {
TABLE_RESOURCES -> {
val stream = GZIPInputStream(path.resolve("vmtable/vmtable.csv.gz").inputStream())
diff --git a/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt b/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt
index 263d26ce..932858f8 100644
--- a/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt
@@ -60,7 +60,7 @@ class AzureTraceFormatTest {
val reader = format.newReader(path, TABLE_RESOURCES, null)
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals("x/XsOfHO4ocsV99i4NluqKDuxctW2MMVmwqOPAlg4wp8mqbBOe3wxBlQo0+Qx+uf", reader.get(RESOURCE_ID)) },
+ { assertEquals("x/XsOfHO4ocsV99i4NluqKDuxctW2MMVmwqOPAlg4wp8mqbBOe3wxBlQo0+Qx+uf", reader.getString(RESOURCE_ID)) },
{ assertEquals(1, reader.getInt(RESOURCE_CPU_COUNT)) },
{ assertEquals(1750000.0, reader.getDouble(RESOURCE_MEM_CAPACITY)) },
)
@@ -75,8 +75,8 @@ class AzureTraceFormatTest {
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals("+ZcrOp5/c/fJ6mVgP5qMZlOAGDwyjaaDNM0WoWOt2IDb47gT0UwK9lFwkPQv3C7Q", reader.get(RESOURCE_ID)) },
- { assertEquals(0, reader.get(RESOURCE_STATE_TIMESTAMP).epochSecond) },
+ { assertEquals("+ZcrOp5/c/fJ6mVgP5qMZlOAGDwyjaaDNM0WoWOt2IDb47gT0UwK9lFwkPQv3C7Q", reader.getString(RESOURCE_ID)) },
+ { assertEquals(0, reader.getInstant(RESOURCE_STATE_TIMESTAMP)?.epochSecond) },
{ assertEquals(0.0286979, reader.getDouble(RESOURCE_STATE_CPU_USAGE_PCT), 0.01) }
)
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt
index 1e1d1a09..f9bd6200 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt
@@ -25,7 +25,9 @@ package org.opendc.trace.bitbrains
import org.opendc.trace.*
import org.opendc.trace.conv.*
import java.io.BufferedReader
+import java.time.Duration
import java.time.Instant
+import java.util.*
/**
* A [TableReader] for the Bitbrains resource state table.
@@ -89,25 +91,29 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR
return true
}
- override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
+ override fun resolve(name: String): Int {
+ return when (name) {
+ RESOURCE_ID -> COL_ID
+ RESOURCE_CLUSTER_ID -> COL_CLUSTER_ID
+ RESOURCE_STATE_TIMESTAMP -> COL_TIMESTAMP
+ RESOURCE_CPU_COUNT -> COL_NCPUS
+ RESOURCE_CPU_CAPACITY -> COL_CPU_CAPACITY
+ RESOURCE_STATE_CPU_USAGE -> COL_CPU_USAGE
+ RESOURCE_STATE_CPU_USAGE_PCT -> COL_CPU_USAGE_PCT
+ RESOURCE_STATE_CPU_DEMAND -> COL_CPU_DEMAND
+ RESOURCE_STATE_CPU_READY_PCT -> COL_CPU_READY_PCT
+ RESOURCE_MEM_CAPACITY -> COL_MEM_CAPACITY
+ RESOURCE_STATE_DISK_READ -> COL_DISK_READ
+ RESOURCE_STATE_DISK_WRITE -> COL_DISK_WRITE
+ else -> -1
+ }
+ }
override fun isNull(index: Int): Boolean {
- require(index in 0..COL_MAX) { "Invalid column index" }
+ require(index in 0 until COL_MAX) { "Invalid column index" }
return false
}
- override fun get(index: Int): Any? {
- return when (index) {
- COL_ID -> id
- COL_CLUSTER_ID -> cluster
- COL_TIMESTAMP -> timestamp
- COL_NCPUS -> getInt(index)
- COL_POWERED_ON -> getInt(index)
- COL_CPU_CAPACITY, COL_CPU_USAGE, COL_CPU_USAGE_PCT, COL_CPU_READY_PCT, COL_CPU_DEMAND, COL_MEM_CAPACITY, COL_DISK_READ, COL_DISK_WRITE -> getDouble(index)
- else -> throw IllegalArgumentException("Invalid column")
- }
- }
-
override fun getBoolean(index: Int): Boolean {
return when (index) {
COL_POWERED_ON -> poweredOn
@@ -126,6 +132,10 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR
throw IllegalArgumentException("Invalid column")
}
+ override fun getFloat(index: Int): Float {
+ throw IllegalArgumentException("Invalid column")
+ }
+
override fun getDouble(index: Int): Double {
return when (index) {
COL_CPU_CAPACITY -> cpuCapacity
@@ -140,6 +150,41 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR
}
}
+ override fun getString(index: Int): String? {
+ return when (index) {
+ COL_ID -> id
+ COL_CLUSTER_ID -> cluster
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getUUID(index: Int): UUID? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInstant(index: Int): Instant? {
+ return when (index) {
+ COL_TIMESTAMP -> timestamp
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getDuration(index: Int): Duration? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <T> getList(index: Int, elementType: Class<T>): List<T>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
override fun close() {
reader.close()
}
@@ -195,19 +240,4 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR
private val COL_MEM_CAPACITY = 20
private val COL_CPU_USAGE_PCT = 21
private val COL_MAX = COL_CPU_USAGE_PCT + 1
-
- private val columns = mapOf(
- RESOURCE_ID to COL_ID,
- RESOURCE_CLUSTER_ID to COL_CLUSTER_ID,
- RESOURCE_STATE_TIMESTAMP to COL_TIMESTAMP,
- RESOURCE_CPU_COUNT to COL_NCPUS,
- RESOURCE_CPU_CAPACITY to COL_CPU_CAPACITY,
- RESOURCE_STATE_CPU_USAGE to COL_CPU_USAGE,
- RESOURCE_STATE_CPU_USAGE_PCT to COL_CPU_USAGE_PCT,
- RESOURCE_STATE_CPU_DEMAND to COL_CPU_DEMAND,
- RESOURCE_STATE_CPU_READY_PCT to COL_CPU_READY_PCT,
- RESOURCE_MEM_CAPACITY to COL_MEM_CAPACITY,
- RESOURCE_STATE_DISK_READ to COL_DISK_READ,
- RESOURCE_STATE_DISK_WRITE to COL_DISK_WRITE
- )
}
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt
index 82e454ad..31c4f1e2 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt
@@ -53,26 +53,25 @@ public class BitbrainsExTraceFormat : TraceFormat {
return when (table) {
TABLE_RESOURCE_STATES -> TableDetails(
listOf(
- RESOURCE_ID,
- RESOURCE_CLUSTER_ID,
- RESOURCE_STATE_TIMESTAMP,
- RESOURCE_CPU_COUNT,
- RESOURCE_CPU_CAPACITY,
- RESOURCE_STATE_CPU_USAGE,
- RESOURCE_STATE_CPU_USAGE_PCT,
- RESOURCE_STATE_CPU_DEMAND,
- RESOURCE_STATE_CPU_READY_PCT,
- RESOURCE_MEM_CAPACITY,
- RESOURCE_STATE_DISK_READ,
- RESOURCE_STATE_DISK_WRITE
- ),
- listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP)
+ TableColumn(RESOURCE_ID, TableColumnType.String),
+ TableColumn(RESOURCE_CLUSTER_ID, TableColumnType.String),
+ TableColumn(RESOURCE_STATE_TIMESTAMP, TableColumnType.Instant),
+ TableColumn(RESOURCE_CPU_COUNT, TableColumnType.Int),
+ TableColumn(RESOURCE_CPU_CAPACITY, TableColumnType.Double),
+ TableColumn(RESOURCE_STATE_CPU_USAGE, TableColumnType.Double),
+ TableColumn(RESOURCE_STATE_CPU_USAGE_PCT, TableColumnType.Double),
+ TableColumn(RESOURCE_STATE_CPU_DEMAND, TableColumnType.Double),
+ TableColumn(RESOURCE_STATE_CPU_READY_PCT, TableColumnType.Double),
+ TableColumn(RESOURCE_MEM_CAPACITY, TableColumnType.Double),
+ TableColumn(RESOURCE_STATE_DISK_READ, TableColumnType.Double),
+ TableColumn(RESOURCE_STATE_DISK_WRITE, TableColumnType.Double),
+ )
)
else -> throw IllegalArgumentException("Table $table not supported")
}
}
- override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader {
+ override fun newReader(path: Path, table: String, projection: List<String>?): TableReader {
return when (table) {
TABLE_RESOURCE_STATES -> newResourceStateReader(path)
else -> throw IllegalArgumentException("Table $table not supported")
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt
index 214fd749..14c1f801 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt
@@ -29,6 +29,7 @@ import com.fasterxml.jackson.dataformat.csv.CsvSchema
import org.opendc.trace.*
import org.opendc.trace.conv.*
import java.text.NumberFormat
+import java.time.Duration
import java.time.Instant
import java.time.LocalDateTime
import java.time.ZoneOffset
@@ -112,21 +113,40 @@ internal class BitbrainsResourceStateTableReader(private val partition: String,
return true
}
- override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
+ private val COL_TIMESTAMP = 0
+ private val COL_CPU_COUNT = 1
+ private val COL_CPU_CAPACITY = 2
+ private val COL_CPU_USAGE = 3
+ private val COL_CPU_USAGE_PCT = 4
+ private val COL_MEM_CAPACITY = 5
+ private val COL_MEM_USAGE = 6
+ private val COL_DISK_READ = 7
+ private val COL_DISK_WRITE = 8
+ private val COL_NET_RX = 9
+ private val COL_NET_TX = 10
+ private val COL_ID = 11
- override fun isNull(index: Int): Boolean {
- check(index in 0..columns.size) { "Invalid column index" }
- return false
+ override fun resolve(name: String): Int {
+ return when (name) {
+ RESOURCE_ID -> COL_ID
+ RESOURCE_STATE_TIMESTAMP -> COL_TIMESTAMP
+ RESOURCE_CPU_COUNT -> COL_CPU_COUNT
+ RESOURCE_CPU_CAPACITY -> COL_CPU_CAPACITY
+ RESOURCE_STATE_CPU_USAGE -> COL_CPU_USAGE
+ RESOURCE_STATE_CPU_USAGE_PCT -> COL_CPU_USAGE_PCT
+ RESOURCE_MEM_CAPACITY -> COL_MEM_CAPACITY
+ RESOURCE_STATE_MEM_USAGE -> COL_MEM_USAGE
+ RESOURCE_STATE_DISK_READ -> COL_DISK_READ
+ RESOURCE_STATE_DISK_WRITE -> COL_DISK_WRITE
+ RESOURCE_STATE_NET_RX -> COL_NET_RX
+ RESOURCE_STATE_NET_TX -> COL_NET_TX
+ else -> -1
+ }
}
- override fun get(index: Int): Any? {
- return when (index) {
- COL_ID -> partition
- COL_TIMESTAMP -> timestamp
- COL_CPU_COUNT -> getInt(index)
- COL_CPU_CAPACITY, COL_CPU_USAGE, COL_CPU_USAGE_PCT, COL_MEM_CAPACITY, COL_MEM_USAGE, COL_DISK_READ, COL_DISK_WRITE, COL_NET_RX, COL_NET_TX -> getDouble(index)
- else -> throw IllegalArgumentException("Invalid column")
- }
+ override fun isNull(index: Int): Boolean {
+ check(index in 0..COL_NET_TX) { "Invalid column index" }
+ return false
}
override fun getBoolean(index: Int): Boolean {
@@ -144,6 +164,10 @@ internal class BitbrainsResourceStateTableReader(private val partition: String,
throw IllegalArgumentException("Invalid column")
}
+ override fun getFloat(index: Int): Float {
+ throw IllegalArgumentException("Invalid column")
+ }
+
override fun getDouble(index: Int): Double {
return when (index) {
COL_CPU_CAPACITY -> cpuCapacity
@@ -159,6 +183,40 @@ internal class BitbrainsResourceStateTableReader(private val partition: String,
}
}
+ override fun getString(index: Int): String {
+ return when (index) {
+ COL_ID -> partition
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getUUID(index: Int): UUID? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInstant(index: Int): Instant? {
+ return when (index) {
+ COL_TIMESTAMP -> timestamp
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getDuration(index: Int): Duration? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <T> getList(index: Int, elementType: Class<T>): List<T>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
override fun close() {
parser.close()
}
@@ -228,34 +286,6 @@ internal class BitbrainsResourceStateTableReader(private val partition: String,
netTransmitted = Double.NaN
}
- private val COL_TIMESTAMP = 0
- private val COL_CPU_COUNT = 1
- private val COL_CPU_CAPACITY = 2
- private val COL_CPU_USAGE = 3
- private val COL_CPU_USAGE_PCT = 4
- private val COL_MEM_CAPACITY = 5
- private val COL_MEM_USAGE = 6
- private val COL_DISK_READ = 7
- private val COL_DISK_WRITE = 8
- private val COL_NET_RX = 9
- private val COL_NET_TX = 10
- private val COL_ID = 11
-
- private val columns = mapOf(
- RESOURCE_ID to COL_ID,
- RESOURCE_STATE_TIMESTAMP to COL_TIMESTAMP,
- RESOURCE_CPU_COUNT to COL_CPU_COUNT,
- RESOURCE_CPU_CAPACITY to COL_CPU_CAPACITY,
- RESOURCE_STATE_CPU_USAGE to COL_CPU_USAGE,
- RESOURCE_STATE_CPU_USAGE_PCT to COL_CPU_USAGE_PCT,
- RESOURCE_MEM_CAPACITY to COL_MEM_CAPACITY,
- RESOURCE_STATE_MEM_USAGE to COL_MEM_USAGE,
- RESOURCE_STATE_DISK_READ to COL_DISK_READ,
- RESOURCE_STATE_DISK_WRITE to COL_DISK_WRITE,
- RESOURCE_STATE_NET_RX to COL_NET_RX,
- RESOURCE_STATE_NET_TX to COL_NET_TX
- )
-
/**
* The type of the timestamp in the trace.
*/
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt
index 55f09f43..c57c4cb2 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt
@@ -26,6 +26,9 @@ import com.fasterxml.jackson.dataformat.csv.CsvFactory
import org.opendc.trace.*
import org.opendc.trace.conv.RESOURCE_ID
import java.nio.file.Path
+import java.time.Duration
+import java.time.Instant
+import java.util.*
/**
* A [TableReader] for the Bitbrains resource table.
@@ -51,7 +54,7 @@ internal class BitbrainsResourceTableReader(private val factory: CsvFactory, vms
continue
}
- id = reader.get(idCol) as String
+ id = reader.getString(idCol)
return true
} finally {
reader.close()
@@ -61,33 +64,68 @@ internal class BitbrainsResourceTableReader(private val factory: CsvFactory, vms
return false
}
- override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
+ private val COL_ID = 0
+
+ override fun resolve(name: String): Int {
+ return when (name) {
+ RESOURCE_ID -> COL_ID
+ else -> -1
+ }
+ }
override fun isNull(index: Int): Boolean {
- check(index in 0..columns.size) { "Invalid column index" }
+ check(index in 0..COL_ID) { "Invalid column index" }
return false
}
- override fun get(index: Int): Any? {
+ override fun getBoolean(index: Int): Boolean {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInt(index: Int): Int {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getLong(index: Int): Long {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getFloat(index: Int): Float {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getDouble(index: Int): Double {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getString(index: Int): String? {
return when (index) {
COL_ID -> id
else -> throw IllegalArgumentException("Invalid column")
}
}
- override fun getBoolean(index: Int): Boolean {
+ override fun getUUID(index: Int): UUID? {
throw IllegalArgumentException("Invalid column")
}
- override fun getInt(index: Int): Int {
+ override fun getInstant(index: Int): Instant? {
throw IllegalArgumentException("Invalid column")
}
- override fun getLong(index: Int): Long {
+ override fun getDuration(index: Int): Duration? {
throw IllegalArgumentException("Invalid column")
}
- override fun getDouble(index: Int): Double {
+ override fun <T> getList(index: Int, elementType: Class<T>): List<T>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? {
throw IllegalArgumentException("Invalid column")
}
@@ -104,7 +142,4 @@ internal class BitbrainsResourceTableReader(private val factory: CsvFactory, vms
private fun reset() {
id = null
}
-
- private val COL_ID = 0
- private val columns = mapOf(RESOURCE_ID to COL_ID)
}
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt
index a374e951..f3030893 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt
@@ -59,29 +59,32 @@ public class BitbrainsTraceFormat : TraceFormat {
override fun getDetails(path: Path, table: String): TableDetails {
return when (table) {
- TABLE_RESOURCES -> TableDetails(listOf(RESOURCE_ID))
+ TABLE_RESOURCES -> TableDetails(
+ listOf(
+ TableColumn(RESOURCE_ID, TableColumnType.String)
+ )
+ )
TABLE_RESOURCE_STATES -> TableDetails(
listOf(
- RESOURCE_ID,
- RESOURCE_STATE_TIMESTAMP,
- RESOURCE_CPU_COUNT,
- RESOURCE_CPU_CAPACITY,
- RESOURCE_STATE_CPU_USAGE,
- RESOURCE_STATE_CPU_USAGE_PCT,
- RESOURCE_MEM_CAPACITY,
- RESOURCE_STATE_MEM_USAGE,
- RESOURCE_STATE_DISK_READ,
- RESOURCE_STATE_DISK_WRITE,
- RESOURCE_STATE_NET_RX,
- RESOURCE_STATE_NET_TX,
+ TableColumn(RESOURCE_ID, TableColumnType.String),
+ TableColumn(RESOURCE_STATE_TIMESTAMP, TableColumnType.Instant),
+ TableColumn(RESOURCE_CPU_COUNT, TableColumnType.Int),
+ TableColumn(RESOURCE_CPU_CAPACITY, TableColumnType.Double),
+ TableColumn(RESOURCE_STATE_CPU_USAGE, TableColumnType.Double),
+ TableColumn(RESOURCE_STATE_CPU_USAGE_PCT, TableColumnType.Double),
+ TableColumn(RESOURCE_MEM_CAPACITY, TableColumnType.Double),
+ TableColumn(RESOURCE_STATE_MEM_USAGE, TableColumnType.Double),
+ TableColumn(RESOURCE_STATE_DISK_READ, TableColumnType.Double),
+ TableColumn(RESOURCE_STATE_DISK_WRITE, TableColumnType.Double),
+ TableColumn(RESOURCE_STATE_NET_RX, TableColumnType.Double),
+ TableColumn(RESOURCE_STATE_NET_TX, TableColumnType.Double),
),
- listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP)
)
else -> throw IllegalArgumentException("Table $table not supported")
}
}
- override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader {
+ override fun newReader(path: Path, table: String, projection: List<String>?): TableReader {
return when (table) {
TABLE_RESOURCES -> {
val vms = Files.walk(path, 1)
diff --git a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt
index c944cb98..870129e4 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt
@@ -63,7 +63,7 @@ internal class BitbrainsExTraceFormatTest {
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals(1631911500, reader.get(RESOURCE_STATE_TIMESTAMP).epochSecond) },
+ { assertEquals(1631911500, reader.getInstant(RESOURCE_STATE_TIMESTAMP)?.epochSecond) },
{ assertEquals(21.2, reader.getDouble(RESOURCE_STATE_CPU_USAGE), 0.01) }
)
diff --git a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt
index 841801e6..557f8c21 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt
@@ -61,7 +61,7 @@ class BitbrainsTraceFormatTest {
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals("bitbrains", reader.get(RESOURCE_ID)) },
+ { assertEquals("bitbrains", reader.getString(RESOURCE_ID)) },
{ assertFalse(reader.nextRow()) }
)
@@ -75,7 +75,7 @@ class BitbrainsTraceFormatTest {
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals(1376314846, reader.get(RESOURCE_STATE_TIMESTAMP).epochSecond) },
+ { assertEquals(1376314846, reader.getInstant(RESOURCE_STATE_TIMESTAMP)?.epochSecond) },
{ assertEquals(19.066, reader.getDouble(RESOURCE_STATE_CPU_USAGE), 0.01) }
)
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt
index 1854f262..74bd188b 100644
--- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt
+++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt
@@ -24,10 +24,10 @@ package org.opendc.trace.calcite
import org.apache.calcite.linq4j.Enumerator
import org.opendc.trace.TableColumn
+import org.opendc.trace.TableColumnType
import org.opendc.trace.TableReader
-import java.sql.Timestamp
-import java.time.Duration
-import java.time.Instant
+import java.nio.ByteBuffer
+import java.nio.ByteOrder
import java.util.concurrent.atomic.AtomicBoolean
/**
@@ -35,10 +35,10 @@ import java.util.concurrent.atomic.AtomicBoolean
*/
internal class TraceReaderEnumerator<E>(
private val reader: TableReader,
- private val columns: List<TableColumn<*>>,
+ private val columns: List<TableColumn>,
private val cancelFlag: AtomicBoolean
) : Enumerator<E> {
- private val columnIndices = columns.map { reader.resolve(it) }.toIntArray()
+ private val columnIndices = columns.map { reader.resolve(it.name) }.toIntArray()
private var current: E? = null
override fun moveNext(): Boolean {
@@ -80,14 +80,35 @@ internal class TraceReaderEnumerator<E>(
return res
}
- private fun convertColumn(reader: TableReader, column: TableColumn<*>, columnIndex: Int): Any? {
- val value = reader.get(columnIndex)
-
+ private fun convertColumn(reader: TableReader, column: TableColumn, columnIndex: Int): Any? {
return when (column.type) {
- Instant::class.java -> Timestamp.from(value as Instant)
- Duration::class.java -> (value as Duration).toMillis()
- Set::class.java -> (value as Set<*>).toTypedArray()
- else -> value
+ is TableColumnType.Boolean -> reader.getBoolean(columnIndex)
+ is TableColumnType.Int -> reader.getInt(columnIndex)
+ is TableColumnType.Long -> reader.getLong(columnIndex)
+ is TableColumnType.Float -> reader.getFloat(columnIndex)
+ is TableColumnType.Double -> reader.getDouble(columnIndex)
+ is TableColumnType.String -> reader.getString(columnIndex)
+ is TableColumnType.UUID -> {
+ val uuid = reader.getUUID(columnIndex)
+
+ if (uuid != null) {
+ val uuidBytes = ByteArray(16)
+
+ ByteBuffer.wrap(uuidBytes)
+ .order(ByteOrder.BIG_ENDIAN)
+ .putLong(uuid.mostSignificantBits)
+ .putLong(uuid.leastSignificantBits)
+
+ uuidBytes
+ } else {
+ null
+ }
+ }
+ is TableColumnType.Instant -> reader.getInstant(columnIndex)?.toEpochMilli()
+ is TableColumnType.Duration -> reader.getDuration(columnIndex)?.toMillis() ?: 0
+ is TableColumnType.List -> reader.getList(columnIndex, Any::class.java)?.toTypedArray()
+ is TableColumnType.Set -> reader.getSet(columnIndex, Any::class.java)?.toTypedArray()
+ is TableColumnType.Map -> reader.getMap(columnIndex, Any::class.java, Any::class.java)
}
}
}
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt
index 8c571b82..dfcc22a3 100644
--- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt
+++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt
@@ -38,8 +38,11 @@ import org.apache.calcite.rex.RexNode
import org.apache.calcite.schema.*
import org.apache.calcite.schema.impl.AbstractTableQueryable
import org.apache.calcite.sql.type.SqlTypeName
+import org.opendc.trace.TableColumnType
+import java.nio.ByteBuffer
import java.time.Duration
import java.time.Instant
+import java.util.*
import java.util.concurrent.atomic.AtomicBoolean
/**
@@ -70,7 +73,7 @@ internal class TraceTable(private val table: org.opendc.trace.Table) :
val cancelFlag = DataContext.Variable.CANCEL_FLAG.get<AtomicBoolean>(root)
return object : AbstractEnumerable<Array<Any?>>() {
override fun enumerator(): Enumerator<Array<Any?>> =
- TraceReaderEnumerator(table.newReader(projection), projection ?: table.columns, cancelFlag)
+ TraceReaderEnumerator(table.newReader(projection?.map { it.name }), projection ?: table.columns, cancelFlag)
}
}
@@ -78,7 +81,7 @@ internal class TraceTable(private val table: org.opendc.trace.Table) :
val table = table
val columns = table.columns
val writer = table.newWriter()
- val columnIndices = columns.map { writer.resolve(it) }.toIntArray()
+ val columnIndices = columns.map { writer.resolve(it.name) }.toIntArray()
var rowCount = 0L
try {
@@ -90,16 +93,24 @@ internal class TraceTable(private val table: org.opendc.trace.Table) :
continue
}
val columnType = columns[index].type
-
- writer.set(
- columnIndices[index],
- when (columnType) {
- Duration::class.java -> Duration.ofMillis(value as Long)
- Instant::class.java -> Instant.ofEpochMilli(value as Long)
- Set::class.java -> (value as List<*>).toSet()
- else -> value
+ val columnIndex = columnIndices[index]
+ when (columnType) {
+ is TableColumnType.Boolean -> writer.setBoolean(columnIndex, value as Boolean)
+ is TableColumnType.Int -> writer.setInt(columnIndex, value as Int)
+ is TableColumnType.Long -> writer.setLong(columnIndex, value as Long)
+ is TableColumnType.Float -> writer.setFloat(columnIndex, value as Float)
+ is TableColumnType.Double -> writer.setDouble(columnIndex, value as Double)
+ is TableColumnType.String -> writer.setString(columnIndex, value as String)
+ is TableColumnType.UUID -> {
+ val bb = ByteBuffer.wrap(value as ByteArray)
+ writer.setUUID(columnIndex, UUID(bb.getLong(), bb.getLong()))
}
- )
+ is TableColumnType.Instant -> writer.setInstant(columnIndex, Instant.ofEpochMilli(value as Long))
+ is TableColumnType.Duration -> writer.setDuration(columnIndex, Duration.ofMillis(value as Long))
+ is TableColumnType.List -> writer.setList(columnIndex, value as List<*>)
+ is TableColumnType.Set -> writer.setSet(columnIndex, (value as List<*>).toSet())
+ is TableColumnType.Map -> writer.setMap(columnIndex, value as Map<*, *>)
+ }
}
writer.endRow()
@@ -161,16 +172,26 @@ internal class TraceTable(private val table: org.opendc.trace.Table) :
for (column in table.columns) {
names.add(column.name)
- types.add(
- when (column.type) {
- Instant::class.java -> typeFactory.createSqlType(SqlTypeName.TIMESTAMP)
- Duration::class.java -> typeFactory.createSqlType(SqlTypeName.BIGINT)
- Set::class.java -> typeFactory.createMultisetType(typeFactory.createSqlType(SqlTypeName.UNKNOWN), -1)
- else -> typeFactory.createType(column.type)
- }
- )
+ types.add(mapType(typeFactory, column.type))
}
return typeFactory.createStructType(types, names)
}
+
+ private fun mapType(typeFactory: JavaTypeFactory, type: TableColumnType): RelDataType {
+ return when (type) {
+ is TableColumnType.Boolean -> typeFactory.createSqlType(SqlTypeName.BOOLEAN)
+ is TableColumnType.Int -> typeFactory.createSqlType(SqlTypeName.INTEGER)
+ is TableColumnType.Long -> typeFactory.createSqlType(SqlTypeName.BIGINT)
+ is TableColumnType.Float -> typeFactory.createSqlType(SqlTypeName.FLOAT)
+ is TableColumnType.Double -> typeFactory.createSqlType(SqlTypeName.DOUBLE)
+ is TableColumnType.String -> typeFactory.createSqlType(SqlTypeName.VARCHAR)
+ is TableColumnType.UUID -> typeFactory.createSqlType(SqlTypeName.BINARY, 16)
+ is TableColumnType.Instant -> typeFactory.createSqlType(SqlTypeName.TIMESTAMP)
+ is TableColumnType.Duration -> typeFactory.createSqlType(SqlTypeName.BIGINT)
+ is TableColumnType.List -> typeFactory.createArrayType(mapType(typeFactory, type.elementType), -1)
+ is TableColumnType.Set -> typeFactory.createMultisetType(mapType(typeFactory, type.elementType), -1)
+ is TableColumnType.Map -> typeFactory.createMapType(mapType(typeFactory, type.keyType), mapType(typeFactory, type.valueType))
+ }
+ }
}
diff --git a/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt b/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt
index d2877d7c..d8729034 100644
--- a/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt
+++ b/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt
@@ -22,16 +22,24 @@
package org.opendc.trace.calcite
+import io.mockk.every
+import io.mockk.mockk
import org.apache.calcite.jdbc.CalciteConnection
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test
+import org.opendc.trace.TableColumn
+import org.opendc.trace.TableColumnType
+import org.opendc.trace.TableReader
import org.opendc.trace.Trace
+import org.opendc.trace.conv.TABLE_RESOURCES
import java.nio.file.Files
import java.nio.file.Paths
import java.sql.DriverManager
import java.sql.ResultSet
import java.sql.Statement
import java.sql.Timestamp
+import java.time.Duration
+import java.time.Instant
import java.util.*
/**
@@ -41,11 +49,11 @@ class CalciteTest {
/**
* The trace to experiment with.
*/
- private val trace = Trace.open(Paths.get("src/test/resources/trace"), format = "opendc-vm")
+ private val odcTrace = Trace.open(Paths.get("src/test/resources/trace"), format = "opendc-vm")
@Test
fun testResources() {
- runQuery(trace, "SELECT * FROM trace.resources") { rs ->
+ runQuery(odcTrace, "SELECT * FROM trace.resources") { rs ->
assertAll(
{ assertTrue(rs.next()) },
{ assertEquals("1019", rs.getString("id")) },
@@ -65,7 +73,7 @@ class CalciteTest {
@Test
fun testResourceStates() {
- runQuery(trace, "SELECT * FROM trace.resource_states") { rs ->
+ runQuery(odcTrace, "SELECT * FROM trace.resource_states") { rs ->
assertAll(
{ assertTrue(rs.next()) },
{ assertEquals("1019", rs.getString("id")) },
@@ -80,7 +88,7 @@ class CalciteTest {
@Test
fun testInterferenceGroups() {
- runQuery(trace, "SELECT * FROM trace.interference_groups") { rs ->
+ runQuery(odcTrace, "SELECT * FROM trace.interference_groups") { rs ->
assertAll(
{ assertTrue(rs.next()) },
{ assertArrayEquals(arrayOf("1019", "1023", "1052"), rs.getArray("members").array as Array<*>) },
@@ -92,7 +100,7 @@ class CalciteTest {
@Test
fun testComplexQuery() {
- runQuery(trace, "SELECT max(cpu_usage) as max_cpu_usage, avg(cpu_usage) as avg_cpu_usage FROM trace.resource_states") { rs ->
+ runQuery(odcTrace, "SELECT max(cpu_usage) as max_cpu_usage, avg(cpu_usage) as avg_cpu_usage FROM trace.resource_states") { rs ->
assertAll(
{ assertTrue(rs.next()) },
{ assertEquals(249.59993808, rs.getDouble("max_cpu_usage")) },
@@ -128,6 +136,84 @@ class CalciteTest {
}
}
+ @Test
+ fun testUUID() {
+ val trace = mockk<Trace>()
+ every { trace.tables } returns listOf(TABLE_RESOURCES)
+ every { trace.getTable(TABLE_RESOURCES)!!.columns } returns listOf(
+ TableColumn("id", TableColumnType.UUID)
+ )
+ every { trace.getTable(TABLE_RESOURCES)!!.newReader() } answers {
+ object : TableReader {
+ override fun nextRow(): Boolean = true
+
+ override fun resolve(name: String): Int {
+ return when (name) {
+ "id" -> 0
+ else -> -1
+ }
+ }
+
+ override fun isNull(index: Int): Boolean = false
+
+ override fun getBoolean(index: Int): Boolean {
+ TODO("not implemented")
+ }
+
+ override fun getInt(index: Int): Int {
+ TODO("not implemented")
+ }
+
+ override fun getLong(index: Int): Long {
+ TODO("not implemented")
+ }
+
+ override fun getFloat(index: Int): Float {
+ TODO("not implemented")
+ }
+
+ override fun getDouble(index: Int): Double {
+ TODO("not implemented")
+ }
+
+ override fun getString(index: Int): String? {
+ TODO("not implemented")
+ }
+
+ override fun getUUID(index: Int): UUID = UUID(1, 2)
+
+ override fun getInstant(index: Int): Instant? {
+ TODO("not implemented")
+ }
+
+ override fun getDuration(index: Int): Duration? {
+ TODO("not implemented")
+ }
+
+ override fun <T> getList(index: Int, elementType: Class<T>): List<T>? {
+ TODO("not implemented")
+ }
+
+ override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? {
+ TODO("not implemented")
+ }
+
+ override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? {
+ TODO("not implemented")
+ }
+
+ override fun close() {}
+ }
+ }
+
+ runQuery(trace, "SELECT id FROM trace.resources") { rs ->
+ assertAll(
+ { assertTrue(rs.next()) },
+ { assertArrayEquals(byteArrayOf(0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 2), rs.getBytes("id")) },
+ )
+ }
+ }
+
/**
* Helper function to run statement for the specified trace.
*/
diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt
index 42a9469e..007ab90a 100644
--- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt
+++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt
@@ -27,8 +27,10 @@ import com.fasterxml.jackson.dataformat.csv.CsvParser
import com.fasterxml.jackson.dataformat.csv.CsvSchema
import org.opendc.trace.*
import org.opendc.trace.conv.*
+import org.opendc.trace.util.convertTo
import java.time.Duration
import java.time.Instant
+import java.util.*
import java.util.regex.Pattern
/**
@@ -68,46 +70,89 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader {
return true
}
- override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
+ override fun resolve(name: String): Int {
+ return when (name) {
+ TASK_ID -> COL_JOB_ID
+ TASK_WORKFLOW_ID -> COL_WORKFLOW_ID
+ TASK_SUBMIT_TIME -> COL_SUBMIT_TIME
+ TASK_RUNTIME -> COL_RUNTIME
+ TASK_ALLOC_NCPUS -> COL_NPROC
+ TASK_REQ_NCPUS -> COL_REQ_NPROC
+ TASK_PARENTS -> COL_DEPS
+ else -> -1
+ }
+ }
override fun isNull(index: Int): Boolean {
- check(index in 0..columns.size) { "Invalid column" }
+ check(index in 0..COL_DEPS) { "Invalid column" }
return false
}
- override fun get(index: Int): Any? {
+ override fun getBoolean(index: Int): Boolean {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInt(index: Int): Int {
+ return when (index) {
+ COL_REQ_NPROC -> reqNProcs
+ COL_NPROC -> nProcs
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getLong(index: Int): Long {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getFloat(index: Int): Float {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getDouble(index: Int): Double {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getString(index: Int): String? {
return when (index) {
COL_JOB_ID -> jobId
COL_WORKFLOW_ID -> workflowId
- COL_SUBMIT_TIME -> submitTime
- COL_RUNTIME -> runtime
- COL_REQ_NPROC -> getInt(index)
- COL_NPROC -> getInt(index)
- COL_DEPS -> dependencies
else -> throw IllegalArgumentException("Invalid column")
}
}
- override fun getBoolean(index: Int): Boolean {
+ override fun getUUID(index: Int): UUID? {
throw IllegalArgumentException("Invalid column")
}
- override fun getInt(index: Int): Int {
+ override fun getInstant(index: Int): Instant? {
return when (index) {
- COL_REQ_NPROC -> reqNProcs
- COL_NPROC -> nProcs
+ COL_SUBMIT_TIME -> submitTime
else -> throw IllegalArgumentException("Invalid column")
}
}
- override fun getLong(index: Int): Long {
+ override fun getDuration(index: Int): Duration? {
+ return when (index) {
+ COL_RUNTIME -> runtime
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun <T> getList(index: Int, elementType: Class<T>): List<T>? {
throw IllegalArgumentException("Invalid column")
}
- override fun getDouble(index: Int): Double {
+ override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? {
throw IllegalArgumentException("Invalid column")
}
+ override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? {
+ return when (index) {
+ COL_DEPS -> TYPE_DEPS.convertTo(dependencies, elementType)
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
override fun close() {
parser.close()
}
@@ -180,15 +225,7 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader {
private val COL_REQ_NPROC = 5
private val COL_DEPS = 6
- private val columns = mapOf(
- TASK_ID to COL_JOB_ID,
- TASK_WORKFLOW_ID to COL_WORKFLOW_ID,
- TASK_SUBMIT_TIME to COL_SUBMIT_TIME,
- TASK_RUNTIME to COL_RUNTIME,
- TASK_ALLOC_NCPUS to COL_NPROC,
- TASK_REQ_NCPUS to COL_REQ_NPROC,
- TASK_PARENTS to COL_DEPS
- )
+ private val TYPE_DEPS = TableColumnType.Set(TableColumnType.String)
companion object {
/**
diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt
index 8d9eab82..ca63b624 100644
--- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt
+++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt
@@ -56,21 +56,20 @@ public class GwfTraceFormat : TraceFormat {
return when (table) {
TABLE_TASKS -> TableDetails(
listOf(
- TASK_WORKFLOW_ID,
- TASK_ID,
- TASK_SUBMIT_TIME,
- TASK_RUNTIME,
- TASK_REQ_NCPUS,
- TASK_ALLOC_NCPUS,
- TASK_PARENTS,
- ),
- listOf(TASK_WORKFLOW_ID)
+ TableColumn(TASK_WORKFLOW_ID, TableColumnType.String),
+ TableColumn(TASK_ID, TableColumnType.String),
+ TableColumn(TASK_SUBMIT_TIME, TableColumnType.Instant),
+ TableColumn(TASK_RUNTIME, TableColumnType.Duration),
+ TableColumn(TASK_REQ_NCPUS, TableColumnType.Int),
+ TableColumn(TASK_ALLOC_NCPUS, TableColumnType.Int),
+ TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)),
+ )
)
else -> throw IllegalArgumentException("Table $table not supported")
}
}
- override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader {
+ override fun newReader(path: Path, table: String, projection: List<String>?): TableReader {
return when (table) {
TABLE_TASKS -> GwfTaskTableReader(factory.createParser(path.toFile()))
else -> throw IllegalArgumentException("Table $table not supported")
diff --git a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt
index 411d45d0..dd0e6066 100644
--- a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt
@@ -62,11 +62,11 @@ internal class GwfTraceFormatTest {
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals("0", reader.get(TASK_WORKFLOW_ID)) },
- { assertEquals("1", reader.get(TASK_ID)) },
- { assertEquals(Instant.ofEpochSecond(16), reader.get(TASK_SUBMIT_TIME)) },
- { assertEquals(Duration.ofSeconds(11), reader.get(TASK_RUNTIME)) },
- { assertEquals(emptySet<String>(), reader.get(TASK_PARENTS)) },
+ { assertEquals("0", reader.getString(TASK_WORKFLOW_ID)) },
+ { assertEquals("1", reader.getString(TASK_ID)) },
+ { assertEquals(Instant.ofEpochSecond(16), reader.getInstant(TASK_SUBMIT_TIME)) },
+ { assertEquals(Duration.ofSeconds(11), reader.getDuration(TASK_RUNTIME)) },
+ { assertEquals(emptySet<String>(), reader.getSet(TASK_PARENTS, String::class.java)) },
)
}
@@ -81,11 +81,11 @@ internal class GwfTraceFormatTest {
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals("0", reader.get(TASK_WORKFLOW_ID)) },
- { assertEquals("7", reader.get(TASK_ID)) },
- { assertEquals(Instant.ofEpochSecond(87), reader.get(TASK_SUBMIT_TIME)) },
- { assertEquals(Duration.ofSeconds(11), reader.get(TASK_RUNTIME)) },
- { assertEquals(setOf("4", "5", "6"), reader.get(TASK_PARENTS)) },
+ { assertEquals("0", reader.getString(TASK_WORKFLOW_ID)) },
+ { assertEquals("7", reader.getString(TASK_ID)) },
+ { assertEquals(Instant.ofEpochSecond(87), reader.getInstant(TASK_SUBMIT_TIME)) },
+ { assertEquals(Duration.ofSeconds(11), reader.getDuration(TASK_RUNTIME)) },
+ { assertEquals(setOf("4", "5", "6"), reader.getSet(TASK_PARENTS, String::class.java)) },
)
}
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt
index eb91e305..920daeea 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt
@@ -26,9 +26,13 @@ import org.opendc.trace.*
import org.opendc.trace.conv.INTERFERENCE_GROUP_MEMBERS
import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE
import org.opendc.trace.conv.INTERFERENCE_GROUP_TARGET
+import org.opendc.trace.util.convertTo
import shaded.parquet.com.fasterxml.jackson.core.JsonParseException
import shaded.parquet.com.fasterxml.jackson.core.JsonParser
import shaded.parquet.com.fasterxml.jackson.core.JsonToken
+import java.time.Duration
+import java.time.Instant
+import java.util.*
/**
* A [TableReader] implementation for the OpenDC VM interference JSON format.
@@ -59,8 +63,14 @@ internal class OdcVmInterferenceJsonTableReader(private val parser: JsonParser)
}
}
- override fun resolve(column: TableColumn<*>): Int {
- return when (column) {
+ private val COL_MEMBERS = 0
+ private val COL_TARGET = 1
+ private val COL_SCORE = 2
+
+ private val TYPE_MEMBERS = TableColumnType.Set(TableColumnType.String)
+
+ override fun resolve(name: String): Int {
+ return when (name) {
INTERFERENCE_GROUP_MEMBERS -> COL_MEMBERS
INTERFERENCE_GROUP_TARGET -> COL_TARGET
INTERFERENCE_GROUP_SCORE -> COL_SCORE
@@ -75,43 +85,65 @@ internal class OdcVmInterferenceJsonTableReader(private val parser: JsonParser)
}
}
- override fun get(index: Int): Any {
+ override fun getBoolean(index: Int): Boolean {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun getInt(index: Int): Int {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun getLong(index: Int): Long {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun getFloat(index: Int): Float {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun getDouble(index: Int): Double {
return when (index) {
- COL_MEMBERS -> members
COL_TARGET -> targetLoad
COL_SCORE -> score
else -> throw IllegalArgumentException("Invalid column $index")
}
}
- override fun getBoolean(index: Int): Boolean {
+ override fun getString(index: Int): String? {
throw IllegalArgumentException("Invalid column $index")
}
- override fun getInt(index: Int): Int {
+ override fun getUUID(index: Int): UUID? {
throw IllegalArgumentException("Invalid column $index")
}
- override fun getLong(index: Int): Long {
+ override fun getInstant(index: Int): Instant? {
throw IllegalArgumentException("Invalid column $index")
}
- override fun getDouble(index: Int): Double {
+ override fun getDuration(index: Int): Duration? {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun <T> getList(index: Int, elementType: Class<T>): List<T>? {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? {
return when (index) {
- COL_TARGET -> targetLoad
- COL_SCORE -> score
+ COL_MEMBERS -> TYPE_MEMBERS.convertTo(members, elementType)
else -> throw IllegalArgumentException("Invalid column $index")
}
}
+ override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
override fun close() {
parser.close()
}
- private val COL_MEMBERS = 0
- private val COL_TARGET = 1
- private val COL_SCORE = 2
-
private var members = emptySet<String>()
private var targetLoad = Double.POSITIVE_INFINITY
private var score = 1.0
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt
index 64bc4356..d726e890 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt
@@ -27,6 +27,9 @@ import org.opendc.trace.conv.INTERFERENCE_GROUP_MEMBERS
import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE
import org.opendc.trace.conv.INTERFERENCE_GROUP_TARGET
import shaded.parquet.com.fasterxml.jackson.core.JsonGenerator
+import java.time.Duration
+import java.time.Instant
+import java.util.*
/**
* A [TableWriter] implementation for the OpenDC VM interference JSON format.
@@ -65,8 +68,8 @@ internal class OdcVmInterferenceJsonTableWriter(private val generator: JsonGener
generator.writeEndObject()
}
- override fun resolve(column: TableColumn<*>): Int {
- return when (column) {
+ override fun resolve(name: String): Int {
+ return when (name) {
INTERFERENCE_GROUP_MEMBERS -> COL_MEMBERS
INTERFERENCE_GROUP_TARGET -> COL_TARGET
INTERFERENCE_GROUP_SCORE -> COL_SCORE
@@ -74,40 +77,66 @@ internal class OdcVmInterferenceJsonTableWriter(private val generator: JsonGener
}
}
- override fun set(index: Int, value: Any) {
+ override fun setBoolean(index: Int, value: Boolean) {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun setInt(index: Int, value: Int) {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun setLong(index: Int, value: Long) {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun setFloat(index: Int, value: Float) {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun setDouble(index: Int, value: Double) {
check(isRowActive) { "No active row" }
- @Suppress("UNCHECKED_CAST")
when (index) {
- COL_MEMBERS -> members = value as Set<String>
COL_TARGET -> targetLoad = (value as Number).toDouble()
COL_SCORE -> score = (value as Number).toDouble()
- else -> throw IllegalArgumentException("Invalid column index $index")
+ else -> throw IllegalArgumentException("Invalid column $index")
}
}
- override fun setBoolean(index: Int, value: Boolean) {
+ override fun setString(index: Int, value: String) {
throw IllegalArgumentException("Invalid column $index")
}
- override fun setInt(index: Int, value: Int) {
+ override fun setUUID(index: Int, value: UUID) {
throw IllegalArgumentException("Invalid column $index")
}
- override fun setLong(index: Int, value: Long) {
+ override fun setInstant(index: Int, value: Instant) {
throw IllegalArgumentException("Invalid column $index")
}
- override fun setDouble(index: Int, value: Double) {
+ override fun setDuration(index: Int, value: Duration) {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun <T> setList(index: Int, value: List<T>) {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
+ override fun <T> setSet(index: Int, value: Set<T>) {
check(isRowActive) { "No active row" }
+ @Suppress("UNCHECKED_CAST")
when (index) {
- COL_TARGET -> targetLoad = (value as Number).toDouble()
- COL_SCORE -> score = (value as Number).toDouble()
- else -> throw IllegalArgumentException("Invalid column $index")
+ COL_MEMBERS -> members = value as Set<String>
+ else -> throw IllegalArgumentException("Invalid column index $index")
}
}
+ override fun <K, V> setMap(index: Int, value: Map<K, V>) {
+ throw IllegalArgumentException("Invalid column $index")
+ }
+
override fun flush() {
generator.flush()
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
index 7a01b881..599f46f1 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
@@ -26,6 +26,9 @@ import org.opendc.trace.*
import org.opendc.trace.conv.*
import org.opendc.trace.opendc.parquet.ResourceState
import org.opendc.trace.util.parquet.LocalParquetReader
+import java.time.Duration
+import java.time.Instant
+import java.util.*
/**
* A [TableReader] implementation for the OpenDC virtual machine trace format.
@@ -48,24 +51,26 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
}
}
- override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
+ private val COL_ID = 0
+ private val COL_TIMESTAMP = 1
+ private val COL_DURATION = 2
+ private val COL_CPU_COUNT = 3
+ private val COL_CPU_USAGE = 4
- override fun isNull(index: Int): Boolean {
- check(index in 0..columns.size) { "Invalid column index" }
- return get(index) == null
+ override fun resolve(name: String): Int {
+ return when (name) {
+ RESOURCE_ID -> COL_ID
+ RESOURCE_STATE_TIMESTAMP -> COL_TIMESTAMP
+ RESOURCE_STATE_DURATION -> COL_DURATION
+ RESOURCE_CPU_COUNT -> COL_CPU_COUNT
+ RESOURCE_STATE_CPU_USAGE -> COL_CPU_USAGE
+ else -> -1
+ }
}
- override fun get(index: Int): Any? {
- val record = checkNotNull(record) { "Reader in invalid state" }
-
- return when (index) {
- COL_ID -> record.id
- COL_TIMESTAMP -> record.timestamp
- COL_DURATION -> record.duration
- COL_CPU_COUNT -> record.cpuCount
- COL_CPU_USAGE -> record.cpuUsage
- else -> throw IllegalArgumentException("Invalid column index $index")
- }
+ override fun isNull(index: Int): Boolean {
+ check(index in 0..COL_CPU_USAGE) { "Invalid column index" }
+ return false
}
override fun getBoolean(index: Int): Boolean {
@@ -84,6 +89,10 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
throw IllegalArgumentException("Invalid column or type [index $index]")
}
+ override fun getFloat(index: Int): Float {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
override fun getDouble(index: Int): Double {
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
@@ -92,23 +101,52 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
}
}
+ override fun getString(index: Int): String {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+
+ return when (index) {
+ COL_ID -> record.id
+ else -> throw IllegalArgumentException("Invalid column index $index")
+ }
+ }
+
+ override fun getUUID(index: Int): UUID? {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun getInstant(index: Int): Instant {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+
+ return when (index) {
+ COL_TIMESTAMP -> record.timestamp
+ else -> throw IllegalArgumentException("Invalid column index $index")
+ }
+ }
+
+ override fun getDuration(index: Int): Duration {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+
+ return when (index) {
+ COL_DURATION -> record.duration
+ else -> throw IllegalArgumentException("Invalid column index $index")
+ }
+ }
+
+ override fun <T> getList(index: Int, elementType: Class<T>): List<T>? {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
override fun close() {
reader.close()
}
override fun toString(): String = "OdcVmResourceStateTableReader"
-
- private val COL_ID = 0
- private val COL_TIMESTAMP = 1
- private val COL_DURATION = 2
- private val COL_CPU_COUNT = 3
- private val COL_CPU_USAGE = 4
-
- private val columns = mapOf(
- RESOURCE_ID to COL_ID,
- RESOURCE_STATE_TIMESTAMP to COL_TIMESTAMP,
- RESOURCE_STATE_DURATION to COL_DURATION,
- RESOURCE_CPU_COUNT to COL_CPU_COUNT,
- RESOURCE_STATE_CPU_USAGE to COL_CPU_USAGE,
- )
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt
index 97af5b59..f5e8b863 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt
@@ -28,6 +28,7 @@ import org.opendc.trace.conv.*
import org.opendc.trace.opendc.parquet.ResourceState
import java.time.Duration
import java.time.Instant
+import java.util.*
/**
* A [TableWriter] implementation for the OpenDC virtual machine trace format.
@@ -64,17 +65,14 @@ internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter<R
lastTimestamp = _timestamp
}
- override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
-
- override fun set(index: Int, value: Any) {
- check(_isActive) { "No active row" }
-
- when (index) {
- COL_ID -> _id = value as String
- COL_TIMESTAMP -> _timestamp = value as Instant
- COL_DURATION -> _duration = value as Duration
- COL_CPU_COUNT -> _cpuCount = value as Int
- COL_CPU_USAGE -> _cpuUsage = value as Double
+ override fun resolve(name: String): Int {
+ return when (name) {
+ RESOURCE_ID -> COL_ID
+ RESOURCE_STATE_TIMESTAMP -> COL_TIMESTAMP
+ RESOURCE_STATE_DURATION -> COL_DURATION
+ RESOURCE_CPU_COUNT -> COL_CPU_COUNT
+ RESOURCE_STATE_CPU_USAGE -> COL_CPU_USAGE
+ else -> -1
}
}
@@ -94,6 +92,10 @@ internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter<R
throw IllegalArgumentException("Invalid column or type [index $index]")
}
+ override fun setFloat(index: Int, value: Float) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
override fun setDouble(index: Int, value: Double) {
check(_isActive) { "No active row" }
when (index) {
@@ -102,6 +104,47 @@ internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter<R
}
}
+ override fun setString(index: Int, value: String) {
+ check(_isActive) { "No active row" }
+
+ when (index) {
+ COL_ID -> _id = value
+ }
+ }
+
+ override fun setUUID(index: Int, value: UUID) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun setInstant(index: Int, value: Instant) {
+ check(_isActive) { "No active row" }
+
+ when (index) {
+ COL_TIMESTAMP -> _timestamp = value
+ else -> throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+ }
+
+ override fun setDuration(index: Int, value: Duration) {
+ check(_isActive) { "No active row" }
+
+ when (index) {
+ COL_DURATION -> _duration = value
+ }
+ }
+
+ override fun <T> setList(index: Int, value: List<T>) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun <T> setSet(index: Int, value: Set<T>) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun <K, V> setMap(index: Int, value: Map<K, V>) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
override fun flush() {
// Not available
}
@@ -121,12 +164,4 @@ internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter<R
private val COL_DURATION = 2
private val COL_CPU_COUNT = 3
private val COL_CPU_USAGE = 4
-
- private val columns = mapOf(
- RESOURCE_ID to COL_ID,
- RESOURCE_STATE_TIMESTAMP to COL_TIMESTAMP,
- RESOURCE_STATE_DURATION to COL_DURATION,
- RESOURCE_CPU_COUNT to COL_CPU_COUNT,
- RESOURCE_STATE_CPU_USAGE to COL_CPU_USAGE,
- )
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
index 6102332f..88f9b781 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
@@ -26,6 +26,9 @@ import org.opendc.trace.*
import org.opendc.trace.conv.*
import org.opendc.trace.opendc.parquet.Resource
import org.opendc.trace.util.parquet.LocalParquetReader
+import java.time.Duration
+import java.time.Instant
+import java.util.*
/**
* A [TableReader] implementation for the "resources table" in the OpenDC virtual machine trace format.
@@ -48,25 +51,28 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R
}
}
- override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
+ private val COL_ID = 0
+ private val COL_START_TIME = 1
+ private val COL_STOP_TIME = 2
+ private val COL_CPU_COUNT = 3
+ private val COL_CPU_CAPACITY = 4
+ private val COL_MEM_CAPACITY = 5
- override fun isNull(index: Int): Boolean {
- check(index in 0..columns.size) { "Invalid column index" }
- return get(index) == null
+ override fun resolve(name: String): Int {
+ return when (name) {
+ RESOURCE_ID -> COL_ID
+ RESOURCE_START_TIME -> COL_START_TIME
+ RESOURCE_STOP_TIME -> COL_STOP_TIME
+ RESOURCE_CPU_COUNT -> COL_CPU_COUNT
+ RESOURCE_CPU_CAPACITY -> COL_CPU_CAPACITY
+ RESOURCE_MEM_CAPACITY -> COL_MEM_CAPACITY
+ else -> -1
+ }
}
- override fun get(index: Int): Any? {
- val record = checkNotNull(record) { "Reader in invalid state" }
-
- return when (index) {
- COL_ID -> record.id
- COL_START_TIME -> record.startTime
- COL_STOP_TIME -> record.stopTime
- COL_CPU_COUNT -> getInt(index)
- COL_CPU_CAPACITY -> getDouble(index)
- COL_MEM_CAPACITY -> getDouble(index)
- else -> throw IllegalArgumentException("Invalid column")
- }
+ override fun isNull(index: Int): Boolean {
+ check(index in 0..COL_MEM_CAPACITY) { "Invalid column index" }
+ return false
}
override fun getBoolean(index: Int): Boolean {
@@ -86,6 +92,10 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R
throw IllegalArgumentException("Invalid column")
}
+ override fun getFloat(index: Int): Float {
+ throw IllegalArgumentException("Invalid column")
+ }
+
override fun getDouble(index: Int): Double {
val record = checkNotNull(record) { "Reader in invalid state" }
@@ -96,25 +106,48 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R
}
}
+ override fun getString(index: Int): String {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+
+ return when (index) {
+ COL_ID -> record.id
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getUUID(index: Int): UUID? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInstant(index: Int): Instant {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+
+ return when (index) {
+ COL_START_TIME -> record.startTime
+ COL_STOP_TIME -> record.stopTime
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getDuration(index: Int): Duration? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <T> getList(index: Int, elementType: Class<T>): List<T>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
override fun close() {
reader.close()
}
override fun toString(): String = "OdcVmResourceTableReader"
-
- private val COL_ID = 0
- private val COL_START_TIME = 1
- private val COL_STOP_TIME = 2
- private val COL_CPU_COUNT = 3
- private val COL_CPU_CAPACITY = 4
- private val COL_MEM_CAPACITY = 5
-
- private val columns = mapOf(
- RESOURCE_ID to COL_ID,
- RESOURCE_START_TIME to COL_START_TIME,
- RESOURCE_STOP_TIME to COL_STOP_TIME,
- RESOURCE_CPU_COUNT to COL_CPU_COUNT,
- RESOURCE_CPU_CAPACITY to COL_CPU_CAPACITY,
- RESOURCE_MEM_CAPACITY to COL_MEM_CAPACITY,
- )
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt
index cae65faa..8117c3cd 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt
@@ -26,7 +26,9 @@ import org.apache.parquet.hadoop.ParquetWriter
import org.opendc.trace.*
import org.opendc.trace.conv.*
import org.opendc.trace.opendc.parquet.Resource
+import java.time.Duration
import java.time.Instant
+import java.util.*
/**
* A [TableWriter] implementation for the OpenDC virtual machine trace format.
@@ -59,18 +61,15 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour
writer.write(Resource(_id, _startTime, _stopTime, _cpuCount, _cpuCapacity, _memCapacity))
}
- override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
-
- override fun set(index: Int, value: Any) {
- check(_isActive) { "No active row" }
- when (index) {
- COL_ID -> _id = value as String
- COL_START_TIME -> _startTime = value as Instant
- COL_STOP_TIME -> _stopTime = value as Instant
- COL_CPU_COUNT -> _cpuCount = value as Int
- COL_CPU_CAPACITY -> _cpuCapacity = value as Double
- COL_MEM_CAPACITY -> _memCapacity = value as Double
- else -> throw IllegalArgumentException("Invalid column index $index")
+ override fun resolve(name: String): Int {
+ return when (name) {
+ RESOURCE_ID -> COL_ID
+ RESOURCE_START_TIME -> COL_START_TIME
+ RESOURCE_STOP_TIME -> COL_STOP_TIME
+ RESOURCE_CPU_COUNT -> COL_CPU_COUNT
+ RESOURCE_CPU_CAPACITY -> COL_CPU_CAPACITY
+ RESOURCE_MEM_CAPACITY -> COL_MEM_CAPACITY
+ else -> -1
}
}
@@ -90,6 +89,10 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour
throw IllegalArgumentException("Invalid column or type [index $index]")
}
+ override fun setFloat(index: Int, value: Float) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
override fun setDouble(index: Int, value: Double) {
check(_isActive) { "No active row" }
when (index) {
@@ -99,6 +102,43 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour
}
}
+ override fun setString(index: Int, value: String) {
+ check(_isActive) { "No active row" }
+ when (index) {
+ COL_ID -> _id = value
+ else -> throw IllegalArgumentException("Invalid column index $index")
+ }
+ }
+
+ override fun setUUID(index: Int, value: UUID) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun setInstant(index: Int, value: Instant) {
+ check(_isActive) { "No active row" }
+ when (index) {
+ COL_START_TIME -> _startTime = value
+ COL_STOP_TIME -> _stopTime = value
+ else -> throw IllegalArgumentException("Invalid column index $index")
+ }
+ }
+
+ override fun setDuration(index: Int, value: Duration) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun <T> setList(index: Int, value: List<T>) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun <T> setSet(index: Int, value: Set<T>) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
+ override fun <K, V> setMap(index: Int, value: Map<K, V>) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+
override fun flush() {
// Not available
}
@@ -113,13 +153,4 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour
private val COL_CPU_COUNT = 3
private val COL_CPU_CAPACITY = 4
private val COL_MEM_CAPACITY = 5
-
- private val columns = mapOf(
- RESOURCE_ID to COL_ID,
- RESOURCE_START_TIME to COL_START_TIME,
- RESOURCE_STOP_TIME to COL_STOP_TIME,
- RESOURCE_CPU_COUNT to COL_CPU_COUNT,
- RESOURCE_CPU_CAPACITY to COL_CPU_CAPACITY,
- RESOURCE_MEM_CAPACITY to COL_MEM_CAPACITY,
- )
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
index d45910c6..a9c5b934 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
@@ -73,36 +73,35 @@ public class OdcVmTraceFormat : TraceFormat {
return when (table) {
TABLE_RESOURCES -> TableDetails(
listOf(
- RESOURCE_ID,
- RESOURCE_START_TIME,
- RESOURCE_STOP_TIME,
- RESOURCE_CPU_COUNT,
- RESOURCE_CPU_CAPACITY,
- RESOURCE_MEM_CAPACITY,
+ TableColumn(RESOURCE_ID, TableColumnType.String),
+ TableColumn(RESOURCE_START_TIME, TableColumnType.Instant),
+ TableColumn(RESOURCE_STOP_TIME, TableColumnType.Instant),
+ TableColumn(RESOURCE_CPU_COUNT, TableColumnType.Int),
+ TableColumn(RESOURCE_CPU_CAPACITY, TableColumnType.Double),
+ TableColumn(RESOURCE_MEM_CAPACITY, TableColumnType.Double),
)
)
TABLE_RESOURCE_STATES -> TableDetails(
listOf(
- RESOURCE_ID,
- RESOURCE_STATE_TIMESTAMP,
- RESOURCE_STATE_DURATION,
- RESOURCE_CPU_COUNT,
- RESOURCE_STATE_CPU_USAGE,
- ),
- listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP)
+ TableColumn(RESOURCE_ID, TableColumnType.String),
+ TableColumn(RESOURCE_STATE_TIMESTAMP, TableColumnType.Instant),
+ TableColumn(RESOURCE_STATE_DURATION, TableColumnType.Duration),
+ TableColumn(RESOURCE_CPU_COUNT, TableColumnType.Int),
+ TableColumn(RESOURCE_STATE_CPU_USAGE, TableColumnType.Double),
+ )
)
TABLE_INTERFERENCE_GROUPS -> TableDetails(
listOf(
- INTERFERENCE_GROUP_MEMBERS,
- INTERFERENCE_GROUP_TARGET,
- INTERFERENCE_GROUP_SCORE,
+ TableColumn(INTERFERENCE_GROUP_MEMBERS, TableColumnType.Set(TableColumnType.String)),
+ TableColumn(INTERFERENCE_GROUP_TARGET, TableColumnType.Double),
+ TableColumn(INTERFERENCE_GROUP_SCORE, TableColumnType.Double),
)
)
else -> throw IllegalArgumentException("Table $table not supported")
}
}
- override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader {
+ override fun newReader(path: Path, table: String, projection: List<String>?): TableReader {
return when (table) {
TABLE_RESOURCES -> {
val reader = LocalParquetReader(path.resolve("meta.parquet"), ResourceReadSupport(projection))
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt
index 0d70446d..8a8ed790 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt
@@ -33,11 +33,11 @@ import org.opendc.trace.conv.*
/**
* A [ReadSupport] instance for [Resource] objects.
*/
-internal class ResourceReadSupport(private val projection: List<TableColumn<*>>?) : ReadSupport<Resource>() {
+internal class ResourceReadSupport(private val projection: List<String>?) : ReadSupport<Resource>() {
/**
* Mapping from field names to [TableColumn]s.
*/
- private val fieldMap = mapOf<String, TableColumn<*>>(
+ private val fieldMap = mapOf(
"id" to RESOURCE_ID,
"submissionTime" to RESOURCE_START_TIME,
"start_time" to RESOURCE_START_TIME,
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt
index 97aa00b2..78adc649 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt
@@ -33,11 +33,11 @@ import org.opendc.trace.conv.*
/**
* A [ReadSupport] instance for [ResourceState] objects.
*/
-internal class ResourceStateReadSupport(private val projection: List<TableColumn<*>>?) : ReadSupport<ResourceState>() {
+internal class ResourceStateReadSupport(private val projection: List<String>?) : ReadSupport<ResourceState>() {
/**
* Mapping from field names to [TableColumn]s.
*/
- private val fieldMap = mapOf<String, TableColumn<*>>(
+ private val fieldMap = mapOf(
"id" to RESOURCE_ID,
"time" to RESOURCE_STATE_TIMESTAMP,
"timestamp" to RESOURCE_STATE_TIMESTAMP,
diff --git a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt
index 1f4f6195..ae6e62d8 100644
--- a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt
@@ -67,14 +67,14 @@ internal class OdcVmTraceFormatTest {
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals("1019", reader.get(RESOURCE_ID)) },
- { assertEquals(Instant.ofEpochMilli(1376314846000), reader.get(RESOURCE_START_TIME)) },
+ { assertEquals("1019", reader.getString(RESOURCE_ID)) },
+ { assertEquals(Instant.ofEpochMilli(1376314846000), reader.getInstant(RESOURCE_START_TIME)) },
{ assertTrue(reader.nextRow()) },
- { assertEquals("1023", reader.get(RESOURCE_ID)) },
+ { assertEquals("1023", reader.getString(RESOURCE_ID)) },
{ assertTrue(reader.nextRow()) },
- { assertEquals("1052", reader.get(RESOURCE_ID)) },
+ { assertEquals("1052", reader.getString(RESOURCE_ID)) },
{ assertTrue(reader.nextRow()) },
- { assertEquals("1073", reader.get(RESOURCE_ID)) },
+ { assertEquals("1073", reader.getString(RESOURCE_ID)) },
{ assertFalse(reader.nextRow()) }
)
@@ -87,9 +87,9 @@ internal class OdcVmTraceFormatTest {
val writer = format.newWriter(path, TABLE_RESOURCES)
writer.startRow()
- writer.set(RESOURCE_ID, "1019")
- writer.set(RESOURCE_START_TIME, Instant.EPOCH)
- writer.set(RESOURCE_STOP_TIME, Instant.EPOCH)
+ writer.setString(RESOURCE_ID, "1019")
+ writer.setInstant(RESOURCE_START_TIME, Instant.EPOCH)
+ writer.setInstant(RESOURCE_STOP_TIME, Instant.EPOCH)
writer.setInt(RESOURCE_CPU_COUNT, 1)
writer.setDouble(RESOURCE_CPU_CAPACITY, 1024.0)
writer.setDouble(RESOURCE_MEM_CAPACITY, 1024.0)
@@ -100,9 +100,9 @@ internal class OdcVmTraceFormatTest {
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals("1019", reader.get(RESOURCE_ID)) },
- { assertEquals(Instant.EPOCH, reader.get(RESOURCE_START_TIME)) },
- { assertEquals(Instant.EPOCH, reader.get(RESOURCE_STOP_TIME)) },
+ { assertEquals("1019", reader.getString(RESOURCE_ID)) },
+ { assertEquals(Instant.EPOCH, reader.getInstant(RESOURCE_START_TIME)) },
+ { assertEquals(Instant.EPOCH, reader.getInstant(RESOURCE_STOP_TIME)) },
{ assertEquals(1, reader.getInt(RESOURCE_CPU_COUNT)) },
{ assertEquals(1024.0, reader.getDouble(RESOURCE_CPU_CAPACITY)) },
{ assertEquals(1024.0, reader.getDouble(RESOURCE_MEM_CAPACITY)) },
@@ -124,8 +124,8 @@ internal class OdcVmTraceFormatTest {
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals("1019", reader.get(RESOURCE_ID)) },
- { assertEquals(1376314846, reader.get(RESOURCE_STATE_TIMESTAMP).epochSecond) },
+ { assertEquals("1019", reader.getString(RESOURCE_ID)) },
+ { assertEquals(1376314846, reader.getInstant(RESOURCE_STATE_TIMESTAMP)?.epochSecond) },
{ assertEquals(0.0, reader.getDouble(RESOURCE_STATE_CPU_USAGE), 0.01) }
)
@@ -138,8 +138,8 @@ internal class OdcVmTraceFormatTest {
val writer = format.newWriter(path, TABLE_RESOURCE_STATES)
writer.startRow()
- writer.set(RESOURCE_ID, "1019")
- writer.set(RESOURCE_STATE_TIMESTAMP, Instant.EPOCH)
+ writer.setString(RESOURCE_ID, "1019")
+ writer.setInstant(RESOURCE_STATE_TIMESTAMP, Instant.EPOCH)
writer.setDouble(RESOURCE_STATE_CPU_USAGE, 23.0)
writer.setInt(RESOURCE_CPU_COUNT, 1)
writer.endRow()
@@ -149,8 +149,8 @@ internal class OdcVmTraceFormatTest {
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals("1019", reader.get(RESOURCE_ID)) },
- { assertEquals(Instant.EPOCH, reader.get(RESOURCE_STATE_TIMESTAMP)) },
+ { assertEquals("1019", reader.getString(RESOURCE_ID)) },
+ { assertEquals(Instant.EPOCH, reader.getInstant(RESOURCE_STATE_TIMESTAMP)) },
{ assertEquals(1, reader.getInt(RESOURCE_CPU_COUNT)) },
{ assertEquals(23.0, reader.getDouble(RESOURCE_STATE_CPU_USAGE)) },
{ assertFalse(reader.nextRow()) },
@@ -170,13 +170,13 @@ internal class OdcVmTraceFormatTest {
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals(setOf("1019", "1023", "1052"), reader.get(INTERFERENCE_GROUP_MEMBERS)) },
- { assertEquals(0.0, reader.get(INTERFERENCE_GROUP_TARGET)) },
- { assertEquals(0.8830158730158756, reader.get(INTERFERENCE_GROUP_SCORE)) },
+ { assertEquals(setOf("1019", "1023", "1052"), reader.getSet(INTERFERENCE_GROUP_MEMBERS, String::class.java)) },
+ { assertEquals(0.0, reader.getDouble(INTERFERENCE_GROUP_TARGET)) },
+ { assertEquals(0.8830158730158756, reader.getDouble(INTERFERENCE_GROUP_SCORE)) },
{ assertTrue(reader.nextRow()) },
- { assertEquals(setOf("1023", "1052", "1073"), reader.get(INTERFERENCE_GROUP_MEMBERS)) },
- { assertEquals(0.0, reader.get(INTERFERENCE_GROUP_TARGET)) },
- { assertEquals(0.7133055555552751, reader.get(INTERFERENCE_GROUP_SCORE)) },
+ { assertEquals(setOf("1023", "1052", "1073"), reader.getSet(INTERFERENCE_GROUP_MEMBERS, String::class.java)) },
+ { assertEquals(0.0, reader.getDouble(INTERFERENCE_GROUP_TARGET)) },
+ { assertEquals(0.7133055555552751, reader.getDouble(INTERFERENCE_GROUP_SCORE)) },
{ assertFalse(reader.nextRow()) }
)
diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt
index 40b604c3..4d0a9008 100644
--- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt
+++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt
@@ -27,6 +27,7 @@ import org.opendc.trace.conv.*
import java.io.BufferedReader
import java.time.Duration
import java.time.Instant
+import java.util.*
/**
* A [TableReader] implementation for the SWF format.
@@ -70,43 +71,92 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea
return true
}
- override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
+ override fun resolve(name: String): Int {
+ return when (name) {
+ TASK_ID -> COL_JOB_ID
+ TASK_SUBMIT_TIME -> COL_SUBMIT_TIME
+ TASK_WAIT_TIME -> COL_WAIT_TIME
+ TASK_RUNTIME -> COL_RUN_TIME
+ TASK_ALLOC_NCPUS -> COL_ALLOC_NCPUS
+ TASK_REQ_NCPUS -> COL_REQ_NCPUS
+ TASK_STATUS -> COL_STATUS
+ TASK_USER_ID -> COL_USER_ID
+ TASK_GROUP_ID -> COL_GROUP_ID
+ TASK_PARENTS -> COL_PARENT_JOB
+ else -> -1
+ }
+ }
override fun isNull(index: Int): Boolean {
- require(index in columns.values) { "Invalid column index" }
+ require(index in COL_JOB_ID..COL_PARENT_THINK_TIME) { "Invalid column index" }
return false
}
- override fun get(index: Int): Any? {
+ override fun getBoolean(index: Int): Boolean {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInt(index: Int): Int {
+ return when (index) {
+ COL_REQ_NCPUS, COL_ALLOC_NCPUS, COL_STATUS, COL_GROUP_ID, COL_USER_ID -> fields[index].toInt(10)
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getLong(index: Int): Long {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getFloat(index: Int): Float {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getDouble(index: Int): Double {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getString(index: Int): String {
return when (index) {
COL_JOB_ID -> fields[index]
- COL_SUBMIT_TIME -> Instant.ofEpochSecond(fields[index].toLong(10))
- COL_WAIT_TIME, COL_RUN_TIME -> Duration.ofSeconds(fields[index].toLong(10))
- COL_REQ_NCPUS, COL_ALLOC_NCPUS, COL_STATUS, COL_GROUP_ID, COL_USER_ID -> getInt(index)
- COL_PARENT_JOB -> {
- val parent = fields[index].toLong(10)
- if (parent < 0) emptySet() else setOf(parent)
- }
else -> throw IllegalArgumentException("Invalid column")
}
}
- override fun getBoolean(index: Int): Boolean {
+ override fun getUUID(index: Int): UUID? {
throw IllegalArgumentException("Invalid column")
}
- override fun getInt(index: Int): Int {
+ override fun getInstant(index: Int): Instant? {
return when (index) {
- COL_REQ_NCPUS, COL_ALLOC_NCPUS, COL_STATUS, COL_GROUP_ID, COL_USER_ID -> fields[index].toInt(10)
+ COL_SUBMIT_TIME -> Instant.ofEpochSecond(fields[index].toLong(10))
else -> throw IllegalArgumentException("Invalid column")
}
}
- override fun getLong(index: Int): Long {
+ override fun getDuration(index: Int): Duration? {
+ return when (index) {
+ COL_WAIT_TIME, COL_RUN_TIME -> Duration.ofSeconds(fields[index].toLong(10))
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun <T> getList(index: Int, elementType: Class<T>): List<T>? {
throw IllegalArgumentException("Invalid column")
}
- override fun getDouble(index: Int): Double {
+ override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? {
+ @Suppress("UNCHECKED_CAST")
+ return when (index) {
+ COL_PARENT_JOB -> {
+ require(elementType.isAssignableFrom(String::class.java))
+ val parent = fields[index].toLong(10)
+ if (parent < 0) emptySet() else setOf(parent)
+ }
+ else -> throw IllegalArgumentException("Invalid column")
+ } as Set<T>?
+ }
+
+ override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? {
throw IllegalArgumentException("Invalid column")
}
@@ -135,17 +185,4 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea
private val COL_PART_NUM = 15
private val COL_PARENT_JOB = 16
private val COL_PARENT_THINK_TIME = 17
-
- private val columns = mapOf(
- TASK_ID to COL_JOB_ID,
- TASK_SUBMIT_TIME to COL_SUBMIT_TIME,
- TASK_WAIT_TIME to COL_WAIT_TIME,
- TASK_RUNTIME to COL_RUN_TIME,
- TASK_ALLOC_NCPUS to COL_ALLOC_NCPUS,
- TASK_REQ_NCPUS to COL_REQ_NCPUS,
- TASK_STATUS to COL_STATUS,
- TASK_USER_ID to COL_USER_ID,
- TASK_GROUP_ID to COL_GROUP_ID,
- TASK_PARENTS to COL_PARENT_JOB
- )
}
diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt
index 916a5eca..40f98a01 100644
--- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt
+++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt
@@ -47,24 +47,23 @@ public class SwfTraceFormat : TraceFormat {
return when (table) {
TABLE_TASKS -> TableDetails(
listOf(
- TASK_ID,
- TASK_SUBMIT_TIME,
- TASK_WAIT_TIME,
- TASK_RUNTIME,
- TASK_REQ_NCPUS,
- TASK_ALLOC_NCPUS,
- TASK_PARENTS,
- TASK_STATUS,
- TASK_GROUP_ID,
- TASK_USER_ID
- ),
- emptyList()
+ TableColumn(TASK_ID, TableColumnType.String),
+ TableColumn(TASK_SUBMIT_TIME, TableColumnType.Instant),
+ TableColumn(TASK_WAIT_TIME, TableColumnType.Duration),
+ TableColumn(TASK_RUNTIME, TableColumnType.Duration),
+ TableColumn(TASK_REQ_NCPUS, TableColumnType.Int),
+ TableColumn(TASK_ALLOC_NCPUS, TableColumnType.Int),
+ TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)),
+ TableColumn(TASK_STATUS, TableColumnType.String),
+ TableColumn(TASK_GROUP_ID, TableColumnType.Int),
+ TableColumn(TASK_USER_ID, TableColumnType.Int)
+ )
)
else -> throw IllegalArgumentException("Table $table not supported")
}
}
- override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader {
+ override fun newReader(path: Path, table: String, projection: List<String>?): TableReader {
return when (table) {
TABLE_TASKS -> SwfTaskTableReader(path.bufferedReader())
else -> throw IllegalArgumentException("Table $table not supported")
diff --git a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt
index c3d644e8..afecdbb9 100644
--- a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt
@@ -62,10 +62,10 @@ internal class SwfTraceFormatTest {
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals("1", reader.get(TASK_ID)) },
+ { assertEquals("1", reader.getString(TASK_ID)) },
{ assertEquals(306, reader.getInt(TASK_ALLOC_NCPUS)) },
{ assertTrue(reader.nextRow()) },
- { assertEquals("2", reader.get(TASK_ID)) },
+ { assertEquals("2", reader.getString(TASK_ID)) },
{ assertEquals(17, reader.getInt(TASK_ALLOC_NCPUS)) },
)
diff --git a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/ConvertCommand.kt b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/ConvertCommand.kt
index 970de0f4..b6d661e0 100644
--- a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/ConvertCommand.kt
+++ b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/ConvertCommand.kt
@@ -224,9 +224,9 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b
var stopTime = Long.MIN_VALUE
do {
- id = reader.get(idCol) as String
+ id = reader.getString(idCol)!!
- val timestamp = (reader.get(timestampCol) as Instant).toEpochMilli()
+ val timestamp = reader.getInstant(timestampCol)!!.toEpochMilli()
startTime = min(startTime, timestamp)
stopTime = max(stopTime, timestamp)
@@ -238,7 +238,7 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b
}
hasNextRow = reader.nextRow()
- } while (hasNextRow && id == reader.get(RESOURCE_ID))
+ } while (hasNextRow && id == reader.getString(RESOURCE_ID))
// Sample only a fraction of the VMs
if (random != null && random.nextDouble() > samplingFraction) {
@@ -255,9 +255,9 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b
}
writer.startRow()
- writer.set(RESOURCE_ID, id)
- writer.set(RESOURCE_START_TIME, startInstant)
- writer.set(RESOURCE_STOP_TIME, stopInstant)
+ writer.setString(RESOURCE_ID, id)
+ writer.setInstant(RESOURCE_START_TIME, startInstant)
+ writer.setInstant(RESOURCE_STOP_TIME, stopInstant)
writer.setInt(RESOURCE_CPU_COUNT, cpuCount)
writer.setDouble(RESOURCE_CPU_CAPACITY, cpuCapacity)
writer.setDouble(RESOURCE_MEM_CAPACITY, max(memCapacity, memUsage))
@@ -280,7 +280,7 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b
var count = 0
while (hasNextRow) {
- val id = reader.get(idCol) as String
+ val id = reader.getString(idCol)!!
val resource = selected[id]
if (resource == null) {
hasNextRow = reader.nextRow()
@@ -290,13 +290,13 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b
val cpuCount = reader.getInt(cpuCountCol)
val cpuUsage = reader.getDouble(cpuUsageCol)
- val startTimestamp = (reader.get(timestampCol) as Instant).toEpochMilli()
+ val startTimestamp = reader.getInstant(timestampCol)!!.toEpochMilli()
var timestamp: Long = startTimestamp
var duration: Long = sampleInterval
// Attempt to cascade further samples into one if they share the same CPU usage
while (reader.nextRow().also { hasNextRow = it }) {
- val shouldCascade = id == reader.get(idCol) &&
+ val shouldCascade = id == reader.getString(idCol) &&
abs(cpuUsage - reader.getDouble(cpuUsageCol)) < SAMPLE_CASCADE_DIFF &&
cpuCount == reader.getInt(cpuCountCol)
@@ -308,7 +308,7 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b
break
}
- val nextTimestamp = (reader.get(timestampCol) as Instant).toEpochMilli()
+ val nextTimestamp = reader.getInstant(timestampCol)!!.toEpochMilli()
// Check whether the interval between both samples is not higher than `SAMPLE_INTERVAL`
if ((nextTimestamp - timestamp) > sampleInterval) {
@@ -320,9 +320,9 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b
}
writer.startRow()
- writer.set(RESOURCE_ID, id)
- writer.set(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(timestamp))
- writer.set(RESOURCE_STATE_DURATION, Duration.ofMillis(duration))
+ writer.setString(RESOURCE_ID, id)
+ writer.setInstant(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(timestamp))
+ writer.setDuration(RESOURCE_STATE_DURATION, Duration.ofMillis(duration))
writer.setInt(RESOURCE_CPU_COUNT, cpuCount)
writer.setDouble(RESOURCE_STATE_CPU_USAGE, cpuUsage)
writer.endRow()
@@ -377,9 +377,9 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b
continue
}
- val id = reader.get(idCol) as String
- val startTime = (reader.get(startTimeCol) as Instant).toEpochMilli()
- val stopTime = (reader.get(stopTimeCol) as Instant).toEpochMilli()
+ val id = reader.getString(idCol)!!
+ val startTime = reader.getInstant(startTimeCol)!!.toEpochMilli()
+ val stopTime = reader.getInstant(stopTimeCol)!!.toEpochMilli()
val cpuCount = reader.getInt(cpuCountCol)
val memCapacity = reader.getDouble(memCapacityCol)
@@ -394,9 +394,9 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b
}
writer.startRow()
- writer.set(RESOURCE_ID, id)
- writer.set(RESOURCE_START_TIME, startInstant)
- writer.set(RESOURCE_STOP_TIME, stopInstant)
+ writer.setString(RESOURCE_ID, id)
+ writer.setInstant(RESOURCE_START_TIME, startInstant)
+ writer.setInstant(RESOURCE_STOP_TIME, stopInstant)
writer.setInt(RESOURCE_CPU_COUNT, cpuCount)
writer.setDouble(RESOURCE_CPU_CAPACITY, cpuCapacity)
writer.setDouble(RESOURCE_MEM_CAPACITY, memCapacity)
@@ -418,12 +418,12 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b
var count = 0
while (reader.nextRow()) {
- val id = reader.get(idCol) as String
+ val id = reader.getString(idCol)!!
val resource = selected[id] ?: continue
val cpuUsage = reader.getDouble(cpuUsageCol) * resource.cpuCapacity // MHz
val state = states.computeIfAbsent(id) { State(resource, cpuUsage, sampleInterval) }
- val timestamp = (reader.get(timestampCol) as Instant).toEpochMilli()
+ val timestamp = reader.getInstant(timestampCol)!!.toEpochMilli()
val delta = (timestamp - state.time)
// Check whether the next sample can be cascaded with the current sample:
@@ -463,9 +463,9 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b
lastWrite = time
writer.startRow()
- writer.set(RESOURCE_ID, resource.id)
- writer.set(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(time))
- writer.set(RESOURCE_STATE_DURATION, Duration.ofMillis(duration))
+ writer.setString(RESOURCE_ID, resource.id)
+ writer.setInstant(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(time))
+ writer.setDuration(RESOURCE_STATE_DURATION, Duration.ofMillis(duration))
writer.setDouble(RESOURCE_STATE_CPU_USAGE, cpuUsage)
writer.setInt(RESOURCE_CPU_COUNT, resource.cpuCount)
writer.endRow()
diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt
index d8eafa9c..0be9dec6 100644
--- a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt
+++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt
@@ -27,7 +27,10 @@ import com.fasterxml.jackson.core.JsonParser
import com.fasterxml.jackson.core.JsonToken
import org.opendc.trace.*
import org.opendc.trace.conv.*
+import org.opendc.trace.util.convertTo
import java.time.Duration
+import java.time.Instant
+import java.util.*
import kotlin.math.roundToInt
/**
@@ -95,41 +98,82 @@ internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableRe
return hasJob
}
- override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
+ override fun resolve(name: String): Int {
+ return when (name) {
+ TASK_ID -> COL_ID
+ TASK_WORKFLOW_ID -> COL_WORKFLOW_ID
+ TASK_RUNTIME -> COL_RUNTIME
+ TASK_REQ_NCPUS -> COL_NPROC
+ TASK_PARENTS -> COL_PARENTS
+ TASK_CHILDREN -> COL_CHILDREN
+ else -> -1
+ }
+ }
override fun isNull(index: Int): Boolean {
- check(index in 0..columns.size) { "Invalid column value" }
+ check(index in 0..COL_CHILDREN) { "Invalid column value" }
return false
}
- override fun get(index: Int): Any? {
+ override fun getBoolean(index: Int): Boolean {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getInt(index: Int): Int {
+ return when (index) {
+ COL_NPROC -> cores
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getLong(index: Int): Long {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getFloat(index: Int): Float {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getDouble(index: Int): Double {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getString(index: Int): String? {
return when (index) {
COL_ID -> id
COL_WORKFLOW_ID -> workflowId
- COL_RUNTIME -> runtime
- COL_PARENTS -> parents
- COL_CHILDREN -> children
- COL_NPROC -> getInt(index)
else -> throw IllegalArgumentException("Invalid column")
}
}
- override fun getBoolean(index: Int): Boolean {
+ override fun getUUID(index: Int): UUID? {
throw IllegalArgumentException("Invalid column")
}
- override fun getInt(index: Int): Int {
+ override fun getInstant(index: Int): Instant? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun getDuration(index: Int): Duration? {
return when (index) {
- COL_NPROC -> cores
+ COL_RUNTIME -> runtime
else -> throw IllegalArgumentException("Invalid column")
}
}
- override fun getLong(index: Int): Long {
+ override fun <T> getList(index: Int, elementType: Class<T>): List<T>? {
throw IllegalArgumentException("Invalid column")
}
- override fun getDouble(index: Int): Double {
+ override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? {
+ return when (index) {
+ COL_PARENTS -> TYPE_PARENTS.convertTo(parents, elementType)
+ COL_CHILDREN -> TYPE_CHILDREN.convertTo(children, elementType)
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? {
throw IllegalArgumentException("Invalid column")
}
@@ -232,12 +276,6 @@ internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableRe
private val COL_PARENTS = 5
private val COL_CHILDREN = 6
- private val columns = mapOf(
- TASK_ID to COL_ID,
- TASK_WORKFLOW_ID to COL_WORKFLOW_ID,
- TASK_RUNTIME to COL_RUNTIME,
- TASK_REQ_NCPUS to COL_NPROC,
- TASK_PARENTS to COL_PARENTS,
- TASK_CHILDREN to COL_CHILDREN,
- )
+ private val TYPE_PARENTS = TableColumnType.Set(TableColumnType.String)
+ private val TYPE_CHILDREN = TableColumnType.Set(TableColumnType.String)
}
diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt
index 8db4c169..154fa061 100644
--- a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt
+++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt
@@ -50,20 +50,19 @@ public class WfFormatTraceFormat : TraceFormat {
return when (table) {
TABLE_TASKS -> TableDetails(
listOf(
- TASK_ID,
- TASK_WORKFLOW_ID,
- TASK_RUNTIME,
- TASK_REQ_NCPUS,
- TASK_PARENTS,
- TASK_CHILDREN
- ),
- emptyList()
+ TableColumn(TASK_ID, TableColumnType.String),
+ TableColumn(TASK_WORKFLOW_ID, TableColumnType.String),
+ TableColumn(TASK_RUNTIME, TableColumnType.Duration),
+ TableColumn(TASK_REQ_NCPUS, TableColumnType.Int),
+ TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)),
+ TableColumn(TASK_CHILDREN, TableColumnType.Set(TableColumnType.String))
+ )
)
else -> throw IllegalArgumentException("Table $table not supported")
}
}
- override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader {
+ override fun newReader(path: Path, table: String, projection: List<String>?): TableReader {
return when (table) {
TABLE_TASKS -> WfFormatTaskTableReader(factory.createParser(path.toFile()))
else -> throw IllegalArgumentException("Table $table not supported")
diff --git a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt
index e27bc82c..9d9735b1 100644
--- a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt
+++ b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt
@@ -210,7 +210,7 @@ internal class WfFormatTaskTableReaderTest {
val reader = WfFormatTaskTableReader(parser)
assertTrue(reader.nextRow())
- assertEquals("test", reader.get(TASK_ID))
+ assertEquals("test", reader.getString(TASK_ID))
assertFalse(reader.nextRow())
reader.close()
@@ -281,7 +281,7 @@ internal class WfFormatTaskTableReaderTest {
val reader = WfFormatTaskTableReader(parser)
assertTrue(reader.nextRow())
- assertEquals(setOf("1"), reader.get(TASK_PARENTS))
+ assertEquals(setOf("1"), reader.getSet(TASK_PARENTS, String::class.java))
assertFalse(reader.nextRow())
reader.close()
@@ -337,7 +337,7 @@ internal class WfFormatTaskTableReaderTest {
assertTrue(reader.nextRow())
assertTrue(reader.nextRow())
- assertEquals("test2", reader.get(TASK_ID))
+ assertEquals("test2", reader.getString(TASK_ID))
assertFalse(reader.nextRow())
reader.close()
diff --git a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt
index 4a8b2792..a460c5f6 100644
--- a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt
@@ -66,18 +66,18 @@ class WfFormatTraceFormatTest {
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals("makebwaindex_mammoth_mt_krause.fasta", reader.get(TASK_ID)) },
- { assertEquals("eager-nextflow-chameleon", reader.get(TASK_WORKFLOW_ID)) },
- { assertEquals(172000, reader.get(TASK_RUNTIME).toMillis()) },
- { assertEquals(emptySet<String>(), reader.get(TASK_PARENTS)) },
+ { assertEquals("makebwaindex_mammoth_mt_krause.fasta", reader.getString(TASK_ID)) },
+ { assertEquals("eager-nextflow-chameleon", reader.getString(TASK_WORKFLOW_ID)) },
+ { assertEquals(172000, reader.getDuration(TASK_RUNTIME)?.toMillis()) },
+ { assertEquals(emptySet<String>(), reader.getSet(TASK_PARENTS, String::class.java)) },
)
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals("makeseqdict_mammoth_mt_krause.fasta", reader.get(TASK_ID)) },
- { assertEquals("eager-nextflow-chameleon", reader.get(TASK_WORKFLOW_ID)) },
- { assertEquals(175000, reader.get(TASK_RUNTIME).toMillis()) },
- { assertEquals(setOf("makebwaindex_mammoth_mt_krause.fasta"), reader.get(TASK_PARENTS)) },
+ { assertEquals("makeseqdict_mammoth_mt_krause.fasta", reader.getString(TASK_ID)) },
+ { assertEquals("eager-nextflow-chameleon", reader.getString(TASK_WORKFLOW_ID)) },
+ { assertEquals(175000, reader.getDuration(TASK_RUNTIME)?.toMillis()) },
+ { assertEquals(setOf("makebwaindex_mammoth_mt_krause.fasta"), reader.getSet(TASK_PARENTS, String::class.java)) },
)
reader.close()
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
index f0db78b7..bb5eb668 100644
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
@@ -24,8 +24,12 @@ package org.opendc.trace.wtf
import org.opendc.trace.*
import org.opendc.trace.conv.*
+import org.opendc.trace.util.convertTo
import org.opendc.trace.util.parquet.LocalParquetReader
import org.opendc.trace.wtf.parquet.Task
+import java.time.Duration
+import java.time.Instant
+import java.util.*
/**
* A [TableReader] implementation for the WTF format.
@@ -48,26 +52,39 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Task>)
}
}
- override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
+ private val COL_ID = 0
+ private val COL_WORKFLOW_ID = 1
+ private val COL_SUBMIT_TIME = 2
+ private val COL_WAIT_TIME = 3
+ private val COL_RUNTIME = 4
+ private val COL_REQ_NCPUS = 5
+ private val COL_PARENTS = 6
+ private val COL_CHILDREN = 7
+ private val COL_GROUP_ID = 8
+ private val COL_USER_ID = 9
- override fun isNull(index: Int): Boolean {
- check(index in 0..columns.size) { "Invalid column index" }
- return get(index) == null
+ private val TYPE_PARENTS = TableColumnType.Set(TableColumnType.String)
+ private val TYPE_CHILDREN = TableColumnType.Set(TableColumnType.String)
+
+ override fun resolve(name: String): Int {
+ return when (name) {
+ TASK_ID -> COL_ID
+ TASK_WORKFLOW_ID -> COL_WORKFLOW_ID
+ TASK_SUBMIT_TIME -> COL_SUBMIT_TIME
+ TASK_WAIT_TIME -> COL_WAIT_TIME
+ TASK_RUNTIME -> COL_RUNTIME
+ TASK_REQ_NCPUS -> COL_REQ_NCPUS
+ TASK_PARENTS -> COL_PARENTS
+ TASK_CHILDREN -> COL_CHILDREN
+ TASK_GROUP_ID -> COL_GROUP_ID
+ TASK_USER_ID -> COL_USER_ID
+ else -> -1
+ }
}
- override fun get(index: Int): Any? {
- val record = checkNotNull(record) { "Reader in invalid state" }
- return when (index) {
- COL_ID -> record.id
- COL_WORKFLOW_ID -> record.workflowId
- COL_SUBMIT_TIME -> record.submitTime
- COL_WAIT_TIME -> record.waitTime
- COL_RUNTIME -> record.runtime
- COL_REQ_NCPUS, COL_GROUP_ID, COL_USER_ID -> getInt(index)
- COL_PARENTS -> record.parents
- COL_CHILDREN -> record.children
- else -> throw IllegalArgumentException("Invalid column")
- }
+ override fun isNull(index: Int): Boolean {
+ check(index in COL_ID..COL_USER_ID) { "Invalid column index" }
+ return false
}
override fun getBoolean(index: Int): Boolean {
@@ -89,35 +106,62 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Task>)
throw IllegalArgumentException("Invalid column")
}
+ override fun getFloat(index: Int): Float {
+ throw IllegalArgumentException("Invalid column")
+ }
+
override fun getDouble(index: Int): Double {
throw IllegalArgumentException("Invalid column")
}
- override fun close() {
- reader.close()
+ override fun getString(index: Int): String {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+ return when (index) {
+ COL_ID -> record.id
+ COL_WORKFLOW_ID -> record.workflowId
+ else -> throw IllegalArgumentException("Invalid column")
+ }
}
- private val COL_ID = 0
- private val COL_WORKFLOW_ID = 1
- private val COL_SUBMIT_TIME = 2
- private val COL_WAIT_TIME = 3
- private val COL_RUNTIME = 4
- private val COL_REQ_NCPUS = 5
- private val COL_PARENTS = 6
- private val COL_CHILDREN = 7
- private val COL_GROUP_ID = 8
- private val COL_USER_ID = 9
+ override fun getUUID(index: Int): UUID? {
+ throw IllegalArgumentException("Invalid column")
+ }
- private val columns = mapOf(
- TASK_ID to COL_ID,
- TASK_WORKFLOW_ID to COL_WORKFLOW_ID,
- TASK_SUBMIT_TIME to COL_SUBMIT_TIME,
- TASK_WAIT_TIME to COL_WAIT_TIME,
- TASK_RUNTIME to COL_RUNTIME,
- TASK_REQ_NCPUS to COL_REQ_NCPUS,
- TASK_PARENTS to COL_PARENTS,
- TASK_CHILDREN to COL_CHILDREN,
- TASK_GROUP_ID to COL_GROUP_ID,
- TASK_USER_ID to COL_USER_ID,
- )
+ override fun getInstant(index: Int): Instant {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+ return when (index) {
+ COL_SUBMIT_TIME -> record.submitTime
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun getDuration(index: Int): Duration {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+ return when (index) {
+ COL_WAIT_TIME -> record.waitTime
+ COL_RUNTIME -> record.runtime
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun <T> getList(index: Int, elementType: Class<T>): List<T>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? {
+ val record = checkNotNull(record) { "Reader in invalid state" }
+ return when (index) {
+ COL_PARENTS -> TYPE_PARENTS.convertTo(record.parents, elementType)
+ COL_CHILDREN -> TYPE_CHILDREN.convertTo(record.children, elementType)
+ else -> throw IllegalArgumentException("Invalid column")
+ }
+ }
+
+ override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? {
+ throw IllegalArgumentException("Invalid column")
+ }
+
+ override fun close() {
+ reader.close()
+ }
}
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
index e71253ac..c8408626 100644
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
@@ -46,24 +46,23 @@ public class WtfTraceFormat : TraceFormat {
return when (table) {
TABLE_TASKS -> TableDetails(
listOf(
- TASK_ID,
- TASK_WORKFLOW_ID,
- TASK_SUBMIT_TIME,
- TASK_WAIT_TIME,
- TASK_RUNTIME,
- TASK_REQ_NCPUS,
- TASK_PARENTS,
- TASK_CHILDREN,
- TASK_GROUP_ID,
- TASK_USER_ID
- ),
- listOf(TASK_SUBMIT_TIME)
+ TableColumn(TASK_ID, TableColumnType.String),
+ TableColumn(TASK_WORKFLOW_ID, TableColumnType.String),
+ TableColumn(TASK_SUBMIT_TIME, TableColumnType.Instant),
+ TableColumn(TASK_WAIT_TIME, TableColumnType.Duration),
+ TableColumn(TASK_RUNTIME, TableColumnType.Duration),
+ TableColumn(TASK_REQ_NCPUS, TableColumnType.Int),
+ TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)),
+ TableColumn(TASK_CHILDREN, TableColumnType.Set(TableColumnType.String)),
+ TableColumn(TASK_GROUP_ID, TableColumnType.Int),
+ TableColumn(TASK_USER_ID, TableColumnType.Int)
+ )
)
else -> throw IllegalArgumentException("Table $table not supported")
}
}
- override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader {
+ override fun newReader(path: Path, table: String, projection: List<String>?): TableReader {
return when (table) {
TABLE_TASKS -> {
val reader = LocalParquetReader(path.resolve("tasks/schema-1.0"), TaskReadSupport(projection))
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt
index 8e7325de..a6087d9f 100644
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt
@@ -27,7 +27,6 @@ import org.apache.parquet.hadoop.api.InitContext
import org.apache.parquet.hadoop.api.ReadSupport
import org.apache.parquet.io.api.RecordMaterializer
import org.apache.parquet.schema.*
-import org.opendc.trace.TableColumn
import org.opendc.trace.conv.*
/**
@@ -35,11 +34,11 @@ import org.opendc.trace.conv.*
*
* @param projection The projection of the table to read.
*/
-internal class TaskReadSupport(private val projection: List<TableColumn<*>>?) : ReadSupport<Task>() {
+internal class TaskReadSupport(private val projection: List<String>?) : ReadSupport<Task>() {
/**
* Mapping of table columns to their Parquet column names.
*/
- private val colMap = mapOf<TableColumn<*>, String>(
+ private val colMap = mapOf(
TASK_ID to "id",
TASK_WORKFLOW_ID to "workflow_id",
TASK_SUBMIT_TIME to "ts_submit",
diff --git a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt
index c0eb3f08..2312035a 100644
--- a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt
@@ -65,32 +65,28 @@ class WtfTraceFormatTest {
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals("362334516345962206", reader.get(TASK_ID)) },
- { assertEquals("1078341553348591493", reader.get(TASK_WORKFLOW_ID)) },
- { assertEquals(Instant.ofEpochMilli(245604), reader.get(TASK_SUBMIT_TIME)) },
- { assertEquals(Duration.ofMillis(8163), reader.get(TASK_RUNTIME)) },
+ { assertEquals("362334516345962206", reader.getString(TASK_ID)) },
+ { assertEquals("1078341553348591493", reader.getString(TASK_WORKFLOW_ID)) },
+ { assertEquals(Instant.ofEpochMilli(245604), reader.getInstant(TASK_SUBMIT_TIME)) },
+ { assertEquals(Duration.ofMillis(8163), reader.getDuration(TASK_RUNTIME)) },
{
assertEquals(
setOf("584055316413447529", "133113685133695608", "1008582348422865408"),
- reader.get(
- TASK_PARENTS
- )
+ reader.getSet(TASK_PARENTS, String::class.java)
)
},
)
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals("502010169100446658", reader.get(TASK_ID)) },
- { assertEquals("1078341553348591493", reader.get(TASK_WORKFLOW_ID)) },
- { assertEquals(Instant.ofEpochMilli(251325), reader.get(TASK_SUBMIT_TIME)) },
- { assertEquals(Duration.ofMillis(8216), reader.get(TASK_RUNTIME)) },
+ { assertEquals("502010169100446658", reader.getString(TASK_ID)) },
+ { assertEquals("1078341553348591493", reader.getString(TASK_WORKFLOW_ID)) },
+ { assertEquals(Instant.ofEpochMilli(251325), reader.getInstant(TASK_SUBMIT_TIME)) },
+ { assertEquals(Duration.ofMillis(8216), reader.getDuration(TASK_RUNTIME)) },
{
assertEquals(
setOf("584055316413447529", "133113685133695608", "1008582348422865408"),
- reader.get(
- TASK_PARENTS
- )
+ reader.getSet(TASK_PARENTS, String::class.java)
)
},
)
diff --git a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/TraceHelpers.kt b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/TraceHelpers.kt
index 3aa4463c..5f57723b 100644
--- a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/TraceHelpers.kt
+++ b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/TraceHelpers.kt
@@ -49,16 +49,16 @@ public fun Trace.toJobs(): List<Job> {
try {
while (reader.nextRow()) {
// Bag of tasks without workflow ID all share the same workflow
- val workflowId = if (reader.hasColumn(TASK_WORKFLOW_ID)) reader.get(TASK_WORKFLOW_ID).toLong() else 0L
+ val workflowId = if (reader.resolve(TASK_WORKFLOW_ID) != -1) reader.getString(TASK_WORKFLOW_ID)!!.toLong() else 0L
val workflow = jobs.computeIfAbsent(workflowId) { id -> Job(UUID(0L, id), "<unnamed>", HashSet(), HashMap()) }
- val id = reader.get(TASK_ID).toLong()
- val grantedCpus = if (reader.hasColumn(TASK_ALLOC_NCPUS))
+ val id = reader.getString(TASK_ID)!!.toLong()
+ val grantedCpus = if (reader.resolve(TASK_ALLOC_NCPUS) != 0)
reader.getInt(TASK_ALLOC_NCPUS)
else
reader.getInt(TASK_REQ_NCPUS)
- val submitTime = reader.get(TASK_SUBMIT_TIME)
- val runtime = reader.get(TASK_RUNTIME)
+ val submitTime = reader.getInstant(TASK_SUBMIT_TIME)!!
+ val runtime = reader.getDuration(TASK_RUNTIME)!!
val flops: Long = 4000 * runtime.seconds * grantedCpus
val workload = SimFlopsWorkload(flops)
val task = Task(
@@ -73,7 +73,7 @@ public fun Trace.toJobs(): List<Job> {
)
tasks[id] = task
- taskDependencies[task] = reader.get(TASK_PARENTS).map { it.toLong() }.toSet()
+ taskDependencies[task] = reader.getSet(TASK_PARENTS, String::class.java)!!.map { it.toLong() }.toSet()
(workflow.metadata as MutableMap<String, Any>).merge("WORKFLOW_SUBMIT_TIME", submitTime.toEpochMilli()) { a, b -> min(a as Long, b as Long) }
(workflow.tasks as MutableSet<Task>).add(task)