From 5864cbcbfe2eb8c36ca05c3a39c7e5916aeecaec Mon Sep 17 00:00:00 2001 From: Dante Niewenhuis Date: Tue, 5 Mar 2024 13:23:57 +0100 Subject: Updated package versions, updated web server tests. (#207) * Updated all package versions including kotlin. Updated all web-server tests to run. * Changed the java version of the tests. OpenDC now only supports java 19. * small update * test update * new update * updated docker version to 19 * updated docker version to 19 --- .../opendc/trace/opendc/OdcVmTraceBenchmarks.kt | 6 +- .../opendc/OdcVmInterferenceJsonTableReader.kt | 43 +++--- .../opendc/OdcVmInterferenceJsonTableWriter.kt | 78 ++++++++--- .../trace/opendc/OdcVmResourceStateTableReader.kt | 58 ++++---- .../trace/opendc/OdcVmResourceStateTableWriter.kt | 146 ++++++++++++-------- .../trace/opendc/OdcVmResourceTableReader.kt | 66 +++++---- .../trace/opendc/OdcVmResourceTableWriter.kt | 150 +++++++++++++-------- .../org/opendc/trace/opendc/OdcVmTraceFormat.kt | 124 +++++++++-------- .../org/opendc/trace/opendc/parquet/Resource.kt | 2 +- .../trace/opendc/parquet/ResourceReadSupport.kt | 140 +++++++++---------- .../opendc/parquet/ResourceRecordMaterializer.kt | 132 ++++++++++-------- .../opendc/trace/opendc/parquet/ResourceState.kt | 2 +- .../opendc/parquet/ResourceStateReadSupport.kt | 119 ++++++++-------- .../parquet/ResourceStateRecordMaterializer.kt | 114 ++++++++-------- .../opendc/parquet/ResourceStateWriteSupport.kt | 48 ++++--- .../trace/opendc/parquet/ResourceWriteSupport.kt | 56 ++++---- .../opendc/trace/opendc/OdcVmTraceFormatTest.kt | 106 ++++++++------- 17 files changed, 794 insertions(+), 596 deletions(-) (limited to 'opendc-trace/opendc-trace-opendc/src') diff --git a/opendc-trace/opendc-trace-opendc/src/jmh/kotlin/org/opendc/trace/opendc/OdcVmTraceBenchmarks.kt b/opendc-trace/opendc-trace-opendc/src/jmh/kotlin/org/opendc/trace/opendc/OdcVmTraceBenchmarks.kt index e504cf2f..e179e261 100644 --- a/opendc-trace/opendc-trace-opendc/src/jmh/kotlin/org/opendc/trace/opendc/OdcVmTraceBenchmarks.kt +++ b/opendc-trace/opendc-trace-opendc/src/jmh/kotlin/org/opendc/trace/opendc/OdcVmTraceBenchmarks.kt @@ -23,10 +23,10 @@ package org.opendc.trace.opendc import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE -import org.opendc.trace.conv.RESOURCE_ID import org.opendc.trace.conv.TABLE_INTERFERENCE_GROUPS import org.opendc.trace.conv.TABLE_RESOURCES import org.opendc.trace.conv.TABLE_RESOURCE_STATES +import org.opendc.trace.conv.resourceID import org.opendc.trace.spi.TraceFormat import org.openjdk.jmh.annotations.Benchmark import org.openjdk.jmh.annotations.Fork @@ -60,7 +60,7 @@ class OdcVmTraceBenchmarks { fun benchmarkResourcesReader(bh: Blackhole) { val reader = format.newReader(path, TABLE_RESOURCES, null) try { - val idColumn = reader.resolve(RESOURCE_ID) + val idColumn = reader.resolve(resourceID) while (reader.nextRow()) { bh.consume(reader.getString(idColumn)) } @@ -73,7 +73,7 @@ class OdcVmTraceBenchmarks { fun benchmarkResourceStatesReader(bh: Blackhole) { val reader = format.newReader(path, TABLE_RESOURCE_STATES, null) try { - val idColumn = reader.resolve(RESOURCE_ID) + val idColumn = reader.resolve(resourceID) while (reader.nextRow()) { bh.consume(reader.getString(idColumn)) } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt index 3e1fca06..7bf48f1a 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt @@ -65,24 +65,24 @@ internal class OdcVmInterferenceJsonTableReader(private val parser: JsonParser) } } - private val COL_MEMBERS = 0 - private val COL_TARGET = 1 - private val COL_SCORE = 2 + private val colMembers = 0 + private val colTarget = 1 + private val colScore = 2 - private val TYPE_MEMBERS = TableColumnType.Set(TableColumnType.String) + private val typeMembers = TableColumnType.Set(TableColumnType.String) override fun resolve(name: String): Int { return when (name) { - INTERFERENCE_GROUP_MEMBERS -> COL_MEMBERS - INTERFERENCE_GROUP_TARGET -> COL_TARGET - INTERFERENCE_GROUP_SCORE -> COL_SCORE + INTERFERENCE_GROUP_MEMBERS -> colMembers + INTERFERENCE_GROUP_TARGET -> colTarget + INTERFERENCE_GROUP_SCORE -> colScore else -> -1 } } override fun isNull(index: Int): Boolean { return when (index) { - COL_MEMBERS, COL_TARGET, COL_SCORE -> false + colMembers, colTarget, colScore -> false else -> throw IllegalArgumentException("Invalid column index $index") } } @@ -106,8 +106,8 @@ internal class OdcVmInterferenceJsonTableReader(private val parser: JsonParser) override fun getDouble(index: Int): Double { checkActive() return when (index) { - COL_TARGET -> targetLoad - COL_SCORE -> score + colTarget -> targetLoad + colScore -> score else -> throw IllegalArgumentException("Invalid column $index") } } @@ -128,19 +128,29 @@ internal class OdcVmInterferenceJsonTableReader(private val parser: JsonParser) throw IllegalArgumentException("Invalid column $index") } - override fun getList(index: Int, elementType: Class): List? { + override fun getList( + index: Int, + elementType: Class, + ): List? { throw IllegalArgumentException("Invalid column $index") } - override fun getSet(index: Int, elementType: Class): Set? { + override fun getSet( + index: Int, + elementType: Class, + ): Set? { checkActive() return when (index) { - COL_MEMBERS -> TYPE_MEMBERS.convertTo(members, elementType) + colMembers -> typeMembers.convertTo(members, elementType) else -> throw IllegalArgumentException("Invalid column $index") } } - override fun getMap(index: Int, keyType: Class, valueType: Class): Map? { + override fun getMap( + index: Int, + keyType: Class, + valueType: Class, + ): Map? { throw IllegalArgumentException("Invalid column $index") } @@ -196,7 +206,10 @@ internal class OdcVmInterferenceJsonTableReader(private val parser: JsonParser) /** * Parse the members of a group. */ - private fun parseGroupMembers(parser: JsonParser, members: MutableSet) { + private fun parseGroupMembers( + parser: JsonParser, + members: MutableSet, + ) { if (!parser.isExpectedStartArrayToken) { throw JsonParseException(parser, "Expected array for group members, but got ${parser.currentToken()}") } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt index c6905c5b..93f5a976 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt @@ -70,70 +70,106 @@ internal class OdcVmInterferenceJsonTableWriter(private val generator: JsonGener override fun resolve(name: String): Int { return when (name) { - INTERFERENCE_GROUP_MEMBERS -> COL_MEMBERS - INTERFERENCE_GROUP_TARGET -> COL_TARGET - INTERFERENCE_GROUP_SCORE -> COL_SCORE + INTERFERENCE_GROUP_MEMBERS -> colMembers + INTERFERENCE_GROUP_TARGET -> colTarget + INTERFERENCE_GROUP_SCORE -> colScore else -> -1 } } - override fun setBoolean(index: Int, value: Boolean) { + override fun setBoolean( + index: Int, + value: Boolean, + ) { throw IllegalArgumentException("Invalid column $index") } - override fun setInt(index: Int, value: Int) { + override fun setInt( + index: Int, + value: Int, + ) { throw IllegalArgumentException("Invalid column $index") } - override fun setLong(index: Int, value: Long) { + override fun setLong( + index: Int, + value: Long, + ) { throw IllegalArgumentException("Invalid column $index") } - override fun setFloat(index: Int, value: Float) { + override fun setFloat( + index: Int, + value: Float, + ) { throw IllegalArgumentException("Invalid column $index") } - override fun setDouble(index: Int, value: Double) { + override fun setDouble( + index: Int, + value: Double, + ) { check(isRowActive) { "No active row" } when (index) { - COL_TARGET -> targetLoad = (value as Number).toDouble() - COL_SCORE -> score = (value as Number).toDouble() + colTarget -> targetLoad = (value as Number).toDouble() + colScore -> score = (value as Number).toDouble() else -> throw IllegalArgumentException("Invalid column $index") } } - override fun setString(index: Int, value: String) { + override fun setString( + index: Int, + value: String, + ) { throw IllegalArgumentException("Invalid column $index") } - override fun setUUID(index: Int, value: UUID) { + override fun setUUID( + index: Int, + value: UUID, + ) { throw IllegalArgumentException("Invalid column $index") } - override fun setInstant(index: Int, value: Instant) { + override fun setInstant( + index: Int, + value: Instant, + ) { throw IllegalArgumentException("Invalid column $index") } - override fun setDuration(index: Int, value: Duration) { + override fun setDuration( + index: Int, + value: Duration, + ) { throw IllegalArgumentException("Invalid column $index") } - override fun setList(index: Int, value: List) { + override fun setList( + index: Int, + value: List, + ) { throw IllegalArgumentException("Invalid column $index") } - override fun setSet(index: Int, value: Set) { + override fun setSet( + index: Int, + value: Set, + ) { check(isRowActive) { "No active row" } @Suppress("UNCHECKED_CAST") when (index) { - COL_MEMBERS -> members = value as Set + colMembers -> members = value as Set else -> throw IllegalArgumentException("Invalid column index $index") } } - override fun setMap(index: Int, value: Map) { + override fun setMap( + index: Int, + value: Map, + ) { throw IllegalArgumentException("Invalid column $index") } @@ -146,9 +182,9 @@ internal class OdcVmInterferenceJsonTableWriter(private val generator: JsonGener generator.close() } - private val COL_MEMBERS = 0 - private val COL_TARGET = 1 - private val COL_SCORE = 2 + private val colMembers = 0 + private val colTarget = 1 + private val colScore = 2 private var members = emptySet() private var targetLoad = Double.POSITIVE_INFINITY diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt index ff9a98d7..8e54f2b0 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt @@ -23,11 +23,11 @@ package org.opendc.trace.opendc import org.opendc.trace.TableReader -import org.opendc.trace.conv.RESOURCE_CPU_COUNT -import org.opendc.trace.conv.RESOURCE_ID -import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE -import org.opendc.trace.conv.RESOURCE_STATE_DURATION -import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP +import org.opendc.trace.conv.resourceCpuCount +import org.opendc.trace.conv.resourceID +import org.opendc.trace.conv.resourceStateCpuUsage +import org.opendc.trace.conv.resourceStateDuration +import org.opendc.trace.conv.resourceStateTimestamp import org.opendc.trace.opendc.parquet.ResourceState import org.opendc.trace.util.parquet.LocalParquetReader import java.time.Duration @@ -55,25 +55,25 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea } } - private val COL_ID = 0 - private val COL_TIMESTAMP = 1 - private val COL_DURATION = 2 - private val COL_CPU_COUNT = 3 - private val COL_CPU_USAGE = 4 + private val colID = 0 + private val colTimestamp = 1 + private val colDuration = 2 + private val colCpuCount = 3 + private val colCpuUsage = 4 override fun resolve(name: String): Int { return when (name) { - RESOURCE_ID -> COL_ID - RESOURCE_STATE_TIMESTAMP -> COL_TIMESTAMP - RESOURCE_STATE_DURATION -> COL_DURATION - RESOURCE_CPU_COUNT -> COL_CPU_COUNT - RESOURCE_STATE_CPU_USAGE -> COL_CPU_USAGE + resourceID -> colID + resourceStateTimestamp -> colTimestamp + resourceStateDuration -> colDuration + resourceCpuCount -> colCpuCount + resourceStateCpuUsage -> colCpuUsage else -> -1 } } override fun isNull(index: Int): Boolean { - require(index in 0..COL_CPU_USAGE) { "Invalid column index" } + require(index in 0..colCpuUsage) { "Invalid column index" } return false } @@ -84,7 +84,7 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea override fun getInt(index: Int): Int { val record = checkNotNull(record) { "Reader in invalid state" } return when (index) { - COL_CPU_COUNT -> record.cpuCount + colCpuCount -> record.cpuCount else -> throw IllegalArgumentException("Invalid column or type [index $index]") } } @@ -100,7 +100,7 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea override fun getDouble(index: Int): Double { val record = checkNotNull(record) { "Reader in invalid state" } return when (index) { - COL_CPU_USAGE -> record.cpuUsage + colCpuUsage -> record.cpuUsage else -> throw IllegalArgumentException("Invalid column or type [index $index]") } } @@ -109,7 +109,7 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea val record = checkNotNull(record) { "Reader in invalid state" } return when (index) { - COL_ID -> record.id + colID -> record.id else -> throw IllegalArgumentException("Invalid column index $index") } } @@ -122,7 +122,7 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea val record = checkNotNull(record) { "Reader in invalid state" } return when (index) { - COL_TIMESTAMP -> record.timestamp + colTimestamp -> record.timestamp else -> throw IllegalArgumentException("Invalid column index $index") } } @@ -131,20 +131,30 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea val record = checkNotNull(record) { "Reader in invalid state" } return when (index) { - COL_DURATION -> record.duration + colDuration -> record.duration else -> throw IllegalArgumentException("Invalid column index $index") } } - override fun getList(index: Int, elementType: Class): List? { + override fun getList( + index: Int, + elementType: Class, + ): List? { throw IllegalArgumentException("Invalid column or type [index $index]") } - override fun getSet(index: Int, elementType: Class): Set? { + override fun getSet( + index: Int, + elementType: Class, + ): Set? { throw IllegalArgumentException("Invalid column or type [index $index]") } - override fun getMap(index: Int, keyType: Class, valueType: Class): Map? { + override fun getMap( + index: Int, + keyType: Class, + valueType: Class, + ): Map? { throw IllegalArgumentException("Invalid column or type [index $index]") } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt index cf0a401b..01cd13c8 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt @@ -24,11 +24,11 @@ package org.opendc.trace.opendc import org.apache.parquet.hadoop.ParquetWriter import org.opendc.trace.TableWriter -import org.opendc.trace.conv.RESOURCE_CPU_COUNT -import org.opendc.trace.conv.RESOURCE_ID -import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE -import org.opendc.trace.conv.RESOURCE_STATE_DURATION -import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP +import org.opendc.trace.conv.resourceCpuCount +import org.opendc.trace.conv.resourceID +import org.opendc.trace.conv.resourceStateCpuUsage +import org.opendc.trace.conv.resourceStateDuration +import org.opendc.trace.conv.resourceStateTimestamp import org.opendc.trace.opendc.parquet.ResourceState import java.time.Duration import java.time.Instant @@ -41,113 +41,149 @@ internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter= lastTimestamp) { "Records need to be ordered by (id, timestamp)" } + check(lastId != localID || localTimestamp >= lastTimestamp) { "Records need to be ordered by (id, timestamp)" } - writer.write(ResourceState(_id, _timestamp, _duration, _cpuCount, _cpuUsage)) + writer.write(ResourceState(localID, localTimestamp, localDuration, localCpuCount, localCpuUsage)) - lastId = _id - lastTimestamp = _timestamp + lastId = localID + lastTimestamp = localTimestamp } override fun resolve(name: String): Int { return when (name) { - RESOURCE_ID -> COL_ID - RESOURCE_STATE_TIMESTAMP -> COL_TIMESTAMP - RESOURCE_STATE_DURATION -> COL_DURATION - RESOURCE_CPU_COUNT -> COL_CPU_COUNT - RESOURCE_STATE_CPU_USAGE -> COL_CPU_USAGE + resourceID -> colID + resourceStateTimestamp -> colTimestamp + resourceStateDuration -> colDuration + resourceCpuCount -> colCpuCount + resourceStateCpuUsage -> colCpuUsage else -> -1 } } - override fun setBoolean(index: Int, value: Boolean) { + override fun setBoolean( + index: Int, + value: Boolean, + ) { throw IllegalArgumentException("Invalid column or type [index $index]") } - override fun setInt(index: Int, value: Int) { - check(_isActive) { "No active row" } + override fun setInt( + index: Int, + value: Int, + ) { + check(localIsActive) { "No active row" } when (index) { - COL_CPU_COUNT -> _cpuCount = value + colCpuCount -> localCpuCount = value else -> throw IllegalArgumentException("Invalid column or type [index $index]") } } - override fun setLong(index: Int, value: Long) { + override fun setLong( + index: Int, + value: Long, + ) { throw IllegalArgumentException("Invalid column or type [index $index]") } - override fun setFloat(index: Int, value: Float) { + override fun setFloat( + index: Int, + value: Float, + ) { throw IllegalArgumentException("Invalid column or type [index $index]") } - override fun setDouble(index: Int, value: Double) { - check(_isActive) { "No active row" } + override fun setDouble( + index: Int, + value: Double, + ) { + check(localIsActive) { "No active row" } when (index) { - COL_CPU_USAGE -> _cpuUsage = value + colCpuUsage -> localCpuUsage = value else -> throw IllegalArgumentException("Invalid column or type [index $index]") } } - override fun setString(index: Int, value: String) { - check(_isActive) { "No active row" } + override fun setString( + index: Int, + value: String, + ) { + check(localIsActive) { "No active row" } when (index) { - COL_ID -> _id = value + colID -> localID = value else -> throw IllegalArgumentException("Invalid column or type [index $index]") } } - override fun setUUID(index: Int, value: UUID) { + override fun setUUID( + index: Int, + value: UUID, + ) { throw IllegalArgumentException("Invalid column or type [index $index]") } - override fun setInstant(index: Int, value: Instant) { - check(_isActive) { "No active row" } + override fun setInstant( + index: Int, + value: Instant, + ) { + check(localIsActive) { "No active row" } when (index) { - COL_TIMESTAMP -> _timestamp = value + colTimestamp -> localTimestamp = value else -> throw IllegalArgumentException("Invalid column or type [index $index]") } } - override fun setDuration(index: Int, value: Duration) { - check(_isActive) { "No active row" } + override fun setDuration( + index: Int, + value: Duration, + ) { + check(localIsActive) { "No active row" } when (index) { - COL_DURATION -> _duration = value + colDuration -> localDuration = value else -> throw IllegalArgumentException("Invalid column or type [index $index]") } } - override fun setList(index: Int, value: List) { + override fun setList( + index: Int, + value: List, + ) { throw IllegalArgumentException("Invalid column or type [index $index]") } - override fun setSet(index: Int, value: Set) { + override fun setSet( + index: Int, + value: Set, + ) { throw IllegalArgumentException("Invalid column or type [index $index]") } - override fun setMap(index: Int, value: Map) { + override fun setMap( + index: Int, + value: Map, + ) { throw IllegalArgumentException("Invalid column or type [index $index]") } @@ -165,9 +201,9 @@ internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter COL_ID - RESOURCE_START_TIME -> COL_START_TIME - RESOURCE_STOP_TIME -> COL_STOP_TIME - RESOURCE_CPU_COUNT -> COL_CPU_COUNT - RESOURCE_CPU_CAPACITY -> COL_CPU_CAPACITY - RESOURCE_MEM_CAPACITY -> COL_MEM_CAPACITY + resourceID -> colID + resourceStartTime -> colStartTime + resourceStopTime -> colStopTime + resourceCpuCount -> colCpuCount + resourceCpuCapacity -> colCpuCapacity + resourceMemCapacity -> colMemCapacity else -> -1 } } override fun isNull(index: Int): Boolean { - require(index in 0..COL_MEM_CAPACITY) { "Invalid column index" } + require(index in 0..colMemCapacity) { "Invalid column index" } return false } @@ -88,7 +88,7 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader record.cpuCount + colCpuCount -> record.cpuCount else -> throw IllegalArgumentException("Invalid column") } } @@ -105,8 +105,8 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader record.cpuCapacity - COL_MEM_CAPACITY -> record.memCapacity + colCpuCapacity -> record.cpuCapacity + colMemCapacity -> record.memCapacity else -> throw IllegalArgumentException("Invalid column") } } @@ -115,7 +115,7 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader record.id + colID -> record.id else -> throw IllegalArgumentException("Invalid column") } } @@ -128,8 +128,8 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader record.startTime - COL_STOP_TIME -> record.stopTime + colStartTime -> record.startTime + colStopTime -> record.stopTime else -> throw IllegalArgumentException("Invalid column") } } @@ -138,15 +138,25 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader getList(index: Int, elementType: Class): List? { + override fun getList( + index: Int, + elementType: Class, + ): List? { throw IllegalArgumentException("Invalid column") } - override fun getSet(index: Int, elementType: Class): Set? { + override fun getSet( + index: Int, + elementType: Class, + ): Set? { throw IllegalArgumentException("Invalid column") } - override fun getMap(index: Int, keyType: Class, valueType: Class): Map? { + override fun getMap( + index: Int, + keyType: Class, + valueType: Class, + ): Map? { throw IllegalArgumentException("Invalid column") } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt index 73a03891..5bbc2f3f 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt @@ -24,12 +24,12 @@ package org.opendc.trace.opendc import org.apache.parquet.hadoop.ParquetWriter import org.opendc.trace.TableWriter -import org.opendc.trace.conv.RESOURCE_CPU_CAPACITY -import org.opendc.trace.conv.RESOURCE_CPU_COUNT -import org.opendc.trace.conv.RESOURCE_ID -import org.opendc.trace.conv.RESOURCE_MEM_CAPACITY -import org.opendc.trace.conv.RESOURCE_START_TIME -import org.opendc.trace.conv.RESOURCE_STOP_TIME +import org.opendc.trace.conv.resourceCpuCapacity +import org.opendc.trace.conv.resourceCpuCount +import org.opendc.trace.conv.resourceID +import org.opendc.trace.conv.resourceMemCapacity +import org.opendc.trace.conv.resourceStartTime +import org.opendc.trace.conv.resourceStopTime import org.opendc.trace.opendc.parquet.Resource import java.time.Duration import java.time.Instant @@ -42,105 +42,141 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter COL_ID - RESOURCE_START_TIME -> COL_START_TIME - RESOURCE_STOP_TIME -> COL_STOP_TIME - RESOURCE_CPU_COUNT -> COL_CPU_COUNT - RESOURCE_CPU_CAPACITY -> COL_CPU_CAPACITY - RESOURCE_MEM_CAPACITY -> COL_MEM_CAPACITY + resourceID -> colID + resourceStartTime -> colStartTime + resourceStopTime -> colStopTime + resourceCpuCount -> colCpuCount + resourceCpuCapacity -> colCpuCapacity + resourceMemCapacity -> colMemCapacity else -> -1 } } - override fun setBoolean(index: Int, value: Boolean) { + override fun setBoolean( + index: Int, + value: Boolean, + ) { throw IllegalArgumentException("Invalid column or type [index $index]") } - override fun setInt(index: Int, value: Int) { - check(_isActive) { "No active row" } + override fun setInt( + index: Int, + value: Int, + ) { + check(localIsActive) { "No active row" } when (index) { - COL_CPU_COUNT -> _cpuCount = value + colCpuCount -> localCpuCount = value else -> throw IllegalArgumentException("Invalid column or type [index $index]") } } - override fun setLong(index: Int, value: Long) { + override fun setLong( + index: Int, + value: Long, + ) { throw IllegalArgumentException("Invalid column or type [index $index]") } - override fun setFloat(index: Int, value: Float) { + override fun setFloat( + index: Int, + value: Float, + ) { throw IllegalArgumentException("Invalid column or type [index $index]") } - override fun setDouble(index: Int, value: Double) { - check(_isActive) { "No active row" } + override fun setDouble( + index: Int, + value: Double, + ) { + check(localIsActive) { "No active row" } when (index) { - COL_CPU_CAPACITY -> _cpuCapacity = value - COL_MEM_CAPACITY -> _memCapacity = value + colCpuCapacity -> localCpuCapacity = value + colMemCapacity -> localMemCapacity = value else -> throw IllegalArgumentException("Invalid column or type [index $index]") } } - override fun setString(index: Int, value: String) { - check(_isActive) { "No active row" } + override fun setString( + index: Int, + value: String, + ) { + check(localIsActive) { "No active row" } when (index) { - COL_ID -> _id = value + colID -> localId = value else -> throw IllegalArgumentException("Invalid column index $index") } } - override fun setUUID(index: Int, value: UUID) { + override fun setUUID( + index: Int, + value: UUID, + ) { throw IllegalArgumentException("Invalid column or type [index $index]") } - override fun setInstant(index: Int, value: Instant) { - check(_isActive) { "No active row" } + override fun setInstant( + index: Int, + value: Instant, + ) { + check(localIsActive) { "No active row" } when (index) { - COL_START_TIME -> _startTime = value - COL_STOP_TIME -> _stopTime = value + colStartTime -> localStartTime = value + colStopTime -> localStopTime = value else -> throw IllegalArgumentException("Invalid column index $index") } } - override fun setDuration(index: Int, value: Duration) { + override fun setDuration( + index: Int, + value: Duration, + ) { throw IllegalArgumentException("Invalid column or type [index $index]") } - override fun setList(index: Int, value: List) { + override fun setList( + index: Int, + value: List, + ) { throw IllegalArgumentException("Invalid column or type [index $index]") } - override fun setSet(index: Int, value: Set) { + override fun setSet( + index: Int, + value: Set, + ) { throw IllegalArgumentException("Invalid column or type [index $index]") } - override fun setMap(index: Int, value: Map) { + override fun setMap( + index: Int, + value: Map, + ) { throw IllegalArgumentException("Invalid column or type [index $index]") } @@ -152,10 +188,10 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES, TABLE_INTERFERENCE_GROUPS) - override fun getDetails(path: Path, table: String): TableDetails { + override fun getDetails( + path: Path, + table: String, + ): TableDetails { return when (table) { - TABLE_RESOURCES -> TableDetails( - listOf( - TableColumn(RESOURCE_ID, TableColumnType.String), - TableColumn(RESOURCE_START_TIME, TableColumnType.Instant), - TableColumn(RESOURCE_STOP_TIME, TableColumnType.Instant), - TableColumn(RESOURCE_CPU_COUNT, TableColumnType.Int), - TableColumn(RESOURCE_CPU_CAPACITY, TableColumnType.Double), - TableColumn(RESOURCE_MEM_CAPACITY, TableColumnType.Double) + TABLE_RESOURCES -> + TableDetails( + listOf( + TableColumn(resourceID, TableColumnType.String), + TableColumn(resourceStartTime, TableColumnType.Instant), + TableColumn(resourceStopTime, TableColumnType.Instant), + TableColumn(resourceCpuCount, TableColumnType.Int), + TableColumn(resourceCpuCapacity, TableColumnType.Double), + TableColumn(resourceMemCapacity, TableColumnType.Double), + ), ) - ) - TABLE_RESOURCE_STATES -> TableDetails( - listOf( - TableColumn(RESOURCE_ID, TableColumnType.String), - TableColumn(RESOURCE_STATE_TIMESTAMP, TableColumnType.Instant), - TableColumn(RESOURCE_STATE_DURATION, TableColumnType.Duration), - TableColumn(RESOURCE_CPU_COUNT, TableColumnType.Int), - TableColumn(RESOURCE_STATE_CPU_USAGE, TableColumnType.Double) + TABLE_RESOURCE_STATES -> + TableDetails( + listOf( + TableColumn(resourceID, TableColumnType.String), + TableColumn(resourceStateTimestamp, TableColumnType.Instant), + TableColumn(resourceStateDuration, TableColumnType.Duration), + TableColumn(resourceCpuCount, TableColumnType.Int), + TableColumn(resourceStateCpuUsage, TableColumnType.Double), + ), ) - ) - TABLE_INTERFERENCE_GROUPS -> TableDetails( - listOf( - TableColumn(INTERFERENCE_GROUP_MEMBERS, TableColumnType.Set(TableColumnType.String)), - TableColumn(INTERFERENCE_GROUP_TARGET, TableColumnType.Double), - TableColumn(INTERFERENCE_GROUP_SCORE, TableColumnType.Double) + TABLE_INTERFERENCE_GROUPS -> + TableDetails( + listOf( + TableColumn(INTERFERENCE_GROUP_MEMBERS, TableColumnType.Set(TableColumnType.String)), + TableColumn(INTERFERENCE_GROUP_TARGET, TableColumnType.Double), + TableColumn(INTERFERENCE_GROUP_SCORE, TableColumnType.Double), + ), ) - ) else -> throw IllegalArgumentException("Table $table not supported") } } - override fun newReader(path: Path, table: String, projection: List?): TableReader { + override fun newReader( + path: Path, + table: String, + projection: List?, + ): TableReader { return when (table) { TABLE_RESOURCES -> { val reader = LocalParquetReader(path.resolve("meta.parquet"), ResourceReadSupport(projection)) @@ -130,11 +140,12 @@ public class OdcVmTraceFormat : TraceFormat { } TABLE_INTERFERENCE_GROUPS -> { val modelPath = path.resolve("interference-model.json") - val parser = if (modelPath.exists()) { - jsonFactory.createParser(modelPath.toFile()) - } else { - jsonFactory.createParser("[]") // If model does not exist, return empty model - } + val parser = + if (modelPath.exists()) { + jsonFactory.createParser(modelPath.toFile()) + } else { + jsonFactory.createParser("[]") // If model does not exist, return empty model + } OdcVmInterferenceJsonTableReader(parser) } @@ -142,26 +153,31 @@ public class OdcVmTraceFormat : TraceFormat { } } - override fun newWriter(path: Path, table: String): TableWriter { + override fun newWriter( + path: Path, + table: String, + ): TableWriter { return when (table) { TABLE_RESOURCES -> { - val writer = LocalParquetWriter.builder(path.resolve("meta.parquet"), ResourceWriteSupport()) - .withCompressionCodec(CompressionCodecName.ZSTD) - .withPageWriteChecksumEnabled(true) - .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0) - .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) - .build() + val writer = + LocalParquetWriter.builder(path.resolve("meta.parquet"), ResourceWriteSupport()) + .withCompressionCodec(CompressionCodecName.ZSTD) + .withPageWriteChecksumEnabled(true) + .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0) + .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) + .build() OdcVmResourceTableWriter(writer) } TABLE_RESOURCE_STATES -> { - val writer = LocalParquetWriter.builder(path.resolve("trace.parquet"), ResourceStateWriteSupport()) - .withCompressionCodec(CompressionCodecName.ZSTD) - .withDictionaryEncoding("id", true) - .withBloomFilterEnabled("id", true) - .withPageWriteChecksumEnabled(true) - .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0) - .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) - .build() + val writer = + LocalParquetWriter.builder(path.resolve("trace.parquet"), ResourceStateWriteSupport()) + .withCompressionCodec(CompressionCodecName.ZSTD) + .withDictionaryEncoding("id", true) + .withBloomFilterEnabled("id", true) + .withPageWriteChecksumEnabled(true) + .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0) + .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) + .build() OdcVmResourceStateTableWriter(writer) } TABLE_INTERFERENCE_GROUPS -> { diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/Resource.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/Resource.kt index c6db45b5..13eefe72 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/Resource.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/Resource.kt @@ -33,5 +33,5 @@ internal data class Resource( val stopTime: Instant, val cpuCount: Int, val cpuCapacity: Double, - val memCapacity: Double + val memCapacity: Double, ) diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt index 52911d5f..8bada02e 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt @@ -31,12 +31,12 @@ import org.apache.parquet.schema.MessageType import org.apache.parquet.schema.PrimitiveType import org.apache.parquet.schema.Types import org.opendc.trace.TableColumn -import org.opendc.trace.conv.RESOURCE_CPU_CAPACITY -import org.opendc.trace.conv.RESOURCE_CPU_COUNT -import org.opendc.trace.conv.RESOURCE_ID -import org.opendc.trace.conv.RESOURCE_MEM_CAPACITY -import org.opendc.trace.conv.RESOURCE_START_TIME -import org.opendc.trace.conv.RESOURCE_STOP_TIME +import org.opendc.trace.conv.resourceCpuCapacity +import org.opendc.trace.conv.resourceCpuCount +import org.opendc.trace.conv.resourceID +import org.opendc.trace.conv.resourceMemCapacity +import org.opendc.trace.conv.resourceStartTime +import org.opendc.trace.conv.resourceStopTime /** * A [ReadSupport] instance for [Resource] objects. @@ -45,18 +45,19 @@ internal class ResourceReadSupport(private val projection: List?) : Read /** * Mapping from field names to [TableColumn]s. */ - private val fieldMap = mapOf( - "id" to RESOURCE_ID, - "submissionTime" to RESOURCE_START_TIME, - "start_time" to RESOURCE_START_TIME, - "endTime" to RESOURCE_STOP_TIME, - "stop_time" to RESOURCE_STOP_TIME, - "maxCores" to RESOURCE_CPU_COUNT, - "cpu_count" to RESOURCE_CPU_COUNT, - "cpu_capacity" to RESOURCE_CPU_CAPACITY, - "requiredMemory" to RESOURCE_MEM_CAPACITY, - "mem_capacity" to RESOURCE_MEM_CAPACITY - ) + private val fieldMap = + mapOf( + "id" to resourceID, + "submissionTime" to resourceStartTime, + "start_time" to resourceStartTime, + "endTime" to resourceStopTime, + "stop_time" to resourceStopTime, + "maxCores" to resourceCpuCount, + "cpu_count" to resourceCpuCount, + "cpu_capacity" to resourceCpuCapacity, + "requiredMemory" to resourceMemCapacity, + "mem_capacity" to resourceMemCapacity, + ) override fun init(context: InitContext): ReadContext { val projectedSchema = @@ -84,7 +85,7 @@ internal class ResourceReadSupport(private val projection: List?) : Read configuration: Configuration, keyValueMetaData: Map, fileSchema: MessageType, - readContext: ReadContext + readContext: ReadContext, ): RecordMaterializer = ResourceRecordMaterializer(readContext.requestedSchema) companion object { @@ -92,64 +93,67 @@ internal class ResourceReadSupport(private val projection: List?) : Read * Parquet read schema (version 2.0) for the "resources" table in the trace. */ @JvmStatic - val READ_SCHEMA_V2_0: MessageType = Types.buildMessage() - .addFields( - Types - .required(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("id"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("submissionTime"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("endTime"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("maxCores"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("requiredMemory") - ) - .named("resource") + val READ_SCHEMA_V2_0: MessageType = + Types.buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("id"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("submissionTime"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("endTime"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("maxCores"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("requiredMemory"), + ) + .named("resource") /** * Parquet read schema (version 2.1) for the "resources" table in the trace. */ @JvmStatic - val READ_SCHEMA_V2_1: MessageType = Types.buildMessage() - .addFields( - Types - .required(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("id"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("start_time"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("stop_time"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("cpu_count"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("cpu_capacity"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("mem_capacity") - ) - .named("resource") + val READ_SCHEMA_V2_1: MessageType = + Types.buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("id"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("start_time"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("stop_time"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("cpu_count"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpu_capacity"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("mem_capacity"), + ) + .named("resource") /** * Parquet read schema for the "resources" table in the trace. */ @JvmStatic - val READ_SCHEMA: MessageType = READ_SCHEMA_V2_0 - .union(READ_SCHEMA_V2_1) + val READ_SCHEMA: MessageType = + READ_SCHEMA_V2_0 + .union(READ_SCHEMA_V2_1) } } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceRecordMaterializer.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceRecordMaterializer.kt index 936a684a..6e2afa7a 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceRecordMaterializer.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceRecordMaterializer.kt @@ -37,75 +37,91 @@ internal class ResourceRecordMaterializer(schema: MessageType) : RecordMateriali /** * State of current record being read. */ - private var _id = "" - private var _startTime = Instant.MIN - private var _stopTime = Instant.MIN - private var _cpuCount = 0 - private var _cpuCapacity = 0.0 - private var _memCapacity = 0.0 + private var localId = "" + private var localStartTime = Instant.MIN + private var localStopTime = Instant.MIN + private var localCpuCount = 0 + private var localCpuCapacity = 0.0 + private var localMemCapacity = 0.0 /** * Root converter for the record. */ - private val root = object : GroupConverter() { - /** - * The converters for the columns of the schema. - */ - private val converters = schema.fields.map { type -> - when (type.name) { - "id" -> object : PrimitiveConverter() { - override fun addBinary(value: Binary) { - _id = value.toStringUsingUTF8() - } - } - "start_time", "submissionTime" -> object : PrimitiveConverter() { - override fun addLong(value: Long) { - _startTime = Instant.ofEpochMilli(value) - } - } - "stop_time", "endTime" -> object : PrimitiveConverter() { - override fun addLong(value: Long) { - _stopTime = Instant.ofEpochMilli(value) - } - } - "cpu_count", "maxCores" -> object : PrimitiveConverter() { - override fun addInt(value: Int) { - _cpuCount = value - } - } - "cpu_capacity" -> object : PrimitiveConverter() { - override fun addDouble(value: Double) { - _cpuCapacity = value - } - } - "mem_capacity", "requiredMemory" -> object : PrimitiveConverter() { - override fun addDouble(value: Double) { - _memCapacity = value - } + private val root = + object : GroupConverter() { + /** + * The converters for the columns of the schema. + */ + private val converters = + schema.fields.map { type -> + when (type.name) { + "id" -> + object : PrimitiveConverter() { + override fun addBinary(value: Binary) { + localId = value.toStringUsingUTF8() + } + } + "start_time", "submissionTime" -> + object : PrimitiveConverter() { + override fun addLong(value: Long) { + localStartTime = Instant.ofEpochMilli(value) + } + } + "stop_time", "endTime" -> + object : PrimitiveConverter() { + override fun addLong(value: Long) { + localStopTime = Instant.ofEpochMilli(value) + } + } + "cpu_count", "maxCores" -> + object : PrimitiveConverter() { + override fun addInt(value: Int) { + localCpuCount = value + } + } + "cpu_capacity" -> + object : PrimitiveConverter() { + override fun addDouble(value: Double) { + localCpuCapacity = value + } + } + "mem_capacity", "requiredMemory" -> + object : PrimitiveConverter() { + override fun addDouble(value: Double) { + localMemCapacity = value + } - override fun addLong(value: Long) { - _memCapacity = value.toDouble() + override fun addLong(value: Long) { + localMemCapacity = value.toDouble() + } + } + else -> error("Unknown column $type") } } - else -> error("Unknown column $type") - } - } - override fun start() { - _id = "" - _startTime = Instant.MIN - _stopTime = Instant.MIN - _cpuCount = 0 - _cpuCapacity = 0.0 - _memCapacity = 0.0 - } + override fun start() { + localId = "" + localStartTime = Instant.MIN + localStopTime = Instant.MIN + localCpuCount = 0 + localCpuCapacity = 0.0 + localMemCapacity = 0.0 + } - override fun end() {} + override fun end() {} - override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex] - } + override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex] + } - override fun getCurrentRecord(): Resource = Resource(_id, _startTime, _stopTime, _cpuCount, _cpuCapacity, _memCapacity) + override fun getCurrentRecord(): Resource = + Resource( + localId, + localStartTime, + localStopTime, + localCpuCount, + localCpuCapacity, + localMemCapacity, + ) override fun getRootConverter(): GroupConverter = root } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceState.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceState.kt index 9ad58764..483f444c 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceState.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceState.kt @@ -30,5 +30,5 @@ internal class ResourceState( val timestamp: Instant, val duration: Duration, val cpuCount: Int, - val cpuUsage: Double + val cpuUsage: Double, ) diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt index 56366cd8..21e206a9 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt @@ -31,11 +31,11 @@ import org.apache.parquet.schema.MessageType import org.apache.parquet.schema.PrimitiveType import org.apache.parquet.schema.Types import org.opendc.trace.TableColumn -import org.opendc.trace.conv.RESOURCE_CPU_COUNT -import org.opendc.trace.conv.RESOURCE_ID -import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE -import org.opendc.trace.conv.RESOURCE_STATE_DURATION -import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP +import org.opendc.trace.conv.resourceCpuCount +import org.opendc.trace.conv.resourceID +import org.opendc.trace.conv.resourceStateCpuUsage +import org.opendc.trace.conv.resourceStateDuration +import org.opendc.trace.conv.resourceStateTimestamp /** * A [ReadSupport] instance for [ResourceState] objects. @@ -44,16 +44,17 @@ internal class ResourceStateReadSupport(private val projection: List?) : /** * Mapping from field names to [TableColumn]s. */ - private val fieldMap = mapOf( - "id" to RESOURCE_ID, - "time" to RESOURCE_STATE_TIMESTAMP, - "timestamp" to RESOURCE_STATE_TIMESTAMP, - "duration" to RESOURCE_STATE_DURATION, - "cores" to RESOURCE_CPU_COUNT, - "cpu_count" to RESOURCE_CPU_COUNT, - "cpuUsage" to RESOURCE_STATE_CPU_USAGE, - "cpu_usage" to RESOURCE_STATE_CPU_USAGE - ) + private val fieldMap = + mapOf( + "id" to resourceID, + "time" to resourceStateTimestamp, + "timestamp" to resourceStateTimestamp, + "duration" to resourceStateDuration, + "cores" to resourceCpuCount, + "cpu_count" to resourceCpuCount, + "cpuUsage" to resourceStateCpuUsage, + "cpu_usage" to resourceStateCpuUsage, + ) override fun init(context: InitContext): ReadContext { val projectedSchema = @@ -81,7 +82,7 @@ internal class ResourceStateReadSupport(private val projection: List?) : configuration: Configuration, keyValueMetaData: Map, fileSchema: MessageType, - readContext: ReadContext + readContext: ReadContext, ): RecordMaterializer = ResourceStateRecordMaterializer(readContext.requestedSchema) companion object { @@ -89,53 +90,55 @@ internal class ResourceStateReadSupport(private val projection: List?) : * Parquet read schema (version 2.0) for the "resource states" table in the trace. */ @JvmStatic - val READ_SCHEMA_V2_0: MessageType = Types.buildMessage() - .addFields( - Types - .required(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("id"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("time"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("duration"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("cores"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("cpuUsage") - ) - .named("resource_state") + val READ_SCHEMA_V2_0: MessageType = + Types.buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("id"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("time"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("duration"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("cores"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpuUsage"), + ) + .named("resource_state") /** * Parquet read schema (version 2.1) for the "resource states" table in the trace. */ @JvmStatic - val READ_SCHEMA_V2_1: MessageType = Types.buildMessage() - .addFields( - Types - .required(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("id"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("timestamp"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("duration"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("cpu_count"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("cpu_usage") - ) - .named("resource_state") + val READ_SCHEMA_V2_1: MessageType = + Types.buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("id"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("timestamp"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("duration"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("cpu_count"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpu_usage"), + ) + .named("resource_state") /** * Parquet read schema for the "resource states" table in the trace. diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateRecordMaterializer.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateRecordMaterializer.kt index a813a5af..72d24e78 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateRecordMaterializer.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateRecordMaterializer.kt @@ -38,69 +38,77 @@ internal class ResourceStateRecordMaterializer(schema: MessageType) : RecordMate /** * State of current record being read. */ - private var _id = "" - private var _timestamp = Instant.MIN - private var _duration = Duration.ZERO - private var _cpuCount = 0 - private var _cpuUsage = 0.0 + private var localId = "" + private var localTimestamp = Instant.MIN + private var localDuration = Duration.ZERO + private var localCpuCount = 0 + private var localCpuUsage = 0.0 /** * Root converter for the record. */ - private val root = object : GroupConverter() { - /** - * The converters for the columns of the schema. - */ - private val converters = schema.fields.map { type -> - when (type.name) { - "id" -> object : PrimitiveConverter() { - override fun addBinary(value: Binary) { - _id = value.toStringUsingUTF8() + private val root = + object : GroupConverter() { + /** + * The converters for the columns of the schema. + */ + private val converters = + schema.fields.map { type -> + when (type.name) { + "id" -> + object : PrimitiveConverter() { + override fun addBinary(value: Binary) { + localId = value.toStringUsingUTF8() + } + } + "timestamp", "time" -> + object : PrimitiveConverter() { + override fun addLong(value: Long) { + localTimestamp = Instant.ofEpochMilli(value) + } + } + "duration" -> + object : PrimitiveConverter() { + override fun addLong(value: Long) { + localDuration = Duration.ofMillis(value) + } + } + "cpu_count", "cores" -> + object : PrimitiveConverter() { + override fun addInt(value: Int) { + localCpuCount = value + } + } + "cpu_usage", "cpuUsage" -> + object : PrimitiveConverter() { + override fun addDouble(value: Double) { + localCpuUsage = value + } + } + "flops" -> + object : PrimitiveConverter() { + override fun addLong(value: Long) { + // Ignore to support v1 format + } + } + else -> error("Unknown column $type") } } - "timestamp", "time" -> object : PrimitiveConverter() { - override fun addLong(value: Long) { - _timestamp = Instant.ofEpochMilli(value) - } - } - "duration" -> object : PrimitiveConverter() { - override fun addLong(value: Long) { - _duration = Duration.ofMillis(value) - } - } - "cpu_count", "cores" -> object : PrimitiveConverter() { - override fun addInt(value: Int) { - _cpuCount = value - } - } - "cpu_usage", "cpuUsage" -> object : PrimitiveConverter() { - override fun addDouble(value: Double) { - _cpuUsage = value - } - } - "flops" -> object : PrimitiveConverter() { - override fun addLong(value: Long) { - // Ignore to support v1 format - } - } - else -> error("Unknown column $type") - } - } - override fun start() { - _id = "" - _timestamp = Instant.MIN - _duration = Duration.ZERO - _cpuCount = 0 - _cpuUsage = 0.0 - } + override fun start() { + localId = "" + localTimestamp = Instant.MIN + localDuration = Duration.ZERO + localCpuCount = 0 + localCpuUsage = 0.0 + } - override fun end() {} + override fun end() {} - override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex] - } + override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex] + } - override fun getCurrentRecord(): ResourceState = ResourceState(_id, _timestamp, _duration, _cpuCount, _cpuUsage) + override fun getCurrentRecord(): ResourceState = ResourceState(localId, localTimestamp, localDuration, localCpuCount, localCpuUsage) override fun getRootConverter(): GroupConverter = root } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateWriteSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateWriteSupport.kt index 0bbec4d2..2a6d8c12 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateWriteSupport.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateWriteSupport.kt @@ -52,7 +52,10 @@ internal class ResourceStateWriteSupport : WriteSupport() { write(recordConsumer, record) } - private fun write(consumer: RecordConsumer, record: ResourceState) { + private fun write( + consumer: RecordConsumer, + record: ResourceState, + ) { consumer.startMessage() consumer.startField("id", 0) @@ -83,26 +86,27 @@ internal class ResourceStateWriteSupport : WriteSupport() { * Parquet schema for the "resource states" table in the trace. */ @JvmStatic - val WRITE_SCHEMA: MessageType = Types.buildMessage() - .addFields( - Types - .required(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("id"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("timestamp"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("duration"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("cpu_count"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("cpu_usage") - ) - .named("resource_state") + val WRITE_SCHEMA: MessageType = + Types.buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("id"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("timestamp"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("duration"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("cpu_count"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpu_usage"), + ) + .named("resource_state") } } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceWriteSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceWriteSupport.kt index cd428754..ed62e2ce 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceWriteSupport.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceWriteSupport.kt @@ -53,7 +53,10 @@ internal class ResourceWriteSupport : WriteSupport() { write(recordConsumer, record) } - private fun write(consumer: RecordConsumer, record: Resource) { + private fun write( + consumer: RecordConsumer, + record: Resource, + ) { consumer.startMessage() consumer.startField("id", 0) @@ -88,30 +91,31 @@ internal class ResourceWriteSupport : WriteSupport() { * Parquet schema for the "resources" table in the trace. */ @JvmStatic - val WRITE_SCHEMA: MessageType = Types.buildMessage() - .addFields( - Types - .required(PrimitiveType.PrimitiveTypeName.BINARY) - .`as`(LogicalTypeAnnotation.stringType()) - .named("id"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("start_time"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) - .named("stop_time"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT32) - .named("cpu_count"), - Types - .required(PrimitiveType.PrimitiveTypeName.DOUBLE) - .named("cpu_capacity"), - Types - .required(PrimitiveType.PrimitiveTypeName.INT64) - .named("mem_capacity") - ) - .named("resource") + val WRITE_SCHEMA: MessageType = + Types.buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("id"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("start_time"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("stop_time"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("cpu_count"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpu_capacity"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("mem_capacity"), + ) + .named("resource") } } diff --git a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt index d3c3b35b..c9fa21c3 100644 --- a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt @@ -40,17 +40,17 @@ import org.opendc.trace.TableWriter import org.opendc.trace.conv.INTERFERENCE_GROUP_MEMBERS import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE import org.opendc.trace.conv.INTERFERENCE_GROUP_TARGET -import org.opendc.trace.conv.RESOURCE_CPU_CAPACITY -import org.opendc.trace.conv.RESOURCE_CPU_COUNT -import org.opendc.trace.conv.RESOURCE_ID -import org.opendc.trace.conv.RESOURCE_MEM_CAPACITY -import org.opendc.trace.conv.RESOURCE_START_TIME -import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE -import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP -import org.opendc.trace.conv.RESOURCE_STOP_TIME import org.opendc.trace.conv.TABLE_INTERFERENCE_GROUPS import org.opendc.trace.conv.TABLE_RESOURCES import org.opendc.trace.conv.TABLE_RESOURCE_STATES +import org.opendc.trace.conv.resourceCpuCapacity +import org.opendc.trace.conv.resourceCpuCount +import org.opendc.trace.conv.resourceID +import org.opendc.trace.conv.resourceMemCapacity +import org.opendc.trace.conv.resourceStartTime +import org.opendc.trace.conv.resourceStateCpuUsage +import org.opendc.trace.conv.resourceStateTimestamp +import org.opendc.trace.conv.resourceStopTime import org.opendc.trace.testkit.TableReaderTestKit import org.opendc.trace.testkit.TableWriterTestKit import java.nio.file.Files @@ -88,19 +88,19 @@ internal class OdcVmTraceFormatTest { @ValueSource(strings = ["trace-v2.0", "trace-v2.1"]) fun testResources(name: String) { val path = Paths.get("src/test/resources/$name") - val reader = format.newReader(path, TABLE_RESOURCES, listOf(RESOURCE_ID, RESOURCE_START_TIME)) + val reader = format.newReader(path, TABLE_RESOURCES, listOf(resourceID, resourceStartTime)) assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("1019", reader.getString(RESOURCE_ID)) }, - { assertEquals(Instant.ofEpochMilli(1376314846000), reader.getInstant(RESOURCE_START_TIME)) }, + { assertEquals("1019", reader.getString(resourceID)) }, + { assertEquals(Instant.ofEpochMilli(1376314846000), reader.getInstant(resourceStartTime)) }, { assertTrue(reader.nextRow()) }, - { assertEquals("1023", reader.getString(RESOURCE_ID)) }, + { assertEquals("1023", reader.getString(resourceID)) }, { assertTrue(reader.nextRow()) }, - { assertEquals("1052", reader.getString(RESOURCE_ID)) }, + { assertEquals("1052", reader.getString(resourceID)) }, { assertTrue(reader.nextRow()) }, - { assertEquals("1073", reader.getString(RESOURCE_ID)) }, - { assertFalse(reader.nextRow()) } + { assertEquals("1073", reader.getString(resourceID)) }, + { assertFalse(reader.nextRow()) }, ) reader.close() @@ -112,12 +112,12 @@ internal class OdcVmTraceFormatTest { val writer = format.newWriter(path, TABLE_RESOURCES) writer.startRow() - writer.setString(RESOURCE_ID, "1019") - writer.setInstant(RESOURCE_START_TIME, Instant.EPOCH) - writer.setInstant(RESOURCE_STOP_TIME, Instant.EPOCH) - writer.setInt(RESOURCE_CPU_COUNT, 1) - writer.setDouble(RESOURCE_CPU_CAPACITY, 1024.0) - writer.setDouble(RESOURCE_MEM_CAPACITY, 1024.0) + writer.setString(resourceID, "1019") + writer.setInstant(resourceStartTime, Instant.EPOCH) + writer.setInstant(resourceStopTime, Instant.EPOCH) + writer.setInt(resourceCpuCount, 1) + writer.setDouble(resourceCpuCapacity, 1024.0) + writer.setDouble(resourceMemCapacity, 1024.0) writer.endRow() writer.close() @@ -125,13 +125,13 @@ internal class OdcVmTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("1019", reader.getString(RESOURCE_ID)) }, - { assertEquals(Instant.EPOCH, reader.getInstant(RESOURCE_START_TIME)) }, - { assertEquals(Instant.EPOCH, reader.getInstant(RESOURCE_STOP_TIME)) }, - { assertEquals(1, reader.getInt(RESOURCE_CPU_COUNT)) }, - { assertEquals(1024.0, reader.getDouble(RESOURCE_CPU_CAPACITY)) }, - { assertEquals(1024.0, reader.getDouble(RESOURCE_MEM_CAPACITY)) }, - { assertFalse(reader.nextRow()) } + { assertEquals("1019", reader.getString(resourceID)) }, + { assertEquals(Instant.EPOCH, reader.getInstant(resourceStartTime)) }, + { assertEquals(Instant.EPOCH, reader.getInstant(resourceStopTime)) }, + { assertEquals(1, reader.getInt(resourceCpuCount)) }, + { assertEquals(1024.0, reader.getDouble(resourceCpuCapacity)) }, + { assertEquals(1024.0, reader.getDouble(resourceMemCapacity)) }, + { assertFalse(reader.nextRow()) }, ) reader.close() @@ -141,17 +141,18 @@ internal class OdcVmTraceFormatTest { @ValueSource(strings = ["trace-v2.0", "trace-v2.1"]) fun testSmoke(name: String) { val path = Paths.get("src/test/resources/$name") - val reader = format.newReader( - path, - TABLE_RESOURCE_STATES, - listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP, RESOURCE_STATE_CPU_USAGE) - ) + val reader = + format.newReader( + path, + TABLE_RESOURCE_STATES, + listOf(resourceID, resourceStateTimestamp, resourceStateCpuUsage), + ) assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("1019", reader.getString(RESOURCE_ID)) }, - { assertEquals(1376314846, reader.getInstant(RESOURCE_STATE_TIMESTAMP)?.epochSecond) }, - { assertEquals(0.0, reader.getDouble(RESOURCE_STATE_CPU_USAGE), 0.01) } + { assertEquals("1019", reader.getString(resourceID)) }, + { assertEquals(1376314846, reader.getInstant(resourceStateTimestamp)?.epochSecond) }, + { assertEquals(0.0, reader.getDouble(resourceStateCpuUsage), 0.01) }, ) reader.close() @@ -163,10 +164,10 @@ internal class OdcVmTraceFormatTest { val writer = format.newWriter(path, TABLE_RESOURCE_STATES) writer.startRow() - writer.setString(RESOURCE_ID, "1019") - writer.setInstant(RESOURCE_STATE_TIMESTAMP, Instant.EPOCH) - writer.setDouble(RESOURCE_STATE_CPU_USAGE, 23.0) - writer.setInt(RESOURCE_CPU_COUNT, 1) + writer.setString(resourceID, "1019") + writer.setInstant(resourceStateTimestamp, Instant.EPOCH) + writer.setDouble(resourceStateCpuUsage, 23.0) + writer.setInt(resourceCpuCount, 1) writer.endRow() writer.close() @@ -174,11 +175,11 @@ internal class OdcVmTraceFormatTest { assertAll( { assertTrue(reader.nextRow()) }, - { assertEquals("1019", reader.getString(RESOURCE_ID)) }, - { assertEquals(Instant.EPOCH, reader.getInstant(RESOURCE_STATE_TIMESTAMP)) }, - { assertEquals(1, reader.getInt(RESOURCE_CPU_COUNT)) }, - { assertEquals(23.0, reader.getDouble(RESOURCE_STATE_CPU_USAGE)) }, - { assertFalse(reader.nextRow()) } + { assertEquals("1019", reader.getString(resourceID)) }, + { assertEquals(Instant.EPOCH, reader.getInstant(resourceStateTimestamp)) }, + { assertEquals(1, reader.getInt(resourceCpuCount)) }, + { assertEquals(23.0, reader.getDouble(resourceStateCpuUsage)) }, + { assertFalse(reader.nextRow()) }, ) reader.close() @@ -187,11 +188,12 @@ internal class OdcVmTraceFormatTest { @Test fun testInterferenceGroups() { val path = Paths.get("src/test/resources/trace-v2.1") - val reader = format.newReader( - path, - TABLE_INTERFERENCE_GROUPS, - listOf(INTERFERENCE_GROUP_MEMBERS, INTERFERENCE_GROUP_TARGET, INTERFERENCE_GROUP_SCORE) - ) + val reader = + format.newReader( + path, + TABLE_INTERFERENCE_GROUPS, + listOf(INTERFERENCE_GROUP_MEMBERS, INTERFERENCE_GROUP_TARGET, INTERFERENCE_GROUP_SCORE), + ) assertAll( { assertTrue(reader.nextRow()) }, @@ -202,7 +204,7 @@ internal class OdcVmTraceFormatTest { { assertEquals(setOf("1023", "1052", "1073"), reader.getSet(INTERFERENCE_GROUP_MEMBERS, String::class.java)) }, { assertEquals(0.0, reader.getDouble(INTERFERENCE_GROUP_TARGET)) }, { assertEquals(0.7133055555552751, reader.getDouble(INTERFERENCE_GROUP_SCORE)) }, - { assertFalse(reader.nextRow()) } + { assertFalse(reader.nextRow()) }, ) reader.close() @@ -247,7 +249,7 @@ internal class OdcVmTraceFormatTest { { assertEquals(setOf("a", "b", "d"), reader.getSet(INTERFERENCE_GROUP_MEMBERS, String::class.java)) }, { assertEquals(0.5, reader.getDouble(INTERFERENCE_GROUP_TARGET)) }, { assertEquals(0.9, reader.getDouble(INTERFERENCE_GROUP_SCORE)) }, - { assertFalse(reader.nextRow()) } + { assertFalse(reader.nextRow()) }, ) reader.close() -- cgit v1.2.3