diff options
Diffstat (limited to 'opendc-trace')
82 files changed, 2470 insertions, 1796 deletions
diff --git a/opendc-trace/opendc-trace-api/build.gradle.kts b/opendc-trace/opendc-trace-api/build.gradle.kts index 977eec0d..514cd777 100644 --- a/opendc-trace/opendc-trace-api/build.gradle.kts +++ b/opendc-trace/opendc-trace-api/build.gradle.kts @@ -22,7 +22,7 @@ description = "Workload trace library for OpenDC" -/* Build configuration */ +// Build configuration plugins { `kotlin-library-conventions` } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt index 0f75d890..4e82e48a 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt @@ -32,5 +32,5 @@ package org.opendc.trace public data class TableColumn( public val name: String, public val type: TableColumnType, - public val isNullable: Boolean = false + public val isNullable: Boolean = false, ) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt index 42b1c690..95a58935 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableReader.kt @@ -143,7 +143,10 @@ public interface TableReader : AutoCloseable { * @throws IllegalArgumentException if the column index is not valid for this reader or this type. * @return The value of the column as `List` or `null` if the column is null. */ - public fun <T> getList(index: Int, elementType: Class<T>): List<T>? + public fun <T> getList( + index: Int, + elementType: Class<T>, + ): List<T>? /** * Obtain the value of the column with the specified [index] as [Set]. @@ -153,7 +156,10 @@ public interface TableReader : AutoCloseable { * @throws IllegalArgumentException if the column index is not valid for this reader or this type. * @return The value of the column as `Set` or `null` if the column is null. */ - public fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? + public fun <T> getSet( + index: Int, + elementType: Class<T>, + ): Set<T>? /** * Obtain the value of the column with the specified [index] as [Set]. @@ -164,7 +170,11 @@ public interface TableReader : AutoCloseable { * @throws IllegalArgumentException if the column index is not valid for this reader or this type. * @return The value of the column as `Map` or `null` if the column is null. */ - public fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? + public fun <K, V> getMap( + index: Int, + keyType: Class<K>, + valueType: Class<V>, + ): Map<K, V>? /** * Determine whether a column named [name] has a `null` value for the current row. @@ -264,7 +274,10 @@ public interface TableReader : AutoCloseable { * @throws IllegalArgumentException if the column index is not valid for this reader or this type. * @return The value of the column as `List` or `null` if the column is null. */ - public fun <T> getList(name: String, elementType: Class<T>): List<T>? = getList(resolve(name), elementType) + public fun <T> getList( + name: String, + elementType: Class<T>, + ): List<T>? = getList(resolve(name), elementType) /** * Obtain the value of the column named [name] as [Set]. @@ -274,7 +287,10 @@ public interface TableReader : AutoCloseable { * @throws IllegalArgumentException if the column index is not valid for this reader or this type. * @return The value of the column as `Set` or `null` if the column is null. */ - public fun <T> getSet(name: String, elementType: Class<T>): Set<T>? = getSet(resolve(name), elementType) + public fun <T> getSet( + name: String, + elementType: Class<T>, + ): Set<T>? = getSet(resolve(name), elementType) /** * Obtain the value of the column named [name] as [Set]. @@ -285,8 +301,11 @@ public interface TableReader : AutoCloseable { * @throws IllegalArgumentException if the column index is not valid for this reader or this type. * @return The value of the column as `Map` or `null` if the column is null. */ - public fun <K, V> getMap(name: String, keyType: Class<K>, valueType: Class<V>): Map<K, V>? = - getMap(resolve(name), keyType, valueType) + public fun <K, V> getMap( + name: String, + keyType: Class<K>, + valueType: Class<V>, + ): Map<K, V>? = getMap(resolve(name), keyType, valueType) /** * Closes the reader so that no further iteration or data access can be made. diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableWriter.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableWriter.kt index 3b02794d..133bd01c 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableWriter.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableWriter.kt @@ -55,7 +55,10 @@ public interface TableWriter : AutoCloseable { * @param value The boolean value to set the column to. * @throws IllegalArgumentException if the column is not valid for this method. */ - public fun setBoolean(index: Int, value: Boolean) + public fun setBoolean( + index: Int, + value: Boolean, + ) /** * Set the column with index [index] to integer [value]. @@ -64,7 +67,10 @@ public interface TableWriter : AutoCloseable { * @param value The integer value to set the column to. * @throws IllegalArgumentException if the column is not valid for this method. */ - public fun setInt(index: Int, value: Int) + public fun setInt( + index: Int, + value: Int, + ) /** * Set the column with index [index] to long [value]. @@ -73,7 +79,10 @@ public interface TableWriter : AutoCloseable { * @param value The long value to set the column to. * @throws IllegalArgumentException if the column is not valid for this method. */ - public fun setLong(index: Int, value: Long) + public fun setLong( + index: Int, + value: Long, + ) /** * Set the column with index [index] to float [value]. @@ -82,7 +91,10 @@ public interface TableWriter : AutoCloseable { * @param value The float value to set the column to. * @throws IllegalArgumentException if the column is not valid for this method. */ - public fun setFloat(index: Int, value: Float) + public fun setFloat( + index: Int, + value: Float, + ) /** * Set the column with index [index] to double [value]. @@ -91,7 +103,10 @@ public interface TableWriter : AutoCloseable { * @param value The double value to set the column to. * @throws IllegalArgumentException if the column is not valid for this method. */ - public fun setDouble(index: Int, value: Double) + public fun setDouble( + index: Int, + value: Double, + ) /** * Set the column with index [index] to [String] [value]. @@ -100,7 +115,10 @@ public interface TableWriter : AutoCloseable { * @param value The [String] value to set the column to. * @throws IllegalArgumentException if the column is not valid for this method. */ - public fun setString(index: Int, value: String) + public fun setString( + index: Int, + value: String, + ) /** * Set the column with index [index] to [UUID] [value]. @@ -109,7 +127,10 @@ public interface TableWriter : AutoCloseable { * @param value The [UUID] value to set the column to. * @throws IllegalArgumentException if the column is not valid for this method. */ - public fun setUUID(index: Int, value: UUID) + public fun setUUID( + index: Int, + value: UUID, + ) /** * Set the column with index [index] to [Instant] [value]. @@ -118,7 +139,10 @@ public interface TableWriter : AutoCloseable { * @param value The [Instant] value to set the column to. * @throws IllegalArgumentException if the column is not valid for this method. */ - public fun setInstant(index: Int, value: Instant) + public fun setInstant( + index: Int, + value: Instant, + ) /** * Set the column with index [index] to [Duration] [value]. @@ -127,7 +151,10 @@ public interface TableWriter : AutoCloseable { * @param value The [Duration] value to set the column to. * @throws IllegalArgumentException if the column is not valid for this method. */ - public fun setDuration(index: Int, value: Duration) + public fun setDuration( + index: Int, + value: Duration, + ) /** * Set the column with index [index] to [List] [value]. @@ -136,7 +163,10 @@ public interface TableWriter : AutoCloseable { * @param value The [Map] value to set the column to. * @throws IllegalArgumentException if the column is not valid for this method. */ - public fun <T> setList(index: Int, value: List<T>) + public fun <T> setList( + index: Int, + value: List<T>, + ) /** * Set the column with index [index] to [Set] [value]. @@ -145,7 +175,10 @@ public interface TableWriter : AutoCloseable { * @param value The [Set] value to set the column to. * @throws IllegalArgumentException if the column is not valid for this method. */ - public fun <T> setSet(index: Int, value: Set<T>) + public fun <T> setSet( + index: Int, + value: Set<T>, + ) /** * Set the column with index [index] to [Map] [value]. @@ -154,7 +187,10 @@ public interface TableWriter : AutoCloseable { * @param value The [Map] value to set the column to. * @throws IllegalArgumentException if the column is not valid for this method. */ - public fun <K, V> setMap(index: Int, value: Map<K, V>) + public fun <K, V> setMap( + index: Int, + value: Map<K, V>, + ) /** * Set the column named [name] to boolean [value]. @@ -163,7 +199,10 @@ public interface TableWriter : AutoCloseable { * @param value The boolean value to set the column to. * @throws IllegalArgumentException if the column is not valid for this method. */ - public fun setBoolean(name: String, value: Boolean): Unit = setBoolean(resolve(name), value) + public fun setBoolean( + name: String, + value: Boolean, + ): Unit = setBoolean(resolve(name), value) /** * Set the column named [name] to integer [value]. @@ -172,7 +211,10 @@ public interface TableWriter : AutoCloseable { * @param value The integer value to set the column to. * @throws IllegalArgumentException if the column is not valid for this method. */ - public fun setInt(name: String, value: Int): Unit = setInt(resolve(name), value) + public fun setInt( + name: String, + value: Int, + ): Unit = setInt(resolve(name), value) /** * Set the column named [name] to long [value]. @@ -181,7 +223,10 @@ public interface TableWriter : AutoCloseable { * @param value The long value to set the column to. * @throws IllegalArgumentException if the column is not valid for this method. */ - public fun setLong(name: String, value: Long): Unit = setLong(resolve(name), value) + public fun setLong( + name: String, + value: Long, + ): Unit = setLong(resolve(name), value) /** * Set the column named [name] to float [value]. @@ -190,7 +235,10 @@ public interface TableWriter : AutoCloseable { * @param value The float value to set the column to. * @throws IllegalArgumentException if the column is not valid for this method. */ - public fun setFloat(name: String, value: Float): Unit = setFloat(resolve(name), value) + public fun setFloat( + name: String, + value: Float, + ): Unit = setFloat(resolve(name), value) /** * Set the column named [name] to double [value]. @@ -199,7 +247,10 @@ public interface TableWriter : AutoCloseable { * @param value The double value to set the column to. * @throws IllegalArgumentException if the column is not valid for this method. */ - public fun setDouble(name: String, value: Double): Unit = setDouble(resolve(name), value) + public fun setDouble( + name: String, + value: Double, + ): Unit = setDouble(resolve(name), value) /** * Set the column named [name] to [String] [value]. @@ -208,7 +259,10 @@ public interface TableWriter : AutoCloseable { * @param value The [String] value to set the column to. * @throws IllegalArgumentException if the column is not valid for this method. */ - public fun setString(name: String, value: String): Unit = setString(resolve(name), value) + public fun setString( + name: String, + value: String, + ): Unit = setString(resolve(name), value) /** * Set the column named [name] to [UUID] [value]. @@ -217,7 +271,10 @@ public interface TableWriter : AutoCloseable { * @param value The [UUID] value to set the column to. * @throws IllegalArgumentException if the column is not valid for this method. */ - public fun setUUID(name: String, value: UUID): Unit = setUUID(resolve(name), value) + public fun setUUID( + name: String, + value: UUID, + ): Unit = setUUID(resolve(name), value) /** * Set the column named [name] to [Instant] [value]. @@ -226,7 +283,10 @@ public interface TableWriter : AutoCloseable { * @param value The [Instant] value to set the column to. * @throws IllegalArgumentException if the column is not valid for this method. */ - public fun setInstant(name: String, value: Instant): Unit = setInstant(resolve(name), value) + public fun setInstant( + name: String, + value: Instant, + ): Unit = setInstant(resolve(name), value) /** * Set the column named [name] to [Duration] [value]. @@ -235,7 +295,10 @@ public interface TableWriter : AutoCloseable { * @param value The [Duration] value to set the column to. * @throws IllegalArgumentException if the column is not valid for this method. */ - public fun setDuration(name: String, value: Duration): Unit = setDuration(resolve(name), value) + public fun setDuration( + name: String, + value: Duration, + ): Unit = setDuration(resolve(name), value) /** * Set the column named [name] to [List] [value]. @@ -244,7 +307,10 @@ public interface TableWriter : AutoCloseable { * @param value The [List] value to set the column to. * @throws IllegalArgumentException if the column is not valid for this method. */ - public fun <T> setList(name: String, value: List<T>): Unit = setList(resolve(name), value) + public fun <T> setList( + name: String, + value: List<T>, + ): Unit = setList(resolve(name), value) /** * Set the column named [name] to [Set] [value]. @@ -253,7 +319,10 @@ public interface TableWriter : AutoCloseable { * @param value The [Set] value to set the column to. * @throws IllegalArgumentException if the column is not valid for this method. */ - public fun <T> setSet(name: String, value: Set<T>): Unit = setSet(resolve(name), value) + public fun <T> setSet( + name: String, + value: Set<T>, + ): Unit = setSet(resolve(name), value) /** * Set the column named [name] to [Map] [value]. @@ -262,7 +331,10 @@ public interface TableWriter : AutoCloseable { * @param value The [Map] value to set the column to. * @throws IllegalArgumentException if the column is not valid for this method. */ - public fun <K, V> setMap(name: String, value: Map<K, V>): Unit = setMap(resolve(name), value) + public fun <K, V> setMap( + name: String, + value: Map<K, V>, + ): Unit = setMap(resolve(name), value) /** * Flush any buffered content to the underlying target. diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt index 64e8f272..a1059e9e 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Trace.kt @@ -55,7 +55,10 @@ public interface Trace { * @throws IllegalArgumentException if [format] is not supported. */ @JvmStatic - public fun open(path: File, format: String): Trace = open(path.toPath(), format) + public fun open( + path: File, + format: String, + ): Trace = open(path.toPath(), format) /** * Open a [Trace] at the specified [path] in the given [format]. @@ -65,7 +68,10 @@ public interface Trace { * @throws IllegalArgumentException if [format] is not supported. */ @JvmStatic - public fun open(path: Path, format: String): Trace { + public fun open( + path: Path, + format: String, + ): Trace { val provider = requireNotNull(TraceFormat.byName(format)) { "Unknown format $format" } return TraceImpl(provider, path) } @@ -77,7 +83,10 @@ public interface Trace { * @param format The format of the trace to create. */ @JvmStatic - public fun create(path: File, format: String): Trace = create(path.toPath(), format) + public fun create( + path: File, + format: String, + ): Trace = create(path.toPath(), format) /** * Create a [Trace] at the specified [path] in the given [format]. @@ -86,7 +95,10 @@ public interface Trace { * @param format The format of the trace to create. */ @JvmStatic - public fun create(path: Path, format: String): Trace { + public fun create( + path: Path, + format: String, + ): Trace { val provider = requireNotNull(TraceFormat.byName(format)) { "Unknown format $format" } provider.create(path) return TraceImpl(provider, path) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt index 89f8dbc4..046dd13d 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt @@ -28,40 +28,40 @@ package org.opendc.trace.conv * Identifier of the resource. */ @JvmField -public val RESOURCE_ID: String = "id" +public val resourceID: String = "id" /** * The cluster to which the resource belongs. */ @JvmField -public val RESOURCE_CLUSTER_ID: String = "cluster_id" +public val resourceClusterID: String = "cluster_id" /** * Start time for the resource. */ @JvmField -public val RESOURCE_START_TIME: String = "start_time" +public val resourceStartTime: String = "start_time" /** * End time for the resource. */ @JvmField -public val RESOURCE_STOP_TIME: String = "stop_time" +public val resourceStopTime: String = "stop_time" /** * Number of CPUs for the resource. */ @JvmField -public val RESOURCE_CPU_COUNT: String = "cpu_count" +public val resourceCpuCount: String = "cpu_count" /** * Total CPU capacity of the resource in MHz. */ @JvmField -public val RESOURCE_CPU_CAPACITY: String = "cpu_capacity" +public val resourceCpuCapacity: String = "cpu_capacity" /** * Memory capacity for the resource in KB. */ @JvmField -public val RESOURCE_MEM_CAPACITY: String = "mem_capacity" +public val resourceMemCapacity: String = "mem_capacity" diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt index 5187d501..eede6bd6 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt @@ -28,70 +28,70 @@ package org.opendc.trace.conv * The timestamp at which the state was recorded. */ @JvmField -public val RESOURCE_STATE_TIMESTAMP: String = "timestamp" +public val resourceStateTimestamp: String = "timestamp" /** * Duration for the state. */ @JvmField -public val RESOURCE_STATE_DURATION: String = "duration" +public val resourceStateDuration: String = "duration" /** * A flag to indicate that the resource is powered on. */ @JvmField -public val RESOURCE_STATE_POWERED_ON: String = "powered_on" +public val resourceStatePoweredOn: String = "powered_on" /** * Total CPU usage of the resource in MHz. */ @JvmField -public val RESOURCE_STATE_CPU_USAGE: String = "cpu_usage" +public val resourceStateCpuUsage: String = "cpu_usage" /** * Total CPU usage of the resource in percentage. */ @JvmField -public val RESOURCE_STATE_CPU_USAGE_PCT: String = "cpu_usage_pct" +public val resourceStateCpuUsagePct: String = "cpu_usage_pct" /** * Total CPU demand of the resource in MHz. */ @JvmField -public val RESOURCE_STATE_CPU_DEMAND: String = "cpu_demand" +public val resourceStateCpuDemand: String = "cpu_demand" /** * CPU ready percentage. */ @JvmField -public val RESOURCE_STATE_CPU_READY_PCT: String = "cpu_ready_pct" +public val resourceStateCpuReadyPct: String = "cpu_ready_pct" /** * Memory usage of the resource in KB. */ @JvmField -public val RESOURCE_STATE_MEM_USAGE: String = "mem_usage" +public val resourceStateMemUsage: String = "mem_usage" /** * Disk read throughput of the resource in KB/s. */ @JvmField -public val RESOURCE_STATE_DISK_READ: String = "disk_read" +public val resourceStateDiskRead: String = "disk_read" /** * Disk write throughput of the resource in KB/s. */ @JvmField -public val RESOURCE_STATE_DISK_WRITE: String = "disk_write" +public val resourceStateDiskWrite: String = "disk_write" /** * Network receive throughput of the resource in KB/s. */ @JvmField -public val RESOURCE_STATE_NET_RX: String = "net_rx" +public val resourceStateNetRx: String = "net_rx" /** * Network transmit throughput of the resource in KB/s. */ @JvmField -public val RESOURCE_STATE_NET_TX: String = "net_tx" +public val resourceStateNetTx: String = "net_tx" diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt index 46ef051d..83537822 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt @@ -61,7 +61,10 @@ public interface TraceFormat { * @throws IllegalArgumentException If [table] does not exist. * @return The [TableDetails] for the specified [table]. */ - public fun getDetails(path: Path, table: String): TableDetails + public fun getDetails( + path: Path, + table: String, + ): TableDetails /** * Open a [TableReader] for the specified [table]. @@ -72,7 +75,11 @@ public interface TraceFormat { * @throws IllegalArgumentException If [table] does not exist. * @return A [TableReader] instance for the table. */ - public fun newReader(path: Path, table: String, projection: List<String>?): TableReader + public fun newReader( + path: Path, + table: String, + projection: List<String>?, + ): TableReader /** * Open a [TableWriter] for the specified [table]. @@ -83,7 +90,10 @@ public interface TraceFormat { * @throws UnsupportedOperationException If the format does not support writing. * @return A [TableWriter] instance for the table. */ - public fun newWriter(path: Path, table: String): TableWriter + public fun newWriter( + path: Path, + table: String, + ): TableWriter /** * A helper object for resolving providers. diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/CompositeTableReader.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/CompositeTableReader.kt index 2fe820c4..4b9a0d95 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/CompositeTableReader.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/CompositeTableReader.kt @@ -122,17 +122,27 @@ public abstract class CompositeTableReader : TableReader { return delegate.getDuration(index) } - override fun <T> getList(index: Int, elementType: Class<T>): List<T>? { + override fun <T> getList( + index: Int, + elementType: Class<T>, + ): List<T>? { val delegate = checkNotNull(delegate) { "Invalid reader state" } return delegate.getList(index, elementType) } - override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? { + override fun <T> getSet( + index: Int, + elementType: Class<T>, + ): Set<T>? { val delegate = checkNotNull(delegate) { "Invalid reader state" } return delegate.getSet(index, elementType) } - override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? { + override fun <K, V> getMap( + index: Int, + keyType: Class<K>, + valueType: Class<V>, + ): Map<K, V>? { val delegate = checkNotNull(delegate) { "Invalid reader state" } return delegate.getMap(index, keyType, valueType) } diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/TableColumnConversion.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/TableColumnConversion.kt index 26739e34..fda2bc54 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/TableColumnConversion.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/util/TableColumnConversion.kt @@ -32,7 +32,10 @@ import java.util.UUID /** * Helper method to convert a [List] into a [List] with elements of [targetElementType]. */ -public fun <T> TableColumnType.List.convertTo(value: List<*>?, targetElementType: Class<T>): List<T>? { +public fun <T> TableColumnType.List.convertTo( + value: List<*>?, + targetElementType: Class<T>, +): List<T>? { require(elementType.isCompatible(targetElementType)) { "Target element type is not compatible with $elementType" } @Suppress("UNCHECKED_CAST") return value as List<T>? @@ -41,7 +44,10 @@ public fun <T> TableColumnType.List.convertTo(value: List<*>?, targetElementType /** * Helper method to convert a [Set] into a [Set] with elements of [targetElementType]. */ -public fun <T> TableColumnType.Set.convertTo(value: Set<*>?, targetElementType: Class<T>): Set<T>? { +public fun <T> TableColumnType.Set.convertTo( + value: Set<*>?, + targetElementType: Class<T>, +): Set<T>? { require(elementType.isCompatible(targetElementType)) { "Target element type is not compatible with $elementType" } @Suppress("UNCHECKED_CAST") return value as Set<T>? @@ -50,7 +56,11 @@ public fun <T> TableColumnType.Set.convertTo(value: Set<*>?, targetElementType: /** * Helper method to convert a [Map] into a [Map] with [targetKeyType] keys and [targetValueType] values. */ -public fun <K, V> TableColumnType.Map.convertTo(value: Map<*, *>?, targetKeyType: Class<K>, targetValueType: Class<V>): Map<K, V>? { +public fun <K, V> TableColumnType.Map.convertTo( + value: Map<*, *>?, + targetKeyType: Class<K>, + targetValueType: Class<V>, +): Map<K, V>? { require(keyType.isCompatible(targetKeyType)) { "Target key type $targetKeyType is not compatible with $keyType" } require(valueType.isCompatible(targetValueType)) { "Target value type $targetValueType is not compatible with $valueType" } @Suppress("UNCHECKED_CAST") diff --git a/opendc-trace/opendc-trace-azure/build.gradle.kts b/opendc-trace/opendc-trace-azure/build.gradle.kts index ee53c583..21b8b439 100644 --- a/opendc-trace/opendc-trace-azure/build.gradle.kts +++ b/opendc-trace/opendc-trace-azure/build.gradle.kts @@ -22,7 +22,7 @@ description = "Support for Azure VM traces in OpenDC" -/* Build configuration */ +// Build configuration plugins { `kotlin-library-conventions` `benchmark-conventions` diff --git a/opendc-trace/opendc-trace-azure/src/jmh/kotlin/org/opendc/trace/azure/AzureTraceBenchmarks.kt b/opendc-trace/opendc-trace-azure/src/jmh/kotlin/org/opendc/trace/azure/AzureTraceBenchmarks.kt index 6759f38a..bb3c2450 100644 --- a/opendc-trace/opendc-trace-azure/src/jmh/kotlin/org/opendc/trace/azure/AzureTraceBenchmarks.kt +++ b/opendc-trace/opendc-trace-azure/src/jmh/kotlin/org/opendc/trace/azure/AzureTraceBenchmarks.kt @@ -22,9 +22,9 @@ package org.opendc.trace.azure -import org.opendc.trace.conv.RESOURCE_ID import org.opendc.trace.conv.TABLE_RESOURCES import org.opendc.trace.conv.TABLE_RESOURCE_STATES +import org.opendc.trace.conv.resourceID import org.opendc.trace.spi.TraceFormat import org.openjdk.jmh.annotations.Benchmark import org.openjdk.jmh.annotations.Fork @@ -58,7 +58,7 @@ class AzureTraceBenchmarks { fun benchmarkResourcesReader(bh: Blackhole) { val reader = format.newReader(path, TABLE_RESOURCES, null) try { - val idColumn = reader.resolve(RESOURCE_ID) + val idColumn = reader.resolve(resourceID) while (reader.nextRow()) { bh.consume(reader.getString(idColumn)) } @@ -71,7 +71,7 @@ class AzureTraceBenchmarks { fun benchmarkResourceStatesReader(bh: Blackhole) { val reader = format.newReader(path, TABLE_RESOURCE_STATES, null) try { - val idColumn = reader.resolve(RESOURCE_ID) + val idColumn = reader.resolve(resourceID) while (reader.nextRow()) { bh.consume(reader.getString(idColumn)) } diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt index 0c60c75d..bcf6ff52 100644 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceStateTableReader.kt @@ -26,9 +26,9 @@ import com.fasterxml.jackson.core.JsonToken import com.fasterxml.jackson.dataformat.csv.CsvParser import com.fasterxml.jackson.dataformat.csv.CsvSchema import org.opendc.trace.TableReader -import org.opendc.trace.conv.RESOURCE_ID -import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE_PCT -import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP +import org.opendc.trace.conv.resourceID +import org.opendc.trace.conv.resourceStateCpuUsagePct +import org.opendc.trace.conv.resourceStateTimestamp import java.time.Duration import java.time.Instant import java.util.UUID @@ -74,21 +74,21 @@ internal class AzureResourceStateTableReader(private val parser: CsvParser) : Ta return true } - private val COL_ID = 0 - private val COL_TIMESTAMP = 1 - private val COL_CPU_USAGE_PCT = 2 + private val colID = 0 + private val colTimestamp = 1 + private val colCpuUsagePct = 2 override fun resolve(name: String): Int { return when (name) { - RESOURCE_ID -> COL_ID - RESOURCE_STATE_TIMESTAMP -> COL_TIMESTAMP - RESOURCE_STATE_CPU_USAGE_PCT -> COL_CPU_USAGE_PCT + resourceID -> colID + resourceStateTimestamp -> colTimestamp + resourceStateCpuUsagePct -> colCpuUsagePct else -> -1 } } override fun isNull(index: Int): Boolean { - require(index in 0..COL_CPU_USAGE_PCT) { "Invalid column index" } + require(index in 0..colCpuUsagePct) { "Invalid column index" } return false } @@ -111,7 +111,7 @@ internal class AzureResourceStateTableReader(private val parser: CsvParser) : Ta override fun getDouble(index: Int): Double { checkActive() return when (index) { - COL_CPU_USAGE_PCT -> cpuUsagePct + colCpuUsagePct -> cpuUsagePct else -> throw IllegalArgumentException("Invalid column") } } @@ -119,7 +119,7 @@ internal class AzureResourceStateTableReader(private val parser: CsvParser) : Ta override fun getString(index: Int): String? { checkActive() return when (index) { - COL_ID -> id + colID -> id else -> throw IllegalArgumentException("Invalid column") } } @@ -131,7 +131,7 @@ internal class AzureResourceStateTableReader(private val parser: CsvParser) : Ta override fun getInstant(index: Int): Instant? { checkActive() return when (index) { - COL_TIMESTAMP -> timestamp + colTimestamp -> timestamp else -> throw IllegalArgumentException("Invalid column") } } @@ -140,15 +140,25 @@ internal class AzureResourceStateTableReader(private val parser: CsvParser) : Ta throw IllegalArgumentException("Invalid column") } - override fun <T> getList(index: Int, elementType: Class<T>): List<T>? { + override fun <T> getList( + index: Int, + elementType: Class<T>, + ): List<T>? { throw IllegalArgumentException("Invalid column") } - override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? { + override fun <K, V> getMap( + index: Int, + keyType: Class<K>, + valueType: Class<V>, + ): Map<K, V>? { throw IllegalArgumentException("Invalid column") } - override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? { + override fun <T> getSet( + index: Int, + elementType: Class<T>, + ): Set<T>? { throw IllegalArgumentException("Invalid column") } @@ -196,13 +206,14 @@ internal class AzureResourceStateTableReader(private val parser: CsvParser) : Ta /** * The [CsvSchema] that is used to parse the trace. */ - private val schema = CsvSchema.builder() - .addColumn("timestamp", CsvSchema.ColumnType.NUMBER) - .addColumn("vm id", CsvSchema.ColumnType.STRING) - .addColumn("CPU min cpu", CsvSchema.ColumnType.NUMBER) - .addColumn("CPU max cpu", CsvSchema.ColumnType.NUMBER) - .addColumn("CPU avg cpu", CsvSchema.ColumnType.NUMBER) - .setAllowComments(true) - .build() + private val schema = + CsvSchema.builder() + .addColumn("timestamp", CsvSchema.ColumnType.NUMBER) + .addColumn("vm id", CsvSchema.ColumnType.STRING) + .addColumn("CPU min cpu", CsvSchema.ColumnType.NUMBER) + .addColumn("CPU max cpu", CsvSchema.ColumnType.NUMBER) + .addColumn("CPU avg cpu", CsvSchema.ColumnType.NUMBER) + .setAllowComments(true) + .build() } } diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt index c0acb67a..d86a0466 100644 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt +++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureResourceTableReader.kt @@ -26,11 +26,11 @@ import com.fasterxml.jackson.core.JsonToken import com.fasterxml.jackson.dataformat.csv.CsvParser import com.fasterxml.jackson.dataformat.csv.CsvSchema import org.opendc.trace.TableReader -import org.opendc.trace.conv.RESOURCE_CPU_COUNT -import org.opendc.trace.conv.RESOURCE_ID -import org.opendc.trace.conv.RESOURCE_MEM_CAPACITY -import org.opendc.trace.conv.RESOURCE_START_TIME -import org.opendc.trace.conv.RESOURCE_STOP_TIME +import org.opendc.trace.conv.resourceCpuCount +import org.opendc.trace.conv.resourceID +import org.opendc.trace.conv.resourceMemCapacity +import org.opendc.trace.conv.resourceStartTime +import org.opendc.trace.conv.resourceStopTime import java.time.Duration import java.time.Instant import java.util.UUID @@ -78,25 +78,25 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe return true } - private val COL_ID = 0 - private val COL_START_TIME = 1 - private val COL_STOP_TIME = 2 - private val COL_CPU_COUNT = 3 - private val COL_MEM_CAPACITY = 4 + private val colID = 0 + private val colStartTime = 1 + private val colStopTime = 2 + private val colCpuCount = 3 + private val colMemCapacity = 4 override fun resolve(name: String): Int { return when (name) { - RESOURCE_ID -> COL_ID - RESOURCE_START_TIME -> COL_START_TIME - RESOURCE_STOP_TIME -> COL_STOP_TIME - RESOURCE_CPU_COUNT -> COL_CPU_COUNT - RESOURCE_MEM_CAPACITY -> COL_MEM_CAPACITY + resourceID -> colID + resourceStartTime -> colStartTime + resourceStopTime -> colStopTime + resourceCpuCount -> colCpuCount + resourceMemCapacity -> colMemCapacity else -> -1 } } override fun isNull(index: Int): Boolean { - require(index in 0..COL_MEM_CAPACITY) { "Invalid column index" } + require(index in 0..colMemCapacity) { "Invalid column index" } return false } @@ -107,7 +107,7 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe override fun getInt(index: Int): Int { checkActive() return when (index) { - COL_CPU_COUNT -> cpuCores + colCpuCount -> cpuCores else -> throw IllegalArgumentException("Invalid column") } } @@ -115,7 +115,7 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe override fun getLong(index: Int): Long { checkActive() return when (index) { - COL_CPU_COUNT -> cpuCores.toLong() + colCpuCount -> cpuCores.toLong() else -> throw IllegalArgumentException("Invalid column") } } @@ -127,7 +127,7 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe override fun getDouble(index: Int): Double { checkActive() return when (index) { - COL_MEM_CAPACITY -> memCapacity + colMemCapacity -> memCapacity else -> throw IllegalArgumentException("Invalid column") } } @@ -135,7 +135,7 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe override fun getString(index: Int): String? { checkActive() return when (index) { - COL_ID -> id + colID -> id else -> throw IllegalArgumentException("Invalid column") } } @@ -147,8 +147,8 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe override fun getInstant(index: Int): Instant? { checkActive() return when (index) { - COL_START_TIME -> startTime - COL_STOP_TIME -> stopTime + colStartTime -> startTime + colStopTime -> stopTime else -> throw IllegalArgumentException("Invalid column") } } @@ -157,15 +157,25 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe throw IllegalArgumentException("Invalid column") } - override fun <T> getList(index: Int, elementType: Class<T>): List<T>? { + override fun <T> getList( + index: Int, + elementType: Class<T>, + ): List<T>? { throw IllegalArgumentException("Invalid column") } - override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? { + override fun <T> getSet( + index: Int, + elementType: Class<T>, + ): Set<T>? { throw IllegalArgumentException("Invalid column") } - override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? { + override fun <K, V> getMap( + index: Int, + keyType: Class<K>, + valueType: Class<V>, + ): Map<K, V>? { throw IllegalArgumentException("Invalid column") } @@ -217,19 +227,20 @@ internal class AzureResourceTableReader(private val parser: CsvParser) : TableRe /** * The [CsvSchema] that is used to parse the trace. */ - private val schema = CsvSchema.builder() - .addColumn("vm id", CsvSchema.ColumnType.NUMBER) - .addColumn("subscription id", CsvSchema.ColumnType.STRING) - .addColumn("deployment id", CsvSchema.ColumnType.NUMBER) - .addColumn("timestamp vm created", CsvSchema.ColumnType.NUMBER) - .addColumn("timestamp vm deleted", CsvSchema.ColumnType.NUMBER) - .addColumn("max cpu", CsvSchema.ColumnType.NUMBER) - .addColumn("avg cpu", CsvSchema.ColumnType.NUMBER) - .addColumn("p95 cpu", CsvSchema.ColumnType.NUMBER) - .addColumn("vm category", CsvSchema.ColumnType.NUMBER) - .addColumn("vm virtual core count", CsvSchema.ColumnType.NUMBER) - .addColumn("vm memory", CsvSchema.ColumnType.NUMBER) - .setAllowComments(true) - .build() + private val schema = + CsvSchema.builder() + .addColumn("vm id", CsvSchema.ColumnType.NUMBER) + .addColumn("subscription id", CsvSchema.ColumnType.STRING) + .addColumn("deployment id", CsvSchema.ColumnType.NUMBER) + .addColumn("timestamp vm created", CsvSchema.ColumnType.NUMBER) + .addColumn("timestamp vm deleted", CsvSchema.ColumnType.NUMBER) + .addColumn("max cpu", CsvSchema.ColumnType.NUMBER) + .addColumn("avg cpu", CsvSchema.ColumnType.NUMBER) + .addColumn("p95 cpu", CsvSchema.ColumnType.NUMBER) + .addColumn("vm category", CsvSchema.ColumnType.NUMBER) + .addColumn("vm virtual core count", CsvSchema.ColumnType.NUMBER) + .addColumn("vm memory", CsvSchema.ColumnType.NUMBER) + .setAllowComments(true) + .build() } } diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt index 3f64c640..a75da9d9 100644 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt +++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt @@ -28,15 +28,15 @@ import org.opendc.trace.TableColumn import org.opendc.trace.TableColumnType import org.opendc.trace.TableReader import org.opendc.trace.TableWriter -import org.opendc.trace.conv.RESOURCE_CPU_COUNT -import org.opendc.trace.conv.RESOURCE_ID -import org.opendc.trace.conv.RESOURCE_MEM_CAPACITY -import org.opendc.trace.conv.RESOURCE_START_TIME -import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE_PCT -import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP -import org.opendc.trace.conv.RESOURCE_STOP_TIME import org.opendc.trace.conv.TABLE_RESOURCES import org.opendc.trace.conv.TABLE_RESOURCE_STATES +import org.opendc.trace.conv.resourceCpuCount +import org.opendc.trace.conv.resourceID +import org.opendc.trace.conv.resourceMemCapacity +import org.opendc.trace.conv.resourceStartTime +import org.opendc.trace.conv.resourceStateCpuUsagePct +import org.opendc.trace.conv.resourceStateTimestamp +import org.opendc.trace.conv.resourceStopTime import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat import org.opendc.trace.util.CompositeTableReader @@ -59,9 +59,10 @@ public class AzureTraceFormat : TraceFormat { /** * The [CsvFactory] used to create the parser. */ - private val factory = CsvFactory() - .enable(CsvParser.Feature.ALLOW_COMMENTS) - .enable(CsvParser.Feature.TRIM_SPACES) + private val factory = + CsvFactory() + .enable(CsvParser.Feature.ALLOW_COMMENTS) + .enable(CsvParser.Feature.TRIM_SPACES) override fun create(path: Path) { throw UnsupportedOperationException("Writing not supported for this format") @@ -69,29 +70,38 @@ public class AzureTraceFormat : TraceFormat { override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) - override fun getDetails(path: Path, table: String): TableDetails { + override fun getDetails( + path: Path, + table: String, + ): TableDetails { return when (table) { - TABLE_RESOURCES -> TableDetails( - listOf( - TableColumn(RESOURCE_ID, TableColumnType.String), - TableColumn(RESOURCE_START_TIME, TableColumnType.Instant), - TableColumn(RESOURCE_STOP_TIME, TableColumnType.Instant), - TableColumn(RESOURCE_CPU_COUNT, TableColumnType.Int), - TableColumn(RESOURCE_MEM_CAPACITY, TableColumnType.Double) + TABLE_RESOURCES -> + TableDetails( + listOf( + TableColumn(resourceID, TableColumnType.String), + TableColumn(resourceStartTime, TableColumnType.Instant), + TableColumn(resourceStopTime, TableColumnType.Instant), + TableColumn(resourceCpuCount, TableColumnType.Int), + TableColumn(resourceMemCapacity, TableColumnType.Double), + ), ) - ) - TABLE_RESOURCE_STATES -> TableDetails( - listOf( - TableColumn(RESOURCE_ID, TableColumnType.String), - TableColumn(RESOURCE_STATE_TIMESTAMP, TableColumnType.Instant), - TableColumn(RESOURCE_STATE_CPU_USAGE_PCT, TableColumnType.Double) + TABLE_RESOURCE_STATES -> + TableDetails( + listOf( + TableColumn(resourceID, TableColumnType.String), + TableColumn(resourceStateTimestamp, TableColumnType.Instant), + TableColumn(resourceStateCpuUsagePct, TableColumnType.Double), + ), ) - ) else -> throw IllegalArgumentException("Table $table not supported") } } - override fun newReader(path: Path, table: String, projection: List<String>?): TableReader { + override fun newReader( + path: Path, + table: String, + projection: List<String>?, + ): TableReader { return when (table) { TABLE_RESOURCES -> { val stream = GZIPInputStream(path.resolve("vmtable/vmtable.csv.gz").inputStream()) @@ -102,7 +112,10 @@ public class AzureTraceFormat : TraceFormat { } } - override fun newWriter(path: Path, table: String): TableWriter { + override fun newWriter( + path: Path, + table: String, + ): TableWriter { throw UnsupportedOperationException("Writing not supported for this format") } @@ -110,10 +123,11 @@ public class AzureTraceFormat : TraceFormat { * Construct a [TableReader] for reading over all VM CPU readings. */ private fun newResourceStateReader(path: Path): TableReader { - val partitions = Files.walk(path.resolve("vm_cpu_readings"), 1) - .filter { !Files.isDirectory(it) && it.name.endsWith(".csv.gz") } - .collect(Collectors.toMap({ it.name.removeSuffix(".csv.gz") }, { it })) - .toSortedMap() + val partitions = + Files.walk(path.resolve("vm_cpu_readings"), 1) + .filter { !Files.isDirectory(it) && it.name.endsWith(".csv.gz") } + .collect(Collectors.toMap({ it.name.removeSuffix(".csv.gz") }, { it })) + .toSortedMap() val it = partitions.iterator() return object : CompositeTableReader() { diff --git a/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt b/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt index 00cdc174..4fe96a8e 100644 --- a/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt @@ -33,13 +33,13 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows import org.opendc.trace.TableColumn import org.opendc.trace.TableReader -import org.opendc.trace.conv.RESOURCE_CPU_COUNT -import org.opendc.trace.conv.RESOURCE_ID -import org.opendc.trace.conv.RESOURCE_MEM_CAPACITY -import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE_PCT -import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP import org.opendc.trace.conv.TABLE_RESOURCES import org.opendc.trace.conv.TABLE_RESOURCE_STATES +import org.opendc.trace.conv.resourceCpuCount +import org.opendc.trace.conv.resourceID +import org.opendc.trace.conv.resourceMemCapacity +import org.opendc.trace.conv.resourceStateCpuUsagePct +import org.opendc.trace.conv.resourceStateTimestamp import org.opendc.trace.testkit.TableReaderTestKit import java.nio.file.Paths @@ -76,9 +76,9 @@ class AzureTraceFormatTest { val reader = format.newReader(path, TABLE_RESOURCES, null) assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("x/XsOfHO4ocsV99i4NluqKDuxctW2MMVmwqOPAlg4wp8mqbBOe3wxBlQo0+Qx+uf", reader.getString(RESOURCE_ID)) }, - { assertEquals(1, reader.getInt(RESOURCE_CPU_COUNT)) }, - { assertEquals(1750000.0, reader.getDouble(RESOURCE_MEM_CAPACITY)) } + { assertEquals("x/XsOfHO4ocsV99i4NluqKDuxctW2MMVmwqOPAlg4wp8mqbBOe3wxBlQo0+Qx+uf", reader.getString(resourceID)) }, + { assertEquals(1, reader.getInt(resourceCpuCount)) }, + { assertEquals(1750000.0, reader.getDouble(resourceMemCapacity)) }, ) reader.close() @@ -91,9 +91,9 @@ class AzureTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("+ZcrOp5/c/fJ6mVgP5qMZlOAGDwyjaaDNM0WoWOt2IDb47gT0UwK9lFwkPQv3C7Q", reader.getString(RESOURCE_ID)) }, - { assertEquals(0, reader.getInstant(RESOURCE_STATE_TIMESTAMP)?.epochSecond) }, - { assertEquals(0.0286979, reader.getDouble(RESOURCE_STATE_CPU_USAGE_PCT), 0.01) } + { assertEquals("+ZcrOp5/c/fJ6mVgP5qMZlOAGDwyjaaDNM0WoWOt2IDb47gT0UwK9lFwkPQv3C7Q", reader.getString(resourceID)) }, + { assertEquals(0, reader.getInstant(resourceStateTimestamp)?.epochSecond) }, + { assertEquals(0.0286979, reader.getDouble(resourceStateCpuUsagePct), 0.01) }, ) reader.close() diff --git a/opendc-trace/opendc-trace-bitbrains/build.gradle.kts b/opendc-trace/opendc-trace-bitbrains/build.gradle.kts index 502b052a..6ca40d3d 100644 --- a/opendc-trace/opendc-trace-bitbrains/build.gradle.kts +++ b/opendc-trace/opendc-trace-bitbrains/build.gradle.kts @@ -22,7 +22,7 @@ description = "Support for GWF traces in OpenDC" -/* Build configuration */ +// Build configuration plugins { `kotlin-library-conventions` } diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt index 511f02db..8387d1ed 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExResourceStateTableReader.kt @@ -23,18 +23,18 @@ package org.opendc.trace.bitbrains import org.opendc.trace.TableReader -import org.opendc.trace.conv.RESOURCE_CLUSTER_ID -import org.opendc.trace.conv.RESOURCE_CPU_CAPACITY -import org.opendc.trace.conv.RESOURCE_CPU_COUNT -import org.opendc.trace.conv.RESOURCE_ID -import org.opendc.trace.conv.RESOURCE_MEM_CAPACITY -import org.opendc.trace.conv.RESOURCE_STATE_CPU_DEMAND -import org.opendc.trace.conv.RESOURCE_STATE_CPU_READY_PCT -import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE -import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE_PCT -import org.opendc.trace.conv.RESOURCE_STATE_DISK_READ -import org.opendc.trace.conv.RESOURCE_STATE_DISK_WRITE -import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP +import org.opendc.trace.conv.resourceClusterID +import org.opendc.trace.conv.resourceCpuCapacity +import org.opendc.trace.conv.resourceCpuCount +import org.opendc.trace.conv.resourceID +import org.opendc.trace.conv.resourceMemCapacity +import org.opendc.trace.conv.resourceStateCpuDemand +import org.opendc.trace.conv.resourceStateCpuReadyPct +import org.opendc.trace.conv.resourceStateCpuUsage +import org.opendc.trace.conv.resourceStateCpuUsagePct +import org.opendc.trace.conv.resourceStateDiskRead +import org.opendc.trace.conv.resourceStateDiskWrite +import org.opendc.trace.conv.resourceStateTimestamp import java.io.BufferedReader import java.time.Duration import java.time.Instant @@ -99,18 +99,18 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR val field = line.subSequence(start, end) as String when (col++) { - COL_TIMESTAMP -> timestamp = Instant.ofEpochSecond(field.toLong(10)) - COL_CPU_USAGE -> cpuUsage = field.toDouble() - COL_CPU_DEMAND -> cpuDemand = field.toDouble() - COL_DISK_READ -> diskRead = field.toDouble() - COL_DISK_WRITE -> diskWrite = field.toDouble() - COL_CLUSTER_ID -> cluster = field.trim() - COL_NCPUS -> cpuCores = field.toInt(10) - COL_CPU_READY_PCT -> cpuReadyPct = field.toDouble() - COL_POWERED_ON -> poweredOn = field.toInt(10) == 1 - COL_CPU_CAPACITY -> cpuCapacity = field.toDouble() - COL_ID -> id = field.trim() - COL_MEM_CAPACITY -> memCapacity = field.toDouble() * 1000 // Convert from MB to KB + colTimestamp -> timestamp = Instant.ofEpochSecond(field.toLong(10)) + colCpuUsage -> cpuUsage = field.toDouble() + colCpuDemand -> cpuDemand = field.toDouble() + colDiskRead -> diskRead = field.toDouble() + colDiskWrite -> diskWrite = field.toDouble() + colClusterID -> cluster = field.trim() + colNcpus -> cpuCores = field.toInt(10) + colCpuReadyPct -> cpuReadyPct = field.toDouble() + colPoweredOn -> poweredOn = field.toInt(10) == 1 + colCpuCapacity -> cpuCapacity = field.toDouble() + colID -> id = field.trim() + colMemCapacity -> memCapacity = field.toDouble() * 1000 // Convert from MB to KB } } @@ -119,31 +119,31 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR override fun resolve(name: String): Int { return when (name) { - RESOURCE_ID -> COL_ID - RESOURCE_CLUSTER_ID -> COL_CLUSTER_ID - RESOURCE_STATE_TIMESTAMP -> COL_TIMESTAMP - RESOURCE_CPU_COUNT -> COL_NCPUS - RESOURCE_CPU_CAPACITY -> COL_CPU_CAPACITY - RESOURCE_STATE_CPU_USAGE -> COL_CPU_USAGE - RESOURCE_STATE_CPU_USAGE_PCT -> COL_CPU_USAGE_PCT - RESOURCE_STATE_CPU_DEMAND -> COL_CPU_DEMAND - RESOURCE_STATE_CPU_READY_PCT -> COL_CPU_READY_PCT - RESOURCE_MEM_CAPACITY -> COL_MEM_CAPACITY - RESOURCE_STATE_DISK_READ -> COL_DISK_READ - RESOURCE_STATE_DISK_WRITE -> COL_DISK_WRITE + resourceID -> colID + resourceClusterID -> colClusterID + resourceStateTimestamp -> colTimestamp + resourceCpuCount -> colNcpus + resourceCpuCapacity -> colCpuCapacity + resourceStateCpuUsage -> colCpuUsage + resourceStateCpuUsagePct -> colCpuUsagePct + resourceStateCpuDemand -> colCpuDemand + resourceStateCpuReadyPct -> colCpuReadyPct + resourceMemCapacity -> colMemCapacity + resourceStateDiskRead -> colDiskRead + resourceStateDiskWrite -> colDiskWrite else -> -1 } } override fun isNull(index: Int): Boolean { - require(index in 0 until COL_MAX) { "Invalid column index" } + require(index in 0 until colMax) { "Invalid column index" } return false } override fun getBoolean(index: Int): Boolean { check(state == State.Active) { "No active row" } return when (index) { - COL_POWERED_ON -> poweredOn + colPoweredOn -> poweredOn else -> throw IllegalArgumentException("Invalid column") } } @@ -151,7 +151,7 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR override fun getInt(index: Int): Int { check(state == State.Active) { "No active row" } return when (index) { - COL_NCPUS -> cpuCores + colNcpus -> cpuCores else -> throw IllegalArgumentException("Invalid column") } } @@ -167,14 +167,14 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR override fun getDouble(index: Int): Double { check(state == State.Active) { "No active row" } return when (index) { - COL_CPU_CAPACITY -> cpuCapacity - COL_CPU_USAGE -> cpuUsage - COL_CPU_USAGE_PCT -> cpuUsage / cpuCapacity - COL_CPU_READY_PCT -> cpuReadyPct - COL_CPU_DEMAND -> cpuDemand - COL_MEM_CAPACITY -> memCapacity - COL_DISK_READ -> diskRead - COL_DISK_WRITE -> diskWrite + colCpuCapacity -> cpuCapacity + colCpuUsage -> cpuUsage + colCpuUsagePct -> cpuUsage / cpuCapacity + colCpuReadyPct -> cpuReadyPct + colCpuDemand -> cpuDemand + colMemCapacity -> memCapacity + colDiskRead -> diskRead + colDiskWrite -> diskWrite else -> throw IllegalArgumentException("Invalid column") } } @@ -182,8 +182,8 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR override fun getString(index: Int): String? { check(state == State.Active) { "No active row" } return when (index) { - COL_ID -> id - COL_CLUSTER_ID -> cluster + colID -> id + colClusterID -> cluster else -> throw IllegalArgumentException("Invalid column") } } @@ -195,7 +195,7 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR override fun getInstant(index: Int): Instant? { check(state == State.Active) { "No active row" } return when (index) { - COL_TIMESTAMP -> timestamp + colTimestamp -> timestamp else -> throw IllegalArgumentException("Invalid column") } } @@ -204,15 +204,25 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR throw IllegalArgumentException("Invalid column") } - override fun <T> getList(index: Int, elementType: Class<T>): List<T>? { + override fun <T> getList( + index: Int, + elementType: Class<T>, + ): List<T>? { throw IllegalArgumentException("Invalid column") } - override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? { + override fun <T> getSet( + index: Int, + elementType: Class<T>, + ): Set<T>? { throw IllegalArgumentException("Invalid column") } - override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? { + override fun <K, V> getMap( + index: Int, + keyType: Class<K>, + valueType: Class<V>, + ): Map<K, V>? { throw IllegalArgumentException("Invalid column") } @@ -259,22 +269,24 @@ internal class BitbrainsExResourceStateTableReader(private val reader: BufferedR /** * Default column indices for the extended Bitbrains format. */ - private val COL_TIMESTAMP = 0 - private val COL_CPU_USAGE = 1 - private val COL_CPU_DEMAND = 2 - private val COL_DISK_READ = 4 - private val COL_DISK_WRITE = 6 - private val COL_CLUSTER_ID = 10 - private val COL_NCPUS = 12 - private val COL_CPU_READY_PCT = 13 - private val COL_POWERED_ON = 14 - private val COL_CPU_CAPACITY = 18 - private val COL_ID = 19 - private val COL_MEM_CAPACITY = 20 - private val COL_CPU_USAGE_PCT = 21 - private val COL_MAX = COL_CPU_USAGE_PCT + 1 + private val colTimestamp = 0 + private val colCpuUsage = 1 + private val colCpuDemand = 2 + private val colDiskRead = 4 + private val colDiskWrite = 6 + private val colClusterID = 10 + private val colNcpus = 12 + private val colCpuReadyPct = 13 + private val colPoweredOn = 14 + private val colCpuCapacity = 18 + private val colID = 19 + private val colMemCapacity = 20 + private val colCpuUsagePct = 21 + private val colMax = colCpuUsagePct + 1 private enum class State { - Pending, Active, Closed + Pending, + Active, + Closed, } } diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt index d364694c..6115953f 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt @@ -26,19 +26,19 @@ import org.opendc.trace.TableColumn import org.opendc.trace.TableColumnType import org.opendc.trace.TableReader import org.opendc.trace.TableWriter -import org.opendc.trace.conv.RESOURCE_CLUSTER_ID -import org.opendc.trace.conv.RESOURCE_CPU_CAPACITY -import org.opendc.trace.conv.RESOURCE_CPU_COUNT -import org.opendc.trace.conv.RESOURCE_ID -import org.opendc.trace.conv.RESOURCE_MEM_CAPACITY -import org.opendc.trace.conv.RESOURCE_STATE_CPU_DEMAND -import org.opendc.trace.conv.RESOURCE_STATE_CPU_READY_PCT -import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE -import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE_PCT -import org.opendc.trace.conv.RESOURCE_STATE_DISK_READ -import org.opendc.trace.conv.RESOURCE_STATE_DISK_WRITE -import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP import org.opendc.trace.conv.TABLE_RESOURCE_STATES +import org.opendc.trace.conv.resourceClusterID +import org.opendc.trace.conv.resourceCpuCapacity +import org.opendc.trace.conv.resourceCpuCount +import org.opendc.trace.conv.resourceID +import org.opendc.trace.conv.resourceMemCapacity +import org.opendc.trace.conv.resourceStateCpuDemand +import org.opendc.trace.conv.resourceStateCpuReadyPct +import org.opendc.trace.conv.resourceStateCpuUsage +import org.opendc.trace.conv.resourceStateCpuUsagePct +import org.opendc.trace.conv.resourceStateDiskRead +import org.opendc.trace.conv.resourceStateDiskWrite +import org.opendc.trace.conv.resourceStateTimestamp import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat import org.opendc.trace.util.CompositeTableReader @@ -64,36 +64,47 @@ public class BitbrainsExTraceFormat : TraceFormat { override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCE_STATES) - override fun getDetails(path: Path, table: String): TableDetails { + override fun getDetails( + path: Path, + table: String, + ): TableDetails { return when (table) { - TABLE_RESOURCE_STATES -> TableDetails( - listOf( - TableColumn(RESOURCE_ID, TableColumnType.String), - TableColumn(RESOURCE_CLUSTER_ID, TableColumnType.String), - TableColumn(RESOURCE_STATE_TIMESTAMP, TableColumnType.Instant), - TableColumn(RESOURCE_CPU_COUNT, TableColumnType.Int), - TableColumn(RESOURCE_CPU_CAPACITY, TableColumnType.Double), - TableColumn(RESOURCE_STATE_CPU_USAGE, TableColumnType.Double), - TableColumn(RESOURCE_STATE_CPU_USAGE_PCT, TableColumnType.Double), - TableColumn(RESOURCE_STATE_CPU_DEMAND, TableColumnType.Double), - TableColumn(RESOURCE_STATE_CPU_READY_PCT, TableColumnType.Double), - TableColumn(RESOURCE_MEM_CAPACITY, TableColumnType.Double), - TableColumn(RESOURCE_STATE_DISK_READ, TableColumnType.Double), - TableColumn(RESOURCE_STATE_DISK_WRITE, TableColumnType.Double) + TABLE_RESOURCE_STATES -> + TableDetails( + listOf( + TableColumn(resourceID, TableColumnType.String), + TableColumn(resourceClusterID, TableColumnType.String), + TableColumn(resourceStateTimestamp, TableColumnType.Instant), + TableColumn(resourceCpuCount, TableColumnType.Int), + TableColumn(resourceCpuCapacity, TableColumnType.Double), + TableColumn(resourceStateCpuUsage, TableColumnType.Double), + TableColumn(resourceStateCpuUsagePct, TableColumnType.Double), + TableColumn(resourceStateCpuDemand, TableColumnType.Double), + TableColumn(resourceStateCpuReadyPct, TableColumnType.Double), + TableColumn(resourceMemCapacity, TableColumnType.Double), + TableColumn(resourceStateDiskRead, TableColumnType.Double), + TableColumn(resourceStateDiskWrite, TableColumnType.Double), + ), ) - ) else -> throw IllegalArgumentException("Table $table not supported") } } - override fun newReader(path: Path, table: String, projection: List<String>?): TableReader { + override fun newReader( + path: Path, + table: String, + projection: List<String>?, + ): TableReader { return when (table) { TABLE_RESOURCE_STATES -> newResourceStateReader(path) else -> throw IllegalArgumentException("Table $table not supported") } } - override fun newWriter(path: Path, table: String): TableWriter { + override fun newWriter( + path: Path, + table: String, + ): TableWriter { throw UnsupportedOperationException("Writing not supported for this format") } @@ -101,10 +112,11 @@ public class BitbrainsExTraceFormat : TraceFormat { * Construct a [TableReader] for reading over all resource state partitions. */ private fun newResourceStateReader(path: Path): TableReader { - val partitions = Files.walk(path, 1) - .filter { !Files.isDirectory(it) && it.extension == "txt" } - .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) - .toSortedMap() + val partitions = + Files.walk(path, 1) + .filter { !Files.isDirectory(it) && it.extension == "txt" } + .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) + .toSortedMap() val it = partitions.iterator() return object : CompositeTableReader() { diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt index 65ca8a9c..e264fccb 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceStateTableReader.kt @@ -27,18 +27,18 @@ import com.fasterxml.jackson.core.JsonToken import com.fasterxml.jackson.dataformat.csv.CsvParser import com.fasterxml.jackson.dataformat.csv.CsvSchema import org.opendc.trace.TableReader -import org.opendc.trace.conv.RESOURCE_CPU_CAPACITY -import org.opendc.trace.conv.RESOURCE_CPU_COUNT -import org.opendc.trace.conv.RESOURCE_ID -import org.opendc.trace.conv.RESOURCE_MEM_CAPACITY -import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE -import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE_PCT -import org.opendc.trace.conv.RESOURCE_STATE_DISK_READ -import org.opendc.trace.conv.RESOURCE_STATE_DISK_WRITE -import org.opendc.trace.conv.RESOURCE_STATE_MEM_USAGE -import org.opendc.trace.conv.RESOURCE_STATE_NET_RX -import org.opendc.trace.conv.RESOURCE_STATE_NET_TX -import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP +import org.opendc.trace.conv.resourceCpuCapacity +import org.opendc.trace.conv.resourceCpuCount +import org.opendc.trace.conv.resourceID +import org.opendc.trace.conv.resourceMemCapacity +import org.opendc.trace.conv.resourceStateCpuUsage +import org.opendc.trace.conv.resourceStateCpuUsagePct +import org.opendc.trace.conv.resourceStateDiskRead +import org.opendc.trace.conv.resourceStateDiskWrite +import org.opendc.trace.conv.resourceStateMemUsage +import org.opendc.trace.conv.resourceStateNetRx +import org.opendc.trace.conv.resourceStateNetTx +import org.opendc.trace.conv.resourceStateTimestamp import java.text.NumberFormat import java.time.Duration import java.time.Instant @@ -103,20 +103,21 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, when (parser.currentName) { "Timestamp [ms]" -> { - timestamp = when (timestampType) { - TimestampType.UNDECIDED -> { - try { - val res = LocalDateTime.parse(parser.text, formatter).toInstant(ZoneOffset.UTC) - timestampType = TimestampType.DATE_TIME - res - } catch (e: DateTimeParseException) { - timestampType = TimestampType.EPOCH_MILLIS - Instant.ofEpochSecond(parser.longValue) + timestamp = + when (timestampType) { + TimestampType.UNDECIDED -> { + try { + val res = LocalDateTime.parse(parser.text, formatter).toInstant(ZoneOffset.UTC) + timestampType = TimestampType.DATE_TIME + res + } catch (e: DateTimeParseException) { + timestampType = TimestampType.EPOCH_MILLIS + Instant.ofEpochSecond(parser.longValue) + } } + TimestampType.DATE_TIME -> LocalDateTime.parse(parser.text, formatter).toInstant(ZoneOffset.UTC) + TimestampType.EPOCH_MILLIS -> Instant.ofEpochSecond(parser.longValue) } - TimestampType.DATE_TIME -> LocalDateTime.parse(parser.text, formatter).toInstant(ZoneOffset.UTC) - TimestampType.EPOCH_MILLIS -> Instant.ofEpochSecond(parser.longValue) - } } "CPU cores" -> cpuCores = parser.intValue "CPU capacity provisioned [MHZ]" -> cpuCapacity = parseSafeDouble() @@ -134,39 +135,39 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, return true } - private val COL_TIMESTAMP = 0 - private val COL_CPU_COUNT = 1 - private val COL_CPU_CAPACITY = 2 - private val COL_CPU_USAGE = 3 - private val COL_CPU_USAGE_PCT = 4 - private val COL_MEM_CAPACITY = 5 - private val COL_MEM_USAGE = 6 - private val COL_DISK_READ = 7 - private val COL_DISK_WRITE = 8 - private val COL_NET_RX = 9 - private val COL_NET_TX = 10 - private val COL_ID = 11 + private val colTimestamp = 0 + private val colCpuCount = 1 + private val colCpuCapacity = 2 + private val colCpuUsage = 3 + private val colCpuUsagePct = 4 + private val colMemCapacity = 5 + private val colMemUsage = 6 + private val colDiskRead = 7 + private val colDiskWrite = 8 + private val colNetRx = 9 + private val colNetTx = 10 + private val colID = 11 override fun resolve(name: String): Int { return when (name) { - RESOURCE_ID -> COL_ID - RESOURCE_STATE_TIMESTAMP -> COL_TIMESTAMP - RESOURCE_CPU_COUNT -> COL_CPU_COUNT - RESOURCE_CPU_CAPACITY -> COL_CPU_CAPACITY - RESOURCE_STATE_CPU_USAGE -> COL_CPU_USAGE - RESOURCE_STATE_CPU_USAGE_PCT -> COL_CPU_USAGE_PCT - RESOURCE_MEM_CAPACITY -> COL_MEM_CAPACITY - RESOURCE_STATE_MEM_USAGE -> COL_MEM_USAGE - RESOURCE_STATE_DISK_READ -> COL_DISK_READ - RESOURCE_STATE_DISK_WRITE -> COL_DISK_WRITE - RESOURCE_STATE_NET_RX -> COL_NET_RX - RESOURCE_STATE_NET_TX -> COL_NET_TX + resourceID -> colID + resourceStateTimestamp -> colTimestamp + resourceCpuCount -> colCpuCount + resourceCpuCapacity -> colCpuCapacity + resourceStateCpuUsage -> colCpuUsage + resourceStateCpuUsagePct -> colCpuUsagePct + resourceMemCapacity -> colMemCapacity + resourceStateMemUsage -> colMemUsage + resourceStateDiskRead -> colDiskRead + resourceStateDiskWrite -> colDiskWrite + resourceStateNetRx -> colNetRx + resourceStateNetTx -> colNetTx else -> -1 } } override fun isNull(index: Int): Boolean { - require(index in 0..COL_ID) { "Invalid column index" } + require(index in 0..colID) { "Invalid column index" } return false } @@ -177,7 +178,7 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, override fun getInt(index: Int): Int { checkActive() return when (index) { - COL_CPU_COUNT -> cpuCores + colCpuCount -> cpuCores else -> throw IllegalArgumentException("Invalid column") } } @@ -193,15 +194,15 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, override fun getDouble(index: Int): Double { checkActive() return when (index) { - COL_CPU_CAPACITY -> cpuCapacity - COL_CPU_USAGE -> cpuUsage - COL_CPU_USAGE_PCT -> cpuUsagePct - COL_MEM_CAPACITY -> memCapacity - COL_MEM_USAGE -> memUsage - COL_DISK_READ -> diskRead - COL_DISK_WRITE -> diskWrite - COL_NET_RX -> netReceived - COL_NET_TX -> netTransmitted + colCpuCapacity -> cpuCapacity + colCpuUsage -> cpuUsage + colCpuUsagePct -> cpuUsagePct + colMemCapacity -> memCapacity + colMemUsage -> memUsage + colDiskRead -> diskRead + colDiskWrite -> diskWrite + colNetRx -> netReceived + colNetTx -> netTransmitted else -> throw IllegalArgumentException("Invalid column") } } @@ -209,7 +210,7 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, override fun getString(index: Int): String { checkActive() return when (index) { - COL_ID -> partition + colID -> partition else -> throw IllegalArgumentException("Invalid column") } } @@ -221,7 +222,7 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, override fun getInstant(index: Int): Instant? { checkActive() return when (index) { - COL_TIMESTAMP -> timestamp + colTimestamp -> timestamp else -> throw IllegalArgumentException("Invalid column") } } @@ -230,15 +231,25 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, throw IllegalArgumentException("Invalid column") } - override fun <T> getList(index: Int, elementType: Class<T>): List<T>? { + override fun <T> getList( + index: Int, + elementType: Class<T>, + ): List<T>? { throw IllegalArgumentException("Invalid column") } - override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? { + override fun <T> getSet( + index: Int, + elementType: Class<T>, + ): Set<T>? { throw IllegalArgumentException("Invalid column") } - override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? { + override fun <K, V> getMap( + index: Int, + keyType: Class<K>, + valueType: Class<V>, + ): Map<K, V>? { throw IllegalArgumentException("Invalid column") } @@ -322,30 +333,33 @@ internal class BitbrainsResourceStateTableReader(private val partition: String, * The type of the timestamp in the trace. */ private enum class TimestampType { - UNDECIDED, DATE_TIME, EPOCH_MILLIS + UNDECIDED, + DATE_TIME, + EPOCH_MILLIS, } companion object { /** * The [CsvSchema] that is used to parse the trace. */ - private val schema = CsvSchema.builder() - .addColumn("Timestamp [ms]", CsvSchema.ColumnType.NUMBER_OR_STRING) - .addColumn("CPU cores", CsvSchema.ColumnType.NUMBER) - .addColumn("CPU capacity provisioned [MHZ]", CsvSchema.ColumnType.NUMBER) - .addColumn("CPU usage [MHZ]", CsvSchema.ColumnType.NUMBER) - .addColumn("CPU usage [%]", CsvSchema.ColumnType.NUMBER) - .addColumn("Memory capacity provisioned [KB]", CsvSchema.ColumnType.NUMBER) - .addColumn("Memory usage [KB]", CsvSchema.ColumnType.NUMBER) - .addColumn("Memory usage [%]", CsvSchema.ColumnType.NUMBER) - .addColumn("Disk read throughput [KB/s]", CsvSchema.ColumnType.NUMBER) - .addColumn("Disk write throughput [KB/s]", CsvSchema.ColumnType.NUMBER) - .addColumn("Disk size [GB]", CsvSchema.ColumnType.NUMBER) - .addColumn("Network received throughput [KB/s]", CsvSchema.ColumnType.NUMBER) - .addColumn("Network transmitted throughput [KB/s]", CsvSchema.ColumnType.NUMBER) - .setAllowComments(true) - .setUseHeader(true) - .setColumnSeparator(';') - .build() + private val schema = + CsvSchema.builder() + .addColumn("Timestamp [ms]", CsvSchema.ColumnType.NUMBER_OR_STRING) + .addColumn("CPU cores", CsvSchema.ColumnType.NUMBER) + .addColumn("CPU capacity provisioned [MHZ]", CsvSchema.ColumnType.NUMBER) + .addColumn("CPU usage [MHZ]", CsvSchema.ColumnType.NUMBER) + .addColumn("CPU usage [%]", CsvSchema.ColumnType.NUMBER) + .addColumn("Memory capacity provisioned [KB]", CsvSchema.ColumnType.NUMBER) + .addColumn("Memory usage [KB]", CsvSchema.ColumnType.NUMBER) + .addColumn("Memory usage [%]", CsvSchema.ColumnType.NUMBER) + .addColumn("Disk read throughput [KB/s]", CsvSchema.ColumnType.NUMBER) + .addColumn("Disk write throughput [KB/s]", CsvSchema.ColumnType.NUMBER) + .addColumn("Disk size [GB]", CsvSchema.ColumnType.NUMBER) + .addColumn("Network received throughput [KB/s]", CsvSchema.ColumnType.NUMBER) + .addColumn("Network transmitted throughput [KB/s]", CsvSchema.ColumnType.NUMBER) + .setAllowComments(true) + .setUseHeader(true) + .setColumnSeparator(';') + .build() } } diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt index 776a8f86..a12785f0 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsResourceTableReader.kt @@ -24,7 +24,7 @@ package org.opendc.trace.bitbrains import com.fasterxml.jackson.dataformat.csv.CsvFactory import org.opendc.trace.TableReader -import org.opendc.trace.conv.RESOURCE_ID +import org.opendc.trace.conv.resourceID import java.nio.file.Path import java.time.Duration import java.time.Instant @@ -56,7 +56,7 @@ internal class BitbrainsResourceTableReader(private val factory: CsvFactory, vms val parser = factory.createParser(path.toFile()) val reader = BitbrainsResourceStateTableReader(name, parser) - val idCol = reader.resolve(RESOURCE_ID) + val idCol = reader.resolve(resourceID) try { if (!reader.nextRow()) { @@ -74,17 +74,17 @@ internal class BitbrainsResourceTableReader(private val factory: CsvFactory, vms return false } - private val COL_ID = 0 + private val colID = 0 override fun resolve(name: String): Int { return when (name) { - RESOURCE_ID -> COL_ID + resourceID -> colID else -> -1 } } override fun isNull(index: Int): Boolean { - require(index in 0..COL_ID) { "Invalid column index" } + require(index in 0..colID) { "Invalid column index" } return false } @@ -111,7 +111,7 @@ internal class BitbrainsResourceTableReader(private val factory: CsvFactory, vms override fun getString(index: Int): String? { check(state == State.Active) { "No active row" } return when (index) { - COL_ID -> id + colID -> id else -> throw IllegalArgumentException("Invalid column") } } @@ -128,15 +128,25 @@ internal class BitbrainsResourceTableReader(private val factory: CsvFactory, vms throw IllegalArgumentException("Invalid column") } - override fun <T> getList(index: Int, elementType: Class<T>): List<T>? { + override fun <T> getList( + index: Int, + elementType: Class<T>, + ): List<T>? { throw IllegalArgumentException("Invalid column") } - override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? { + override fun <T> getSet( + index: Int, + elementType: Class<T>, + ): Set<T>? { throw IllegalArgumentException("Invalid column") } - override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? { + override fun <K, V> getMap( + index: Int, + keyType: Class<K>, + valueType: Class<V>, + ): Map<K, V>? { throw IllegalArgumentException("Invalid column") } @@ -158,6 +168,8 @@ internal class BitbrainsResourceTableReader(private val factory: CsvFactory, vms } private enum class State { - Pending, Active, Closed + Pending, + Active, + Closed, } } diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt index b0809735..23853077 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt @@ -28,20 +28,20 @@ import org.opendc.trace.TableColumn import org.opendc.trace.TableColumnType import org.opendc.trace.TableReader import org.opendc.trace.TableWriter -import org.opendc.trace.conv.RESOURCE_CPU_CAPACITY -import org.opendc.trace.conv.RESOURCE_CPU_COUNT -import org.opendc.trace.conv.RESOURCE_ID -import org.opendc.trace.conv.RESOURCE_MEM_CAPACITY -import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE -import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE_PCT -import org.opendc.trace.conv.RESOURCE_STATE_DISK_READ -import org.opendc.trace.conv.RESOURCE_STATE_DISK_WRITE -import org.opendc.trace.conv.RESOURCE_STATE_MEM_USAGE -import org.opendc.trace.conv.RESOURCE_STATE_NET_RX -import org.opendc.trace.conv.RESOURCE_STATE_NET_TX -import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP import org.opendc.trace.conv.TABLE_RESOURCES import org.opendc.trace.conv.TABLE_RESOURCE_STATES +import org.opendc.trace.conv.resourceCpuCapacity +import org.opendc.trace.conv.resourceCpuCount +import org.opendc.trace.conv.resourceID +import org.opendc.trace.conv.resourceMemCapacity +import org.opendc.trace.conv.resourceStateCpuUsage +import org.opendc.trace.conv.resourceStateCpuUsagePct +import org.opendc.trace.conv.resourceStateDiskRead +import org.opendc.trace.conv.resourceStateDiskWrite +import org.opendc.trace.conv.resourceStateMemUsage +import org.opendc.trace.conv.resourceStateNetRx +import org.opendc.trace.conv.resourceStateNetTx +import org.opendc.trace.conv.resourceStateTimestamp import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat import org.opendc.trace.util.CompositeTableReader @@ -63,9 +63,10 @@ public class BitbrainsTraceFormat : TraceFormat { /** * The [CsvFactory] used to create the parser. */ - private val factory = CsvFactory() - .enable(CsvParser.Feature.ALLOW_COMMENTS) - .enable(CsvParser.Feature.TRIM_SPACES) + private val factory = + CsvFactory() + .enable(CsvParser.Feature.ALLOW_COMMENTS) + .enable(CsvParser.Feature.TRIM_SPACES) override fun create(path: Path) { throw UnsupportedOperationException("Writing not supported for this format") @@ -73,40 +74,50 @@ public class BitbrainsTraceFormat : TraceFormat { override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) - override fun getDetails(path: Path, table: String): TableDetails { + override fun getDetails( + path: Path, + table: String, + ): TableDetails { return when (table) { - TABLE_RESOURCES -> TableDetails( - listOf( - TableColumn(RESOURCE_ID, TableColumnType.String) + TABLE_RESOURCES -> + TableDetails( + listOf( + TableColumn(resourceID, TableColumnType.String), + ), ) - ) - TABLE_RESOURCE_STATES -> TableDetails( - listOf( - TableColumn(RESOURCE_ID, TableColumnType.String), - TableColumn(RESOURCE_STATE_TIMESTAMP, TableColumnType.Instant), - TableColumn(RESOURCE_CPU_COUNT, TableColumnType.Int), - TableColumn(RESOURCE_CPU_CAPACITY, TableColumnType.Double), - TableColumn(RESOURCE_STATE_CPU_USAGE, TableColumnType.Double), - TableColumn(RESOURCE_STATE_CPU_USAGE_PCT, TableColumnType.Double), - TableColumn(RESOURCE_MEM_CAPACITY, TableColumnType.Double), - TableColumn(RESOURCE_STATE_MEM_USAGE, TableColumnType.Double), - TableColumn(RESOURCE_STATE_DISK_READ, TableColumnType.Double), - TableColumn(RESOURCE_STATE_DISK_WRITE, TableColumnType.Double), - TableColumn(RESOURCE_STATE_NET_RX, TableColumnType.Double), - TableColumn(RESOURCE_STATE_NET_TX, TableColumnType.Double) + TABLE_RESOURCE_STATES -> + TableDetails( + listOf( + TableColumn(resourceID, TableColumnType.String), + TableColumn(resourceStateTimestamp, TableColumnType.Instant), + TableColumn(resourceCpuCount, TableColumnType.Int), + TableColumn(resourceCpuCapacity, TableColumnType.Double), + TableColumn(resourceStateCpuUsage, TableColumnType.Double), + TableColumn(resourceStateCpuUsagePct, TableColumnType.Double), + TableColumn(resourceMemCapacity, TableColumnType.Double), + TableColumn(resourceStateMemUsage, TableColumnType.Double), + TableColumn(resourceStateDiskRead, TableColumnType.Double), + TableColumn(resourceStateDiskWrite, TableColumnType.Double), + TableColumn(resourceStateNetRx, TableColumnType.Double), + TableColumn(resourceStateNetTx, TableColumnType.Double), + ), ) - ) else -> throw IllegalArgumentException("Table $table not supported") } } - override fun newReader(path: Path, table: String, projection: List<String>?): TableReader { + override fun newReader( + path: Path, + table: String, + projection: List<String>?, + ): TableReader { return when (table) { TABLE_RESOURCES -> { - val vms = Files.walk(path, 1) - .filter { !Files.isDirectory(it) && it.extension == "csv" } - .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) - .toSortedMap() + val vms = + Files.walk(path, 1) + .filter { !Files.isDirectory(it) && it.extension == "csv" } + .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) + .toSortedMap() BitbrainsResourceTableReader(factory, vms) } TABLE_RESOURCE_STATES -> newResourceStateReader(path) @@ -114,7 +125,10 @@ public class BitbrainsTraceFormat : TraceFormat { } } - override fun newWriter(path: Path, table: String): TableWriter { + override fun newWriter( + path: Path, + table: String, + ): TableWriter { throw UnsupportedOperationException("Writing not supported for this format") } @@ -122,10 +136,11 @@ public class BitbrainsTraceFormat : TraceFormat { * Construct a [TableReader] for reading over all resource state partitions. */ private fun newResourceStateReader(path: Path): TableReader { - val partitions = Files.walk(path, 1) - .filter { !Files.isDirectory(it) && it.extension == "csv" } - .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) - .toSortedMap() + val partitions = + Files.walk(path, 1) + .filter { !Files.isDirectory(it) && it.extension == "csv" } + .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) + .toSortedMap() val it = partitions.iterator() return object : CompositeTableReader() { diff --git a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt index e8c7094b..18c59fb8 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt @@ -33,9 +33,9 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows import org.opendc.trace.TableColumn import org.opendc.trace.TableReader -import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE -import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP import org.opendc.trace.conv.TABLE_RESOURCE_STATES +import org.opendc.trace.conv.resourceStateCpuUsage +import org.opendc.trace.conv.resourceStateTimestamp import org.opendc.trace.testkit.TableReaderTestKit import java.nio.file.Paths @@ -72,8 +72,8 @@ internal class BitbrainsExTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals(1631911500, reader.getInstant(RESOURCE_STATE_TIMESTAMP)?.epochSecond) }, - { assertEquals(21.2, reader.getDouble(RESOURCE_STATE_CPU_USAGE), 0.01) } + { assertEquals(1631911500, reader.getInstant(resourceStateTimestamp)?.epochSecond) }, + { assertEquals(21.2, reader.getDouble(resourceStateCpuUsage), 0.01) }, ) reader.close() diff --git a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt index edab8747..8ff13852 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt @@ -34,11 +34,11 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows import org.opendc.trace.TableColumn import org.opendc.trace.TableReader -import org.opendc.trace.conv.RESOURCE_ID -import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE -import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP import org.opendc.trace.conv.TABLE_RESOURCES import org.opendc.trace.conv.TABLE_RESOURCE_STATES +import org.opendc.trace.conv.resourceID +import org.opendc.trace.conv.resourceStateCpuUsage +import org.opendc.trace.conv.resourceStateTimestamp import org.opendc.trace.testkit.TableReaderTestKit import java.nio.file.Paths @@ -75,8 +75,8 @@ class BitbrainsTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("bitbrains", reader.getString(RESOURCE_ID)) }, - { assertFalse(reader.nextRow()) } + { assertEquals("bitbrains", reader.getString(resourceID)) }, + { assertFalse(reader.nextRow()) }, ) reader.close() @@ -89,8 +89,8 @@ class BitbrainsTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals(1376314846, reader.getInstant(RESOURCE_STATE_TIMESTAMP)?.epochSecond) }, - { assertEquals(19.066, reader.getDouble(RESOURCE_STATE_CPU_USAGE), 0.01) } + { assertEquals(1376314846, reader.getInstant(resourceStateTimestamp)?.epochSecond) }, + { assertEquals(19.066, reader.getDouble(resourceStateCpuUsage), 0.01) }, ) reader.close() diff --git a/opendc-trace/opendc-trace-calcite/build.gradle.kts b/opendc-trace/opendc-trace-calcite/build.gradle.kts index 2ffdac3c..848e00da 100644 --- a/opendc-trace/opendc-trace-calcite/build.gradle.kts +++ b/opendc-trace/opendc-trace-calcite/build.gradle.kts @@ -22,7 +22,7 @@ description = "Apache Calcite (SQL) integration for the OpenDC trace library" -/* Build configuration */ +// Build configuration plugins { `kotlin-library-conventions` } diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt index 74bd188b..eed52ab3 100644 --- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt +++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt @@ -36,7 +36,7 @@ import java.util.concurrent.atomic.AtomicBoolean internal class TraceReaderEnumerator<E>( private val reader: TableReader, private val columns: List<TableColumn>, - private val cancelFlag: AtomicBoolean + private val cancelFlag: AtomicBoolean, ) : Enumerator<E> { private val columnIndices = columns.map { reader.resolve(it.name) }.toIntArray() private var current: E? = null @@ -80,7 +80,11 @@ internal class TraceReaderEnumerator<E>( return res } - private fun convertColumn(reader: TableReader, column: TableColumn, columnIndex: Int): Any? { + private fun convertColumn( + reader: TableReader, + column: TableColumn, + columnIndex: Int, + ): Any? { return when (column.type) { is TableColumnType.Boolean -> reader.getBoolean(columnIndex) is TableColumnType.Int -> reader.getInt(columnIndex) diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt index 3c6badc8..cbf7ec43 100644 --- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt +++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt @@ -36,7 +36,11 @@ import java.nio.file.Paths * This factory allows users to include a schema that references a trace in a `model.json` file. */ public class TraceSchemaFactory : SchemaFactory { - override fun create(parentSchema: SchemaPlus, name: String, operand: Map<String, Any>): Schema { + override fun create( + parentSchema: SchemaPlus, + name: String, + operand: Map<String, Any>, + ): Schema { val base = operand[ModelHandler.ExtraOperand.BASE_DIRECTORY.camelName] as File? val pathParam = requireNotNull(operand["path"]) { "Trace path not specified" } as String val path = if (base != null) File(base, pathParam).toPath() else Paths.get(pathParam) diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt index 2dd02710..e74d2ee8 100644 --- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt +++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt @@ -71,7 +71,11 @@ internal class TraceTable(private val table: org.opendc.trace.Table) : return rowType } - override fun scan(root: DataContext, filters: MutableList<RexNode>, projects: IntArray?): Enumerable<Array<Any?>> { + override fun scan( + root: DataContext, + filters: MutableList<RexNode>, + projects: IntArray?, + ): Enumerable<Array<Any?>> { // Filters are currently not supported by the OpenDC trace API. By keeping the filters in the list, Calcite // assumes that they are declined and will perform the filters itself. @@ -130,14 +134,18 @@ internal class TraceTable(private val table: org.opendc.trace.Table) : return rowCount } - override fun <T> asQueryable(queryProvider: QueryProvider, schema: SchemaPlus, tableName: String): Queryable<T> { + override fun <T> asQueryable( + queryProvider: QueryProvider, + schema: SchemaPlus, + tableName: String, + ): Queryable<T> { return object : AbstractTableQueryable<T>(queryProvider, schema, this@TraceTable, tableName) { override fun enumerator(): Enumerator<T> { val cancelFlag = AtomicBoolean(false) return TraceReaderEnumerator( this@TraceTable.table.newReader(), this@TraceTable.table.columns, - cancelFlag + cancelFlag, ) } @@ -155,7 +163,7 @@ internal class TraceTable(private val table: org.opendc.trace.Table) : operation: TableModify.Operation, updateColumnList: MutableList<String>?, sourceExpressionList: MutableList<RexNode>?, - flattened: Boolean + flattened: Boolean, ): TableModify { cluster.planner.addRule(TraceTableModifyRule.DEFAULT.toRule()) @@ -166,7 +174,7 @@ internal class TraceTable(private val table: org.opendc.trace.Table) : operation, updateColumnList, sourceExpressionList, - flattened + flattened, ) } @@ -184,7 +192,10 @@ internal class TraceTable(private val table: org.opendc.trace.Table) : return typeFactory.createStructType(types, names) } - private fun mapType(typeFactory: JavaTypeFactory, type: TableColumnType): RelDataType { + private fun mapType( + typeFactory: JavaTypeFactory, + type: TableColumnType, + ): RelDataType { return when (type) { is TableColumnType.Boolean -> typeFactory.createSqlType(SqlTypeName.BOOLEAN) is TableColumnType.Int -> typeFactory.createSqlType(SqlTypeName.INTEGER) diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModify.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModify.kt index cc23854f..eedff00d 100644 --- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModify.kt +++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModify.kt @@ -59,7 +59,7 @@ internal class TraceTableModify( operation: Operation, updateColumnList: List<String>?, sourceExpressionList: List<RexNode>?, - flattened: Boolean + flattened: Boolean, ) : TableModify(cluster, traitSet, table, schema, input, operation, updateColumnList, sourceExpressionList, flattened), EnumerableRel { init { @@ -67,7 +67,10 @@ internal class TraceTableModify( table.unwrap(ModifiableTable::class.java) ?: throw AssertionError() // TODO: user error in validator } - override fun copy(traitSet: RelTraitSet, inputs: List<RelNode>?): RelNode { + override fun copy( + traitSet: RelTraitSet, + inputs: List<RelNode>?, + ): RelNode { return TraceTableModify( cluster, traitSet, @@ -77,40 +80,48 @@ internal class TraceTableModify( operation, updateColumnList, sourceExpressionList, - isFlattened + isFlattened, ) } - override fun computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery?): RelOptCost { + override fun computeSelfCost( + planner: RelOptPlanner, + mq: RelMetadataQuery?, + ): RelOptCost { // Prefer this plan compared to the standard EnumerableTableModify. return super.computeSelfCost(planner, mq)!!.multiplyBy(.1) } - override fun implement(implementor: EnumerableRelImplementor, pref: Prefer): EnumerableRel.Result { + override fun implement( + implementor: EnumerableRelImplementor, + pref: Prefer, + ): EnumerableRel.Result { val builder = BlockBuilder() val result = implementor.visitChild(this, 0, getInput() as EnumerableRel, pref) val childExp = builder.append("child", result.block) - val convertedChildExpr = if (getInput().rowType != rowType) { - val typeFactory = cluster.typeFactory as JavaTypeFactory - val format = EnumerableTableScan.deduceFormat(table) - val physType = PhysTypeImpl.of(typeFactory, table.rowType, format) - val childPhysType = result.physType - val o = Expressions.parameter(childPhysType.javaRowType, "o") - val expressionList = List(childPhysType.rowType.fieldCount) { i -> - childPhysType.fieldReference(o, i, physType.getJavaFieldType(i)) - } + val convertedChildExpr = + if (getInput().rowType != rowType) { + val typeFactory = cluster.typeFactory as JavaTypeFactory + val format = EnumerableTableScan.deduceFormat(table) + val physType = PhysTypeImpl.of(typeFactory, table.rowType, format) + val childPhysType = result.physType + val o = Expressions.parameter(childPhysType.javaRowType, "o") + val expressionList = + List(childPhysType.rowType.fieldCount) { i -> + childPhysType.fieldReference(o, i, physType.getJavaFieldType(i)) + } - builder.append( - "convertedChild", - Expressions.call( - childExp, - BuiltInMethod.SELECT.method, - Expressions.lambda<org.apache.calcite.linq4j.function.Function<*>>(physType.record(expressionList), o) + builder.append( + "convertedChild", + Expressions.call( + childExp, + BuiltInMethod.SELECT.method, + Expressions.lambda<org.apache.calcite.linq4j.function.Function<*>>(physType.record(expressionList), o), + ), ) - ) - } else { - childExp - } + } else { + childExp + } if (!isInsert) { throw UnsupportedOperationException("Deletion and update not supported") @@ -126,10 +137,10 @@ internal class TraceTableModify( Long::class.java, expression, INSERT_METHOD, - convertedChildExpr - ) - ) - ) + convertedChildExpr, + ), + ), + ), ) val rowFormat = if (pref === Prefer.ARRAY) JavaRowFormat.ARRAY else JavaRowFormat.SCALAR diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModifyRule.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModifyRule.kt index 7572e381..9c560984 100644 --- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModifyRule.kt +++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModifyRule.kt @@ -52,14 +52,20 @@ internal class TraceTableModifyRule(config: Config) : ConverterRule(config) { modify.operation, modify.updateColumnList, modify.sourceExpressionList, - modify.isFlattened + modify.isFlattened, ) } companion object { /** Default configuration. */ - val DEFAULT: Config = Config.INSTANCE - .withConversion(LogicalTableModify::class.java, Convention.NONE, EnumerableConvention.INSTANCE, "TraceTableModificationRule") - .withRuleFactory { config: Config -> TraceTableModifyRule(config) } + val DEFAULT: Config = + Config.INSTANCE + .withConversion( + LogicalTableModify::class.java, + Convention.NONE, + EnumerableConvention.INSTANCE, + "TraceTableModificationRule", + ) + .withRuleFactory { config: Config -> TraceTableModifyRule(config) } } } diff --git a/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt b/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt index 64bb31c9..93b15e5f 100644 --- a/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt +++ b/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt @@ -71,7 +71,7 @@ class CalciteTest { { assertEquals("1052", rs.getString("id")) }, { assertTrue(rs.next()) }, { assertEquals("1073", rs.getString("id")) }, - { assertFalse(rs.next()) } + { assertFalse(rs.next()) }, ) } } @@ -86,7 +86,7 @@ class CalciteTest { { assertEquals(300000, rs.getLong("duration")) }, { assertEquals(0.0, rs.getDouble("cpu_usage")) }, { assertTrue(rs.next()) }, - { assertEquals("1019", rs.getString("id")) } + { assertEquals("1019", rs.getString("id")) }, ) } } @@ -98,7 +98,7 @@ class CalciteTest { { assertTrue(rs.next()) }, { assertArrayEquals(arrayOf("1019", "1023", "1052"), rs.getArray("members").array as Array<*>) }, { assertEquals(0.0, rs.getDouble("target")) }, - { assertEquals(0.8830158730158756, rs.getDouble("score")) } + { assertEquals(0.8830158730158756, rs.getDouble("score")) }, ) } } @@ -109,7 +109,7 @@ class CalciteTest { assertAll( { assertTrue(rs.next()) }, { assertEquals(249.59993808, rs.getDouble("max_cpu_usage")) }, - { assertEquals(5.387240309118493, rs.getDouble("avg_cpu_usage")) } + { assertEquals(5.387240309118493, rs.getDouble("avg_cpu_usage")) }, ) } } @@ -120,12 +120,13 @@ class CalciteTest { val newTrace = Trace.create(tmp, "opendc-vm") runStatement(newTrace) { stmt -> - val count = stmt.executeUpdate( - """ - INSERT INTO trace.resources (id, start_time, stop_time, cpu_count, cpu_capacity, mem_capacity) - VALUES (1234, '2013-08-12 13:35:46.0', '2013-09-11 13:39:58.0', 1, 2926.0, 1024.0) - """.trimIndent() - ) + val count = + stmt.executeUpdate( + """ + INSERT INTO trace.resources (id, start_time, stop_time, cpu_count, cpu_capacity, mem_capacity) + VALUES (1234, '2013-08-12 13:35:46.0', '2013-09-11 13:39:58.0', 1, 2926.0, 1024.0) + """.trimIndent(), + ) assertEquals(1, count) } @@ -136,7 +137,7 @@ class CalciteTest { { assertEquals(1, rs.getInt("cpu_count")) }, { assertEquals(Timestamp.valueOf("2013-08-12 13:35:46.0"), rs.getTimestamp("start_time")) }, { assertEquals(2926.0, rs.getDouble("cpu_capacity")) }, - { assertEquals(1024.0, rs.getDouble("mem_capacity")) } + { assertEquals(1024.0, rs.getDouble("mem_capacity")) }, ) } } @@ -145,9 +146,10 @@ class CalciteTest { fun testUUID() { val trace = mockk<Trace>() every { trace.tables } returns listOf(TABLE_RESOURCES) - every { trace.getTable(TABLE_RESOURCES)!!.columns } returns listOf( - TableColumn("id", TableColumnType.UUID) - ) + every { trace.getTable(TABLE_RESOURCES)!!.columns } returns + listOf( + TableColumn("id", TableColumnType.UUID), + ) every { trace.getTable(TABLE_RESOURCES)!!.newReader() } answers { object : TableReader { override fun nextRow(): Boolean = true @@ -195,15 +197,25 @@ class CalciteTest { TODO("not implemented") } - override fun <T> getList(index: Int, elementType: Class<T>): List<T>? { + override fun <T> getList( + index: Int, + elementType: Class<T>, + ): List<T>? { TODO("not implemented") } - override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? { + override fun <T> getSet( + index: Int, + elementType: Class<T>, + ): Set<T>? { TODO("not implemented") } - override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? { + override fun <K, V> getMap( + index: Int, + keyType: Class<K>, + valueType: Class<V>, + ): Map<K, V>? { TODO("not implemented") } @@ -214,7 +226,7 @@ class CalciteTest { runQuery(trace, "SELECT id FROM trace.resources") { rs -> assertAll( { assertTrue(rs.next()) }, - { assertArrayEquals(byteArrayOf(0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 2), rs.getBytes("id")) } + { assertArrayEquals(byteArrayOf(0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 2), rs.getBytes("id")) }, ) } } @@ -222,7 +234,11 @@ class CalciteTest { /** * Helper function to run statement for the specified trace. */ - private fun runQuery(trace: Trace, query: String, block: (ResultSet) -> Unit) { + private fun runQuery( + trace: Trace, + query: String, + block: (ResultSet) -> Unit, + ) { runStatement(trace) { stmt -> val rs = stmt.executeQuery(query) rs.use { block(rs) } @@ -232,7 +248,10 @@ class CalciteTest { /** * Helper function to run statement for the specified trace. */ - private fun runStatement(trace: Trace, block: (Statement) -> Unit) { + private fun runStatement( + trace: Trace, + block: (Statement) -> Unit, + ) { val info = Properties() info.setProperty("lex", "JAVA") val connection = DriverManager.getConnection("jdbc:calcite:", info).unwrap(CalciteConnection::class.java) diff --git a/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/TraceSchemaFactoryTest.kt b/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/TraceSchemaFactoryTest.kt index 735cedce..eb4bc769 100644 --- a/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/TraceSchemaFactoryTest.kt +++ b/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/TraceSchemaFactoryTest.kt @@ -48,7 +48,7 @@ class TraceSchemaFactoryTest { { assertEquals("1019", rs.getString("id")) }, { assertEquals(1, rs.getInt("cpu_count")) }, { assertEquals(Timestamp.valueOf("2013-08-12 13:40:46.0"), rs.getTimestamp("start_time")) }, - { assertEquals(181352.0, rs.getDouble("mem_capacity")) } + { assertEquals(181352.0, rs.getDouble("mem_capacity")) }, ) } finally { rs.close() diff --git a/opendc-trace/opendc-trace-gwf/build.gradle.kts b/opendc-trace/opendc-trace-gwf/build.gradle.kts index 0c041439..4d0bd796 100644 --- a/opendc-trace/opendc-trace-gwf/build.gradle.kts +++ b/opendc-trace/opendc-trace-gwf/build.gradle.kts @@ -22,7 +22,7 @@ description = "Support for GWF traces in OpenDC" -/* Build configuration */ +// Build configuration plugins { `kotlin-library-conventions` } diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt index 78ce6ad4..8a2a99cb 100644 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTaskTableReader.kt @@ -88,19 +88,19 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { override fun resolve(name: String): Int { return when (name) { - TASK_ID -> COL_JOB_ID - TASK_WORKFLOW_ID -> COL_WORKFLOW_ID - TASK_SUBMIT_TIME -> COL_SUBMIT_TIME - TASK_RUNTIME -> COL_RUNTIME - TASK_ALLOC_NCPUS -> COL_NPROC - TASK_REQ_NCPUS -> COL_REQ_NPROC - TASK_PARENTS -> COL_DEPS + TASK_ID -> colJobID + TASK_WORKFLOW_ID -> colWorkflowID + TASK_SUBMIT_TIME -> colSubmitTime + TASK_RUNTIME -> colRuntime + TASK_ALLOC_NCPUS -> colNproc + TASK_REQ_NCPUS -> colReqNproc + TASK_PARENTS -> colDeps else -> -1 } } override fun isNull(index: Int): Boolean { - require(index in 0..COL_DEPS) { "Invalid column" } + require(index in 0..colDeps) { "Invalid column" } return false } @@ -111,8 +111,8 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { override fun getInt(index: Int): Int { checkActive() return when (index) { - COL_REQ_NPROC -> reqNProcs - COL_NPROC -> nProcs + colReqNproc -> reqNProcs + colNproc -> nProcs else -> throw IllegalArgumentException("Invalid column") } } @@ -132,8 +132,8 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { override fun getString(index: Int): String? { checkActive() return when (index) { - COL_JOB_ID -> jobId - COL_WORKFLOW_ID -> workflowId + colJobID -> jobId + colWorkflowID -> workflowId else -> throw IllegalArgumentException("Invalid column") } } @@ -145,7 +145,7 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { override fun getInstant(index: Int): Instant? { checkActive() return when (index) { - COL_SUBMIT_TIME -> submitTime + colSubmitTime -> submitTime else -> throw IllegalArgumentException("Invalid column") } } @@ -153,23 +153,33 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { override fun getDuration(index: Int): Duration? { checkActive() return when (index) { - COL_RUNTIME -> runtime + colRuntime -> runtime else -> throw IllegalArgumentException("Invalid column") } } - override fun <T> getList(index: Int, elementType: Class<T>): List<T>? { + override fun <T> getList( + index: Int, + elementType: Class<T>, + ): List<T>? { throw IllegalArgumentException("Invalid column") } - override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? { + override fun <K, V> getMap( + index: Int, + keyType: Class<K>, + valueType: Class<V>, + ): Map<K, V>? { throw IllegalArgumentException("Invalid column") } - override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? { + override fun <T> getSet( + index: Int, + elementType: Class<T>, + ): Set<T>? { checkActive() return when (index) { - COL_DEPS -> TYPE_DEPS.convertTo(dependencies, elementType) + colDeps -> typeDeps.convertTo(dependencies, elementType) else -> throw IllegalArgumentException("Invalid column") } } @@ -245,31 +255,32 @@ internal class GwfTaskTableReader(private val parser: CsvParser) : TableReader { dependencies = emptySet() } - private val COL_WORKFLOW_ID = 0 - private val COL_JOB_ID = 1 - private val COL_SUBMIT_TIME = 2 - private val COL_RUNTIME = 3 - private val COL_NPROC = 4 - private val COL_REQ_NPROC = 5 - private val COL_DEPS = 6 + private val colWorkflowID = 0 + private val colJobID = 1 + private val colSubmitTime = 2 + private val colRuntime = 3 + private val colNproc = 4 + private val colReqNproc = 5 + private val colDeps = 6 - private val TYPE_DEPS = TableColumnType.Set(TableColumnType.String) + private val typeDeps = TableColumnType.Set(TableColumnType.String) companion object { /** * The [CsvSchema] that is used to parse the trace. */ - private val schema = CsvSchema.builder() - .addColumn("WorkflowID", CsvSchema.ColumnType.NUMBER) - .addColumn("JobID", CsvSchema.ColumnType.NUMBER) - .addColumn("SubmitTime", CsvSchema.ColumnType.NUMBER) - .addColumn("RunTime", CsvSchema.ColumnType.NUMBER) - .addColumn("NProcs", CsvSchema.ColumnType.NUMBER) - .addColumn("ReqNProcs", CsvSchema.ColumnType.NUMBER) - .addColumn("Dependencies", CsvSchema.ColumnType.STRING) - .setAllowComments(true) - .setUseHeader(true) - .setColumnSeparator(',') - .build() + private val schema = + CsvSchema.builder() + .addColumn("WorkflowID", CsvSchema.ColumnType.NUMBER) + .addColumn("JobID", CsvSchema.ColumnType.NUMBER) + .addColumn("SubmitTime", CsvSchema.ColumnType.NUMBER) + .addColumn("RunTime", CsvSchema.ColumnType.NUMBER) + .addColumn("NProcs", CsvSchema.ColumnType.NUMBER) + .addColumn("ReqNProcs", CsvSchema.ColumnType.NUMBER) + .addColumn("Dependencies", CsvSchema.ColumnType.STRING) + .setAllowComments(true) + .setUseHeader(true) + .setColumnSeparator(',') + .build() } } diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt index d2ded0ee..097c5593 100644 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt @@ -52,9 +52,10 @@ public class GwfTraceFormat : TraceFormat { /** * The [CsvFactory] used to create the parser. */ - private val factory = CsvFactory() - .enable(CsvParser.Feature.ALLOW_COMMENTS) - .enable(CsvParser.Feature.TRIM_SPACES) + private val factory = + CsvFactory() + .enable(CsvParser.Feature.ALLOW_COMMENTS) + .enable(CsvParser.Feature.TRIM_SPACES) override fun create(path: Path) { throw UnsupportedOperationException("Writing not supported for this format") @@ -62,31 +63,42 @@ public class GwfTraceFormat : TraceFormat { override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS) - override fun getDetails(path: Path, table: String): TableDetails { + override fun getDetails( + path: Path, + table: String, + ): TableDetails { return when (table) { - TABLE_TASKS -> TableDetails( - listOf( - TableColumn(TASK_WORKFLOW_ID, TableColumnType.String), - TableColumn(TASK_ID, TableColumnType.String), - TableColumn(TASK_SUBMIT_TIME, TableColumnType.Instant), - TableColumn(TASK_RUNTIME, TableColumnType.Duration), - TableColumn(TASK_REQ_NCPUS, TableColumnType.Int), - TableColumn(TASK_ALLOC_NCPUS, TableColumnType.Int), - TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)) + TABLE_TASKS -> + TableDetails( + listOf( + TableColumn(TASK_WORKFLOW_ID, TableColumnType.String), + TableColumn(TASK_ID, TableColumnType.String), + TableColumn(TASK_SUBMIT_TIME, TableColumnType.Instant), + TableColumn(TASK_RUNTIME, TableColumnType.Duration), + TableColumn(TASK_REQ_NCPUS, TableColumnType.Int), + TableColumn(TASK_ALLOC_NCPUS, TableColumnType.Int), + TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)), + ), ) - ) else -> throw IllegalArgumentException("Table $table not supported") } } - override fun newReader(path: Path, table: String, projection: List<String>?): TableReader { + override fun newReader( + path: Path, + table: String, + projection: List<String>?, + ): TableReader { return when (table) { TABLE_TASKS -> GwfTaskTableReader(factory.createParser(path.toFile())) else -> throw IllegalArgumentException("Table $table not supported") } } - override fun newWriter(path: Path, table: String): TableWriter { + override fun newWriter( + path: Path, + table: String, + ): TableWriter { throw UnsupportedOperationException("Writing not supported for this format") } } diff --git a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt index c75e86df..9c97547a 100644 --- a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt @@ -82,7 +82,7 @@ internal class GwfTraceFormatTest { { assertEquals("1", reader.getString(TASK_ID)) }, { assertEquals(Instant.ofEpochSecond(16), reader.getInstant(TASK_SUBMIT_TIME)) }, { assertEquals(Duration.ofSeconds(11), reader.getDuration(TASK_RUNTIME)) }, - { assertEquals(emptySet<String>(), reader.getSet(TASK_PARENTS, String::class.java)) } + { assertEquals(emptySet<String>(), reader.getSet(TASK_PARENTS, String::class.java)) }, ) } @@ -101,7 +101,7 @@ internal class GwfTraceFormatTest { { assertEquals("7", reader.getString(TASK_ID)) }, { assertEquals(Instant.ofEpochSecond(87), reader.getInstant(TASK_SUBMIT_TIME)) }, { assertEquals(Duration.ofSeconds(11), reader.getDuration(TASK_RUNTIME)) }, - { assertEquals(setOf("4", "5", "6"), reader.getSet(TASK_PARENTS, String::class.java)) } + { assertEquals(setOf("4", "5", "6"), reader.getSet(TASK_PARENTS, String::class.java)) }, ) } diff --git a/opendc-trace/opendc-trace-opendc/build.gradle.kts b/opendc-trace/opendc-trace-opendc/build.gradle.kts index 18967136..d9ed15f8 100644 --- a/opendc-trace/opendc-trace-opendc/build.gradle.kts +++ b/opendc-trace/opendc-trace-opendc/build.gradle.kts @@ -22,7 +22,7 @@ description = "Support for OpenDC-specific trace formats" -/* Build configuration */ +// Build configuration plugins { `kotlin-library-conventions` `benchmark-conventions` diff --git a/opendc-trace/opendc-trace-opendc/src/jmh/kotlin/org/opendc/trace/opendc/OdcVmTraceBenchmarks.kt b/opendc-trace/opendc-trace-opendc/src/jmh/kotlin/org/opendc/trace/opendc/OdcVmTraceBenchmarks.kt index e504cf2f..e179e261 100644 --- a/opendc-trace/opendc-trace-opendc/src/jmh/kotlin/org/opendc/trace/opendc/OdcVmTraceBenchmarks.kt +++ b/opendc-trace/opendc-trace-opendc/src/jmh/kotlin/org/opendc/trace/opendc/OdcVmTraceBenchmarks.kt @@ -23,10 +23,10 @@ package org.opendc.trace.opendc import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE -import org.opendc.trace.conv.RESOURCE_ID import org.opendc.trace.conv.TABLE_INTERFERENCE_GROUPS import org.opendc.trace.conv.TABLE_RESOURCES import org.opendc.trace.conv.TABLE_RESOURCE_STATES +import org.opendc.trace.conv.resourceID import org.opendc.trace.spi.TraceFormat import org.openjdk.jmh.annotations.Benchmark import org.openjdk.jmh.annotations.Fork @@ -60,7 +60,7 @@ class OdcVmTraceBenchmarks { fun benchmarkResourcesReader(bh: Blackhole) { val reader = format.newReader(path, TABLE_RESOURCES, null) try { - val idColumn = reader.resolve(RESOURCE_ID) + val idColumn = reader.resolve(resourceID) while (reader.nextRow()) { bh.consume(reader.getString(idColumn)) } @@ -73,7 +73,7 @@ class OdcVmTraceBenchmarks { fun benchmarkResourceStatesReader(bh: Blackhole) { val reader = format.newReader(path, TABLE_RESOURCE_STATES, null) try { - val idColumn = reader.resolve(RESOURCE_ID) + val idColumn = reader.resolve(resourceID) while (reader.nextRow()) { bh.consume(reader.getString(idColumn)) } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt index 3e1fca06..7bf48f1a 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt @@ -65,24 +65,24 @@ internal class OdcVmInterferenceJsonTableReader(private val parser: JsonParser) } } - private val COL_MEMBERS = 0 - private val COL_TARGET = 1 - private val COL_SCORE = 2 + private val colMembers = 0 + private val colTarget = 1 + private val colScore = 2 - private val TYPE_MEMBERS = TableColumnType.Set(TableColumnType.String) + private val typeMembers = TableColumnType.Set(TableColumnType.String) override fun resolve(name: String): Int { return when (name) { - INTERFERENCE_GROUP_MEMBERS -> COL_MEMBERS - INTERFERENCE_GROUP_TARGET -> COL_TARGET - INTERFERENCE_GROUP_SCORE -> COL_SCORE + INTERFERENCE_GROUP_MEMBERS -> colMembers + INTERFERENCE_GROUP_TARGET -> colTarget + INTERFERENCE_GROUP_SCORE -> colScore else -> -1 } } override fun isNull(index: Int): Boolean { return when (index) { - COL_MEMBERS, COL_TARGET, COL_SCORE -> false + colMembers, colTarget, colScore -> false else -> throw IllegalArgumentException("Invalid column index $index") } } @@ -106,8 +106,8 @@ internal class OdcVmInterferenceJsonTableReader(private val parser: JsonParser) override fun getDouble(index: Int): Double { checkActive() return when (index) { - COL_TARGET -> targetLoad - COL_SCORE -> score + colTarget -> targetLoad + colScore -> score else -> throw IllegalArgumentException("Invalid column $index") } } @@ -128,19 +128,29 @@ internal class OdcVmInterferenceJsonTableReader(private val parser: JsonParser) throw IllegalArgumentException("Invalid column $index") } - override fun <T> getList(index: Int, elementType: Class<T>): List<T>? { + override fun <T> getList( + index: Int, + elementType: Class<T>, + ): List<T>? { throw IllegalArgumentException("Invalid column $index") } - override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? { + override fun <T> getSet( + index: Int, + elementType: Class<T>, + ): Set<T>? { checkActive() return when (index) { - COL_MEMBERS -> TYPE_MEMBERS.convertTo(members, elementType) + colMembers -> typeMembers.convertTo(members, elementType) else -> throw IllegalArgumentException("Invalid column $index") } } - override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? { + override fun <K, V> getMap( + index: Int, + keyType: Class<K>, + valueType: Class<V>, + ): Map<K, V>? { throw IllegalArgumentException("Invalid column $index") } @@ -196,7 +206,10 @@ internal class OdcVmInterferenceJsonTableReader(private val parser: JsonParser) /** * Parse the members of a group. */ - private fun parseGroupMembers(parser: JsonParser, members: MutableSet<String>) { + private fun parseGroupMembers( + parser: JsonParser, + members: MutableSet<String>, + ) { if (!parser.isExpectedStartArrayToken) { throw JsonParseException(parser, "Expected array for group members, but got ${parser.currentToken()}") } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt index c6905c5b..93f5a976 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt @@ -70,70 +70,106 @@ internal class OdcVmInterferenceJsonTableWriter(private val generator: JsonGener override fun resolve(name: String): Int { return when (name) { - INTERFERENCE_GROUP_MEMBERS -> COL_MEMBERS - INTERFERENCE_GROUP_TARGET -> COL_TARGET - INTERFERENCE_GROUP_SCORE -> COL_SCORE + INTERFERENCE_GROUP_MEMBERS -> colMembers + INTERFERENCE_GROUP_TARGET -> colTarget + INTERFERENCE_GROUP_SCORE -> colScore else -> -1 } } - override fun setBoolean(index: Int, value: Boolean) { + override fun setBoolean( + index: Int, + value: Boolean, + ) { throw IllegalArgumentException("Invalid column $index") } - override fun setInt(index: Int, value: Int) { + override fun setInt( + index: Int, + value: Int, + ) { throw IllegalArgumentException("Invalid column $index") } - override fun setLong(index: Int, value: Long) { + override fun setLong( + index: Int, + value: Long, + ) { throw IllegalArgumentException("Invalid column $index") } - override fun setFloat(index: Int, value: Float) { + override fun setFloat( + index: Int, + value: Float, + ) { throw IllegalArgumentException("Invalid column $index") } - override fun setDouble(index: Int, value: Double) { + override fun setDouble( + index: Int, + value: Double, + ) { check(isRowActive) { "No active row" } when (index) { - COL_TARGET -> targetLoad = (value as Number).toDouble() - COL_SCORE -> score = (value as Number).toDouble() + colTarget -> targetLoad = (value as Number).toDouble() + colScore -> score = (value as Number).toDouble() else -> throw IllegalArgumentException("Invalid column $index") } } - override fun setString(index: Int, value: String) { + override fun setString( + index: Int, + value: String, + ) { throw IllegalArgumentException("Invalid column $index") } - override fun setUUID(index: Int, value: UUID) { + override fun setUUID( + index: Int, + value: UUID, + ) { throw IllegalArgumentException("Invalid column $index") } - override fun setInstant(index: Int, value: Instant) { + override fun setInstant( + index: Int, + value: Instant, + ) { throw IllegalArgumentException("Invalid column $index") } - override fun setDuration(index: Int, value: Duration) { + override fun setDuration( + index: Int, + value: Duration, + ) { throw IllegalArgumentException("Invalid column $index") } - override fun <T> setList(index: Int, value: List<T>) { + override fun <T> setList( + index: Int, + value: List<T>, + ) { throw IllegalArgumentException("Invalid column $index") } - override fun <T> setSet(index: Int, value: Set<T>) { + override fun <T> setSet( + index: Int, + value: Set<T>, + ) { check(isRowActive) { "No active row" } @Suppress("UNCHECKED_CAST") when (index) { - COL_MEMBERS -> members = value as Set<String> + colMembers -> members = value as Set<String> else -> throw IllegalArgumentException("Invalid column index $index") } } - override fun <K, V> setMap(index: Int, value: Map<K, V>) { + override fun <K, V> setMap( + index: Int, + value: Map<K, V>, + ) { throw IllegalArgumentException("Invalid column $index") } @@ -146,9 +182,9 @@ internal class OdcVmInterferenceJsonTableWriter(private val generator: JsonGener generator.close() } - private val COL_MEMBERS = 0 - private val COL_TARGET = 1 - private val COL_SCORE = 2 + private val colMembers = 0 + private val colTarget = 1 + private val colScore = 2 private var members = emptySet<String>() private var targetLoad = Double.POSITIVE_INFINITY diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt index ff9a98d7..8e54f2b0 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt @@ -23,11 +23,11 @@ package org.opendc.trace.opendc import org.opendc.trace.TableReader -import org.opendc.trace.conv.RESOURCE_CPU_COUNT -import org.opendc.trace.conv.RESOURCE_ID -import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE -import org.opendc.trace.conv.RESOURCE_STATE_DURATION -import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP +import org.opendc.trace.conv.resourceCpuCount +import org.opendc.trace.conv.resourceID +import org.opendc.trace.conv.resourceStateCpuUsage +import org.opendc.trace.conv.resourceStateDuration +import org.opendc.trace.conv.resourceStateTimestamp import org.opendc.trace.opendc.parquet.ResourceState import org.opendc.trace.util.parquet.LocalParquetReader import java.time.Duration @@ -55,25 +55,25 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea } } - private val COL_ID = 0 - private val COL_TIMESTAMP = 1 - private val COL_DURATION = 2 - private val COL_CPU_COUNT = 3 - private val COL_CPU_USAGE = 4 + private val colID = 0 + private val colTimestamp = 1 + private val colDuration = 2 + private val colCpuCount = 3 + private val colCpuUsage = 4 override fun resolve(name: String): Int { return when (name) { - RESOURCE_ID -> COL_ID - RESOURCE_STATE_TIMESTAMP -> COL_TIMESTAMP - RESOURCE_STATE_DURATION -> COL_DURATION - RESOURCE_CPU_COUNT -> COL_CPU_COUNT - RESOURCE_STATE_CPU_USAGE -> COL_CPU_USAGE + resourceID -> colID + resourceStateTimestamp -> colTimestamp + resourceStateDuration -> colDuration + resourceCpuCount -> colCpuCount + resourceStateCpuUsage -> colCpuUsage else -> -1 } } override fun isNull(index: Int): Boolean { - require(index in 0..COL_CPU_USAGE) { "Invalid column index" } + require(index in 0..colCpuUsage) { "Invalid column index" } return false } @@ -84,7 +84,7 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea override fun getInt(index: Int): Int { val record = checkNotNull(record) { "Reader in invalid state" } return when (index) { - COL_CPU_COUNT -> record.cpuCount + colCpuCount -> record.cpuCount else -> throw IllegalArgumentException("Invalid column or type [index $index]") } } @@ -100,7 +100,7 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea override fun getDouble(index: Int): Double { val record = checkNotNull(record) { "Reader in invalid state" } return when (index) { - COL_CPU_USAGE -> record.cpuUsage + colCpuUsage -> record.cpuUsage else -> throw IllegalArgumentException("Invalid column or type [index $index]") } } @@ -109,7 +109,7 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea val record = checkNotNull(record) { "Reader in invalid state" } return when (index) { - COL_ID -> record.id + colID -> record.id else -> throw IllegalArgumentException("Invalid column index $index") } } @@ -122,7 +122,7 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea val record = checkNotNull(record) { "Reader in invalid state" } return when (index) { - COL_TIMESTAMP -> record.timestamp + colTimestamp -> record.timestamp else -> throw IllegalArgumentException("Invalid column index $index") } } @@ -131,20 +131,30 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea val record = checkNotNull(record) { "Reader in invalid state" } return when (index) { - COL_DURATION -> record.duration + colDuration -> record.duration else -> throw IllegalArgumentException("Invalid column index $index") } } - override fun <T> getList(index: Int, elementType: Class<T>): List<T>? { + override fun <T> getList( + index: Int, + elementType: Class<T>, + ): List<T>? { throw IllegalArgumentException("Invalid column or type [index $index]") } - override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? { + override fun <T> getSet( + index: Int, + elementType: Class<T>, + ): Set<T>? { throw IllegalArgumentException("Invalid column or type [index $index]") } - override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? { + override fun <K, V> getMap( + index: Int, + keyType: Class<K>, + valueType: Class<V>, + ): Map<K, V>? { throw IllegalArgumentException("Invalid column or type [index $index]") } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt index cf0a401b..01cd13c8 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt @@ -24,11 +24,11 @@ package org.opendc.trace.opendc import org.apache.parquet.hadoop.ParquetWriter import org.opendc.trace.TableWriter -import org.opendc.trace.conv.RESOURCE_CPU_COUNT -import org.opendc.trace.conv.RESOURCE_ID -import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE -import org.opendc.trace.conv.RESOURCE_STATE_DURATION -import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP +import org.opendc.trace.conv.resourceCpuCount +import org.opendc.trace.conv.resourceID +import org.opendc.trace.conv.resourceStateCpuUsage +import org.opendc.trace.conv.resourceStateDuration +import org.opendc.trace.conv.resourceStateTimestamp import org.opendc.trace.opendc.parquet.ResourceState import java.time.Duration import java.time.Instant @@ -41,113 +41,149 @@ internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter<R /** * The current state for the record that is being written. */ - private var _isActive = false - private var _id: String = "" - private var _timestamp: Instant = Instant.MIN - private var _duration: Duration = Duration.ZERO - private var _cpuCount: Int = 0 - private var _cpuUsage: Double = Double.NaN + private var localIsActive = false + private var localID: String = "" + private var localTimestamp: Instant = Instant.MIN + private var localDuration: Duration = Duration.ZERO + private var localCpuCount: Int = 0 + private var localCpuUsage: Double = Double.NaN override fun startRow() { - _isActive = true - _id = "" - _timestamp = Instant.MIN - _duration = Duration.ZERO - _cpuCount = 0 - _cpuUsage = Double.NaN + localIsActive = true + localID = "" + localTimestamp = Instant.MIN + localDuration = Duration.ZERO + localCpuCount = 0 + localCpuUsage = Double.NaN } override fun endRow() { - check(_isActive) { "No active row" } - _isActive = false + check(localIsActive) { "No active row" } + localIsActive = false - check(lastId != _id || _timestamp >= lastTimestamp) { "Records need to be ordered by (id, timestamp)" } + check(lastId != localID || localTimestamp >= lastTimestamp) { "Records need to be ordered by (id, timestamp)" } - writer.write(ResourceState(_id, _timestamp, _duration, _cpuCount, _cpuUsage)) + writer.write(ResourceState(localID, localTimestamp, localDuration, localCpuCount, localCpuUsage)) - lastId = _id - lastTimestamp = _timestamp + lastId = localID + lastTimestamp = localTimestamp } override fun resolve(name: String): Int { return when (name) { - RESOURCE_ID -> COL_ID - RESOURCE_STATE_TIMESTAMP -> COL_TIMESTAMP - RESOURCE_STATE_DURATION -> COL_DURATION - RESOURCE_CPU_COUNT -> COL_CPU_COUNT - RESOURCE_STATE_CPU_USAGE -> COL_CPU_USAGE + resourceID -> colID + resourceStateTimestamp -> colTimestamp + resourceStateDuration -> colDuration + resourceCpuCount -> colCpuCount + resourceStateCpuUsage -> colCpuUsage else -> -1 } } - override fun setBoolean(index: Int, value: Boolean) { + override fun setBoolean( + index: Int, + value: Boolean, + ) { throw IllegalArgumentException("Invalid column or type [index $index]") } - override fun setInt(index: Int, value: Int) { - check(_isActive) { "No active row" } + override fun setInt( + index: Int, + value: Int, + ) { + check(localIsActive) { "No active row" } when (index) { - COL_CPU_COUNT -> _cpuCount = value + colCpuCount -> localCpuCount = value else -> throw IllegalArgumentException("Invalid column or type [index $index]") } } - override fun setLong(index: Int, value: Long) { + override fun setLong( + index: Int, + value: Long, + ) { throw IllegalArgumentException("Invalid column or type [index $index]") } - override fun setFloat(index: Int, value: Float) { + override fun setFloat( + index: Int, + value: Float, + ) { throw IllegalArgumentException("Invalid column or type [index $index]") } - override fun setDouble(index: Int, value: Double) { - check(_isActive) { "No active row" } + override fun setDouble( + index: Int, + value: Double, + ) { + check(localIsActive) { "No active row" } when (index) { - COL_CPU_USAGE -> _cpuUsage = value + colCpuUsage -> localCpuUsage = value else -> throw IllegalArgumentException("Invalid column or type [index $index]") } } - override fun setString(index: Int, value: String) { - check(_isActive) { "No active row" } + override fun setString( + index: Int, + value: String, + ) { + check(localIsActive) { "No active row" } when (index) { - COL_ID -> _id = value + colID -> localID = value else -> throw IllegalArgumentException("Invalid column or type [index $index]") } } - override fun setUUID(index: Int, value: UUID) { + override fun setUUID( + index: Int, + value: UUID, + ) { throw IllegalArgumentException("Invalid column or type [index $index]") } - override fun setInstant(index: Int, value: Instant) { - check(_isActive) { "No active row" } + override fun setInstant( + index: Int, + value: Instant, + ) { + check(localIsActive) { "No active row" } when (index) { - COL_TIMESTAMP -> _timestamp = value + colTimestamp -> localTimestamp = value else -> throw IllegalArgumentException("Invalid column or type [index $index]") } } - override fun setDuration(index: Int, value: Duration) { - check(_isActive) { "No active row" } + override fun setDuration( + index: Int, + value: Duration, + ) { + check(localIsActive) { "No active row" } when (index) { - COL_DURATION -> _duration = value + colDuration -> localDuration = value else -> throw IllegalArgumentException("Invalid column or type [index $index]") } } - override fun <T> setList(index: Int, value: List<T>) { + override fun <T> setList( + index: Int, + value: List<T>, + ) { throw IllegalArgumentException("Invalid column or type [index $index]") } - override fun <T> setSet(index: Int, value: Set<T>) { + override fun <T> setSet( + index: Int, + value: Set<T>, + ) { throw IllegalArgumentException("Invalid column or type [index $index]") } - override fun <K, V> setMap(index: Int, value: Map<K, V>) { + override fun <K, V> setMap( + index: Int, + value: Map<K, V>, + ) { throw IllegalArgumentException("Invalid column or type [index $index]") } @@ -165,9 +201,9 @@ internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter<R private var lastId: String? = null private var lastTimestamp: Instant = Instant.MAX - private val COL_ID = 0 - private val COL_TIMESTAMP = 1 - private val COL_DURATION = 2 - private val COL_CPU_COUNT = 3 - private val COL_CPU_USAGE = 4 + private val colID = 0 + private val colTimestamp = 1 + private val colDuration = 2 + private val colCpuCount = 3 + private val colCpuUsage = 4 } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt index d4613158..195929aa 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt @@ -23,12 +23,12 @@ package org.opendc.trace.opendc import org.opendc.trace.TableReader -import org.opendc.trace.conv.RESOURCE_CPU_CAPACITY -import org.opendc.trace.conv.RESOURCE_CPU_COUNT -import org.opendc.trace.conv.RESOURCE_ID -import org.opendc.trace.conv.RESOURCE_MEM_CAPACITY -import org.opendc.trace.conv.RESOURCE_START_TIME -import org.opendc.trace.conv.RESOURCE_STOP_TIME +import org.opendc.trace.conv.resourceCpuCapacity +import org.opendc.trace.conv.resourceCpuCount +import org.opendc.trace.conv.resourceID +import org.opendc.trace.conv.resourceMemCapacity +import org.opendc.trace.conv.resourceStartTime +import org.opendc.trace.conv.resourceStopTime import org.opendc.trace.opendc.parquet.Resource import org.opendc.trace.util.parquet.LocalParquetReader import java.time.Duration @@ -56,27 +56,27 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R } } - private val COL_ID = 0 - private val COL_START_TIME = 1 - private val COL_STOP_TIME = 2 - private val COL_CPU_COUNT = 3 - private val COL_CPU_CAPACITY = 4 - private val COL_MEM_CAPACITY = 5 + private val colID = 0 + private val colStartTime = 1 + private val colStopTime = 2 + private val colCpuCount = 3 + private val colCpuCapacity = 4 + private val colMemCapacity = 5 override fun resolve(name: String): Int { return when (name) { - RESOURCE_ID -> COL_ID - RESOURCE_START_TIME -> COL_START_TIME - RESOURCE_STOP_TIME -> COL_STOP_TIME - RESOURCE_CPU_COUNT -> COL_CPU_COUNT - RESOURCE_CPU_CAPACITY -> COL_CPU_CAPACITY - RESOURCE_MEM_CAPACITY -> COL_MEM_CAPACITY + resourceID -> colID + resourceStartTime -> colStartTime + resourceStopTime -> colStopTime + resourceCpuCount -> colCpuCount + resourceCpuCapacity -> colCpuCapacity + resourceMemCapacity -> colMemCapacity else -> -1 } } override fun isNull(index: Int): Boolean { - require(index in 0..COL_MEM_CAPACITY) { "Invalid column index" } + require(index in 0..colMemCapacity) { "Invalid column index" } return false } @@ -88,7 +88,7 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R val record = checkNotNull(record) { "Reader in invalid state" } return when (index) { - COL_CPU_COUNT -> record.cpuCount + colCpuCount -> record.cpuCount else -> throw IllegalArgumentException("Invalid column") } } @@ -105,8 +105,8 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R val record = checkNotNull(record) { "Reader in invalid state" } return when (index) { - COL_CPU_CAPACITY -> record.cpuCapacity - COL_MEM_CAPACITY -> record.memCapacity + colCpuCapacity -> record.cpuCapacity + colMemCapacity -> record.memCapacity else -> throw IllegalArgumentException("Invalid column") } } @@ -115,7 +115,7 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R val record = checkNotNull(record) { "Reader in invalid state" } return when (index) { - COL_ID -> record.id + colID -> record.id else -> throw IllegalArgumentException("Invalid column") } } @@ -128,8 +128,8 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R val record = checkNotNull(record) { "Reader in invalid state" } return when (index) { - COL_START_TIME -> record.startTime - COL_STOP_TIME -> record.stopTime + colStartTime -> record.startTime + colStopTime -> record.stopTime else -> throw IllegalArgumentException("Invalid column") } } @@ -138,15 +138,25 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R throw IllegalArgumentException("Invalid column") } - override fun <T> getList(index: Int, elementType: Class<T>): List<T>? { + override fun <T> getList( + index: Int, + elementType: Class<T>, + ): List<T>? { throw IllegalArgumentException("Invalid column") } - override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? { + override fun <T> getSet( + index: Int, + elementType: Class<T>, + ): Set<T>? { throw IllegalArgumentException("Invalid column") } - override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? { + override fun <K, V> getMap( + index: Int, + keyType: Class<K>, + valueType: Class<V>, + ): Map<K, V>? { throw IllegalArgumentException("Invalid column") } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt index 73a03891..5bbc2f3f 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt @@ -24,12 +24,12 @@ package org.opendc.trace.opendc import org.apache.parquet.hadoop.ParquetWriter import org.opendc.trace.TableWriter -import org.opendc.trace.conv.RESOURCE_CPU_CAPACITY -import org.opendc.trace.conv.RESOURCE_CPU_COUNT -import org.opendc.trace.conv.RESOURCE_ID -import org.opendc.trace.conv.RESOURCE_MEM_CAPACITY -import org.opendc.trace.conv.RESOURCE_START_TIME -import org.opendc.trace.conv.RESOURCE_STOP_TIME +import org.opendc.trace.conv.resourceCpuCapacity +import org.opendc.trace.conv.resourceCpuCount +import org.opendc.trace.conv.resourceID +import org.opendc.trace.conv.resourceMemCapacity +import org.opendc.trace.conv.resourceStartTime +import org.opendc.trace.conv.resourceStopTime import org.opendc.trace.opendc.parquet.Resource import java.time.Duration import java.time.Instant @@ -42,105 +42,141 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour /** * The current state for the record that is being written. */ - private var _isActive = false - private var _id: String = "" - private var _startTime: Instant = Instant.MIN - private var _stopTime: Instant = Instant.MIN - private var _cpuCount: Int = 0 - private var _cpuCapacity: Double = Double.NaN - private var _memCapacity: Double = Double.NaN + private var localIsActive = false + private var localId: String = "" + private var localStartTime: Instant = Instant.MIN + private var localStopTime: Instant = Instant.MIN + private var localCpuCount: Int = 0 + private var localCpuCapacity: Double = Double.NaN + private var localMemCapacity: Double = Double.NaN override fun startRow() { - _isActive = true - _id = "" - _startTime = Instant.MIN - _stopTime = Instant.MIN - _cpuCount = 0 - _cpuCapacity = Double.NaN - _memCapacity = Double.NaN + localIsActive = true + localId = "" + localStartTime = Instant.MIN + localStopTime = Instant.MIN + localCpuCount = 0 + localCpuCapacity = Double.NaN + localMemCapacity = Double.NaN } override fun endRow() { - check(_isActive) { "No active row" } - _isActive = false - writer.write(Resource(_id, _startTime, _stopTime, _cpuCount, _cpuCapacity, _memCapacity)) + check(localIsActive) { "No active row" } + localIsActive = false + writer.write(Resource(localId, localStartTime, localStopTime, localCpuCount, localCpuCapacity, localMemCapacity)) } override fun resolve(name: String): Int { return when (name) { - RESOURCE_ID -> COL_ID - RESOURCE_START_TIME -> COL_START_TIME - RESOURCE_STOP_TIME -> COL_STOP_TIME - RESOURCE_CPU_COUNT -> COL_CPU_COUNT - RESOURCE_CPU_CAPACITY -> COL_CPU_CAPACITY - RESOURCE_MEM_CAPACITY -> COL_MEM_CAPACITY + resourceID -> colID + resourceStartTime -> colStartTime + resourceStopTime -> colStopTime + resourceCpuCount -> colCpuCount + resourceCpuCapacity -> colCpuCapacity + resourceMemCapacity -> colMemCapacity else -> -1 } } - override fun setBoolean(index: Int, value: Boolean) { + override fun setBoolean( + index: Int, + value: Boolean, + ) { throw IllegalArgumentException("Invalid column or type [index $index]") } - override fun setInt(index: Int, value: Int) { - check(_isActive) { "No active row" } + override fun setInt( + index: Int, + value: Int, + ) { + check(localIsActive) { "No active row" } when (index) { - COL_CPU_COUNT -> _cpuCount = value + colCpuCount -> localCpuCount = value else -> throw IllegalArgumentException("Invalid column or type [index $index]") } } - override fun setLong(index: Int, value: Long) { + override fun setLong( + index: Int, + value: Long, + ) { throw IllegalArgumentException("Invalid column or type [index $index]") } - override fun setFloat(index: Int, value: Float) { + override fun setFloat( + index: Int, + value: Float, + ) { throw IllegalArgumentException("Invalid column or type [index $index]") } - override fun setDouble(index: Int, value: Double) { - check(_isActive) { "No active row" } + override fun setDouble( + index: Int, + value: Double, + ) { + check(localIsActive) { "No active row" } when (index) { - COL_CPU_CAPACITY -> _cpuCapacity = value - COL_MEM_CAPACITY -> _memCapacity = value + colCpuCapacity -> localCpuCapacity = value + colMemCapacity -> localMemCapacity = value else -> throw IllegalArgumentException("Invalid column or type [index $index]") } } - override fun setString(index: Int, value: String) { - check(_isActive) { "No active row" } + override fun setString( + index: Int, + value: String, + ) { + check(localIsActive) { "No active row" } when (index) { - COL_ID -> _id = value + colID -> localId = value else -> throw IllegalArgumentException("Invalid column index $index") } } - override fun setUUID(index: Int, value: UUID) { + override fun setUUID( + index: Int, + value: UUID, + ) { throw IllegalArgumentException("Invalid column or type [index $index]") } - override fun setInstant(index: Int, value: Instant) { - check(_isActive) { "No active row" } + override fun setInstant( + index: Int, + value: Instant, + ) { + check(localIsActive) { "No active row" } when (index) { - COL_START_TIME -> _startTime = value - COL_STOP_TIME -> _stopTime = value + colStartTime -> localStartTime = value + colStopTime -> localStopTime = value else -> throw IllegalArgumentException("Invalid column index $index") } } - override fun setDuration(index: Int, value: Duration) { + override fun setDuration( + index: Int, + value: Duration, + ) { throw IllegalArgumentException("Invalid column or type [index $index]") } - override fun <T> setList(index: Int, value: List<T>) { + override fun <T> setList( + index: Int, + value: List<T>, + ) { throw IllegalArgumentException("Invalid column or type [index $index]") } - override fun <T> setSet(index: Int, value: Set<T>) { + override fun <T> setSet( + index: Int, + value: Set<T>, + ) { throw IllegalArgumentException("Invalid column or type [index $index]") } - override fun <K, V> setMap(index: Int, value: Map<K, V>) { + override fun <K, V> setMap( + index: Int, + value: Map<K, V>, + ) { throw IllegalArgumentException("Invalid column or type [index $index]") } @@ -152,10 +188,10 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour writer.close() } - private val COL_ID = 0 - private val COL_START_TIME = 1 - private val COL_STOP_TIME = 2 - private val COL_CPU_COUNT = 3 - private val COL_CPU_CAPACITY = 4 - private val COL_MEM_CAPACITY = 5 + private val colID = 0 + private val colStartTime = 1 + private val colStopTime = 2 + private val colCpuCount = 3 + private val colCpuCapacity = 4 + private val colMemCapacity = 5 } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt index c4790538..9abe872f 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt @@ -34,18 +34,18 @@ import org.opendc.trace.TableWriter import org.opendc.trace.conv.INTERFERENCE_GROUP_MEMBERS import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE import org.opendc.trace.conv.INTERFERENCE_GROUP_TARGET -import org.opendc.trace.conv.RESOURCE_CPU_CAPACITY -import org.opendc.trace.conv.RESOURCE_CPU_COUNT -import org.opendc.trace.conv.RESOURCE_ID -import org.opendc.trace.conv.RESOURCE_MEM_CAPACITY -import org.opendc.trace.conv.RESOURCE_START_TIME -import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE -import org.opendc.trace.conv.RESOURCE_STATE_DURATION -import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP -import org.opendc.trace.conv.RESOURCE_STOP_TIME import org.opendc.trace.conv.TABLE_INTERFERENCE_GROUPS import org.opendc.trace.conv.TABLE_RESOURCES import org.opendc.trace.conv.TABLE_RESOURCE_STATES +import org.opendc.trace.conv.resourceCpuCapacity +import org.opendc.trace.conv.resourceCpuCount +import org.opendc.trace.conv.resourceID +import org.opendc.trace.conv.resourceMemCapacity +import org.opendc.trace.conv.resourceStartTime +import org.opendc.trace.conv.resourceStateCpuUsage +import org.opendc.trace.conv.resourceStateDuration +import org.opendc.trace.conv.resourceStateTimestamp +import org.opendc.trace.conv.resourceStopTime import org.opendc.trace.opendc.parquet.ResourceReadSupport import org.opendc.trace.opendc.parquet.ResourceStateReadSupport import org.opendc.trace.opendc.parquet.ResourceStateWriteSupport @@ -86,39 +86,49 @@ public class OdcVmTraceFormat : TraceFormat { override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES, TABLE_INTERFERENCE_GROUPS) - override fun getDetails(path: Path, table: String): TableDetails { + override fun getDetails( + path: Path, + table: String, + ): TableDetails { return when (table) { - TABLE_RESOURCES -> TableDetails( - listOf( - TableColumn(RESOURCE_ID, TableColumnType.String), - TableColumn(RESOURCE_START_TIME, TableColumnType.Instant), - TableColumn(RESOURCE_STOP_TIME, TableColumnType.Instant), - TableColumn(RESOURCE_CPU_COUNT, TableColumnType.Int), - TableColumn(RESOURCE_CPU_CAPACITY, TableColumnType.Double), - TableColumn(RESOURCE_MEM_CAPACITY, TableColumnType.Double) + TABLE_RESOURCES -> + TableDetails( + listOf( + TableColumn(resourceID, TableColumnType.String), + TableColumn(resourceStartTime, TableColumnType.Instant), + TableColumn(resourceStopTime, TableColumnType.Instant), + TableColumn(resourceCpuCount, TableColumnType.Int), + TableColumn(resourceCpuCapacity, TableColumnType.Double), + TableColumn(resourceMemCapacity, TableColumnType.Double), + ), ) - ) - TABLE_RESOURCE_STATES -> TableDetails( - listOf( - TableColumn(RESOURCE_ID, TableColumnType.String), - TableColumn(RESOURCE_STATE_TIMESTAMP, TableColumnType.Instant), - TableColumn(RESOURCE_STATE_DURATION, TableColumnType.Duration), - TableColumn(RESOURCE_CPU_COUNT, TableColumnType.Int), - TableColumn(RESOURCE_STATE_CPU_USAGE, TableColumnType.Double) + TABLE_RESOURCE_STATES -> + TableDetails( + listOf( + TableColumn(resourceID, TableColumnType.String), + TableColumn(resourceStateTimestamp, TableColumnType.Instant), + TableColumn(resourceStateDuration, TableColumnType.Duration), + TableColumn(resourceCpuCount, TableColumnType.Int), + TableColumn(resourceStateCpuUsage, TableColumnType.Double), + ), ) - ) - TABLE_INTERFERENCE_GROUPS -> TableDetails( - listOf( - TableColumn(INTERFERENCE_GROUP_MEMBERS, TableColumnType.Set(TableColumnType.String)), - TableColumn(INTERFERENCE_GROUP_TARGET, TableColumnType.Double), - TableColumn(INTERFERENCE_GROUP_SCORE, TableColumnType.Double) + TABLE_INTERFERENCE_GROUPS -> + TableDetails( + listOf( + TableColumn(INTERFERENCE_GROUP_MEMBERS, TableColumnType.Set(TableColumnType.String)), + TableColumn(INTERFERENCE_GROUP_TARGET, TableColumnType.Double), + TableColumn(INTERFERENCE_GROUP_SCORE, TableColumnType.Double), + ), ) - ) else -> throw IllegalArgumentException("Table $table not supported") } } - override fun newReader(path: Path, table: String, projection: List<String>?): TableReader { + override fun newReader( + path: Path, + table: String, + projection: List<String>?, + ): TableReader { return when (table) { TABLE_RESOURCES -> { val reader = LocalParquetReader(path.resolve("meta.parquet"), ResourceReadSupport(projection)) @@ -130,11 +140,12 @@ public class OdcVmTraceFormat : TraceFormat { } TABLE_INTERFERENCE_GROUPS -> { val modelPath = path.resolve("interference-model.json") - val parser = if (modelPath.exists()) { - jsonFactory.createParser(modelPath.toFile()) - } else { - jsonFactory.createParser("[]") // If model does not exist, return empty model - } + val parser = + if (modelPath.exists()) { + jsonFactory.createParser(modelPath.toFile()) + } else { + jsonFactory.createParser("[]") // If model does not exist, return empty model + } OdcVmInterferenceJsonTableReader(parser) } @@ -142,26 +153,31 @@ public class OdcVmTraceFormat : TraceFormat { } } - override fun newWriter(path: Path, table: String): TableWriter { + override fun newWriter( + path: Path, + table: String, + ): TableWriter { return when (table) { TABLE_RESOURCES -> { - val writer = LocalParquetWriter.builder(path.resolve("meta.parquet"), ResourceWriteSupport()) - .withCompressionCodec(CompressionCodecName.ZSTD) - .withPageWriteChecksumEnabled(true) - .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0) - .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) - .build() + val writer = + LocalParquetWriter.builder(path.resolve("meta.parquet"), ResourceWriteSupport()) + .withCompressionCodec(CompressionCodecName.ZSTD) + .withPageWriteChecksumEnabled(true) + .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0) + .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) + .build() OdcVmResourceTableWriter(writer) } TABLE_RESOURCE_STATES -> { - val writer = LocalParquetWriter.builder(path.resolve("trace.parquet"), ResourceStateWriteSupport()) - .withCompressionCodec(CompressionCodecName.ZSTD) - .withDictionaryEncoding("id", true) - .withBloomFilterEnabled("id", true) - .withPageWriteChecksumEnabled(true) - .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0) - .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) - .build() + val writer = + LocalParquetWriter.builder(path.resolve("trace.parquet"), ResourceStateWriteSupport()) + .withCompressionCodec(CompressionCodecName.ZSTD) + .withDictionaryEncoding("id", true) + .withBloomFilterEnabled("id", true) + .withPageWriteChecksumEnabled(true) + .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0) + .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) + .build() OdcVmResourceStateTableWriter(writer) } TABLE_INTERFERENCE_GROUPS -> { diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/Resource.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/Resource.kt index c6db45b5..13eefe72 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/Resource.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/Resource.kt @@ -33,5 +33,5 @@ internal data class Resource( val stopTime: Instant, val cpuCount: Int, val cpuCapacity: Double, - val memCapacity: Double + val memCapacity: Double, ) diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt index 52911d5f..8bada02e 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt @@ -31,12 +31,12 @@ import org.apache.parquet.schema.MessageType import org.apache.parquet.schema.PrimitiveType import org.apache.parquet.schema.Types import org.opendc.trace.TableColumn -import org.opendc.trace.conv.RESOURCE_CPU_CAPACITY -import org.opendc.trace.conv.RESOURCE_CPU_COUNT -import org.opendc.trace.conv.RESOURCE_ID -import org.opendc.trace.conv.RESOURCE_MEM_CAPACITY -import org.opendc.trace.conv.RESOURCE_START_TIME -import org.opendc.trace.conv.RESOURCE_STOP_TIME +import org.opendc.trace.conv.resourceCpuCapacity +import org.opendc.trace.conv.resourceCpuCount +import org.opendc.trace.conv.resourceID +import org.opendc.trace.conv.resourceMemCapacity +import org.opendc.trace.conv.resourceStartTime +import org.opendc.trace.conv.resourceStopTime /** * A [ReadSupport] instance for [Resource] objects. @@ -45,18 +45,19 @@ internal class ResourceReadSupport(private val projection: List<String>?) : Read /** * Mapping from field names to [TableColumn]s. */ - private val fieldMap = mapOf( - "id" to RESOURCE_ID, - "submissionTime" to RESOURCE_START_TIME, - "start_time" to RESOURCE_START_TIME, - "endTime" to RESOURCE_STOP_TIME, - "stop_time" to RESOURCE_STOP_TIME, - "maxCores" to RESOURCE_CPU_COUNT, - "cpu_count" to RESOURCE_CPU_COUNT, - "cpu_capacity" to RESOURCE_CPU_CAPACITY, - "requiredMemory" to RESOURCE_MEM_CAPACITY, - "mem_capacity" to RESOURCE_MEM_CAPACITY - ) + private val fieldMap = + mapOf( + "id" to resourceID, + "submissionTime" to resourceStartTime, + "start_time" to resourceStartTime, + "endTime" to resourceStopTime, + "stop_time" to resourceStopTime, + "maxCores" to resourceCpuCount, + "cpu_count" to resourceCpuCount, + "cpu_capacity" to resourceCpuCapacity, + "requiredMemory" to resourceMemCapacity, + "mem_capacity" to resourceMemCapacity, + ) override fun init(context: InitContext): ReadContext { val projectedSchema = @@ -84,7 +85,7 @@ internal class ResourceReadSupport(private val projection: List<String>?) : Read configuration: Configuration, keyValueMetaData: Map<String, String>, fileSchema: MessageType, - readContext: ReadContext + readContext: ReadContext, ): RecordMaterializer<Resource> = ResourceRecordMaterializer(readContext.requestedSchema) companion object { @@ -92,64 +93,67 @@ internal class ResourceReadSupport(private val projection: List<String>?) : Read * Parquet read schema (version 2.0) for the "resources" table in the trace. */ @JvmStatic - val READ_SCHEMA_V2_0: MessageType = Types.buildMessage() - .addFields( - Types - .required(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("id"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("submissionTime"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("endTime"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("maxCores"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("requiredMemory") - ) - .named("resource") + val READ_SCHEMA_V2_0: MessageType = + Types.buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("id"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("submissionTime"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("endTime"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("maxCores"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("requiredMemory"), + ) + .named("resource") /** * Parquet read schema (version 2.1) for the "resources" table in the trace. */ @JvmStatic - val READ_SCHEMA_V2_1: MessageType = Types.buildMessage() - .addFields( - Types - .required(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("id"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("start_time"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("stop_time"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("cpu_count"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("cpu_capacity"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("mem_capacity") - ) - .named("resource") + val READ_SCHEMA_V2_1: MessageType = + Types.buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("id"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("start_time"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("stop_time"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("cpu_count"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpu_capacity"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("mem_capacity"), + ) + .named("resource") /** * Parquet read schema for the "resources" table in the trace. */ @JvmStatic - val READ_SCHEMA: MessageType = READ_SCHEMA_V2_0 - .union(READ_SCHEMA_V2_1) + val READ_SCHEMA: MessageType = + READ_SCHEMA_V2_0 + .union(READ_SCHEMA_V2_1) } } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceRecordMaterializer.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceRecordMaterializer.kt index 936a684a..6e2afa7a 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceRecordMaterializer.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceRecordMaterializer.kt @@ -37,75 +37,91 @@ internal class ResourceRecordMaterializer(schema: MessageType) : RecordMateriali /** * State of current record being read. */ - private var _id = "" - private var _startTime = Instant.MIN - private var _stopTime = Instant.MIN - private var _cpuCount = 0 - private var _cpuCapacity = 0.0 - private var _memCapacity = 0.0 + private var localId = "" + private var localStartTime = Instant.MIN + private var localStopTime = Instant.MIN + private var localCpuCount = 0 + private var localCpuCapacity = 0.0 + private var localMemCapacity = 0.0 /** * Root converter for the record. */ - private val root = object : GroupConverter() { - /** - * The converters for the columns of the schema. - */ - private val converters = schema.fields.map { type -> - when (type.name) { - "id" -> object : PrimitiveConverter() { - override fun addBinary(value: Binary) { - _id = value.toStringUsingUTF8() - } - } - "start_time", "submissionTime" -> object : PrimitiveConverter() { - override fun addLong(value: Long) { - _startTime = Instant.ofEpochMilli(value) - } - } - "stop_time", "endTime" -> object : PrimitiveConverter() { - override fun addLong(value: Long) { - _stopTime = Instant.ofEpochMilli(value) - } - } - "cpu_count", "maxCores" -> object : PrimitiveConverter() { - override fun addInt(value: Int) { - _cpuCount = value - } - } - "cpu_capacity" -> object : PrimitiveConverter() { - override fun addDouble(value: Double) { - _cpuCapacity = value - } - } - "mem_capacity", "requiredMemory" -> object : PrimitiveConverter() { - override fun addDouble(value: Double) { - _memCapacity = value - } + private val root = + object : GroupConverter() { + /** + * The converters for the columns of the schema. + */ + private val converters = + schema.fields.map { type -> + when (type.name) { + "id" -> + object : PrimitiveConverter() { + override fun addBinary(value: Binary) { + localId = value.toStringUsingUTF8() + } + } + "start_time", "submissionTime" -> + object : PrimitiveConverter() { + override fun addLong(value: Long) { + localStartTime = Instant.ofEpochMilli(value) + } + } + "stop_time", "endTime" -> + object : PrimitiveConverter() { + override fun addLong(value: Long) { + localStopTime = Instant.ofEpochMilli(value) + } + } + "cpu_count", "maxCores" -> + object : PrimitiveConverter() { + override fun addInt(value: Int) { + localCpuCount = value + } + } + "cpu_capacity" -> + object : PrimitiveConverter() { + override fun addDouble(value: Double) { + localCpuCapacity = value + } + } + "mem_capacity", "requiredMemory" -> + object : PrimitiveConverter() { + override fun addDouble(value: Double) { + localMemCapacity = value + } - override fun addLong(value: Long) { - _memCapacity = value.toDouble() + override fun addLong(value: Long) { + localMemCapacity = value.toDouble() + } + } + else -> error("Unknown column $type") } } - else -> error("Unknown column $type") - } - } - override fun start() { - _id = "" - _startTime = Instant.MIN - _stopTime = Instant.MIN - _cpuCount = 0 - _cpuCapacity = 0.0 - _memCapacity = 0.0 - } + override fun start() { + localId = "" + localStartTime = Instant.MIN + localStopTime = Instant.MIN + localCpuCount = 0 + localCpuCapacity = 0.0 + localMemCapacity = 0.0 + } - override fun end() {} + override fun end() {} - override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex] - } + override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex] + } - override fun getCurrentRecord(): Resource = Resource(_id, _startTime, _stopTime, _cpuCount, _cpuCapacity, _memCapacity) + override fun getCurrentRecord(): Resource = + Resource( + localId, + localStartTime, + localStopTime, + localCpuCount, + localCpuCapacity, + localMemCapacity, + ) override fun getRootConverter(): GroupConverter = root } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceState.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceState.kt index 9ad58764..483f444c 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceState.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceState.kt @@ -30,5 +30,5 @@ internal class ResourceState( val timestamp: Instant, val duration: Duration, val cpuCount: Int, - val cpuUsage: Double + val cpuUsage: Double, ) diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt index 56366cd8..21e206a9 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt @@ -31,11 +31,11 @@ import org.apache.parquet.schema.MessageType import org.apache.parquet.schema.PrimitiveType import org.apache.parquet.schema.Types import org.opendc.trace.TableColumn -import org.opendc.trace.conv.RESOURCE_CPU_COUNT -import org.opendc.trace.conv.RESOURCE_ID -import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE -import org.opendc.trace.conv.RESOURCE_STATE_DURATION -import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP +import org.opendc.trace.conv.resourceCpuCount +import org.opendc.trace.conv.resourceID +import org.opendc.trace.conv.resourceStateCpuUsage +import org.opendc.trace.conv.resourceStateDuration +import org.opendc.trace.conv.resourceStateTimestamp /** * A [ReadSupport] instance for [ResourceState] objects. @@ -44,16 +44,17 @@ internal class ResourceStateReadSupport(private val projection: List<String>?) : /** * Mapping from field names to [TableColumn]s. */ - private val fieldMap = mapOf( - "id" to RESOURCE_ID, - "time" to RESOURCE_STATE_TIMESTAMP, - "timestamp" to RESOURCE_STATE_TIMESTAMP, - "duration" to RESOURCE_STATE_DURATION, - "cores" to RESOURCE_CPU_COUNT, - "cpu_count" to RESOURCE_CPU_COUNT, - "cpuUsage" to RESOURCE_STATE_CPU_USAGE, - "cpu_usage" to RESOURCE_STATE_CPU_USAGE - ) + private val fieldMap = + mapOf( + "id" to resourceID, + "time" to resourceStateTimestamp, + "timestamp" to resourceStateTimestamp, + "duration" to resourceStateDuration, + "cores" to resourceCpuCount, + "cpu_count" to resourceCpuCount, + "cpuUsage" to resourceStateCpuUsage, + "cpu_usage" to resourceStateCpuUsage, + ) override fun init(context: InitContext): ReadContext { val projectedSchema = @@ -81,7 +82,7 @@ internal class ResourceStateReadSupport(private val projection: List<String>?) : configuration: Configuration, keyValueMetaData: Map<String, String>, fileSchema: MessageType, - readContext: ReadContext + readContext: ReadContext, ): RecordMaterializer<ResourceState> = ResourceStateRecordMaterializer(readContext.requestedSchema) companion object { @@ -89,53 +90,55 @@ internal class ResourceStateReadSupport(private val projection: List<String>?) : * Parquet read schema (version 2.0) for the "resource states" table in the trace. */ @JvmStatic - val READ_SCHEMA_V2_0: MessageType = Types.buildMessage() - .addFields( - Types - .required(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("id"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("time"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("duration"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("cores"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("cpuUsage") - ) - .named("resource_state") + val READ_SCHEMA_V2_0: MessageType = + Types.buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("id"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("time"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("duration"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("cores"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpuUsage"), + ) + .named("resource_state") /** * Parquet read schema (version 2.1) for the "resource states" table in the trace. */ @JvmStatic - val READ_SCHEMA_V2_1: MessageType = Types.buildMessage() - .addFields( - Types - .required(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("id"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("timestamp"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("duration"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("cpu_count"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("cpu_usage") - ) - .named("resource_state") + val READ_SCHEMA_V2_1: MessageType = + Types.buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("id"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("timestamp"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("duration"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("cpu_count"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpu_usage"), + ) + .named("resource_state") /** * Parquet read schema for the "resource states" table in the trace. diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateRecordMaterializer.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateRecordMaterializer.kt index a813a5af..72d24e78 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateRecordMaterializer.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateRecordMaterializer.kt @@ -38,69 +38,77 @@ internal class ResourceStateRecordMaterializer(schema: MessageType) : RecordMate /** * State of current record being read. */ - private var _id = "" - private var _timestamp = Instant.MIN - private var _duration = Duration.ZERO - private var _cpuCount = 0 - private var _cpuUsage = 0.0 + private var localId = "" + private var localTimestamp = Instant.MIN + private var localDuration = Duration.ZERO + private var localCpuCount = 0 + private var localCpuUsage = 0.0 /** * Root converter for the record. */ - private val root = object : GroupConverter() { - /** - * The converters for the columns of the schema. - */ - private val converters = schema.fields.map { type -> - when (type.name) { - "id" -> object : PrimitiveConverter() { - override fun addBinary(value: Binary) { - _id = value.toStringUsingUTF8() + private val root = + object : GroupConverter() { + /** + * The converters for the columns of the schema. + */ + private val converters = + schema.fields.map { type -> + when (type.name) { + "id" -> + object : PrimitiveConverter() { + override fun addBinary(value: Binary) { + localId = value.toStringUsingUTF8() + } + } + "timestamp", "time" -> + object : PrimitiveConverter() { + override fun addLong(value: Long) { + localTimestamp = Instant.ofEpochMilli(value) + } + } + "duration" -> + object : PrimitiveConverter() { + override fun addLong(value: Long) { + localDuration = Duration.ofMillis(value) + } + } + "cpu_count", "cores" -> + object : PrimitiveConverter() { + override fun addInt(value: Int) { + localCpuCount = value + } + } + "cpu_usage", "cpuUsage" -> + object : PrimitiveConverter() { + override fun addDouble(value: Double) { + localCpuUsage = value + } + } + "flops" -> + object : PrimitiveConverter() { + override fun addLong(value: Long) { + // Ignore to support v1 format + } + } + else -> error("Unknown column $type") } } - "timestamp", "time" -> object : PrimitiveConverter() { - override fun addLong(value: Long) { - _timestamp = Instant.ofEpochMilli(value) - } - } - "duration" -> object : PrimitiveConverter() { - override fun addLong(value: Long) { - _duration = Duration.ofMillis(value) - } - } - "cpu_count", "cores" -> object : PrimitiveConverter() { - override fun addInt(value: Int) { - _cpuCount = value - } - } - "cpu_usage", "cpuUsage" -> object : PrimitiveConverter() { - override fun addDouble(value: Double) { - _cpuUsage = value - } - } - "flops" -> object : PrimitiveConverter() { - override fun addLong(value: Long) { - // Ignore to support v1 format - } - } - else -> error("Unknown column $type") - } - } - override fun start() { - _id = "" - _timestamp = Instant.MIN - _duration = Duration.ZERO - _cpuCount = 0 - _cpuUsage = 0.0 - } + override fun start() { + localId = "" + localTimestamp = Instant.MIN + localDuration = Duration.ZERO + localCpuCount = 0 + localCpuUsage = 0.0 + } - override fun end() {} + override fun end() {} - override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex] - } + override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex] + } - override fun getCurrentRecord(): ResourceState = ResourceState(_id, _timestamp, _duration, _cpuCount, _cpuUsage) + override fun getCurrentRecord(): ResourceState = ResourceState(localId, localTimestamp, localDuration, localCpuCount, localCpuUsage) override fun getRootConverter(): GroupConverter = root } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateWriteSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateWriteSupport.kt index 0bbec4d2..2a6d8c12 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateWriteSupport.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateWriteSupport.kt @@ -52,7 +52,10 @@ internal class ResourceStateWriteSupport : WriteSupport<ResourceState>() { write(recordConsumer, record) } - private fun write(consumer: RecordConsumer, record: ResourceState) { + private fun write( + consumer: RecordConsumer, + record: ResourceState, + ) { consumer.startMessage() consumer.startField("id", 0) @@ -83,26 +86,27 @@ internal class ResourceStateWriteSupport : WriteSupport<ResourceState>() { * Parquet schema for the "resource states" table in the trace. */ @JvmStatic - val WRITE_SCHEMA: MessageType = Types.buildMessage() - .addFields( - Types - .required(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("id"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("timestamp"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("duration"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("cpu_count"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("cpu_usage") - ) - .named("resource_state") + val WRITE_SCHEMA: MessageType = + Types.buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("id"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("timestamp"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("duration"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("cpu_count"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpu_usage"), + ) + .named("resource_state") } } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceWriteSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceWriteSupport.kt index cd428754..ed62e2ce 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceWriteSupport.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceWriteSupport.kt @@ -53,7 +53,10 @@ internal class ResourceWriteSupport : WriteSupport<Resource>() { write(recordConsumer, record) } - private fun write(consumer: RecordConsumer, record: Resource) { + private fun write( + consumer: RecordConsumer, + record: Resource, + ) { consumer.startMessage() consumer.startField("id", 0) @@ -88,30 +91,31 @@ internal class ResourceWriteSupport : WriteSupport<Resource>() { * Parquet schema for the "resources" table in the trace. */ @JvmStatic - val WRITE_SCHEMA: MessageType = Types.buildMessage() - .addFields( - Types - .required(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("id"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("start_time"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("stop_time"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("cpu_count"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("cpu_capacity"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("mem_capacity") - ) - .named("resource") + val WRITE_SCHEMA: MessageType = + Types.buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("id"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("start_time"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("stop_time"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("cpu_count"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpu_capacity"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("mem_capacity"), + ) + .named("resource") } } diff --git a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt index d3c3b35b..c9fa21c3 100644 --- a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt @@ -40,17 +40,17 @@ import org.opendc.trace.TableWriter import org.opendc.trace.conv.INTERFERENCE_GROUP_MEMBERS import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE import org.opendc.trace.conv.INTERFERENCE_GROUP_TARGET -import org.opendc.trace.conv.RESOURCE_CPU_CAPACITY -import org.opendc.trace.conv.RESOURCE_CPU_COUNT -import org.opendc.trace.conv.RESOURCE_ID -import org.opendc.trace.conv.RESOURCE_MEM_CAPACITY -import org.opendc.trace.conv.RESOURCE_START_TIME -import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE -import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP -import org.opendc.trace.conv.RESOURCE_STOP_TIME import org.opendc.trace.conv.TABLE_INTERFERENCE_GROUPS import org.opendc.trace.conv.TABLE_RESOURCES import org.opendc.trace.conv.TABLE_RESOURCE_STATES +import org.opendc.trace.conv.resourceCpuCapacity +import org.opendc.trace.conv.resourceCpuCount +import org.opendc.trace.conv.resourceID +import org.opendc.trace.conv.resourceMemCapacity +import org.opendc.trace.conv.resourceStartTime +import org.opendc.trace.conv.resourceStateCpuUsage +import org.opendc.trace.conv.resourceStateTimestamp +import org.opendc.trace.conv.resourceStopTime import org.opendc.trace.testkit.TableReaderTestKit import org.opendc.trace.testkit.TableWriterTestKit import java.nio.file.Files @@ -88,19 +88,19 @@ internal class OdcVmTraceFormatTest { @ValueSource(strings = ["trace-v2.0", "trace-v2.1"]) fun testResources(name: String) { val path = Paths.get("src/test/resources/$name") - val reader = format.newReader(path, TABLE_RESOURCES, listOf(RESOURCE_ID, RESOURCE_START_TIME)) + val reader = format.newReader(path, TABLE_RESOURCES, listOf(resourceID, resourceStartTime)) assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("1019", reader.getString(RESOURCE_ID)) }, - { assertEquals(Instant.ofEpochMilli(1376314846000), reader.getInstant(RESOURCE_START_TIME)) }, + { assertEquals("1019", reader.getString(resourceID)) }, + { assertEquals(Instant.ofEpochMilli(1376314846000), reader.getInstant(resourceStartTime)) }, { assertTrue(reader.nextRow()) }, - { assertEquals("1023", reader.getString(RESOURCE_ID)) }, + { assertEquals("1023", reader.getString(resourceID)) }, { assertTrue(reader.nextRow()) }, - { assertEquals("1052", reader.getString(RESOURCE_ID)) }, + { assertEquals("1052", reader.getString(resourceID)) }, { assertTrue(reader.nextRow()) }, - { assertEquals("1073", reader.getString(RESOURCE_ID)) }, - { assertFalse(reader.nextRow()) } + { assertEquals("1073", reader.getString(resourceID)) }, + { assertFalse(reader.nextRow()) }, ) reader.close() @@ -112,12 +112,12 @@ internal class OdcVmTraceFormatTest { val writer = format.newWriter(path, TABLE_RESOURCES) writer.startRow() - writer.setString(RESOURCE_ID, "1019") - writer.setInstant(RESOURCE_START_TIME, Instant.EPOCH) - writer.setInstant(RESOURCE_STOP_TIME, Instant.EPOCH) - writer.setInt(RESOURCE_CPU_COUNT, 1) - writer.setDouble(RESOURCE_CPU_CAPACITY, 1024.0) - writer.setDouble(RESOURCE_MEM_CAPACITY, 1024.0) + writer.setString(resourceID, "1019") + writer.setInstant(resourceStartTime, Instant.EPOCH) + writer.setInstant(resourceStopTime, Instant.EPOCH) + writer.setInt(resourceCpuCount, 1) + writer.setDouble(resourceCpuCapacity, 1024.0) + writer.setDouble(resourceMemCapacity, 1024.0) writer.endRow() writer.close() @@ -125,13 +125,13 @@ internal class OdcVmTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("1019", reader.getString(RESOURCE_ID)) }, - { assertEquals(Instant.EPOCH, reader.getInstant(RESOURCE_START_TIME)) }, - { assertEquals(Instant.EPOCH, reader.getInstant(RESOURCE_STOP_TIME)) }, - { assertEquals(1, reader.getInt(RESOURCE_CPU_COUNT)) }, - { assertEquals(1024.0, reader.getDouble(RESOURCE_CPU_CAPACITY)) }, - { assertEquals(1024.0, reader.getDouble(RESOURCE_MEM_CAPACITY)) }, - { assertFalse(reader.nextRow()) } + { assertEquals("1019", reader.getString(resourceID)) }, + { assertEquals(Instant.EPOCH, reader.getInstant(resourceStartTime)) }, + { assertEquals(Instant.EPOCH, reader.getInstant(resourceStopTime)) }, + { assertEquals(1, reader.getInt(resourceCpuCount)) }, + { assertEquals(1024.0, reader.getDouble(resourceCpuCapacity)) }, + { assertEquals(1024.0, reader.getDouble(resourceMemCapacity)) }, + { assertFalse(reader.nextRow()) }, ) reader.close() @@ -141,17 +141,18 @@ internal class OdcVmTraceFormatTest { @ValueSource(strings = ["trace-v2.0", "trace-v2.1"]) fun testSmoke(name: String) { val path = Paths.get("src/test/resources/$name") - val reader = format.newReader( - path, - TABLE_RESOURCE_STATES, - listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP, RESOURCE_STATE_CPU_USAGE) - ) + val reader = + format.newReader( + path, + TABLE_RESOURCE_STATES, + listOf(resourceID, resourceStateTimestamp, resourceStateCpuUsage), + ) assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("1019", reader.getString(RESOURCE_ID)) }, - { assertEquals(1376314846, reader.getInstant(RESOURCE_STATE_TIMESTAMP)?.epochSecond) }, - { assertEquals(0.0, reader.getDouble(RESOURCE_STATE_CPU_USAGE), 0.01) } + { assertEquals("1019", reader.getString(resourceID)) }, + { assertEquals(1376314846, reader.getInstant(resourceStateTimestamp)?.epochSecond) }, + { assertEquals(0.0, reader.getDouble(resourceStateCpuUsage), 0.01) }, ) reader.close() @@ -163,10 +164,10 @@ internal class OdcVmTraceFormatTest { val writer = format.newWriter(path, TABLE_RESOURCE_STATES) writer.startRow() - writer.setString(RESOURCE_ID, "1019") - writer.setInstant(RESOURCE_STATE_TIMESTAMP, Instant.EPOCH) - writer.setDouble(RESOURCE_STATE_CPU_USAGE, 23.0) - writer.setInt(RESOURCE_CPU_COUNT, 1) + writer.setString(resourceID, "1019") + writer.setInstant(resourceStateTimestamp, Instant.EPOCH) + writer.setDouble(resourceStateCpuUsage, 23.0) + writer.setInt(resourceCpuCount, 1) writer.endRow() writer.close() @@ -174,11 +175,11 @@ internal class OdcVmTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("1019", reader.getString(RESOURCE_ID)) }, - { assertEquals(Instant.EPOCH, reader.getInstant(RESOURCE_STATE_TIMESTAMP)) }, - { assertEquals(1, reader.getInt(RESOURCE_CPU_COUNT)) }, - { assertEquals(23.0, reader.getDouble(RESOURCE_STATE_CPU_USAGE)) }, - { assertFalse(reader.nextRow()) } + { assertEquals("1019", reader.getString(resourceID)) }, + { assertEquals(Instant.EPOCH, reader.getInstant(resourceStateTimestamp)) }, + { assertEquals(1, reader.getInt(resourceCpuCount)) }, + { assertEquals(23.0, reader.getDouble(resourceStateCpuUsage)) }, + { assertFalse(reader.nextRow()) }, ) reader.close() @@ -187,11 +188,12 @@ internal class OdcVmTraceFormatTest { @Test fun testInterferenceGroups() { val path = Paths.get("src/test/resources/trace-v2.1") - val reader = format.newReader( - path, - TABLE_INTERFERENCE_GROUPS, - listOf(INTERFERENCE_GROUP_MEMBERS, INTERFERENCE_GROUP_TARGET, INTERFERENCE_GROUP_SCORE) - ) + val reader = + format.newReader( + path, + TABLE_INTERFERENCE_GROUPS, + listOf(INTERFERENCE_GROUP_MEMBERS, INTERFERENCE_GROUP_TARGET, INTERFERENCE_GROUP_SCORE), + ) assertAll( { assertTrue(reader.nextRow()) }, @@ -202,7 +204,7 @@ internal class OdcVmTraceFormatTest { { assertEquals(setOf("1023", "1052", "1073"), reader.getSet(INTERFERENCE_GROUP_MEMBERS, String::class.java)) }, { assertEquals(0.0, reader.getDouble(INTERFERENCE_GROUP_TARGET)) }, { assertEquals(0.7133055555552751, reader.getDouble(INTERFERENCE_GROUP_SCORE)) }, - { assertFalse(reader.nextRow()) } + { assertFalse(reader.nextRow()) }, ) reader.close() @@ -247,7 +249,7 @@ internal class OdcVmTraceFormatTest { { assertEquals(setOf("a", "b", "d"), reader.getSet(INTERFERENCE_GROUP_MEMBERS, String::class.java)) }, { assertEquals(0.5, reader.getDouble(INTERFERENCE_GROUP_TARGET)) }, { assertEquals(0.9, reader.getDouble(INTERFERENCE_GROUP_SCORE)) }, - { assertFalse(reader.nextRow()) } + { assertFalse(reader.nextRow()) }, ) reader.close() diff --git a/opendc-trace/opendc-trace-parquet/build.gradle.kts b/opendc-trace/opendc-trace-parquet/build.gradle.kts index 2217a017..4cdd4350 100644 --- a/opendc-trace/opendc-trace-parquet/build.gradle.kts +++ b/opendc-trace/opendc-trace-parquet/build.gradle.kts @@ -22,13 +22,13 @@ description = "Parquet helpers for traces in OpenDC" -/* Build configuration */ +// Build configuration plugins { `kotlin-library-conventions` } dependencies { - /* This configuration is necessary for a slim dependency on Apache Parquet */ + // This configuration is necessary for a slim dependency on Apache Parquet api(libs.parquet) { exclude(group = "org.apache.hadoop") } diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalInputFile.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalInputFile.kt index fd2e00cd..a60b426a 100644 --- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalInputFile.kt +++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalInputFile.kt @@ -47,61 +47,66 @@ public class LocalInputFile(private val path: Path) : InputFile { override fun getLength(): Long = channel.size() - override fun newStream(): SeekableInputStream = object : SeekableInputStream() { - override fun read(buf: ByteBuffer): Int { - return channel.read(buf) - } + override fun newStream(): SeekableInputStream = + object : SeekableInputStream() { + override fun read(buf: ByteBuffer): Int { + return channel.read(buf) + } - override fun read(): Int { - val single = ByteBuffer.allocate(1) - var read: Int + override fun read(): Int { + val single = ByteBuffer.allocate(1) + var read: Int - // ReadableByteChannel#read might read zero bytes so continue until we read at least one byte - do { - read = channel.read(single) - } while (read == 0) + // ReadableByteChannel#read might read zero bytes so continue until we read at least one byte + do { + read = channel.read(single) + } while (read == 0) - return if (read == -1) { - read - } else { - single.get(0).toInt() and 0xff + return if (read == -1) { + read + } else { + single.get(0).toInt() and 0xff + } } - } - override fun getPos(): Long { - return channel.position() - } + override fun getPos(): Long { + return channel.position() + } - override fun seek(newPos: Long) { - channel.position(newPos) - } + override fun seek(newPos: Long) { + channel.position(newPos) + } - override fun readFully(bytes: ByteArray) { - readFully(ByteBuffer.wrap(bytes)) - } + override fun readFully(bytes: ByteArray) { + readFully(ByteBuffer.wrap(bytes)) + } - override fun readFully(bytes: ByteArray, start: Int, len: Int) { - readFully(ByteBuffer.wrap(bytes, start, len)) - } + override fun readFully( + bytes: ByteArray, + start: Int, + len: Int, + ) { + readFully(ByteBuffer.wrap(bytes, start, len)) + } - override fun readFully(buf: ByteBuffer) { - var remainder = buf.remaining() - while (remainder > 0) { - val read = channel.read(buf) - remainder -= read + override fun readFully(buf: ByteBuffer) { + var remainder = buf.remaining() + while (remainder > 0) { + val read = channel.read(buf) + remainder -= read - if (read == -1 && remainder > 0) { - throw EOFException() + if (read == -1 && remainder > 0) { + throw EOFException() + } } } - } - override fun close() { - channel.close() - } + override fun close() { + channel.close() + } - override fun toString(): String = "NioSeekableInputStream" - } + override fun toString(): String = "NioSeekableInputStream" + } override fun toString(): String = "LocalInputFile[path=$path]" } diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalOutputFile.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalOutputFile.kt index 1b17ae5d..24627b45 100644 --- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalOutputFile.kt +++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalOutputFile.kt @@ -51,8 +51,7 @@ public class LocalOutputFile(private val path: Path) : OutputFile { override fun supportsBlockSize(): Boolean = false - override fun defaultBlockSize(): Long = - throw UnsupportedOperationException("Local filesystem does not have default block size") + override fun defaultBlockSize(): Long = throw UnsupportedOperationException("Local filesystem does not have default block size") override fun getPath(): String = path.toString() @@ -77,7 +76,11 @@ public class LocalOutputFile(private val path: Path) : OutputFile { _pos += b.size } - override fun write(b: ByteArray, off: Int, len: Int) { + override fun write( + b: ByteArray, + off: Int, + len: Int, + ) { output.write(b, off, len) _pos += len } diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt index de8a56d0..b503254e 100644 --- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt +++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt @@ -43,20 +43,21 @@ import kotlin.io.path.isDirectory public class LocalParquetReader<out T>( path: Path, private val readSupport: ReadSupport<T>, - private val strictTyping: Boolean = true + private val strictTyping: Boolean = true, ) : AutoCloseable { /** * The input files to process. */ - private val filesIterator = if (path.isDirectory()) { - Files.list(path) - .filter { !it.isDirectory() } - .sorted() - .map { LocalInputFile(it) } - .iterator() - } else { - listOf(LocalInputFile(path)).iterator() - } + private val filesIterator = + if (path.isDirectory()) { + Files.list(path) + .filter { !it.isDirectory() } + .sorted() + .map { LocalInputFile(it) } + .iterator() + } else { + listOf(LocalInputFile(path)).iterator() + } /** * The Parquet reader to use. @@ -104,11 +105,12 @@ public class LocalParquetReader<out T>( reader?.close() try { - this.reader = if (filesIterator.hasNext()) { - createReader(filesIterator.next()) - } else { - null - } + this.reader = + if (filesIterator.hasNext()) { + createReader(filesIterator.next()) + } else { + null + } } catch (e: Throwable) { this.reader = null throw e diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt index b5eb1deb..c7028fc3 100644 --- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt +++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt @@ -37,7 +37,7 @@ public class LocalParquetWriter { */ public class Builder<T> internal constructor( output: OutputFile, - private val writeSupport: WriteSupport<T> + private val writeSupport: WriteSupport<T>, ) : ParquetWriter.Builder<T, Builder<T>>(output) { override fun self(): Builder<T> = this @@ -49,7 +49,9 @@ public class LocalParquetWriter { * Create a [Builder] instance that writes a Parquet file at the specified [path]. */ @JvmStatic - public fun <T> builder(path: Path, writeSupport: WriteSupport<T>): Builder<T> = - Builder(LocalOutputFile(path), writeSupport) + public fun <T> builder( + path: Path, + writeSupport: WriteSupport<T>, + ): Builder<T> = Builder(LocalOutputFile(path), writeSupport) } } diff --git a/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt b/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt index b6c5a423..fc90aded 100644 --- a/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt +++ b/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt @@ -51,49 +51,52 @@ import java.nio.file.Path internal class ParquetTest { private lateinit var path: Path - private val schema = Types.buildMessage() - .addField( - Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.REQUIRED) - .named("field") - ) - .named("test") - private val writeSupport = object : WriteSupport<Int>() { - lateinit var recordConsumer: RecordConsumer - - override fun init(configuration: Configuration): WriteContext { - return WriteContext(schema, emptyMap()) - } + private val schema = + Types.buildMessage() + .addField( + Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.REQUIRED) + .named("field"), + ) + .named("test") + private val writeSupport = + object : WriteSupport<Int>() { + lateinit var recordConsumer: RecordConsumer + + override fun init(configuration: Configuration): WriteContext { + return WriteContext(schema, emptyMap()) + } - override fun prepareForWrite(recordConsumer: RecordConsumer) { - this.recordConsumer = recordConsumer - } + override fun prepareForWrite(recordConsumer: RecordConsumer) { + this.recordConsumer = recordConsumer + } - override fun write(record: Int) { - val consumer = recordConsumer + override fun write(record: Int) { + val consumer = recordConsumer - consumer.startMessage() - consumer.startField("field", 0) - consumer.addInteger(record) - consumer.endField("field", 0) - consumer.endMessage() + consumer.startMessage() + consumer.startField("field", 0) + consumer.addInteger(record) + consumer.endField("field", 0) + consumer.endMessage() + } } - } - private val readSupport = object : ReadSupport<Int>() { - @Suppress("OVERRIDE_DEPRECATION") - override fun init( - configuration: Configuration, - keyValueMetaData: Map<String, String>, - fileSchema: MessageType - ): ReadContext = ReadContext(fileSchema) - - override fun prepareForRead( - configuration: Configuration, - keyValueMetaData: Map<String, String>, - fileSchema: MessageType, - readContext: ReadContext - ): RecordMaterializer<Int> = TestRecordMaterializer() - } + private val readSupport = + object : ReadSupport<Int>() { + @Suppress("OVERRIDE_DEPRECATION") + override fun init( + configuration: Configuration, + keyValueMetaData: Map<String, String>, + fileSchema: MessageType, + ): ReadContext = ReadContext(fileSchema) + + override fun prepareForRead( + configuration: Configuration, + keyValueMetaData: Map<String, String>, + fileSchema: MessageType, + readContext: ReadContext, + ): RecordMaterializer<Int> = TestRecordMaterializer() + } /** * Set up the test @@ -117,9 +120,10 @@ internal class ParquetTest { @Test fun testSmoke() { val n = 4 - val writer = LocalParquetWriter.builder(path, writeSupport) - .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) - .build() + val writer = + LocalParquetWriter.builder(path, writeSupport) + .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) + .build() try { repeat(n) { i -> @@ -166,19 +170,23 @@ internal class ParquetTest { private class TestRecordMaterializer : RecordMaterializer<Int>() { private var current: Int = 0 - private val fieldConverter = object : PrimitiveConverter() { - override fun addInt(value: Int) { - current = value + private val fieldConverter = + object : PrimitiveConverter() { + override fun addInt(value: Int) { + current = value + } } - } - private val root = object : GroupConverter() { - override fun getConverter(fieldIndex: Int): Converter { - require(fieldIndex == 0) - return fieldConverter + private val root = + object : GroupConverter() { + override fun getConverter(fieldIndex: Int): Converter { + require(fieldIndex == 0) + return fieldConverter + } + + override fun start() {} + + override fun end() {} } - override fun start() {} - override fun end() {} - } override fun getCurrentRecord(): Int = current diff --git a/opendc-trace/opendc-trace-swf/build.gradle.kts b/opendc-trace/opendc-trace-swf/build.gradle.kts index d3bc5aa6..2798cdb1 100644 --- a/opendc-trace/opendc-trace-swf/build.gradle.kts +++ b/opendc-trace/opendc-trace-swf/build.gradle.kts @@ -22,7 +22,7 @@ description = "Support for Standard Workload Format (SWF) traces in OpenDC" -/* Build configuration */ +// Build configuration plugins { `kotlin-library-conventions` } diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt index 2465fb47..5a79fd6f 100644 --- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt +++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTaskTableReader.kt @@ -99,22 +99,22 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea override fun resolve(name: String): Int { return when (name) { - TASK_ID -> COL_JOB_ID - TASK_SUBMIT_TIME -> COL_SUBMIT_TIME - TASK_WAIT_TIME -> COL_WAIT_TIME - TASK_RUNTIME -> COL_RUN_TIME - TASK_ALLOC_NCPUS -> COL_ALLOC_NCPUS - TASK_REQ_NCPUS -> COL_REQ_NCPUS - TASK_STATUS -> COL_STATUS - TASK_USER_ID -> COL_USER_ID - TASK_GROUP_ID -> COL_GROUP_ID - TASK_PARENTS -> COL_PARENT_JOB + TASK_ID -> colJobID + TASK_SUBMIT_TIME -> colSubmitTime + TASK_WAIT_TIME -> colWaitTime + TASK_RUNTIME -> colRunTime + TASK_ALLOC_NCPUS -> colAllocNcpus + TASK_REQ_NCPUS -> colReqNcpus + TASK_STATUS -> colStatus + TASK_USER_ID -> colUserID + TASK_GROUP_ID -> colGroupID + TASK_PARENTS -> colParentJob else -> -1 } } override fun isNull(index: Int): Boolean { - require(index in COL_JOB_ID..COL_PARENT_THINK_TIME) { "Invalid column index" } + require(index in colJobID..colParentThinkTime) { "Invalid column index" } return false } @@ -125,7 +125,7 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea override fun getInt(index: Int): Int { check(state == State.Active) { "No active row" } return when (index) { - COL_REQ_NCPUS, COL_ALLOC_NCPUS, COL_STATUS, COL_GROUP_ID, COL_USER_ID -> fields[index].toInt(10) + colReqNcpus, colAllocNcpus, colStatus, colGroupID, colUserID -> fields[index].toInt(10) else -> throw IllegalArgumentException("Invalid column") } } @@ -145,7 +145,7 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea override fun getString(index: Int): String { check(state == State.Active) { "No active row" } return when (index) { - COL_JOB_ID -> fields[index] + colJobID -> fields[index] else -> throw IllegalArgumentException("Invalid column") } } @@ -157,7 +157,7 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea override fun getInstant(index: Int): Instant? { check(state == State.Active) { "No active row" } return when (index) { - COL_SUBMIT_TIME -> Instant.ofEpochSecond(fields[index].toLong(10)) + colSubmitTime -> Instant.ofEpochSecond(fields[index].toLong(10)) else -> throw IllegalArgumentException("Invalid column") } } @@ -165,20 +165,26 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea override fun getDuration(index: Int): Duration? { check(state == State.Active) { "No active row" } return when (index) { - COL_WAIT_TIME, COL_RUN_TIME -> Duration.ofSeconds(fields[index].toLong(10)) + colWaitTime, colRunTime -> Duration.ofSeconds(fields[index].toLong(10)) else -> throw IllegalArgumentException("Invalid column") } } - override fun <T> getList(index: Int, elementType: Class<T>): List<T>? { + override fun <T> getList( + index: Int, + elementType: Class<T>, + ): List<T>? { throw IllegalArgumentException("Invalid column") } - override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? { + override fun <T> getSet( + index: Int, + elementType: Class<T>, + ): Set<T>? { check(state == State.Active) { "No active row" } @Suppress("UNCHECKED_CAST") return when (index) { - COL_PARENT_JOB -> { + colParentJob -> { require(elementType.isAssignableFrom(String::class.java)) val parent = fields[index].toLong(10) if (parent < 0) emptySet() else setOf(parent) @@ -187,7 +193,11 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea } as Set<T>? } - override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? { + override fun <K, V> getMap( + index: Int, + keyType: Class<K>, + valueType: Class<V>, + ): Map<K, V>? { throw IllegalArgumentException("Invalid column") } @@ -199,26 +209,28 @@ internal class SwfTaskTableReader(private val reader: BufferedReader) : TableRea /** * Default column indices for the SWF format. */ - private val COL_JOB_ID = 0 - private val COL_SUBMIT_TIME = 1 - private val COL_WAIT_TIME = 2 - private val COL_RUN_TIME = 3 - private val COL_ALLOC_NCPUS = 4 - private val COL_AVG_CPU_TIME = 5 - private val COL_USED_MEM = 6 - private val COL_REQ_NCPUS = 7 - private val COL_REQ_TIME = 8 - private val COL_REQ_MEM = 9 - private val COL_STATUS = 10 - private val COL_USER_ID = 11 - private val COL_GROUP_ID = 12 - private val COL_EXEC_NUM = 13 - private val COL_QUEUE_NUM = 14 - private val COL_PART_NUM = 15 - private val COL_PARENT_JOB = 16 - private val COL_PARENT_THINK_TIME = 17 + private val colJobID = 0 + private val colSubmitTime = 1 + private val colWaitTime = 2 + private val colRunTime = 3 + private val colAllocNcpus = 4 + private val colAvgCpuTime = 5 + private val colUsedMem = 6 + private val colReqNcpus = 7 + private val colReqTime = 8 + private val colReqMem = 9 + private val colStatus = 10 + private val colUserID = 11 + private val colGroupID = 12 + private val colExecNum = 13 + private val colQueueNum = 14 + private val colPartNum = 15 + private val colParentJob = 16 + private val colParentThinkTime = 17 private enum class State { - Pending, Active, Closed + Pending, + Active, + Closed, } } diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt index c51805d7..d59b07b4 100644 --- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt +++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt @@ -56,34 +56,45 @@ public class SwfTraceFormat : TraceFormat { override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS) - override fun getDetails(path: Path, table: String): TableDetails { + override fun getDetails( + path: Path, + table: String, + ): TableDetails { return when (table) { - TABLE_TASKS -> TableDetails( - listOf( - TableColumn(TASK_ID, TableColumnType.String), - TableColumn(TASK_SUBMIT_TIME, TableColumnType.Instant), - TableColumn(TASK_WAIT_TIME, TableColumnType.Duration), - TableColumn(TASK_RUNTIME, TableColumnType.Duration), - TableColumn(TASK_REQ_NCPUS, TableColumnType.Int), - TableColumn(TASK_ALLOC_NCPUS, TableColumnType.Int), - TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)), - TableColumn(TASK_STATUS, TableColumnType.Int), - TableColumn(TASK_GROUP_ID, TableColumnType.Int), - TableColumn(TASK_USER_ID, TableColumnType.Int) + TABLE_TASKS -> + TableDetails( + listOf( + TableColumn(TASK_ID, TableColumnType.String), + TableColumn(TASK_SUBMIT_TIME, TableColumnType.Instant), + TableColumn(TASK_WAIT_TIME, TableColumnType.Duration), + TableColumn(TASK_RUNTIME, TableColumnType.Duration), + TableColumn(TASK_REQ_NCPUS, TableColumnType.Int), + TableColumn(TASK_ALLOC_NCPUS, TableColumnType.Int), + TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)), + TableColumn(TASK_STATUS, TableColumnType.Int), + TableColumn(TASK_GROUP_ID, TableColumnType.Int), + TableColumn(TASK_USER_ID, TableColumnType.Int), + ), ) - ) else -> throw IllegalArgumentException("Table $table not supported") } } - override fun newReader(path: Path, table: String, projection: List<String>?): TableReader { + override fun newReader( + path: Path, + table: String, + projection: List<String>?, + ): TableReader { return when (table) { TABLE_TASKS -> SwfTaskTableReader(path.bufferedReader()) else -> throw IllegalArgumentException("Table $table not supported") } } - override fun newWriter(path: Path, table: String): TableWriter { + override fun newWriter( + path: Path, + table: String, + ): TableWriter { throw UnsupportedOperationException("Writing not supported for this format") } } diff --git a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt index 71d6dee3..436f2572 100644 --- a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt @@ -77,7 +77,7 @@ internal class SwfTraceFormatTest { { assertEquals(306, reader.getInt(TASK_ALLOC_NCPUS)) }, { assertTrue(reader.nextRow()) }, { assertEquals("2", reader.getString(TASK_ID)) }, - { assertEquals(17, reader.getInt(TASK_ALLOC_NCPUS)) } + { assertEquals(17, reader.getInt(TASK_ALLOC_NCPUS)) }, ) reader.close() diff --git a/opendc-trace/opendc-trace-testkit/build.gradle.kts b/opendc-trace/opendc-trace-testkit/build.gradle.kts index f6b7222c..e75ffc8c 100644 --- a/opendc-trace/opendc-trace-testkit/build.gradle.kts +++ b/opendc-trace/opendc-trace-testkit/build.gradle.kts @@ -22,7 +22,7 @@ description = "Reusable test suite for implementors" -/* Build configuration */ +// Build configuration plugins { `kotlin-library-conventions` } diff --git a/opendc-trace/opendc-trace-testkit/src/main/kotlin/org/opendc/trace/testkit/TableReaderTestKit.kt b/opendc-trace/opendc-trace-testkit/src/main/kotlin/org/opendc/trace/testkit/TableReaderTestKit.kt index 4624cba0..e5808f81 100644 --- a/opendc-trace/opendc-trace-testkit/src/main/kotlin/org/opendc/trace/testkit/TableReaderTestKit.kt +++ b/opendc-trace/opendc-trace-testkit/src/main/kotlin/org/opendc/trace/testkit/TableReaderTestKit.kt @@ -89,7 +89,7 @@ public abstract class TableReaderTestKit { { assertThrows<IllegalArgumentException> { reader.getDuration(-1) } }, { assertThrows<IllegalArgumentException> { reader.getList(-1, Any::class.java) } }, { assertThrows<IllegalArgumentException> { reader.getSet(-1, Any::class.java) } }, - { assertThrows<IllegalArgumentException> { reader.getMap(-1, Any::class.java, Any::class.java) } } + { assertThrows<IllegalArgumentException> { reader.getMap(-1, Any::class.java, Any::class.java) } }, ) } @@ -111,13 +111,25 @@ public abstract class TableReaderTestKit { is TableColumnType.String -> assertFalse(reader.isNull(column.name) && reader.getString(column.name) != null) is TableColumnType.UUID -> assertFalse(reader.isNull(column.name) && reader.getUUID(column.name) != null) is TableColumnType.Instant -> assertFalse(reader.isNull(column.name) && reader.getInstant(column.name) != null) - is TableColumnType.Duration -> assertFalse(reader.isNull(column.name) && reader.getDuration(column.name) != null) - is TableColumnType.List -> assertFalse(reader.isNull(column.name) && reader.getList(column.name, Any::class.java) != null) - is TableColumnType.Set -> assertFalse(reader.isNull(column.name) && reader.getSet(column.name, Any::class.java) != null) - is TableColumnType.Map -> assertFalse(reader.isNull(column.name) && reader.getMap(column.name, Any::class.java, Any::class.java) != null) + is TableColumnType.Duration -> + assertFalse( + reader.isNull(column.name) && reader.getDuration(column.name) != null, + ) + is TableColumnType.List -> + assertFalse( + reader.isNull(column.name) && reader.getList(column.name, Any::class.java) != null, + ) + is TableColumnType.Set -> + assertFalse( + reader.isNull(column.name) && reader.getSet(column.name, Any::class.java) != null, + ) + is TableColumnType.Map -> + assertFalse( + reader.isNull(column.name) && reader.getMap(column.name, Any::class.java, Any::class.java) != null, + ) } } - } + }, ) } } diff --git a/opendc-trace/opendc-trace-testkit/src/main/kotlin/org/opendc/trace/testkit/TableWriterTestKit.kt b/opendc-trace/opendc-trace-testkit/src/main/kotlin/org/opendc/trace/testkit/TableWriterTestKit.kt index 3cd05f50..2b4adf19 100644 --- a/opendc-trace/opendc-trace-testkit/src/main/kotlin/org/opendc/trace/testkit/TableWriterTestKit.kt +++ b/opendc-trace/opendc-trace-testkit/src/main/kotlin/org/opendc/trace/testkit/TableWriterTestKit.kt @@ -88,7 +88,7 @@ public abstract class TableWriterTestKit { { assertThrows<IllegalArgumentException> { writer.setDuration(-1, Duration.ofMinutes(5)) } }, { assertThrows<IllegalArgumentException> { writer.setList(-1, listOf("test")) } }, { assertThrows<IllegalArgumentException> { writer.setSet(-1, setOf("test")) } }, - { assertThrows<IllegalArgumentException> { writer.setMap(-1, mapOf("test" to "test")) } } + { assertThrows<IllegalArgumentException> { writer.setMap(-1, mapOf("test" to "test")) } }, ) } @@ -117,7 +117,7 @@ public abstract class TableWriterTestKit { } } } - } + }, ) } diff --git a/opendc-trace/opendc-trace-tools/build.gradle.kts b/opendc-trace/opendc-trace-tools/build.gradle.kts index db11059b..002ab8cc 100644 --- a/opendc-trace/opendc-trace-tools/build.gradle.kts +++ b/opendc-trace/opendc-trace-tools/build.gradle.kts @@ -22,7 +22,7 @@ description = "Tools for working with workload traces" -/* Build configuration */ +// Build configuration plugins { `kotlin-conventions` application diff --git a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/ConvertCommand.kt b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/ConvertCommand.kt index bf0e2e3b..17ff0c90 100644 --- a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/ConvertCommand.kt +++ b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/ConvertCommand.kt @@ -39,24 +39,23 @@ import com.github.ajalt.clikt.parameters.types.restrictTo import mu.KotlinLogging import org.opendc.trace.TableWriter import org.opendc.trace.Trace -import org.opendc.trace.conv.RESOURCE_CPU_CAPACITY -import org.opendc.trace.conv.RESOURCE_CPU_COUNT -import org.opendc.trace.conv.RESOURCE_ID -import org.opendc.trace.conv.RESOURCE_MEM_CAPACITY -import org.opendc.trace.conv.RESOURCE_START_TIME -import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE -import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE_PCT -import org.opendc.trace.conv.RESOURCE_STATE_DURATION -import org.opendc.trace.conv.RESOURCE_STATE_MEM_USAGE -import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP -import org.opendc.trace.conv.RESOURCE_STOP_TIME import org.opendc.trace.conv.TABLE_RESOURCES import org.opendc.trace.conv.TABLE_RESOURCE_STATES +import org.opendc.trace.conv.resourceCpuCapacity +import org.opendc.trace.conv.resourceCpuCount +import org.opendc.trace.conv.resourceID +import org.opendc.trace.conv.resourceMemCapacity +import org.opendc.trace.conv.resourceStartTime +import org.opendc.trace.conv.resourceStateCpuUsage +import org.opendc.trace.conv.resourceStateCpuUsagePct +import org.opendc.trace.conv.resourceStateDuration +import org.opendc.trace.conv.resourceStateMemUsage +import org.opendc.trace.conv.resourceStateTimestamp +import org.opendc.trace.conv.resourceStopTime import java.io.File import java.time.Duration import java.time.Instant import java.util.Random -import kotlin.collections.HashMap import kotlin.math.abs import kotlin.math.max import kotlin.math.min @@ -105,7 +104,7 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b */ private val converter by option("-c", "--converter", help = "converter strategy to use").groupChoice( "default" to DefaultTraceConverter(), - "azure" to AzureTraceConverter() + "azure" to AzureTraceConverter(), ).defaultByName("default") override fun run() { @@ -174,7 +173,11 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b * @param samplingOptions The sampling options to use. * @return The map of resources that have been selected. */ - abstract fun convertResources(trace: Trace, writer: TableWriter, samplingOptions: SamplingOptions?): Map<String, Resource> + abstract fun convertResources( + trace: Trace, + writer: TableWriter, + samplingOptions: SamplingOptions?, + ): Map<String, Resource> /** * Convert the resource states table for the trace. @@ -184,7 +187,11 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b * @param selected The set of virtual machines that have been selected. * @return The number of rows written. */ - abstract fun convertResourceStates(trace: Trace, writer: TableWriter, selected: Map<String, Resource>): Int + abstract fun convertResourceStates( + trace: Trace, + writer: TableWriter, + selected: Map<String, Resource>, + ): Int /** * A resource in the resource table. @@ -195,7 +202,7 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b val stopTime: Instant, val cpuCount: Int, val cpuCapacity: Double, - val memCapacity: Double + val memCapacity: Double, ) } @@ -211,14 +218,18 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b /** * The interval at which the samples where taken. */ - private val SAMPLE_INTERVAL = Duration.ofMinutes(5) + private val sampleInterval = Duration.ofMinutes(5) /** * The difference in CPU usage for the algorithm to cascade samples. */ - private val SAMPLE_CASCADE_DIFF = 0.1 + private val sampleCascadeDiff = 0.1 - override fun convertResources(trace: Trace, writer: TableWriter, samplingOptions: SamplingOptions?): Map<String, Resource> { + override fun convertResources( + trace: Trace, + writer: TableWriter, + samplingOptions: SamplingOptions?, + ): Map<String, Resource> { val random = samplingOptions?.let { Random(it.seed) } val samplingFraction = samplingOptions?.fraction ?: 1.0 val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() @@ -226,12 +237,12 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b var hasNextRow = reader.nextRow() val selectedVms = mutableMapOf<String, Resource>() - val idCol = reader.resolve(RESOURCE_ID) - val timestampCol = reader.resolve(RESOURCE_STATE_TIMESTAMP) - val cpuCountCol = reader.resolve(RESOURCE_CPU_COUNT) - val cpuCapacityCol = reader.resolve(RESOURCE_CPU_CAPACITY) - val memCapacityCol = reader.resolve(RESOURCE_MEM_CAPACITY) - val memUsageCol = reader.resolve(RESOURCE_STATE_MEM_USAGE) + val idCol = reader.resolve(resourceID) + val timestampCol = reader.resolve(resourceStateTimestamp) + val cpuCountCol = reader.resolve(resourceCpuCount) + val cpuCapacityCol = reader.resolve(resourceCpuCapacity) + val memCapacityCol = reader.resolve(resourceMemCapacity) + val memUsageCol = reader.resolve(resourceStateMemUsage) while (hasNextRow) { var id: String @@ -257,7 +268,7 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b } hasNextRow = reader.nextRow() - } while (hasNextRow && id == reader.getString(RESOURCE_ID)) + } while (hasNextRow && id == reader.getString(resourceID)) // Sample only a fraction of the VMs if (random != null && random.nextDouble() > samplingFraction) { @@ -266,7 +277,7 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b logger.info { "Selecting VM $id" } - val startInstant = Instant.ofEpochMilli(startTime) - SAMPLE_INTERVAL // Offset by sample interval + val startInstant = Instant.ofEpochMilli(startTime) - sampleInterval // Offset by sample interval val stopInstant = Instant.ofEpochMilli(stopTime) selectedVms.computeIfAbsent(id) { @@ -274,26 +285,30 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b } writer.startRow() - writer.setString(RESOURCE_ID, id) - writer.setInstant(RESOURCE_START_TIME, startInstant) - writer.setInstant(RESOURCE_STOP_TIME, stopInstant) - writer.setInt(RESOURCE_CPU_COUNT, cpuCount) - writer.setDouble(RESOURCE_CPU_CAPACITY, cpuCapacity) - writer.setDouble(RESOURCE_MEM_CAPACITY, max(memCapacity, memUsage)) + writer.setString(resourceID, id) + writer.setInstant(resourceStartTime, startInstant) + writer.setInstant(resourceStopTime, stopInstant) + writer.setInt(resourceCpuCount, cpuCount) + writer.setDouble(resourceCpuCapacity, cpuCapacity) + writer.setDouble(resourceMemCapacity, max(memCapacity, memUsage)) writer.endRow() } return selectedVms } - override fun convertResourceStates(trace: Trace, writer: TableWriter, selected: Map<String, Resource>): Int { + override fun convertResourceStates( + trace: Trace, + writer: TableWriter, + selected: Map<String, Resource>, + ): Int { val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() - val sampleInterval = SAMPLE_INTERVAL.toMillis() + val sampleInterval = sampleInterval.toMillis() - val idCol = reader.resolve(RESOURCE_ID) - val timestampCol = reader.resolve(RESOURCE_STATE_TIMESTAMP) - val cpuCountCol = reader.resolve(RESOURCE_CPU_COUNT) - val cpuUsageCol = reader.resolve(RESOURCE_STATE_CPU_USAGE) + val idCol = reader.resolve(resourceID) + val timestampCol = reader.resolve(resourceStateTimestamp) + val cpuCountCol = reader.resolve(resourceCpuCount) + val cpuUsageCol = reader.resolve(resourceStateCpuUsage) var hasNextRow = reader.nextRow() var count = 0 @@ -315,9 +330,10 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b // Attempt to cascade further samples into one if they share the same CPU usage while (reader.nextRow().also { hasNextRow = it }) { - val shouldCascade = id == reader.getString(idCol) && - abs(cpuUsage - reader.getDouble(cpuUsageCol)) < SAMPLE_CASCADE_DIFF && - cpuCount == reader.getInt(cpuCountCol) + val shouldCascade = + id == reader.getString(idCol) && + abs(cpuUsage - reader.getDouble(cpuUsageCol)) < sampleCascadeDiff && + cpuCount == reader.getInt(cpuCountCol) // Check whether the next sample can be cascaded with the current sample: // (1) The VM identifier of both samples matches @@ -339,11 +355,11 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b } writer.startRow() - writer.setString(RESOURCE_ID, id) - writer.setInstant(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(timestamp)) - writer.setDuration(RESOURCE_STATE_DURATION, Duration.ofMillis(duration)) - writer.setInt(RESOURCE_CPU_COUNT, cpuCount) - writer.setDouble(RESOURCE_STATE_CPU_USAGE, cpuUsage) + writer.setString(resourceID, id) + writer.setInstant(resourceStateTimestamp, Instant.ofEpochMilli(timestamp)) + writer.setDuration(resourceStateDuration, Duration.ofMillis(duration)) + writer.setInt(resourceCpuCount, cpuCount) + writer.setDouble(resourceStateCpuUsage, cpuUsage) writer.endRow() count++ @@ -365,28 +381,32 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b /** * CPU capacity of the machines used by Azure. */ - private val CPU_CAPACITY = 2500.0 + private val cpuCapacity = 2500.0 /** * The interval at which the samples where taken. */ - private val SAMPLE_INTERVAL = Duration.ofMinutes(5) + private val sampleInterval = Duration.ofMinutes(5) /** * The difference in CPU usage for the algorithm to cascade samples. */ - private val SAMPLE_CASCADE_DIFF = 0.1 + private val sampleCascadeDiff = 0.1 - override fun convertResources(trace: Trace, writer: TableWriter, samplingOptions: SamplingOptions?): Map<String, Resource> { + override fun convertResources( + trace: Trace, + writer: TableWriter, + samplingOptions: SamplingOptions?, + ): Map<String, Resource> { val random = samplingOptions?.let { Random(it.seed) } val samplingFraction = samplingOptions?.fraction ?: 1.0 val reader = checkNotNull(trace.getTable(TABLE_RESOURCES)).newReader() - val idCol = reader.resolve(RESOURCE_ID) - val startTimeCol = reader.resolve(RESOURCE_START_TIME) - val stopTimeCol = reader.resolve(RESOURCE_STOP_TIME) - val cpuCountCol = reader.resolve(RESOURCE_CPU_COUNT) - val memCapacityCol = reader.resolve(RESOURCE_MEM_CAPACITY) + val idCol = reader.resolve(resourceID) + val startTimeCol = reader.resolve(resourceStartTime) + val stopTimeCol = reader.resolve(resourceStopTime) + val cpuCountCol = reader.resolve(resourceCpuCount) + val memCapacityCol = reader.resolve(resourceMemCapacity) val selectedVms = mutableMapOf<String, Resource>() @@ -406,33 +426,37 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b val startInstant = Instant.ofEpochMilli(startTime) val stopInstant = Instant.ofEpochMilli(stopTime) - val cpuCapacity = cpuCount * CPU_CAPACITY + val cpuCapacity = cpuCount * cpuCapacity selectedVms.computeIfAbsent(id) { Resource(it, startInstant, stopInstant, cpuCount, cpuCapacity, memCapacity) } writer.startRow() - writer.setString(RESOURCE_ID, id) - writer.setInstant(RESOURCE_START_TIME, startInstant) - writer.setInstant(RESOURCE_STOP_TIME, stopInstant) - writer.setInt(RESOURCE_CPU_COUNT, cpuCount) - writer.setDouble(RESOURCE_CPU_CAPACITY, cpuCapacity) - writer.setDouble(RESOURCE_MEM_CAPACITY, memCapacity) + writer.setString(resourceID, id) + writer.setInstant(resourceStartTime, startInstant) + writer.setInstant(resourceStopTime, stopInstant) + writer.setInt(resourceCpuCount, cpuCount) + writer.setDouble(resourceCpuCapacity, cpuCapacity) + writer.setDouble(resourceMemCapacity, memCapacity) writer.endRow() } return selectedVms } - override fun convertResourceStates(trace: Trace, writer: TableWriter, selected: Map<String, Resource>): Int { + override fun convertResourceStates( + trace: Trace, + writer: TableWriter, + selected: Map<String, Resource>, + ): Int { val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() val states = HashMap<String, State>() - val sampleInterval = SAMPLE_INTERVAL.toMillis() + val sampleInterval = sampleInterval.toMillis() - val idCol = reader.resolve(RESOURCE_ID) - val timestampCol = reader.resolve(RESOURCE_STATE_TIMESTAMP) - val cpuUsageCol = reader.resolve(RESOURCE_STATE_CPU_USAGE_PCT) + val idCol = reader.resolve(resourceID) + val timestampCol = reader.resolve(resourceStateTimestamp) + val cpuUsageCol = reader.resolve(resourceStateCpuUsagePct) var count = 0 @@ -448,7 +472,7 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b // Check whether the next sample can be cascaded with the current sample: // (1) The CPU usage is almost identical (lower than `SAMPLE_CASCADE_DIFF`) // (2) The interval between both samples is not higher than `SAMPLE_INTERVAL` - if (abs(cpuUsage - state.cpuUsage) <= SAMPLE_CASCADE_DIFF && delta <= sampleInterval) { + if (abs(cpuUsage - state.cpuUsage) <= sampleCascadeDiff && delta <= sampleInterval) { state.time = timestamp state.duration += delta continue @@ -470,7 +494,11 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b return count } - private class State(@JvmField val resource: Resource, @JvmField var cpuUsage: Double, @JvmField var duration: Long) { + private class State( + @JvmField val resource: Resource, + @JvmField var cpuUsage: Double, + @JvmField var duration: Long, + ) { @JvmField var time: Long = resource.startTime.toEpochMilli() private var lastWrite: Long = Long.MIN_VALUE @@ -482,11 +510,11 @@ internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert b lastWrite = time writer.startRow() - writer.setString(RESOURCE_ID, resource.id) - writer.setInstant(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(time)) - writer.setDuration(RESOURCE_STATE_DURATION, Duration.ofMillis(duration)) - writer.setDouble(RESOURCE_STATE_CPU_USAGE, cpuUsage) - writer.setInt(RESOURCE_CPU_COUNT, resource.cpuCount) + writer.setString(resourceID, resource.id) + writer.setInstant(resourceStateTimestamp, Instant.ofEpochMilli(time)) + writer.setDuration(resourceStateDuration, Duration.ofMillis(duration)) + writer.setDouble(resourceStateCpuUsage, cpuUsage) + writer.setInt(resourceCpuCount, resource.cpuCount) writer.endRow() } } diff --git a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/QueryCommand.kt b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/QueryCommand.kt index 98b4cdf5..7b7a2a64 100644 --- a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/QueryCommand.kt +++ b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/QueryCommand.kt @@ -67,11 +67,12 @@ internal class QueryCommand : CliktCommand(name = "query", help = "Query workloa /** * Access to the terminal. */ - private val terminal = TerminalBuilder.builder() - .system(false) - .streams(System.`in`, System.out) - .encoding(StandardCharsets.UTF_8) - .build() + private val terminal = + TerminalBuilder.builder() + .system(false) + .streams(System.`in`, System.out) + .encoding(StandardCharsets.UTF_8) + .build() /** * Helper class to print results to console. @@ -119,10 +120,11 @@ internal class QueryCommand : CliktCommand(name = "query", help = "Query workloa var count = 0 val meta: ResultSetMetaData = rs.metaData - val options = mapOf( - Printer.COLUMNS to List(meta.columnCount) { meta.getColumnName(it + 1) }, - Printer.BORDER to "|" - ) + val options = + mapOf( + Printer.COLUMNS to List(meta.columnCount) { meta.getColumnName(it + 1) }, + Printer.BORDER to "|", + ) val data = mutableListOf<Map<String, Any>>() while (rs.next()) { @@ -146,7 +148,10 @@ internal class QueryCommand : CliktCommand(name = "query", help = "Query workloa private class QueryPrinter(private val terminal: Terminal) : DefaultPrinter(null) { override fun terminal(): Terminal = terminal - override fun highlightAndPrint(options: MutableMap<String, Any>, exception: Throwable) { + override fun highlightAndPrint( + options: MutableMap<String, Any>, + exception: Throwable, + ) { if (options.getOrDefault("exception", "stack") == "stack") { exception.printStackTrace() } else { diff --git a/opendc-trace/opendc-trace-wfformat/build.gradle.kts b/opendc-trace/opendc-trace-wfformat/build.gradle.kts index a0e22b16..57313a73 100644 --- a/opendc-trace/opendc-trace-wfformat/build.gradle.kts +++ b/opendc-trace/opendc-trace-wfformat/build.gradle.kts @@ -22,7 +22,7 @@ description = "Support for WfCommons workload traces in OpenDC" -/* Build configuration */ +// Build configuration plugins { `kotlin-library-conventions` } diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt index e0cbd305..8f84e51f 100644 --- a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt +++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReader.kt @@ -81,13 +81,14 @@ internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableRe } ParserLevel.WORKFLOW -> { // Seek for the jobs object in the file - level = if (!seekJobs()) { - ParserLevel.TRACE - } else if (!parser.isExpectedStartArrayToken) { - throw JsonParseException(parser, "Expected array", parser.currentLocation) - } else { - ParserLevel.JOB - } + level = + if (!seekJobs()) { + ParserLevel.TRACE + } else if (!parser.isExpectedStartArrayToken) { + throw JsonParseException(parser, "Expected array", parser.currentLocation) + } else { + ParserLevel.JOB + } } ParserLevel.JOB -> { when (parser.nextToken()) { @@ -108,18 +109,18 @@ internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableRe override fun resolve(name: String): Int { return when (name) { - TASK_ID -> COL_ID - TASK_WORKFLOW_ID -> COL_WORKFLOW_ID - TASK_RUNTIME -> COL_RUNTIME - TASK_REQ_NCPUS -> COL_NPROC - TASK_PARENTS -> COL_PARENTS - TASK_CHILDREN -> COL_CHILDREN + TASK_ID -> colID + TASK_WORKFLOW_ID -> colWorkflowID + TASK_RUNTIME -> colRuntime + TASK_REQ_NCPUS -> colNproc + TASK_PARENTS -> colParents + TASK_CHILDREN -> colChildren else -> -1 } } override fun isNull(index: Int): Boolean { - require(index in 0..COL_CHILDREN) { "Invalid column value" } + require(index in 0..colChildren) { "Invalid column value" } return false } @@ -130,7 +131,7 @@ internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableRe override fun getInt(index: Int): Int { checkActive() return when (index) { - COL_NPROC -> cores + colNproc -> cores else -> throw IllegalArgumentException("Invalid column") } } @@ -150,8 +151,8 @@ internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableRe override fun getString(index: Int): String? { checkActive() return when (index) { - COL_ID -> id - COL_WORKFLOW_ID -> workflowId + colID -> id + colWorkflowID -> workflowId else -> throw IllegalArgumentException("Invalid column") } } @@ -167,25 +168,35 @@ internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableRe override fun getDuration(index: Int): Duration? { checkActive() return when (index) { - COL_RUNTIME -> runtime + colRuntime -> runtime else -> throw IllegalArgumentException("Invalid column") } } - override fun <T> getList(index: Int, elementType: Class<T>): List<T>? { + override fun <T> getList( + index: Int, + elementType: Class<T>, + ): List<T>? { throw IllegalArgumentException("Invalid column") } - override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? { + override fun <T> getSet( + index: Int, + elementType: Class<T>, + ): Set<T>? { checkActive() return when (index) { - COL_PARENTS -> TYPE_PARENTS.convertTo(parents, elementType) - COL_CHILDREN -> TYPE_CHILDREN.convertTo(children, elementType) + colParents -> typeParents.convertTo(parents, elementType) + colChildren -> typeChildren.convertTo(children, elementType) else -> throw IllegalArgumentException("Invalid column") } } - override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? { + override fun <K, V> getMap( + index: Int, + keyType: Class<K>, + valueType: Class<V>, + ): Map<K, V>? { throw IllegalArgumentException("Invalid column") } @@ -267,7 +278,10 @@ internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableRe } private enum class ParserLevel { - TOP, TRACE, WORKFLOW, JOB + TOP, + TRACE, + WORKFLOW, + JOB, } /** @@ -288,13 +302,13 @@ internal class WfFormatTaskTableReader(private val parser: JsonParser) : TableRe cores = -1 } - private val COL_ID = 0 - private val COL_WORKFLOW_ID = 1 - private val COL_RUNTIME = 3 - private val COL_NPROC = 4 - private val COL_PARENTS = 5 - private val COL_CHILDREN = 6 + private val colID = 0 + private val colWorkflowID = 1 + private val colRuntime = 3 + private val colNproc = 4 + private val colParents = 5 + private val colChildren = 6 - private val TYPE_PARENTS = TableColumnType.Set(TableColumnType.String) - private val TYPE_CHILDREN = TableColumnType.Set(TableColumnType.String) + private val typeParents = TableColumnType.Set(TableColumnType.String) + private val typeChildren = TableColumnType.Set(TableColumnType.String) } diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt index 35fb883a..2178fac6 100644 --- a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt +++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt @@ -55,30 +55,41 @@ public class WfFormatTraceFormat : TraceFormat { override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS) - override fun getDetails(path: Path, table: String): TableDetails { + override fun getDetails( + path: Path, + table: String, + ): TableDetails { return when (table) { - TABLE_TASKS -> TableDetails( - listOf( - TableColumn(TASK_ID, TableColumnType.String), - TableColumn(TASK_WORKFLOW_ID, TableColumnType.String), - TableColumn(TASK_RUNTIME, TableColumnType.Duration), - TableColumn(TASK_REQ_NCPUS, TableColumnType.Int), - TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)), - TableColumn(TASK_CHILDREN, TableColumnType.Set(TableColumnType.String)) + TABLE_TASKS -> + TableDetails( + listOf( + TableColumn(TASK_ID, TableColumnType.String), + TableColumn(TASK_WORKFLOW_ID, TableColumnType.String), + TableColumn(TASK_RUNTIME, TableColumnType.Duration), + TableColumn(TASK_REQ_NCPUS, TableColumnType.Int), + TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)), + TableColumn(TASK_CHILDREN, TableColumnType.Set(TableColumnType.String)), + ), ) - ) else -> throw IllegalArgumentException("Table $table not supported") } } - override fun newReader(path: Path, table: String, projection: List<String>?): TableReader { + override fun newReader( + path: Path, + table: String, + projection: List<String>?, + ): TableReader { return when (table) { TABLE_TASKS -> WfFormatTaskTableReader(factory.createParser(path.toFile())) else -> throw IllegalArgumentException("Table $table not supported") } } - override fun newWriter(path: Path, table: String): TableWriter { + override fun newWriter( + path: Path, + table: String, + ): TableWriter { throw UnsupportedOperationException("Writing not supported for this format") } } diff --git a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt index 0560d642..618cdf7d 100644 --- a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt +++ b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTaskTableReaderTest.kt @@ -69,11 +69,12 @@ internal class WfFormatTaskTableReaderTest { @Test fun testNoWorkflow() { - val content = """ - { - "name": "eager-nextflow-chameleon" - } - """.trimIndent() + val content = + """ + { + "name": "eager-nextflow-chameleon" + } + """.trimIndent() val parser = factory.createParser(content) val reader = WfFormatTaskTableReader(parser) @@ -88,12 +89,13 @@ internal class WfFormatTaskTableReaderTest { @Test fun testWorkflowArrayType() { - val content = """ - { - "name": "eager-nextflow-chameleon", - "workflow": [] - } - """.trimIndent() + val content = + """ + { + "name": "eager-nextflow-chameleon", + "workflow": [] + } + """.trimIndent() val parser = factory.createParser(content) val reader = WfFormatTaskTableReader(parser) @@ -108,12 +110,13 @@ internal class WfFormatTaskTableReaderTest { @Test fun testWorkflowNullType() { - val content = """ - { - "name": "eager-nextflow-chameleon", - "workflow": null - } - """.trimIndent() + val content = + """ + { + "name": "eager-nextflow-chameleon", + "workflow": null + } + """.trimIndent() val parser = factory.createParser(content) val reader = WfFormatTaskTableReader(parser) @@ -128,14 +131,15 @@ internal class WfFormatTaskTableReaderTest { @Test fun testNoJobs() { - val content = """ - { - "name": "eager-nextflow-chameleon", - "workflow": { + val content = + """ + { + "name": "eager-nextflow-chameleon", + "workflow": { + } } - } - """.trimIndent() + """.trimIndent() val parser = factory.createParser(content) val reader = WfFormatTaskTableReader(parser) @@ -146,12 +150,13 @@ internal class WfFormatTaskTableReaderTest { @Test fun testJobsObjectType() { - val content = """ - { - "name": "eager-nextflow-chameleon", - "workflow": { "jobs": {} } - } - """.trimIndent() + val content = + """ + { + "name": "eager-nextflow-chameleon", + "workflow": { "jobs": {} } + } + """.trimIndent() val parser = factory.createParser(content) val reader = WfFormatTaskTableReader(parser) @@ -162,12 +167,13 @@ internal class WfFormatTaskTableReaderTest { @Test fun testJobsNullType() { - val content = """ - { - "name": "eager-nextflow-chameleon", - "workflow": { "jobs": null } - } - """.trimIndent() + val content = + """ + { + "name": "eager-nextflow-chameleon", + "workflow": { "jobs": null } + } + """.trimIndent() val parser = factory.createParser(content) val reader = WfFormatTaskTableReader(parser) @@ -178,14 +184,15 @@ internal class WfFormatTaskTableReaderTest { @Test fun testJobsInvalidChildType() { - val content = """ - { - "name": "eager-nextflow-chameleon", - "workflow": { - "jobs": [1] + val content = + """ + { + "name": "eager-nextflow-chameleon", + "workflow": { + "jobs": [1] + } } - } - """.trimIndent() + """.trimIndent() val parser = factory.createParser(content) val reader = WfFormatTaskTableReader(parser) @@ -196,18 +203,19 @@ internal class WfFormatTaskTableReaderTest { @Test fun testJobsValidChildType() { - val content = """ - { - "name": "eager-nextflow-chameleon", - "workflow": { - "jobs": [ - { - "name": "test" - } - ] + val content = + """ + { + "name": "eager-nextflow-chameleon", + "workflow": { + "jobs": [ + { + "name": "test" + } + ] + } } - } - """.trimIndent() + """.trimIndent() val parser = factory.createParser(content) val reader = WfFormatTaskTableReader(parser) @@ -220,19 +228,20 @@ internal class WfFormatTaskTableReaderTest { @Test fun testJobsInvalidParents() { - val content = """ - { - "name": "eager-nextflow-chameleon", - "workflow": { - "jobs": [ - { - "name": "test", - "parents": 1, - } - ] + val content = + """ + { + "name": "eager-nextflow-chameleon", + "workflow": { + "jobs": [ + { + "name": "test", + "parents": 1, + } + ] + } } - } - """.trimIndent() + """.trimIndent() val parser = factory.createParser(content) val reader = WfFormatTaskTableReader(parser) @@ -243,19 +252,20 @@ internal class WfFormatTaskTableReaderTest { @Test fun testJobsInvalidParentsItem() { - val content = """ - { - "name": "eager-nextflow-chameleon", - "workflow": { - "jobs": [ - { - "name": "test", - "parents": [1], - } - ] + val content = + """ + { + "name": "eager-nextflow-chameleon", + "workflow": { + "jobs": [ + { + "name": "test", + "parents": [1], + } + ] + } } - } - """.trimIndent() + """.trimIndent() val parser = factory.createParser(content) val reader = WfFormatTaskTableReader(parser) @@ -266,19 +276,20 @@ internal class WfFormatTaskTableReaderTest { @Test fun testJobsValidParents() { - val content = """ - { - "name": "eager-nextflow-chameleon", - "workflow": { - "jobs": [ - { - "name": "test", - "parents": ["1"] - } - ] + val content = + """ + { + "name": "eager-nextflow-chameleon", + "workflow": { + "jobs": [ + { + "name": "test", + "parents": ["1"] + } + ] + } } - } - """.trimIndent() + """.trimIndent() val parser = factory.createParser(content) val reader = WfFormatTaskTableReader(parser) @@ -291,19 +302,20 @@ internal class WfFormatTaskTableReaderTest { @Test fun testJobsInvalidSecondEntry() { - val content = """ - { - "workflow": { - "jobs": [ - { - "name": "test", - "parents": ["1"] - }, - "test" - ] + val content = + """ + { + "workflow": { + "jobs": [ + { + "name": "test", + "parents": ["1"] + }, + "test" + ] + } } - } - """.trimIndent() + """.trimIndent() val parser = factory.createParser(content) val reader = WfFormatTaskTableReader(parser) @@ -315,25 +327,26 @@ internal class WfFormatTaskTableReaderTest { @Test fun testDuplicateJobsArray() { - val content = """ - { - "name": "eager-nextflow-chameleon", - "workflow": { - "jobs": [ - { - "name": "test", - "parents": ["1"] - } - ], - "jobs": [ - { - "name": "test2", - "parents": ["test"] - } - ] + val content = + """ + { + "name": "eager-nextflow-chameleon", + "workflow": { + "jobs": [ + { + "name": "test", + "parents": ["1"] + } + ], + "jobs": [ + { + "name": "test2", + "parents": ["test"] + } + ] + } } - } - """.trimIndent() + """.trimIndent() val parser = factory.createParser(content) val reader = WfFormatTaskTableReader(parser) diff --git a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt index 75f4b413..80a9d80e 100644 --- a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt @@ -81,7 +81,7 @@ class WfFormatTraceFormatTest { { assertEquals("makebwaindex_mammoth_mt_krause.fasta", reader.getString(TASK_ID)) }, { assertEquals("eager-nextflow-chameleon", reader.getString(TASK_WORKFLOW_ID)) }, { assertEquals(172000, reader.getDuration(TASK_RUNTIME)?.toMillis()) }, - { assertEquals(emptySet<String>(), reader.getSet(TASK_PARENTS, String::class.java)) } + { assertEquals(emptySet<String>(), reader.getSet(TASK_PARENTS, String::class.java)) }, ) assertAll( @@ -89,7 +89,7 @@ class WfFormatTraceFormatTest { { assertEquals("makeseqdict_mammoth_mt_krause.fasta", reader.getString(TASK_ID)) }, { assertEquals("eager-nextflow-chameleon", reader.getString(TASK_WORKFLOW_ID)) }, { assertEquals(175000, reader.getDuration(TASK_RUNTIME)?.toMillis()) }, - { assertEquals(setOf("makebwaindex_mammoth_mt_krause.fasta"), reader.getSet(TASK_PARENTS, String::class.java)) } + { assertEquals(setOf("makebwaindex_mammoth_mt_krause.fasta"), reader.getSet(TASK_PARENTS, String::class.java)) }, ) reader.close() diff --git a/opendc-trace/opendc-trace-wtf/build.gradle.kts b/opendc-trace/opendc-trace-wtf/build.gradle.kts index 599087e1..a3119e5e 100644 --- a/opendc-trace/opendc-trace-wtf/build.gradle.kts +++ b/opendc-trace/opendc-trace-wtf/build.gradle.kts @@ -22,7 +22,7 @@ description = "Support for Workflow Trace Format (WTF) traces in OpenDC" -/* Build configuration */ +// Build configuration plugins { `kotlin-library-conventions` } diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt index 73c1b8a9..95582388 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt @@ -62,38 +62,38 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Task>) } } - private val COL_ID = 0 - private val COL_WORKFLOW_ID = 1 - private val COL_SUBMIT_TIME = 2 - private val COL_WAIT_TIME = 3 - private val COL_RUNTIME = 4 - private val COL_REQ_NCPUS = 5 - private val COL_PARENTS = 6 - private val COL_CHILDREN = 7 - private val COL_GROUP_ID = 8 - private val COL_USER_ID = 9 - - private val TYPE_PARENTS = TableColumnType.Set(TableColumnType.String) - private val TYPE_CHILDREN = TableColumnType.Set(TableColumnType.String) + private val colID = 0 + private val colWorkflowID = 1 + private val colSubmitTime = 2 + private val colWaitTime = 3 + private val colRuntime = 4 + private val colReqNcpus = 5 + private val colParents = 6 + private val colChildren = 7 + private val colGroupID = 8 + private val colUserID = 9 + + private val typeParents = TableColumnType.Set(TableColumnType.String) + private val typeChildren = TableColumnType.Set(TableColumnType.String) override fun resolve(name: String): Int { return when (name) { - TASK_ID -> COL_ID - TASK_WORKFLOW_ID -> COL_WORKFLOW_ID - TASK_SUBMIT_TIME -> COL_SUBMIT_TIME - TASK_WAIT_TIME -> COL_WAIT_TIME - TASK_RUNTIME -> COL_RUNTIME - TASK_REQ_NCPUS -> COL_REQ_NCPUS - TASK_PARENTS -> COL_PARENTS - TASK_CHILDREN -> COL_CHILDREN - TASK_GROUP_ID -> COL_GROUP_ID - TASK_USER_ID -> COL_USER_ID + TASK_ID -> colID + TASK_WORKFLOW_ID -> colWorkflowID + TASK_SUBMIT_TIME -> colSubmitTime + TASK_WAIT_TIME -> colWaitTime + TASK_RUNTIME -> colRuntime + TASK_REQ_NCPUS -> colReqNcpus + TASK_PARENTS -> colParents + TASK_CHILDREN -> colChildren + TASK_GROUP_ID -> colGroupID + TASK_USER_ID -> colUserID else -> -1 } } override fun isNull(index: Int): Boolean { - require(index in COL_ID..COL_USER_ID) { "Invalid column index" } + require(index in colID..colUserID) { "Invalid column index" } return false } @@ -105,9 +105,9 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Task>) val record = checkNotNull(record) { "Reader in invalid state" } return when (index) { - COL_REQ_NCPUS -> record.requestedCpus - COL_GROUP_ID -> record.groupId - COL_USER_ID -> record.userId + colReqNcpus -> record.requestedCpus + colGroupID -> record.groupId + colUserID -> record.userId else -> throw IllegalArgumentException("Invalid column") } } @@ -127,8 +127,8 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Task>) override fun getString(index: Int): String { val record = checkNotNull(record) { "Reader in invalid state" } return when (index) { - COL_ID -> record.id - COL_WORKFLOW_ID -> record.workflowId + colID -> record.id + colWorkflowID -> record.workflowId else -> throw IllegalArgumentException("Invalid column") } } @@ -140,7 +140,7 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Task>) override fun getInstant(index: Int): Instant { val record = checkNotNull(record) { "Reader in invalid state" } return when (index) { - COL_SUBMIT_TIME -> record.submitTime + colSubmitTime -> record.submitTime else -> throw IllegalArgumentException("Invalid column") } } @@ -148,26 +148,36 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Task>) override fun getDuration(index: Int): Duration { val record = checkNotNull(record) { "Reader in invalid state" } return when (index) { - COL_WAIT_TIME -> record.waitTime - COL_RUNTIME -> record.runtime + colWaitTime -> record.waitTime + colRuntime -> record.runtime else -> throw IllegalArgumentException("Invalid column") } } - override fun <T> getList(index: Int, elementType: Class<T>): List<T>? { + override fun <T> getList( + index: Int, + elementType: Class<T>, + ): List<T>? { throw IllegalArgumentException("Invalid column") } - override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? { + override fun <T> getSet( + index: Int, + elementType: Class<T>, + ): Set<T>? { val record = checkNotNull(record) { "Reader in invalid state" } return when (index) { - COL_PARENTS -> TYPE_PARENTS.convertTo(record.parents, elementType) - COL_CHILDREN -> TYPE_CHILDREN.convertTo(record.children, elementType) + colParents -> typeParents.convertTo(record.parents, elementType) + colChildren -> typeChildren.convertTo(record.children, elementType) else -> throw IllegalArgumentException("Invalid column") } } - override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? { + override fun <K, V> getMap( + index: Int, + keyType: Class<K>, + valueType: Class<V>, + ): Map<K, V>? { throw IllegalArgumentException("Invalid column") } diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt index c25b512c..1386d2ef 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt @@ -55,27 +55,35 @@ public class WtfTraceFormat : TraceFormat { override fun getTables(path: Path): List<String> = listOf(TABLE_TASKS) - override fun getDetails(path: Path, table: String): TableDetails { + override fun getDetails( + path: Path, + table: String, + ): TableDetails { return when (table) { - TABLE_TASKS -> TableDetails( - listOf( - TableColumn(TASK_ID, TableColumnType.String), - TableColumn(TASK_WORKFLOW_ID, TableColumnType.String), - TableColumn(TASK_SUBMIT_TIME, TableColumnType.Instant), - TableColumn(TASK_WAIT_TIME, TableColumnType.Duration), - TableColumn(TASK_RUNTIME, TableColumnType.Duration), - TableColumn(TASK_REQ_NCPUS, TableColumnType.Int), - TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)), - TableColumn(TASK_CHILDREN, TableColumnType.Set(TableColumnType.String)), - TableColumn(TASK_GROUP_ID, TableColumnType.Int), - TableColumn(TASK_USER_ID, TableColumnType.Int) + TABLE_TASKS -> + TableDetails( + listOf( + TableColumn(TASK_ID, TableColumnType.String), + TableColumn(TASK_WORKFLOW_ID, TableColumnType.String), + TableColumn(TASK_SUBMIT_TIME, TableColumnType.Instant), + TableColumn(TASK_WAIT_TIME, TableColumnType.Duration), + TableColumn(TASK_RUNTIME, TableColumnType.Duration), + TableColumn(TASK_REQ_NCPUS, TableColumnType.Int), + TableColumn(TASK_PARENTS, TableColumnType.Set(TableColumnType.String)), + TableColumn(TASK_CHILDREN, TableColumnType.Set(TableColumnType.String)), + TableColumn(TASK_GROUP_ID, TableColumnType.Int), + TableColumn(TASK_USER_ID, TableColumnType.Int), + ), ) - ) else -> throw IllegalArgumentException("Table $table not supported") } } - override fun newReader(path: Path, table: String, projection: List<String>?): TableReader { + override fun newReader( + path: Path, + table: String, + projection: List<String>?, + ): TableReader { return when (table) { TABLE_TASKS -> { val reader = LocalParquetReader(path.resolve("tasks/schema-1.0"), TaskReadSupport(projection), strictTyping = false) @@ -85,7 +93,10 @@ public class WtfTraceFormat : TraceFormat { } } - override fun newWriter(path: Path, table: String): TableWriter { + override fun newWriter( + path: Path, + table: String, + ): TableWriter { throw UnsupportedOperationException("Writing not supported for this format") } } diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt index 71557f96..a1db0cab 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt @@ -38,5 +38,5 @@ internal data class Task( val groupId: Int, val userId: Int, val parents: Set<String>, - val children: Set<String> + val children: Set<String>, ) diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt index 33be38d4..1f9c506d 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt @@ -51,18 +51,19 @@ internal class TaskReadSupport(private val projection: List<String>?) : ReadSupp /** * Mapping of table columns to their Parquet column names. */ - private val colMap = mapOf( - TASK_ID to "id", - TASK_WORKFLOW_ID to "workflow_id", - TASK_SUBMIT_TIME to "ts_submit", - TASK_WAIT_TIME to "wait_time", - TASK_RUNTIME to "runtime", - TASK_REQ_NCPUS to "resource_amount_requested", - TASK_PARENTS to "parents", - TASK_CHILDREN to "children", - TASK_GROUP_ID to "group_id", - TASK_USER_ID to "user_id" - ) + private val colMap = + mapOf( + TASK_ID to "id", + TASK_WORKFLOW_ID to "workflow_id", + TASK_SUBMIT_TIME to "ts_submit", + TASK_WAIT_TIME to "wait_time", + TASK_RUNTIME to "runtime", + TASK_REQ_NCPUS to "resource_amount_requested", + TASK_PARENTS to "parents", + TASK_CHILDREN to "children", + TASK_GROUP_ID to "group_id", + TASK_USER_ID to "user_id", + ) override fun init(context: InitContext): ReadContext { val projectedSchema = @@ -87,7 +88,7 @@ internal class TaskReadSupport(private val projection: List<String>?) : ReadSupp configuration: Configuration, keyValueMetaData: Map<String, String>, fileSchema: MessageType, - readContext: ReadContext + readContext: ReadContext, ): RecordMaterializer<Task> = TaskRecordMaterializer(readContext.requestedSchema) companion object { @@ -95,52 +96,53 @@ internal class TaskReadSupport(private val projection: List<String>?) : ReadSupp * Parquet read schema for the "tasks" table in the trace. */ @JvmStatic - val READ_SCHEMA: MessageType = Types.buildMessage() - .addFields( - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .named("id"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .named("workflow_id"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("ts_submit"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .named("wait_time"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT64) - .named("runtime"), - Types - .optional(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("resource_amount_requested"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT32) - .named("user_id"), - Types - .optional(PrimitiveType.PrimitiveTypeName.INT32) - .named("group_id"), - Types - .buildGroup(Type.Repetition.OPTIONAL) - .addField( - Types.repeatedGroup() - .addField(Types.optional(PrimitiveType.PrimitiveTypeName.INT64).named("item")) - .named("list") - ) - .`as`(LogicalTypeAnnotation.listType()) - .named("children"), - Types - .buildGroup(Type.Repetition.OPTIONAL) - .addField( - Types.repeatedGroup() - .addField(Types.optional(PrimitiveType.PrimitiveTypeName.INT64).named("item")) - .named("list") - ) - .`as`(LogicalTypeAnnotation.listType()) - .named("parents") - ) - .named("task") + val READ_SCHEMA: MessageType = + Types.buildMessage() + .addFields( + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .named("id"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .named("workflow_id"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("ts_submit"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .named("wait_time"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .named("runtime"), + Types + .optional(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("resource_amount_requested"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT32) + .named("user_id"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT32) + .named("group_id"), + Types + .buildGroup(Type.Repetition.OPTIONAL) + .addField( + Types.repeatedGroup() + .addField(Types.optional(PrimitiveType.PrimitiveTypeName.INT64).named("item")) + .named("list"), + ) + .`as`(LogicalTypeAnnotation.listType()) + .named("children"), + Types + .buildGroup(Type.Repetition.OPTIONAL) + .addField( + Types.repeatedGroup() + .addField(Types.optional(PrimitiveType.PrimitiveTypeName.INT64).named("item")) + .named("list"), + ) + .`as`(LogicalTypeAnnotation.listType()) + .named("parents"), + ) + .named("task") } } diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt index 055be0c3..412a4f8b 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt @@ -39,102 +39,113 @@ internal class TaskRecordMaterializer(schema: MessageType) : RecordMaterializer< /** * State of current record being read. */ - private var _id = "" - private var _workflowId = "" - private var _submitTime = Instant.MIN - private var _waitTime = Duration.ZERO - private var _runtime = Duration.ZERO - private var _requestedCpus = 0 - private var _groupId = 0 - private var _userId = 0 - private var _parents = mutableSetOf<String>() - private var _children = mutableSetOf<String>() + private var localID = "" + private var localWorkflowID = "" + private var localSubmitTime = Instant.MIN + private var localWaitTime = Duration.ZERO + private var localRuntime = Duration.ZERO + private var localRequestedCpus = 0 + private var localGroupId = 0 + private var localUserId = 0 + private var localParents = mutableSetOf<String>() + private var localChildren = mutableSetOf<String>() /** * Root converter for the record. */ - private val root = object : GroupConverter() { - /** - * The converters for the columns of the schema. - */ - private val converters = schema.fields.map { type -> - when (type.name) { - "id" -> object : PrimitiveConverter() { - override fun addLong(value: Long) { - _id = value.toString() + private val root = + object : GroupConverter() { + /** + * The converters for the columns of the schema. + */ + private val converters = + schema.fields.map { type -> + when (type.name) { + "id" -> + object : PrimitiveConverter() { + override fun addLong(value: Long) { + localID = value.toString() + } + } + "workflow_id" -> + object : PrimitiveConverter() { + override fun addLong(value: Long) { + localWorkflowID = value.toString() + } + } + "ts_submit" -> + object : PrimitiveConverter() { + override fun addLong(value: Long) { + localSubmitTime = Instant.ofEpochMilli(value) + } + } + "wait_time" -> + object : PrimitiveConverter() { + override fun addLong(value: Long) { + localWaitTime = Duration.ofMillis(value) + } + } + "runtime" -> + object : PrimitiveConverter() { + override fun addLong(value: Long) { + localRuntime = Duration.ofMillis(value) + } + } + "resource_amount_requested" -> + object : PrimitiveConverter() { + override fun addDouble(value: Double) { + localRequestedCpus = value.roundToInt() + } + } + "group_id" -> + object : PrimitiveConverter() { + override fun addInt(value: Int) { + localGroupId = value + } + } + "user_id" -> + object : PrimitiveConverter() { + override fun addInt(value: Int) { + localUserId = value + } + } + "children" -> RelationConverter(localChildren) + "parents" -> RelationConverter(localParents) + else -> error("Unknown column $type") } } - "workflow_id" -> object : PrimitiveConverter() { - override fun addLong(value: Long) { - _workflowId = value.toString() - } - } - "ts_submit" -> object : PrimitiveConverter() { - override fun addLong(value: Long) { - _submitTime = Instant.ofEpochMilli(value) - } - } - "wait_time" -> object : PrimitiveConverter() { - override fun addLong(value: Long) { - _waitTime = Duration.ofMillis(value) - } - } - "runtime" -> object : PrimitiveConverter() { - override fun addLong(value: Long) { - _runtime = Duration.ofMillis(value) - } - } - "resource_amount_requested" -> object : PrimitiveConverter() { - override fun addDouble(value: Double) { - _requestedCpus = value.roundToInt() - } - } - "group_id" -> object : PrimitiveConverter() { - override fun addInt(value: Int) { - _groupId = value - } - } - "user_id" -> object : PrimitiveConverter() { - override fun addInt(value: Int) { - _userId = value - } - } - "children" -> RelationConverter(_children) - "parents" -> RelationConverter(_parents) - else -> error("Unknown column $type") - } - } - override fun start() { - _id = "" - _workflowId = "" - _submitTime = Instant.MIN - _waitTime = Duration.ZERO - _runtime = Duration.ZERO - _requestedCpus = 0 - _groupId = 0 - _userId = 0 - _parents.clear() - _children.clear() - } + override fun start() { + localID = "" + localWorkflowID = "" + localSubmitTime = Instant.MIN + localWaitTime = Duration.ZERO + localRuntime = Duration.ZERO + localRequestedCpus = 0 + localGroupId = 0 + localUserId = 0 + localParents.clear() + localChildren.clear() + } - override fun end() {} + override fun end() {} - override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex] - } + override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex] + } - override fun getCurrentRecord(): Task = Task( - _id, - _workflowId, - _submitTime, - _waitTime, - _runtime, - _requestedCpus, - _groupId, - _userId, - _parents.toSet(), - _children.toSet() - ) + override fun getCurrentRecord(): Task = + Task( + localID, + localWorkflowID, + localSubmitTime, + localWaitTime, + localRuntime, + localRequestedCpus, + localGroupId, + localUserId, + localParents.toSet(), + localChildren.toSet(), + ) override fun getRootConverter(): GroupConverter = root @@ -142,25 +153,28 @@ internal class TaskRecordMaterializer(schema: MessageType) : RecordMaterializer< * Helper class to convert parent and child relations and add them to [relations]. */ private class RelationConverter(private val relations: MutableSet<String>) : GroupConverter() { - private val entryConverter = object : PrimitiveConverter() { - override fun addLong(value: Long) { - relations.add(value.toString()) - } + private val entryConverter = + object : PrimitiveConverter() { + override fun addLong(value: Long) { + relations.add(value.toString()) + } - override fun addDouble(value: Double) { - relations.add(value.roundToLong().toString()) + override fun addDouble(value: Double) { + relations.add(value.roundToLong().toString()) + } } - } - private val listConverter = object : GroupConverter() { - override fun getConverter(fieldIndex: Int): Converter { - require(fieldIndex == 0) - return entryConverter - } + private val listConverter = + object : GroupConverter() { + override fun getConverter(fieldIndex: Int): Converter { + require(fieldIndex == 0) + return entryConverter + } - override fun start() {} - override fun end() {} - } + override fun start() {} + + override fun end() {} + } override fun getConverter(fieldIndex: Int): Converter { require(fieldIndex == 0) @@ -168,6 +182,7 @@ internal class TaskRecordMaterializer(schema: MessageType) : RecordMaterializer< } override fun start() {} + override fun end() {} } } diff --git a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt index 0457098c..ad49cce0 100644 --- a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt @@ -87,9 +87,9 @@ class WtfTraceFormatTest { { assertEquals( setOf("584055316413447529", "133113685133695608", "1008582348422865408"), - reader.getSet(TASK_PARENTS, String::class.java) + reader.getSet(TASK_PARENTS, String::class.java), ) - } + }, ) assertAll( @@ -101,9 +101,9 @@ class WtfTraceFormatTest { { assertEquals( setOf("584055316413447529", "133113685133695608", "1008582348422865408"), - reader.getSet(TASK_PARENTS, String::class.java) + reader.getSet(TASK_PARENTS, String::class.java), ) - } + }, ) reader.close() |
