author    Dante Niewenhuis <d.niewenhuis@hotmail.com>    2024-03-05 13:23:57 +0100
committer GitHub <noreply@github.com>                    2024-03-05 13:23:57 +0100
commit    5864cbcbfe2eb8c36ca05c3a39c7e5916aeecaec (patch)
tree      5b2773b8dc21c2e1b526fb70f829c376dd80532a /opendc-trace/opendc-trace-opendc/src
parent    d28002a3c151d198298574312f32f1cb43f3a660 (diff)
Updated package versions, updated web server tests. (#207)
* Updated all package versions, including Kotlin. Updated all web-server tests to run.
* Changed the Java version of the tests. OpenDC now only supports Java 19.
* Small update
* Test update
* New update
* Updated docker version to 19
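For the Java 19 requirement mentioned above, a Gradle Kotlin DSL toolchain pin looks roughly like the fragment below. This is an illustrative sketch only; the actual build-script changes of this PR fall outside the diffstat shown here.

    // build.gradle.kts (illustrative): pin compilation and tests to a Java 19 toolchain.
    java {
        toolchain {
            languageVersion.set(JavaLanguageVersion.of(19))
        }
    }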
Diffstat (limited to 'opendc-trace/opendc-trace-opendc/src')
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/jmh/kotlin/org/opendc/trace/opendc/OdcVmTraceBenchmarks.kt | 6
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt | 43
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt | 78
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt | 58
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt | 146
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt | 66
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt | 150
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt | 124
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/Resource.kt | 2
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt | 140
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceRecordMaterializer.kt | 132
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceState.kt | 2
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt | 119
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateRecordMaterializer.kt | 114
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateWriteSupport.kt | 48
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceWriteSupport.kt | 56
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt | 106
17 files changed, 794 insertions, 596 deletions
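Aside from ktlint-style reformatting (trailing commas, multi-line parameter lists), the main change in this module is the rename of the column-name constants in org.opendc.trace.conv from SCREAMING_SNAKE_CASE (RESOURCE_ID, RESOURCE_STATE_TIMESTAMP, ...) to camelCase (resourceID, resourceStateTimestamp, ...). A minimal caller-side sketch, mirroring the updated benchmark below; the trace path is a placeholder, and reader.close() is assumed from the reader being closeable (the benchmark's cleanup is truncated in the hunk):

    import org.opendc.trace.conv.TABLE_RESOURCES
    import org.opendc.trace.conv.resourceID
    import org.opendc.trace.opendc.OdcVmTraceFormat
    import java.nio.file.Paths

    fun main() {
        val format = OdcVmTraceFormat()
        // Placeholder path to an OpenDC VM trace directory containing meta.parquet.
        val path = Paths.get("traces/example")
        val reader = format.newReader(path, TABLE_RESOURCES, null)
        try {
            // Columns are now resolved via the camelCase constants (was RESOURCE_ID).
            val idColumn = reader.resolve(resourceID)
            while (reader.nextRow()) {
                println(reader.getString(idColumn))
            }
        } finally {
            reader.close() // assumed cleanup, not shown in the diff
        }
    }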
diff --git a/opendc-trace/opendc-trace-opendc/src/jmh/kotlin/org/opendc/trace/opendc/OdcVmTraceBenchmarks.kt b/opendc-trace/opendc-trace-opendc/src/jmh/kotlin/org/opendc/trace/opendc/OdcVmTraceBenchmarks.kt
index e504cf2f..e179e261 100644
--- a/opendc-trace/opendc-trace-opendc/src/jmh/kotlin/org/opendc/trace/opendc/OdcVmTraceBenchmarks.kt
+++ b/opendc-trace/opendc-trace-opendc/src/jmh/kotlin/org/opendc/trace/opendc/OdcVmTraceBenchmarks.kt
@@ -23,10 +23,10 @@
package org.opendc.trace.opendc
import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE
-import org.opendc.trace.conv.RESOURCE_ID
import org.opendc.trace.conv.TABLE_INTERFERENCE_GROUPS
import org.opendc.trace.conv.TABLE_RESOURCES
import org.opendc.trace.conv.TABLE_RESOURCE_STATES
+import org.opendc.trace.conv.resourceID
import org.opendc.trace.spi.TraceFormat
import org.openjdk.jmh.annotations.Benchmark
import org.openjdk.jmh.annotations.Fork
@@ -60,7 +60,7 @@ class OdcVmTraceBenchmarks {
fun benchmarkResourcesReader(bh: Blackhole) {
val reader = format.newReader(path, TABLE_RESOURCES, null)
try {
- val idColumn = reader.resolve(RESOURCE_ID)
+ val idColumn = reader.resolve(resourceID)
while (reader.nextRow()) {
bh.consume(reader.getString(idColumn))
}
@@ -73,7 +73,7 @@ class OdcVmTraceBenchmarks {
fun benchmarkResourceStatesReader(bh: Blackhole) {
val reader = format.newReader(path, TABLE_RESOURCE_STATES, null)
try {
- val idColumn = reader.resolve(RESOURCE_ID)
+ val idColumn = reader.resolve(resourceID)
while (reader.nextRow()) {
bh.consume(reader.getString(idColumn))
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt
index 3e1fca06..7bf48f1a 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableReader.kt
@@ -65,24 +65,24 @@ internal class OdcVmInterferenceJsonTableReader(private val parser: JsonParser)
}
}
- private val COL_MEMBERS = 0
- private val COL_TARGET = 1
- private val COL_SCORE = 2
+ private val colMembers = 0
+ private val colTarget = 1
+ private val colScore = 2
- private val TYPE_MEMBERS = TableColumnType.Set(TableColumnType.String)
+ private val typeMembers = TableColumnType.Set(TableColumnType.String)
override fun resolve(name: String): Int {
return when (name) {
- INTERFERENCE_GROUP_MEMBERS -> COL_MEMBERS
- INTERFERENCE_GROUP_TARGET -> COL_TARGET
- INTERFERENCE_GROUP_SCORE -> COL_SCORE
+ INTERFERENCE_GROUP_MEMBERS -> colMembers
+ INTERFERENCE_GROUP_TARGET -> colTarget
+ INTERFERENCE_GROUP_SCORE -> colScore
else -> -1
}
}
override fun isNull(index: Int): Boolean {
return when (index) {
- COL_MEMBERS, COL_TARGET, COL_SCORE -> false
+ colMembers, colTarget, colScore -> false
else -> throw IllegalArgumentException("Invalid column index $index")
}
}
@@ -106,8 +106,8 @@ internal class OdcVmInterferenceJsonTableReader(private val parser: JsonParser)
override fun getDouble(index: Int): Double {
checkActive()
return when (index) {
- COL_TARGET -> targetLoad
- COL_SCORE -> score
+ colTarget -> targetLoad
+ colScore -> score
else -> throw IllegalArgumentException("Invalid column $index")
}
}
@@ -128,19 +128,29 @@ internal class OdcVmInterferenceJsonTableReader(private val parser: JsonParser)
throw IllegalArgumentException("Invalid column $index")
}
- override fun <T> getList(index: Int, elementType: Class<T>): List<T>? {
+ override fun <T> getList(
+ index: Int,
+ elementType: Class<T>,
+ ): List<T>? {
throw IllegalArgumentException("Invalid column $index")
}
- override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? {
+ override fun <T> getSet(
+ index: Int,
+ elementType: Class<T>,
+ ): Set<T>? {
checkActive()
return when (index) {
- COL_MEMBERS -> TYPE_MEMBERS.convertTo(members, elementType)
+ colMembers -> typeMembers.convertTo(members, elementType)
else -> throw IllegalArgumentException("Invalid column $index")
}
}
- override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? {
+ override fun <K, V> getMap(
+ index: Int,
+ keyType: Class<K>,
+ valueType: Class<V>,
+ ): Map<K, V>? {
throw IllegalArgumentException("Invalid column $index")
}
@@ -196,7 +206,10 @@ internal class OdcVmInterferenceJsonTableReader(private val parser: JsonParser)
/**
* Parse the members of a group.
*/
- private fun parseGroupMembers(parser: JsonParser, members: MutableSet<String>) {
+ private fun parseGroupMembers(
+ parser: JsonParser,
+ members: MutableSet<String>,
+ ) {
if (!parser.isExpectedStartArrayToken) {
throw JsonParseException(parser, "Expected array for group members, but got ${parser.currentToken()}")
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt
index c6905c5b..93f5a976 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmInterferenceJsonTableWriter.kt
@@ -70,70 +70,106 @@ internal class OdcVmInterferenceJsonTableWriter(private val generator: JsonGener
override fun resolve(name: String): Int {
return when (name) {
- INTERFERENCE_GROUP_MEMBERS -> COL_MEMBERS
- INTERFERENCE_GROUP_TARGET -> COL_TARGET
- INTERFERENCE_GROUP_SCORE -> COL_SCORE
+ INTERFERENCE_GROUP_MEMBERS -> colMembers
+ INTERFERENCE_GROUP_TARGET -> colTarget
+ INTERFERENCE_GROUP_SCORE -> colScore
else -> -1
}
}
- override fun setBoolean(index: Int, value: Boolean) {
+ override fun setBoolean(
+ index: Int,
+ value: Boolean,
+ ) {
throw IllegalArgumentException("Invalid column $index")
}
- override fun setInt(index: Int, value: Int) {
+ override fun setInt(
+ index: Int,
+ value: Int,
+ ) {
throw IllegalArgumentException("Invalid column $index")
}
- override fun setLong(index: Int, value: Long) {
+ override fun setLong(
+ index: Int,
+ value: Long,
+ ) {
throw IllegalArgumentException("Invalid column $index")
}
- override fun setFloat(index: Int, value: Float) {
+ override fun setFloat(
+ index: Int,
+ value: Float,
+ ) {
throw IllegalArgumentException("Invalid column $index")
}
- override fun setDouble(index: Int, value: Double) {
+ override fun setDouble(
+ index: Int,
+ value: Double,
+ ) {
check(isRowActive) { "No active row" }
when (index) {
- COL_TARGET -> targetLoad = (value as Number).toDouble()
- COL_SCORE -> score = (value as Number).toDouble()
+ colTarget -> targetLoad = (value as Number).toDouble()
+ colScore -> score = (value as Number).toDouble()
else -> throw IllegalArgumentException("Invalid column $index")
}
}
- override fun setString(index: Int, value: String) {
+ override fun setString(
+ index: Int,
+ value: String,
+ ) {
throw IllegalArgumentException("Invalid column $index")
}
- override fun setUUID(index: Int, value: UUID) {
+ override fun setUUID(
+ index: Int,
+ value: UUID,
+ ) {
throw IllegalArgumentException("Invalid column $index")
}
- override fun setInstant(index: Int, value: Instant) {
+ override fun setInstant(
+ index: Int,
+ value: Instant,
+ ) {
throw IllegalArgumentException("Invalid column $index")
}
- override fun setDuration(index: Int, value: Duration) {
+ override fun setDuration(
+ index: Int,
+ value: Duration,
+ ) {
throw IllegalArgumentException("Invalid column $index")
}
- override fun <T> setList(index: Int, value: List<T>) {
+ override fun <T> setList(
+ index: Int,
+ value: List<T>,
+ ) {
throw IllegalArgumentException("Invalid column $index")
}
- override fun <T> setSet(index: Int, value: Set<T>) {
+ override fun <T> setSet(
+ index: Int,
+ value: Set<T>,
+ ) {
check(isRowActive) { "No active row" }
@Suppress("UNCHECKED_CAST")
when (index) {
- COL_MEMBERS -> members = value as Set<String>
+ colMembers -> members = value as Set<String>
else -> throw IllegalArgumentException("Invalid column index $index")
}
}
- override fun <K, V> setMap(index: Int, value: Map<K, V>) {
+ override fun <K, V> setMap(
+ index: Int,
+ value: Map<K, V>,
+ ) {
throw IllegalArgumentException("Invalid column $index")
}
@@ -146,9 +182,9 @@ internal class OdcVmInterferenceJsonTableWriter(private val generator: JsonGener
generator.close()
}
- private val COL_MEMBERS = 0
- private val COL_TARGET = 1
- private val COL_SCORE = 2
+ private val colMembers = 0
+ private val colTarget = 1
+ private val colScore = 2
private var members = emptySet<String>()
private var targetLoad = Double.POSITIVE_INFINITY
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
index ff9a98d7..8e54f2b0 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
@@ -23,11 +23,11 @@
package org.opendc.trace.opendc
import org.opendc.trace.TableReader
-import org.opendc.trace.conv.RESOURCE_CPU_COUNT
-import org.opendc.trace.conv.RESOURCE_ID
-import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE
-import org.opendc.trace.conv.RESOURCE_STATE_DURATION
-import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP
+import org.opendc.trace.conv.resourceCpuCount
+import org.opendc.trace.conv.resourceID
+import org.opendc.trace.conv.resourceStateCpuUsage
+import org.opendc.trace.conv.resourceStateDuration
+import org.opendc.trace.conv.resourceStateTimestamp
import org.opendc.trace.opendc.parquet.ResourceState
import org.opendc.trace.util.parquet.LocalParquetReader
import java.time.Duration
@@ -55,25 +55,25 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
}
}
- private val COL_ID = 0
- private val COL_TIMESTAMP = 1
- private val COL_DURATION = 2
- private val COL_CPU_COUNT = 3
- private val COL_CPU_USAGE = 4
+ private val colID = 0
+ private val colTimestamp = 1
+ private val colDuration = 2
+ private val colCpuCount = 3
+ private val colCpuUsage = 4
override fun resolve(name: String): Int {
return when (name) {
- RESOURCE_ID -> COL_ID
- RESOURCE_STATE_TIMESTAMP -> COL_TIMESTAMP
- RESOURCE_STATE_DURATION -> COL_DURATION
- RESOURCE_CPU_COUNT -> COL_CPU_COUNT
- RESOURCE_STATE_CPU_USAGE -> COL_CPU_USAGE
+ resourceID -> colID
+ resourceStateTimestamp -> colTimestamp
+ resourceStateDuration -> colDuration
+ resourceCpuCount -> colCpuCount
+ resourceStateCpuUsage -> colCpuUsage
else -> -1
}
}
override fun isNull(index: Int): Boolean {
- require(index in 0..COL_CPU_USAGE) { "Invalid column index" }
+ require(index in 0..colCpuUsage) { "Invalid column index" }
return false
}
@@ -84,7 +84,7 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
override fun getInt(index: Int): Int {
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_CPU_COUNT -> record.cpuCount
+ colCpuCount -> record.cpuCount
else -> throw IllegalArgumentException("Invalid column or type [index $index]")
}
}
@@ -100,7 +100,7 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
override fun getDouble(index: Int): Double {
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_CPU_USAGE -> record.cpuUsage
+ colCpuUsage -> record.cpuUsage
else -> throw IllegalArgumentException("Invalid column or type [index $index]")
}
}
@@ -109,7 +109,7 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_ID -> record.id
+ colID -> record.id
else -> throw IllegalArgumentException("Invalid column index $index")
}
}
@@ -122,7 +122,7 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_TIMESTAMP -> record.timestamp
+ colTimestamp -> record.timestamp
else -> throw IllegalArgumentException("Invalid column index $index")
}
}
@@ -131,20 +131,30 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_DURATION -> record.duration
+ colDuration -> record.duration
else -> throw IllegalArgumentException("Invalid column index $index")
}
}
- override fun <T> getList(index: Int, elementType: Class<T>): List<T>? {
+ override fun <T> getList(
+ index: Int,
+ elementType: Class<T>,
+ ): List<T>? {
throw IllegalArgumentException("Invalid column or type [index $index]")
}
- override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? {
+ override fun <T> getSet(
+ index: Int,
+ elementType: Class<T>,
+ ): Set<T>? {
throw IllegalArgumentException("Invalid column or type [index $index]")
}
- override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? {
+ override fun <K, V> getMap(
+ index: Int,
+ keyType: Class<K>,
+ valueType: Class<V>,
+ ): Map<K, V>? {
throw IllegalArgumentException("Invalid column or type [index $index]")
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt
index cf0a401b..01cd13c8 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt
@@ -24,11 +24,11 @@ package org.opendc.trace.opendc
import org.apache.parquet.hadoop.ParquetWriter
import org.opendc.trace.TableWriter
-import org.opendc.trace.conv.RESOURCE_CPU_COUNT
-import org.opendc.trace.conv.RESOURCE_ID
-import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE
-import org.opendc.trace.conv.RESOURCE_STATE_DURATION
-import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP
+import org.opendc.trace.conv.resourceCpuCount
+import org.opendc.trace.conv.resourceID
+import org.opendc.trace.conv.resourceStateCpuUsage
+import org.opendc.trace.conv.resourceStateDuration
+import org.opendc.trace.conv.resourceStateTimestamp
import org.opendc.trace.opendc.parquet.ResourceState
import java.time.Duration
import java.time.Instant
@@ -41,113 +41,149 @@ internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter<R
/**
* The current state for the record that is being written.
*/
- private var _isActive = false
- private var _id: String = ""
- private var _timestamp: Instant = Instant.MIN
- private var _duration: Duration = Duration.ZERO
- private var _cpuCount: Int = 0
- private var _cpuUsage: Double = Double.NaN
+ private var localIsActive = false
+ private var localID: String = ""
+ private var localTimestamp: Instant = Instant.MIN
+ private var localDuration: Duration = Duration.ZERO
+ private var localCpuCount: Int = 0
+ private var localCpuUsage: Double = Double.NaN
override fun startRow() {
- _isActive = true
- _id = ""
- _timestamp = Instant.MIN
- _duration = Duration.ZERO
- _cpuCount = 0
- _cpuUsage = Double.NaN
+ localIsActive = true
+ localID = ""
+ localTimestamp = Instant.MIN
+ localDuration = Duration.ZERO
+ localCpuCount = 0
+ localCpuUsage = Double.NaN
}
override fun endRow() {
- check(_isActive) { "No active row" }
- _isActive = false
+ check(localIsActive) { "No active row" }
+ localIsActive = false
- check(lastId != _id || _timestamp >= lastTimestamp) { "Records need to be ordered by (id, timestamp)" }
+ check(lastId != localID || localTimestamp >= lastTimestamp) { "Records need to be ordered by (id, timestamp)" }
- writer.write(ResourceState(_id, _timestamp, _duration, _cpuCount, _cpuUsage))
+ writer.write(ResourceState(localID, localTimestamp, localDuration, localCpuCount, localCpuUsage))
- lastId = _id
- lastTimestamp = _timestamp
+ lastId = localID
+ lastTimestamp = localTimestamp
}
override fun resolve(name: String): Int {
return when (name) {
- RESOURCE_ID -> COL_ID
- RESOURCE_STATE_TIMESTAMP -> COL_TIMESTAMP
- RESOURCE_STATE_DURATION -> COL_DURATION
- RESOURCE_CPU_COUNT -> COL_CPU_COUNT
- RESOURCE_STATE_CPU_USAGE -> COL_CPU_USAGE
+ resourceID -> colID
+ resourceStateTimestamp -> colTimestamp
+ resourceStateDuration -> colDuration
+ resourceCpuCount -> colCpuCount
+ resourceStateCpuUsage -> colCpuUsage
else -> -1
}
}
- override fun setBoolean(index: Int, value: Boolean) {
+ override fun setBoolean(
+ index: Int,
+ value: Boolean,
+ ) {
throw IllegalArgumentException("Invalid column or type [index $index]")
}
- override fun setInt(index: Int, value: Int) {
- check(_isActive) { "No active row" }
+ override fun setInt(
+ index: Int,
+ value: Int,
+ ) {
+ check(localIsActive) { "No active row" }
when (index) {
- COL_CPU_COUNT -> _cpuCount = value
+ colCpuCount -> localCpuCount = value
else -> throw IllegalArgumentException("Invalid column or type [index $index]")
}
}
- override fun setLong(index: Int, value: Long) {
+ override fun setLong(
+ index: Int,
+ value: Long,
+ ) {
throw IllegalArgumentException("Invalid column or type [index $index]")
}
- override fun setFloat(index: Int, value: Float) {
+ override fun setFloat(
+ index: Int,
+ value: Float,
+ ) {
throw IllegalArgumentException("Invalid column or type [index $index]")
}
- override fun setDouble(index: Int, value: Double) {
- check(_isActive) { "No active row" }
+ override fun setDouble(
+ index: Int,
+ value: Double,
+ ) {
+ check(localIsActive) { "No active row" }
when (index) {
- COL_CPU_USAGE -> _cpuUsage = value
+ colCpuUsage -> localCpuUsage = value
else -> throw IllegalArgumentException("Invalid column or type [index $index]")
}
}
- override fun setString(index: Int, value: String) {
- check(_isActive) { "No active row" }
+ override fun setString(
+ index: Int,
+ value: String,
+ ) {
+ check(localIsActive) { "No active row" }
when (index) {
- COL_ID -> _id = value
+ colID -> localID = value
else -> throw IllegalArgumentException("Invalid column or type [index $index]")
}
}
- override fun setUUID(index: Int, value: UUID) {
+ override fun setUUID(
+ index: Int,
+ value: UUID,
+ ) {
throw IllegalArgumentException("Invalid column or type [index $index]")
}
- override fun setInstant(index: Int, value: Instant) {
- check(_isActive) { "No active row" }
+ override fun setInstant(
+ index: Int,
+ value: Instant,
+ ) {
+ check(localIsActive) { "No active row" }
when (index) {
- COL_TIMESTAMP -> _timestamp = value
+ colTimestamp -> localTimestamp = value
else -> throw IllegalArgumentException("Invalid column or type [index $index]")
}
}
- override fun setDuration(index: Int, value: Duration) {
- check(_isActive) { "No active row" }
+ override fun setDuration(
+ index: Int,
+ value: Duration,
+ ) {
+ check(localIsActive) { "No active row" }
when (index) {
- COL_DURATION -> _duration = value
+ colDuration -> localDuration = value
else -> throw IllegalArgumentException("Invalid column or type [index $index]")
}
}
- override fun <T> setList(index: Int, value: List<T>) {
+ override fun <T> setList(
+ index: Int,
+ value: List<T>,
+ ) {
throw IllegalArgumentException("Invalid column or type [index $index]")
}
- override fun <T> setSet(index: Int, value: Set<T>) {
+ override fun <T> setSet(
+ index: Int,
+ value: Set<T>,
+ ) {
throw IllegalArgumentException("Invalid column or type [index $index]")
}
- override fun <K, V> setMap(index: Int, value: Map<K, V>) {
+ override fun <K, V> setMap(
+ index: Int,
+ value: Map<K, V>,
+ ) {
throw IllegalArgumentException("Invalid column or type [index $index]")
}
@@ -165,9 +201,9 @@ internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter<R
private var lastId: String? = null
private var lastTimestamp: Instant = Instant.MAX
- private val COL_ID = 0
- private val COL_TIMESTAMP = 1
- private val COL_DURATION = 2
- private val COL_CPU_COUNT = 3
- private val COL_CPU_USAGE = 4
+ private val colID = 0
+ private val colTimestamp = 1
+ private val colDuration = 2
+ private val colCpuCount = 3
+ private val colCpuUsage = 4
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
index d4613158..195929aa 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
@@ -23,12 +23,12 @@
package org.opendc.trace.opendc
import org.opendc.trace.TableReader
-import org.opendc.trace.conv.RESOURCE_CPU_CAPACITY
-import org.opendc.trace.conv.RESOURCE_CPU_COUNT
-import org.opendc.trace.conv.RESOURCE_ID
-import org.opendc.trace.conv.RESOURCE_MEM_CAPACITY
-import org.opendc.trace.conv.RESOURCE_START_TIME
-import org.opendc.trace.conv.RESOURCE_STOP_TIME
+import org.opendc.trace.conv.resourceCpuCapacity
+import org.opendc.trace.conv.resourceCpuCount
+import org.opendc.trace.conv.resourceID
+import org.opendc.trace.conv.resourceMemCapacity
+import org.opendc.trace.conv.resourceStartTime
+import org.opendc.trace.conv.resourceStopTime
import org.opendc.trace.opendc.parquet.Resource
import org.opendc.trace.util.parquet.LocalParquetReader
import java.time.Duration
@@ -56,27 +56,27 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R
}
}
- private val COL_ID = 0
- private val COL_START_TIME = 1
- private val COL_STOP_TIME = 2
- private val COL_CPU_COUNT = 3
- private val COL_CPU_CAPACITY = 4
- private val COL_MEM_CAPACITY = 5
+ private val colID = 0
+ private val colStartTime = 1
+ private val colStopTime = 2
+ private val colCpuCount = 3
+ private val colCpuCapacity = 4
+ private val colMemCapacity = 5
override fun resolve(name: String): Int {
return when (name) {
- RESOURCE_ID -> COL_ID
- RESOURCE_START_TIME -> COL_START_TIME
- RESOURCE_STOP_TIME -> COL_STOP_TIME
- RESOURCE_CPU_COUNT -> COL_CPU_COUNT
- RESOURCE_CPU_CAPACITY -> COL_CPU_CAPACITY
- RESOURCE_MEM_CAPACITY -> COL_MEM_CAPACITY
+ resourceID -> colID
+ resourceStartTime -> colStartTime
+ resourceStopTime -> colStopTime
+ resourceCpuCount -> colCpuCount
+ resourceCpuCapacity -> colCpuCapacity
+ resourceMemCapacity -> colMemCapacity
else -> -1
}
}
override fun isNull(index: Int): Boolean {
- require(index in 0..COL_MEM_CAPACITY) { "Invalid column index" }
+ require(index in 0..colMemCapacity) { "Invalid column index" }
return false
}
@@ -88,7 +88,7 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_CPU_COUNT -> record.cpuCount
+ colCpuCount -> record.cpuCount
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -105,8 +105,8 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_CPU_CAPACITY -> record.cpuCapacity
- COL_MEM_CAPACITY -> record.memCapacity
+ colCpuCapacity -> record.cpuCapacity
+ colMemCapacity -> record.memCapacity
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -115,7 +115,7 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_ID -> record.id
+ colID -> record.id
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -128,8 +128,8 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_START_TIME -> record.startTime
- COL_STOP_TIME -> record.stopTime
+ colStartTime -> record.startTime
+ colStopTime -> record.stopTime
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -138,15 +138,25 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<R
throw IllegalArgumentException("Invalid column")
}
- override fun <T> getList(index: Int, elementType: Class<T>): List<T>? {
+ override fun <T> getList(
+ index: Int,
+ elementType: Class<T>,
+ ): List<T>? {
throw IllegalArgumentException("Invalid column")
}
- override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? {
+ override fun <T> getSet(
+ index: Int,
+ elementType: Class<T>,
+ ): Set<T>? {
throw IllegalArgumentException("Invalid column")
}
- override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? {
+ override fun <K, V> getMap(
+ index: Int,
+ keyType: Class<K>,
+ valueType: Class<V>,
+ ): Map<K, V>? {
throw IllegalArgumentException("Invalid column")
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt
index 73a03891..5bbc2f3f 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt
@@ -24,12 +24,12 @@ package org.opendc.trace.opendc
import org.apache.parquet.hadoop.ParquetWriter
import org.opendc.trace.TableWriter
-import org.opendc.trace.conv.RESOURCE_CPU_CAPACITY
-import org.opendc.trace.conv.RESOURCE_CPU_COUNT
-import org.opendc.trace.conv.RESOURCE_ID
-import org.opendc.trace.conv.RESOURCE_MEM_CAPACITY
-import org.opendc.trace.conv.RESOURCE_START_TIME
-import org.opendc.trace.conv.RESOURCE_STOP_TIME
+import org.opendc.trace.conv.resourceCpuCapacity
+import org.opendc.trace.conv.resourceCpuCount
+import org.opendc.trace.conv.resourceID
+import org.opendc.trace.conv.resourceMemCapacity
+import org.opendc.trace.conv.resourceStartTime
+import org.opendc.trace.conv.resourceStopTime
import org.opendc.trace.opendc.parquet.Resource
import java.time.Duration
import java.time.Instant
@@ -42,105 +42,141 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour
/**
* The current state for the record that is being written.
*/
- private var _isActive = false
- private var _id: String = ""
- private var _startTime: Instant = Instant.MIN
- private var _stopTime: Instant = Instant.MIN
- private var _cpuCount: Int = 0
- private var _cpuCapacity: Double = Double.NaN
- private var _memCapacity: Double = Double.NaN
+ private var localIsActive = false
+ private var localId: String = ""
+ private var localStartTime: Instant = Instant.MIN
+ private var localStopTime: Instant = Instant.MIN
+ private var localCpuCount: Int = 0
+ private var localCpuCapacity: Double = Double.NaN
+ private var localMemCapacity: Double = Double.NaN
override fun startRow() {
- _isActive = true
- _id = ""
- _startTime = Instant.MIN
- _stopTime = Instant.MIN
- _cpuCount = 0
- _cpuCapacity = Double.NaN
- _memCapacity = Double.NaN
+ localIsActive = true
+ localId = ""
+ localStartTime = Instant.MIN
+ localStopTime = Instant.MIN
+ localCpuCount = 0
+ localCpuCapacity = Double.NaN
+ localMemCapacity = Double.NaN
}
override fun endRow() {
- check(_isActive) { "No active row" }
- _isActive = false
- writer.write(Resource(_id, _startTime, _stopTime, _cpuCount, _cpuCapacity, _memCapacity))
+ check(localIsActive) { "No active row" }
+ localIsActive = false
+ writer.write(Resource(localId, localStartTime, localStopTime, localCpuCount, localCpuCapacity, localMemCapacity))
}
override fun resolve(name: String): Int {
return when (name) {
- RESOURCE_ID -> COL_ID
- RESOURCE_START_TIME -> COL_START_TIME
- RESOURCE_STOP_TIME -> COL_STOP_TIME
- RESOURCE_CPU_COUNT -> COL_CPU_COUNT
- RESOURCE_CPU_CAPACITY -> COL_CPU_CAPACITY
- RESOURCE_MEM_CAPACITY -> COL_MEM_CAPACITY
+ resourceID -> colID
+ resourceStartTime -> colStartTime
+ resourceStopTime -> colStopTime
+ resourceCpuCount -> colCpuCount
+ resourceCpuCapacity -> colCpuCapacity
+ resourceMemCapacity -> colMemCapacity
else -> -1
}
}
- override fun setBoolean(index: Int, value: Boolean) {
+ override fun setBoolean(
+ index: Int,
+ value: Boolean,
+ ) {
throw IllegalArgumentException("Invalid column or type [index $index]")
}
- override fun setInt(index: Int, value: Int) {
- check(_isActive) { "No active row" }
+ override fun setInt(
+ index: Int,
+ value: Int,
+ ) {
+ check(localIsActive) { "No active row" }
when (index) {
- COL_CPU_COUNT -> _cpuCount = value
+ colCpuCount -> localCpuCount = value
else -> throw IllegalArgumentException("Invalid column or type [index $index]")
}
}
- override fun setLong(index: Int, value: Long) {
+ override fun setLong(
+ index: Int,
+ value: Long,
+ ) {
throw IllegalArgumentException("Invalid column or type [index $index]")
}
- override fun setFloat(index: Int, value: Float) {
+ override fun setFloat(
+ index: Int,
+ value: Float,
+ ) {
throw IllegalArgumentException("Invalid column or type [index $index]")
}
- override fun setDouble(index: Int, value: Double) {
- check(_isActive) { "No active row" }
+ override fun setDouble(
+ index: Int,
+ value: Double,
+ ) {
+ check(localIsActive) { "No active row" }
when (index) {
- COL_CPU_CAPACITY -> _cpuCapacity = value
- COL_MEM_CAPACITY -> _memCapacity = value
+ colCpuCapacity -> localCpuCapacity = value
+ colMemCapacity -> localMemCapacity = value
else -> throw IllegalArgumentException("Invalid column or type [index $index]")
}
}
- override fun setString(index: Int, value: String) {
- check(_isActive) { "No active row" }
+ override fun setString(
+ index: Int,
+ value: String,
+ ) {
+ check(localIsActive) { "No active row" }
when (index) {
- COL_ID -> _id = value
+ colID -> localId = value
else -> throw IllegalArgumentException("Invalid column index $index")
}
}
- override fun setUUID(index: Int, value: UUID) {
+ override fun setUUID(
+ index: Int,
+ value: UUID,
+ ) {
throw IllegalArgumentException("Invalid column or type [index $index]")
}
- override fun setInstant(index: Int, value: Instant) {
- check(_isActive) { "No active row" }
+ override fun setInstant(
+ index: Int,
+ value: Instant,
+ ) {
+ check(localIsActive) { "No active row" }
when (index) {
- COL_START_TIME -> _startTime = value
- COL_STOP_TIME -> _stopTime = value
+ colStartTime -> localStartTime = value
+ colStopTime -> localStopTime = value
else -> throw IllegalArgumentException("Invalid column index $index")
}
}
- override fun setDuration(index: Int, value: Duration) {
+ override fun setDuration(
+ index: Int,
+ value: Duration,
+ ) {
throw IllegalArgumentException("Invalid column or type [index $index]")
}
- override fun <T> setList(index: Int, value: List<T>) {
+ override fun <T> setList(
+ index: Int,
+ value: List<T>,
+ ) {
throw IllegalArgumentException("Invalid column or type [index $index]")
}
- override fun <T> setSet(index: Int, value: Set<T>) {
+ override fun <T> setSet(
+ index: Int,
+ value: Set<T>,
+ ) {
throw IllegalArgumentException("Invalid column or type [index $index]")
}
- override fun <K, V> setMap(index: Int, value: Map<K, V>) {
+ override fun <K, V> setMap(
+ index: Int,
+ value: Map<K, V>,
+ ) {
throw IllegalArgumentException("Invalid column or type [index $index]")
}
@@ -152,10 +188,10 @@ internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resour
writer.close()
}
- private val COL_ID = 0
- private val COL_START_TIME = 1
- private val COL_STOP_TIME = 2
- private val COL_CPU_COUNT = 3
- private val COL_CPU_CAPACITY = 4
- private val COL_MEM_CAPACITY = 5
+ private val colID = 0
+ private val colStartTime = 1
+ private val colStopTime = 2
+ private val colCpuCount = 3
+ private val colCpuCapacity = 4
+ private val colMemCapacity = 5
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
index c4790538..9abe872f 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
@@ -34,18 +34,18 @@ import org.opendc.trace.TableWriter
import org.opendc.trace.conv.INTERFERENCE_GROUP_MEMBERS
import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE
import org.opendc.trace.conv.INTERFERENCE_GROUP_TARGET
-import org.opendc.trace.conv.RESOURCE_CPU_CAPACITY
-import org.opendc.trace.conv.RESOURCE_CPU_COUNT
-import org.opendc.trace.conv.RESOURCE_ID
-import org.opendc.trace.conv.RESOURCE_MEM_CAPACITY
-import org.opendc.trace.conv.RESOURCE_START_TIME
-import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE
-import org.opendc.trace.conv.RESOURCE_STATE_DURATION
-import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP
-import org.opendc.trace.conv.RESOURCE_STOP_TIME
import org.opendc.trace.conv.TABLE_INTERFERENCE_GROUPS
import org.opendc.trace.conv.TABLE_RESOURCES
import org.opendc.trace.conv.TABLE_RESOURCE_STATES
+import org.opendc.trace.conv.resourceCpuCapacity
+import org.opendc.trace.conv.resourceCpuCount
+import org.opendc.trace.conv.resourceID
+import org.opendc.trace.conv.resourceMemCapacity
+import org.opendc.trace.conv.resourceStartTime
+import org.opendc.trace.conv.resourceStateCpuUsage
+import org.opendc.trace.conv.resourceStateDuration
+import org.opendc.trace.conv.resourceStateTimestamp
+import org.opendc.trace.conv.resourceStopTime
import org.opendc.trace.opendc.parquet.ResourceReadSupport
import org.opendc.trace.opendc.parquet.ResourceStateReadSupport
import org.opendc.trace.opendc.parquet.ResourceStateWriteSupport
@@ -86,39 +86,49 @@ public class OdcVmTraceFormat : TraceFormat {
override fun getTables(path: Path): List<String> = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES, TABLE_INTERFERENCE_GROUPS)
- override fun getDetails(path: Path, table: String): TableDetails {
+ override fun getDetails(
+ path: Path,
+ table: String,
+ ): TableDetails {
return when (table) {
- TABLE_RESOURCES -> TableDetails(
- listOf(
- TableColumn(RESOURCE_ID, TableColumnType.String),
- TableColumn(RESOURCE_START_TIME, TableColumnType.Instant),
- TableColumn(RESOURCE_STOP_TIME, TableColumnType.Instant),
- TableColumn(RESOURCE_CPU_COUNT, TableColumnType.Int),
- TableColumn(RESOURCE_CPU_CAPACITY, TableColumnType.Double),
- TableColumn(RESOURCE_MEM_CAPACITY, TableColumnType.Double)
+ TABLE_RESOURCES ->
+ TableDetails(
+ listOf(
+ TableColumn(resourceID, TableColumnType.String),
+ TableColumn(resourceStartTime, TableColumnType.Instant),
+ TableColumn(resourceStopTime, TableColumnType.Instant),
+ TableColumn(resourceCpuCount, TableColumnType.Int),
+ TableColumn(resourceCpuCapacity, TableColumnType.Double),
+ TableColumn(resourceMemCapacity, TableColumnType.Double),
+ ),
)
- )
- TABLE_RESOURCE_STATES -> TableDetails(
- listOf(
- TableColumn(RESOURCE_ID, TableColumnType.String),
- TableColumn(RESOURCE_STATE_TIMESTAMP, TableColumnType.Instant),
- TableColumn(RESOURCE_STATE_DURATION, TableColumnType.Duration),
- TableColumn(RESOURCE_CPU_COUNT, TableColumnType.Int),
- TableColumn(RESOURCE_STATE_CPU_USAGE, TableColumnType.Double)
+ TABLE_RESOURCE_STATES ->
+ TableDetails(
+ listOf(
+ TableColumn(resourceID, TableColumnType.String),
+ TableColumn(resourceStateTimestamp, TableColumnType.Instant),
+ TableColumn(resourceStateDuration, TableColumnType.Duration),
+ TableColumn(resourceCpuCount, TableColumnType.Int),
+ TableColumn(resourceStateCpuUsage, TableColumnType.Double),
+ ),
)
- )
- TABLE_INTERFERENCE_GROUPS -> TableDetails(
- listOf(
- TableColumn(INTERFERENCE_GROUP_MEMBERS, TableColumnType.Set(TableColumnType.String)),
- TableColumn(INTERFERENCE_GROUP_TARGET, TableColumnType.Double),
- TableColumn(INTERFERENCE_GROUP_SCORE, TableColumnType.Double)
+ TABLE_INTERFERENCE_GROUPS ->
+ TableDetails(
+ listOf(
+ TableColumn(INTERFERENCE_GROUP_MEMBERS, TableColumnType.Set(TableColumnType.String)),
+ TableColumn(INTERFERENCE_GROUP_TARGET, TableColumnType.Double),
+ TableColumn(INTERFERENCE_GROUP_SCORE, TableColumnType.Double),
+ ),
)
- )
else -> throw IllegalArgumentException("Table $table not supported")
}
}
- override fun newReader(path: Path, table: String, projection: List<String>?): TableReader {
+ override fun newReader(
+ path: Path,
+ table: String,
+ projection: List<String>?,
+ ): TableReader {
return when (table) {
TABLE_RESOURCES -> {
val reader = LocalParquetReader(path.resolve("meta.parquet"), ResourceReadSupport(projection))
@@ -130,11 +140,12 @@ public class OdcVmTraceFormat : TraceFormat {
}
TABLE_INTERFERENCE_GROUPS -> {
val modelPath = path.resolve("interference-model.json")
- val parser = if (modelPath.exists()) {
- jsonFactory.createParser(modelPath.toFile())
- } else {
- jsonFactory.createParser("[]") // If model does not exist, return empty model
- }
+ val parser =
+ if (modelPath.exists()) {
+ jsonFactory.createParser(modelPath.toFile())
+ } else {
+ jsonFactory.createParser("[]") // If model does not exist, return empty model
+ }
OdcVmInterferenceJsonTableReader(parser)
}
@@ -142,26 +153,31 @@ public class OdcVmTraceFormat : TraceFormat {
}
}
- override fun newWriter(path: Path, table: String): TableWriter {
+ override fun newWriter(
+ path: Path,
+ table: String,
+ ): TableWriter {
return when (table) {
TABLE_RESOURCES -> {
- val writer = LocalParquetWriter.builder(path.resolve("meta.parquet"), ResourceWriteSupport())
- .withCompressionCodec(CompressionCodecName.ZSTD)
- .withPageWriteChecksumEnabled(true)
- .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
- .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
- .build()
+ val writer =
+ LocalParquetWriter.builder(path.resolve("meta.parquet"), ResourceWriteSupport())
+ .withCompressionCodec(CompressionCodecName.ZSTD)
+ .withPageWriteChecksumEnabled(true)
+ .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
+ .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+ .build()
OdcVmResourceTableWriter(writer)
}
TABLE_RESOURCE_STATES -> {
- val writer = LocalParquetWriter.builder(path.resolve("trace.parquet"), ResourceStateWriteSupport())
- .withCompressionCodec(CompressionCodecName.ZSTD)
- .withDictionaryEncoding("id", true)
- .withBloomFilterEnabled("id", true)
- .withPageWriteChecksumEnabled(true)
- .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
- .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
- .build()
+ val writer =
+ LocalParquetWriter.builder(path.resolve("trace.parquet"), ResourceStateWriteSupport())
+ .withCompressionCodec(CompressionCodecName.ZSTD)
+ .withDictionaryEncoding("id", true)
+ .withBloomFilterEnabled("id", true)
+ .withPageWriteChecksumEnabled(true)
+ .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
+ .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+ .build()
OdcVmResourceStateTableWriter(writer)
}
TABLE_INTERFERENCE_GROUPS -> {
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/Resource.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/Resource.kt
index c6db45b5..13eefe72 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/Resource.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/Resource.kt
@@ -33,5 +33,5 @@ internal data class Resource(
val stopTime: Instant,
val cpuCount: Int,
val cpuCapacity: Double,
- val memCapacity: Double
+ val memCapacity: Double,
)
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt
index 52911d5f..8bada02e 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt
@@ -31,12 +31,12 @@ import org.apache.parquet.schema.MessageType
import org.apache.parquet.schema.PrimitiveType
import org.apache.parquet.schema.Types
import org.opendc.trace.TableColumn
-import org.opendc.trace.conv.RESOURCE_CPU_CAPACITY
-import org.opendc.trace.conv.RESOURCE_CPU_COUNT
-import org.opendc.trace.conv.RESOURCE_ID
-import org.opendc.trace.conv.RESOURCE_MEM_CAPACITY
-import org.opendc.trace.conv.RESOURCE_START_TIME
-import org.opendc.trace.conv.RESOURCE_STOP_TIME
+import org.opendc.trace.conv.resourceCpuCapacity
+import org.opendc.trace.conv.resourceCpuCount
+import org.opendc.trace.conv.resourceID
+import org.opendc.trace.conv.resourceMemCapacity
+import org.opendc.trace.conv.resourceStartTime
+import org.opendc.trace.conv.resourceStopTime
/**
* A [ReadSupport] instance for [Resource] objects.
@@ -45,18 +45,19 @@ internal class ResourceReadSupport(private val projection: List<String>?) : Read
/**
* Mapping from field names to [TableColumn]s.
*/
- private val fieldMap = mapOf(
- "id" to RESOURCE_ID,
- "submissionTime" to RESOURCE_START_TIME,
- "start_time" to RESOURCE_START_TIME,
- "endTime" to RESOURCE_STOP_TIME,
- "stop_time" to RESOURCE_STOP_TIME,
- "maxCores" to RESOURCE_CPU_COUNT,
- "cpu_count" to RESOURCE_CPU_COUNT,
- "cpu_capacity" to RESOURCE_CPU_CAPACITY,
- "requiredMemory" to RESOURCE_MEM_CAPACITY,
- "mem_capacity" to RESOURCE_MEM_CAPACITY
- )
+ private val fieldMap =
+ mapOf(
+ "id" to resourceID,
+ "submissionTime" to resourceStartTime,
+ "start_time" to resourceStartTime,
+ "endTime" to resourceStopTime,
+ "stop_time" to resourceStopTime,
+ "maxCores" to resourceCpuCount,
+ "cpu_count" to resourceCpuCount,
+ "cpu_capacity" to resourceCpuCapacity,
+ "requiredMemory" to resourceMemCapacity,
+ "mem_capacity" to resourceMemCapacity,
+ )
override fun init(context: InitContext): ReadContext {
val projectedSchema =
@@ -84,7 +85,7 @@ internal class ResourceReadSupport(private val projection: List<String>?) : Read
configuration: Configuration,
keyValueMetaData: Map<String, String>,
fileSchema: MessageType,
- readContext: ReadContext
+ readContext: ReadContext,
): RecordMaterializer<Resource> = ResourceRecordMaterializer(readContext.requestedSchema)
companion object {
@@ -92,64 +93,67 @@ internal class ResourceReadSupport(private val projection: List<String>?) : Read
* Parquet read schema (version 2.0) for the "resources" table in the trace.
*/
@JvmStatic
- val READ_SCHEMA_V2_0: MessageType = Types.buildMessage()
- .addFields(
- Types
- .required(PrimitiveType.PrimitiveTypeName.BINARY)
- .`as`(LogicalTypeAnnotation.stringType())
- .named("id"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("submissionTime"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("endTime"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("maxCores"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("requiredMemory")
- )
- .named("resource")
+ val READ_SCHEMA_V2_0: MessageType =
+ Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("submissionTime"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("endTime"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("maxCores"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("requiredMemory"),
+ )
+ .named("resource")
/**
* Parquet read schema (version 2.1) for the "resources" table in the trace.
*/
@JvmStatic
- val READ_SCHEMA_V2_1: MessageType = Types.buildMessage()
- .addFields(
- Types
- .required(PrimitiveType.PrimitiveTypeName.BINARY)
- .`as`(LogicalTypeAnnotation.stringType())
- .named("id"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("start_time"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("stop_time"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("cpu_count"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("cpu_capacity"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("mem_capacity")
- )
- .named("resource")
+ val READ_SCHEMA_V2_1: MessageType =
+ Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("start_time"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("stop_time"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("cpu_count"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpu_capacity"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("mem_capacity"),
+ )
+ .named("resource")
/**
* Parquet read schema for the "resources" table in the trace.
*/
@JvmStatic
- val READ_SCHEMA: MessageType = READ_SCHEMA_V2_0
- .union(READ_SCHEMA_V2_1)
+ val READ_SCHEMA: MessageType =
+ READ_SCHEMA_V2_0
+ .union(READ_SCHEMA_V2_1)
}
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceRecordMaterializer.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceRecordMaterializer.kt
index 936a684a..6e2afa7a 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceRecordMaterializer.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceRecordMaterializer.kt
@@ -37,75 +37,91 @@ internal class ResourceRecordMaterializer(schema: MessageType) : RecordMateriali
/**
* State of current record being read.
*/
- private var _id = ""
- private var _startTime = Instant.MIN
- private var _stopTime = Instant.MIN
- private var _cpuCount = 0
- private var _cpuCapacity = 0.0
- private var _memCapacity = 0.0
+ private var localId = ""
+ private var localStartTime = Instant.MIN
+ private var localStopTime = Instant.MIN
+ private var localCpuCount = 0
+ private var localCpuCapacity = 0.0
+ private var localMemCapacity = 0.0
/**
* Root converter for the record.
*/
- private val root = object : GroupConverter() {
- /**
- * The converters for the columns of the schema.
- */
- private val converters = schema.fields.map { type ->
- when (type.name) {
- "id" -> object : PrimitiveConverter() {
- override fun addBinary(value: Binary) {
- _id = value.toStringUsingUTF8()
- }
- }
- "start_time", "submissionTime" -> object : PrimitiveConverter() {
- override fun addLong(value: Long) {
- _startTime = Instant.ofEpochMilli(value)
- }
- }
- "stop_time", "endTime" -> object : PrimitiveConverter() {
- override fun addLong(value: Long) {
- _stopTime = Instant.ofEpochMilli(value)
- }
- }
- "cpu_count", "maxCores" -> object : PrimitiveConverter() {
- override fun addInt(value: Int) {
- _cpuCount = value
- }
- }
- "cpu_capacity" -> object : PrimitiveConverter() {
- override fun addDouble(value: Double) {
- _cpuCapacity = value
- }
- }
- "mem_capacity", "requiredMemory" -> object : PrimitiveConverter() {
- override fun addDouble(value: Double) {
- _memCapacity = value
- }
+ private val root =
+ object : GroupConverter() {
+ /**
+ * The converters for the columns of the schema.
+ */
+ private val converters =
+ schema.fields.map { type ->
+ when (type.name) {
+ "id" ->
+ object : PrimitiveConverter() {
+ override fun addBinary(value: Binary) {
+ localId = value.toStringUsingUTF8()
+ }
+ }
+ "start_time", "submissionTime" ->
+ object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ localStartTime = Instant.ofEpochMilli(value)
+ }
+ }
+ "stop_time", "endTime" ->
+ object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ localStopTime = Instant.ofEpochMilli(value)
+ }
+ }
+ "cpu_count", "maxCores" ->
+ object : PrimitiveConverter() {
+ override fun addInt(value: Int) {
+ localCpuCount = value
+ }
+ }
+ "cpu_capacity" ->
+ object : PrimitiveConverter() {
+ override fun addDouble(value: Double) {
+ localCpuCapacity = value
+ }
+ }
+ "mem_capacity", "requiredMemory" ->
+ object : PrimitiveConverter() {
+ override fun addDouble(value: Double) {
+ localMemCapacity = value
+ }
- override fun addLong(value: Long) {
- _memCapacity = value.toDouble()
+ override fun addLong(value: Long) {
+ localMemCapacity = value.toDouble()
+ }
+ }
+ else -> error("Unknown column $type")
}
}
- else -> error("Unknown column $type")
- }
- }
- override fun start() {
- _id = ""
- _startTime = Instant.MIN
- _stopTime = Instant.MIN
- _cpuCount = 0
- _cpuCapacity = 0.0
- _memCapacity = 0.0
- }
+ override fun start() {
+ localId = ""
+ localStartTime = Instant.MIN
+ localStopTime = Instant.MIN
+ localCpuCount = 0
+ localCpuCapacity = 0.0
+ localMemCapacity = 0.0
+ }
- override fun end() {}
+ override fun end() {}
- override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex]
- }
+ override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex]
+ }
- override fun getCurrentRecord(): Resource = Resource(_id, _startTime, _stopTime, _cpuCount, _cpuCapacity, _memCapacity)
+ override fun getCurrentRecord(): Resource =
+ Resource(
+ localId,
+ localStartTime,
+ localStopTime,
+ localCpuCount,
+ localCpuCapacity,
+ localMemCapacity,
+ )
override fun getRootConverter(): GroupConverter = root
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceState.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceState.kt
index 9ad58764..483f444c 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceState.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceState.kt
@@ -30,5 +30,5 @@ internal class ResourceState(
val timestamp: Instant,
val duration: Duration,
val cpuCount: Int,
- val cpuUsage: Double
+ val cpuUsage: Double,
)
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt
index 56366cd8..21e206a9 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt
@@ -31,11 +31,11 @@ import org.apache.parquet.schema.MessageType
import org.apache.parquet.schema.PrimitiveType
import org.apache.parquet.schema.Types
import org.opendc.trace.TableColumn
-import org.opendc.trace.conv.RESOURCE_CPU_COUNT
-import org.opendc.trace.conv.RESOURCE_ID
-import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE
-import org.opendc.trace.conv.RESOURCE_STATE_DURATION
-import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP
+import org.opendc.trace.conv.resourceCpuCount
+import org.opendc.trace.conv.resourceID
+import org.opendc.trace.conv.resourceStateCpuUsage
+import org.opendc.trace.conv.resourceStateDuration
+import org.opendc.trace.conv.resourceStateTimestamp
/**
* A [ReadSupport] instance for [ResourceState] objects.
@@ -44,16 +44,17 @@ internal class ResourceStateReadSupport(private val projection: List<String>?) :
/**
* Mapping from field names to [TableColumn]s.
*/
- private val fieldMap = mapOf(
- "id" to RESOURCE_ID,
- "time" to RESOURCE_STATE_TIMESTAMP,
- "timestamp" to RESOURCE_STATE_TIMESTAMP,
- "duration" to RESOURCE_STATE_DURATION,
- "cores" to RESOURCE_CPU_COUNT,
- "cpu_count" to RESOURCE_CPU_COUNT,
- "cpuUsage" to RESOURCE_STATE_CPU_USAGE,
- "cpu_usage" to RESOURCE_STATE_CPU_USAGE
- )
+ private val fieldMap =
+ mapOf(
+ "id" to resourceID,
+ "time" to resourceStateTimestamp,
+ "timestamp" to resourceStateTimestamp,
+ "duration" to resourceStateDuration,
+ "cores" to resourceCpuCount,
+ "cpu_count" to resourceCpuCount,
+ "cpuUsage" to resourceStateCpuUsage,
+ "cpu_usage" to resourceStateCpuUsage,
+ )
override fun init(context: InitContext): ReadContext {
val projectedSchema =
@@ -81,7 +82,7 @@ internal class ResourceStateReadSupport(private val projection: List<String>?) :
configuration: Configuration,
keyValueMetaData: Map<String, String>,
fileSchema: MessageType,
- readContext: ReadContext
+ readContext: ReadContext,
): RecordMaterializer<ResourceState> = ResourceStateRecordMaterializer(readContext.requestedSchema)
companion object {
@@ -89,53 +90,55 @@ internal class ResourceStateReadSupport(private val projection: List<String>?) :
* Parquet read schema (version 2.0) for the "resource states" table in the trace.
*/
@JvmStatic
- val READ_SCHEMA_V2_0: MessageType = Types.buildMessage()
- .addFields(
- Types
- .required(PrimitiveType.PrimitiveTypeName.BINARY)
- .`as`(LogicalTypeAnnotation.stringType())
- .named("id"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("time"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("duration"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("cores"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("cpuUsage")
- )
- .named("resource_state")
+ val READ_SCHEMA_V2_0: MessageType =
+ Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("time"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("duration"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("cores"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpuUsage"),
+ )
+ .named("resource_state")
/**
* Parquet read schema (version 2.1) for the "resource states" table in the trace.
*/
@JvmStatic
- val READ_SCHEMA_V2_1: MessageType = Types.buildMessage()
- .addFields(
- Types
- .required(PrimitiveType.PrimitiveTypeName.BINARY)
- .`as`(LogicalTypeAnnotation.stringType())
- .named("id"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("timestamp"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("duration"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("cpu_count"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("cpu_usage")
- )
- .named("resource_state")
+ val READ_SCHEMA_V2_1: MessageType =
+ Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("timestamp"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("duration"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("cpu_count"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpu_usage"),
+ )
+ .named("resource_state")
/**
* Parquet read schema for the "resource states" table in the trace.
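Note that READ_SCHEMA_V2_0 and READ_SCHEMA_V2_1 above differ only in raw column names ("time"/"cores"/"cpuUsage" versus "timestamp"/"cpu_count"/"cpu_usage"), which is why fieldMap folds both spellings onto the same logical columns. The sketch below illustrates the projection idea under that assumption; projectSchema is an invented helper, not the class's actual init() logic.

    // Illustrative only: narrow a file schema to the fields whose logical column
    // (looked up via fieldMap) was requested by the caller.
    import org.apache.parquet.schema.MessageType
    import org.apache.parquet.schema.Types

    fun projectSchema(
        fileSchema: MessageType,
        requested: Set<String>,
        fieldMap: Map<String, String>,
    ): MessageType {
        // Keep a raw field only if it maps onto a requested logical column.
        val kept = fileSchema.fields.filter { fieldMap[it.name] in requested }
        return Types.buildMessage()
            .addFields(*kept.toTypedArray())
            .named(fileSchema.name)
    }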
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateRecordMaterializer.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateRecordMaterializer.kt
index a813a5af..72d24e78 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateRecordMaterializer.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateRecordMaterializer.kt
@@ -38,69 +38,77 @@ internal class ResourceStateRecordMaterializer(schema: MessageType) : RecordMate
/**
* State of current record being read.
*/
- private var _id = ""
- private var _timestamp = Instant.MIN
- private var _duration = Duration.ZERO
- private var _cpuCount = 0
- private var _cpuUsage = 0.0
+ private var localId = ""
+ private var localTimestamp = Instant.MIN
+ private var localDuration = Duration.ZERO
+ private var localCpuCount = 0
+ private var localCpuUsage = 0.0
/**
* Root converter for the record.
*/
- private val root = object : GroupConverter() {
- /**
- * The converters for the columns of the schema.
- */
- private val converters = schema.fields.map { type ->
- when (type.name) {
- "id" -> object : PrimitiveConverter() {
- override fun addBinary(value: Binary) {
- _id = value.toStringUsingUTF8()
+ private val root =
+ object : GroupConverter() {
+ /**
+ * The converters for the columns of the schema.
+ */
+ private val converters =
+ schema.fields.map { type ->
+ when (type.name) {
+ "id" ->
+ object : PrimitiveConverter() {
+ override fun addBinary(value: Binary) {
+ localId = value.toStringUsingUTF8()
+ }
+ }
+ "timestamp", "time" ->
+ object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ localTimestamp = Instant.ofEpochMilli(value)
+ }
+ }
+ "duration" ->
+ object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ localDuration = Duration.ofMillis(value)
+ }
+ }
+ "cpu_count", "cores" ->
+ object : PrimitiveConverter() {
+ override fun addInt(value: Int) {
+ localCpuCount = value
+ }
+ }
+ "cpu_usage", "cpuUsage" ->
+ object : PrimitiveConverter() {
+ override fun addDouble(value: Double) {
+ localCpuUsage = value
+ }
+ }
+ "flops" ->
+ object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ // Ignore to support v1 format
+ }
+ }
+ else -> error("Unknown column $type")
}
}
- "timestamp", "time" -> object : PrimitiveConverter() {
- override fun addLong(value: Long) {
- _timestamp = Instant.ofEpochMilli(value)
- }
- }
- "duration" -> object : PrimitiveConverter() {
- override fun addLong(value: Long) {
- _duration = Duration.ofMillis(value)
- }
- }
- "cpu_count", "cores" -> object : PrimitiveConverter() {
- override fun addInt(value: Int) {
- _cpuCount = value
- }
- }
- "cpu_usage", "cpuUsage" -> object : PrimitiveConverter() {
- override fun addDouble(value: Double) {
- _cpuUsage = value
- }
- }
- "flops" -> object : PrimitiveConverter() {
- override fun addLong(value: Long) {
- // Ignore to support v1 format
- }
- }
- else -> error("Unknown column $type")
- }
- }
- override fun start() {
- _id = ""
- _timestamp = Instant.MIN
- _duration = Duration.ZERO
- _cpuCount = 0
- _cpuUsage = 0.0
- }
+ override fun start() {
+ localId = ""
+ localTimestamp = Instant.MIN
+ localDuration = Duration.ZERO
+ localCpuCount = 0
+ localCpuUsage = 0.0
+ }
- override fun end() {}
+ override fun end() {}
- override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex]
- }
+ override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex]
+ }
- override fun getCurrentRecord(): ResourceState = ResourceState(_id, _timestamp, _duration, _cpuCount, _cpuUsage)
+ override fun getCurrentRecord(): ResourceState = ResourceState(localId, localTimestamp, localDuration, localCpuCount, localCpuUsage)
override fun getRootConverter(): GroupConverter = root
}
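The materializer above dispatches on raw column names to per-field PrimitiveConverters that write into mutable fields, so a single converter tree is reused across rows and reset in start(). A stripped-down sketch of the same pattern for a one-column record follows; TimestampMaterializer and its single INT64 "timestamp" column are invented for illustration and are not part of this trace format.

    // Hypothetical, minimal example of the RecordMaterializer pattern used above:
    // one mutable field, reset in start(), exposed via getCurrentRecord().
    import org.apache.parquet.io.api.Converter
    import org.apache.parquet.io.api.GroupConverter
    import org.apache.parquet.io.api.PrimitiveConverter
    import org.apache.parquet.io.api.RecordMaterializer
    import java.time.Instant

    internal class TimestampMaterializer : RecordMaterializer<Instant>() {
        private var current = Instant.MIN

        private val root =
            object : GroupConverter() {
                // Single converter for a schema with one INT64 "timestamp" column.
                private val timestampConverter =
                    object : PrimitiveConverter() {
                        override fun addLong(value: Long) {
                            current = Instant.ofEpochMilli(value)
                        }
                    }

                override fun start() {
                    current = Instant.MIN
                }

                override fun end() {}

                override fun getConverter(fieldIndex: Int): Converter = timestampConverter
            }

        override fun getCurrentRecord(): Instant = current

        override fun getRootConverter(): GroupConverter = root
    }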
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateWriteSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateWriteSupport.kt
index 0bbec4d2..2a6d8c12 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateWriteSupport.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateWriteSupport.kt
@@ -52,7 +52,10 @@ internal class ResourceStateWriteSupport : WriteSupport<ResourceState>() {
write(recordConsumer, record)
}
- private fun write(consumer: RecordConsumer, record: ResourceState) {
+ private fun write(
+ consumer: RecordConsumer,
+ record: ResourceState,
+ ) {
consumer.startMessage()
consumer.startField("id", 0)
@@ -83,26 +86,27 @@ internal class ResourceStateWriteSupport : WriteSupport<ResourceState>() {
* Parquet schema for the "resource states" table in the trace.
*/
@JvmStatic
- val WRITE_SCHEMA: MessageType = Types.buildMessage()
- .addFields(
- Types
- .required(PrimitiveType.PrimitiveTypeName.BINARY)
- .`as`(LogicalTypeAnnotation.stringType())
- .named("id"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("timestamp"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("duration"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("cpu_count"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("cpu_usage")
- )
- .named("resource_state")
+ val WRITE_SCHEMA: MessageType =
+ Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("timestamp"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("duration"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("cpu_count"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpu_usage"),
+ )
+ .named("resource_state")
}
}
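On the write side, each record is emitted field by field against WRITE_SCHEMA through Parquet's RecordConsumer protocol: startMessage, then startField/add*/endField per column, then endMessage. A hedged sketch of how the body of write(consumer, record) plausibly continues for a resource state; the field order follows the schema above, and the id property name plus the unit conversions are assumptions rather than the actual implementation.

    // Sketch (not the actual implementation) of emitting one ResourceState row
    // in the column order of WRITE_SCHEMA: id, timestamp, duration, cpu_count, cpu_usage.
    import org.apache.parquet.io.api.Binary
    import org.apache.parquet.io.api.RecordConsumer

    private fun writeSketch(
        consumer: RecordConsumer,
        record: ResourceState,
    ) {
        consumer.startMessage()

        consumer.startField("id", 0)
        consumer.addBinary(Binary.fromString(record.id)) // assumes the first parameter is exposed as `id`
        consumer.endField("id", 0)

        consumer.startField("timestamp", 1)
        consumer.addLong(record.timestamp.toEpochMilli())
        consumer.endField("timestamp", 1)

        consumer.startField("duration", 2)
        consumer.addLong(record.duration.toMillis())
        consumer.endField("duration", 2)

        consumer.startField("cpu_count", 3)
        consumer.addInteger(record.cpuCount)
        consumer.endField("cpu_count", 3)

        consumer.startField("cpu_usage", 4)
        consumer.addDouble(record.cpuUsage)
        consumer.endField("cpu_usage", 4)

        consumer.endMessage()
    }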
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceWriteSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceWriteSupport.kt
index cd428754..ed62e2ce 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceWriteSupport.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceWriteSupport.kt
@@ -53,7 +53,10 @@ internal class ResourceWriteSupport : WriteSupport<Resource>() {
write(recordConsumer, record)
}
- private fun write(consumer: RecordConsumer, record: Resource) {
+ private fun write(
+ consumer: RecordConsumer,
+ record: Resource,
+ ) {
consumer.startMessage()
consumer.startField("id", 0)
@@ -88,30 +91,31 @@ internal class ResourceWriteSupport : WriteSupport<Resource>() {
* Parquet schema for the "resources" table in the trace.
*/
@JvmStatic
- val WRITE_SCHEMA: MessageType = Types.buildMessage()
- .addFields(
- Types
- .required(PrimitiveType.PrimitiveTypeName.BINARY)
- .`as`(LogicalTypeAnnotation.stringType())
- .named("id"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("start_time"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("stop_time"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("cpu_count"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("cpu_capacity"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("mem_capacity")
- )
- .named("resource")
+ val WRITE_SCHEMA: MessageType =
+ Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("start_time"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("stop_time"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("cpu_count"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpu_capacity"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("mem_capacity"),
+ )
+ .named("resource")
}
}
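The resources schema above (id, start_time, stop_time, cpu_count, cpu_capacity, mem_capacity) is what the writer in the test below fills in. For orientation, a hedged sketch of how a WriteSupport typically advertises such a schema to Parquet; the free-function form and the empty metadata map are simplifications, and the real ResourceWriteSupport.init may attach extra key/value metadata.

    // Sketch only: expose the write schema to Parquet via a WriteContext.
    // Shown as a free function for brevity; the actual override lives on the class.
    import org.apache.hadoop.conf.Configuration
    import org.apache.parquet.hadoop.api.WriteSupport

    fun initSketch(configuration: Configuration): WriteSupport.WriteContext {
        return WriteSupport.WriteContext(ResourceWriteSupport.WRITE_SCHEMA, emptyMap())
    }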
diff --git a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt
index d3c3b35b..c9fa21c3 100644
--- a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt
@@ -40,17 +40,17 @@ import org.opendc.trace.TableWriter
import org.opendc.trace.conv.INTERFERENCE_GROUP_MEMBERS
import org.opendc.trace.conv.INTERFERENCE_GROUP_SCORE
import org.opendc.trace.conv.INTERFERENCE_GROUP_TARGET
-import org.opendc.trace.conv.RESOURCE_CPU_CAPACITY
-import org.opendc.trace.conv.RESOURCE_CPU_COUNT
-import org.opendc.trace.conv.RESOURCE_ID
-import org.opendc.trace.conv.RESOURCE_MEM_CAPACITY
-import org.opendc.trace.conv.RESOURCE_START_TIME
-import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE
-import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP
-import org.opendc.trace.conv.RESOURCE_STOP_TIME
import org.opendc.trace.conv.TABLE_INTERFERENCE_GROUPS
import org.opendc.trace.conv.TABLE_RESOURCES
import org.opendc.trace.conv.TABLE_RESOURCE_STATES
+import org.opendc.trace.conv.resourceCpuCapacity
+import org.opendc.trace.conv.resourceCpuCount
+import org.opendc.trace.conv.resourceID
+import org.opendc.trace.conv.resourceMemCapacity
+import org.opendc.trace.conv.resourceStartTime
+import org.opendc.trace.conv.resourceStateCpuUsage
+import org.opendc.trace.conv.resourceStateTimestamp
+import org.opendc.trace.conv.resourceStopTime
import org.opendc.trace.testkit.TableReaderTestKit
import org.opendc.trace.testkit.TableWriterTestKit
import java.nio.file.Files
@@ -88,19 +88,19 @@ internal class OdcVmTraceFormatTest {
@ValueSource(strings = ["trace-v2.0", "trace-v2.1"])
fun testResources(name: String) {
val path = Paths.get("src/test/resources/$name")
- val reader = format.newReader(path, TABLE_RESOURCES, listOf(RESOURCE_ID, RESOURCE_START_TIME))
+ val reader = format.newReader(path, TABLE_RESOURCES, listOf(resourceID, resourceStartTime))
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals("1019", reader.getString(RESOURCE_ID)) },
- { assertEquals(Instant.ofEpochMilli(1376314846000), reader.getInstant(RESOURCE_START_TIME)) },
+ { assertEquals("1019", reader.getString(resourceID)) },
+ { assertEquals(Instant.ofEpochMilli(1376314846000), reader.getInstant(resourceStartTime)) },
{ assertTrue(reader.nextRow()) },
- { assertEquals("1023", reader.getString(RESOURCE_ID)) },
+ { assertEquals("1023", reader.getString(resourceID)) },
{ assertTrue(reader.nextRow()) },
- { assertEquals("1052", reader.getString(RESOURCE_ID)) },
+ { assertEquals("1052", reader.getString(resourceID)) },
{ assertTrue(reader.nextRow()) },
- { assertEquals("1073", reader.getString(RESOURCE_ID)) },
- { assertFalse(reader.nextRow()) }
+ { assertEquals("1073", reader.getString(resourceID)) },
+ { assertFalse(reader.nextRow()) },
)
reader.close()
@@ -112,12 +112,12 @@ internal class OdcVmTraceFormatTest {
val writer = format.newWriter(path, TABLE_RESOURCES)
writer.startRow()
- writer.setString(RESOURCE_ID, "1019")
- writer.setInstant(RESOURCE_START_TIME, Instant.EPOCH)
- writer.setInstant(RESOURCE_STOP_TIME, Instant.EPOCH)
- writer.setInt(RESOURCE_CPU_COUNT, 1)
- writer.setDouble(RESOURCE_CPU_CAPACITY, 1024.0)
- writer.setDouble(RESOURCE_MEM_CAPACITY, 1024.0)
+ writer.setString(resourceID, "1019")
+ writer.setInstant(resourceStartTime, Instant.EPOCH)
+ writer.setInstant(resourceStopTime, Instant.EPOCH)
+ writer.setInt(resourceCpuCount, 1)
+ writer.setDouble(resourceCpuCapacity, 1024.0)
+ writer.setDouble(resourceMemCapacity, 1024.0)
writer.endRow()
writer.close()
@@ -125,13 +125,13 @@ internal class OdcVmTraceFormatTest {
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals("1019", reader.getString(RESOURCE_ID)) },
- { assertEquals(Instant.EPOCH, reader.getInstant(RESOURCE_START_TIME)) },
- { assertEquals(Instant.EPOCH, reader.getInstant(RESOURCE_STOP_TIME)) },
- { assertEquals(1, reader.getInt(RESOURCE_CPU_COUNT)) },
- { assertEquals(1024.0, reader.getDouble(RESOURCE_CPU_CAPACITY)) },
- { assertEquals(1024.0, reader.getDouble(RESOURCE_MEM_CAPACITY)) },
- { assertFalse(reader.nextRow()) }
+ { assertEquals("1019", reader.getString(resourceID)) },
+ { assertEquals(Instant.EPOCH, reader.getInstant(resourceStartTime)) },
+ { assertEquals(Instant.EPOCH, reader.getInstant(resourceStopTime)) },
+ { assertEquals(1, reader.getInt(resourceCpuCount)) },
+ { assertEquals(1024.0, reader.getDouble(resourceCpuCapacity)) },
+ { assertEquals(1024.0, reader.getDouble(resourceMemCapacity)) },
+ { assertFalse(reader.nextRow()) },
)
reader.close()
@@ -141,17 +141,18 @@ internal class OdcVmTraceFormatTest {
@ValueSource(strings = ["trace-v2.0", "trace-v2.1"])
fun testSmoke(name: String) {
val path = Paths.get("src/test/resources/$name")
- val reader = format.newReader(
- path,
- TABLE_RESOURCE_STATES,
- listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP, RESOURCE_STATE_CPU_USAGE)
- )
+ val reader =
+ format.newReader(
+ path,
+ TABLE_RESOURCE_STATES,
+ listOf(resourceID, resourceStateTimestamp, resourceStateCpuUsage),
+ )
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals("1019", reader.getString(RESOURCE_ID)) },
- { assertEquals(1376314846, reader.getInstant(RESOURCE_STATE_TIMESTAMP)?.epochSecond) },
- { assertEquals(0.0, reader.getDouble(RESOURCE_STATE_CPU_USAGE), 0.01) }
+ { assertEquals("1019", reader.getString(resourceID)) },
+ { assertEquals(1376314846, reader.getInstant(resourceStateTimestamp)?.epochSecond) },
+ { assertEquals(0.0, reader.getDouble(resourceStateCpuUsage), 0.01) },
)
reader.close()
@@ -163,10 +164,10 @@ internal class OdcVmTraceFormatTest {
val writer = format.newWriter(path, TABLE_RESOURCE_STATES)
writer.startRow()
- writer.setString(RESOURCE_ID, "1019")
- writer.setInstant(RESOURCE_STATE_TIMESTAMP, Instant.EPOCH)
- writer.setDouble(RESOURCE_STATE_CPU_USAGE, 23.0)
- writer.setInt(RESOURCE_CPU_COUNT, 1)
+ writer.setString(resourceID, "1019")
+ writer.setInstant(resourceStateTimestamp, Instant.EPOCH)
+ writer.setDouble(resourceStateCpuUsage, 23.0)
+ writer.setInt(resourceCpuCount, 1)
writer.endRow()
writer.close()
@@ -174,11 +175,11 @@ internal class OdcVmTraceFormatTest {
assertAll(
{ assertTrue(reader.nextRow()) },
- { assertEquals("1019", reader.getString(RESOURCE_ID)) },
- { assertEquals(Instant.EPOCH, reader.getInstant(RESOURCE_STATE_TIMESTAMP)) },
- { assertEquals(1, reader.getInt(RESOURCE_CPU_COUNT)) },
- { assertEquals(23.0, reader.getDouble(RESOURCE_STATE_CPU_USAGE)) },
- { assertFalse(reader.nextRow()) }
+ { assertEquals("1019", reader.getString(resourceID)) },
+ { assertEquals(Instant.EPOCH, reader.getInstant(resourceStateTimestamp)) },
+ { assertEquals(1, reader.getInt(resourceCpuCount)) },
+ { assertEquals(23.0, reader.getDouble(resourceStateCpuUsage)) },
+ { assertFalse(reader.nextRow()) },
)
reader.close()
@@ -187,11 +188,12 @@ internal class OdcVmTraceFormatTest {
@Test
fun testInterferenceGroups() {
val path = Paths.get("src/test/resources/trace-v2.1")
- val reader = format.newReader(
- path,
- TABLE_INTERFERENCE_GROUPS,
- listOf(INTERFERENCE_GROUP_MEMBERS, INTERFERENCE_GROUP_TARGET, INTERFERENCE_GROUP_SCORE)
- )
+ val reader =
+ format.newReader(
+ path,
+ TABLE_INTERFERENCE_GROUPS,
+ listOf(INTERFERENCE_GROUP_MEMBERS, INTERFERENCE_GROUP_TARGET, INTERFERENCE_GROUP_SCORE),
+ )
assertAll(
{ assertTrue(reader.nextRow()) },
@@ -202,7 +204,7 @@ internal class OdcVmTraceFormatTest {
{ assertEquals(setOf("1023", "1052", "1073"), reader.getSet(INTERFERENCE_GROUP_MEMBERS, String::class.java)) },
{ assertEquals(0.0, reader.getDouble(INTERFERENCE_GROUP_TARGET)) },
{ assertEquals(0.7133055555552751, reader.getDouble(INTERFERENCE_GROUP_SCORE)) },
- { assertFalse(reader.nextRow()) }
+ { assertFalse(reader.nextRow()) },
)
reader.close()
@@ -247,7 +249,7 @@ internal class OdcVmTraceFormatTest {
{ assertEquals(setOf("a", "b", "d"), reader.getSet(INTERFERENCE_GROUP_MEMBERS, String::class.java)) },
{ assertEquals(0.5, reader.getDouble(INTERFERENCE_GROUP_TARGET)) },
{ assertEquals(0.9, reader.getDouble(INTERFERENCE_GROUP_SCORE)) },
- { assertFalse(reader.nextRow()) }
+ { assertFalse(reader.nextRow()) },
)
reader.close()