Diffstat (limited to 'opendc-trace')
-rw-r--r--  opendc-trace/opendc-trace-api/build.gradle.kts | 2
-rw-r--r--  opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt | 2
-rw-r--r--  opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt | 33
-rw-r--r--  opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableWriter.kt | 120
-rw-r--r--  opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt | 20
-rw-r--r--  opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt | 14
-rw-r--r--  opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt | 24
-rw-r--r--  opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt | 16
-rw-r--r--  opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/CompositeTableReader.kt | 16
-rw-r--r--  opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/TableColumnConversion.kt | 16
-rw-r--r--  opendc-trace/opendc-trace-azure/build.gradle.kts | 2
-rw-r--r--  opendc-trace/opendc-trace-azure/src/jmh/kotlin/org/opendc/trace/azure/AzureTraceBenchmarks.kt | 6
-rw-r--r--  opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt | 59
-rw-r--r--  opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt | 89
-rw-r--r--  opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt | 76
-rw-r--r--  opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt | 22
-rw-r--r--  opendc-trace/opendc-trace-bitbrains/build.gradle.kts | 2
-rw-r--r--  opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt | 148
-rw-r--r--  opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt | 80
-rw-r--r--  opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt | 180
-rw-r--r--  opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt | 32
-rw-r--r--  opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt | 105
-rw-r--r--  opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt | 8
-rw-r--r--  opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt | 14
-rw-r--r--  opendc-trace/opendc-trace-calcite/build.gradle.kts | 2
-rw-r--r--  opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt | 8
-rw-r--r--  opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt | 6
-rw-r--r--  opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt | 23
-rw-r--r--  opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModify.kt | 67
-rw-r--r--  opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModifyRule.kt | 14
-rw-r--r--  opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt | 59
-rw-r--r--  opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/TraceSchemaFactoryTest.kt | 2
-rw-r--r--  opendc-trace/opendc-trace-gwf/build.gradle.kts | 2
-rw-r--r--  opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt | 87
-rw-r--r--  opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt | 44
-rw-r--r--  opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt | 4
-rw-r--r--  opendc-trace/opendc-trace-opendc/build.gradle.kts | 2
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/jmh/kotlin/org/opendc/trace/opendc/OdcVmTraceBenchmarks.kt | 6
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt | 43
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt | 78
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt | 58
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt | 146
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt | 66
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt | 150
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt | 124
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/Resource.kt | 2
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt | 140
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceRecordMaterializer.kt | 132
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceState.kt | 2
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt | 119
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateRecordMaterializer.kt | 114
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateWriteSupport.kt | 48
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceWriteSupport.kt | 56
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt | 106
-rw-r--r--  opendc-trace/opendc-trace-parquet/build.gradle.kts | 4
-rw-r--r--  opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalInputFile.kt | 87
-rw-r--r--  opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalOutputFile.kt | 9
-rw-r--r--  opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt | 32
-rw-r--r--  opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt | 8
-rw-r--r--  opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt | 112
-rw-r--r--  opendc-trace/opendc-trace-swf/build.gradle.kts | 2
-rw-r--r--  opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt | 88
-rw-r--r--  opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt | 43
-rw-r--r--  opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt | 2
-rw-r--r--  opendc-trace/opendc-trace-testkit/build.gradle.kts | 2
-rw-r--r--  opendc-trace/opendc-trace-testkit/src/main/kotlin/org/opendc/trace/testkit/TableReaderTestKit.kt | 24
-rw-r--r--  opendc-trace/opendc-trace-testkit/src/main/kotlin/org/opendc/trace/testkit/TableWriterTestKit.kt | 4
-rw-r--r--  opendc-trace/opendc-trace-tools/build.gradle.kts | 2
-rw-r--r--  opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/ConvertCommand.kt | 178
-rw-r--r--  opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/QueryCommand.kt | 25
-rw-r--r--  opendc-trace/opendc-trace-wfformat/build.gradle.kts | 2
-rw-r--r--  opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt | 78
-rw-r--r--  opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt | 35
-rw-r--r--  opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt | 251
-rw-r--r--  opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt | 4
-rw-r--r--  opendc-trace/opendc-trace-wtf/build.gradle.kts | 2
-rw-r--r--  opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt | 84
-rw-r--r--  opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt | 43
-rw-r--r--  opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt | 2
-rw-r--r--  opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt | 122
-rw-r--r--  opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt | 217
-rw-r--r--  opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt | 8
82 files changed, 2470 insertions, 1796 deletions
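Note (not part of the diff): this commit is almost entirely a mechanical style pass — ktlint-style multiline signatures with trailing commas, and a rename of the column-name constants in org.opendc.trace.conv from SCREAMING_SNAKE_CASE to camelCase (e.g. RESOURCE_ID becomes resourceID; the underlying string values such as "id" are unchanged). A minimal sketch of the signature convention, shown on an illustrative interface rather than any one file:

    // Before: single-line declaration
    // public fun setBoolean(index: Int, value: Boolean)

    // After: one parameter per line, with a trailing comma
    public interface Example {
        public fun setBoolean(
            index: Int,
            value: Boolean,
        )
    }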
diff --git a/opendc-trace/opendc-trace-api/build.gradle.kts b/opendc-trace/opendc-trace-api/build.gradle.kts
index 977eec0d..514cd777 100644
--- a/opendc-trace/opendc-trace-api/build.gradle.kts
+++ b/opendc-trace/opendc-trace-api/build.gradle.kts
@@ -22,7 +22,7 @@
description = "Workload trace library for OpenDC"
-/* Build configuration */
+// Build configuration
plugins {
`kotlin-library-conventions`
}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt
index 0f75d890..4e82e48a 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt
@@ -32,5 +32,5 @@ package org.opendc.trace
public data class TableColumn(
public val name: String,
public val type: TableColumnType,
- public val isNullable: Boolean = false
+ public val isNullable: Boolean = false,
)
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt
index 42b1c690..95a58935 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt
@@ -143,7 +143,10 @@ public interface TableReader : AutoCloseable {
* @throws IllegalArgumentException if the column index is not valid for this reader or this type.
* @return The value of the column as `List` or `null` if the column is null.
*/
- public fun <T> getList(index: Int, elementType: Class<T>): List<T>?
+ public fun <T> getList(
+ index: Int,
+ elementType: Class<T>,
+ ): List<T>?
/**
* Obtain the value of the column with the specified [index] as [Set].
@@ -153,7 +156,10 @@ public interface TableReader : AutoCloseable {
* @throws IllegalArgumentException if the column index is not valid for this reader or this type.
* @return The value of the column as `Set` or `null` if the column is null.
*/
- public fun <T> getSet(index: Int, elementType: Class<T>): Set<T>?
+ public fun <T> getSet(
+ index: Int,
+ elementType: Class<T>,
+ ): Set<T>?
/**
* Obtain the value of the column with the specified [index] as [Map].
@@ -164,7 +170,11 @@ public interface TableReader : AutoCloseable {
* @throws IllegalArgumentException if the column index is not valid for this reader or this type.
* @return The value of the column as `Map` or `null` if the column is null.
*/
- public fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>?
+ public fun <K, V> getMap(
+ index: Int,
+ keyType: Class<K>,
+ valueType: Class<V>,
+ ): Map<K, V>?
/**
* Determine whether a column named [name] has a `null` value for the current row.
@@ -264,7 +274,10 @@ public interface TableReader : AutoCloseable {
* @throws IllegalArgumentException if the column index is not valid for this reader or this type.
* @return The value of the column as `List` or `null` if the column is null.
*/
- public fun <T> getList(name: String, elementType: Class<T>): List<T>? = getList(resolve(name), elementType)
+ public fun <T> getList(
+ name: String,
+ elementType: Class<T>,
+ ): List<T>? = getList(resolve(name), elementType)
/**
* Obtain the value of the column named [name] as [Set].
@@ -274,7 +287,10 @@ public interface TableReader : AutoCloseable {
* @throws IllegalArgumentException if the column index is not valid for this reader or this type.
* @return The value of the column as `Set` or `null` if the column is null.
*/
- public fun <T> getSet(name: String, elementType: Class<T>): Set<T>? = getSet(resolve(name), elementType)
+ public fun <T> getSet(
+ name: String,
+ elementType: Class<T>,
+ ): Set<T>? = getSet(resolve(name), elementType)
/**
* Obtain the value of the column named [name] as [Map].
@@ -285,8 +301,11 @@ public interface TableReader : AutoCloseable {
* @throws IllegalArgumentException if the column index is not valid for this reader or this type.
* @return The value of the column as `Map` or `null` if the column is null.
*/
- public fun <K, V> getMap(name: String, keyType: Class<K>, valueType: Class<V>): Map<K, V>? =
- getMap(resolve(name), keyType, valueType)
+ public fun <K, V> getMap(
+ name: String,
+ keyType: Class<K>,
+ valueType: Class<V>,
+ ): Map<K, V>? = getMap(resolve(name), keyType, valueType)
/**
* Closes the reader so that no further iteration or data access can be made.
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableWriter.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableWriter.kt
index 3b02794d..133bd01c 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableWriter.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableWriter.kt
@@ -55,7 +55,10 @@ public interface TableWriter : AutoCloseable {
* @param value The boolean value to set the column to.
* @throws IllegalArgumentException if the column is not valid for this method.
*/
- public fun setBoolean(index: Int, value: Boolean)
+ public fun setBoolean(
+ index: Int,
+ value: Boolean,
+ )
/**
* Set the column with index [index] to integer [value].
@@ -64,7 +67,10 @@ public interface TableWriter : AutoCloseable {
* @param value The integer value to set the column to.
* @throws IllegalArgumentException if the column is not valid for this method.
*/
- public fun setInt(index: Int, value: Int)
+ public fun setInt(
+ index: Int,
+ value: Int,
+ )
/**
* Set the column with index [index] to long [value].
@@ -73,7 +79,10 @@ public interface TableWriter : AutoCloseable {
* @param value The long value to set the column to.
* @throws IllegalArgumentException if the column is not valid for this method.
*/
- public fun setLong(index: Int, value: Long)
+ public fun setLong(
+ index: Int,
+ value: Long,
+ )
/**
* Set the column with index [index] to float [value].
@@ -82,7 +91,10 @@ public interface TableWriter : AutoCloseable {
* @param value The float value to set the column to.
* @throws IllegalArgumentException if the column is not valid for this method.
*/
- public fun setFloat(index: Int, value: Float)
+ public fun setFloat(
+ index: Int,
+ value: Float,
+ )
/**
* Set the column with index [index] to double [value].
@@ -91,7 +103,10 @@ public interface TableWriter : AutoCloseable {
* @param value The double value to set the column to.
* @throws IllegalArgumentException if the column is not valid for this method.
*/
- public fun setDouble(index: Int, value: Double)
+ public fun setDouble(
+ index: Int,
+ value: Double,
+ )
/**
* Set the column with index [index] to [String] [value].
@@ -100,7 +115,10 @@ public interface TableWriter : AutoCloseable {
* @param value The [String] value to set the column to.
* @throws IllegalArgumentException if the column is not valid for this method.
*/
- public fun setString(index: Int, value: String)
+ public fun setString(
+ index: Int,
+ value: String,
+ )
/**
* Set the column with index [index] to [UUID] [value].
@@ -109,7 +127,10 @@ public interface TableWriter : AutoCloseable {
* @param value The [UUID] value to set the column to.
* @throws IllegalArgumentException if the column is not valid for this method.
*/
- public fun setUUID(index: Int, value: UUID)
+ public fun setUUID(
+ index: Int,
+ value: UUID,
+ )
/**
* Set the column with index [index] to [Instant] [value].
@@ -118,7 +139,10 @@ public interface TableWriter : AutoCloseable {
* @param value The [Instant] value to set the column to.
* @throws IllegalArgumentException if the column is not valid for this method.
*/
- public fun setInstant(index: Int, value: Instant)
+ public fun setInstant(
+ index: Int,
+ value: Instant,
+ )
/**
* Set the column with index [index] to [Duration] [value].
@@ -127,7 +151,10 @@ public interface TableWriter : AutoCloseable {
* @param value The [Duration] value to set the column to.
* @throws IllegalArgumentException if the column is not valid for this method.
*/
- public fun setDuration(index: Int, value: Duration)
+ public fun setDuration(
+ index: Int,
+ value: Duration,
+ )
/**
* Set the column with index [index] to [List] [value].
@@ -136,7 +163,10 @@ public interface TableWriter : AutoCloseable {
* @param value The [List] value to set the column to.
* @throws IllegalArgumentException if the column is not valid for this method.
*/
- public fun <T> setList(index: Int, value: List<T>)
+ public fun <T> setList(
+ index: Int,
+ value: List<T>,
+ )
/**
* Set the column with index [index] to [Set] [value].
@@ -145,7 +175,10 @@ public interface TableWriter : AutoCloseable {
* @param value The [Set] value to set the column to.
* @throws IllegalArgumentException if the column is not valid for this method.
*/
- public fun <T> setSet(index: Int, value: Set<T>)
+ public fun <T> setSet(
+ index: Int,
+ value: Set<T>,
+ )
/**
* Set the column with index [index] to [Map] [value].
@@ -154,7 +187,10 @@ public interface TableWriter : AutoCloseable {
* @param value The [Map] value to set the column to.
* @throws IllegalArgumentException if the column is not valid for this method.
*/
- public fun <K, V> setMap(index: Int, value: Map<K, V>)
+ public fun <K, V> setMap(
+ index: Int,
+ value: Map<K, V>,
+ )
/**
* Set the column named [name] to boolean [value].
@@ -163,7 +199,10 @@ public interface TableWriter : AutoCloseable {
* @param value The boolean value to set the column to.
* @throws IllegalArgumentException if the column is not valid for this method.
*/
- public fun setBoolean(name: String, value: Boolean): Unit = setBoolean(resolve(name), value)
+ public fun setBoolean(
+ name: String,
+ value: Boolean,
+ ): Unit = setBoolean(resolve(name), value)
/**
* Set the column named [name] to integer [value].
@@ -172,7 +211,10 @@ public interface TableWriter : AutoCloseable {
* @param value The integer value to set the column to.
* @throws IllegalArgumentException if the column is not valid for this method.
*/
- public fun setInt(name: String, value: Int): Unit = setInt(resolve(name), value)
+ public fun setInt(
+ name: String,
+ value: Int,
+ ): Unit = setInt(resolve(name), value)
/**
* Set the column named [name] to long [value].
@@ -181,7 +223,10 @@ public interface TableWriter : AutoCloseable {
* @param value The long value to set the column to.
* @throws IllegalArgumentException if the column is not valid for this method.
*/
- public fun setLong(name: String, value: Long): Unit = setLong(resolve(name), value)
+ public fun setLong(
+ name: String,
+ value: Long,
+ ): Unit = setLong(resolve(name), value)
/**
* Set the column named [name] to float [value].
@@ -190,7 +235,10 @@ public interface TableWriter : AutoCloseable {
* @param value The float value to set the column to.
* @throws IllegalArgumentException if the column is not valid for this method.
*/
- public fun setFloat(name: String, value: Float): Unit = setFloat(resolve(name), value)
+ public fun setFloat(
+ name: String,
+ value: Float,
+ ): Unit = setFloat(resolve(name), value)
/**
* Set the column named [name] to double [value].
@@ -199,7 +247,10 @@ public interface TableWriter : AutoCloseable {
* @param value The double value to set the column to.
* @throws IllegalArgumentException if the column is not valid for this method.
*/
- public fun setDouble(name: String, value: Double): Unit = setDouble(resolve(name), value)
+ public fun setDouble(
+ name: String,
+ value: Double,
+ ): Unit = setDouble(resolve(name), value)
/**
* Set the column named [name] to [String] [value].
@@ -208,7 +259,10 @@ public interface TableWriter : AutoCloseable {
* @param value The [String] value to set the column to.
* @throws IllegalArgumentException if the column is not valid for this method.
*/
- public fun setString(name: String, value: String): Unit = setString(resolve(name), value)
+ public fun setString(
+ name: String,
+ value: String,
+ ): Unit = setString(resolve(name), value)
/**
* Set the column named [name] to [UUID] [value].
@@ -217,7 +271,10 @@ public interface TableWriter : AutoCloseable {
* @param value The [UUID] value to set the column to.
* @throws IllegalArgumentException if the column is not valid for this method.
*/
- public fun setUUID(name: String, value: UUID): Unit = setUUID(resolve(name), value)
+ public fun setUUID(
+ name: String,
+ value: UUID,
+ ): Unit = setUUID(resolve(name), value)
/**
* Set the column named [name] to [Instant] [value].
@@ -226,7 +283,10 @@ public interface TableWriter : AutoCloseable {
* @param value The [Instant] value to set the column to.
* @throws IllegalArgumentException if the column is not valid for this method.
*/
- public fun setInstant(name: String, value: Instant): Unit = setInstant(resolve(name), value)
+ public fun setInstant(
+ name: String,
+ value: Instant,
+ ): Unit = setInstant(resolve(name), value)
/**
* Set the column named [name] to [Duration] [value].
@@ -235,7 +295,10 @@ public interface TableWriter : AutoCloseable {
* @param value The [Duration] value to set the column to.
* @throws IllegalArgumentException if the column is not valid for this method.
*/
- public fun setDuration(name: String, value: Duration): Unit = setDuration(resolve(name), value)
+ public fun setDuration(
+ name: String,
+ value: Duration,
+ ): Unit = setDuration(resolve(name), value)
/**
* Set the column named [name] to [List] [value].
@@ -244,7 +307,10 @@ public interface TableWriter : AutoCloseable {
* @param value The [List] value to set the column to.
* @throws IllegalArgumentException if the column is not valid for this method.
*/
- public fun <T> setList(name: String, value: List<T>): Unit = setList(resolve(name), value)
+ public fun <T> setList(
+ name: String,
+ value: List<T>,
+ ): Unit = setList(resolve(name), value)
/**
* Set the column named [name] to [Set] [value].
@@ -253,7 +319,10 @@ public interface TableWriter : AutoCloseable {
* @param value The [Set] value to set the column to.
* @throws IllegalArgumentException if the column is not valid for this method.
*/
- public fun <T> setSet(name: String, value: Set<T>): Unit = setSet(resolve(name), value)
+ public fun <T> setSet(
+ name: String,
+ value: Set<T>,
+ ): Unit = setSet(resolve(name), value)
/**
* Set the column named [name] to [Map] [value].
@@ -262,7 +331,10 @@ public interface TableWriter : AutoCloseable {
* @param value The [Map] value to set the column to.
* @throws IllegalArgumentException if the column is not valid for this method.
*/
- public fun <K, V> setMap(name: String, value: Map<K, V>): Unit = setMap(resolve(name), value)
+ public fun <K, V> setMap(
+ name: String,
+ value: Map<K, V>,
+ ): Unit = setMap(resolve(name), value)
/**
* Flush any buffered content to the underlying target.
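Note (not part of the diff): a hedged sketch of how the name-based setters above are meant to be used — each resolves the column name and delegates to the index-based variant. Only the setter names come from the API in this diff; the column names are taken from the ResourceColumns constants further down, and the writer instance and row lifecycle are assumptions.

    import org.opendc.trace.TableWriter

    // Hypothetical helper: fills one row via the name-based setters.
    fun fillResourceRow(writer: TableWriter) {
        writer.setString("id", "vm-1181") // resolve("id"), then setString(index, ...)
        writer.setInt("cpu_count", 4)
        writer.setDouble("mem_capacity", 1_750_000.0)
    }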
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt
index 64e8f272..a1059e9e 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt
@@ -55,7 +55,10 @@ public interface Trace {
* @throws IllegalArgumentException if [format] is not supported.
*/
@JvmStatic
- public fun open(path: File, format: String): Trace = open(path.toPath(), format)
+ public fun open(
+ path: File,
+ format: String,
+ ): Trace = open(path.toPath(), format)
/**
* Open a [Trace] at the specified [path] in the given [format].
@@ -65,7 +68,10 @@ public interface Trace {
* @throws IllegalArgumentException if [format] is not supported.
*/
@JvmStatic
- public fun open(path: Path, format: String): Trace {
+ public fun open(
+ path: Path,
+ format: String,
+ ): Trace {
val provider = requireNotNull(TraceFormat.byName(format)) { "Unknown format $format" }
return TraceImpl(provider, path)
}
@@ -77,7 +83,10 @@ public interface Trace {
* @param format The format of the trace to create.
*/
@JvmStatic
- public fun create(path: File, format: String): Trace = create(path.toPath(), format)
+ public fun create(
+ path: File,
+ format: String,
+ ): Trace = create(path.toPath(), format)
/**
* Create a [Trace] at the specified [path] in the given [format].
@@ -86,7 +95,10 @@ public interface Trace {
* @param format The format of the trace to create.
*/
@JvmStatic
- public fun create(path: Path, format: String): Trace {
+ public fun create(
+ path: Path,
+ format: String,
+ ): Trace {
val provider = requireNotNull(TraceFormat.byName(format)) { "Unknown format $format" }
provider.create(path)
return TraceImpl(provider, path)
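Note (not part of the diff): a minimal sketch of the reformatted factory methods above. The trace directory and the format name "azure" are illustrative assumptions; open() resolves the provider via TraceFormat.byName and throws IllegalArgumentException for unknown format names.

    import org.opendc.trace.Trace
    import java.io.File

    fun main() {
        // Resolves the "azure" TraceFormat provider and wraps it in a Trace.
        val trace = Trace.open(File("traces/azure-sample"), format = "azure")
        println(trace)
    }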
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt
index 89f8dbc4..046dd13d 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt
@@ -28,40 +28,40 @@ package org.opendc.trace.conv
* Identifier of the resource.
*/
@JvmField
-public val RESOURCE_ID: String = "id"
+public val resourceID: String = "id"
/**
* The cluster to which the resource belongs.
*/
@JvmField
-public val RESOURCE_CLUSTER_ID: String = "cluster_id"
+public val resourceClusterID: String = "cluster_id"
/**
* Start time for the resource.
*/
@JvmField
-public val RESOURCE_START_TIME: String = "start_time"
+public val resourceStartTime: String = "start_time"
/**
* End time for the resource.
*/
@JvmField
-public val RESOURCE_STOP_TIME: String = "stop_time"
+public val resourceStopTime: String = "stop_time"
/**
* Number of CPUs for the resource.
*/
@JvmField
-public val RESOURCE_CPU_COUNT: String = "cpu_count"
+public val resourceCpuCount: String = "cpu_count"
/**
* Total CPU capacity of the resource in MHz.
*/
@JvmField
-public val RESOURCE_CPU_CAPACITY: String = "cpu_capacity"
+public val resourceCpuCapacity: String = "cpu_capacity"
/**
* Memory capacity for the resource in KB.
*/
@JvmField
-public val RESOURCE_MEM_CAPACITY: String = "mem_capacity"
+public val resourceMemCapacity: String = "mem_capacity"
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt
index 5187d501..eede6bd6 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt
@@ -28,70 +28,70 @@ package org.opendc.trace.conv
* The timestamp at which the state was recorded.
*/
@JvmField
-public val RESOURCE_STATE_TIMESTAMP: String = "timestamp"
+public val resourceStateTimestamp: String = "timestamp"
/**
* Duration for the state.
*/
@JvmField
-public val RESOURCE_STATE_DURATION: String = "duration"
+public val resourceStateDuration: String = "duration"
/**
* A flag to indicate that the resource is powered on.
*/
@JvmField
-public val RESOURCE_STATE_POWERED_ON: String = "powered_on"
+public val resourceStatePoweredOn: String = "powered_on"
/**
* Total CPU usage of the resource in MHz.
*/
@JvmField
-public val RESOURCE_STATE_CPU_USAGE: String = "cpu_usage"
+public val resourceStateCpuUsage: String = "cpu_usage"
/**
* Total CPU usage of the resource in percentage.
*/
@JvmField
-public val RESOURCE_STATE_CPU_USAGE_PCT: String = "cpu_usage_pct"
+public val resourceStateCpuUsagePct: String = "cpu_usage_pct"
/**
* Total CPU demand of the resource in MHz.
*/
@JvmField
-public val RESOURCE_STATE_CPU_DEMAND: String = "cpu_demand"
+public val resourceStateCpuDemand: String = "cpu_demand"
/**
* CPU ready percentage.
*/
@JvmField
-public val RESOURCE_STATE_CPU_READY_PCT: String = "cpu_ready_pct"
+public val resourceStateCpuReadyPct: String = "cpu_ready_pct"
/**
* Memory usage of the resource in KB.
*/
@JvmField
-public val RESOURCE_STATE_MEM_USAGE: String = "mem_usage"
+public val resourceStateMemUsage: String = "mem_usage"
/**
* Disk read throughput of the resource in KB/s.
*/
@JvmField
-public val RESOURCE_STATE_DISK_READ: String = "disk_read"
+public val resourceStateDiskRead: String = "disk_read"
/**
* Disk write throughput of the resource in KB/s.
*/
@JvmField
-public val RESOURCE_STATE_DISK_WRITE: String = "disk_write"
+public val resourceStateDiskWrite: String = "disk_write"
/**
* Network receive throughput of the resource in KB/s.
*/
@JvmField
-public val RESOURCE_STATE_NET_RX: String = "net_rx"
+public val resourceStateNetRx: String = "net_rx"
/**
* Network transmit throughput of the resource in KB/s.
*/
@JvmField
-public val RESOURCE_STATE_NET_TX: String = "net_tx"
+public val resourceStateNetTx: String = "net_tx"
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt
index 46ef051d..83537822 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt
@@ -61,7 +61,10 @@ public interface TraceFormat {
* @throws IllegalArgumentException If [table] does not exist.
* @return The [TableDetails] for the specified [table].
*/
- public fun getDetails(path: Path, table: String): TableDetails
+ public fun getDetails(
+ path: Path,
+ table: String,
+ ): TableDetails
/**
* Open a [TableReader] for the specified [table].
@@ -72,7 +75,11 @@ public interface TraceFormat {
* @throws IllegalArgumentException If [table] does not exist.
* @return A [TableReader] instance for the table.
*/
- public fun newReader(path: Path, table: String, projection: List<String>?): TableReader
+ public fun newReader(
+ path: Path,
+ table: String,
+ projection: List<String>?,
+ ): TableReader
/**
* Open a [TableWriter] for the specified [table].
@@ -83,7 +90,10 @@ public interface TraceFormat {
* @throws UnsupportedOperationException If the format does not support writing.
* @return A [TableWriter] instance for the table.
*/
- public fun newWriter(path: Path, table: String): TableWriter
+ public fun newWriter(
+ path: Path,
+ table: String,
+ ): TableWriter
/**
* A helper object for resolving providers.
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/CompositeTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/CompositeTableReader.kt
index 2fe820c4..4b9a0d95 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/CompositeTableReader.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/CompositeTableReader.kt
@@ -122,17 +122,27 @@ public abstract class CompositeTableReader : TableReader {
return delegate.getDuration(index)
}
- override fun <T> getList(index: Int, elementType: Class<T>): List<T>? {
+ override fun <T> getList(
+ index: Int,
+ elementType: Class<T>,
+ ): List<T>? {
val delegate = checkNotNull(delegate) { "Invalid reader state" }
return delegate.getList(index, elementType)
}
- override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? {
+ override fun <T> getSet(
+ index: Int,
+ elementType: Class<T>,
+ ): Set<T>? {
val delegate = checkNotNull(delegate) { "Invalid reader state" }
return delegate.getSet(index, elementType)
}
- override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? {
+ override fun <K, V> getMap(
+ index: Int,
+ keyType: Class<K>,
+ valueType: Class<V>,
+ ): Map<K, V>? {
val delegate = checkNotNull(delegate) { "Invalid reader state" }
return delegate.getMap(index, keyType, valueType)
}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/TableColumnConversion.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/TableColumnConversion.kt
index 26739e34..fda2bc54 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/TableColumnConversion.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/TableColumnConversion.kt
@@ -32,7 +32,10 @@ import java.util.UUID
/**
* Helper method to convert a [List] into a [List] with elements of [targetElementType].
*/
-public fun <T> TableColumnType.List.convertTo(value: List<*>?, targetElementType: Class<T>): List<T>? {
+public fun <T> TableColumnType.List.convertTo(
+ value: List<*>?,
+ targetElementType: Class<T>,
+): List<T>? {
require(elementType.isCompatible(targetElementType)) { "Target element type is not compatible with $elementType" }
@Suppress("UNCHECKED_CAST")
return value as List<T>?
@@ -41,7 +44,10 @@ public fun <T> TableColumnType.List.convertTo(value: List<*>?, targetElementType
/**
* Helper method to convert a [Set] into a [Set] with elements of [targetElementType].
*/
-public fun <T> TableColumnType.Set.convertTo(value: Set<*>?, targetElementType: Class<T>): Set<T>? {
+public fun <T> TableColumnType.Set.convertTo(
+ value: Set<*>?,
+ targetElementType: Class<T>,
+): Set<T>? {
require(elementType.isCompatible(targetElementType)) { "Target element type is not compatible with $elementType" }
@Suppress("UNCHECKED_CAST")
return value as Set<T>?
@@ -50,7 +56,11 @@ public fun <T> TableColumnType.Set.convertTo(value: Set<*>?, targetElementType:
/**
* Helper method to convert a [Map] into a [Map] with [targetKeyType] keys and [targetValueType] values.
*/
-public fun <K, V> TableColumnType.Map.convertTo(value: Map<*, *>?, targetKeyType: Class<K>, targetValueType: Class<V>): Map<K, V>? {
+public fun <K, V> TableColumnType.Map.convertTo(
+ value: Map<*, *>?,
+ targetKeyType: Class<K>,
+ targetValueType: Class<V>,
+): Map<K, V>? {
require(keyType.isCompatible(targetKeyType)) { "Target key type $targetKeyType is not compatible with $keyType" }
require(valueType.isCompatible(targetValueType)) { "Target value type $targetValueType is not compatible with $valueType" }
@Suppress("UNCHECKED_CAST")
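Note (not part of the diff): a sketch of the reformatted convertTo helper above. It assumes TableColumnType.List is constructed with its element type, as the elementType.isCompatible(...) check suggests; require() throws for incompatible target types, otherwise the cast is effectively a no-op.

    import org.opendc.trace.TableColumnType
    import org.opendc.trace.util.convertTo

    fun demo() {
        val listType = TableColumnType.List(TableColumnType.String)
        val raw: List<*> = listOf("a", "b", "c")
        // Checked downcast: returns the same list, typed as List<String>.
        val strings: List<String>? = listType.convertTo(raw, String::class.java)
        println(strings)
    }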
diff --git a/opendc-trace/opendc-trace-azure/build.gradle.kts b/opendc-trace/opendc-trace-azure/build.gradle.kts
index ee53c583..21b8b439 100644
--- a/opendc-trace/opendc-trace-azure/build.gradle.kts
+++ b/opendc-trace/opendc-trace-azure/build.gradle.kts
@@ -22,7 +22,7 @@
description = "Support for Azure VM traces in OpenDC"
-/* Build configuration */
+// Build configuration
plugins {
`kotlin-library-conventions`
`benchmark-conventions`
diff --git a/opendc-trace/opendc-trace-azure/src/jmh/kotlin/org/opendc/trace/azure/AzureTraceBenchmarks.kt b/opendc-trace/opendc-trace-azure/src/jmh/kotlin/org/opendc/trace/azure/AzureTraceBenchmarks.kt
index 6759f38a..bb3c2450 100644
--- a/opendc-trace/opendc-trace-azure/src/jmh/kotlin/org/opendc/trace/azure/AzureTraceBenchmarks.kt
+++ b/opendc-trace/opendc-trace-azure/src/jmh/kotlin/org/opendc/trace/azure/AzureTraceBenchmarks.kt
@@ -22,9 +22,9 @@
package org.opendc.trace.azure
-import org.opendc.trace.conv.RESOURCE_ID
import org.opendc.trace.conv.TABLE_RESOURCES
import org.opendc.trace.conv.TABLE_RESOURCE_STATES
+import org.opendc.trace.conv.resourceID
import org.opendc.trace.spi.TraceFormat
import org.openjdk.jmh.annotations.Benchmark
import org.openjdk.jmh.annotations.Fork
@@ -58,7 +58,7 @@ class AzureTraceBenchmarks {
fun benchmarkResourcesReader(bh: Blackhole) {
val reader = format.newReader(path, TABLE_RESOURCES, null)
try {
- val idColumn = reader.resolve(RESOURCE_ID)
+ val idColumn = reader.resolve(resourceID)
while (reader.nextRow()) {
bh.consume(reader.getString(idColumn))
}
@@ -71,7 +71,7 @@ class AzureTraceBenchmarks {
fun benchmarkResourceStatesReader(bh: Blackhole) {
val reader = format.newReader(path, TABLE_RESOURCE_STATES, null)
try {
- val idColumn = reader.resolve(RESOURCE_ID)
+ val idColumn = reader.resolve(resourceID)
while (reader.nextRow()) {
bh.consume(reader.getString(idColumn))
}
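Note (not part of the diff): the read pattern from the benchmark above as a standalone snippet, using the renamed resourceID constant. The trace directory is an illustrative assumption.

    import org.opendc.trace.azure.AzureTraceFormat
    import org.opendc.trace.conv.TABLE_RESOURCES
    import org.opendc.trace.conv.resourceID
    import java.nio.file.Paths

    fun main() {
        val reader = AzureTraceFormat().newReader(Paths.get("traces/azure-sample"), TABLE_RESOURCES, null)
        try {
            val idColumn = reader.resolve(resourceID)
            while (reader.nextRow()) {
                println(reader.getString(idColumn))
            }
        } finally {
            reader.close()
        }
    }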
diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt
index 0c60c75d..bcf6ff52 100644
--- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt
+++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt
@@ -26,9 +26,9 @@ import com.fasterxml.jackson.core.JsonToken
import com.fasterxml.jackson.dataformat.csv.CsvParser
import com.fasterxml.jackson.dataformat.csv.CsvSchema
import org.opendc.trace.TableReader
-import org.opendc.trace.conv.RESOURCE_ID
-import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE_PCT
-import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP
+import org.opendc.trace.conv.resourceID
+import org.opendc.trace.conv.resourceStateCpuUsagePct
+import org.opendc.trace.conv.resourceStateTimestamp
import java.time.Duration
import java.time.Instant
import java.util.UUID
@@ -74,21 +74,21 @@ internal class AzureResourceStateTableReader(private val parser: CsvParser) : Ta
return true
}
- private val COL_ID = 0
- private val COL_TIMESTAMP = 1
- private val COL_CPU_USAGE_PCT = 2
+ private val colID = 0
+ private val colTimestamp = 1
+ private val colCpuUsagePct = 2
override fun resolve(name: String): Int {
return when (name) {
- RESOURCE_ID -> COL_ID
- RESOURCE_STATE_TIMESTAMP -> COL_TIMESTAMP
- RESOURCE_STATE_CPU_USAGE_PCT -> COL_CPU_USAGE_PCT
+ resourceID -> colID
+ resourceStateTimestamp -> colTimestamp
+ resourceStateCpuUsagePct -> colCpuUsagePct
else -> -1
}
}
override fun isNull(index: Int): Boolean {
- require(index in 0..COL_CPU_USAGE_PCT) { "Invalid column index" }
+ require(index in 0..colCpuUsagePct) { "Invalid column index" }
return false
}
@@ -111,7 +111,7 @@ internal class AzureResourceStateTableReader(private val parser: CsvParser) : Ta
override fun getDouble(index: Int): Double {
checkActive()
return when (index) {
- COL_CPU_USAGE_PCT -> cpuUsagePct
+ colCpuUsagePct -> cpuUsagePct
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -119,7 +119,7 @@ internal class AzureResourceStateTableReader(private val parser: CsvParser) : Ta
override fun getString(index: Int): String? {
checkActive()
return when (index) {
- COL_ID -> id
+ colID -> id
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -131,7 +131,7 @@ internal class AzureResourceStateTableReader(private val parser: CsvParser) : Ta
override fun getInstant(index: Int): Instant? {
checkActive()
return when (index) {
- COL_TIMESTAMP -> timestamp
+ colTimestamp -> timestamp
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -140,15 +140,25 @@ internal class AzureResourceStateTableReader(private val parser: CsvParser) : Ta
throw IllegalArgumentException("Invalid column")
}
- override fun <T> getList(index: Int, elementType: Class<T>): List<T>? {
+ override fun <T> getList(
+ index: Int,
+ elementType: Class<T>,
+ ): List<T>? {
throw IllegalArgumentException("Invalid column")
}
- override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? {
+ override fun <K, V> getMap(
+ index: Int,
+ keyType: Class<K>,
+ valueType: Class<V>,
+ ): Map<K, V>? {
throw IllegalArgumentException("Invalid column")
}
- override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? {
+ override fun <T> getSet(
+ index: Int,
+ elementType: Class<T>,
+ ): Set<T>? {
throw IllegalArgumentException("Invalid column")
}
@@ -196,13 +206,14 @@ internal class AzureResourceStateTableReader(private val parser: CsvParser) : Ta
/**
* The [CsvSchema] that is used to parse the trace.
*/
- private val schema = CsvSchema.builder()
- .addColumn("timestamp", CsvSchema.ColumnType.NUMBER)
- .addColumn("vm id", CsvSchema.ColumnType.STRING)
- .addColumn("CPU min cpu", CsvSchema.ColumnType.NUMBER)
- .addColumn("CPU max cpu", CsvSchema.ColumnType.NUMBER)
- .addColumn("CPU avg cpu", CsvSchema.ColumnType.NUMBER)
- .setAllowComments(true)
- .build()
+ private val schema =
+ CsvSchema.builder()
+ .addColumn("timestamp", CsvSchema.ColumnType.NUMBER)
+ .addColumn("vm id", CsvSchema.ColumnType.STRING)
+ .addColumn("CPU min cpu", CsvSchema.ColumnType.NUMBER)
+ .addColumn("CPU max cpu", CsvSchema.ColumnType.NUMBER)
+ .addColumn("CPU avg cpu", CsvSchema.ColumnType.NUMBER)
+ .setAllowComments(true)
+ .build()
}
}
diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt
index c0acb67a..d86a0466 100644
--- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt
+++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt
@@ -26,11 +26,11 @@ import com.fasterxml.jackson.core.JsonToken
import com.fasterxml.jackson.dataformat.csv.CsvParser
import com.fasterxml.jackson.dataformat.csv.CsvSchema
import org.opendc.trace.TableReader
-import org.opendc.trace.conv.RESOURCE_CPU_COUNT
-import org.opendc.trace.conv.RESOURCE_ID
-import org.opendc.trace.conv.RESOURCE_MEM_CAPACITY
-import org.opendc.trace.conv.RESOURCE_START_TIME
-import org.opendc.trace.conv.RESOURCE_STOP_TIME
+import org.opendc.trace.conv.resourceCpuCount
+import org.opendc.trace.conv.resourceID
+import org.opendc.trace.conv.resourceMemCapacity
+import org.opendc.trace.conv.resourceStartTime
+import org.opendc.trace.conv.resourceStopTime
import java.time.Duration
import java.time.Instant
import java.util.UUID
@@ -78,25 +78,25 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe
return true
}
- private val COL_ID = 0
- private val COL_START_TIME = 1
- private val COL_STOP_TIME = 2
- private val COL_CPU_COUNT = 3
- private val COL_MEM_CAPACITY = 4
+ private val colID = 0
+ private val colStartTime = 1
+ private val colStopTime = 2
+ private val colCpuCount = 3
+ private val colMemCapacity = 4
override fun resolve(name: String): Int {
return when (name) {
- RESOURCE_ID -> COL_ID
- RESOURCE_START_TIME -> COL_START_TIME
- RESOURCE_STOP_TIME -> COL_STOP_TIME
- RESOURCE_CPU_COUNT -> COL_CPU_COUNT
- RESOURCE_MEM_CAPACITY -> COL_MEM_CAPACITY
+ resourceID -> colID
+ resourceStartTime -> colStartTime
+ resourceStopTime -> colStopTime
+ resourceCpuCount -> colCpuCount
+ resourceMemCapacity -> colMemCapacity
else -> -1
}
}
override fun isNull(index: Int): Boolean {
- require(index in 0..COL_MEM_CAPACITY) { "Invalid column index" }
+ require(index in 0..colMemCapacity) { "Invalid column index" }
return false
}
@@ -107,7 +107,7 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe
override fun getInt(index: Int): Int {
checkActive()
return when (index) {
- COL_CPU_COUNT -> cpuCores
+ colCpuCount -> cpuCores
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -115,7 +115,7 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe
override fun getLong(index: Int): Long {
checkActive()
return when (index) {
- COL_CPU_COUNT -> cpuCores.toLong()
+ colCpuCount -> cpuCores.toLong()
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -127,7 +127,7 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe
override fun getDouble(index: Int): Double {
checkActive()
return when (index) {
- COL_MEM_CAPACITY -> memCapacity
+ colMemCapacity -> memCapacity
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -135,7 +135,7 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe
override fun getString(index: Int): String? {
checkActive()
return when (index) {
- COL_ID -> id
+ colID -> id
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -147,8 +147,8 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe
override fun getInstant(index: Int): Instant? {
checkActive()
return when (index) {
- COL_START_TIME -> startTime
- COL_STOP_TIME -> stopTime
+ colStartTime -> startTime
+ colStopTime -> stopTime
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -157,15 +157,25 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe
throw IllegalArgumentException("Invalid column")
}
- override fun <T> getList(index: Int, elementType: Class<T>): List<T>? {
+ override fun <T> getList(
+ index: Int,
+ elementType: Class<T>,
+ ): List<T>? {
throw IllegalArgumentException("Invalid column")
}
- override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? {
+ override fun <T> getSet(
+ index: Int,
+ elementType: Class<T>,
+ ): Set<T>? {
throw IllegalArgumentException("Invalid column")
}
- override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? {
+ override fun <K, V> getMap(
+ index: Int,
+ keyType: Class<K>,
+ valueType: Class<V>,
+ ): Map<K, V>? {
throw IllegalArgumentException("Invalid column")
}
@@ -217,19 +227,20 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe
/**
* The [CsvSchema] that is used to parse the trace.
*/
- private val schema = CsvSchema.builder()
- .addColumn("vm id", CsvSchema.ColumnType.NUMBER)
- .addColumn("subscription id", CsvSchema.ColumnType.STRING)
- .addColumn("deployment id", CsvSchema.ColumnType.NUMBER)
- .addColumn("timestamp vm created", CsvSchema.ColumnType.NUMBER)
- .addColumn("timestamp vm deleted", CsvSchema.ColumnType.NUMBER)
- .addColumn("max cpu", CsvSchema.ColumnType.NUMBER)
- .addColumn("avg cpu", CsvSchema.ColumnType.NUMBER)
- .addColumn("p95 cpu", CsvSchema.ColumnType.NUMBER)
- .addColumn("vm category", CsvSchema.ColumnType.NUMBER)
- .addColumn("vm virtual core count", CsvSchema.ColumnType.NUMBER)
- .addColumn("vm memory", CsvSchema.ColumnType.NUMBER)
- .setAllowComments(true)
- .build()
+ private val schema =
+ CsvSchema.builder()
+ .addColumn("vm id", CsvSchema.ColumnType.NUMBER)
+ .addColumn("subscription id", CsvSchema.ColumnType.STRING)
+ .addColumn("deployment id", CsvSchema.ColumnType.NUMBER)
+ .addColumn("timestamp vm created", CsvSchema.ColumnType.NUMBER)
+ .addColumn("timestamp vm deleted", CsvSchema.ColumnType.NUMBER)
+ .addColumn("max cpu", CsvSchema.ColumnType.NUMBER)
+ .addColumn("avg cpu", CsvSchema.ColumnType.NUMBER)
+ .addColumn("p95 cpu", CsvSchema.ColumnType.NUMBER)
+ .addColumn("vm category", CsvSchema.ColumnType.NUMBER)
+ .addColumn("vm virtual core count", CsvSchema.ColumnType.NUMBER)
+ .addColumn("vm memory", CsvSchema.ColumnType.NUMBER)
+ .setAllowComments(true)
+ .build()
}
}
diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt
index 3f64c640..a75da9d9 100644
--- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt
+++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt
@@ -28,15 +28,15 @@ import org.opendc.trace.TableColumn
import org.opendc.trace.TableColumnType
import org.opendc.trace.TableReader
import org.opendc.trace.TableWriter
-import org.opendc.trace.conv.RESOURCE_CPU_COUNT
-import org.opendc.trace.conv.RESOURCE_ID
-import org.opendc.trace.conv.RESOURCE_MEM_CAPACITY
-import org.opendc.trace.conv.RESOURCE_START_TIME
-import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE_PCT
-import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP
-import org.opendc.trace.conv.RESOURCE_STOP_TIME
import org.opendc.trace.conv.TABLE_RESOURCES
import org.opendc.trace.conv.TABLE_RESOURCE_STATES
+import org.opendc.trace.conv.resourceCpuCount
+import org.opendc.trace.conv.resourceID
+import org.opendc.trace.conv.resourceMemCapacity
+import org.opendc.trace.conv.resourceStartTime
+import org.opendc.trace.conv.resourceStateCpuUsagePct
+import org.opendc.trace.conv.resourceStateTimestamp
+import org.opendc.trace.conv.resourceStopTime
import org.opendc.trace.spi.TableDetails
import org.opendc.trace.spi.TraceFormat
import org.opendc.trace.util.CompositeTableReader
@@ -59,9 +59,10 @@ public class AzureTraceFormat : TraceFormat {
/**
* The [CsvFactory] used to create the parser.
*/
- private val factory = CsvFactory()
- .enable(CsvParser.Feature.ALLOW_COMMENTS)
- .enable(CsvParser.Feature.TRIM_SPACES)
+ private val factory =
+ CsvFactory()
+ .enable(CsvParser.Feature.ALLOW_COMMENTS)
+ .enable(CsvParser.Feature.TRIM_SPACES)
override fun create(path: Path) {
throw UnsupportedOperationException("Writing not supported for this format")
@@ -69,29 +70,38 @@ public class AzureTraceFormat : TraceFormat {
override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES)
- override fun getDetails(path: Path, table: String): TableDetails {
+ override fun getDetails(
+ path: Path,
+ table: String,
+ ): TableDetails {
return when (table) {
- TABLE_RESOURCES -> TableDetails(
- listOf(
- TableColumn(RESOURCE_ID, TableColumnType.String),
- TableColumn(RESOURCE_START_TIME, TableColumnType.Instant),
- TableColumn(RESOURCE_STOP_TIME, TableColumnType.Instant),
- TableColumn(RESOURCE_CPU_COUNT, TableColumnType.Int),
- TableColumn(RESOURCE_MEM_CAPACITY, TableColumnType.Double)
+ TABLE_RESOURCES ->
+ TableDetails(
+ listOf(
+ TableColumn(resourceID, TableColumnType.String),
+ TableColumn(resourceStartTime, TableColumnType.Instant),
+ TableColumn(resourceStopTime, TableColumnType.Instant),
+ TableColumn(resourceCpuCount, TableColumnType.Int),
+ TableColumn(resourceMemCapacity, TableColumnType.Double),
+ ),
)
- )
- TABLE_RESOURCE_STATES -> TableDetails(
- listOf(
- TableColumn(RESOURCE_ID, TableColumnType.String),
- TableColumn(RESOURCE_STATE_TIMESTAMP, TableColumnType.Instant),
- TableColumn(RESOURCE_STATE_CPU_USAGE_PCT, TableColumnType.Double)
+ TABLE_RESOURCE_STATES ->
+ TableDetails(
+ listOf(
+ TableColumn(resourceID, TableColumnType.String),
+ TableColumn(resourceStateTimestamp, TableColumnType.Instant),
+ TableColumn(resourceStateCpuUsagePct, TableColumnType.Double),
+ ),
)
- )
else -> throw IllegalArgumentException("Table $table not supported")
}
}
- override fun newReader(path: Path, table: String, projection: List<String>?): TableReader {
+ override fun newReader(
+ path: Path,
+ table: String,
+ projection: List<String>?,
+ ): TableReader {
return when (table) {
TABLE_RESOURCES -> {
val stream = GZIPInputStream(path.resolve("vmtable/vmtable.csv.gz").inputStream())
@@ -102,7 +112,10 @@ public class AzureTraceFormat : TraceFormat {
}
}
- override fun newWriter(path: Path, table: String): TableWriter {
+ override fun newWriter(
+ path: Path,
+ table: String,
+ ): TableWriter {
throw UnsupportedOperationException("Writing not supported for this format")
}
@@ -110,10 +123,11 @@ public class AzureTraceFormat : TraceFormat {
* Construct a [TableReader] for reading over all VM CPU readings.
*/
private fun newResourceStateReader(path: Path): TableReader {
- val partitions = Files.walk(path.resolve("vm_cpu_readings"), 1)
- .filter { !Files.isDirectory(it) && it.name.endsWith(".csv.gz") }
- .collect(Collectors.toMap({ it.name.removeSuffix(".csv.gz") }, { it }))
- .toSortedMap()
+ val partitions =
+ Files.walk(path.resolve("vm_cpu_readings"), 1)
+ .filter { !Files.isDirectory(it) && it.name.endsWith(".csv.gz") }
+ .collect(Collectors.toMap({ it.name.removeSuffix(".csv.gz") }, { it }))
+ .toSortedMap()
val it = partitions.iterator()
return object : CompositeTableReader() {
diff --git a/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt b/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt
index 00cdc174..4fe96a8e 100644
--- a/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt
@@ -33,13 +33,13 @@ import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import org.opendc.trace.TableColumn
import org.opendc.trace.TableReader
-import org.opendc.trace.conv.RESOURCE_CPU_COUNT
-import org.opendc.trace.conv.RESOURCE_ID
-import org.opendc.trace.conv.RESOURCE_MEM_CAPACITY
-import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE_PCT
-import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP
import org.opendc.trace.conv.TABLE_RESOURCES
import org.opendc.trace.conv.TABLE_RESOURCE_STATES
+import org.opendc.trace.conv.resourceCpuCount
+import org.opendc.trace.conv.resourceID
+import org.opendc.trace.conv.resourceMemCapacity
+import org.opendc.trace.conv.resourceStateCpuUsagePct
+import org.opendc.trace.conv.resourceStateTimestamp
import org.opendc.trace.testkit.TableReaderTestKit
import java.nio.file.Paths
@@ -76,9 +76,9 @@ class AzureTraceFormatTest {
val reader = format.newReader(path, TABLE_RESOURCES, null)
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals("x/XsOfHO4ocsV99i4NluqKDuxctW2MMVmwqOPAlg4wp8mqbBOe3wxBlQo0+Qx+uf", reader.getString(RESOURCE_ID)) },
- { assertEquals(1, reader.getInt(RESOURCE_CPU_COUNT)) },
- { assertEquals(1750000.0, reader.getDouble(RESOURCE_MEM_CAPACITY)) }
+ { assertEquals("x/XsOfHO4ocsV99i4NluqKDuxctW2MMVmwqOPAlg4wp8mqbBOe3wxBlQo0+Qx+uf", reader.getString(resourceID)) },
+ { assertEquals(1, reader.getInt(resourceCpuCount)) },
+ { assertEquals(1750000.0, reader.getDouble(resourceMemCapacity)) },
)
reader.close()
@@ -91,9 +91,9 @@ class AzureTraceFormatTest {
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals("+ZcrOp5/c/fJ6mVgP5qMZlOAGDwyjaaDNM0WoWOt2IDb47gT0UwK9lFwkPQv3C7Q", reader.getString(RESOURCE_ID)) },
- { assertEquals(0, reader.getInstant(RESOURCE_STATE_TIMESTAMP)?.epochSecond) },
- { assertEquals(0.0286979, reader.getDouble(RESOURCE_STATE_CPU_USAGE_PCT), 0.01) }
+ { assertEquals("+ZcrOp5/c/fJ6mVgP5qMZlOAGDwyjaaDNM0WoWOt2IDb47gT0UwK9lFwkPQv3C7Q", reader.getString(resourceID)) },
+ { assertEquals(0, reader.getInstant(resourceStateTimestamp)?.epochSecond) },
+ { assertEquals(0.0286979, reader.getDouble(resourceStateCpuUsagePct), 0.01) },
)
reader.close()
diff --git a/opendc-trace/opendc-trace-bitbrains/build.gradle.kts b/opendc-trace/opendc-trace-bitbrains/build.gradle.kts
index 502b052a..6ca40d3d 100644
--- a/opendc-trace/opendc-trace-bitbrains/build.gradle.kts
+++ b/opendc-trace/opendc-trace-bitbrains/build.gradle.kts
@@ -22,7 +22,7 @@
description = "Support for GWF traces in OpenDC"
-/* Build configuration */
+// Build configuration
plugins {
`kotlin-library-conventions`
}
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt
index 511f02db..8387d1ed 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt
@@ -23,18 +23,18 @@
package org.opendc.trace.bitbrains
import org.opendc.trace.TableReader
-import org.opendc.trace.conv.RESOURCE_CLUSTER_ID
-import org.opendc.trace.conv.RESOURCE_CPU_CAPACITY
-import org.opendc.trace.conv.RESOURCE_CPU_COUNT
-import org.opendc.trace.conv.RESOURCE_ID
-import org.opendc.trace.conv.RESOURCE_MEM_CAPACITY
-import org.opendc.trace.conv.RESOURCE_STATE_CPU_DEMAND
-import org.opendc.trace.conv.RESOURCE_STATE_CPU_READY_PCT
-import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE
-import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE_PCT
-import org.opendc.trace.conv.RESOURCE_STATE_DISK_READ
-import org.opendc.trace.conv.RESOURCE_STATE_DISK_WRITE
-import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP
+import org.opendc.trace.conv.resourceClusterID
+import org.opendc.trace.conv.resourceCpuCapacity
+import org.opendc.trace.conv.resourceCpuCount
+import org.opendc.trace.conv.resourceID
+import org.opendc.trace.conv.resourceMemCapacity
+import org.opendc.trace.conv.resourceStateCpuDemand
+import org.opendc.trace.conv.resourceStateCpuReadyPct
+import org.opendc.trace.conv.resourceStateCpuUsage
+import org.opendc.trace.conv.resourceStateCpuUsagePct
+import org.opendc.trace.conv.resourceStateDiskRead
+import org.opendc.trace.conv.resourceStateDiskWrite
+import org.opendc.trace.conv.resourceStateTimestamp
import java.io.BufferedReader
import java.time.Duration
import java.time.Instant
@@ -99,18 +99,18 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR
val field = line.subSequence(start, end) as String
when (col++) {
- COL_TIMESTAMP -> timestamp = Instant.ofEpochSecond(field.toLong(10))
- COL_CPU_USAGE -> cpuUsage = field.toDouble()
- COL_CPU_DEMAND -> cpuDemand = field.toDouble()
- COL_DISK_READ -> diskRead = field.toDouble()
- COL_DISK_WRITE -> diskWrite = field.toDouble()
- COL_CLUSTER_ID -> cluster = field.trim()
- COL_NCPUS -> cpuCores = field.toInt(10)
- COL_CPU_READY_PCT -> cpuReadyPct = field.toDouble()
- COL_POWERED_ON -> poweredOn = field.toInt(10) == 1
- COL_CPU_CAPACITY -> cpuCapacity = field.toDouble()
- COL_ID -> id = field.trim()
- COL_MEM_CAPACITY -> memCapacity = field.toDouble() * 1000 // Convert from MB to KB
+ colTimestamp -> timestamp = Instant.ofEpochSecond(field.toLong(10))
+ colCpuUsage -> cpuUsage = field.toDouble()
+ colCpuDemand -> cpuDemand = field.toDouble()
+ colDiskRead -> diskRead = field.toDouble()
+ colDiskWrite -> diskWrite = field.toDouble()
+ colClusterID -> cluster = field.trim()
+ colNcpus -> cpuCores = field.toInt(10)
+ colCpuReadyPct -> cpuReadyPct = field.toDouble()
+ colPoweredOn -> poweredOn = field.toInt(10) == 1
+ colCpuCapacity -> cpuCapacity = field.toDouble()
+ colID -> id = field.trim()
+ colMemCapacity -> memCapacity = field.toDouble() * 1000 // Convert from MB to KB
}
}
@@ -119,31 +119,31 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR
override fun resolve(name: String): Int {
return when (name) {
- RESOURCE_ID -> COL_ID
- RESOURCE_CLUSTER_ID -> COL_CLUSTER_ID
- RESOURCE_STATE_TIMESTAMP -> COL_TIMESTAMP
- RESOURCE_CPU_COUNT -> COL_NCPUS
- RESOURCE_CPU_CAPACITY -> COL_CPU_CAPACITY
- RESOURCE_STATE_CPU_USAGE -> COL_CPU_USAGE
- RESOURCE_STATE_CPU_USAGE_PCT -> COL_CPU_USAGE_PCT
- RESOURCE_STATE_CPU_DEMAND -> COL_CPU_DEMAND
- RESOURCE_STATE_CPU_READY_PCT -> COL_CPU_READY_PCT
- RESOURCE_MEM_CAPACITY -> COL_MEM_CAPACITY
- RESOURCE_STATE_DISK_READ -> COL_DISK_READ
- RESOURCE_STATE_DISK_WRITE -> COL_DISK_WRITE
+ resourceID -> colID
+ resourceClusterID -> colClusterID
+ resourceStateTimestamp -> colTimestamp
+ resourceCpuCount -> colNcpus
+ resourceCpuCapacity -> colCpuCapacity
+ resourceStateCpuUsage -> colCpuUsage
+ resourceStateCpuUsagePct -> colCpuUsagePct
+ resourceStateCpuDemand -> colCpuDemand
+ resourceStateCpuReadyPct -> colCpuReadyPct
+ resourceMemCapacity -> colMemCapacity
+ resourceStateDiskRead -> colDiskRead
+ resourceStateDiskWrite -> colDiskWrite
else -> -1
}
}
override fun isNull(index: Int): Boolean {
- require(index in 0 until COL_MAX) { "Invalid column index" }
+ require(index in 0 until colMax) { "Invalid column index" }
return false
}
override fun getBoolean(index: Int): Boolean {
check(state == State.Active) { "No active row" }
return when (index) {
- COL_POWERED_ON -> poweredOn
+ colPoweredOn -> poweredOn
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -151,7 +151,7 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR
override fun getInt(index: Int): Int {
check(state == State.Active) { "No active row" }
return when (index) {
- COL_NCPUS -> cpuCores
+ colNcpus -> cpuCores
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -167,14 +167,14 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR
override fun getDouble(index: Int): Double {
check(state == State.Active) { "No active row" }
return when (index) {
- COL_CPU_CAPACITY -> cpuCapacity
- COL_CPU_USAGE -> cpuUsage
- COL_CPU_USAGE_PCT -> cpuUsage / cpuCapacity
- COL_CPU_READY_PCT -> cpuReadyPct
- COL_CPU_DEMAND -> cpuDemand
- COL_MEM_CAPACITY -> memCapacity
- COL_DISK_READ -> diskRead
- COL_DISK_WRITE -> diskWrite
+ colCpuCapacity -> cpuCapacity
+ colCpuUsage -> cpuUsage
+ colCpuUsagePct -> cpuUsage / cpuCapacity
+ colCpuReadyPct -> cpuReadyPct
+ colCpuDemand -> cpuDemand
+ colMemCapacity -> memCapacity
+ colDiskRead -> diskRead
+ colDiskWrite -> diskWrite
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -182,8 +182,8 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR
override fun getString(index: Int): String? {
check(state == State.Active) { "No active row" }
return when (index) {
- COL_ID -> id
- COL_CLUSTER_ID -> cluster
+ colID -> id
+ colClusterID -> cluster
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -195,7 +195,7 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR
override fun getInstant(index: Int): Instant? {
check(state == State.Active) { "No active row" }
return when (index) {
- COL_TIMESTAMP -> timestamp
+ colTimestamp -> timestamp
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -204,15 +204,25 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR
throw IllegalArgumentException("Invalid column")
}
- override fun <T> getList(index: Int, elementType: Class<T>): List<T>? {
+ override fun <T> getList(
+ index: Int,
+ elementType: Class<T>,
+ ): List<T>? {
throw IllegalArgumentException("Invalid column")
}
- override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? {
+ override fun <T> getSet(
+ index: Int,
+ elementType: Class<T>,
+ ): Set<T>? {
throw IllegalArgumentException("Invalid column")
}
- override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? {
+ override fun <K, V> getMap(
+ index: Int,
+ keyType: Class<K>,
+ valueType: Class<V>,
+ ): Map<K, V>? {
throw IllegalArgumentException("Invalid column")
}
@@ -259,22 +269,24 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR
/**
* Default column indices for the extended Bitbrains format.
*/
- private val COL_TIMESTAMP = 0
- private val COL_CPU_USAGE = 1
- private val COL_CPU_DEMAND = 2
- private val COL_DISK_READ = 4
- private val COL_DISK_WRITE = 6
- private val COL_CLUSTER_ID = 10
- private val COL_NCPUS = 12
- private val COL_CPU_READY_PCT = 13
- private val COL_POWERED_ON = 14
- private val COL_CPU_CAPACITY = 18
- private val COL_ID = 19
- private val COL_MEM_CAPACITY = 20
- private val COL_CPU_USAGE_PCT = 21
- private val COL_MAX = COL_CPU_USAGE_PCT + 1
+ private val colTimestamp = 0
+ private val colCpuUsage = 1
+ private val colCpuDemand = 2
+ private val colDiskRead = 4
+ private val colDiskWrite = 6
+ private val colClusterID = 10
+ private val colNcpus = 12
+ private val colCpuReadyPct = 13
+ private val colPoweredOn = 14
+ private val colCpuCapacity = 18
+ private val colID = 19
+ private val colMemCapacity = 20
+ private val colCpuUsagePct = 21
+ private val colMax = colCpuUsagePct + 1
private enum class State {
- Pending, Active, Closed
+ Pending,
+ Active,
+ Closed,
}
}
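
The reader above resolves symbolic column names to physical indices once, then iterates rows with nextRow() and typed getters. A minimal consumption sketch against the renamed conv constants, assuming a hypothetical trace directory:

    import org.opendc.trace.bitbrains.BitbrainsExTraceFormat
    import org.opendc.trace.conv.TABLE_RESOURCE_STATES
    import org.opendc.trace.conv.resourceID
    import org.opendc.trace.conv.resourceStateCpuUsage
    import org.opendc.trace.conv.resourceStateTimestamp
    import java.nio.file.Paths

    fun main() {
        val format = BitbrainsExTraceFormat()
        // Hypothetical directory holding the per-VM .txt partitions.
        val reader = format.newReader(Paths.get("/traces/bitbrains-ex"), TABLE_RESOURCE_STATES, projection = null)
        // Resolve names to column indices once, outside the row loop.
        val idCol = reader.resolve(resourceID)
        val tsCol = reader.resolve(resourceStateTimestamp)
        val usageCol = reader.resolve(resourceStateCpuUsage)
        try {
            while (reader.nextRow()) {
                println("${reader.getString(idCol)} @ ${reader.getInstant(tsCol)}: ${reader.getDouble(usageCol)}")
            }
        } finally {
            reader.close()
        }
    }

Because resolve() returns -1 for unknown names, callers can probe for optional columns before entering the loop.
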
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt
index d364694c..6115953f 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt
@@ -26,19 +26,19 @@ import org.opendc.trace.TableColumn
import org.opendc.trace.TableColumnType
import org.opendc.trace.TableReader
import org.opendc.trace.TableWriter
-import org.opendc.trace.conv.RESOURCE_CLUSTER_ID
-import org.opendc.trace.conv.RESOURCE_CPU_CAPACITY
-import org.opendc.trace.conv.RESOURCE_CPU_COUNT
-import org.opendc.trace.conv.RESOURCE_ID
-import org.opendc.trace.conv.RESOURCE_MEM_CAPACITY
-import org.opendc.trace.conv.RESOURCE_STATE_CPU_DEMAND
-import org.opendc.trace.conv.RESOURCE_STATE_CPU_READY_PCT
-import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE
-import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE_PCT
-import org.opendc.trace.conv.RESOURCE_STATE_DISK_READ
-import org.opendc.trace.conv.RESOURCE_STATE_DISK_WRITE
-import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP
import org.opendc.trace.conv.TABLE_RESOURCE_STATES
+import org.opendc.trace.conv.resourceClusterID
+import org.opendc.trace.conv.resourceCpuCapacity
+import org.opendc.trace.conv.resourceCpuCount
+import org.opendc.trace.conv.resourceID
+import org.opendc.trace.conv.resourceMemCapacity
+import org.opendc.trace.conv.resourceStateCpuDemand
+import org.opendc.trace.conv.resourceStateCpuReadyPct
+import org.opendc.trace.conv.resourceStateCpuUsage
+import org.opendc.trace.conv.resourceStateCpuUsagePct
+import org.opendc.trace.conv.resourceStateDiskRead
+import org.opendc.trace.conv.resourceStateDiskWrite
+import org.opendc.trace.conv.resourceStateTimestamp
import org.opendc.trace.spi.TableDetails
import org.opendc.trace.spi.TraceFormat
import org.opendc.trace.util.CompositeTableReader
@@ -64,36 +64,47 @@ public class BitbrainsExTraceFormat : TraceFormat {
override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCE_STATES)
- override fun getDetails(path: Path, table: String): TableDetails {
+ override fun getDetails(
+ path: Path,
+ table: String,
+ ): TableDetails {
return when (table) {
- TABLE_RESOURCE_STATES -> TableDetails(
- listOf(
- TableColumn(RESOURCE_ID, TableColumnType.String),
- TableColumn(RESOURCE_CLUSTER_ID, TableColumnType.String),
- TableColumn(RESOURCE_STATE_TIMESTAMP, TableColumnType.Instant),
- TableColumn(RESOURCE_CPU_COUNT, TableColumnType.Int),
- TableColumn(RESOURCE_CPU_CAPACITY, TableColumnType.Double),
- TableColumn(RESOURCE_STATE_CPU_USAGE, TableColumnType.Double),
- TableColumn(RESOURCE_STATE_CPU_USAGE_PCT, TableColumnType.Double),
- TableColumn(RESOURCE_STATE_CPU_DEMAND, TableColumnType.Double),
- TableColumn(RESOURCE_STATE_CPU_READY_PCT, TableColumnType.Double),
- TableColumn(RESOURCE_MEM_CAPACITY, TableColumnType.Double),
- TableColumn(RESOURCE_STATE_DISK_READ, TableColumnType.Double),
- TableColumn(RESOURCE_STATE_DISK_WRITE, TableColumnType.Double)
+ TABLE_RESOURCE_STATES ->
+ TableDetails(
+ listOf(
+ TableColumn(resourceID, TableColumnType.String),
+ TableColumn(resourceClusterID, TableColumnType.String),
+ TableColumn(resourceStateTimestamp, TableColumnType.Instant),
+ TableColumn(resourceCpuCount, TableColumnType.Int),
+ TableColumn(resourceCpuCapacity, TableColumnType.Double),
+ TableColumn(resourceStateCpuUsage, TableColumnType.Double),
+ TableColumn(resourceStateCpuUsagePct, TableColumnType.Double),
+ TableColumn(resourceStateCpuDemand, TableColumnType.Double),
+ TableColumn(resourceStateCpuReadyPct, TableColumnType.Double),
+ TableColumn(resourceMemCapacity, TableColumnType.Double),
+ TableColumn(resourceStateDiskRead, TableColumnType.Double),
+ TableColumn(resourceStateDiskWrite, TableColumnType.Double),
+ ),
)
- )
else -> throw IllegalArgumentException("Table $table not supported")
}
}
- override fun newReader(path: Path, table: String, projection: List<String>?): TableReader {
+ override fun newReader(
+ path: Path,
+ table: String,
+ projection: List<String>?,
+ ): TableReader {
return when (table) {
TABLE_RESOURCE_STATES -> newResourceStateReader(path)
else -> throw IllegalArgumentException("Table $table not supported")
}
}
- override fun newWriter(path: Path, table: String): TableWriter {
+ override fun newWriter(
+ path: Path,
+ table: String,
+ ): TableWriter {
throw UnsupportedOperationException("Writing not supported for this format")
}
@@ -101,10 +112,11 @@ public class BitbrainsExTraceFormat : TraceFormat {
* Construct a [TableReader] for reading over all resource state partitions.
*/
private fun newResourceStateReader(path: Path): TableReader {
- val partitions = Files.walk(path, 1)
- .filter { !Files.isDirectory(it) && it.extension == "txt" }
- .collect(Collectors.toMap({ it.nameWithoutExtension }, { it }))
- .toSortedMap()
+ val partitions =
+ Files.walk(path, 1)
+ .filter { !Files.isDirectory(it) && it.extension == "txt" }
+ .collect(Collectors.toMap({ it.nameWithoutExtension }, { it }))
+ .toSortedMap()
val it = partitions.iterator()
return object : CompositeTableReader() {
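
newResourceStateReader discovers one partition file per VM and stitches them together behind a CompositeTableReader. The discovery idiom on its own, as a runnable sketch (paths hypothetical):

    import java.nio.file.Files
    import java.nio.file.Path
    import java.nio.file.Paths
    import java.util.SortedMap
    import java.util.stream.Collectors
    import kotlin.io.path.extension
    import kotlin.io.path.nameWithoutExtension

    // Collect the per-VM partition files of a trace directory into a map
    // keyed by VM name, sorted so iteration order is deterministic.
    fun discoverPartitions(path: Path, ext: String): SortedMap<String, Path> =
        Files.walk(path, 1)
            .filter { !Files.isDirectory(it) && it.extension == ext }
            .collect(Collectors.toMap({ it.nameWithoutExtension }, { it }))
            .toSortedMap()

    fun main() {
        // The extended Bitbrains format uses .txt partitions, the classic one .csv.
        discoverPartitions(Paths.get("/traces/bitbrains-ex"), "txt")
            .forEach { (vm, file) -> println("$vm -> $file") }
    }

Sorting the map keeps the composite reader's row order stable across runs, which the tests rely on.
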
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt
index 65ca8a9c..e264fccb 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt
@@ -27,18 +27,18 @@ import com.fasterxml.jackson.core.JsonToken
import com.fasterxml.jackson.dataformat.csv.CsvParser
import com.fasterxml.jackson.dataformat.csv.CsvSchema
import org.opendc.trace.TableReader
-import org.opendc.trace.conv.RESOURCE_CPU_CAPACITY
-import org.opendc.trace.conv.RESOURCE_CPU_COUNT
-import org.opendc.trace.conv.RESOURCE_ID
-import org.opendc.trace.conv.RESOURCE_MEM_CAPACITY
-import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE
-import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE_PCT
-import org.opendc.trace.conv.RESOURCE_STATE_DISK_READ
-import org.opendc.trace.conv.RESOURCE_STATE_DISK_WRITE
-import org.opendc.trace.conv.RESOURCE_STATE_MEM_USAGE
-import org.opendc.trace.conv.RESOURCE_STATE_NET_RX
-import org.opendc.trace.conv.RESOURCE_STATE_NET_TX
-import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP
+import org.opendc.trace.conv.resourceCpuCapacity
+import org.opendc.trace.conv.resourceCpuCount
+import org.opendc.trace.conv.resourceID
+import org.opendc.trace.conv.resourceMemCapacity
+import org.opendc.trace.conv.resourceStateCpuUsage
+import org.opendc.trace.conv.resourceStateCpuUsagePct
+import org.opendc.trace.conv.resourceStateDiskRead
+import org.opendc.trace.conv.resourceStateDiskWrite
+import org.opendc.trace.conv.resourceStateMemUsage
+import org.opendc.trace.conv.resourceStateNetRx
+import org.opendc.trace.conv.resourceStateNetTx
+import org.opendc.trace.conv.resourceStateTimestamp
import java.text.NumberFormat
import java.time.Duration
import java.time.Instant
@@ -103,20 +103,21 @@ internal class BitbrainsResourceStateTableReader(private val partition: String,
when (parser.currentName) {
"Timestamp [ms]" -> {
- timestamp = when (timestampType) {
- TimestampType.UNDECIDED -> {
- try {
- val res = LocalDateTime.parse(parser.text, formatter).toInstant(ZoneOffset.UTC)
- timestampType = TimestampType.DATE_TIME
- res
- } catch (e: DateTimeParseException) {
- timestampType = TimestampType.EPOCH_MILLIS
- Instant.ofEpochSecond(parser.longValue)
+ timestamp =
+ when (timestampType) {
+ TimestampType.UNDECIDED -> {
+ try {
+ val res = LocalDateTime.parse(parser.text, formatter).toInstant(ZoneOffset.UTC)
+ timestampType = TimestampType.DATE_TIME
+ res
+ } catch (e: DateTimeParseException) {
+ timestampType = TimestampType.EPOCH_MILLIS
+ Instant.ofEpochSecond(parser.longValue)
+ }
}
+ TimestampType.DATE_TIME -> LocalDateTime.parse(parser.text, formatter).toInstant(ZoneOffset.UTC)
+ TimestampType.EPOCH_MILLIS -> Instant.ofEpochSecond(parser.longValue)
}
- TimestampType.DATE_TIME -> LocalDateTime.parse(parser.text, formatter).toInstant(ZoneOffset.UTC)
- TimestampType.EPOCH_MILLIS -> Instant.ofEpochSecond(parser.longValue)
- }
}
"CPU cores" -> cpuCores = parser.intValue
"CPU capacity provisioned [MHZ]" -> cpuCapacity = parseSafeDouble()
@@ -134,39 +135,39 @@ internal class BitbrainsResourceStateTableReader(private val partition: String,
return true
}
- private val COL_TIMESTAMP = 0
- private val COL_CPU_COUNT = 1
- private val COL_CPU_CAPACITY = 2
- private val COL_CPU_USAGE = 3
- private val COL_CPU_USAGE_PCT = 4
- private val COL_MEM_CAPACITY = 5
- private val COL_MEM_USAGE = 6
- private val COL_DISK_READ = 7
- private val COL_DISK_WRITE = 8
- private val COL_NET_RX = 9
- private val COL_NET_TX = 10
- private val COL_ID = 11
+ private val colTimestamp = 0
+ private val colCpuCount = 1
+ private val colCpuCapacity = 2
+ private val colCpuUsage = 3
+ private val colCpuUsagePct = 4
+ private val colMemCapacity = 5
+ private val colMemUsage = 6
+ private val colDiskRead = 7
+ private val colDiskWrite = 8
+ private val colNetRx = 9
+ private val colNetTx = 10
+ private val colID = 11
override fun resolve(name: String): Int {
return when (name) {
- RESOURCE_ID -> COL_ID
- RESOURCE_STATE_TIMESTAMP -> COL_TIMESTAMP
- RESOURCE_CPU_COUNT -> COL_CPU_COUNT
- RESOURCE_CPU_CAPACITY -> COL_CPU_CAPACITY
- RESOURCE_STATE_CPU_USAGE -> COL_CPU_USAGE
- RESOURCE_STATE_CPU_USAGE_PCT -> COL_CPU_USAGE_PCT
- RESOURCE_MEM_CAPACITY -> COL_MEM_CAPACITY
- RESOURCE_STATE_MEM_USAGE -> COL_MEM_USAGE
- RESOURCE_STATE_DISK_READ -> COL_DISK_READ
- RESOURCE_STATE_DISK_WRITE -> COL_DISK_WRITE
- RESOURCE_STATE_NET_RX -> COL_NET_RX
- RESOURCE_STATE_NET_TX -> COL_NET_TX
+ resourceID -> colID
+ resourceStateTimestamp -> colTimestamp
+ resourceCpuCount -> colCpuCount
+ resourceCpuCapacity -> colCpuCapacity
+ resourceStateCpuUsage -> colCpuUsage
+ resourceStateCpuUsagePct -> colCpuUsagePct
+ resourceMemCapacity -> colMemCapacity
+ resourceStateMemUsage -> colMemUsage
+ resourceStateDiskRead -> colDiskRead
+ resourceStateDiskWrite -> colDiskWrite
+ resourceStateNetRx -> colNetRx
+ resourceStateNetTx -> colNetTx
else -> -1
}
}
override fun isNull(index: Int): Boolean {
- require(index in 0..COL_ID) { "Invalid column index" }
+ require(index in 0..colID) { "Invalid column index" }
return false
}
@@ -177,7 +178,7 @@ internal class BitbrainsResourceStateTableReader(private val partition: String,
override fun getInt(index: Int): Int {
checkActive()
return when (index) {
- COL_CPU_COUNT -> cpuCores
+ colCpuCount -> cpuCores
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -193,15 +194,15 @@ internal class BitbrainsResourceStateTableReader(private val partition: String,
override fun getDouble(index: Int): Double {
checkActive()
return when (index) {
- COL_CPU_CAPACITY -> cpuCapacity
- COL_CPU_USAGE -> cpuUsage
- COL_CPU_USAGE_PCT -> cpuUsagePct
- COL_MEM_CAPACITY -> memCapacity
- COL_MEM_USAGE -> memUsage
- COL_DISK_READ -> diskRead
- COL_DISK_WRITE -> diskWrite
- COL_NET_RX -> netReceived
- COL_NET_TX -> netTransmitted
+ colCpuCapacity -> cpuCapacity
+ colCpuUsage -> cpuUsage
+ colCpuUsagePct -> cpuUsagePct
+ colMemCapacity -> memCapacity
+ colMemUsage -> memUsage
+ colDiskRead -> diskRead
+ colDiskWrite -> diskWrite
+ colNetRx -> netReceived
+ colNetTx -> netTransmitted
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -209,7 +210,7 @@ internal class BitbrainsResourceStateTableReader(private val partition: String,
override fun getString(index: Int): String {
checkActive()
return when (index) {
- COL_ID -> partition
+ colID -> partition
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -221,7 +222,7 @@ internal class BitbrainsResourceStateTableReader(private val partition: String,
override fun getInstant(index: Int): Instant? {
checkActive()
return when (index) {
- COL_TIMESTAMP -> timestamp
+ colTimestamp -> timestamp
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -230,15 +231,25 @@ internal class BitbrainsResourceStateTableReader(private val partition: String,
throw IllegalArgumentException("Invalid column")
}
- override fun <T> getList(index: Int, elementType: Class<T>): List<T>? {
+ override fun <T> getList(
+ index: Int,
+ elementType: Class<T>,
+ ): List<T>? {
throw IllegalArgumentException("Invalid column")
}
- override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? {
+ override fun <T> getSet(
+ index: Int,
+ elementType: Class<T>,
+ ): Set<T>? {
throw IllegalArgumentException("Invalid column")
}
- override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? {
+ override fun <K, V> getMap(
+ index: Int,
+ keyType: Class<K>,
+ valueType: Class<V>,
+ ): Map<K, V>? {
throw IllegalArgumentException("Invalid column")
}
@@ -322,30 +333,33 @@ internal class BitbrainsResourceStateTableReader(private val partition: String,
* The type of the timestamp in the trace.
*/
private enum class TimestampType {
- UNDECIDED, DATE_TIME, EPOCH_MILLIS
+ UNDECIDED,
+ DATE_TIME,
+ EPOCH_MILLIS,
}
companion object {
/**
* The [CsvSchema] that is used to parse the trace.
*/
- private val schema = CsvSchema.builder()
- .addColumn("Timestamp [ms]", CsvSchema.ColumnType.NUMBER_OR_STRING)
- .addColumn("CPU cores", CsvSchema.ColumnType.NUMBER)
- .addColumn("CPU capacity provisioned [MHZ]", CsvSchema.ColumnType.NUMBER)
- .addColumn("CPU usage [MHZ]", CsvSchema.ColumnType.NUMBER)
- .addColumn("CPU usage [%]", CsvSchema.ColumnType.NUMBER)
- .addColumn("Memory capacity provisioned [KB]", CsvSchema.ColumnType.NUMBER)
- .addColumn("Memory usage [KB]", CsvSchema.ColumnType.NUMBER)
- .addColumn("Memory usage [%]", CsvSchema.ColumnType.NUMBER)
- .addColumn("Disk read throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
- .addColumn("Disk write throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
- .addColumn("Disk size [GB]", CsvSchema.ColumnType.NUMBER)
- .addColumn("Network received throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
- .addColumn("Network transmitted throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
- .setAllowComments(true)
- .setUseHeader(true)
- .setColumnSeparator(';')
- .build()
+ private val schema =
+ CsvSchema.builder()
+ .addColumn("Timestamp [ms]", CsvSchema.ColumnType.NUMBER_OR_STRING)
+ .addColumn("CPU cores", CsvSchema.ColumnType.NUMBER)
+ .addColumn("CPU capacity provisioned [MHZ]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("CPU usage [MHZ]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("CPU usage [%]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Memory capacity provisioned [KB]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Memory usage [KB]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Memory usage [%]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Disk read throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Disk write throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Disk size [GB]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Network received throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Network transmitted throughput [KB/s]", CsvSchema.ColumnType.NUMBER)
+ .setAllowComments(true)
+ .setUseHeader(true)
+ .setColumnSeparator(';')
+ .build()
}
}
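
The hunk above decides the timestamp encoding lazily on the first row: it tries the date-time form and falls back to the numeric form on a parse failure. Note that the numeric branch calls Instant.ofEpochSecond even though the header and enum name say milliseconds; the format test further down (epochSecond == 1376314846) confirms the column actually holds epoch seconds. A standalone sketch of the sniffing idiom, assuming a "yyyy-MM-dd HH:mm:ss" pattern (the reader's actual formatter is outside this diff):

    import java.time.Instant
    import java.time.LocalDateTime
    import java.time.ZoneOffset
    import java.time.format.DateTimeFormatter
    import java.time.format.DateTimeParseException

    private val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

    // Try the date-time encoding first; fall back to numeric epoch seconds.
    fun sniffTimestamp(field: String): Instant =
        try {
            LocalDateTime.parse(field, formatter).toInstant(ZoneOffset.UTC)
        } catch (e: DateTimeParseException) {
            Instant.ofEpochSecond(field.toLong())
        }

    fun main() {
        println(sniffTimestamp("2013-08-12 13:40:46")) // date-time form
        println(sniffTimestamp("1376314846"))          // epoch-seconds form
    }

The production reader memoizes the decision in the TimestampType enum so the try/catch only runs once per partition.
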
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt
index 776a8f86..a12785f0 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt
@@ -24,7 +24,7 @@ package org.opendc.trace.bitbrains
import com.fasterxml.jackson.dataformat.csv.CsvFactory
import org.opendc.trace.TableReader
-import org.opendc.trace.conv.RESOURCE_ID
+import org.opendc.trace.conv.resourceID
import java.nio.file.Path
import java.time.Duration
import java.time.Instant
@@ -56,7 +56,7 @@ internal class BitbrainsResourceTableReader(private val factory: CsvFactory, vms
val parser = factory.createParser(path.toFile())
val reader = BitbrainsResourceStateTableReader(name, parser)
- val idCol = reader.resolve(RESOURCE_ID)
+ val idCol = reader.resolve(resourceID)
try {
if (!reader.nextRow()) {
@@ -74,17 +74,17 @@ internal class BitbrainsResourceTableReader(private val factory: CsvFactory, vms
return false
}
- private val COL_ID = 0
+ private val colID = 0
override fun resolve(name: String): Int {
return when (name) {
- RESOURCE_ID -> COL_ID
+ resourceID -> colID
else -> -1
}
}
override fun isNull(index: Int): Boolean {
- require(index in 0..COL_ID) { "Invalid column index" }
+ require(index in 0..colID) { "Invalid column index" }
return false
}
@@ -111,7 +111,7 @@ internal class BitbrainsResourceTableReader(private val factory: CsvFactory, vms
override fun getString(index: Int): String? {
check(state == State.Active) { "No active row" }
return when (index) {
- COL_ID -> id
+ colID -> id
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -128,15 +128,25 @@ internal class BitbrainsResourceTableReader(private val factory: CsvFactory, vms
throw IllegalArgumentException("Invalid column")
}
- override fun <T> getList(index: Int, elementType: Class<T>): List<T>? {
+ override fun <T> getList(
+ index: Int,
+ elementType: Class<T>,
+ ): List<T>? {
throw IllegalArgumentException("Invalid column")
}
- override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? {
+ override fun <T> getSet(
+ index: Int,
+ elementType: Class<T>,
+ ): Set<T>? {
throw IllegalArgumentException("Invalid column")
}
- override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? {
+ override fun <K, V> getMap(
+ index: Int,
+ keyType: Class<K>,
+ valueType: Class<V>,
+ ): Map<K, V>? {
throw IllegalArgumentException("Invalid column")
}
@@ -158,6 +168,8 @@ internal class BitbrainsResourceTableReader(private val factory: CsvFactory, vms
}
private enum class State {
- Pending, Active, Closed
+ Pending,
+ Active,
+ Closed,
}
}
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt
index b0809735..23853077 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt
@@ -28,20 +28,20 @@ import org.opendc.trace.TableColumn
import org.opendc.trace.TableColumnType
import org.opendc.trace.TableReader
import org.opendc.trace.TableWriter
-import org.opendc.trace.conv.RESOURCE_CPU_CAPACITY
-import org.opendc.trace.conv.RESOURCE_CPU_COUNT
-import org.opendc.trace.conv.RESOURCE_ID
-import org.opendc.trace.conv.RESOURCE_MEM_CAPACITY
-import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE
-import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE_PCT
-import org.opendc.trace.conv.RESOURCE_STATE_DISK_READ
-import org.opendc.trace.conv.RESOURCE_STATE_DISK_WRITE
-import org.opendc.trace.conv.RESOURCE_STATE_MEM_USAGE
-import org.opendc.trace.conv.RESOURCE_STATE_NET_RX
-import org.opendc.trace.conv.RESOURCE_STATE_NET_TX
-import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP
import org.opendc.trace.conv.TABLE_RESOURCES
import org.opendc.trace.conv.TABLE_RESOURCE_STATES
+import org.opendc.trace.conv.resourceCpuCapacity
+import org.opendc.trace.conv.resourceCpuCount
+import org.opendc.trace.conv.resourceID
+import org.opendc.trace.conv.resourceMemCapacity
+import org.opendc.trace.conv.resourceStateCpuUsage
+import org.opendc.trace.conv.resourceStateCpuUsagePct
+import org.opendc.trace.conv.resourceStateDiskRead
+import org.opendc.trace.conv.resourceStateDiskWrite
+import org.opendc.trace.conv.resourceStateMemUsage
+import org.opendc.trace.conv.resourceStateNetRx
+import org.opendc.trace.conv.resourceStateNetTx
+import org.opendc.trace.conv.resourceStateTimestamp
import org.opendc.trace.spi.TableDetails
import org.opendc.trace.spi.TraceFormat
import org.opendc.trace.util.CompositeTableReader
@@ -63,9 +63,10 @@ public class BitbrainsTraceFormat : TraceFormat {
/**
* The [CsvFactory] used to create the parser.
*/
- private val factory = CsvFactory()
- .enable(CsvParser.Feature.ALLOW_COMMENTS)
- .enable(CsvParser.Feature.TRIM_SPACES)
+ private val factory =
+ CsvFactory()
+ .enable(CsvParser.Feature.ALLOW_COMMENTS)
+ .enable(CsvParser.Feature.TRIM_SPACES)
override fun create(path: Path) {
throw UnsupportedOperationException("Writing not supported for this format")
@@ -73,40 +74,50 @@ public class BitbrainsTraceFormat : TraceFormat {
override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES)
- override fun getDetails(path: Path, table: String): TableDetails {
+ override fun getDetails(
+ path: Path,
+ table: String,
+ ): TableDetails {
return when (table) {
- TABLE_RESOURCES -> TableDetails(
- listOf(
- TableColumn(RESOURCE_ID, TableColumnType.String)
+ TABLE_RESOURCES ->
+ TableDetails(
+ listOf(
+ TableColumn(resourceID, TableColumnType.String),
+ ),
)
- )
- TABLE_RESOURCE_STATES -> TableDetails(
- listOf(
- TableColumn(RESOURCE_ID, TableColumnType.String),
- TableColumn(RESOURCE_STATE_TIMESTAMP, TableColumnType.Instant),
- TableColumn(RESOURCE_CPU_COUNT, TableColumnType.Int),
- TableColumn(RESOURCE_CPU_CAPACITY, TableColumnType.Double),
- TableColumn(RESOURCE_STATE_CPU_USAGE, TableColumnType.Double),
- TableColumn(RESOURCE_STATE_CPU_USAGE_PCT, TableColumnType.Double),
- TableColumn(RESOURCE_MEM_CAPACITY, TableColumnType.Double),
- TableColumn(RESOURCE_STATE_MEM_USAGE, TableColumnType.Double),
- TableColumn(RESOURCE_STATE_DISK_READ, TableColumnType.Double),
- TableColumn(RESOURCE_STATE_DISK_WRITE, TableColumnType.Double),
- TableColumn(RESOURCE_STATE_NET_RX, TableColumnType.Double),
- TableColumn(RESOURCE_STATE_NET_TX, TableColumnType.Double)
+ TABLE_RESOURCE_STATES ->
+ TableDetails(
+ listOf(
+ TableColumn(resourceID, TableColumnType.String),
+ TableColumn(resourceStateTimestamp, TableColumnType.Instant),
+ TableColumn(resourceCpuCount, TableColumnType.Int),
+ TableColumn(resourceCpuCapacity, TableColumnType.Double),
+ TableColumn(resourceStateCpuUsage, TableColumnType.Double),
+ TableColumn(resourceStateCpuUsagePct, TableColumnType.Double),
+ TableColumn(resourceMemCapacity, TableColumnType.Double),
+ TableColumn(resourceStateMemUsage, TableColumnType.Double),
+ TableColumn(resourceStateDiskRead, TableColumnType.Double),
+ TableColumn(resourceStateDiskWrite, TableColumnType.Double),
+ TableColumn(resourceStateNetRx, TableColumnType.Double),
+ TableColumn(resourceStateNetTx, TableColumnType.Double),
+ ),
)
- )
else -> throw IllegalArgumentException("Table $table not supported")
}
}
- override fun newReader(path: Path, table: String, projection: List<String>?): TableReader {
+ override fun newReader(
+ path: Path,
+ table: String,
+ projection: List<String>?,
+ ): TableReader {
return when (table) {
TABLE_RESOURCES -> {
- val vms = Files.walk(path, 1)
- .filter { !Files.isDirectory(it) && it.extension == "csv" }
- .collect(Collectors.toMap({ it.nameWithoutExtension }, { it }))
- .toSortedMap()
+ val vms =
+ Files.walk(path, 1)
+ .filter { !Files.isDirectory(it) && it.extension == "csv" }
+ .collect(Collectors.toMap({ it.nameWithoutExtension }, { it }))
+ .toSortedMap()
BitbrainsResourceTableReader(factory, vms)
}
TABLE_RESOURCE_STATES -> newResourceStateReader(path)
@@ -114,7 +125,10 @@ public class BitbrainsTraceFormat : TraceFormat {
}
}
- override fun newWriter(path: Path, table: String): TableWriter {
+ override fun newWriter(
+ path: Path,
+ table: String,
+ ): TableWriter {
throw UnsupportedOperationException("Writing not supported for this format")
}
@@ -122,10 +136,11 @@ public class BitbrainsTraceFormat : TraceFormat {
* Construct a [TableReader] for reading over all resource state partitions.
*/
private fun newResourceStateReader(path: Path): TableReader {
- val partitions = Files.walk(path, 1)
- .filter { !Files.isDirectory(it) && it.extension == "csv" }
- .collect(Collectors.toMap({ it.nameWithoutExtension }, { it }))
- .toSortedMap()
+ val partitions =
+ Files.walk(path, 1)
+ .filter { !Files.isDirectory(it) && it.extension == "csv" }
+ .collect(Collectors.toMap({ it.nameWithoutExtension }, { it }))
+ .toSortedMap()
val it = partitions.iterator()
return object : CompositeTableReader() {
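
Together, getTables and getDetails let callers discover a trace's schema before committing to a reader. A sketch, assuming TableDetails exposes its column list as a columns property (only its constructor is visible in this diff) and a hypothetical trace directory:

    import org.opendc.trace.bitbrains.BitbrainsTraceFormat
    import java.nio.file.Paths

    fun main() {
        val format = BitbrainsTraceFormat()
        val path = Paths.get("/traces/bitbrains") // hypothetical
        for (table in format.getTables(path)) {
            println(table)
            // Assumption: TableDetails.columns holds the list passed to its constructor.
            for (column in format.getDetails(path, table).columns) {
                println("  ${column.name}: ${column.type}")
            }
        }
    }

For this format the loop would print the single-column resources table followed by the twelve resource-state columns declared above.
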
diff --git a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt
index e8c7094b..18c59fb8 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt
@@ -33,9 +33,9 @@ import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import org.opendc.trace.TableColumn
import org.opendc.trace.TableReader
-import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE
-import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP
import org.opendc.trace.conv.TABLE_RESOURCE_STATES
+import org.opendc.trace.conv.resourceStateCpuUsage
+import org.opendc.trace.conv.resourceStateTimestamp
import org.opendc.trace.testkit.TableReaderTestKit
import java.nio.file.Paths
@@ -72,8 +72,8 @@ internal class BitbrainsExTraceFormatTest {
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals(1631911500, reader.getInstant(RESOURCE_STATE_TIMESTAMP)?.epochSecond) },
- { assertEquals(21.2, reader.getDouble(RESOURCE_STATE_CPU_USAGE), 0.01) }
+ { assertEquals(1631911500, reader.getInstant(resourceStateTimestamp)?.epochSecond) },
+ { assertEquals(21.2, reader.getDouble(resourceStateCpuUsage), 0.01) },
)
reader.close()
diff --git a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt
index edab8747..8ff13852 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt
@@ -34,11 +34,11 @@ import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import org.opendc.trace.TableColumn
import org.opendc.trace.TableReader
-import org.opendc.trace.conv.RESOURCE_ID
-import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE
-import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP
import org.opendc.trace.conv.TABLE_RESOURCES
import org.opendc.trace.conv.TABLE_RESOURCE_STATES
+import org.opendc.trace.conv.resourceID
+import org.opendc.trace.conv.resourceStateCpuUsage
+import org.opendc.trace.conv.resourceStateTimestamp
import org.opendc.trace.testkit.TableReaderTestKit
import java.nio.file.Paths
@@ -75,8 +75,8 @@ class BitbrainsTraceFormatTest {
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals("bitbrains", reader.getString(RESOURCE_ID)) },
- { assertFalse(reader.nextRow()) }
+ { assertEquals("bitbrains", reader.getString(resourceID)) },
+ { assertFalse(reader.nextRow()) },
)
reader.close()
@@ -89,8 +89,8 @@ class BitbrainsTraceFormatTest {
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals(1376314846, reader.getInstant(RESOURCE_STATE_TIMESTAMP)?.epochSecond) },
- { assertEquals(19.066, reader.getDouble(RESOURCE_STATE_CPU_USAGE), 0.01) }
+ { assertEquals(1376314846, reader.getInstant(resourceStateTimestamp)?.epochSecond) },
+ { assertEquals(19.066, reader.getDouble(resourceStateCpuUsage), 0.01) },
)
reader.close()
diff --git a/opendc-trace/opendc-trace-calcite/build.gradle.kts b/opendc-trace/opendc-trace-calcite/build.gradle.kts
index 2ffdac3c..848e00da 100644
--- a/opendc-trace/opendc-trace-calcite/build.gradle.kts
+++ b/opendc-trace/opendc-trace-calcite/build.gradle.kts
@@ -22,7 +22,7 @@
description = "Apache Calcite (SQL) integration for the OpenDC trace library"
-/* Build configuration */
+// Build configuration
plugins {
`kotlin-library-conventions`
}
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt
index 74bd188b..eed52ab3 100644
--- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt
+++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt
@@ -36,7 +36,7 @@ import java.util.concurrent.atomic.AtomicBoolean
internal class TraceReaderEnumerator<E>(
private val reader: TableReader,
private val columns: List<TableColumn>,
- private val cancelFlag: AtomicBoolean
+ private val cancelFlag: AtomicBoolean,
) : Enumerator<E> {
private val columnIndices = columns.map { reader.resolve(it.name) }.toIntArray()
private var current: E? = null
@@ -80,7 +80,11 @@ internal class TraceReaderEnumerator<E>(
return res
}
- private fun convertColumn(reader: TableReader, column: TableColumn, columnIndex: Int): Any? {
+ private fun convertColumn(
+ reader: TableReader,
+ column: TableColumn,
+ columnIndex: Int,
+ ): Any? {
return when (column.type) {
is TableColumnType.Boolean -> reader.getBoolean(columnIndex)
is TableColumnType.Int -> reader.getInt(columnIndex)
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt
index 3c6badc8..cbf7ec43 100644
--- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt
+++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt
@@ -36,7 +36,11 @@ import java.nio.file.Paths
* This factory allows users to include a schema that references a trace in a `model.json` file.
*/
public class TraceSchemaFactory : SchemaFactory {
- override fun create(parentSchema: SchemaPlus, name: String, operand: Map<String, Any>): Schema {
+ override fun create(
+ parentSchema: SchemaPlus,
+ name: String,
+ operand: Map<String, Any>,
+ ): Schema {
val base = operand[ModelHandler.ExtraOperand.BASE_DIRECTORY.camelName] as File?
val pathParam = requireNotNull(operand["path"]) { "Trace path not specified" } as String
val path = if (base != null) File(base, pathParam).toPath() else Paths.get(pathParam)
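
The factory plugs into Calcite's standard model mechanism; only the path operand is visible in this diff, so any further operands a real trace needs are not shown. A sketch using Calcite's inline-model support, with a hypothetical trace path:

    import java.sql.DriverManager
    import java.util.Properties

    fun main() {
        val model = """
            {
              "version": "1.0",
              "defaultSchema": "trace",
              "schemas": [{
                "name": "trace",
                "type": "custom",
                "factory": "org.opendc.trace.calcite.TraceSchemaFactory",
                "operand": { "path": "/traces/bitbrains" }
              }]
            }
        """.trimIndent()
        val info = Properties()
        info.setProperty("lex", "JAVA") // matches the lexical convention used in CalciteTest
        info.setProperty("model", "inline:$model")
        DriverManager.getConnection("jdbc:calcite:", info).use { conn ->
            conn.createStatement().use { stmt ->
                stmt.executeQuery("SELECT id FROM trace.resources").use { rs ->
                    while (rs.next()) println(rs.getString("id"))
                }
            }
        }
    }

When the model lives in a file instead, BASE_DIRECTORY is supplied by the model handler and relative paths are resolved against it, as the code above shows.
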
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt
index 2dd02710..e74d2ee8 100644
--- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt
+++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt
@@ -71,7 +71,11 @@ internal class TraceTable(private val table: org.opendc.trace.Table) :
return rowType
}
- override fun scan(root: DataContext, filters: MutableList<RexNode>, projects: IntArray?): Enumerable<Array<Any?>> {
+ override fun scan(
+ root: DataContext,
+ filters: MutableList<RexNode>,
+ projects: IntArray?,
+ ): Enumerable<Array<Any?>> {
        // Filters are currently not supported by the OpenDC trace API. By leaving the filters in the list, Calcite
        // assumes they were declined and applies the filtering itself.
@@ -130,14 +134,18 @@ internal class TraceTable(private val table: org.opendc.trace.Table) :
return rowCount
}
- override fun <T> asQueryable(queryProvider: QueryProvider, schema: SchemaPlus, tableName: String): Queryable<T> {
+ override fun <T> asQueryable(
+ queryProvider: QueryProvider,
+ schema: SchemaPlus,
+ tableName: String,
+ ): Queryable<T> {
return object : AbstractTableQueryable<T>(queryProvider, schema, this@TraceTable, tableName) {
override fun enumerator(): Enumerator<T> {
val cancelFlag = AtomicBoolean(false)
return TraceReaderEnumerator(
this@TraceTable.table.newReader(),
this@TraceTable.table.columns,
- cancelFlag
+ cancelFlag,
)
}
@@ -155,7 +163,7 @@ internal class TraceTable(private val table: org.opendc.trace.Table) :
operation: TableModify.Operation,
updateColumnList: MutableList<String>?,
sourceExpressionList: MutableList<RexNode>?,
- flattened: Boolean
+ flattened: Boolean,
): TableModify {
cluster.planner.addRule(TraceTableModifyRule.DEFAULT.toRule())
@@ -166,7 +174,7 @@ internal class TraceTable(private val table: org.opendc.trace.Table) :
operation,
updateColumnList,
sourceExpressionList,
- flattened
+ flattened,
)
}
@@ -184,7 +192,10 @@ internal class TraceTable(private val table: org.opendc.trace.Table) :
return typeFactory.createStructType(types, names)
}
- private fun mapType(typeFactory: JavaTypeFactory, type: TableColumnType): RelDataType {
+ private fun mapType(
+ typeFactory: JavaTypeFactory,
+ type: TableColumnType,
+ ): RelDataType {
return when (type) {
is TableColumnType.Boolean -> typeFactory.createSqlType(SqlTypeName.BOOLEAN)
is TableColumnType.Int -> typeFactory.createSqlType(SqlTypeName.INTEGER)
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModify.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModify.kt
index cc23854f..eedff00d 100644
--- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModify.kt
+++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModify.kt
@@ -59,7 +59,7 @@ internal class TraceTableModify(
operation: Operation,
updateColumnList: List<String>?,
sourceExpressionList: List<RexNode>?,
- flattened: Boolean
+ flattened: Boolean,
) : TableModify(cluster, traitSet, table, schema, input, operation, updateColumnList, sourceExpressionList, flattened),
EnumerableRel {
init {
@@ -67,7 +67,10 @@ internal class TraceTableModify(
table.unwrap(ModifiableTable::class.java) ?: throw AssertionError() // TODO: user error in validator
}
- override fun copy(traitSet: RelTraitSet, inputs: List<RelNode>?): RelNode {
+ override fun copy(
+ traitSet: RelTraitSet,
+ inputs: List<RelNode>?,
+ ): RelNode {
return TraceTableModify(
cluster,
traitSet,
@@ -77,40 +80,48 @@ internal class TraceTableModify(
operation,
updateColumnList,
sourceExpressionList,
- isFlattened
+ isFlattened,
)
}
- override fun computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery?): RelOptCost {
+ override fun computeSelfCost(
+ planner: RelOptPlanner,
+ mq: RelMetadataQuery?,
+ ): RelOptCost {
        // Prefer this plan over the standard EnumerableTableModify.
return super.computeSelfCost(planner, mq)!!.multiplyBy(.1)
}
- override fun implement(implementor: EnumerableRelImplementor, pref: Prefer): EnumerableRel.Result {
+ override fun implement(
+ implementor: EnumerableRelImplementor,
+ pref: Prefer,
+ ): EnumerableRel.Result {
val builder = BlockBuilder()
val result = implementor.visitChild(this, 0, getInput() as EnumerableRel, pref)
val childExp = builder.append("child", result.block)
- val convertedChildExpr = if (getInput().rowType != rowType) {
- val typeFactory = cluster.typeFactory as JavaTypeFactory
- val format = EnumerableTableScan.deduceFormat(table)
- val physType = PhysTypeImpl.of(typeFactory, table.rowType, format)
- val childPhysType = result.physType
- val o = Expressions.parameter(childPhysType.javaRowType, "o")
- val expressionList = List(childPhysType.rowType.fieldCount) { i ->
- childPhysType.fieldReference(o, i, physType.getJavaFieldType(i))
- }
+ val convertedChildExpr =
+ if (getInput().rowType != rowType) {
+ val typeFactory = cluster.typeFactory as JavaTypeFactory
+ val format = EnumerableTableScan.deduceFormat(table)
+ val physType = PhysTypeImpl.of(typeFactory, table.rowType, format)
+ val childPhysType = result.physType
+ val o = Expressions.parameter(childPhysType.javaRowType, "o")
+ val expressionList =
+ List(childPhysType.rowType.fieldCount) { i ->
+ childPhysType.fieldReference(o, i, physType.getJavaFieldType(i))
+ }
- builder.append(
- "convertedChild",
- Expressions.call(
- childExp,
- BuiltInMethod.SELECT.method,
- Expressions.lambda<org.apache.calcite.linq4j.function.Function<*>>(physType.record(expressionList), o)
+ builder.append(
+ "convertedChild",
+ Expressions.call(
+ childExp,
+ BuiltInMethod.SELECT.method,
+ Expressions.lambda<org.apache.calcite.linq4j.function.Function<*>>(physType.record(expressionList), o),
+ ),
)
- )
- } else {
- childExp
- }
+ } else {
+ childExp
+ }
if (!isInsert) {
throw UnsupportedOperationException("Deletion and update not supported")
@@ -126,10 +137,10 @@ internal class TraceTableModify(
Long::class.java,
expression,
INSERT_METHOD,
- convertedChildExpr
- )
- )
- )
+ convertedChildExpr,
+ ),
+ ),
+ ),
)
val rowFormat = if (pref === Prefer.ARRAY) JavaRowFormat.ARRAY else JavaRowFormat.SCALAR
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModifyRule.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModifyRule.kt
index 7572e381..9c560984 100644
--- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModifyRule.kt
+++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModifyRule.kt
@@ -52,14 +52,20 @@ internal class TraceTableModifyRule(config: Config) : ConverterRule(config) {
modify.operation,
modify.updateColumnList,
modify.sourceExpressionList,
- modify.isFlattened
+ modify.isFlattened,
)
}
companion object {
/** Default configuration. */
- val DEFAULT: Config = Config.INSTANCE
- .withConversion(LogicalTableModify::class.java, Convention.NONE, EnumerableConvention.INSTANCE, "TraceTableModificationRule")
- .withRuleFactory { config: Config -> TraceTableModifyRule(config) }
+ val DEFAULT: Config =
+ Config.INSTANCE
+ .withConversion(
+ LogicalTableModify::class.java,
+ Convention.NONE,
+ EnumerableConvention.INSTANCE,
+ "TraceTableModificationRule",
+ )
+ .withRuleFactory { config: Config -> TraceTableModifyRule(config) }
}
}
diff --git a/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt b/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt
index 64bb31c9..93b15e5f 100644
--- a/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt
+++ b/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt
@@ -71,7 +71,7 @@ class CalciteTest {
{ assertEquals("1052", rs.getString("id")) },
{ assertTrue(rs.next()) },
{ assertEquals("1073", rs.getString("id")) },
- { assertFalse(rs.next()) }
+ { assertFalse(rs.next()) },
)
}
}
@@ -86,7 +86,7 @@ class CalciteTest {
{ assertEquals(300000, rs.getLong("duration")) },
{ assertEquals(0.0, rs.getDouble("cpu_usage")) },
{ assertTrue(rs.next()) },
- { assertEquals("1019", rs.getString("id")) }
+ { assertEquals("1019", rs.getString("id")) },
)
}
}
@@ -98,7 +98,7 @@ class CalciteTest {
{ assertTrue(rs.next()) },
{ assertArrayEquals(arrayOf("1019", "1023", "1052"), rs.getArray("members").array as Array<*>) },
{ assertEquals(0.0, rs.getDouble("target")) },
- { assertEquals(0.8830158730158756, rs.getDouble("score")) }
+ { assertEquals(0.8830158730158756, rs.getDouble("score")) },
)
}
}
@@ -109,7 +109,7 @@ class CalciteTest {
assertAll(
{ assertTrue(rs.next()) },
{ assertEquals(249.59993808, rs.getDouble("max_cpu_usage")) },
- { assertEquals(5.387240309118493, rs.getDouble("avg_cpu_usage")) }
+ { assertEquals(5.387240309118493, rs.getDouble("avg_cpu_usage")) },
)
}
}
@@ -120,12 +120,13 @@ class CalciteTest {
val newTrace = Trace.create(tmp, "opendc-vm")
runStatement(newTrace) { stmt ->
- val count = stmt.executeUpdate(
- """
- INSERT INTO trace.resources (id, start_time, stop_time, cpu_count, cpu_capacity, mem_capacity)
- VALUES (1234, '2013-08-12 13:35:46.0', '2013-09-11 13:39:58.0', 1, 2926.0, 1024.0)
- """.trimIndent()
- )
+ val count =
+ stmt.executeUpdate(
+ """
+ INSERT INTO trace.resources (id, start_time, stop_time, cpu_count, cpu_capacity, mem_capacity)
+ VALUES (1234, '2013-08-12 13:35:46.0', '2013-09-11 13:39:58.0', 1, 2926.0, 1024.0)
+ """.trimIndent(),
+ )
assertEquals(1, count)
}
@@ -136,7 +137,7 @@ class CalciteTest {
{ assertEquals(1, rs.getInt("cpu_count")) },
{ assertEquals(Timestamp.valueOf("2013-08-12 13:35:46.0"), rs.getTimestamp("start_time")) },
{ assertEquals(2926.0, rs.getDouble("cpu_capacity")) },
- { assertEquals(1024.0, rs.getDouble("mem_capacity")) }
+ { assertEquals(1024.0, rs.getDouble("mem_capacity")) },
)
}
}
@@ -145,9 +146,10 @@ class CalciteTest {
fun testUUID() {
val trace = mockk<Trace>()
every { trace.tables } returns listOf(TABLE_RESOURCES)
- every { trace.getTable(TABLE_RESOURCES)!!.columns } returns listOf(
- TableColumn("id", TableColumnType.UUID)
- )
+ every { trace.getTable(TABLE_RESOURCES)!!.columns } returns
+ listOf(
+ TableColumn("id", TableColumnType.UUID),
+ )
every { trace.getTable(TABLE_RESOURCES)!!.newReader() } answers {
object : TableReader {
override fun nextRow(): Boolean = true
@@ -195,15 +197,25 @@ class CalciteTest {
TODO("not implemented")
}
- override fun <T> getList(index: Int, elementType: Class<T>): List<T>? {
+ override fun <T> getList(
+ index: Int,
+ elementType: Class<T>,
+ ): List<T>? {
TODO("not implemented")
}
- override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? {
+ override fun <T> getSet(
+ index: Int,
+ elementType: Class<T>,
+ ): Set<T>? {
TODO("not implemented")
}
- override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? {
+ override fun <K, V> getMap(
+ index: Int,
+ keyType: Class<K>,
+ valueType: Class<V>,
+ ): Map<K, V>? {
TODO("not implemented")
}
@@ -214,7 +226,7 @@ class CalciteTest {
runQuery(trace, "SELECT id FROM trace.resources") { rs ->
assertAll(
{ assertTrue(rs.next()) },
- { assertArrayEquals(byteArrayOf(0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 2), rs.getBytes("id")) }
+ { assertArrayEquals(byteArrayOf(0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 2), rs.getBytes("id")) },
)
}
}
@@ -222,7 +234,11 @@ class CalciteTest {
/**
     * Helper function to run a query against the specified trace.
*/
- private fun runQuery(trace: Trace, query: String, block: (ResultSet) -> Unit) {
+ private fun runQuery(
+ trace: Trace,
+ query: String,
+ block: (ResultSet) -> Unit,
+ ) {
runStatement(trace) { stmt ->
val rs = stmt.executeQuery(query)
rs.use { block(rs) }
@@ -232,7 +248,10 @@ class CalciteTest {
/**
     * Helper function to run a statement against the specified trace.
*/
- private fun runStatement(trace: Trace, block: (Statement) -> Unit) {
+ private fun runStatement(
+ trace: Trace,
+ block: (Statement) -> Unit,
+ ) {
val info = Properties()
info.setProperty("lex", "JAVA")
val connection = DriverManager.getConnection("jdbc:calcite:", info).unwrap(CalciteConnection::class.java)
diff --git a/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/TraceSchemaFactoryTest.kt b/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/TraceSchemaFactoryTest.kt
index 735cedce..eb4bc769 100644
--- a/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/TraceSchemaFactoryTest.kt
+++ b/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/TraceSchemaFactoryTest.kt
@@ -48,7 +48,7 @@ class TraceSchemaFactoryTest {
{ assertEquals("1019", rs.getString("id")) },
{ assertEquals(1, rs.getInt("cpu_count")) },
{ assertEquals(Timestamp.valueOf("2013-08-12 13:40:46.0"), rs.getTimestamp("start_time")) },
- { assertEquals(181352.0, rs.getDouble("mem_capacity")) }
+ { assertEquals(181352.0, rs.getDouble("mem_capacity")) },
)
} finally {
rs.close()
diff --git a/opendc-trace/opendc-trace-gwf/build.gradle.kts b/opendc-trace/opendc-trace-gwf/build.gradle.kts
index 0c041439..4d0bd796 100644
--- a/opendc-trace/opendc-trace-gwf/build.gradle.kts
+++ b/opendc-trace/opendc-trace-gwf/build.gradle.kts
@@ -22,7 +22,7 @@
description = "Support for GWF traces in OpenDC"
-/* Build configuration */
+// Build configuration
plugins {
`kotlin-library-conventions`
}
diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt
index 78ce6ad4..8a2a99cb 100644
--- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt
+++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt
@@ -88,19 +88,19 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader {
override fun resolve(name: String): Int {
return when (name) {
- TASK_ID -> COL_JOB_ID
- TASK_WORKFLOW_ID -> COL_WORKFLOW_ID
- TASK_SUBMIT_TIME -> COL_SUBMIT_TIME
- TASK_RUNTIME -> COL_RUNTIME
- TASK_ALLOC_NCPUS -> COL_NPROC
- TASK_REQ_NCPUS -> COL_REQ_NPROC
- TASK_PARENTS -> COL_DEPS
+ TASK_ID -> colJobID
+ TASK_WORKFLOW_ID -> colWorkflowID
+ TASK_SUBMIT_TIME -> colSubmitTime
+ TASK_RUNTIME -> colRuntime
+ TASK_ALLOC_NCPUS -> colNproc
+ TASK_REQ_NCPUS -> colReqNproc
+ TASK_PARENTS -> colDeps
else -> -1
}
}
override fun isNull(index: Int): Boolean {
- require(index in 0..COL_DEPS) { "Invalid column" }
+ require(index in 0..colDeps) { "Invalid column" }
return false
}
@@ -111,8 +111,8 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader {
override fun getInt(index: Int): Int {
checkActive()
return when (index) {
- COL_REQ_NPROC -> reqNProcs
- COL_NPROC -> nProcs
+ colReqNproc -> reqNProcs
+ colNproc -> nProcs
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -132,8 +132,8 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader {
override fun getString(index: Int): String? {
checkActive()
return when (index) {
- COL_JOB_ID -> jobId
- COL_WORKFLOW_ID -> workflowId
+ colJobID -> jobId
+ colWorkflowID -> workflowId
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -145,7 +145,7 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader {
override fun getInstant(index: Int): Instant? {
checkActive()
return when (index) {
- COL_SUBMIT_TIME -> submitTime
+ colSubmitTime -> submitTime
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -153,23 +153,33 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader {
override fun getDuration(index: Int): Duration? {
checkActive()
return when (index) {
- COL_RUNTIME -> runtime
+ colRuntime -> runtime
else -> throw IllegalArgumentException("Invalid column")
}
}
- override fun <T> getList(index: Int, elementType: Class<T>): List<T>? {
+ override fun <T> getList(
+ index: Int,
+ elementType: Class<T>,
+ ): List<T>? {
throw IllegalArgumentException("Invalid column")
}
- override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? {
+ override fun <K, V> getMap(
+ index: Int,
+ keyType: Class<K>,
+ valueType: Class<V>,
+ ): Map<K, V>? {
throw IllegalArgumentException("Invalid column")
}
- override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? {
+ override fun <T> getSet(
+ index: Int,
+ elementType: Class<T>,
+ ): Set<T>? {
checkActive()
return when (index) {
- COL_DEPS -> TYPE_DEPS.convertTo(dependencies, elementType)
+ colDeps -> typeDeps.convertTo(dependencies, elementType)
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -245,31 +255,32 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader {
dependencies = emptySet()
}
- private val COL_WORKFLOW_ID = 0
- private val COL_JOB_ID = 1
- private val COL_SUBMIT_TIME = 2
- private val COL_RUNTIME = 3
- private val COL_NPROC = 4
- private val COL_REQ_NPROC = 5
- private val COL_DEPS = 6
+ private val colWorkflowID = 0
+ private val colJobID = 1
+ private val colSubmitTime = 2
+ private val colRuntime = 3
+ private val colNproc = 4
+ private val colReqNproc = 5
+ private val colDeps = 6
- private val TYPE_DEPS = TableColumnType.Set(TableColumnType.String)
+ private val typeDeps = TableColumnType.Set(TableColumnType.String)
companion object {
/**
* The [CsvSchema] that is used to parse the trace.
*/
- private val schema = CsvSchema.builder()
- .addColumn("WorkflowID", CsvSchema.ColumnType.NUMBER)
- .addColumn("JobID", CsvSchema.ColumnType.NUMBER)
- .addColumn("SubmitTime", CsvSchema.ColumnType.NUMBER)
- .addColumn("RunTime", CsvSchema.ColumnType.NUMBER)
- .addColumn("NProcs", CsvSchema.ColumnType.NUMBER)
- .addColumn("ReqNProcs", CsvSchema.ColumnType.NUMBER)
- .addColumn("Dependencies", CsvSchema.ColumnType.STRING)
- .setAllowComments(true)
- .setUseHeader(true)
- .setColumnSeparator(',')
- .build()
+ private val schema =
+ CsvSchema.builder()
+ .addColumn("WorkflowID", CsvSchema.ColumnType.NUMBER)
+ .addColumn("JobID", CsvSchema.ColumnType.NUMBER)
+ .addColumn("SubmitTime", CsvSchema.ColumnType.NUMBER)
+ .addColumn("RunTime", CsvSchema.ColumnType.NUMBER)
+ .addColumn("NProcs", CsvSchema.ColumnType.NUMBER)
+ .addColumn("ReqNProcs", CsvSchema.ColumnType.NUMBER)
+ .addColumn("Dependencies", CsvSchema.ColumnType.STRING)
+ .setAllowComments(true)
+ .setUseHeader(true)
+ .setColumnSeparator(',')
+ .build()
}
}
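
The GWF reader is the only one here with a Set-typed column: task dependencies come back through getSet, converted via typeDeps. A reading sketch for the tasks table, assuming the TASK_* constants live in org.opendc.trace.conv alongside the resource columns (their import paths are outside this diff) and a hypothetical trace file:

    import org.opendc.trace.conv.TABLE_TASKS
    import org.opendc.trace.conv.TASK_ID
    import org.opendc.trace.conv.TASK_PARENTS
    import org.opendc.trace.gwf.GwfTraceFormat
    import java.nio.file.Paths

    fun main() {
        val format = GwfTraceFormat()
        // Hypothetical GWF trace file.
        val reader = format.newReader(Paths.get("/traces/trace.gwf"), TABLE_TASKS, projection = null)
        val taskCol = reader.resolve(TASK_ID)
        val parentsCol = reader.resolve(TASK_PARENTS)
        try {
            while (reader.nextRow()) {
                val parents = reader.getSet(parentsCol, String::class.java)
                println("task ${reader.getString(taskCol)} depends on $parents")
            }
        } finally {
            reader.close()
        }
    }

An empty dependency field yields emptySet(), which is exactly what the format test below asserts for the first task.
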
diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt
index d2ded0ee..097c5593 100644
--- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt
+++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt
@@ -52,9 +52,10 @@ public class GwfTraceFormat : TraceFormat {
/**
* The [CsvFactory] used to create the parser.
*/
- private val factory = CsvFactory()
- .enable(CsvParser.Feature.ALLOW_COMMENTS)
- .enable(CsvParser.Feature.TRIM_SPACES)
+ private val factory =
+ CsvFactory()
+ .enable(CsvParser.Feature.ALLOW_COMMENTS)
+ .enable(CsvParser.Feature.TRIM_SPACES)
override fun create(path: Path) {
throw UnsupportedOperationException("Writing not supported for this format")
@@ -62,31 +63,42 @@ public class GwfTraceFormat : TraceFormat {
override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS)
- override fun getDetails(path: Path, table: String): TableDetails {
+ override fun getDetails(
+ path: Path,
+ table: String,
+ ): TableDetails {
return when (table) {
- TABLE_TASKS -> TableDetails(
- listOf(
- TableColumn(TASK_WORKFLOW_ID, TableColumnType.String),
- TableColumn(TASK_ID, TableColumnType.String),
- TableColumn(TASK_SUBMIT_TIME, TableColumnType.Instant),
- TableColumn(TASK_RUNTIME, TableColumnType.Duration),
- TableColumn(TASK_REQ_NCPUS, TableColumnType.Int),
- TableColumn(TASK_ALLOC_NCPUS, TableColumnType.Int),
- TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String))
+ TABLE_TASKS ->
+ TableDetails(
+ listOf(
+ TableColumn(TASK_WORKFLOW_ID, TableColumnType.String),
+ TableColumn(TASK_ID, TableColumnType.String),
+ TableColumn(TASK_SUBMIT_TIME, TableColumnType.Instant),
+ TableColumn(TASK_RUNTIME, TableColumnType.Duration),
+ TableColumn(TASK_REQ_NCPUS, TableColumnType.Int),
+ TableColumn(TASK_ALLOC_NCPUS, TableColumnType.Int),
+ TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)),
+ ),
)
- )
else -> throw IllegalArgumentException("Table $table not supported")
}
}
- override fun newReader(path: Path, table: String, projection: List<String>?): TableReader {
+ override fun newReader(
+ path: Path,
+ table: String,
+ projection: List<String>?,
+ ): TableReader {
return when (table) {
TABLE_TASKS -> GwfTaskTableReader(factory.createParser(path.toFile()))
else -> throw IllegalArgumentException("Table $table not supported")
}
}
- override fun newWriter(path: Path, table: String): TableWriter {
+ override fun newWriter(
+ path: Path,
+ table: String,
+ ): TableWriter {
throw UnsupportedOperationException("Writing not supported for this format")
}
}
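The reflowed getDetails() signature returns the same TableDetails payload, so schema discovery is unchanged. A sketch; that TableDetails exposes the column list it was constructed with, and that TableColumn exposes name and type properties, are assumptions here, not something this diff shows.

import org.opendc.trace.conv.TABLE_TASKS
import org.opendc.trace.gwf.GwfTraceFormat
import java.nio.file.Paths

fun main() {
    val format = GwfTraceFormat()
    val path = Paths.get("trace.gwf") // hypothetical
    check(TABLE_TASKS in format.getTables(path)) // GWF exposes exactly one table
    val details = format.getDetails(path, TABLE_TASKS)
    for (column in details.columns) { // assumption: TableDetails exposes `columns`
        println("${column.name}: ${column.type}")
    }
    // Writing stays unsupported: newWriter() throws UnsupportedOperationException.
}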
diff --git a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt
index c75e86df..9c97547a 100644
--- a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt
@@ -82,7 +82,7 @@ internal class GwfTraceFormatTest {
{ assertEquals("1", reader.getString(TASK_ID)) },
{ assertEquals(Instant.ofEpochSecond(16), reader.getInstant(TASK_SUBMIT_TIME)) },
{ assertEquals(Duration.ofSeconds(11), reader.getDuration(TASK_RUNTIME)) },
- { assertEquals(emptySet<String>(), reader.getSet(TASK_PARENTS, String::class.java)) }
+ { assertEquals(emptySet<String>(), reader.getSet(TASK_PARENTS, String::class.java)) },
)
}
@@ -101,7 +101,7 @@ internal class GwfTraceFormatTest {
{ assertEquals("7", reader.getString(TASK_ID)) },
{ assertEquals(Instant.ofEpochSecond(87), reader.getInstant(TASK_SUBMIT_TIME)) },
{ assertEquals(Duration.ofSeconds(11), reader.getDuration(TASK_RUNTIME)) },
- { assertEquals(setOf("4", "5", "6"), reader.getSet(TASK_PARENTS, String::class.java)) }
+ { assertEquals(setOf("4", "5", "6"), reader.getSet(TASK_PARENTS, String::class.java)) },
)
}
diff --git a/opendc-trace/opendc-trace-opendc/build.gradle.kts b/opendc-trace/opendc-trace-opendc/build.gradle.kts
index 18967136..d9ed15f8 100644
--- a/opendc-trace/opendc-trace-opendc/build.gradle.kts
+++ b/opendc-trace/opendc-trace-opendc/build.gradle.kts
@@ -22,7 +22,7 @@
description = "Support for OpenDC-specific trace formats"
-/* Build configuration */
+// Build configuration
plugins {
`kotlin-library-conventions`
`benchmark-conventions`
diff --git a/opendc-trace/opendc-trace-opendc/src/jmh/kotlin/org/opendc/trace/opendc/OdcVmTraceBenchmarks.kt b/opendc-trace/opendc-trace-opendc/src/jmh/kotlin/org/opendc/trace/opendc/OdcVmTraceBenchmarks.kt
index e504cf2f..e179e261 100644
--- a/opendc-trace/opendc-trace-opendc/src/jmh/kotlin/org/opendc/trace/opendc/OdcVmTraceBenchmarks.kt
+++ b/opendc-trace/opendc-trace-opendc/src/jmh/kotlin/org/opendc/trace/opendc/OdcVmTraceBenchmarks.kt
@@ -23,10 +23,10 @@
package org.opendc.trace.opendc
import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE
-import org.opendc.trace.conv.RESOURCE_ID
import org.opendc.trace.conv.TABLE_INTERFERENCE_GROUPS
import org.opendc.trace.conv.TABLE_RESOURCES
import org.opendc.trace.conv.TABLE_RESOURCE_STATES
+import org.opendc.trace.conv.resourceID
import org.opendc.trace.spi.TraceFormat
import org.openjdk.jmh.annotations.Benchmark
import org.openjdk.jmh.annotations.Fork
@@ -60,7 +60,7 @@ class OdcVmTraceBenchmarks {
fun benchmarkResourcesReader(bh: Blackhole) {
val reader = format.newReader(path, TABLE_RESOURCES, null)
try {
- val idColumn = reader.resolve(RESOURCE_ID)
+ val idColumn = reader.resolve(resourceID)
while (reader.nextRow()) {
bh.consume(reader.getString(idColumn))
}
@@ -73,7 +73,7 @@ class OdcVmTraceBenchmarks {
fun benchmarkResourceStatesReader(bh: Blackhole) {
val reader = format.newReader(path, TABLE_RESOURCE_STATES, null)
try {
- val idColumn = reader.resolve(RESOURCE_ID)
+ val idColumn = reader.resolve(resourceID)
while (reader.nextRow()) {
bh.consume(reader.getString(idColumn))
}
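The projection argument that these benchmarks pass as null names the columns a reader should materialize; the ReadSupport classes later in this diff translate it into a projected Parquet schema. A hedged sketch of a projected read over the states table:

import org.opendc.trace.conv.TABLE_RESOURCE_STATES
import org.opendc.trace.conv.resourceID
import org.opendc.trace.conv.resourceStateCpuUsage
import org.opendc.trace.opendc.OdcVmTraceFormat
import java.nio.file.Paths

fun main() {
    val format = OdcVmTraceFormat()
    val path = Paths.get("vm-trace") // hypothetical trace directory
    // Only these two columns are requested from the Parquet reader.
    val reader = format.newReader(path, TABLE_RESOURCE_STATES, listOf(resourceID, resourceStateCpuUsage))
    try {
        val idCol = reader.resolve(resourceID)
        val usageCol = reader.resolve(resourceStateCpuUsage)
        while (reader.nextRow()) {
            println("${reader.getString(idCol)} -> ${reader.getDouble(usageCol)}")
        }
    } finally {
        reader.close() // assumed AutoCloseable
    }
}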
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt
index 3e1fca06..7bf48f1a 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt
@@ -65,24 +65,24 @@ internal class OdcVmInterferenceJsonTableReader(private val parser: JsonParser)
}
}
- private val COL_MEMBERS = 0
- private val COL_TARGET = 1
- private val COL_SCORE = 2
+ private val colMembers = 0
+ private val colTarget = 1
+ private val colScore = 2
- private val TYPE_MEMBERS = TableColumnType.Set(TableColumnType.String)
+ private val typeMembers = TableColumnType.Set(TableColumnType.String)
override fun resolve(name: String): Int {
return when (name) {
- INTERFERENCE_GROUP_MEMBERS -> COL_MEMBERS
- INTERFERENCE_GROUP_TARGET -> COL_TARGET
- INTERFERENCE_GROUP_SCORE -> COL_SCORE
+ INTERFERENCE_GROUP_MEMBERS -> colMembers
+ INTERFERENCE_GROUP_TARGET -> colTarget
+ INTERFERENCE_GROUP_SCORE -> colScore
else -> -1
}
}
override fun isNull(index: Int): Boolean {
return when (index) {
- COL_MEMBERS, COL_TARGET, COL_SCORE -> false
+ colMembers, colTarget, colScore -> false
else -> throw IllegalArgumentException("Invalid column index $index")
}
}
@@ -106,8 +106,8 @@ internal class OdcVmInterferenceJsonTableReader(private val parser: JsonParser)
override fun getDouble(index: Int): Double {
checkActive()
return when (index) {
- COL_TARGET -> targetLoad
- COL_SCORE -> score
+ colTarget -> targetLoad
+ colScore -> score
else -> throw IllegalArgumentException("Invalid column $index")
}
}
@@ -128,19 +128,29 @@ internal class OdcVmInterferenceJsonTableReader(private val parser: JsonParser)
throw IllegalArgumentException("Invalid column $index")
}
- override fun <T> getList(index: Int, elementType: Class<T>): List<T>? {
+ override fun <T> getList(
+ index: Int,
+ elementType: Class<T>,
+ ): List<T>? {
throw IllegalArgumentException("Invalid column $index")
}
- override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? {
+ override fun <T> getSet(
+ index: Int,
+ elementType: Class<T>,
+ ): Set<T>? {
checkActive()
return when (index) {
- COL_MEMBERS -> TYPE_MEMBERS.convertTo(members, elementType)
+ colMembers -> typeMembers.convertTo(members, elementType)
else -> throw IllegalArgumentException("Invalid column $index")
}
}
- override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? {
+ override fun <K, V> getMap(
+ index: Int,
+ keyType: Class<K>,
+ valueType: Class<V>,
+ ): Map<K, V>? {
throw IllegalArgumentException("Invalid column $index")
}
@@ -196,7 +206,10 @@ internal class OdcVmInterferenceJsonTableReader(private val parser: JsonParser)
/**
* Parse the members of a group.
*/
- private fun parseGroupMembers(parser: JsonParser, members: MutableSet<String>) {
+ private fun parseGroupMembers(
+ parser: JsonParser,
+ members: MutableSet<String>,
+ ) {
if (!parser.isExpectedStartArrayToken) {
throw JsonParseException(parser, "Expected array for group members, but got ${parser.currentToken()}")
}
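On the read side, getSet() funnels the parsed member set through typeMembers.convertTo, so the caller chooses the element type. A sketch of consuming the interference-group table; per OdcVmTraceFormat further down, a missing interference-model.json simply yields an empty model rather than an error.

import org.opendc.trace.conv.INTERFERENCE_GROUP_MEMBERS
import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE
import org.opendc.trace.conv.INTERFERENCE_GROUP_TARGET
import org.opendc.trace.conv.TABLE_INTERFERENCE_GROUPS
import org.opendc.trace.opendc.OdcVmTraceFormat
import java.nio.file.Paths

fun main() {
    val format = OdcVmTraceFormat()
    val reader = format.newReader(Paths.get("vm-trace"), TABLE_INTERFERENCE_GROUPS, null)
    try {
        val membersCol = reader.resolve(INTERFERENCE_GROUP_MEMBERS)
        val targetCol = reader.resolve(INTERFERENCE_GROUP_TARGET)
        val scoreCol = reader.resolve(INTERFERENCE_GROUP_SCORE)
        while (reader.nextRow()) {
            val members = reader.getSet(membersCol, String::class.java)
            val target = reader.getDouble(targetCol)
            val score = reader.getDouble(scoreCol)
            println("group $members: target=$target, score=$score")
        }
    } finally {
        reader.close() // assumed AutoCloseable
    }
}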
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt
index c6905c5b..93f5a976 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt
@@ -70,70 +70,106 @@ internal class OdcVmInterferenceJsonTableWriter(private val generator: JsonGener
override fun resolve(name: String): Int {
return when (name) {
- INTERFERENCE_GROUP_MEMBERS -> COL_MEMBERS
- INTERFERENCE_GROUP_TARGET -> COL_TARGET
- INTERFERENCE_GROUP_SCORE -> COL_SCORE
+ INTERFERENCE_GROUP_MEMBERS -> colMembers
+ INTERFERENCE_GROUP_TARGET -> colTarget
+ INTERFERENCE_GROUP_SCORE -> colScore
else -> -1
}
}
- override fun setBoolean(index: Int, value: Boolean) {
+ override fun setBoolean(
+ index: Int,
+ value: Boolean,
+ ) {
throw IllegalArgumentException("Invalid column $index")
}
- override fun setInt(index: Int, value: Int) {
+ override fun setInt(
+ index: Int,
+ value: Int,
+ ) {
throw IllegalArgumentException("Invalid column $index")
}
- override fun setLong(index: Int, value: Long) {
+ override fun setLong(
+ index: Int,
+ value: Long,
+ ) {
throw IllegalArgumentException("Invalid column $index")
}
- override fun setFloat(index: Int, value: Float) {
+ override fun setFloat(
+ index: Int,
+ value: Float,
+ ) {
throw IllegalArgumentException("Invalid column $index")
}
- override fun setDouble(index: Int, value: Double) {
+ override fun setDouble(
+ index: Int,
+ value: Double,
+ ) {
check(isRowActive) { "No active row" }
when (index) {
- COL_TARGET -> targetLoad = (value as Number).toDouble()
- COL_SCORE -> score = (value as Number).toDouble()
+ colTarget -> targetLoad = (value as Number).toDouble()
+ colScore -> score = (value as Number).toDouble()
else -> throw IllegalArgumentException("Invalid column $index")
}
}
- override fun setString(index: Int, value: String) {
+ override fun setString(
+ index: Int,
+ value: String,
+ ) {
throw IllegalArgumentException("Invalid column $index")
}
- override fun setUUID(index: Int, value: UUID) {
+ override fun setUUID(
+ index: Int,
+ value: UUID,
+ ) {
throw IllegalArgumentException("Invalid column $index")
}
- override fun setInstant(index: Int, value: Instant) {
+ override fun setInstant(
+ index: Int,
+ value: Instant,
+ ) {
throw IllegalArgumentException("Invalid column $index")
}
- override fun setDuration(index: Int, value: Duration) {
+ override fun setDuration(
+ index: Int,
+ value: Duration,
+ ) {
throw IllegalArgumentException("Invalid column $index")
}
- override fun <T> setList(index: Int, value: List<T>) {
+ override fun <T> setList(
+ index: Int,
+ value: List<T>,
+ ) {
throw IllegalArgumentException("Invalid column $index")
}
- override fun <T> setSet(index: Int, value: Set<T>) {
+ override fun <T> setSet(
+ index: Int,
+ value: Set<T>,
+ ) {
check(isRowActive) { "No active row" }
@Suppress("UNCHECKED_CAST")
when (index) {
- COL_MEMBERS -> members = value as Set<String>
+ colMembers -> members = value as Set<String>
else -> throw IllegalArgumentException("Invalid column index $index")
}
}
- override fun <K, V> setMap(index: Int, value: Map<K, V>) {
+ override fun <K, V> setMap(
+ index: Int,
+ value: Map<K, V>,
+ ) {
throw IllegalArgumentException("Invalid column $index")
}
@@ -146,9 +182,9 @@ internal class OdcVmInterferenceJsonTableWriter(private val generator: JsonGener
generator.close()
}
- private val COL_MEMBERS = 0
- private val COL_TARGET = 1
- private val COL_SCORE = 2
+ private val colMembers = 0
+ private val colTarget = 1
+ private val colScore = 2
private var members = emptySet<String>()
private var targetLoad = Double.POSITIVE_INFINITY
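This writer accepts exactly setSet for the members column and setDouble for target and score; every other setter throws. A sketch of the startRow/set/endRow protocol, with close() flushing the underlying JsonGenerator as shown above; the values written are illustrative.

import org.opendc.trace.conv.INTERFERENCE_GROUP_MEMBERS
import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE
import org.opendc.trace.conv.INTERFERENCE_GROUP_TARGET
import org.opendc.trace.conv.TABLE_INTERFERENCE_GROUPS
import org.opendc.trace.opendc.OdcVmTraceFormat
import java.nio.file.Paths

fun main() {
    val format = OdcVmTraceFormat()
    val writer = format.newWriter(Paths.get("vm-trace"), TABLE_INTERFERENCE_GROUPS)
    try {
        writer.startRow() // required first; the setters check isRowActive
        writer.setSet(writer.resolve(INTERFERENCE_GROUP_MEMBERS), setOf("vm-a", "vm-b"))
        writer.setDouble(writer.resolve(INTERFERENCE_GROUP_TARGET), 0.8)
        writer.setDouble(writer.resolve(INTERFERENCE_GROUP_SCORE), 0.35)
        writer.endRow()
    } finally {
        writer.close()
    }
}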
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
index ff9a98d7..8e54f2b0 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
@@ -23,11 +23,11 @@
package org.opendc.trace.opendc
import org.opendc.trace.TableReader
-import org.opendc.trace.conv.RESOURCE_CPU_COUNT
-import org.opendc.trace.conv.RESOURCE_ID
-import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE
-import org.opendc.trace.conv.RESOURCE_STATE_DURATION
-import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP
+import org.opendc.trace.conv.resourceCpuCount
+import org.opendc.trace.conv.resourceID
+import org.opendc.trace.conv.resourceStateCpuUsage
+import org.opendc.trace.conv.resourceStateDuration
+import org.opendc.trace.conv.resourceStateTimestamp
import org.opendc.trace.opendc.parquet.ResourceState
import org.opendc.trace.util.parquet.LocalParquetReader
import java.time.Duration
@@ -55,25 +55,25 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
}
}
- private val COL_ID = 0
- private val COL_TIMESTAMP = 1
- private val COL_DURATION = 2
- private val COL_CPU_COUNT = 3
- private val COL_CPU_USAGE = 4
+ private val colID = 0
+ private val colTimestamp = 1
+ private val colDuration = 2
+ private val colCpuCount = 3
+ private val colCpuUsage = 4
override fun resolve(name: String): Int {
return when (name) {
- RESOURCE_ID -> COL_ID
- RESOURCE_STATE_TIMESTAMP -> COL_TIMESTAMP
- RESOURCE_STATE_DURATION -> COL_DURATION
- RESOURCE_CPU_COUNT -> COL_CPU_COUNT
- RESOURCE_STATE_CPU_USAGE -> COL_CPU_USAGE
+ resourceID -> colID
+ resourceStateTimestamp -> colTimestamp
+ resourceStateDuration -> colDuration
+ resourceCpuCount -> colCpuCount
+ resourceStateCpuUsage -> colCpuUsage
else -> -1
}
}
override fun isNull(index: Int): Boolean {
- require(index in 0..COL_CPU_USAGE) { "Invalid column index" }
+ require(index in 0..colCpuUsage) { "Invalid column index" }
return false
}
@@ -84,7 +84,7 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
override fun getInt(index: Int): Int {
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_CPU_COUNT -> record.cpuCount
+ colCpuCount -> record.cpuCount
else -> throw IllegalArgumentException("Invalid column or type [index $index]")
}
}
@@ -100,7 +100,7 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
override fun getDouble(index: Int): Double {
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_CPU_USAGE -> record.cpuUsage
+ colCpuUsage -> record.cpuUsage
else -> throw IllegalArgumentException("Invalid column or type [index $index]")
}
}
@@ -109,7 +109,7 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_ID -> record.id
+ colID -> record.id
else -> throw IllegalArgumentException("Invalid column index $index")
}
}
@@ -122,7 +122,7 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_TIMESTAMP -> record.timestamp
+ colTimestamp -> record.timestamp
else -> throw IllegalArgumentException("Invalid column index $index")
}
}
@@ -131,20 +131,30 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_DURATION -> record.duration
+ colDuration -> record.duration
else -> throw IllegalArgumentException("Invalid column index $index")
}
}
- override fun <T> getList(index: Int, elementType: Class<T>): List<T>? {
+ override fun <T> getList(
+ index: Int,
+ elementType: Class<T>,
+ ): List<T>? {
throw IllegalArgumentException("Invalid column or type [index $index]")
}
- override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? {
+ override fun <T> getSet(
+ index: Int,
+ elementType: Class<T>,
+ ): Set<T>? {
throw IllegalArgumentException("Invalid column or type [index $index]")
}
- override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? {
+ override fun <K, V> getMap(
+ index: Int,
+ keyType: Class<K>,
+ valueType: Class<V>,
+ ): Map<K, V>? {
throw IllegalArgumentException("Invalid column or type [index $index]")
}
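A sketch combining the typed getters above, weighting each state's duration by its CPU usage; the aggregation itself is illustrative and not part of this commit.

import org.opendc.trace.conv.TABLE_RESOURCE_STATES
import org.opendc.trace.conv.resourceID
import org.opendc.trace.conv.resourceStateCpuUsage
import org.opendc.trace.conv.resourceStateDuration
import org.opendc.trace.opendc.OdcVmTraceFormat
import java.nio.file.Paths

fun main() {
    val format = OdcVmTraceFormat()
    val reader = format.newReader(Paths.get("vm-trace"), TABLE_RESOURCE_STATES, null)
    val usageSeconds = mutableMapOf<String, Double>() // id -> usage-weighted seconds
    try {
        val idCol = reader.resolve(resourceID)
        val durationCol = reader.resolve(resourceStateDuration)
        val usageCol = reader.resolve(resourceStateCpuUsage)
        while (reader.nextRow()) {
            val id = checkNotNull(reader.getString(idCol))
            val duration = checkNotNull(reader.getDuration(durationCol))
            val weighted = duration.seconds * reader.getDouble(usageCol)
            usageSeconds.merge(id, weighted, Double::plus)
        }
    } finally {
        reader.close() // assumed AutoCloseable
    }
    usageSeconds.forEach { (id, total) -> println("$id: $total usage-seconds") }
}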
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt
index cf0a401b..01cd13c8 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt
@@ -24,11 +24,11 @@ package org.opendc.trace.opendc
import org.apache.parquet.hadoop.ParquetWriter
import org.opendc.trace.TableWriter
-import org.opendc.trace.conv.RESOURCE_CPU_COUNT
-import org.opendc.trace.conv.RESOURCE_ID
-import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE
-import org.opendc.trace.conv.RESOURCE_STATE_DURATION
-import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP
+import org.opendc.trace.conv.resourceCpuCount
+import org.opendc.trace.conv.resourceID
+import org.opendc.trace.conv.resourceStateCpuUsage
+import org.opendc.trace.conv.resourceStateDuration
+import org.opendc.trace.conv.resourceStateTimestamp
import org.opendc.trace.opendc.parquet.ResourceState
import java.time.Duration
import java.time.Instant
@@ -41,113 +41,149 @@ internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter<R
/**
* The current state for the record that is being written.
*/
- private var _isActive = false
- private var _id: String = ""
- private var _timestamp: Instant = Instant.MIN
- private var _duration: Duration = Duration.ZERO
- private var _cpuCount: Int = 0
- private var _cpuUsage: Double = Double.NaN
+ private var localIsActive = false
+ private var localID: String = ""
+ private var localTimestamp: Instant = Instant.MIN
+ private var localDuration: Duration = Duration.ZERO
+ private var localCpuCount: Int = 0
+ private var localCpuUsage: Double = Double.NaN
override fun startRow() {
- _isActive = true
- _id = ""
- _timestamp = Instant.MIN
- _duration = Duration.ZERO
- _cpuCount = 0
- _cpuUsage = Double.NaN
+ localIsActive = true
+ localID = ""
+ localTimestamp = Instant.MIN
+ localDuration = Duration.ZERO
+ localCpuCount = 0
+ localCpuUsage = Double.NaN
}
override fun endRow() {
- check(_isActive) { "No active row" }
- _isActive = false
+ check(localIsActive) { "No active row" }
+ localIsActive = false
- check(lastId != _id || _timestamp >= lastTimestamp) { "Records need to be ordered by (id, timestamp)" }
+ check(lastId != localID || localTimestamp >= lastTimestamp) { "Records need to be ordered by (id, timestamp)" }
- writer.write(ResourceState(_id, _timestamp, _duration, _cpuCount, _cpuUsage))
+ writer.write(ResourceState(localID, localTimestamp, localDuration, localCpuCount, localCpuUsage))
- lastId = _id
- lastTimestamp = _timestamp
+ lastId = localID
+ lastTimestamp = localTimestamp
}
override fun resolve(name: String): Int {
return when (name) {
- RESOURCE_ID -> COL_ID
- RESOURCE_STATE_TIMESTAMP -> COL_TIMESTAMP
- RESOURCE_STATE_DURATION -> COL_DURATION
- RESOURCE_CPU_COUNT -> COL_CPU_COUNT
- RESOURCE_STATE_CPU_USAGE -> COL_CPU_USAGE
+ resourceID -> colID
+ resourceStateTimestamp -> colTimestamp
+ resourceStateDuration -> colDuration
+ resourceCpuCount -> colCpuCount
+ resourceStateCpuUsage -> colCpuUsage
else -> -1
}
}
- override fun setBoolean(index: Int, value: Boolean) {
+ override fun setBoolean(
+ index: Int,
+ value: Boolean,
+ ) {
throw IllegalArgumentException("Invalid column or type [index $index]")
}
- override fun setInt(index: Int, value: Int) {
- check(_isActive) { "No active row" }
+ override fun setInt(
+ index: Int,
+ value: Int,
+ ) {
+ check(localIsActive) { "No active row" }
when (index) {
- COL_CPU_COUNT -> _cpuCount = value
+ colCpuCount -> localCpuCount = value
else -> throw IllegalArgumentException("Invalid column or type [index $index]")
}
}
- override fun setLong(index: Int, value: Long) {
+ override fun setLong(
+ index: Int,
+ value: Long,
+ ) {
throw IllegalArgumentException("Invalid column or type [index $index]")
}
- override fun setFloat(index: Int, value: Float) {
+ override fun setFloat(
+ index: Int,
+ value: Float,
+ ) {
throw IllegalArgumentException("Invalid column or type [index $index]")
}
- override fun setDouble(index: Int, value: Double) {
- check(_isActive) { "No active row" }
+ override fun setDouble(
+ index: Int,
+ value: Double,
+ ) {
+ check(localIsActive) { "No active row" }
when (index) {
- COL_CPU_USAGE -> _cpuUsage = value
+ colCpuUsage -> localCpuUsage = value
else -> throw IllegalArgumentException("Invalid column or type [index $index]")
}
}
- override fun setString(index: Int, value: String) {
- check(_isActive) { "No active row" }
+ override fun setString(
+ index: Int,
+ value: String,
+ ) {
+ check(localIsActive) { "No active row" }
when (index) {
- COL_ID -> _id = value
+ colID -> localID = value
else -> throw IllegalArgumentException("Invalid column or type [index $index]")
}
}
- override fun setUUID(index: Int, value: UUID) {
+ override fun setUUID(
+ index: Int,
+ value: UUID,
+ ) {
throw IllegalArgumentException("Invalid column or type [index $index]")
}
- override fun setInstant(index: Int, value: Instant) {
- check(_isActive) { "No active row" }
+ override fun setInstant(
+ index: Int,
+ value: Instant,
+ ) {
+ check(localIsActive) { "No active row" }
when (index) {
- COL_TIMESTAMP -> _timestamp = value
+ colTimestamp -> localTimestamp = value
else -> throw IllegalArgumentException("Invalid column or type [index $index]")
}
}
- override fun setDuration(index: Int, value: Duration) {
- check(_isActive) { "No active row" }
+ override fun setDuration(
+ index: Int,
+ value: Duration,
+ ) {
+ check(localIsActive) { "No active row" }
when (index) {
- COL_DURATION -> _duration = value
+ colDuration -> localDuration = value
else -> throw IllegalArgumentException("Invalid column or type [index $index]")
}
}
- override fun <T> setList(index: Int, value: List<T>) {
+ override fun <T> setList(
+ index: Int,
+ value: List<T>,
+ ) {
throw IllegalArgumentException("Invalid column or type [index $index]")
}
- override fun <T> setSet(index: Int, value: Set<T>) {
+ override fun <T> setSet(
+ index: Int,
+ value: Set<T>,
+ ) {
throw IllegalArgumentException("Invalid column or type [index $index]")
}
- override fun <K, V> setMap(index: Int, value: Map<K, V>) {
+ override fun <K, V> setMap(
+ index: Int,
+ value: Map<K, V>,
+ ) {
throw IllegalArgumentException("Invalid column or type [index $index]")
}
@@ -165,9 +201,9 @@ internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter<R
private var lastId: String? = null
private var lastTimestamp: Instant = Instant.MAX
- private val COL_ID = 0
- private val COL_TIMESTAMP = 1
- private val COL_DURATION = 2
- private val COL_CPU_COUNT = 3
- private val COL_CPU_USAGE = 4
+ private val colID = 0
+ private val colTimestamp = 1
+ private val colDuration = 2
+ private val colCpuCount = 3
+ private val colCpuUsage = 4
}
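endRow() enforces the (id, timestamp) invariant through lastId and lastTimestamp: rows must arrive grouped by id and with non-decreasing timestamps within each id, or the check throws. A sketch that respects the ordering:

import org.opendc.trace.conv.TABLE_RESOURCE_STATES
import org.opendc.trace.conv.resourceCpuCount
import org.opendc.trace.conv.resourceID
import org.opendc.trace.conv.resourceStateCpuUsage
import org.opendc.trace.conv.resourceStateDuration
import org.opendc.trace.conv.resourceStateTimestamp
import org.opendc.trace.opendc.OdcVmTraceFormat
import java.nio.file.Paths
import java.time.Duration
import java.time.Instant

fun main() {
    val format = OdcVmTraceFormat()
    val writer = format.newWriter(Paths.get("vm-trace"), TABLE_RESOURCE_STATES)
    try {
        val idCol = writer.resolve(resourceID)
        val tsCol = writer.resolve(resourceStateTimestamp)
        val durCol = writer.resolve(resourceStateDuration)
        val cpuCol = writer.resolve(resourceCpuCount)
        val usageCol = writer.resolve(resourceStateCpuUsage)
        var ts = Instant.EPOCH // arbitrary start for the sketch
        repeat(2) {
            writer.startRow()
            writer.setString(idCol, "vm-a")
            writer.setInstant(tsCol, ts) // non-decreasing per id, or endRow() throws
            writer.setDuration(durCol, Duration.ofMinutes(5))
            writer.setInt(cpuCol, 2)
            writer.setDouble(usageCol, 1200.0) // illustrative usage value
            writer.endRow()
            ts = ts.plus(Duration.ofMinutes(5))
        }
    } finally {
        writer.close()
    }
}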
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
index d4613158..195929aa 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
@@ -23,12 +23,12 @@
package org.opendc.trace.opendc
import org.opendc.trace.TableReader
-import org.opendc.trace.conv.RESOURCE_CPU_CAPACITY
-import org.opendc.trace.conv.RESOURCE_CPU_COUNT
-import org.opendc.trace.conv.RESOURCE_ID
-import org.opendc.trace.conv.RESOURCE_MEM_CAPACITY
-import org.opendc.trace.conv.RESOURCE_START_TIME
-import org.opendc.trace.conv.RESOURCE_STOP_TIME
+import org.opendc.trace.conv.resourceCpuCapacity
+import org.opendc.trace.conv.resourceCpuCount
+import org.opendc.trace.conv.resourceID
+import org.opendc.trace.conv.resourceMemCapacity
+import org.opendc.trace.conv.resourceStartTime
+import org.opendc.trace.conv.resourceStopTime
import org.opendc.trace.opendc.parquet.Resource
import org.opendc.trace.util.parquet.LocalParquetReader
import java.time.Duration
@@ -56,27 +56,27 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R
}
}
- private val COL_ID = 0
- private val COL_START_TIME = 1
- private val COL_STOP_TIME = 2
- private val COL_CPU_COUNT = 3
- private val COL_CPU_CAPACITY = 4
- private val COL_MEM_CAPACITY = 5
+ private val colID = 0
+ private val colStartTime = 1
+ private val colStopTime = 2
+ private val colCpuCount = 3
+ private val colCpuCapacity = 4
+ private val colMemCapacity = 5
override fun resolve(name: String): Int {
return when (name) {
- RESOURCE_ID -> COL_ID
- RESOURCE_START_TIME -> COL_START_TIME
- RESOURCE_STOP_TIME -> COL_STOP_TIME
- RESOURCE_CPU_COUNT -> COL_CPU_COUNT
- RESOURCE_CPU_CAPACITY -> COL_CPU_CAPACITY
- RESOURCE_MEM_CAPACITY -> COL_MEM_CAPACITY
+ resourceID -> colID
+ resourceStartTime -> colStartTime
+ resourceStopTime -> colStopTime
+ resourceCpuCount -> colCpuCount
+ resourceCpuCapacity -> colCpuCapacity
+ resourceMemCapacity -> colMemCapacity
else -> -1
}
}
override fun isNull(index: Int): Boolean {
- require(index in 0..COL_MEM_CAPACITY) { "Invalid column index" }
+ require(index in 0..colMemCapacity) { "Invalid column index" }
return false
}
@@ -88,7 +88,7 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_CPU_COUNT -> record.cpuCount
+ colCpuCount -> record.cpuCount
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -105,8 +105,8 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_CPU_CAPACITY -> record.cpuCapacity
- COL_MEM_CAPACITY -> record.memCapacity
+ colCpuCapacity -> record.cpuCapacity
+ colMemCapacity -> record.memCapacity
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -115,7 +115,7 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_ID -> record.id
+ colID -> record.id
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -128,8 +128,8 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_START_TIME -> record.startTime
- COL_STOP_TIME -> record.stopTime
+ colStartTime -> record.startTime
+ colStopTime -> record.stopTime
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -138,15 +138,25 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R
throw IllegalArgumentException("Invalid column")
}
- override fun <T> getList(index: Int, elementType: Class<T>): List<T>? {
+ override fun <T> getList(
+ index: Int,
+ elementType: Class<T>,
+ ): List<T>? {
throw IllegalArgumentException("Invalid column")
}
- override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? {
+ override fun <T> getSet(
+ index: Int,
+ elementType: Class<T>,
+ ): Set<T>? {
throw IllegalArgumentException("Invalid column")
}
- override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? {
+ override fun <K, V> getMap(
+ index: Int,
+ keyType: Class<K>,
+ valueType: Class<V>,
+ ): Map<K, V>? {
throw IllegalArgumentException("Invalid column")
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt
index 73a03891..5bbc2f3f 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt
@@ -24,12 +24,12 @@ package org.opendc.trace.opendc
import org.apache.parquet.hadoop.ParquetWriter
import org.opendc.trace.TableWriter
-import org.opendc.trace.conv.RESOURCE_CPU_CAPACITY
-import org.opendc.trace.conv.RESOURCE_CPU_COUNT
-import org.opendc.trace.conv.RESOURCE_ID
-import org.opendc.trace.conv.RESOURCE_MEM_CAPACITY
-import org.opendc.trace.conv.RESOURCE_START_TIME
-import org.opendc.trace.conv.RESOURCE_STOP_TIME
+import org.opendc.trace.conv.resourceCpuCapacity
+import org.opendc.trace.conv.resourceCpuCount
+import org.opendc.trace.conv.resourceID
+import org.opendc.trace.conv.resourceMemCapacity
+import org.opendc.trace.conv.resourceStartTime
+import org.opendc.trace.conv.resourceStopTime
import org.opendc.trace.opendc.parquet.Resource
import java.time.Duration
import java.time.Instant
@@ -42,105 +42,141 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour
/**
* The current state for the record that is being written.
*/
- private var _isActive = false
- private var _id: String = ""
- private var _startTime: Instant = Instant.MIN
- private var _stopTime: Instant = Instant.MIN
- private var _cpuCount: Int = 0
- private var _cpuCapacity: Double = Double.NaN
- private var _memCapacity: Double = Double.NaN
+ private var localIsActive = false
+ private var localId: String = ""
+ private var localStartTime: Instant = Instant.MIN
+ private var localStopTime: Instant = Instant.MIN
+ private var localCpuCount: Int = 0
+ private var localCpuCapacity: Double = Double.NaN
+ private var localMemCapacity: Double = Double.NaN
override fun startRow() {
- _isActive = true
- _id = ""
- _startTime = Instant.MIN
- _stopTime = Instant.MIN
- _cpuCount = 0
- _cpuCapacity = Double.NaN
- _memCapacity = Double.NaN
+ localIsActive = true
+ localId = ""
+ localStartTime = Instant.MIN
+ localStopTime = Instant.MIN
+ localCpuCount = 0
+ localCpuCapacity = Double.NaN
+ localMemCapacity = Double.NaN
}
override fun endRow() {
- check(_isActive) { "No active row" }
- _isActive = false
- writer.write(Resource(_id, _startTime, _stopTime, _cpuCount, _cpuCapacity, _memCapacity))
+ check(localIsActive) { "No active row" }
+ localIsActive = false
+ writer.write(Resource(localId, localStartTime, localStopTime, localCpuCount, localCpuCapacity, localMemCapacity))
}
override fun resolve(name: String): Int {
return when (name) {
- RESOURCE_ID -> COL_ID
- RESOURCE_START_TIME -> COL_START_TIME
- RESOURCE_STOP_TIME -> COL_STOP_TIME
- RESOURCE_CPU_COUNT -> COL_CPU_COUNT
- RESOURCE_CPU_CAPACITY -> COL_CPU_CAPACITY
- RESOURCE_MEM_CAPACITY -> COL_MEM_CAPACITY
+ resourceID -> colID
+ resourceStartTime -> colStartTime
+ resourceStopTime -> colStopTime
+ resourceCpuCount -> colCpuCount
+ resourceCpuCapacity -> colCpuCapacity
+ resourceMemCapacity -> colMemCapacity
else -> -1
}
}
- override fun setBoolean(index: Int, value: Boolean) {
+ override fun setBoolean(
+ index: Int,
+ value: Boolean,
+ ) {
throw IllegalArgumentException("Invalid column or type [index $index]")
}
- override fun setInt(index: Int, value: Int) {
- check(_isActive) { "No active row" }
+ override fun setInt(
+ index: Int,
+ value: Int,
+ ) {
+ check(localIsActive) { "No active row" }
when (index) {
- COL_CPU_COUNT -> _cpuCount = value
+ colCpuCount -> localCpuCount = value
else -> throw IllegalArgumentException("Invalid column or type [index $index]")
}
}
- override fun setLong(index: Int, value: Long) {
+ override fun setLong(
+ index: Int,
+ value: Long,
+ ) {
throw IllegalArgumentException("Invalid column or type [index $index]")
}
- override fun setFloat(index: Int, value: Float) {
+ override fun setFloat(
+ index: Int,
+ value: Float,
+ ) {
throw IllegalArgumentException("Invalid column or type [index $index]")
}
- override fun setDouble(index: Int, value: Double) {
- check(_isActive) { "No active row" }
+ override fun setDouble(
+ index: Int,
+ value: Double,
+ ) {
+ check(localIsActive) { "No active row" }
when (index) {
- COL_CPU_CAPACITY -> _cpuCapacity = value
- COL_MEM_CAPACITY -> _memCapacity = value
+ colCpuCapacity -> localCpuCapacity = value
+ colMemCapacity -> localMemCapacity = value
else -> throw IllegalArgumentException("Invalid column or type [index $index]")
}
}
- override fun setString(index: Int, value: String) {
- check(_isActive) { "No active row" }
+ override fun setString(
+ index: Int,
+ value: String,
+ ) {
+ check(localIsActive) { "No active row" }
when (index) {
- COL_ID -> _id = value
+ colID -> localId = value
else -> throw IllegalArgumentException("Invalid column index $index")
}
}
- override fun setUUID(index: Int, value: UUID) {
+ override fun setUUID(
+ index: Int,
+ value: UUID,
+ ) {
throw IllegalArgumentException("Invalid column or type [index $index]")
}
- override fun setInstant(index: Int, value: Instant) {
- check(_isActive) { "No active row" }
+ override fun setInstant(
+ index: Int,
+ value: Instant,
+ ) {
+ check(localIsActive) { "No active row" }
when (index) {
- COL_START_TIME -> _startTime = value
- COL_STOP_TIME -> _stopTime = value
+ colStartTime -> localStartTime = value
+ colStopTime -> localStopTime = value
else -> throw IllegalArgumentException("Invalid column index $index")
}
}
- override fun setDuration(index: Int, value: Duration) {
+ override fun setDuration(
+ index: Int,
+ value: Duration,
+ ) {
throw IllegalArgumentException("Invalid column or type [index $index]")
}
- override fun <T> setList(index: Int, value: List<T>) {
+ override fun <T> setList(
+ index: Int,
+ value: List<T>,
+ ) {
throw IllegalArgumentException("Invalid column or type [index $index]")
}
- override fun <T> setSet(index: Int, value: Set<T>) {
+ override fun <T> setSet(
+ index: Int,
+ value: Set<T>,
+ ) {
throw IllegalArgumentException("Invalid column or type [index $index]")
}
- override fun <K, V> setMap(index: Int, value: Map<K, V>) {
+ override fun <K, V> setMap(
+ index: Int,
+ value: Map<K, V>,
+ ) {
throw IllegalArgumentException("Invalid column or type [index $index]")
}
@@ -152,10 +188,10 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour
writer.close()
}
- private val COL_ID = 0
- private val COL_START_TIME = 1
- private val COL_STOP_TIME = 2
- private val COL_CPU_COUNT = 3
- private val COL_CPU_CAPACITY = 4
- private val COL_MEM_CAPACITY = 5
+ private val colID = 0
+ private val colStartTime = 1
+ private val colStopTime = 2
+ private val colCpuCount = 3
+ private val colCpuCapacity = 4
+ private val colMemCapacity = 5
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
index c4790538..9abe872f 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
@@ -34,18 +34,18 @@ import org.opendc.trace.TableWriter
import org.opendc.trace.conv.INTERFERENCE_GROUP_MEMBERS
import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE
import org.opendc.trace.conv.INTERFERENCE_GROUP_TARGET
-import org.opendc.trace.conv.RESOURCE_CPU_CAPACITY
-import org.opendc.trace.conv.RESOURCE_CPU_COUNT
-import org.opendc.trace.conv.RESOURCE_ID
-import org.opendc.trace.conv.RESOURCE_MEM_CAPACITY
-import org.opendc.trace.conv.RESOURCE_START_TIME
-import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE
-import org.opendc.trace.conv.RESOURCE_STATE_DURATION
-import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP
-import org.opendc.trace.conv.RESOURCE_STOP_TIME
import org.opendc.trace.conv.TABLE_INTERFERENCE_GROUPS
import org.opendc.trace.conv.TABLE_RESOURCES
import org.opendc.trace.conv.TABLE_RESOURCE_STATES
+import org.opendc.trace.conv.resourceCpuCapacity
+import org.opendc.trace.conv.resourceCpuCount
+import org.opendc.trace.conv.resourceID
+import org.opendc.trace.conv.resourceMemCapacity
+import org.opendc.trace.conv.resourceStartTime
+import org.opendc.trace.conv.resourceStateCpuUsage
+import org.opendc.trace.conv.resourceStateDuration
+import org.opendc.trace.conv.resourceStateTimestamp
+import org.opendc.trace.conv.resourceStopTime
import org.opendc.trace.opendc.parquet.ResourceReadSupport
import org.opendc.trace.opendc.parquet.ResourceStateReadSupport
import org.opendc.trace.opendc.parquet.ResourceStateWriteSupport
@@ -86,39 +86,49 @@ public class OdcVmTraceFormat : TraceFormat {
override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES, TABLE_INTERFERENCE_GROUPS)
- override fun getDetails(path: Path, table: String): TableDetails {
+ override fun getDetails(
+ path: Path,
+ table: String,
+ ): TableDetails {
return when (table) {
- TABLE_RESOURCES -> TableDetails(
- listOf(
- TableColumn(RESOURCE_ID, TableColumnType.String),
- TableColumn(RESOURCE_START_TIME, TableColumnType.Instant),
- TableColumn(RESOURCE_STOP_TIME, TableColumnType.Instant),
- TableColumn(RESOURCE_CPU_COUNT, TableColumnType.Int),
- TableColumn(RESOURCE_CPU_CAPACITY, TableColumnType.Double),
- TableColumn(RESOURCE_MEM_CAPACITY, TableColumnType.Double)
+ TABLE_RESOURCES ->
+ TableDetails(
+ listOf(
+ TableColumn(resourceID, TableColumnType.String),
+ TableColumn(resourceStartTime, TableColumnType.Instant),
+ TableColumn(resourceStopTime, TableColumnType.Instant),
+ TableColumn(resourceCpuCount, TableColumnType.Int),
+ TableColumn(resourceCpuCapacity, TableColumnType.Double),
+ TableColumn(resourceMemCapacity, TableColumnType.Double),
+ ),
)
- )
- TABLE_RESOURCE_STATES -> TableDetails(
- listOf(
- TableColumn(RESOURCE_ID, TableColumnType.String),
- TableColumn(RESOURCE_STATE_TIMESTAMP, TableColumnType.Instant),
- TableColumn(RESOURCE_STATE_DURATION, TableColumnType.Duration),
- TableColumn(RESOURCE_CPU_COUNT, TableColumnType.Int),
- TableColumn(RESOURCE_STATE_CPU_USAGE, TableColumnType.Double)
+ TABLE_RESOURCE_STATES ->
+ TableDetails(
+ listOf(
+ TableColumn(resourceID, TableColumnType.String),
+ TableColumn(resourceStateTimestamp, TableColumnType.Instant),
+ TableColumn(resourceStateDuration, TableColumnType.Duration),
+ TableColumn(resourceCpuCount, TableColumnType.Int),
+ TableColumn(resourceStateCpuUsage, TableColumnType.Double),
+ ),
)
- )
- TABLE_INTERFERENCE_GROUPS -> TableDetails(
- listOf(
- TableColumn(INTERFERENCE_GROUP_MEMBERS, TableColumnType.Set(TableColumnType.String)),
- TableColumn(INTERFERENCE_GROUP_TARGET, TableColumnType.Double),
- TableColumn(INTERFERENCE_GROUP_SCORE, TableColumnType.Double)
+ TABLE_INTERFERENCE_GROUPS ->
+ TableDetails(
+ listOf(
+ TableColumn(INTERFERENCE_GROUP_MEMBERS, TableColumnType.Set(TableColumnType.String)),
+ TableColumn(INTERFERENCE_GROUP_TARGET, TableColumnType.Double),
+ TableColumn(INTERFERENCE_GROUP_SCORE, TableColumnType.Double),
+ ),
)
- )
else -> throw IllegalArgumentException("Table $table not supported")
}
}
- override fun newReader(path: Path, table: String, projection: List<String>?): TableReader {
+ override fun newReader(
+ path: Path,
+ table: String,
+ projection: List<String>?,
+ ): TableReader {
return when (table) {
TABLE_RESOURCES -> {
val reader = LocalParquetReader(path.resolve("meta.parquet"), ResourceReadSupport(projection))
@@ -130,11 +140,12 @@ public class OdcVmTraceFormat : TraceFormat {
}
TABLE_INTERFERENCE_GROUPS -> {
val modelPath = path.resolve("interference-model.json")
- val parser = if (modelPath.exists()) {
- jsonFactory.createParser(modelPath.toFile())
- } else {
- jsonFactory.createParser("[]") // If model does not exist, return empty model
- }
+ val parser =
+ if (modelPath.exists()) {
+ jsonFactory.createParser(modelPath.toFile())
+ } else {
+ jsonFactory.createParser("[]") // If model does not exist, return empty model
+ }
OdcVmInterferenceJsonTableReader(parser)
}
@@ -142,26 +153,31 @@ public class OdcVmTraceFormat : TraceFormat {
}
}
- override fun newWriter(path: Path, table: String): TableWriter {
+ override fun newWriter(
+ path: Path,
+ table: String,
+ ): TableWriter {
return when (table) {
TABLE_RESOURCES -> {
- val writer = LocalParquetWriter.builder(path.resolve("meta.parquet"), ResourceWriteSupport())
- .withCompressionCodec(CompressionCodecName.ZSTD)
- .withPageWriteChecksumEnabled(true)
- .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
- .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
- .build()
+ val writer =
+ LocalParquetWriter.builder(path.resolve("meta.parquet"), ResourceWriteSupport())
+ .withCompressionCodec(CompressionCodecName.ZSTD)
+ .withPageWriteChecksumEnabled(true)
+ .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
+ .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+ .build()
OdcVmResourceTableWriter(writer)
}
TABLE_RESOURCE_STATES -> {
- val writer = LocalParquetWriter.builder(path.resolve("trace.parquet"), ResourceStateWriteSupport())
- .withCompressionCodec(CompressionCodecName.ZSTD)
- .withDictionaryEncoding("id", true)
- .withBloomFilterEnabled("id", true)
- .withPageWriteChecksumEnabled(true)
- .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
- .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
- .build()
+ val writer =
+ LocalParquetWriter.builder(path.resolve("trace.parquet"), ResourceStateWriteSupport())
+ .withCompressionCodec(CompressionCodecName.ZSTD)
+ .withDictionaryEncoding("id", true)
+ .withBloomFilterEnabled("id", true)
+ .withPageWriteChecksumEnabled(true)
+ .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
+ .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+ .build()
OdcVmResourceStateTableWriter(writer)
}
TABLE_INTERFERENCE_GROUPS -> {
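newWriter also fixes the Parquet tuning: ZSTD compression for both tables, plus dictionary encoding and a bloom filter on the id column of the states table, which is highly repetitive and benefits from both. Producing the resources table (meta.parquet) through the same API, with the units on the capacity columns assumed for illustration:

import org.opendc.trace.conv.TABLE_RESOURCES
import org.opendc.trace.conv.resourceCpuCapacity
import org.opendc.trace.conv.resourceCpuCount
import org.opendc.trace.conv.resourceID
import org.opendc.trace.conv.resourceMemCapacity
import org.opendc.trace.conv.resourceStartTime
import org.opendc.trace.conv.resourceStopTime
import org.opendc.trace.opendc.OdcVmTraceFormat
import java.nio.file.Paths
import java.time.Instant

fun main() {
    val format = OdcVmTraceFormat()
    val writer = format.newWriter(Paths.get("vm-trace"), TABLE_RESOURCES)
    try {
        writer.startRow()
        writer.setString(writer.resolve(resourceID), "vm-a")
        writer.setInstant(writer.resolve(resourceStartTime), Instant.EPOCH)
        writer.setInstant(writer.resolve(resourceStopTime), Instant.EPOCH.plusSeconds(3600))
        writer.setInt(writer.resolve(resourceCpuCount), 2)
        writer.setDouble(writer.resolve(resourceCpuCapacity), 5200.0) // assumed MHz
        writer.setDouble(writer.resolve(resourceMemCapacity), 4096.0) // assumed MiB
        writer.endRow()
    } finally {
        writer.close()
    }
}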
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/Resource.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/Resource.kt
index c6db45b5..13eefe72 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/Resource.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/Resource.kt
@@ -33,5 +33,5 @@ internal data class Resource(
val stopTime: Instant,
val cpuCount: Int,
val cpuCapacity: Double,
- val memCapacity: Double
+ val memCapacity: Double,
)
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt
index 52911d5f..8bada02e 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt
@@ -31,12 +31,12 @@ import org.apache.parquet.schema.MessageType
import org.apache.parquet.schema.PrimitiveType
import org.apache.parquet.schema.Types
import org.opendc.trace.TableColumn
-import org.opendc.trace.conv.RESOURCE_CPU_CAPACITY
-import org.opendc.trace.conv.RESOURCE_CPU_COUNT
-import org.opendc.trace.conv.RESOURCE_ID
-import org.opendc.trace.conv.RESOURCE_MEM_CAPACITY
-import org.opendc.trace.conv.RESOURCE_START_TIME
-import org.opendc.trace.conv.RESOURCE_STOP_TIME
+import org.opendc.trace.conv.resourceCpuCapacity
+import org.opendc.trace.conv.resourceCpuCount
+import org.opendc.trace.conv.resourceID
+import org.opendc.trace.conv.resourceMemCapacity
+import org.opendc.trace.conv.resourceStartTime
+import org.opendc.trace.conv.resourceStopTime
/**
* A [ReadSupport] instance for [Resource] objects.
@@ -45,18 +45,19 @@ internal class ResourceReadSupport(private val projection: List<String>?) : Read
/**
* Mapping from field names to [TableColumn]s.
*/
- private val fieldMap = mapOf(
- "id" to RESOURCE_ID,
- "submissionTime" to RESOURCE_START_TIME,
- "start_time" to RESOURCE_START_TIME,
- "endTime" to RESOURCE_STOP_TIME,
- "stop_time" to RESOURCE_STOP_TIME,
- "maxCores" to RESOURCE_CPU_COUNT,
- "cpu_count" to RESOURCE_CPU_COUNT,
- "cpu_capacity" to RESOURCE_CPU_CAPACITY,
- "requiredMemory" to RESOURCE_MEM_CAPACITY,
- "mem_capacity" to RESOURCE_MEM_CAPACITY
- )
+ private val fieldMap =
+ mapOf(
+ "id" to resourceID,
+ "submissionTime" to resourceStartTime,
+ "start_time" to resourceStartTime,
+ "endTime" to resourceStopTime,
+ "stop_time" to resourceStopTime,
+ "maxCores" to resourceCpuCount,
+ "cpu_count" to resourceCpuCount,
+ "cpu_capacity" to resourceCpuCapacity,
+ "requiredMemory" to resourceMemCapacity,
+ "mem_capacity" to resourceMemCapacity,
+ )
override fun init(context: InitContext): ReadContext {
val projectedSchema =
@@ -84,7 +85,7 @@ internal class ResourceReadSupport(private val projection: List<String>?) : Read
configuration: Configuration,
keyValueMetaData: Map<String, String>,
fileSchema: MessageType,
- readContext: ReadContext
+ readContext: ReadContext,
): RecordMaterializer<Resource> = ResourceRecordMaterializer(readContext.requestedSchema)
companion object {
@@ -92,64 +93,67 @@ internal class ResourceReadSupport(private val projection: List<String>?) : Read
* Parquet read schema (version 2.0) for the "resources" table in the trace.
*/
@JvmStatic
- val READ_SCHEMA_V2_0: MessageType = Types.buildMessage()
- .addFields(
- Types
- .required(PrimitiveType.PrimitiveTypeName.BINARY)
- .`as`(LogicalTypeAnnotation.stringType())
- .named("id"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("submissionTime"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("endTime"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("maxCores"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("requiredMemory")
- )
- .named("resource")
+ val READ_SCHEMA_V2_0: MessageType =
+ Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("submissionTime"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("endTime"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("maxCores"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("requiredMemory"),
+ )
+ .named("resource")
/**
* Parquet read schema (version 2.1) for the "resources" table in the trace.
*/
@JvmStatic
- val READ_SCHEMA_V2_1: MessageType = Types.buildMessage()
- .addFields(
- Types
- .required(PrimitiveType.PrimitiveTypeName.BINARY)
- .`as`(LogicalTypeAnnotation.stringType())
- .named("id"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("start_time"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("stop_time"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("cpu_count"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("cpu_capacity"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("mem_capacity")
- )
- .named("resource")
+ val READ_SCHEMA_V2_1: MessageType =
+ Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("start_time"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("stop_time"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("cpu_count"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpu_capacity"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("mem_capacity"),
+ )
+ .named("resource")
/**
* Parquet read schema for the "resources" table in the trace.
*/
@JvmStatic
- val READ_SCHEMA: MessageType = READ_SCHEMA_V2_0
- .union(READ_SCHEMA_V2_1)
+ val READ_SCHEMA: MessageType =
+ READ_SCHEMA_V2_0
+ .union(READ_SCHEMA_V2_1)
}
}
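fieldMap is what lets one reader serve both on-disk layouts: the v2.0 spellings (submissionTime, endTime, maxCores, requiredMemory) and the v2.1 spellings (start_time, stop_time, cpu_count, mem_capacity) alias to the same logical column, so projections written against the conv names match either file. The aliasing idea in isolation, trimmed to two columns as a sketch:

import org.opendc.trace.conv.resourceMemCapacity
import org.opendc.trace.conv.resourceStartTime

// Two entries only; the real fieldMap covers every column of both layouts.
private val aliases =
    mapOf(
        "submissionTime" to resourceStartTime, // v2.0 spelling
        "start_time" to resourceStartTime, // v2.1 spelling
        "requiredMemory" to resourceMemCapacity, // v2.0 spelling
        "mem_capacity" to resourceMemCapacity, // v2.1 spelling
    )

// Keep the file fields whose logical column appears in the projection.
fun projectedFields(
    fileFields: List<String>,
    projection: List<String>,
): List<String> =
    fileFields.filter { field ->
        val column = aliases[field]
        column != null && column in projection
    }

fun main() {
    val v20Fields = listOf("submissionTime", "requiredMemory")
    println(projectedFields(v20Fields, listOf(resourceStartTime, resourceMemCapacity)))
    // prints [submissionTime, requiredMemory]
}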
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceRecordMaterializer.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceRecordMaterializer.kt
index 936a684a..6e2afa7a 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceRecordMaterializer.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceRecordMaterializer.kt
@@ -37,75 +37,91 @@ internal class ResourceRecordMaterializer(schema: MessageType) : RecordMateriali
/**
* State of current record being read.
*/
- private var _id = ""
- private var _startTime = Instant.MIN
- private var _stopTime = Instant.MIN
- private var _cpuCount = 0
- private var _cpuCapacity = 0.0
- private var _memCapacity = 0.0
+ private var localId = ""
+ private var localStartTime = Instant.MIN
+ private var localStopTime = Instant.MIN
+ private var localCpuCount = 0
+ private var localCpuCapacity = 0.0
+ private var localMemCapacity = 0.0
/**
* Root converter for the record.
*/
- private val root = object : GroupConverter() {
- /**
- * The converters for the columns of the schema.
- */
- private val converters = schema.fields.map { type ->
- when (type.name) {
- "id" -> object : PrimitiveConverter() {
- override fun addBinary(value: Binary) {
- _id = value.toStringUsingUTF8()
- }
- }
- "start_time", "submissionTime" -> object : PrimitiveConverter() {
- override fun addLong(value: Long) {
- _startTime = Instant.ofEpochMilli(value)
- }
- }
- "stop_time", "endTime" -> object : PrimitiveConverter() {
- override fun addLong(value: Long) {
- _stopTime = Instant.ofEpochMilli(value)
- }
- }
- "cpu_count", "maxCores" -> object : PrimitiveConverter() {
- override fun addInt(value: Int) {
- _cpuCount = value
- }
- }
- "cpu_capacity" -> object : PrimitiveConverter() {
- override fun addDouble(value: Double) {
- _cpuCapacity = value
- }
- }
- "mem_capacity", "requiredMemory" -> object : PrimitiveConverter() {
- override fun addDouble(value: Double) {
- _memCapacity = value
- }
+ private val root =
+ object : GroupConverter() {
+ /**
+ * The converters for the columns of the schema.
+ */
+ private val converters =
+ schema.fields.map { type ->
+ when (type.name) {
+ "id" ->
+ object : PrimitiveConverter() {
+ override fun addBinary(value: Binary) {
+ localId = value.toStringUsingUTF8()
+ }
+ }
+ "start_time", "submissionTime" ->
+ object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ localStartTime = Instant.ofEpochMilli(value)
+ }
+ }
+ "stop_time", "endTime" ->
+ object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ localStopTime = Instant.ofEpochMilli(value)
+ }
+ }
+ "cpu_count", "maxCores" ->
+ object : PrimitiveConverter() {
+ override fun addInt(value: Int) {
+ localCpuCount = value
+ }
+ }
+ "cpu_capacity" ->
+ object : PrimitiveConverter() {
+ override fun addDouble(value: Double) {
+ localCpuCapacity = value
+ }
+ }
+ "mem_capacity", "requiredMemory" ->
+ object : PrimitiveConverter() {
+ override fun addDouble(value: Double) {
+ localMemCapacity = value
+ }
- override fun addLong(value: Long) {
- _memCapacity = value.toDouble()
+ override fun addLong(value: Long) {
+ localMemCapacity = value.toDouble()
+ }
+ }
+ else -> error("Unknown column $type")
}
}
- else -> error("Unknown column $type")
- }
- }
- override fun start() {
- _id = ""
- _startTime = Instant.MIN
- _stopTime = Instant.MIN
- _cpuCount = 0
- _cpuCapacity = 0.0
- _memCapacity = 0.0
- }
+ override fun start() {
+ localId = ""
+ localStartTime = Instant.MIN
+ localStopTime = Instant.MIN
+ localCpuCount = 0
+ localCpuCapacity = 0.0
+ localMemCapacity = 0.0
+ }
- override fun end() {}
+ override fun end() {}
- override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex]
- }
+ override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex]
+ }
- override fun getCurrentRecord(): Resource = Resource(_id, _startTime, _stopTime, _cpuCount, _cpuCapacity, _memCapacity)
+ override fun getCurrentRecord(): Resource =
+ Resource(
+ localId,
+ localStartTime,
+ localStopTime,
+ localCpuCount,
+ localCpuCapacity,
+ localMemCapacity,
+ )
override fun getRootConverter(): GroupConverter = root
}
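The converter tree follows Parquet's record-assembly contract: start() resets the per-record fields, each PrimitiveConverter writes into captured state because converters return nothing, and getCurrentRecord() snapshots the finished Resource. The same shape in isolation, as a free-standing sketch:

import org.apache.parquet.io.api.Binary
import org.apache.parquet.io.api.PrimitiveConverter

// A single-column analogue of the converters above: no return value,
// so the converter mutates captured state instead.
class StringFieldConverter : PrimitiveConverter() {
    var value: String = ""
        private set

    fun reset() { // analogous to start() clearing localId and friends
        value = ""
    }

    override fun addBinary(value: Binary) {
        this.value = value.toStringUsingUTF8()
    }
}

fun main() {
    val converter = StringFieldConverter()
    converter.reset()
    converter.addBinary(Binary.fromString("vm-a"))
    println(converter.value) // vm-a
}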
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceState.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceState.kt
index 9ad58764..483f444c 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceState.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceState.kt
@@ -30,5 +30,5 @@ internal class ResourceState(
val timestamp: Instant,
val duration: Duration,
val cpuCount: Int,
- val cpuUsage: Double
+ val cpuUsage: Double,
)
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt
index 56366cd8..21e206a9 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt
@@ -31,11 +31,11 @@ import org.apache.parquet.schema.MessageType
import org.apache.parquet.schema.PrimitiveType
import org.apache.parquet.schema.Types
import org.opendc.trace.TableColumn
-import org.opendc.trace.conv.RESOURCE_CPU_COUNT
-import org.opendc.trace.conv.RESOURCE_ID
-import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE
-import org.opendc.trace.conv.RESOURCE_STATE_DURATION
-import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP
+import org.opendc.trace.conv.resourceCpuCount
+import org.opendc.trace.conv.resourceID
+import org.opendc.trace.conv.resourceStateCpuUsage
+import org.opendc.trace.conv.resourceStateDuration
+import org.opendc.trace.conv.resourceStateTimestamp
/**
* A [ReadSupport] instance for [ResourceState] objects.
@@ -44,16 +44,17 @@ internal class ResourceStateReadSupport(private val projection: List<String>?) :
/**
* Mapping from field names to [TableColumn]s.
*/
- private val fieldMap = mapOf(
- "id" to RESOURCE_ID,
- "time" to RESOURCE_STATE_TIMESTAMP,
- "timestamp" to RESOURCE_STATE_TIMESTAMP,
- "duration" to RESOURCE_STATE_DURATION,
- "cores" to RESOURCE_CPU_COUNT,
- "cpu_count" to RESOURCE_CPU_COUNT,
- "cpuUsage" to RESOURCE_STATE_CPU_USAGE,
- "cpu_usage" to RESOURCE_STATE_CPU_USAGE
- )
+ private val fieldMap =
+ mapOf(
+ "id" to resourceID,
+ "time" to resourceStateTimestamp,
+ "timestamp" to resourceStateTimestamp,
+ "duration" to resourceStateDuration,
+ "cores" to resourceCpuCount,
+ "cpu_count" to resourceCpuCount,
+ "cpuUsage" to resourceStateCpuUsage,
+ "cpu_usage" to resourceStateCpuUsage,
+ )
override fun init(context: InitContext): ReadContext {
val projectedSchema =
@@ -81,7 +82,7 @@ internal class ResourceStateReadSupport(private val projection: List<String>?) :
configuration: Configuration,
keyValueMetaData: Map<String, String>,
fileSchema: MessageType,
- readContext: ReadContext
+ readContext: ReadContext,
): RecordMaterializer<ResourceState> = ResourceStateRecordMaterializer(readContext.requestedSchema)
companion object {
@@ -89,53 +90,55 @@ internal class ResourceStateReadSupport(private val projection: List<String>?) :
* Parquet read schema (version 2.0) for the "resource states" table in the trace.
*/
@JvmStatic
- val READ_SCHEMA_V2_0: MessageType = Types.buildMessage()
- .addFields(
- Types
- .required(PrimitiveType.PrimitiveTypeName.BINARY)
- .`as`(LogicalTypeAnnotation.stringType())
- .named("id"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("time"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("duration"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("cores"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("cpuUsage")
- )
- .named("resource_state")
+ val READ_SCHEMA_V2_0: MessageType =
+ Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("time"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("duration"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("cores"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpuUsage"),
+ )
+ .named("resource_state")
/**
* Parquet read schema (version 2.1) for the "resource states" table in the trace.
*/
@JvmStatic
- val READ_SCHEMA_V2_1: MessageType = Types.buildMessage()
- .addFields(
- Types
- .required(PrimitiveType.PrimitiveTypeName.BINARY)
- .`as`(LogicalTypeAnnotation.stringType())
- .named("id"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("timestamp"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("duration"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("cpu_count"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("cpu_usage")
- )
- .named("resource_state")
+ val READ_SCHEMA_V2_1: MessageType =
+ Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("timestamp"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("duration"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("cpu_count"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpu_usage"),
+ )
+ .named("resource_state")
/**
* Parquet read schema for the "resource states" table in the trace.
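The two read schemas above exist because the v2.0 trace format used `time`, `cores` and `cpuUsage` as field names, while v2.1 renamed them to `timestamp`, `cpu_count` and `cpu_usage`; the `fieldMap` collapses both generations onto the same logical columns. A minimal standalone sketch of that normalization (plain strings stand in for the OpenDC column constants; `main` and the values are illustrative only):

```kotlin
// Both the v2.0 and the v2.1 physical field names resolve to the same
// logical column, so one projection works for either schema version.
fun main() {
    val fieldMap = mapOf(
        "time" to "resourceStateTimestamp",
        "timestamp" to "resourceStateTimestamp",
        "cores" to "resourceCpuCount",
        "cpu_count" to "resourceCpuCount",
        "cpuUsage" to "resourceStateCpuUsage",
        "cpu_usage" to "resourceStateCpuUsage",
    )

    val projection = setOf("resourceStateTimestamp", "resourceStateCpuUsage")
    val v20Fields = listOf("id", "time", "duration", "cores", "cpuUsage")

    // Keep only the physical fields of a v2.0 file that the caller projected.
    val selected = v20Fields.filter { fieldMap[it] in projection }
    println(selected) // [time, cpuUsage]
}
```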
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateRecordMaterializer.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateRecordMaterializer.kt
index a813a5af..72d24e78 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateRecordMaterializer.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateRecordMaterializer.kt
@@ -38,69 +38,77 @@ internal class ResourceStateRecordMaterializer(schema: MessageType) : RecordMate
/**
* State of current record being read.
*/
- private var _id = ""
- private var _timestamp = Instant.MIN
- private var _duration = Duration.ZERO
- private var _cpuCount = 0
- private var _cpuUsage = 0.0
+ private var localId = ""
+ private var localTimestamp = Instant.MIN
+ private var localDuration = Duration.ZERO
+ private var localCpuCount = 0
+ private var localCpuUsage = 0.0
/**
* Root converter for the record.
*/
- private val root = object : GroupConverter() {
- /**
- * The converters for the columns of the schema.
- */
- private val converters = schema.fields.map { type ->
- when (type.name) {
- "id" -> object : PrimitiveConverter() {
- override fun addBinary(value: Binary) {
- _id = value.toStringUsingUTF8()
+ private val root =
+ object : GroupConverter() {
+ /**
+ * The converters for the columns of the schema.
+ */
+ private val converters =
+ schema.fields.map { type ->
+ when (type.name) {
+ "id" ->
+ object : PrimitiveConverter() {
+ override fun addBinary(value: Binary) {
+ localId = value.toStringUsingUTF8()
+ }
+ }
+ "timestamp", "time" ->
+ object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ localTimestamp = Instant.ofEpochMilli(value)
+ }
+ }
+ "duration" ->
+ object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ localDuration = Duration.ofMillis(value)
+ }
+ }
+ "cpu_count", "cores" ->
+ object : PrimitiveConverter() {
+ override fun addInt(value: Int) {
+ localCpuCount = value
+ }
+ }
+ "cpu_usage", "cpuUsage" ->
+ object : PrimitiveConverter() {
+ override fun addDouble(value: Double) {
+ localCpuUsage = value
+ }
+ }
+ "flops" ->
+ object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ // Ignore to support v1 format
+ }
+ }
+ else -> error("Unknown column $type")
}
}
- "timestamp", "time" -> object : PrimitiveConverter() {
- override fun addLong(value: Long) {
- _timestamp = Instant.ofEpochMilli(value)
- }
- }
- "duration" -> object : PrimitiveConverter() {
- override fun addLong(value: Long) {
- _duration = Duration.ofMillis(value)
- }
- }
- "cpu_count", "cores" -> object : PrimitiveConverter() {
- override fun addInt(value: Int) {
- _cpuCount = value
- }
- }
- "cpu_usage", "cpuUsage" -> object : PrimitiveConverter() {
- override fun addDouble(value: Double) {
- _cpuUsage = value
- }
- }
- "flops" -> object : PrimitiveConverter() {
- override fun addLong(value: Long) {
- // Ignore to support v1 format
- }
- }
- else -> error("Unknown column $type")
- }
- }
- override fun start() {
- _id = ""
- _timestamp = Instant.MIN
- _duration = Duration.ZERO
- _cpuCount = 0
- _cpuUsage = 0.0
- }
+ override fun start() {
+ localId = ""
+ localTimestamp = Instant.MIN
+ localDuration = Duration.ZERO
+ localCpuCount = 0
+ localCpuUsage = 0.0
+ }
- override fun end() {}
+ override fun end() {}
- override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex]
- }
+ override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex]
+ }
- override fun getCurrentRecord(): ResourceState = ResourceState(_id, _timestamp, _duration, _cpuCount, _cpuUsage)
+ override fun getCurrentRecord(): ResourceState = ResourceState(localId, localTimestamp, localDuration, localCpuCount, localCpuUsage)
override fun getRootConverter(): GroupConverter = root
}
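The materializer above follows the standard parquet-column pattern: one `PrimitiveConverter` per schema field writes into mutable state, `start()` resets that state before each record, and `getCurrentRecord()` snapshots it into an immutable value. A stripped-down sketch of the same pattern (assuming `parquet-column` is on the classpath; `TwoFieldConverter` is illustrative, not OpenDC code):

```kotlin
import org.apache.parquet.io.api.Binary
import org.apache.parquet.io.api.Converter
import org.apache.parquet.io.api.GroupConverter
import org.apache.parquet.io.api.PrimitiveConverter

// One converter per field index; each writes into local mutable state.
class TwoFieldConverter : GroupConverter() {
    var id: String = ""
    var cpuUsage: Double = 0.0

    private val converters: List<Converter> = listOf(
        object : PrimitiveConverter() {
            override fun addBinary(value: Binary) {
                id = value.toStringUsingUTF8()
            }
        },
        object : PrimitiveConverter() {
            override fun addDouble(value: Double) {
                cpuUsage = value
            }
        },
    )

    override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex]

    // Reset so values from a previous record never leak into the next one.
    override fun start() {
        id = ""
        cpuUsage = 0.0
    }

    override fun end() {}
}
```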
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateWriteSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateWriteSupport.kt
index 0bbec4d2..2a6d8c12 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateWriteSupport.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateWriteSupport.kt
@@ -52,7 +52,10 @@ internal class ResourceStateWriteSupport : WriteSupport<ResourceState>() {
write(recordConsumer, record)
}
- private fun write(consumer: RecordConsumer, record: ResourceState) {
+ private fun write(
+ consumer: RecordConsumer,
+ record: ResourceState,
+ ) {
consumer.startMessage()
consumer.startField("id", 0)
@@ -83,26 +86,27 @@ internal class ResourceStateWriteSupport : WriteSupport<ResourceState>() {
* Parquet schema for the "resource states" table in the trace.
*/
@JvmStatic
- val WRITE_SCHEMA: MessageType = Types.buildMessage()
- .addFields(
- Types
- .required(PrimitiveType.PrimitiveTypeName.BINARY)
- .`as`(LogicalTypeAnnotation.stringType())
- .named("id"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("timestamp"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("duration"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("cpu_count"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("cpu_usage")
- )
- .named("resource_state")
+ val WRITE_SCHEMA: MessageType =
+ Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("timestamp"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("duration"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("cpu_count"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpu_usage"),
+ )
+ .named("resource_state")
}
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceWriteSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceWriteSupport.kt
index cd428754..ed62e2ce 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceWriteSupport.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceWriteSupport.kt
@@ -53,7 +53,10 @@ internal class ResourceWriteSupport : WriteSupport<Resource>() {
write(recordConsumer, record)
}
- private fun write(consumer: RecordConsumer, record: Resource) {
+ private fun write(
+ consumer: RecordConsumer,
+ record: Resource,
+ ) {
consumer.startMessage()
consumer.startField("id", 0)
@@ -88,30 +91,31 @@ internal class ResourceWriteSupport : WriteSupport<Resource>() {
* Parquet schema for the "resources" table in the trace.
*/
@JvmStatic
- val WRITE_SCHEMA: MessageType = Types.buildMessage()
- .addFields(
- Types
- .required(PrimitiveType.PrimitiveTypeName.BINARY)
- .`as`(LogicalTypeAnnotation.stringType())
- .named("id"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("start_time"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("stop_time"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("cpu_count"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("cpu_capacity"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("mem_capacity")
- )
- .named("resource")
+ val WRITE_SCHEMA: MessageType =
+ Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("start_time"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("stop_time"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("cpu_count"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpu_capacity"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("mem_capacity"),
+ )
+ .named("resource")
}
}
diff --git a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt
index d3c3b35b..c9fa21c3 100644
--- a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt
@@ -40,17 +40,17 @@ import org.opendc.trace.TableWriter
import org.opendc.trace.conv.INTERFERENCE_GROUP_MEMBERS
import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE
import org.opendc.trace.conv.INTERFERENCE_GROUP_TARGET
-import org.opendc.trace.conv.RESOURCE_CPU_CAPACITY
-import org.opendc.trace.conv.RESOURCE_CPU_COUNT
-import org.opendc.trace.conv.RESOURCE_ID
-import org.opendc.trace.conv.RESOURCE_MEM_CAPACITY
-import org.opendc.trace.conv.RESOURCE_START_TIME
-import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE
-import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP
-import org.opendc.trace.conv.RESOURCE_STOP_TIME
import org.opendc.trace.conv.TABLE_INTERFERENCE_GROUPS
import org.opendc.trace.conv.TABLE_RESOURCES
import org.opendc.trace.conv.TABLE_RESOURCE_STATES
+import org.opendc.trace.conv.resourceCpuCapacity
+import org.opendc.trace.conv.resourceCpuCount
+import org.opendc.trace.conv.resourceID
+import org.opendc.trace.conv.resourceMemCapacity
+import org.opendc.trace.conv.resourceStartTime
+import org.opendc.trace.conv.resourceStateCpuUsage
+import org.opendc.trace.conv.resourceStateTimestamp
+import org.opendc.trace.conv.resourceStopTime
import org.opendc.trace.testkit.TableReaderTestKit
import org.opendc.trace.testkit.TableWriterTestKit
import java.nio.file.Files
@@ -88,19 +88,19 @@ internal class OdcVmTraceFormatTest {
@ValueSource(strings = ["trace-v2.0", "trace-v2.1"])
fun testResources(name: String) {
val path = Paths.get("src/test/resources/$name")
- val reader = format.newReader(path, TABLE_RESOURCES, listOf(RESOURCE_ID, RESOURCE_START_TIME))
+ val reader = format.newReader(path, TABLE_RESOURCES, listOf(resourceID, resourceStartTime))
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals("1019", reader.getString(RESOURCE_ID)) },
- { assertEquals(Instant.ofEpochMilli(1376314846000), reader.getInstant(RESOURCE_START_TIME)) },
+ { assertEquals("1019", reader.getString(resourceID)) },
+ { assertEquals(Instant.ofEpochMilli(1376314846000), reader.getInstant(resourceStartTime)) },
{ assertTrue(reader.nextRow()) },
- { assertEquals("1023", reader.getString(RESOURCE_ID)) },
+ { assertEquals("1023", reader.getString(resourceID)) },
{ assertTrue(reader.nextRow()) },
- { assertEquals("1052", reader.getString(RESOURCE_ID)) },
+ { assertEquals("1052", reader.getString(resourceID)) },
{ assertTrue(reader.nextRow()) },
- { assertEquals("1073", reader.getString(RESOURCE_ID)) },
- { assertFalse(reader.nextRow()) }
+ { assertEquals("1073", reader.getString(resourceID)) },
+ { assertFalse(reader.nextRow()) },
)
reader.close()
@@ -112,12 +112,12 @@ internal class OdcVmTraceFormatTest {
val writer = format.newWriter(path, TABLE_RESOURCES)
writer.startRow()
- writer.setString(RESOURCE_ID, "1019")
- writer.setInstant(RESOURCE_START_TIME, Instant.EPOCH)
- writer.setInstant(RESOURCE_STOP_TIME, Instant.EPOCH)
- writer.setInt(RESOURCE_CPU_COUNT, 1)
- writer.setDouble(RESOURCE_CPU_CAPACITY, 1024.0)
- writer.setDouble(RESOURCE_MEM_CAPACITY, 1024.0)
+ writer.setString(resourceID, "1019")
+ writer.setInstant(resourceStartTime, Instant.EPOCH)
+ writer.setInstant(resourceStopTime, Instant.EPOCH)
+ writer.setInt(resourceCpuCount, 1)
+ writer.setDouble(resourceCpuCapacity, 1024.0)
+ writer.setDouble(resourceMemCapacity, 1024.0)
writer.endRow()
writer.close()
@@ -125,13 +125,13 @@ internal class OdcVmTraceFormatTest {
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals("1019", reader.getString(RESOURCE_ID)) },
- { assertEquals(Instant.EPOCH, reader.getInstant(RESOURCE_START_TIME)) },
- { assertEquals(Instant.EPOCH, reader.getInstant(RESOURCE_STOP_TIME)) },
- { assertEquals(1, reader.getInt(RESOURCE_CPU_COUNT)) },
- { assertEquals(1024.0, reader.getDouble(RESOURCE_CPU_CAPACITY)) },
- { assertEquals(1024.0, reader.getDouble(RESOURCE_MEM_CAPACITY)) },
- { assertFalse(reader.nextRow()) }
+ { assertEquals("1019", reader.getString(resourceID)) },
+ { assertEquals(Instant.EPOCH, reader.getInstant(resourceStartTime)) },
+ { assertEquals(Instant.EPOCH, reader.getInstant(resourceStopTime)) },
+ { assertEquals(1, reader.getInt(resourceCpuCount)) },
+ { assertEquals(1024.0, reader.getDouble(resourceCpuCapacity)) },
+ { assertEquals(1024.0, reader.getDouble(resourceMemCapacity)) },
+ { assertFalse(reader.nextRow()) },
)
reader.close()
@@ -141,17 +141,18 @@ internal class OdcVmTraceFormatTest {
@ValueSource(strings = ["trace-v2.0", "trace-v2.1"])
fun testSmoke(name: String) {
val path = Paths.get("src/test/resources/$name")
- val reader = format.newReader(
- path,
- TABLE_RESOURCE_STATES,
- listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP, RESOURCE_STATE_CPU_USAGE)
- )
+ val reader =
+ format.newReader(
+ path,
+ TABLE_RESOURCE_STATES,
+ listOf(resourceID, resourceStateTimestamp, resourceStateCpuUsage),
+ )
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals("1019", reader.getString(RESOURCE_ID)) },
- { assertEquals(1376314846, reader.getInstant(RESOURCE_STATE_TIMESTAMP)?.epochSecond) },
- { assertEquals(0.0, reader.getDouble(RESOURCE_STATE_CPU_USAGE), 0.01) }
+ { assertEquals("1019", reader.getString(resourceID)) },
+ { assertEquals(1376314846, reader.getInstant(resourceStateTimestamp)?.epochSecond) },
+ { assertEquals(0.0, reader.getDouble(resourceStateCpuUsage), 0.01) },
)
reader.close()
@@ -163,10 +164,10 @@ internal class OdcVmTraceFormatTest {
val writer = format.newWriter(path, TABLE_RESOURCE_STATES)
writer.startRow()
- writer.setString(RESOURCE_ID, "1019")
- writer.setInstant(RESOURCE_STATE_TIMESTAMP, Instant.EPOCH)
- writer.setDouble(RESOURCE_STATE_CPU_USAGE, 23.0)
- writer.setInt(RESOURCE_CPU_COUNT, 1)
+ writer.setString(resourceID, "1019")
+ writer.setInstant(resourceStateTimestamp, Instant.EPOCH)
+ writer.setDouble(resourceStateCpuUsage, 23.0)
+ writer.setInt(resourceCpuCount, 1)
writer.endRow()
writer.close()
@@ -174,11 +175,11 @@ internal class OdcVmTraceFormatTest {
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals("1019", reader.getString(RESOURCE_ID)) },
- { assertEquals(Instant.EPOCH, reader.getInstant(RESOURCE_STATE_TIMESTAMP)) },
- { assertEquals(1, reader.getInt(RESOURCE_CPU_COUNT)) },
- { assertEquals(23.0, reader.getDouble(RESOURCE_STATE_CPU_USAGE)) },
- { assertFalse(reader.nextRow()) }
+ { assertEquals("1019", reader.getString(resourceID)) },
+ { assertEquals(Instant.EPOCH, reader.getInstant(resourceStateTimestamp)) },
+ { assertEquals(1, reader.getInt(resourceCpuCount)) },
+ { assertEquals(23.0, reader.getDouble(resourceStateCpuUsage)) },
+ { assertFalse(reader.nextRow()) },
)
reader.close()
@@ -187,11 +188,12 @@ internal class OdcVmTraceFormatTest {
@Test
fun testInterferenceGroups() {
val path = Paths.get("src/test/resources/trace-v2.1")
- val reader = format.newReader(
- path,
- TABLE_INTERFERENCE_GROUPS,
- listOf(INTERFERENCE_GROUP_MEMBERS, INTERFERENCE_GROUP_TARGET, INTERFERENCE_GROUP_SCORE)
- )
+ val reader =
+ format.newReader(
+ path,
+ TABLE_INTERFERENCE_GROUPS,
+ listOf(INTERFERENCE_GROUP_MEMBERS, INTERFERENCE_GROUP_TARGET, INTERFERENCE_GROUP_SCORE),
+ )
assertAll(
{ assertTrue(reader.nextRow()) },
@@ -202,7 +204,7 @@ internal class OdcVmTraceFormatTest {
{ assertEquals(setOf("1023", "1052", "1073"), reader.getSet(INTERFERENCE_GROUP_MEMBERS, String::class.java)) },
{ assertEquals(0.0, reader.getDouble(INTERFERENCE_GROUP_TARGET)) },
{ assertEquals(0.7133055555552751, reader.getDouble(INTERFERENCE_GROUP_SCORE)) },
- { assertFalse(reader.nextRow()) }
+ { assertFalse(reader.nextRow()) },
)
reader.close()
@@ -247,7 +249,7 @@ internal class OdcVmTraceFormatTest {
{ assertEquals(setOf("a", "b", "d"), reader.getSet(INTERFERENCE_GROUP_MEMBERS, String::class.java)) },
{ assertEquals(0.5, reader.getDouble(INTERFERENCE_GROUP_TARGET)) },
{ assertEquals(0.9, reader.getDouble(INTERFERENCE_GROUP_SCORE)) },
- { assertFalse(reader.nextRow()) }
+ { assertFalse(reader.nextRow()) },
)
reader.close()
diff --git a/opendc-trace/opendc-trace-parquet/build.gradle.kts b/opendc-trace/opendc-trace-parquet/build.gradle.kts
index 2217a017..4cdd4350 100644
--- a/opendc-trace/opendc-trace-parquet/build.gradle.kts
+++ b/opendc-trace/opendc-trace-parquet/build.gradle.kts
@@ -22,13 +22,13 @@
description = "Parquet helpers for traces in OpenDC"
-/* Build configuration */
+// Build configuration
plugins {
`kotlin-library-conventions`
}
dependencies {
- /* This configuration is necessary for a slim dependency on Apache Parquet */
+ // This configuration is necessary for a slim dependency on Apache Parquet
api(libs.parquet) {
exclude(group = "org.apache.hadoop")
}
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalInputFile.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalInputFile.kt
index fd2e00cd..a60b426a 100644
--- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalInputFile.kt
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalInputFile.kt
@@ -47,61 +47,66 @@ public class LocalInputFile(private val path: Path) : InputFile {
override fun getLength(): Long = channel.size()
- override fun newStream(): SeekableInputStream = object : SeekableInputStream() {
- override fun read(buf: ByteBuffer): Int {
- return channel.read(buf)
- }
+ override fun newStream(): SeekableInputStream =
+ object : SeekableInputStream() {
+ override fun read(buf: ByteBuffer): Int {
+ return channel.read(buf)
+ }
- override fun read(): Int {
- val single = ByteBuffer.allocate(1)
- var read: Int
+ override fun read(): Int {
+ val single = ByteBuffer.allocate(1)
+ var read: Int
- // ReadableByteChannel#read might read zero bytes so continue until we read at least one byte
- do {
- read = channel.read(single)
- } while (read == 0)
+ // ReadableByteChannel#read might read zero bytes so continue until we read at least one byte
+ do {
+ read = channel.read(single)
+ } while (read == 0)
- return if (read == -1) {
- read
- } else {
- single.get(0).toInt() and 0xff
+ return if (read == -1) {
+ read
+ } else {
+ single.get(0).toInt() and 0xff
+ }
}
- }
- override fun getPos(): Long {
- return channel.position()
- }
+ override fun getPos(): Long {
+ return channel.position()
+ }
- override fun seek(newPos: Long) {
- channel.position(newPos)
- }
+ override fun seek(newPos: Long) {
+ channel.position(newPos)
+ }
- override fun readFully(bytes: ByteArray) {
- readFully(ByteBuffer.wrap(bytes))
- }
+ override fun readFully(bytes: ByteArray) {
+ readFully(ByteBuffer.wrap(bytes))
+ }
- override fun readFully(bytes: ByteArray, start: Int, len: Int) {
- readFully(ByteBuffer.wrap(bytes, start, len))
- }
+ override fun readFully(
+ bytes: ByteArray,
+ start: Int,
+ len: Int,
+ ) {
+ readFully(ByteBuffer.wrap(bytes, start, len))
+ }
- override fun readFully(buf: ByteBuffer) {
- var remainder = buf.remaining()
- while (remainder > 0) {
- val read = channel.read(buf)
- remainder -= read
+ override fun readFully(buf: ByteBuffer) {
+ var remainder = buf.remaining()
+ while (remainder > 0) {
+ val read = channel.read(buf)
+ remainder -= read
- if (read == -1 && remainder > 0) {
- throw EOFException()
+ if (read == -1 && remainder > 0) {
+ throw EOFException()
+ }
}
}
- }
- override fun close() {
- channel.close()
- }
+ override fun close() {
+ channel.close()
+ }
- override fun toString(): String = "NioSeekableInputStream"
- }
+ override fun toString(): String = "NioSeekableInputStream"
+ }
override fun toString(): String = "LocalInputFile[path=$path]"
}
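The stream implementation above has to respect two quirks of `ReadableByteChannel`: a read may return zero bytes even though the channel is still open, and a bulk read may fill only part of the buffer. A self-contained sketch of both loops (the helper names are illustrative):

```kotlin
import java.io.EOFException
import java.nio.ByteBuffer
import java.nio.channels.ReadableByteChannel

// Keep reading until the buffer is full; a short read is not an error,
// but end-of-stream before the buffer fills is.
fun readFully(channel: ReadableByteChannel, buf: ByteBuffer) {
    while (buf.hasRemaining()) {
        if (channel.read(buf) == -1) {
            throw EOFException("Stream ended with ${buf.remaining()} bytes missing")
        }
    }
}

// Single-byte read: loop past zero-byte reads, then mask to 0..255 so the
// byte is not sign-extended, matching the InputStream.read() contract.
fun readByte(channel: ReadableByteChannel): Int {
    val single = ByteBuffer.allocate(1)
    var read: Int
    do {
        read = channel.read(single)
    } while (read == 0)
    return if (read == -1) -1 else single.get(0).toInt() and 0xff
}
```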
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalOutputFile.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalOutputFile.kt
index 1b17ae5d..24627b45 100644
--- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalOutputFile.kt
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalOutputFile.kt
@@ -51,8 +51,7 @@ public class LocalOutputFile(private val path: Path) : OutputFile {
override fun supportsBlockSize(): Boolean = false
- override fun defaultBlockSize(): Long =
- throw UnsupportedOperationException("Local filesystem does not have default block size")
+ override fun defaultBlockSize(): Long = throw UnsupportedOperationException("Local filesystem does not have default block size")
override fun getPath(): String = path.toString()
@@ -77,7 +76,11 @@ public class LocalOutputFile(private val path: Path) : OutputFile {
_pos += b.size
}
- override fun write(b: ByteArray, off: Int, len: Int) {
+ override fun write(
+ b: ByteArray,
+ off: Int,
+ len: Int,
+ ) {
output.write(b, off, len)
_pos += len
}
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
index de8a56d0..b503254e 100644
--- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
@@ -43,20 +43,21 @@ import kotlin.io.path.isDirectory
public class LocalParquetReader<out T>(
path: Path,
private val readSupport: ReadSupport<T>,
- private val strictTyping: Boolean = true
+ private val strictTyping: Boolean = true,
) : AutoCloseable {
/**
* The input files to process.
*/
- private val filesIterator = if (path.isDirectory()) {
- Files.list(path)
- .filter { !it.isDirectory() }
- .sorted()
- .map { LocalInputFile(it) }
- .iterator()
- } else {
- listOf(LocalInputFile(path)).iterator()
- }
+ private val filesIterator =
+ if (path.isDirectory()) {
+ Files.list(path)
+ .filter { !it.isDirectory() }
+ .sorted()
+ .map { LocalInputFile(it) }
+ .iterator()
+ } else {
+ listOf(LocalInputFile(path)).iterator()
+ }
/**
* The Parquet reader to use.
@@ -104,11 +105,12 @@ public class LocalParquetReader<out T>(
reader?.close()
try {
- this.reader = if (filesIterator.hasNext()) {
- createReader(filesIterator.next())
- } else {
- null
- }
+ this.reader =
+ if (filesIterator.hasNext()) {
+ createReader(filesIterator.next())
+ } else {
+ null
+ }
} catch (e: Throwable) {
this.reader = null
throw e
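The reader treats a directory as a multi-file trace: it lists the regular files, sorts them for a deterministic read order, and chains a Parquet reader over each in turn. A sketch of just the discovery step (`discoverInputs` is an illustrative name; like the code above, it leaves the directory stream to be reclaimed by garbage collection):

```kotlin
import java.nio.file.Files
import java.nio.file.Path
import kotlin.io.path.isDirectory

// A directory expands to its regular files in sorted order; a single
// file becomes a one-element iterator.
fun discoverInputs(path: Path): Iterator<Path> =
    if (path.isDirectory()) {
        Files.list(path)
            .filter { !it.isDirectory() }
            .sorted()
            .iterator()
    } else {
        listOf(path).iterator()
    }
```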
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt
index b5eb1deb..c7028fc3 100644
--- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt
@@ -37,7 +37,7 @@ public class LocalParquetWriter {
*/
public class Builder<T> internal constructor(
output: OutputFile,
- private val writeSupport: WriteSupport<T>
+ private val writeSupport: WriteSupport<T>,
) : ParquetWriter.Builder<T, Builder<T>>(output) {
override fun self(): Builder<T> = this
@@ -49,7 +49,9 @@ public class LocalParquetWriter {
* Create a [Builder] instance that writes a Parquet file at the specified [path].
*/
@JvmStatic
- public fun <T> builder(path: Path, writeSupport: WriteSupport<T>): Builder<T> =
- Builder(LocalOutputFile(path), writeSupport)
+ public fun <T> builder(
+ path: Path,
+ writeSupport: WriteSupport<T>,
+ ): Builder<T> = Builder(LocalOutputFile(path), writeSupport)
}
}
diff --git a/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt b/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt
index b6c5a423..fc90aded 100644
--- a/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt
+++ b/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt
@@ -51,49 +51,52 @@ import java.nio.file.Path
internal class ParquetTest {
private lateinit var path: Path
- private val schema = Types.buildMessage()
- .addField(
- Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.REQUIRED)
- .named("field")
- )
- .named("test")
- private val writeSupport = object : WriteSupport<Int>() {
- lateinit var recordConsumer: RecordConsumer
-
- override fun init(configuration: Configuration): WriteContext {
- return WriteContext(schema, emptyMap())
- }
+ private val schema =
+ Types.buildMessage()
+ .addField(
+ Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.REQUIRED)
+ .named("field"),
+ )
+ .named("test")
+ private val writeSupport =
+ object : WriteSupport<Int>() {
+ lateinit var recordConsumer: RecordConsumer
+
+ override fun init(configuration: Configuration): WriteContext {
+ return WriteContext(schema, emptyMap())
+ }
- override fun prepareForWrite(recordConsumer: RecordConsumer) {
- this.recordConsumer = recordConsumer
- }
+ override fun prepareForWrite(recordConsumer: RecordConsumer) {
+ this.recordConsumer = recordConsumer
+ }
- override fun write(record: Int) {
- val consumer = recordConsumer
+ override fun write(record: Int) {
+ val consumer = recordConsumer
- consumer.startMessage()
- consumer.startField("field", 0)
- consumer.addInteger(record)
- consumer.endField("field", 0)
- consumer.endMessage()
+ consumer.startMessage()
+ consumer.startField("field", 0)
+ consumer.addInteger(record)
+ consumer.endField("field", 0)
+ consumer.endMessage()
+ }
}
- }
- private val readSupport = object : ReadSupport<Int>() {
- @Suppress("OVERRIDE_DEPRECATION")
- override fun init(
- configuration: Configuration,
- keyValueMetaData: Map<String, String>,
- fileSchema: MessageType
- ): ReadContext = ReadContext(fileSchema)
-
- override fun prepareForRead(
- configuration: Configuration,
- keyValueMetaData: Map<String, String>,
- fileSchema: MessageType,
- readContext: ReadContext
- ): RecordMaterializer<Int> = TestRecordMaterializer()
- }
+ private val readSupport =
+ object : ReadSupport<Int>() {
+ @Suppress("OVERRIDE_DEPRECATION")
+ override fun init(
+ configuration: Configuration,
+ keyValueMetaData: Map<String, String>,
+ fileSchema: MessageType,
+ ): ReadContext = ReadContext(fileSchema)
+
+ override fun prepareForRead(
+ configuration: Configuration,
+ keyValueMetaData: Map<String, String>,
+ fileSchema: MessageType,
+ readContext: ReadContext,
+ ): RecordMaterializer<Int> = TestRecordMaterializer()
+ }
/**
* Set up the test
@@ -117,9 +120,10 @@ internal class ParquetTest {
@Test
fun testSmoke() {
val n = 4
- val writer = LocalParquetWriter.builder(path, writeSupport)
- .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
- .build()
+ val writer =
+ LocalParquetWriter.builder(path, writeSupport)
+ .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+ .build()
try {
repeat(n) { i ->
@@ -166,19 +170,23 @@ internal class ParquetTest {
private class TestRecordMaterializer : RecordMaterializer<Int>() {
private var current: Int = 0
- private val fieldConverter = object : PrimitiveConverter() {
- override fun addInt(value: Int) {
- current = value
+ private val fieldConverter =
+ object : PrimitiveConverter() {
+ override fun addInt(value: Int) {
+ current = value
+ }
}
- }
- private val root = object : GroupConverter() {
- override fun getConverter(fieldIndex: Int): Converter {
- require(fieldIndex == 0)
- return fieldConverter
+ private val root =
+ object : GroupConverter() {
+ override fun getConverter(fieldIndex: Int): Converter {
+ require(fieldIndex == 0)
+ return fieldConverter
+ }
+
+ override fun start() {}
+
+ override fun end() {}
}
- override fun start() {}
- override fun end() {}
- }
override fun getCurrentRecord(): Int = current
diff --git a/opendc-trace/opendc-trace-swf/build.gradle.kts b/opendc-trace/opendc-trace-swf/build.gradle.kts
index d3bc5aa6..2798cdb1 100644
--- a/opendc-trace/opendc-trace-swf/build.gradle.kts
+++ b/opendc-trace/opendc-trace-swf/build.gradle.kts
@@ -22,7 +22,7 @@
description = "Support for Standard Workload Format (SWF) traces in OpenDC"
-/* Build configuration */
+// Build configuration
plugins {
`kotlin-library-conventions`
}
diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt
index 2465fb47..5a79fd6f 100644
--- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt
+++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt
@@ -99,22 +99,22 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea
override fun resolve(name: String): Int {
return when (name) {
- TASK_ID -> COL_JOB_ID
- TASK_SUBMIT_TIME -> COL_SUBMIT_TIME
- TASK_WAIT_TIME -> COL_WAIT_TIME
- TASK_RUNTIME -> COL_RUN_TIME
- TASK_ALLOC_NCPUS -> COL_ALLOC_NCPUS
- TASK_REQ_NCPUS -> COL_REQ_NCPUS
- TASK_STATUS -> COL_STATUS
- TASK_USER_ID -> COL_USER_ID
- TASK_GROUP_ID -> COL_GROUP_ID
- TASK_PARENTS -> COL_PARENT_JOB
+ TASK_ID -> colJobID
+ TASK_SUBMIT_TIME -> colSubmitTime
+ TASK_WAIT_TIME -> colWaitTime
+ TASK_RUNTIME -> colRunTime
+ TASK_ALLOC_NCPUS -> colAllocNcpus
+ TASK_REQ_NCPUS -> colReqNcpus
+ TASK_STATUS -> colStatus
+ TASK_USER_ID -> colUserID
+ TASK_GROUP_ID -> colGroupID
+ TASK_PARENTS -> colParentJob
else -> -1
}
}
override fun isNull(index: Int): Boolean {
- require(index in COL_JOB_ID..COL_PARENT_THINK_TIME) { "Invalid column index" }
+ require(index in colJobID..colParentThinkTime) { "Invalid column index" }
return false
}
@@ -125,7 +125,7 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea
override fun getInt(index: Int): Int {
check(state == State.Active) { "No active row" }
return when (index) {
- COL_REQ_NCPUS, COL_ALLOC_NCPUS, COL_STATUS, COL_GROUP_ID, COL_USER_ID -> fields[index].toInt(10)
+ colReqNcpus, colAllocNcpus, colStatus, colGroupID, colUserID -> fields[index].toInt(10)
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -145,7 +145,7 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea
override fun getString(index: Int): String {
check(state == State.Active) { "No active row" }
return when (index) {
- COL_JOB_ID -> fields[index]
+ colJobID -> fields[index]
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -157,7 +157,7 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea
override fun getInstant(index: Int): Instant? {
check(state == State.Active) { "No active row" }
return when (index) {
- COL_SUBMIT_TIME -> Instant.ofEpochSecond(fields[index].toLong(10))
+ colSubmitTime -> Instant.ofEpochSecond(fields[index].toLong(10))
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -165,20 +165,26 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea
override fun getDuration(index: Int): Duration? {
check(state == State.Active) { "No active row" }
return when (index) {
- COL_WAIT_TIME, COL_RUN_TIME -> Duration.ofSeconds(fields[index].toLong(10))
+ colWaitTime, colRunTime -> Duration.ofSeconds(fields[index].toLong(10))
else -> throw IllegalArgumentException("Invalid column")
}
}
- override fun <T> getList(index: Int, elementType: Class<T>): List<T>? {
+ override fun <T> getList(
+ index: Int,
+ elementType: Class<T>,
+ ): List<T>? {
throw IllegalArgumentException("Invalid column")
}
- override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? {
+ override fun <T> getSet(
+ index: Int,
+ elementType: Class<T>,
+ ): Set<T>? {
check(state == State.Active) { "No active row" }
@Suppress("UNCHECKED_CAST")
return when (index) {
- COL_PARENT_JOB -> {
+ colParentJob -> {
require(elementType.isAssignableFrom(String::class.java))
val parent = fields[index].toLong(10)
if (parent < 0) emptySet() else setOf(parent)
@@ -187,7 +193,11 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea
} as Set<T>?
}
- override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? {
+ override fun <K, V> getMap(
+ index: Int,
+ keyType: Class<K>,
+ valueType: Class<V>,
+ ): Map<K, V>? {
throw IllegalArgumentException("Invalid column")
}
@@ -199,26 +209,28 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea
/**
* Default column indices for the SWF format.
*/
- private val COL_JOB_ID = 0
- private val COL_SUBMIT_TIME = 1
- private val COL_WAIT_TIME = 2
- private val COL_RUN_TIME = 3
- private val COL_ALLOC_NCPUS = 4
- private val COL_AVG_CPU_TIME = 5
- private val COL_USED_MEM = 6
- private val COL_REQ_NCPUS = 7
- private val COL_REQ_TIME = 8
- private val COL_REQ_MEM = 9
- private val COL_STATUS = 10
- private val COL_USER_ID = 11
- private val COL_GROUP_ID = 12
- private val COL_EXEC_NUM = 13
- private val COL_QUEUE_NUM = 14
- private val COL_PART_NUM = 15
- private val COL_PARENT_JOB = 16
- private val COL_PARENT_THINK_TIME = 17
+ private val colJobID = 0
+ private val colSubmitTime = 1
+ private val colWaitTime = 2
+ private val colRunTime = 3
+ private val colAllocNcpus = 4
+ private val colAvgCpuTime = 5
+ private val colUsedMem = 6
+ private val colReqNcpus = 7
+ private val colReqTime = 8
+ private val colReqMem = 9
+ private val colStatus = 10
+ private val colUserID = 11
+ private val colGroupID = 12
+ private val colExecNum = 13
+ private val colQueueNum = 14
+ private val colPartNum = 15
+ private val colParentJob = 16
+ private val colParentThinkTime = 17
private enum class State {
- Pending, Active, Closed
+ Pending,
+ Active,
+ Closed,
}
}
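The renamed indices above encode the fixed column layout of the Standard Workload Format: every data line carries 18 whitespace-separated fields, with the job id at field 0, submit time at 1, wait time at 2, run time at 3, and so on. A small sketch of parsing one such line (the record itself is made up for illustration):

```kotlin
import java.time.Duration
import java.time.Instant

fun main() {
    // Hypothetical SWF record: job 1, submitted at the epoch, 5 s wait,
    // 600 s runtime, 306 allocated processors, remaining fields unset (-1).
    val line = "1 0 5 600 306 -1 -1 306 -1 -1 1 1 1 -1 1 -1 -1 -1"
    val fields = line.trim().split(Regex("\\s+"))

    val jobId = fields[0]
    val submitTime = Instant.ofEpochSecond(fields[1].toLong())
    val waitTime = Duration.ofSeconds(fields[2].toLong())
    val runTime = Duration.ofSeconds(fields[3].toLong())
    val allocNcpus = fields[4].toInt()

    println("job=$jobId submit=$submitTime wait=$waitTime run=$runTime ncpus=$allocNcpus")
}
```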
diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt
index c51805d7..d59b07b4 100644
--- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt
+++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt
@@ -56,34 +56,45 @@ public class SwfTraceFormat : TraceFormat {
override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS)
- override fun getDetails(path: Path, table: String): TableDetails {
+ override fun getDetails(
+ path: Path,
+ table: String,
+ ): TableDetails {
return when (table) {
- TABLE_TASKS -> TableDetails(
- listOf(
- TableColumn(TASK_ID, TableColumnType.String),
- TableColumn(TASK_SUBMIT_TIME, TableColumnType.Instant),
- TableColumn(TASK_WAIT_TIME, TableColumnType.Duration),
- TableColumn(TASK_RUNTIME, TableColumnType.Duration),
- TableColumn(TASK_REQ_NCPUS, TableColumnType.Int),
- TableColumn(TASK_ALLOC_NCPUS, TableColumnType.Int),
- TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)),
- TableColumn(TASK_STATUS, TableColumnType.Int),
- TableColumn(TASK_GROUP_ID, TableColumnType.Int),
- TableColumn(TASK_USER_ID, TableColumnType.Int)
+ TABLE_TASKS ->
+ TableDetails(
+ listOf(
+ TableColumn(TASK_ID, TableColumnType.String),
+ TableColumn(TASK_SUBMIT_TIME, TableColumnType.Instant),
+ TableColumn(TASK_WAIT_TIME, TableColumnType.Duration),
+ TableColumn(TASK_RUNTIME, TableColumnType.Duration),
+ TableColumn(TASK_REQ_NCPUS, TableColumnType.Int),
+ TableColumn(TASK_ALLOC_NCPUS, TableColumnType.Int),
+ TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)),
+ TableColumn(TASK_STATUS, TableColumnType.Int),
+ TableColumn(TASK_GROUP_ID, TableColumnType.Int),
+ TableColumn(TASK_USER_ID, TableColumnType.Int),
+ ),
)
- )
else -> throw IllegalArgumentException("Table $table not supported")
}
}
- override fun newReader(path: Path, table: String, projection: List<String>?): TableReader {
+ override fun newReader(
+ path: Path,
+ table: String,
+ projection: List<String>?,
+ ): TableReader {
return when (table) {
TABLE_TASKS -> SwfTaskTableReader(path.bufferedReader())
else -> throw IllegalArgumentException("Table $table not supported")
}
}
- override fun newWriter(path: Path, table: String): TableWriter {
+ override fun newWriter(
+ path: Path,
+ table: String,
+ ): TableWriter {
throw UnsupportedOperationException("Writing not supported for this format")
}
}
diff --git a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt
index 71d6dee3..436f2572 100644
--- a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt
@@ -77,7 +77,7 @@ internal class SwfTraceFormatTest {
{ assertEquals(306, reader.getInt(TASK_ALLOC_NCPUS)) },
{ assertTrue(reader.nextRow()) },
{ assertEquals("2", reader.getString(TASK_ID)) },
- { assertEquals(17, reader.getInt(TASK_ALLOC_NCPUS)) }
+ { assertEquals(17, reader.getInt(TASK_ALLOC_NCPUS)) },
)
reader.close()
diff --git a/opendc-trace/opendc-trace-testkit/build.gradle.kts b/opendc-trace/opendc-trace-testkit/build.gradle.kts
index f6b7222c..e75ffc8c 100644
--- a/opendc-trace/opendc-trace-testkit/build.gradle.kts
+++ b/opendc-trace/opendc-trace-testkit/build.gradle.kts
@@ -22,7 +22,7 @@
description = "Reusable test suite for implementors"
-/* Build configuration */
+// Build configuration
plugins {
`kotlin-library-conventions`
}
diff --git a/opendc-trace/opendc-trace-testkit/src/main/kotlin/org/opendc/trace/testkit/TableReaderTestKit.kt b/opendc-trace/opendc-trace-testkit/src/main/kotlin/org/opendc/trace/testkit/TableReaderTestKit.kt
index 4624cba0..e5808f81 100644
--- a/opendc-trace/opendc-trace-testkit/src/main/kotlin/org/opendc/trace/testkit/TableReaderTestKit.kt
+++ b/opendc-trace/opendc-trace-testkit/src/main/kotlin/org/opendc/trace/testkit/TableReaderTestKit.kt
@@ -89,7 +89,7 @@ public abstract class TableReaderTestKit {
{ assertThrows<IllegalArgumentException> { reader.getDuration(-1) } },
{ assertThrows<IllegalArgumentException> { reader.getList(-1, Any::class.java) } },
{ assertThrows<IllegalArgumentException> { reader.getSet(-1, Any::class.java) } },
- { assertThrows<IllegalArgumentException> { reader.getMap(-1, Any::class.java, Any::class.java) } }
+ { assertThrows<IllegalArgumentException> { reader.getMap(-1, Any::class.java, Any::class.java) } },
)
}
@@ -111,13 +111,25 @@ public abstract class TableReaderTestKit {
is TableColumnType.String -> assertFalse(reader.isNull(column.name) && reader.getString(column.name) != null)
is TableColumnType.UUID -> assertFalse(reader.isNull(column.name) && reader.getUUID(column.name) != null)
is TableColumnType.Instant -> assertFalse(reader.isNull(column.name) && reader.getInstant(column.name) != null)
- is TableColumnType.Duration -> assertFalse(reader.isNull(column.name) && reader.getDuration(column.name) != null)
- is TableColumnType.List -> assertFalse(reader.isNull(column.name) && reader.getList(column.name, Any::class.java) != null)
- is TableColumnType.Set -> assertFalse(reader.isNull(column.name) && reader.getSet(column.name, Any::class.java) != null)
- is TableColumnType.Map -> assertFalse(reader.isNull(column.name) && reader.getMap(column.name, Any::class.java, Any::class.java) != null)
+ is TableColumnType.Duration ->
+ assertFalse(
+ reader.isNull(column.name) && reader.getDuration(column.name) != null,
+ )
+ is TableColumnType.List ->
+ assertFalse(
+ reader.isNull(column.name) && reader.getList(column.name, Any::class.java) != null,
+ )
+ is TableColumnType.Set ->
+ assertFalse(
+ reader.isNull(column.name) && reader.getSet(column.name, Any::class.java) != null,
+ )
+ is TableColumnType.Map ->
+ assertFalse(
+ reader.isNull(column.name) && reader.getMap(column.name, Any::class.java, Any::class.java) != null,
+ )
}
}
- }
+ },
)
}
}
diff --git a/opendc-trace/opendc-trace-testkit/src/main/kotlin/org/opendc/trace/testkit/TableWriterTestKit.kt b/opendc-trace/opendc-trace-testkit/src/main/kotlin/org/opendc/trace/testkit/TableWriterTestKit.kt
index 3cd05f50..2b4adf19 100644
--- a/opendc-trace/opendc-trace-testkit/src/main/kotlin/org/opendc/trace/testkit/TableWriterTestKit.kt
+++ b/opendc-trace/opendc-trace-testkit/src/main/kotlin/org/opendc/trace/testkit/TableWriterTestKit.kt
@@ -88,7 +88,7 @@ public abstract class TableWriterTestKit {
{ assertThrows<IllegalArgumentException> { writer.setDuration(-1, Duration.ofMinutes(5)) } },
{ assertThrows<IllegalArgumentException> { writer.setList(-1, listOf("test")) } },
{ assertThrows<IllegalArgumentException> { writer.setSet(-1, setOf("test")) } },
- { assertThrows<IllegalArgumentException> { writer.setMap(-1, mapOf("test" to "test")) } }
+ { assertThrows<IllegalArgumentException> { writer.setMap(-1, mapOf("test" to "test")) } },
)
}
@@ -117,7 +117,7 @@ public abstract class TableWriterTestKit {
}
}
}
- }
+ },
)
}
diff --git a/opendc-trace/opendc-trace-tools/build.gradle.kts b/opendc-trace/opendc-trace-tools/build.gradle.kts
index db11059b..002ab8cc 100644
--- a/opendc-trace/opendc-trace-tools/build.gradle.kts
+++ b/opendc-trace/opendc-trace-tools/build.gradle.kts
@@ -22,7 +22,7 @@
description = "Tools for working with workload traces"
-/* Build configuration */
+// Build configuration
plugins {
`kotlin-conventions`
application
diff --git a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/ConvertCommand.kt b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/ConvertCommand.kt
index bf0e2e3b..17ff0c90 100644
--- a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/ConvertCommand.kt
+++ b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/ConvertCommand.kt
@@ -39,24 +39,23 @@ import com.github.ajalt.clikt.parameters.types.restrictTo
import mu.KotlinLogging
import org.opendc.trace.TableWriter
import org.opendc.trace.Trace
-import org.opendc.trace.conv.RESOURCE_CPU_CAPACITY
-import org.opendc.trace.conv.RESOURCE_CPU_COUNT
-import org.opendc.trace.conv.RESOURCE_ID
-import org.opendc.trace.conv.RESOURCE_MEM_CAPACITY
-import org.opendc.trace.conv.RESOURCE_START_TIME
-import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE
-import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE_PCT
-import org.opendc.trace.conv.RESOURCE_STATE_DURATION
-import org.opendc.trace.conv.RESOURCE_STATE_MEM_USAGE
-import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP
-import org.opendc.trace.conv.RESOURCE_STOP_TIME
import org.opendc.trace.conv.TABLE_RESOURCES
import org.opendc.trace.conv.TABLE_RESOURCE_STATES
+import org.opendc.trace.conv.resourceCpuCapacity
+import org.opendc.trace.conv.resourceCpuCount
+import org.opendc.trace.conv.resourceID
+import org.opendc.trace.conv.resourceMemCapacity
+import org.opendc.trace.conv.resourceStartTime
+import org.opendc.trace.conv.resourceStateCpuUsage
+import org.opendc.trace.conv.resourceStateCpuUsagePct
+import org.opendc.trace.conv.resourceStateDuration
+import org.opendc.trace.conv.resourceStateMemUsage
+import org.opendc.trace.conv.resourceStateTimestamp
+import org.opendc.trace.conv.resourceStopTime
import java.io.File
import java.time.Duration
import java.time.Instant
import java.util.Random
-import kotlin.collections.HashMap
import kotlin.math.abs
import kotlin.math.max
import kotlin.math.min
@@ -105,7 +104,7 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b
*/
private val converter by option("-c", "--converter", help = "converter strategy to use").groupChoice(
"default" to DefaultTraceConverter(),
- "azure" to AzureTraceConverter()
+ "azure" to AzureTraceConverter(),
).defaultByName("default")
override fun run() {
@@ -174,7 +173,11 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b
* @param samplingOptions The sampling options to use.
* @return The map of resources that have been selected.
*/
- abstract fun convertResources(trace: Trace, writer: TableWriter, samplingOptions: SamplingOptions?): Map<String, Resource>
+ abstract fun convertResources(
+ trace: Trace,
+ writer: TableWriter,
+ samplingOptions: SamplingOptions?,
+ ): Map<String, Resource>
/**
* Convert the resource states table for the trace.
@@ -184,7 +187,11 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b
* @param selected The set of virtual machines that have been selected.
* @return The number of rows written.
*/
- abstract fun convertResourceStates(trace: Trace, writer: TableWriter, selected: Map<String, Resource>): Int
+ abstract fun convertResourceStates(
+ trace: Trace,
+ writer: TableWriter,
+ selected: Map<String, Resource>,
+ ): Int
/**
* A resource in the resource table.
@@ -195,7 +202,7 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b
val stopTime: Instant,
val cpuCount: Int,
val cpuCapacity: Double,
- val memCapacity: Double
+ val memCapacity: Double,
)
}
@@ -211,14 +218,18 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b
/**
* The interval at which the samples were taken.
*/
- private val SAMPLE_INTERVAL = Duration.ofMinutes(5)
+ private val sampleInterval = Duration.ofMinutes(5)
/**
* The maximum difference in CPU usage at which the algorithm will cascade samples.
*/
- private val SAMPLE_CASCADE_DIFF = 0.1
+ private val sampleCascadeDiff = 0.1
- override fun convertResources(trace: Trace, writer: TableWriter, samplingOptions: SamplingOptions?): Map<String, Resource> {
+ override fun convertResources(
+ trace: Trace,
+ writer: TableWriter,
+ samplingOptions: SamplingOptions?,
+ ): Map<String, Resource> {
val random = samplingOptions?.let { Random(it.seed) }
val samplingFraction = samplingOptions?.fraction ?: 1.0
val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
@@ -226,12 +237,12 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b
var hasNextRow = reader.nextRow()
val selectedVms = mutableMapOf<String, Resource>()
- val idCol = reader.resolve(RESOURCE_ID)
- val timestampCol = reader.resolve(RESOURCE_STATE_TIMESTAMP)
- val cpuCountCol = reader.resolve(RESOURCE_CPU_COUNT)
- val cpuCapacityCol = reader.resolve(RESOURCE_CPU_CAPACITY)
- val memCapacityCol = reader.resolve(RESOURCE_MEM_CAPACITY)
- val memUsageCol = reader.resolve(RESOURCE_STATE_MEM_USAGE)
+ val idCol = reader.resolve(resourceID)
+ val timestampCol = reader.resolve(resourceStateTimestamp)
+ val cpuCountCol = reader.resolve(resourceCpuCount)
+ val cpuCapacityCol = reader.resolve(resourceCpuCapacity)
+ val memCapacityCol = reader.resolve(resourceMemCapacity)
+ val memUsageCol = reader.resolve(resourceStateMemUsage)
while (hasNextRow) {
var id: String
@@ -257,7 +268,7 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b
}
hasNextRow = reader.nextRow()
- } while (hasNextRow && id == reader.getString(RESOURCE_ID))
+ } while (hasNextRow && id == reader.getString(resourceID))
// Sample only a fraction of the VMs
if (random != null && random.nextDouble() > samplingFraction) {
@@ -266,7 +277,7 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b
logger.info { "Selecting VM $id" }
- val startInstant = Instant.ofEpochMilli(startTime) - SAMPLE_INTERVAL // Offset by sample interval
+ val startInstant = Instant.ofEpochMilli(startTime) - sampleInterval // Offset by sample interval
val stopInstant = Instant.ofEpochMilli(stopTime)
selectedVms.computeIfAbsent(id) {
@@ -274,26 +285,30 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b
}
writer.startRow()
- writer.setString(RESOURCE_ID, id)
- writer.setInstant(RESOURCE_START_TIME, startInstant)
- writer.setInstant(RESOURCE_STOP_TIME, stopInstant)
- writer.setInt(RESOURCE_CPU_COUNT, cpuCount)
- writer.setDouble(RESOURCE_CPU_CAPACITY, cpuCapacity)
- writer.setDouble(RESOURCE_MEM_CAPACITY, max(memCapacity, memUsage))
+ writer.setString(resourceID, id)
+ writer.setInstant(resourceStartTime, startInstant)
+ writer.setInstant(resourceStopTime, stopInstant)
+ writer.setInt(resourceCpuCount, cpuCount)
+ writer.setDouble(resourceCpuCapacity, cpuCapacity)
+ writer.setDouble(resourceMemCapacity, max(memCapacity, memUsage))
writer.endRow()
}
return selectedVms
}
- override fun convertResourceStates(trace: Trace, writer: TableWriter, selected: Map<String, Resource>): Int {
+ override fun convertResourceStates(
+ trace: Trace,
+ writer: TableWriter,
+ selected: Map<String, Resource>,
+ ): Int {
val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
- val sampleInterval = SAMPLE_INTERVAL.toMillis()
+ val sampleInterval = sampleInterval.toMillis() // the right-hand sampleInterval is the Duration property above; the local shadows it from here on
- val idCol = reader.resolve(RESOURCE_ID)
- val timestampCol = reader.resolve(RESOURCE_STATE_TIMESTAMP)
- val cpuCountCol = reader.resolve(RESOURCE_CPU_COUNT)
- val cpuUsageCol = reader.resolve(RESOURCE_STATE_CPU_USAGE)
+ val idCol = reader.resolve(resourceID)
+ val timestampCol = reader.resolve(resourceStateTimestamp)
+ val cpuCountCol = reader.resolve(resourceCpuCount)
+ val cpuUsageCol = reader.resolve(resourceStateCpuUsage)
var hasNextRow = reader.nextRow()
var count = 0
@@ -315,9 +330,10 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b
// Attempt to cascade further samples into one if they share the same CPU usage
while (reader.nextRow().also { hasNextRow = it }) {
- val shouldCascade = id == reader.getString(idCol) &&
- abs(cpuUsage - reader.getDouble(cpuUsageCol)) < SAMPLE_CASCADE_DIFF &&
- cpuCount == reader.getInt(cpuCountCol)
+ val shouldCascade =
+ id == reader.getString(idCol) &&
+ abs(cpuUsage - reader.getDouble(cpuUsageCol)) < sampleCascadeDiff &&
+ cpuCount == reader.getInt(cpuCountCol)
// Check whether the next sample can be cascaded with the current sample:
// (1) The VM identifier of both samples matches
@@ -339,11 +355,11 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b
}
writer.startRow()
- writer.setString(RESOURCE_ID, id)
- writer.setInstant(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(timestamp))
- writer.setDuration(RESOURCE_STATE_DURATION, Duration.ofMillis(duration))
- writer.setInt(RESOURCE_CPU_COUNT, cpuCount)
- writer.setDouble(RESOURCE_STATE_CPU_USAGE, cpuUsage)
+ writer.setString(resourceID, id)
+ writer.setInstant(resourceStateTimestamp, Instant.ofEpochMilli(timestamp))
+ writer.setDuration(resourceStateDuration, Duration.ofMillis(duration))
+ writer.setInt(resourceCpuCount, cpuCount)
+ writer.setDouble(resourceStateCpuUsage, cpuUsage)
writer.endRow()
count++
@@ -365,28 +381,32 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b
/**
* CPU capacity of the machines used by Azure.
*/
- private val CPU_CAPACITY = 2500.0
+ private val cpuCapacity = 2500.0
/**
* The interval at which the samples were taken.
*/
- private val SAMPLE_INTERVAL = Duration.ofMinutes(5)
+ private val sampleInterval = Duration.ofMinutes(5)
/**
* The maximum difference in CPU usage at which the algorithm will cascade samples.
*/
- private val SAMPLE_CASCADE_DIFF = 0.1
+ private val sampleCascadeDiff = 0.1
- override fun convertResources(trace: Trace, writer: TableWriter, samplingOptions: SamplingOptions?): Map<String, Resource> {
+ override fun convertResources(
+ trace: Trace,
+ writer: TableWriter,
+ samplingOptions: SamplingOptions?,
+ ): Map<String, Resource> {
val random = samplingOptions?.let { Random(it.seed) }
val samplingFraction = samplingOptions?.fraction ?: 1.0
val reader = checkNotNull(trace.getTable(TABLE_RESOURCES)).newReader()
- val idCol = reader.resolve(RESOURCE_ID)
- val startTimeCol = reader.resolve(RESOURCE_START_TIME)
- val stopTimeCol = reader.resolve(RESOURCE_STOP_TIME)
- val cpuCountCol = reader.resolve(RESOURCE_CPU_COUNT)
- val memCapacityCol = reader.resolve(RESOURCE_MEM_CAPACITY)
+ val idCol = reader.resolve(resourceID)
+ val startTimeCol = reader.resolve(resourceStartTime)
+ val stopTimeCol = reader.resolve(resourceStopTime)
+ val cpuCountCol = reader.resolve(resourceCpuCount)
+ val memCapacityCol = reader.resolve(resourceMemCapacity)
val selectedVms = mutableMapOf<String, Resource>()
@@ -406,33 +426,37 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b
val startInstant = Instant.ofEpochMilli(startTime)
val stopInstant = Instant.ofEpochMilli(stopTime)
- val cpuCapacity = cpuCount * CPU_CAPACITY
+ val cpuCapacity = cpuCount * cpuCapacity // total capacity; the right-hand cpuCapacity is the per-CPU property above, not this new local
selectedVms.computeIfAbsent(id) {
Resource(it, startInstant, stopInstant, cpuCount, cpuCapacity, memCapacity)
}
writer.startRow()
- writer.setString(RESOURCE_ID, id)
- writer.setInstant(RESOURCE_START_TIME, startInstant)
- writer.setInstant(RESOURCE_STOP_TIME, stopInstant)
- writer.setInt(RESOURCE_CPU_COUNT, cpuCount)
- writer.setDouble(RESOURCE_CPU_CAPACITY, cpuCapacity)
- writer.setDouble(RESOURCE_MEM_CAPACITY, memCapacity)
+ writer.setString(resourceID, id)
+ writer.setInstant(resourceStartTime, startInstant)
+ writer.setInstant(resourceStopTime, stopInstant)
+ writer.setInt(resourceCpuCount, cpuCount)
+ writer.setDouble(resourceCpuCapacity, cpuCapacity)
+ writer.setDouble(resourceMemCapacity, memCapacity)
writer.endRow()
}
return selectedVms
}
- override fun convertResourceStates(trace: Trace, writer: TableWriter, selected: Map<String, Resource>): Int {
+ override fun convertResourceStates(
+ trace: Trace,
+ writer: TableWriter,
+ selected: Map<String, Resource>,
+ ): Int {
val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
val states = HashMap<String, State>()
- val sampleInterval = SAMPLE_INTERVAL.toMillis()
+ val sampleInterval = sampleInterval.toMillis()
- val idCol = reader.resolve(RESOURCE_ID)
- val timestampCol = reader.resolve(RESOURCE_STATE_TIMESTAMP)
- val cpuUsageCol = reader.resolve(RESOURCE_STATE_CPU_USAGE_PCT)
+ val idCol = reader.resolve(resourceID)
+ val timestampCol = reader.resolve(resourceStateTimestamp)
+ val cpuUsageCol = reader.resolve(resourceStateCpuUsagePct)
var count = 0
@@ -448,7 +472,7 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b
// Check whether the next sample can be cascaded with the current sample:
// (1) The CPU usage is almost identical (lower than `sampleCascadeDiff`)
// (2) The interval between both samples is not higher than `sampleInterval`
- if (abs(cpuUsage - state.cpuUsage) <= SAMPLE_CASCADE_DIFF && delta <= sampleInterval) {
+ if (abs(cpuUsage - state.cpuUsage) <= sampleCascadeDiff && delta <= sampleInterval) {
state.time = timestamp
state.duration += delta
continue
@@ -470,7 +494,11 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b
return count
}
- private class State(@JvmField val resource: Resource, @JvmField var cpuUsage: Double, @JvmField var duration: Long) {
+ private class State(
+ @JvmField val resource: Resource,
+ @JvmField var cpuUsage: Double,
+ @JvmField var duration: Long,
+ ) {
@JvmField var time: Long = resource.startTime.toEpochMilli()
private var lastWrite: Long = Long.MIN_VALUE
@@ -482,11 +510,11 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b
lastWrite = time
writer.startRow()
- writer.setString(RESOURCE_ID, resource.id)
- writer.setInstant(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(time))
- writer.setDuration(RESOURCE_STATE_DURATION, Duration.ofMillis(duration))
- writer.setDouble(RESOURCE_STATE_CPU_USAGE, cpuUsage)
- writer.setInt(RESOURCE_CPU_COUNT, resource.cpuCount)
+ writer.setString(resourceID, resource.id)
+ writer.setInstant(resourceStateTimestamp, Instant.ofEpochMilli(time))
+ writer.setDuration(resourceStateDuration, Duration.ofMillis(duration))
+ writer.setDouble(resourceStateCpuUsage, cpuUsage)
+ writer.setInt(resourceCpuCount, resource.cpuCount)
writer.endRow()
}
}
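The cascade logic above merges consecutive samples of the same VM when their CPU usage is nearly identical and the gap between them stays within the sampling interval. A minimal standalone sketch of that merge rule, using hypothetical Sample/Merged types rather than the OpenDC reader/writer API:

import kotlin.math.abs

// Hypothetical row type; the real command streams rows from a TableReader instead.
data class Sample(val id: String, val timestampMs: Long, val cpuUsage: Double)

// Accumulator mirroring the State class above: tracks the last timestamp and total duration.
class Merged(val id: String, var lastTimestampMs: Long, var durationMs: Long, val cpuUsage: Double)

// Merge consecutive samples when (1) the VM id matches, (2) the CPU usage differs by less
// than cascadeDiff, and (3) the gap does not exceed the sampling interval.
fun cascade(
    samples: List<Sample>,
    cascadeDiff: Double = 0.1,
    intervalMs: Long = 300_000L,
): List<Merged> {
    val result = mutableListOf<Merged>()
    for (sample in samples) {
        val last = result.lastOrNull()
        val delta = if (last != null) sample.timestampMs - last.lastTimestampMs else 0L
        if (last != null && last.id == sample.id &&
            abs(last.cpuUsage - sample.cpuUsage) < cascadeDiff && delta <= intervalMs
        ) {
            last.lastTimestampMs = sample.timestampMs
            last.durationMs += delta
        } else {
            result += Merged(sample.id, sample.timestampMs, intervalMs, sample.cpuUsage)
        }
    }
    return result
}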
diff --git a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/QueryCommand.kt b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/QueryCommand.kt
index 98b4cdf5..7b7a2a64 100644
--- a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/QueryCommand.kt
+++ b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/QueryCommand.kt
@@ -67,11 +67,12 @@ internal class QueryCommand : CliktCommand(name = "query", help = "Query workloa
/**
* Access to the terminal.
*/
- private val terminal = TerminalBuilder.builder()
- .system(false)
- .streams(System.`in`, System.out)
- .encoding(StandardCharsets.UTF_8)
- .build()
+ private val terminal =
+ TerminalBuilder.builder()
+ .system(false)
+ .streams(System.`in`, System.out)
+ .encoding(StandardCharsets.UTF_8)
+ .build()
/**
* Helper class to print results to the console.
@@ -119,10 +120,11 @@ internal class QueryCommand : CliktCommand(name = "query", help = "Query workloa
var count = 0
val meta: ResultSetMetaData = rs.metaData
- val options = mapOf(
- Printer.COLUMNS to List(meta.columnCount) { meta.getColumnName(it + 1) },
- Printer.BORDER to "|"
- )
+ val options =
+ mapOf(
+ Printer.COLUMNS to List(meta.columnCount) { meta.getColumnName(it + 1) },
+ Printer.BORDER to "|",
+ )
val data = mutableListOf<Map<String, Any>>()
while (rs.next()) {
@@ -146,7 +148,10 @@ internal class QueryCommand : CliktCommand(name = "query", help = "Query workloa
private class QueryPrinter(private val terminal: Terminal) : DefaultPrinter(null) {
override fun terminal(): Terminal = terminal
- override fun highlightAndPrint(options: MutableMap<String, Any>, exception: Throwable) {
+ override fun highlightAndPrint(
+ options: MutableMap<String, Any>,
+ exception: Throwable,
+ ) {
if (options.getOrDefault("exception", "stack") == "stack") {
exception.printStackTrace()
} else {
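For context, the options map in this command drives JLine's tabular printer. A rough sketch of the same pattern outside the command, assuming the jline-console Printer/DefaultPrinter API exactly as it is used in the hunk above:

import java.nio.charset.StandardCharsets
import org.jline.console.Printer
import org.jline.console.impl.DefaultPrinter
import org.jline.terminal.TerminalBuilder

fun main() {
    val terminal =
        TerminalBuilder.builder()
            .system(false)
            .streams(System.`in`, System.out)
            .encoding(StandardCharsets.UTF_8)
            .build()
    // Same subclassing trick as QueryPrinter above: route output to our own terminal.
    val printer =
        object : DefaultPrinter(null) {
            override fun terminal() = terminal
        }
    val options =
        mapOf<String, Any>(
            Printer.COLUMNS to listOf("id", "cpu_usage"), // column order for the table
            Printer.BORDER to "|",
        )
    val data =
        listOf(
            mapOf("id" to "vm-1", "cpu_usage" to 42.0),
            mapOf("id" to "vm-2", "cpu_usage" to 13.5),
        )
    printer.println(options, data)
    terminal.close()
}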
diff --git a/opendc-trace/opendc-trace-wfformat/build.gradle.kts b/opendc-trace/opendc-trace-wfformat/build.gradle.kts
index a0e22b16..57313a73 100644
--- a/opendc-trace/opendc-trace-wfformat/build.gradle.kts
+++ b/opendc-trace/opendc-trace-wfformat/build.gradle.kts
@@ -22,7 +22,7 @@
description = "Support for WfCommons workload traces in OpenDC"
-/* Build configuration */
+// Build configuration
plugins {
`kotlin-library-conventions`
}
diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt
index e0cbd305..8f84e51f 100644
--- a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt
+++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt
@@ -81,13 +81,14 @@ internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableRe
}
ParserLevel.WORKFLOW -> {
// Seek for the jobs object in the file
- level = if (!seekJobs()) {
- ParserLevel.TRACE
- } else if (!parser.isExpectedStartArrayToken) {
- throw JsonParseException(parser, "Expected array", parser.currentLocation)
- } else {
- ParserLevel.JOB
- }
+ level =
+ if (!seekJobs()) {
+ ParserLevel.TRACE
+ } else if (!parser.isExpectedStartArrayToken) {
+ throw JsonParseException(parser, "Expected array", parser.currentLocation)
+ } else {
+ ParserLevel.JOB
+ }
}
ParserLevel.JOB -> {
when (parser.nextToken()) {
@@ -108,18 +109,18 @@ internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableRe
override fun resolve(name: String): Int {
return when (name) {
- TASK_ID -> COL_ID
- TASK_WORKFLOW_ID -> COL_WORKFLOW_ID
- TASK_RUNTIME -> COL_RUNTIME
- TASK_REQ_NCPUS -> COL_NPROC
- TASK_PARENTS -> COL_PARENTS
- TASK_CHILDREN -> COL_CHILDREN
+ TASK_ID -> colID
+ TASK_WORKFLOW_ID -> colWorkflowID
+ TASK_RUNTIME -> colRuntime
+ TASK_REQ_NCPUS -> colNproc
+ TASK_PARENTS -> colParents
+ TASK_CHILDREN -> colChildren
else -> -1
}
}
override fun isNull(index: Int): Boolean {
- require(index in 0..COL_CHILDREN) { "Invalid column value" }
+ require(index in 0..colChildren) { "Invalid column value" }
return false
}
@@ -130,7 +131,7 @@ internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableRe
override fun getInt(index: Int): Int {
checkActive()
return when (index) {
- COL_NPROC -> cores
+ colNproc -> cores
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -150,8 +151,8 @@ internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableRe
override fun getString(index: Int): String? {
checkActive()
return when (index) {
- COL_ID -> id
- COL_WORKFLOW_ID -> workflowId
+ colID -> id
+ colWorkflowID -> workflowId
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -167,25 +168,35 @@ internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableRe
override fun getDuration(index: Int): Duration? {
checkActive()
return when (index) {
- COL_RUNTIME -> runtime
+ colRuntime -> runtime
else -> throw IllegalArgumentException("Invalid column")
}
}
- override fun <T> getList(index: Int, elementType: Class<T>): List<T>? {
+ override fun <T> getList(
+ index: Int,
+ elementType: Class<T>,
+ ): List<T>? {
throw IllegalArgumentException("Invalid column")
}
- override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? {
+ override fun <T> getSet(
+ index: Int,
+ elementType: Class<T>,
+ ): Set<T>? {
checkActive()
return when (index) {
- COL_PARENTS -> TYPE_PARENTS.convertTo(parents, elementType)
- COL_CHILDREN -> TYPE_CHILDREN.convertTo(children, elementType)
+ colParents -> typeParents.convertTo(parents, elementType)
+ colChildren -> typeChildren.convertTo(children, elementType)
else -> throw IllegalArgumentException("Invalid column")
}
}
- override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? {
+ override fun <K, V> getMap(
+ index: Int,
+ keyType: Class<K>,
+ valueType: Class<V>,
+ ): Map<K, V>? {
throw IllegalArgumentException("Invalid column")
}
@@ -267,7 +278,10 @@ internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableRe
}
private enum class ParserLevel {
- TOP, TRACE, WORKFLOW, JOB
+ TOP,
+ TRACE,
+ WORKFLOW,
+ JOB,
}
/**
@@ -288,13 +302,13 @@ internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableRe
cores = -1
}
- private val COL_ID = 0
- private val COL_WORKFLOW_ID = 1
- private val COL_RUNTIME = 3
- private val COL_NPROC = 4
- private val COL_PARENTS = 5
- private val COL_CHILDREN = 6
+ private val colID = 0
+ private val colWorkflowID = 1
+ private val colRuntime = 3
+ private val colNproc = 4
+ private val colParents = 5
+ private val colChildren = 6
- private val TYPE_PARENTS = TableColumnType.Set(TableColumnType.String)
- private val TYPE_CHILDREN = TableColumnType.Set(TableColumnType.String)
+ private val typeParents = TableColumnType.Set(TableColumnType.String)
+ private val typeChildren = TableColumnType.Set(TableColumnType.String)
}
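The resolve/getX pattern above is the reader's whole contract: callers resolve each public column name to a private index once, then read typed values by index while iterating rows. A sketch of that calling pattern; the conv package for the TASK_* constants is assumed from the API module layout:

import org.opendc.trace.TableReader
import org.opendc.trace.conv.TASK_ID
import org.opendc.trace.conv.TASK_PARENTS
import org.opendc.trace.conv.TASK_RUNTIME

// Resolve column indices once, then iterate rows; resolve() returns -1 for unknown columns.
fun dumpTasks(reader: TableReader) {
    val idCol = reader.resolve(TASK_ID)
    val runtimeCol = reader.resolve(TASK_RUNTIME)
    val parentsCol = reader.resolve(TASK_PARENTS)
    while (reader.nextRow()) {
        val id = reader.getString(idCol)
        val runtime = reader.getDuration(runtimeCol)
        val parents = reader.getSet(parentsCol, String::class.java)
        println("$id runtime=$runtime parents=$parents")
    }
    reader.close()
}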
diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt
index 35fb883a..2178fac6 100644
--- a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt
+++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt
@@ -55,30 +55,41 @@ public class WfFormatTraceFormat : TraceFormat {
override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS)
- override fun getDetails(path: Path, table: String): TableDetails {
+ override fun getDetails(
+ path: Path,
+ table: String,
+ ): TableDetails {
return when (table) {
- TABLE_TASKS -> TableDetails(
- listOf(
- TableColumn(TASK_ID, TableColumnType.String),
- TableColumn(TASK_WORKFLOW_ID, TableColumnType.String),
- TableColumn(TASK_RUNTIME, TableColumnType.Duration),
- TableColumn(TASK_REQ_NCPUS, TableColumnType.Int),
- TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)),
- TableColumn(TASK_CHILDREN, TableColumnType.Set(TableColumnType.String))
+ TABLE_TASKS ->
+ TableDetails(
+ listOf(
+ TableColumn(TASK_ID, TableColumnType.String),
+ TableColumn(TASK_WORKFLOW_ID, TableColumnType.String),
+ TableColumn(TASK_RUNTIME, TableColumnType.Duration),
+ TableColumn(TASK_REQ_NCPUS, TableColumnType.Int),
+ TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)),
+ TableColumn(TASK_CHILDREN, TableColumnType.Set(TableColumnType.String)),
+ ),
)
- )
else -> throw IllegalArgumentException("Table $table not supported")
}
}
- override fun newReader(path: Path, table: String, projection: List<String>?): TableReader {
+ override fun newReader(
+ path: Path,
+ table: String,
+ projection: List<String>?,
+ ): TableReader {
return when (table) {
TABLE_TASKS -> WfFormatTaskTableReader(factory.createParser(path.toFile()))
else -> throw IllegalArgumentException("Table $table not supported")
}
}
- override fun newWriter(path: Path, table: String): TableWriter {
+ override fun newWriter(
+ path: Path,
+ table: String,
+ ): TableWriter {
throw UnsupportedOperationException("Writing not supported for this format")
}
}
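Put together, the three TraceFormat methods compose as follows. A sketch only: the details.columns and TableColumn.name property names are assumptions, not confirmed by this diff:

import java.nio.file.Path

// Sketch: enumerate tables, inspect the schema, then stream tasks from a WfCommons trace.
fun readTrace(path: Path) {
    val format = WfFormatTraceFormat()
    val table = format.getTables(path).first() // only TABLE_TASKS is exposed
    val details = format.getDetails(path, table)
    println("columns: ${details.columns.map { it.name }}") // assumed property names
    val reader = format.newReader(path, table, projection = null) // null = all columns
    var rows = 0
    while (reader.nextRow()) rows++
    reader.close()
    println("$rows tasks")
}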
diff --git a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt
index 0560d642..618cdf7d 100644
--- a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt
+++ b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt
@@ -69,11 +69,12 @@ internal class WfFormatTaskTableReaderTest {
@Test
fun testNoWorkflow() {
- val content = """
- {
- "name": "eager-nextflow-chameleon"
- }
- """.trimIndent()
+ val content =
+ """
+ {
+ "name": "eager-nextflow-chameleon"
+ }
+ """.trimIndent()
val parser = factory.createParser(content)
val reader = WfFormatTaskTableReader(parser)
@@ -88,12 +89,13 @@ internal class WfFormatTaskTableReaderTest {
@Test
fun testWorkflowArrayType() {
- val content = """
- {
- "name": "eager-nextflow-chameleon",
- "workflow": []
- }
- """.trimIndent()
+ val content =
+ """
+ {
+ "name": "eager-nextflow-chameleon",
+ "workflow": []
+ }
+ """.trimIndent()
val parser = factory.createParser(content)
val reader = WfFormatTaskTableReader(parser)
@@ -108,12 +110,13 @@ internal class WfFormatTaskTableReaderTest {
@Test
fun testWorkflowNullType() {
- val content = """
- {
- "name": "eager-nextflow-chameleon",
- "workflow": null
- }
- """.trimIndent()
+ val content =
+ """
+ {
+ "name": "eager-nextflow-chameleon",
+ "workflow": null
+ }
+ """.trimIndent()
val parser = factory.createParser(content)
val reader = WfFormatTaskTableReader(parser)
@@ -128,14 +131,15 @@ internal class WfFormatTaskTableReaderTest {
@Test
fun testNoJobs() {
- val content = """
- {
- "name": "eager-nextflow-chameleon",
- "workflow": {
+ val content =
+ """
+ {
+ "name": "eager-nextflow-chameleon",
+ "workflow": {
+ }
}
- }
- """.trimIndent()
+ """.trimIndent()
val parser = factory.createParser(content)
val reader = WfFormatTaskTableReader(parser)
@@ -146,12 +150,13 @@ internal class WfFormatTaskTableReaderTest {
@Test
fun testJobsObjectType() {
- val content = """
- {
- "name": "eager-nextflow-chameleon",
- "workflow": { "jobs": {} }
- }
- """.trimIndent()
+ val content =
+ """
+ {
+ "name": "eager-nextflow-chameleon",
+ "workflow": { "jobs": {} }
+ }
+ """.trimIndent()
val parser = factory.createParser(content)
val reader = WfFormatTaskTableReader(parser)
@@ -162,12 +167,13 @@ internal class WfFormatTaskTableReaderTest {
@Test
fun testJobsNullType() {
- val content = """
- {
- "name": "eager-nextflow-chameleon",
- "workflow": { "jobs": null }
- }
- """.trimIndent()
+ val content =
+ """
+ {
+ "name": "eager-nextflow-chameleon",
+ "workflow": { "jobs": null }
+ }
+ """.trimIndent()
val parser = factory.createParser(content)
val reader = WfFormatTaskTableReader(parser)
@@ -178,14 +184,15 @@ internal class WfFormatTaskTableReaderTest {
@Test
fun testJobsInvalidChildType() {
- val content = """
- {
- "name": "eager-nextflow-chameleon",
- "workflow": {
- "jobs": [1]
+ val content =
+ """
+ {
+ "name": "eager-nextflow-chameleon",
+ "workflow": {
+ "jobs": [1]
+ }
}
- }
- """.trimIndent()
+ """.trimIndent()
val parser = factory.createParser(content)
val reader = WfFormatTaskTableReader(parser)
@@ -196,18 +203,19 @@ internal class WfFormatTaskTableReaderTest {
@Test
fun testJobsValidChildType() {
- val content = """
- {
- "name": "eager-nextflow-chameleon",
- "workflow": {
- "jobs": [
- {
- "name": "test"
- }
- ]
+ val content =
+ """
+ {
+ "name": "eager-nextflow-chameleon",
+ "workflow": {
+ "jobs": [
+ {
+ "name": "test"
+ }
+ ]
+ }
}
- }
- """.trimIndent()
+ """.trimIndent()
val parser = factory.createParser(content)
val reader = WfFormatTaskTableReader(parser)
@@ -220,19 +228,20 @@ internal class WfFormatTaskTableReaderTest {
@Test
fun testJobsInvalidParents() {
- val content = """
- {
- "name": "eager-nextflow-chameleon",
- "workflow": {
- "jobs": [
- {
- "name": "test",
- "parents": 1,
- }
- ]
+ val content =
+ """
+ {
+ "name": "eager-nextflow-chameleon",
+ "workflow": {
+ "jobs": [
+ {
+ "name": "test",
+ "parents": 1,
+ }
+ ]
+ }
}
- }
- """.trimIndent()
+ """.trimIndent()
val parser = factory.createParser(content)
val reader = WfFormatTaskTableReader(parser)
@@ -243,19 +252,20 @@ internal class WfFormatTaskTableReaderTest {
@Test
fun testJobsInvalidParentsItem() {
- val content = """
- {
- "name": "eager-nextflow-chameleon",
- "workflow": {
- "jobs": [
- {
- "name": "test",
- "parents": [1],
- }
- ]
+ val content =
+ """
+ {
+ "name": "eager-nextflow-chameleon",
+ "workflow": {
+ "jobs": [
+ {
+ "name": "test",
+ "parents": [1],
+ }
+ ]
+ }
}
- }
- """.trimIndent()
+ """.trimIndent()
val parser = factory.createParser(content)
val reader = WfFormatTaskTableReader(parser)
@@ -266,19 +276,20 @@ internal class WfFormatTaskTableReaderTest {
@Test
fun testJobsValidParents() {
- val content = """
- {
- "name": "eager-nextflow-chameleon",
- "workflow": {
- "jobs": [
- {
- "name": "test",
- "parents": ["1"]
- }
- ]
+ val content =
+ """
+ {
+ "name": "eager-nextflow-chameleon",
+ "workflow": {
+ "jobs": [
+ {
+ "name": "test",
+ "parents": ["1"]
+ }
+ ]
+ }
}
- }
- """.trimIndent()
+ """.trimIndent()
val parser = factory.createParser(content)
val reader = WfFormatTaskTableReader(parser)
@@ -291,19 +302,20 @@ internal class WfFormatTaskTableReaderTest {
@Test
fun testJobsInvalidSecondEntry() {
- val content = """
- {
- "workflow": {
- "jobs": [
- {
- "name": "test",
- "parents": ["1"]
- },
- "test"
- ]
+ val content =
+ """
+ {
+ "workflow": {
+ "jobs": [
+ {
+ "name": "test",
+ "parents": ["1"]
+ },
+ "test"
+ ]
+ }
}
- }
- """.trimIndent()
+ """.trimIndent()
val parser = factory.createParser(content)
val reader = WfFormatTaskTableReader(parser)
@@ -315,25 +327,26 @@ internal class WfFormatTaskTableReaderTest {
@Test
fun testDuplicateJobsArray() {
- val content = """
- {
- "name": "eager-nextflow-chameleon",
- "workflow": {
- "jobs": [
- {
- "name": "test",
- "parents": ["1"]
- }
- ],
- "jobs": [
- {
- "name": "test2",
- "parents": ["test"]
- }
- ]
+ val content =
+ """
+ {
+ "name": "eager-nextflow-chameleon",
+ "workflow": {
+ "jobs": [
+ {
+ "name": "test",
+ "parents": ["1"]
+ }
+ ],
+ "jobs": [
+ {
+ "name": "test2",
+ "parents": ["test"]
+ }
+ ]
+ }
}
- }
- """.trimIndent()
+ """.trimIndent()
val parser = factory.createParser(content)
val reader = WfFormatTaskTableReader(parser)
diff --git a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt
index 75f4b413..80a9d80e 100644
--- a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt
@@ -81,7 +81,7 @@ class WfFormatTraceFormatTest {
{ assertEquals("makebwaindex_mammoth_mt_krause.fasta", reader.getString(TASK_ID)) },
{ assertEquals("eager-nextflow-chameleon", reader.getString(TASK_WORKFLOW_ID)) },
{ assertEquals(172000, reader.getDuration(TASK_RUNTIME)?.toMillis()) },
- { assertEquals(emptySet<String>(), reader.getSet(TASK_PARENTS, String::class.java)) }
+ { assertEquals(emptySet<String>(), reader.getSet(TASK_PARENTS, String::class.java)) },
)
assertAll(
@@ -89,7 +89,7 @@ class WfFormatTraceFormatTest {
{ assertEquals("makeseqdict_mammoth_mt_krause.fasta", reader.getString(TASK_ID)) },
{ assertEquals("eager-nextflow-chameleon", reader.getString(TASK_WORKFLOW_ID)) },
{ assertEquals(175000, reader.getDuration(TASK_RUNTIME)?.toMillis()) },
- { assertEquals(setOf("makebwaindex_mammoth_mt_krause.fasta"), reader.getSet(TASK_PARENTS, String::class.java)) }
+ { assertEquals(setOf("makebwaindex_mammoth_mt_krause.fasta"), reader.getSet(TASK_PARENTS, String::class.java)) },
)
reader.close()
diff --git a/opendc-trace/opendc-trace-wtf/build.gradle.kts b/opendc-trace/opendc-trace-wtf/build.gradle.kts
index 599087e1..a3119e5e 100644
--- a/opendc-trace/opendc-trace-wtf/build.gradle.kts
+++ b/opendc-trace/opendc-trace-wtf/build.gradle.kts
@@ -22,7 +22,7 @@
description = "Support for Workflow Trace Format (WTF) traces in OpenDC"
-/* Build configuration */
+// Build configuration
plugins {
`kotlin-library-conventions`
}
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
index 73c1b8a9..95582388 100644
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
@@ -62,38 +62,38 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Task>)
}
}
- private val COL_ID = 0
- private val COL_WORKFLOW_ID = 1
- private val COL_SUBMIT_TIME = 2
- private val COL_WAIT_TIME = 3
- private val COL_RUNTIME = 4
- private val COL_REQ_NCPUS = 5
- private val COL_PARENTS = 6
- private val COL_CHILDREN = 7
- private val COL_GROUP_ID = 8
- private val COL_USER_ID = 9
-
- private val TYPE_PARENTS = TableColumnType.Set(TableColumnType.String)
- private val TYPE_CHILDREN = TableColumnType.Set(TableColumnType.String)
+ private val colID = 0
+ private val colWorkflowID = 1
+ private val colSubmitTime = 2
+ private val colWaitTime = 3
+ private val colRuntime = 4
+ private val colReqNcpus = 5
+ private val colParents = 6
+ private val colChildren = 7
+ private val colGroupID = 8
+ private val colUserID = 9
+
+ private val typeParents = TableColumnType.Set(TableColumnType.String)
+ private val typeChildren = TableColumnType.Set(TableColumnType.String)
override fun resolve(name: String): Int {
return when (name) {
- TASK_ID -> COL_ID
- TASK_WORKFLOW_ID -> COL_WORKFLOW_ID
- TASK_SUBMIT_TIME -> COL_SUBMIT_TIME
- TASK_WAIT_TIME -> COL_WAIT_TIME
- TASK_RUNTIME -> COL_RUNTIME
- TASK_REQ_NCPUS -> COL_REQ_NCPUS
- TASK_PARENTS -> COL_PARENTS
- TASK_CHILDREN -> COL_CHILDREN
- TASK_GROUP_ID -> COL_GROUP_ID
- TASK_USER_ID -> COL_USER_ID
+ TASK_ID -> colID
+ TASK_WORKFLOW_ID -> colWorkflowID
+ TASK_SUBMIT_TIME -> colSubmitTime
+ TASK_WAIT_TIME -> colWaitTime
+ TASK_RUNTIME -> colRuntime
+ TASK_REQ_NCPUS -> colReqNcpus
+ TASK_PARENTS -> colParents
+ TASK_CHILDREN -> colChildren
+ TASK_GROUP_ID -> colGroupID
+ TASK_USER_ID -> colUserID
else -> -1
}
}
override fun isNull(index: Int): Boolean {
- require(index in COL_ID..COL_USER_ID) { "Invalid column index" }
+ require(index in colID..colUserID) { "Invalid column index" }
return false
}
@@ -105,9 +105,9 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Task>)
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_REQ_NCPUS -> record.requestedCpus
- COL_GROUP_ID -> record.groupId
- COL_USER_ID -> record.userId
+ colReqNcpus -> record.requestedCpus
+ colGroupID -> record.groupId
+ colUserID -> record.userId
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -127,8 +127,8 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Task>)
override fun getString(index: Int): String {
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_ID -> record.id
- COL_WORKFLOW_ID -> record.workflowId
+ colID -> record.id
+ colWorkflowID -> record.workflowId
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -140,7 +140,7 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Task>)
override fun getInstant(index: Int): Instant {
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_SUBMIT_TIME -> record.submitTime
+ colSubmitTime -> record.submitTime
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -148,26 +148,36 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Task>)
override fun getDuration(index: Int): Duration {
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_WAIT_TIME -> record.waitTime
- COL_RUNTIME -> record.runtime
+ colWaitTime -> record.waitTime
+ colRuntime -> record.runtime
else -> throw IllegalArgumentException("Invalid column")
}
}
- override fun <T> getList(index: Int, elementType: Class<T>): List<T>? {
+ override fun <T> getList(
+ index: Int,
+ elementType: Class<T>,
+ ): List<T>? {
throw IllegalArgumentException("Invalid column")
}
- override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? {
+ override fun <T> getSet(
+ index: Int,
+ elementType: Class<T>,
+ ): Set<T>? {
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_PARENTS -> TYPE_PARENTS.convertTo(record.parents, elementType)
- COL_CHILDREN -> TYPE_CHILDREN.convertTo(record.children, elementType)
+ colParents -> typeParents.convertTo(record.parents, elementType)
+ colChildren -> typeChildren.convertTo(record.children, elementType)
else -> throw IllegalArgumentException("Invalid column")
}
}
- override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? {
+ override fun <K, V> getMap(
+ index: Int,
+ keyType: Class<K>,
+ valueType: Class<V>,
+ ): Map<K, V>? {
throw IllegalArgumentException("Invalid column")
}
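The getSet implementation above delegates element conversion to TableColumnType.convertTo from the API module's TableColumnConversion. A sketch of that conversion step, assuming the extension lives in org.opendc.trace.util and matches the signature implied by the call sites above:

import org.opendc.trace.TableColumnType
import org.opendc.trace.util.convertTo

// Assumed usage, mirroring the call sites above: a Set column type converts a raw set of
// elements to the requested element class.
val parentsType = TableColumnType.Set(TableColumnType.String)
val parents: Set<String>? = parentsType.convertTo(setOf("1", "2"), String::class.java)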
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
index c25b512c..1386d2ef 100644
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
@@ -55,27 +55,35 @@ public class WtfTraceFormat : TraceFormat {
override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS)
- override fun getDetails(path: Path, table: String): TableDetails {
+ override fun getDetails(
+ path: Path,
+ table: String,
+ ): TableDetails {
return when (table) {
- TABLE_TASKS -> TableDetails(
- listOf(
- TableColumn(TASK_ID, TableColumnType.String),
- TableColumn(TASK_WORKFLOW_ID, TableColumnType.String),
- TableColumn(TASK_SUBMIT_TIME, TableColumnType.Instant),
- TableColumn(TASK_WAIT_TIME, TableColumnType.Duration),
- TableColumn(TASK_RUNTIME, TableColumnType.Duration),
- TableColumn(TASK_REQ_NCPUS, TableColumnType.Int),
- TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)),
- TableColumn(TASK_CHILDREN, TableColumnType.Set(TableColumnType.String)),
- TableColumn(TASK_GROUP_ID, TableColumnType.Int),
- TableColumn(TASK_USER_ID, TableColumnType.Int)
+ TABLE_TASKS ->
+ TableDetails(
+ listOf(
+ TableColumn(TASK_ID, TableColumnType.String),
+ TableColumn(TASK_WORKFLOW_ID, TableColumnType.String),
+ TableColumn(TASK_SUBMIT_TIME, TableColumnType.Instant),
+ TableColumn(TASK_WAIT_TIME, TableColumnType.Duration),
+ TableColumn(TASK_RUNTIME, TableColumnType.Duration),
+ TableColumn(TASK_REQ_NCPUS, TableColumnType.Int),
+ TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)),
+ TableColumn(TASK_CHILDREN, TableColumnType.Set(TableColumnType.String)),
+ TableColumn(TASK_GROUP_ID, TableColumnType.Int),
+ TableColumn(TASK_USER_ID, TableColumnType.Int),
+ ),
)
- )
else -> throw IllegalArgumentException("Table $table not supported")
}
}
- override fun newReader(path: Path, table: String, projection: List<String>?): TableReader {
+ override fun newReader(
+ path: Path,
+ table: String,
+ projection: List<String>?,
+ ): TableReader {
return when (table) {
TABLE_TASKS -> {
val reader = LocalParquetReader(path.resolve("tasks/schema-1.0"), TaskReadSupport(projection), strictTyping = false)
@@ -85,7 +93,10 @@ public class WtfTraceFormat : TraceFormat {
}
}
- override fun newWriter(path: Path, table: String): TableWriter {
+ override fun newWriter(
+ path: Path,
+ table: String,
+ ): TableWriter {
throw UnsupportedOperationException("Writing not supported for this format")
}
}
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt
index 71557f96..a1db0cab 100644
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt
@@ -38,5 +38,5 @@ internal data class Task(
val groupId: Int,
val userId: Int,
val parents: Set<String>,
- val children: Set<String>
+ val children: Set<String>,
)
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt
index 33be38d4..1f9c506d 100644
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt
@@ -51,18 +51,19 @@ internal class TaskReadSupport(private val projection: List<String>?) : ReadSupp
/**
* Mapping of table columns to their Parquet column names.
*/
- private val colMap = mapOf(
- TASK_ID to "id",
- TASK_WORKFLOW_ID to "workflow_id",
- TASK_SUBMIT_TIME to "ts_submit",
- TASK_WAIT_TIME to "wait_time",
- TASK_RUNTIME to "runtime",
- TASK_REQ_NCPUS to "resource_amount_requested",
- TASK_PARENTS to "parents",
- TASK_CHILDREN to "children",
- TASK_GROUP_ID to "group_id",
- TASK_USER_ID to "user_id"
- )
+ private val colMap =
+ mapOf(
+ TASK_ID to "id",
+ TASK_WORKFLOW_ID to "workflow_id",
+ TASK_SUBMIT_TIME to "ts_submit",
+ TASK_WAIT_TIME to "wait_time",
+ TASK_RUNTIME to "runtime",
+ TASK_REQ_NCPUS to "resource_amount_requested",
+ TASK_PARENTS to "parents",
+ TASK_CHILDREN to "children",
+ TASK_GROUP_ID to "group_id",
+ TASK_USER_ID to "user_id",
+ )
override fun init(context: InitContext): ReadContext {
val projectedSchema =
@@ -87,7 +88,7 @@ internal class TaskReadSupport(private val projection: List<String>?) : ReadSupp
configuration: Configuration,
keyValueMetaData: Map<String, String>,
fileSchema: MessageType,
- readContext: ReadContext
+ readContext: ReadContext,
): RecordMaterializer<Task> = TaskRecordMaterializer(readContext.requestedSchema)
companion object {
@@ -95,52 +96,53 @@ internal class TaskReadSupport(private val projection: List<String>?) : ReadSupp
* Parquet read schema for the "tasks" table in the trace.
*/
@JvmStatic
- val READ_SCHEMA: MessageType = Types.buildMessage()
- .addFields(
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT64)
- .named("id"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT64)
- .named("workflow_id"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT64)
- .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("ts_submit"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT64)
- .named("wait_time"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT64)
- .named("runtime"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("resource_amount_requested"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT32)
- .named("user_id"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT32)
- .named("group_id"),
- Types
- .buildGroup(Type.Repetition.OPTIONAL)
- .addField(
- Types.repeatedGroup()
- .addField(Types.optional(PrimitiveType.PrimitiveTypeName.INT64).named("item"))
- .named("list")
- )
- .`as`(LogicalTypeAnnotation.listType())
- .named("children"),
- Types
- .buildGroup(Type.Repetition.OPTIONAL)
- .addField(
- Types.repeatedGroup()
- .addField(Types.optional(PrimitiveType.PrimitiveTypeName.INT64).named("item"))
- .named("list")
- )
- .`as`(LogicalTypeAnnotation.listType())
- .named("parents")
- )
- .named("task")
+ val READ_SCHEMA: MessageType =
+ Types.buildMessage()
+ .addFields(
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("id"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("workflow_id"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("ts_submit"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("wait_time"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("runtime"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("resource_amount_requested"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("user_id"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("group_id"),
+ Types
+ .buildGroup(Type.Repetition.OPTIONAL)
+ .addField(
+ Types.repeatedGroup()
+ .addField(Types.optional(PrimitiveType.PrimitiveTypeName.INT64).named("item"))
+ .named("list"),
+ )
+ .`as`(LogicalTypeAnnotation.listType())
+ .named("children"),
+ Types
+ .buildGroup(Type.Repetition.OPTIONAL)
+ .addField(
+ Types.repeatedGroup()
+ .addField(Types.optional(PrimitiveType.PrimitiveTypeName.INT64).named("item"))
+ .named("list"),
+ )
+ .`as`(LogicalTypeAnnotation.listType())
+ .named("parents"),
+ )
+ .named("task")
}
}
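READ_SCHEMA above is built with Parquet's fluent Types builder. A minimal sketch of the same pattern, reduced to one scalar column plus one LIST group in the (list -> item) shape Parquet expects:

import org.apache.parquet.schema.LogicalTypeAnnotation
import org.apache.parquet.schema.MessageType
import org.apache.parquet.schema.PrimitiveType
import org.apache.parquet.schema.Type
import org.apache.parquet.schema.Types

// One optional INT64 column and an optional LIST<INT64> group, as in READ_SCHEMA above.
val exampleSchema: MessageType =
    Types.buildMessage()
        .addFields(
            Types
                .optional(PrimitiveType.PrimitiveTypeName.INT64)
                .named("id"),
            Types
                .buildGroup(Type.Repetition.OPTIONAL)
                .addField(
                    Types.repeatedGroup()
                        .addField(Types.optional(PrimitiveType.PrimitiveTypeName.INT64).named("item"))
                        .named("list"),
                )
                .`as`(LogicalTypeAnnotation.listType())
                .named("parents"),
        )
        .named("example")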
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt
index 055be0c3..412a4f8b 100644
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt
@@ -39,102 +39,113 @@ internal class TaskRecordMaterializer(schema: MessageType) : RecordMaterializer<
/**
* State of current record being read.
*/
- private var _id = ""
- private var _workflowId = ""
- private var _submitTime = Instant.MIN
- private var _waitTime = Duration.ZERO
- private var _runtime = Duration.ZERO
- private var _requestedCpus = 0
- private var _groupId = 0
- private var _userId = 0
- private var _parents = mutableSetOf<String>()
- private var _children = mutableSetOf<String>()
+ private var localID = ""
+ private var localWorkflowID = ""
+ private var localSubmitTime = Instant.MIN
+ private var localWaitTime = Duration.ZERO
+ private var localRuntime = Duration.ZERO
+ private var localRequestedCpus = 0
+ private var localGroupId = 0
+ private var localUserId = 0
+ private var localParents = mutableSetOf<String>()
+ private var localChildren = mutableSetOf<String>()
/**
* Root converter for the record.
*/
- private val root = object : GroupConverter() {
- /**
- * The converters for the columns of the schema.
- */
- private val converters = schema.fields.map { type ->
- when (type.name) {
- "id" -> object : PrimitiveConverter() {
- override fun addLong(value: Long) {
- _id = value.toString()
+ private val root =
+ object : GroupConverter() {
+ /**
+ * The converters for the columns of the schema.
+ */
+ private val converters =
+ schema.fields.map { type ->
+ when (type.name) {
+ "id" ->
+ object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ localID = value.toString()
+ }
+ }
+ "workflow_id" ->
+ object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ localWorkflowID = value.toString()
+ }
+ }
+ "ts_submit" ->
+ object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ localSubmitTime = Instant.ofEpochMilli(value)
+ }
+ }
+ "wait_time" ->
+ object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ localWaitTime = Duration.ofMillis(value)
+ }
+ }
+ "runtime" ->
+ object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ localRuntime = Duration.ofMillis(value)
+ }
+ }
+ "resource_amount_requested" ->
+ object : PrimitiveConverter() {
+ override fun addDouble(value: Double) {
+ localRequestedCpus = value.roundToInt()
+ }
+ }
+ "group_id" ->
+ object : PrimitiveConverter() {
+ override fun addInt(value: Int) {
+ localGroupId = value
+ }
+ }
+ "user_id" ->
+ object : PrimitiveConverter() {
+ override fun addInt(value: Int) {
+ localUserId = value
+ }
+ }
+ "children" -> RelationConverter(localChildren)
+ "parents" -> RelationConverter(localParents)
+ else -> error("Unknown column $type")
}
}
- "workflow_id" -> object : PrimitiveConverter() {
- override fun addLong(value: Long) {
- _workflowId = value.toString()
- }
- }
- "ts_submit" -> object : PrimitiveConverter() {
- override fun addLong(value: Long) {
- _submitTime = Instant.ofEpochMilli(value)
- }
- }
- "wait_time" -> object : PrimitiveConverter() {
- override fun addLong(value: Long) {
- _waitTime = Duration.ofMillis(value)
- }
- }
- "runtime" -> object : PrimitiveConverter() {
- override fun addLong(value: Long) {
- _runtime = Duration.ofMillis(value)
- }
- }
- "resource_amount_requested" -> object : PrimitiveConverter() {
- override fun addDouble(value: Double) {
- _requestedCpus = value.roundToInt()
- }
- }
- "group_id" -> object : PrimitiveConverter() {
- override fun addInt(value: Int) {
- _groupId = value
- }
- }
- "user_id" -> object : PrimitiveConverter() {
- override fun addInt(value: Int) {
- _userId = value
- }
- }
- "children" -> RelationConverter(_children)
- "parents" -> RelationConverter(_parents)
- else -> error("Unknown column $type")
- }
- }
- override fun start() {
- _id = ""
- _workflowId = ""
- _submitTime = Instant.MIN
- _waitTime = Duration.ZERO
- _runtime = Duration.ZERO
- _requestedCpus = 0
- _groupId = 0
- _userId = 0
- _parents.clear()
- _children.clear()
- }
+ override fun start() {
+ localID = ""
+ localWorkflowID = ""
+ localSubmitTime = Instant.MIN
+ localWaitTime = Duration.ZERO
+ localRuntime = Duration.ZERO
+ localRequestedCpus = 0
+ localGroupId = 0
+ localUserId = 0
+ localParents.clear()
+ localChildren.clear()
+ }
- override fun end() {}
+ override fun end() {}
- override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex]
- }
+ override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex]
+ }
- override fun getCurrentRecord(): Task = Task(
- _id,
- _workflowId,
- _submitTime,
- _waitTime,
- _runtime,
- _requestedCpus,
- _groupId,
- _userId,
- _parents.toSet(),
- _children.toSet()
- )
+ override fun getCurrentRecord(): Task =
+ Task(
+ localID,
+ localWorkflowID,
+ localSubmitTime,
+ localWaitTime,
+ localRuntime,
+ localRequestedCpus,
+ localGroupId,
+ localUserId,
+ localParents.toSet(),
+ localChildren.toSet(),
+ )
override fun getRootConverter(): GroupConverter = root
@@ -142,25 +153,28 @@ internal class TaskRecordMaterializer(schema: MessageType) : RecordMaterializer<
* Helper class to convert parent and child relations and add them to [relations].
*/
private class RelationConverter(private val relations: MutableSet<String>) : GroupConverter() {
- private val entryConverter = object : PrimitiveConverter() {
- override fun addLong(value: Long) {
- relations.add(value.toString())
- }
+ private val entryConverter =
+ object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ relations.add(value.toString())
+ }
- override fun addDouble(value: Double) {
- relations.add(value.roundToLong().toString())
+ override fun addDouble(value: Double) {
+ relations.add(value.roundToLong().toString())
+ }
}
- }
- private val listConverter = object : GroupConverter() {
- override fun getConverter(fieldIndex: Int): Converter {
- require(fieldIndex == 0)
- return entryConverter
- }
+ private val listConverter =
+ object : GroupConverter() {
+ override fun getConverter(fieldIndex: Int): Converter {
+ require(fieldIndex == 0)
+ return entryConverter
+ }
- override fun start() {}
- override fun end() {}
- }
+ override fun start() {}
+
+ override fun end() {}
+ }
override fun getConverter(fieldIndex: Int): Converter {
require(fieldIndex == 0)
@@ -168,6 +182,7 @@ internal class TaskRecordMaterializer(schema: MessageType) : RecordMaterializer<
}
override fun start() {}
+
override fun end() {}
}
}
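The materializer above allocates one converter per schema field and resets its mutable state in start(). A stripped-down sketch of that wiring for a single INT64 field, using only the parquet-mr converter API shown in the hunks:

import org.apache.parquet.io.api.Converter
import org.apache.parquet.io.api.GroupConverter
import org.apache.parquet.io.api.PrimitiveConverter

// A root GroupConverter exposing one PrimitiveConverter, which writes into mutable state
// that start() resets before each record, exactly like TaskRecordMaterializer above.
class SingleLongRoot : GroupConverter() {
    var value: Long = 0
        private set

    private val fieldConverter =
        object : PrimitiveConverter() {
            override fun addLong(v: Long) {
                value = v
            }
        }

    override fun getConverter(fieldIndex: Int): Converter {
        require(fieldIndex == 0)
        return fieldConverter
    }

    override fun start() {
        value = 0 // reset per record
    }

    override fun end() {}
}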
diff --git a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt
index 0457098c..ad49cce0 100644
--- a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt
@@ -87,9 +87,9 @@ class WtfTraceFormatTest {
{
assertEquals(
setOf("584055316413447529", "133113685133695608", "1008582348422865408"),
- reader.getSet(TASK_PARENTS, String::class.java)
+ reader.getSet(TASK_PARENTS, String::class.java),
)
- }
+ },
)
assertAll(
@@ -101,9 +101,9 @@ class WtfTraceFormatTest {
{
assertEquals(
setOf("584055316413447529", "133113685133695608", "1008582348422865408"),
- reader.getSet(TASK_PARENTS, String::class.java)
+ reader.getSet(TASK_PARENTS, String::class.java),
)
- }
+ },
)
reader.close()