summaryrefslogtreecommitdiff
path: root/opendc-trace/opendc-trace-opendc/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-05-02 16:06:44 +0200
committerGitHub <noreply@github.com>2022-05-02 16:06:44 +0200
commitc78285f6346236053979aa98113ba9e6d7efb21e (patch)
tree44221b3a39516a235a0b41adf525a79a60abb998 /opendc-trace/opendc-trace-opendc/src
parent44ddd27a745f2dfe4b6ffef1b7657d156dd61489 (diff)
parente4d3a8add5388182cf7a12b1099678a0b769b106 (diff)
merge: Add support for SQL via Apache Calcite (#78)
This pull request integrates initial support for SQL queries via Apache Calcite into the OpenDC codebase. Our vision is that users of OpenDC should be able to use SQL queries to access and process most of the experiment data generated by simulations. This pull request moves towards this goal by adding the ability to query workload traces supported by OpenDC using SQL. We also provide a CLI for querying the data in workload traces via `opendc-trace-tools`: ```bash opendc-trace-tools query -i data/bitbrains-small -f opendc-vm "SELECT MAX(cpu_count) FROM resource_states" ``` ## Implementation Notes :hammer_and_pick: * Add Calcite (SQL) integration * Add support for writing via SQL * Add support for writing via SQL * Support custom Parquet ReadSupport implementations * Read records using low-level Parquet API * Do not use Avro when exporting experiment data * Do not use Avro when reading WTF trace * Drop dependency on Avro * Add support for projections ## External Dependencies :four_leaf_clover: * Apache Calcite ## Breaking API Changes :warning: * The existing code for reading Parquet traces using Apache Avro has been removed. * `TraceFormat.newReader` now accepts a nullable `projection` parameter
Diffstat (limited to 'opendc-trace/opendc-trace-opendc/src')
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt74
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt124
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt65
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt117
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt67
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/Resource.kt37
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt147
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceRecordMaterializer.kt107
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceState.kt34
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt139
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateRecordMaterializer.kt102
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateWriteSupport.kt105
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceWriteSupport.kt114
-rw-r--r--opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt77
14 files changed, 1047 insertions, 262 deletions
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
index b82da888..7a01b881 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
@@ -22,38 +22,30 @@
package org.opendc.trace.opendc
-import org.apache.avro.Schema
-import org.apache.avro.generic.GenericRecord
import org.opendc.trace.*
import org.opendc.trace.conv.*
+import org.opendc.trace.opendc.parquet.ResourceState
import org.opendc.trace.util.parquet.LocalParquetReader
-import java.time.Duration
-import java.time.Instant
/**
* A [TableReader] implementation for the OpenDC virtual machine trace format.
*/
-internal class OdcVmResourceStateTableReader(private val reader: LocalParquetReader<GenericRecord>) : TableReader {
+internal class OdcVmResourceStateTableReader(private val reader: LocalParquetReader<ResourceState>) : TableReader {
/**
* The current record.
*/
- private var record: GenericRecord? = null
-
- /**
- * A flag to indicate that the columns have been initialized.
- */
- private var hasInitializedColumns = false
+ private var record: ResourceState? = null
override fun nextRow(): Boolean {
- val record = reader.read()
- this.record = record
+ try {
+ val record = reader.read()
+ this.record = record
- if (!hasInitializedColumns && record != null) {
- initColumns(record.schema)
- hasInitializedColumns = true
+ return record != null
+ } catch (e: Throwable) {
+ this.record = null
+ throw e
}
-
- return record != null
}
override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
@@ -67,36 +59,36 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_ID -> record[AVRO_COL_ID].toString()
- COL_TIMESTAMP -> Instant.ofEpochMilli(record[AVRO_COL_TIMESTAMP] as Long)
- COL_DURATION -> Duration.ofMillis(record[AVRO_COL_DURATION] as Long)
- COL_CPU_COUNT -> getInt(index)
- COL_CPU_USAGE -> getDouble(index)
- else -> throw IllegalArgumentException("Invalid column")
+ COL_ID -> record.id
+ COL_TIMESTAMP -> record.timestamp
+ COL_DURATION -> record.duration
+ COL_CPU_COUNT -> record.cpuCount
+ COL_CPU_USAGE -> record.cpuUsage
+ else -> throw IllegalArgumentException("Invalid column index $index")
}
}
override fun getBoolean(index: Int): Boolean {
- throw IllegalArgumentException("Invalid column")
+ throw IllegalArgumentException("Invalid column or type [index $index]")
}
override fun getInt(index: Int): Int {
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_CPU_COUNT -> record[AVRO_COL_CPU_COUNT] as Int
- else -> throw IllegalArgumentException("Invalid column")
+ COL_CPU_COUNT -> record.cpuCount
+ else -> throw IllegalArgumentException("Invalid column or type [index $index]")
}
}
override fun getLong(index: Int): Long {
- throw IllegalArgumentException("Invalid column")
+ throw IllegalArgumentException("Invalid column or type [index $index]")
}
override fun getDouble(index: Int): Double {
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_CPU_USAGE -> (record[AVRO_COL_CPU_USAGE] as Number).toDouble()
- else -> throw IllegalArgumentException("Invalid column")
+ COL_CPU_USAGE -> record.cpuUsage
+ else -> throw IllegalArgumentException("Invalid column or type [index $index]")
}
}
@@ -106,28 +98,6 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
override fun toString(): String = "OdcVmResourceStateTableReader"
- /**
- * Initialize the columns for the reader based on [schema].
- */
- private fun initColumns(schema: Schema) {
- try {
- AVRO_COL_ID = schema.getField("id").pos()
- AVRO_COL_TIMESTAMP = (schema.getField("timestamp") ?: schema.getField("time")).pos()
- AVRO_COL_DURATION = schema.getField("duration").pos()
- AVRO_COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("cores")).pos()
- AVRO_COL_CPU_USAGE = (schema.getField("cpu_usage") ?: schema.getField("cpuUsage")).pos()
- } catch (e: NullPointerException) {
- // This happens when the field we are trying to access does not exist
- throw IllegalArgumentException("Invalid schema", e)
- }
- }
-
- private var AVRO_COL_ID = -1
- private var AVRO_COL_TIMESTAMP = -1
- private var AVRO_COL_DURATION = -1
- private var AVRO_COL_CPU_COUNT = -1
- private var AVRO_COL_CPU_USAGE = -1
-
private val COL_ID = 0
private val COL_TIMESTAMP = 1
private val COL_DURATION = 2
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt
index 01b9750c..97af5b59 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt
@@ -22,84 +22,85 @@
package org.opendc.trace.opendc
-import org.apache.avro.Schema
-import org.apache.avro.generic.GenericRecord
-import org.apache.avro.generic.GenericRecordBuilder
import org.apache.parquet.hadoop.ParquetWriter
import org.opendc.trace.*
import org.opendc.trace.conv.*
+import org.opendc.trace.opendc.parquet.ResourceState
import java.time.Duration
import java.time.Instant
/**
* A [TableWriter] implementation for the OpenDC virtual machine trace format.
*/
-internal class OdcVmResourceStateTableWriter(
- private val writer: ParquetWriter<GenericRecord>,
- private val schema: Schema
-) : TableWriter {
+internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter<ResourceState>) : TableWriter {
/**
- * The current builder for the record that is being written.
+ * The current state for the record that is being written.
*/
- private var builder: GenericRecordBuilder? = null
-
- /**
- * The fields belonging to the resource state schema.
- */
- private val fields = schema.fields
+ private var _isActive = false
+ private var _id: String = ""
+ private var _timestamp: Instant = Instant.MIN
+ private var _duration: Duration = Duration.ZERO
+ private var _cpuCount: Int = 0
+ private var _cpuUsage: Double = Double.NaN
override fun startRow() {
- builder = GenericRecordBuilder(schema)
+ _isActive = true
+ _id = ""
+ _timestamp = Instant.MIN
+ _duration = Duration.ZERO
+ _cpuCount = 0
+ _cpuUsage = Double.NaN
}
override fun endRow() {
- val builder = checkNotNull(builder) { "No active row" }
- this.builder = null
-
- val record = builder.build()
- val id = record[COL_ID] as String
- val timestamp = record[COL_TIMESTAMP] as Long
+ check(_isActive) { "No active row" }
+ _isActive = false
- check(lastId != id || timestamp >= lastTimestamp) { "Records need to be ordered by (id, timestamp)" }
+ check(lastId != _id || _timestamp >= lastTimestamp) { "Records need to be ordered by (id, timestamp)" }
- writer.write(builder.build())
+ writer.write(ResourceState(_id, _timestamp, _duration, _cpuCount, _cpuUsage))
- lastId = id
- lastTimestamp = timestamp
+ lastId = _id
+ lastTimestamp = _timestamp
}
- override fun resolve(column: TableColumn<*>): Int {
- val schema = schema
- return when (column) {
- RESOURCE_ID -> schema.getField("id").pos()
- RESOURCE_STATE_TIMESTAMP -> (schema.getField("timestamp") ?: schema.getField("time")).pos()
- RESOURCE_STATE_DURATION -> schema.getField("duration").pos()
- RESOURCE_CPU_COUNT -> (schema.getField("cpu_count") ?: schema.getField("cores")).pos()
- RESOURCE_STATE_CPU_USAGE -> (schema.getField("cpu_usage") ?: schema.getField("cpuUsage")).pos()
- else -> -1
- }
- }
+ override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
override fun set(index: Int, value: Any) {
- val builder = checkNotNull(builder) { "No active row" }
-
- builder.set(
- fields[index],
- when (index) {
- COL_TIMESTAMP -> (value as Instant).toEpochMilli()
- COL_DURATION -> (value as Duration).toMillis()
- else -> value
- }
- )
+ check(_isActive) { "No active row" }
+
+ when (index) {
+ COL_ID -> _id = value as String
+ COL_TIMESTAMP -> _timestamp = value as Instant
+ COL_DURATION -> _duration = value as Duration
+ COL_CPU_COUNT -> _cpuCount = value as Int
+ COL_CPU_USAGE -> _cpuUsage = value as Double
+ }
}
- override fun setBoolean(index: Int, value: Boolean) = set(index, value)
+ override fun setBoolean(index: Int, value: Boolean) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
- override fun setInt(index: Int, value: Int) = set(index, value)
+ override fun setInt(index: Int, value: Int) {
+ check(_isActive) { "No active row" }
+ when (index) {
+ COL_CPU_COUNT -> _cpuCount = value
+ else -> throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+ }
- override fun setLong(index: Int, value: Long) = set(index, value)
+ override fun setLong(index: Int, value: Long) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
- override fun setDouble(index: Int, value: Double) = set(index, value)
+ override fun setDouble(index: Int, value: Double) {
+ check(_isActive) { "No active row" }
+ when (index) {
+ COL_CPU_USAGE -> _cpuUsage = value
+ else -> throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+ }
override fun flush() {
// Not available
@@ -113,12 +114,19 @@ internal class OdcVmResourceStateTableWriter(
* Last column values that are used to check for correct partitioning.
*/
private var lastId: String? = null
- private var lastTimestamp: Long = Long.MIN_VALUE
-
- /**
- * Columns with special behavior.
- */
- private val COL_ID = resolve(RESOURCE_ID)
- private val COL_TIMESTAMP = resolve(RESOURCE_STATE_TIMESTAMP)
- private val COL_DURATION = resolve(RESOURCE_STATE_DURATION)
+ private var lastTimestamp: Instant = Instant.MAX
+
+ private val COL_ID = 0
+ private val COL_TIMESTAMP = 1
+ private val COL_DURATION = 2
+ private val COL_CPU_COUNT = 3
+ private val COL_CPU_USAGE = 4
+
+ private val columns = mapOf(
+ RESOURCE_ID to COL_ID,
+ RESOURCE_STATE_TIMESTAMP to COL_TIMESTAMP,
+ RESOURCE_STATE_DURATION to COL_DURATION,
+ RESOURCE_CPU_COUNT to COL_CPU_COUNT,
+ RESOURCE_STATE_CPU_USAGE to COL_CPU_USAGE,
+ )
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
index 4909e70e..6102332f 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
@@ -22,37 +22,30 @@
package org.opendc.trace.opendc
-import org.apache.avro.Schema
-import org.apache.avro.generic.GenericRecord
import org.opendc.trace.*
import org.opendc.trace.conv.*
+import org.opendc.trace.opendc.parquet.Resource
import org.opendc.trace.util.parquet.LocalParquetReader
-import java.time.Instant
/**
- * A [TableReader] implementation for the resources table in the OpenDC virtual machine trace format.
+ * A [TableReader] implementation for the "resources table" in the OpenDC virtual machine trace format.
*/
-internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<GenericRecord>) : TableReader {
+internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<Resource>) : TableReader {
/**
* The current record.
*/
- private var record: GenericRecord? = null
-
- /**
- * A flag to indicate that the columns have been initialized.
- */
- private var hasInitializedColumns = false
+ private var record: Resource? = null
override fun nextRow(): Boolean {
- val record = reader.read()
- this.record = record
+ try {
+ val record = reader.read()
+ this.record = record
- if (!hasInitializedColumns && record != null) {
- initColumns(record.schema)
- hasInitializedColumns = true
+ return record != null
+ } catch (e: Throwable) {
+ this.record = null
+ throw e
}
-
- return record != null
}
override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
@@ -66,9 +59,9 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_ID -> record[AVRO_COL_ID].toString()
- COL_START_TIME -> Instant.ofEpochMilli(record[AVRO_COL_START_TIME] as Long)
- COL_STOP_TIME -> Instant.ofEpochMilli(record[AVRO_COL_STOP_TIME] as Long)
+ COL_ID -> record.id
+ COL_START_TIME -> record.startTime
+ COL_STOP_TIME -> record.stopTime
COL_CPU_COUNT -> getInt(index)
COL_CPU_CAPACITY -> getDouble(index)
COL_MEM_CAPACITY -> getDouble(index)
@@ -84,7 +77,7 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_CPU_COUNT -> record[AVRO_COL_CPU_COUNT] as Int
+ COL_CPU_COUNT -> record.cpuCount
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -97,8 +90,8 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_CPU_CAPACITY -> if (AVRO_COL_CPU_CAPACITY >= 0) (record[AVRO_COL_CPU_CAPACITY] as Number).toDouble() else 0.0
- COL_MEM_CAPACITY -> (record[AVRO_COL_MEM_CAPACITY] as Number).toDouble()
+ COL_CPU_CAPACITY -> record.cpuCapacity
+ COL_MEM_CAPACITY -> record.memCapacity
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -109,30 +102,6 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G
override fun toString(): String = "OdcVmResourceTableReader"
- /**
- * Initialize the columns for the reader based on [schema].
- */
- private fun initColumns(schema: Schema) {
- try {
- AVRO_COL_ID = schema.getField("id").pos()
- AVRO_COL_START_TIME = (schema.getField("start_time") ?: schema.getField("submissionTime")).pos()
- AVRO_COL_STOP_TIME = (schema.getField("stop_time") ?: schema.getField("endTime")).pos()
- AVRO_COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("maxCores")).pos()
- AVRO_COL_CPU_CAPACITY = schema.getField("cpu_capacity")?.pos() ?: -1
- AVRO_COL_MEM_CAPACITY = (schema.getField("mem_capacity") ?: schema.getField("requiredMemory")).pos()
- } catch (e: NullPointerException) {
- // This happens when the field we are trying to access does not exist
- throw IllegalArgumentException("Invalid schema")
- }
- }
-
- private var AVRO_COL_ID = -1
- private var AVRO_COL_START_TIME = -1
- private var AVRO_COL_STOP_TIME = -1
- private var AVRO_COL_CPU_COUNT = -1
- private var AVRO_COL_CPU_CAPACITY = -1
- private var AVRO_COL_MEM_CAPACITY = -1
-
private val COL_ID = 0
private val COL_START_TIME = 1
private val COL_STOP_TIME = 2
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt
index edc89ee6..cae65faa 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt
@@ -22,74 +22,82 @@
package org.opendc.trace.opendc
-import org.apache.avro.Schema
-import org.apache.avro.generic.GenericRecord
-import org.apache.avro.generic.GenericRecordBuilder
import org.apache.parquet.hadoop.ParquetWriter
import org.opendc.trace.*
import org.opendc.trace.conv.*
+import org.opendc.trace.opendc.parquet.Resource
import java.time.Instant
-import kotlin.math.roundToLong
/**
* A [TableWriter] implementation for the OpenDC virtual machine trace format.
*/
-internal class OdcVmResourceTableWriter(
- private val writer: ParquetWriter<GenericRecord>,
- private val schema: Schema
-) : TableWriter {
+internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resource>) : TableWriter {
/**
- * The current builder for the record that is being written.
+ * The current state for the record that is being written.
*/
- private var builder: GenericRecordBuilder? = null
-
- /**
- * The fields belonging to the resource schema.
- */
- private val fields = schema.fields
+ private var _isActive = false
+ private var _id: String = ""
+ private var _startTime: Instant = Instant.MIN
+ private var _stopTime: Instant = Instant.MIN
+ private var _cpuCount: Int = 0
+ private var _cpuCapacity: Double = Double.NaN
+ private var _memCapacity: Double = Double.NaN
override fun startRow() {
- builder = GenericRecordBuilder(schema)
+ _isActive = true
+ _id = ""
+ _startTime = Instant.MIN
+ _stopTime = Instant.MIN
+ _cpuCount = 0
+ _cpuCapacity = Double.NaN
+ _memCapacity = Double.NaN
}
override fun endRow() {
- val builder = checkNotNull(builder) { "No active row" }
- this.builder = null
- writer.write(builder.build())
+ check(_isActive) { "No active row" }
+ _isActive = false
+ writer.write(Resource(_id, _startTime, _stopTime, _cpuCount, _cpuCapacity, _memCapacity))
}
- override fun resolve(column: TableColumn<*>): Int {
- val schema = schema
- return when (column) {
- RESOURCE_ID -> schema.getField("id").pos()
- RESOURCE_START_TIME -> (schema.getField("start_time") ?: schema.getField("submissionTime")).pos()
- RESOURCE_STOP_TIME -> (schema.getField("stop_time") ?: schema.getField("endTime")).pos()
- RESOURCE_CPU_COUNT -> (schema.getField("cpu_count") ?: schema.getField("maxCores")).pos()
- RESOURCE_CPU_CAPACITY -> schema.getField("cpu_capacity").pos()
- RESOURCE_MEM_CAPACITY -> (schema.getField("mem_capacity") ?: schema.getField("requiredMemory")).pos()
- else -> -1
- }
- }
+ override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
override fun set(index: Int, value: Any) {
- val builder = checkNotNull(builder) { "No active row" }
- builder.set(
- fields[index],
- when (index) {
- COL_START_TIME, COL_STOP_TIME -> (value as Instant).toEpochMilli()
- COL_MEM_CAPACITY -> (value as Double).roundToLong()
- else -> value
- }
- )
+ check(_isActive) { "No active row" }
+ when (index) {
+ COL_ID -> _id = value as String
+ COL_START_TIME -> _startTime = value as Instant
+ COL_STOP_TIME -> _stopTime = value as Instant
+ COL_CPU_COUNT -> _cpuCount = value as Int
+ COL_CPU_CAPACITY -> _cpuCapacity = value as Double
+ COL_MEM_CAPACITY -> _memCapacity = value as Double
+ else -> throw IllegalArgumentException("Invalid column index $index")
+ }
}
- override fun setBoolean(index: Int, value: Boolean) = set(index, value)
+ override fun setBoolean(index: Int, value: Boolean) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
- override fun setInt(index: Int, value: Int) = set(index, value)
+ override fun setInt(index: Int, value: Int) {
+ check(_isActive) { "No active row" }
+ when (index) {
+ COL_CPU_COUNT -> _cpuCount = value
+ else -> throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+ }
- override fun setLong(index: Int, value: Long) = set(index, value)
+ override fun setLong(index: Int, value: Long) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
- override fun setDouble(index: Int, value: Double) = set(index, value)
+ override fun setDouble(index: Int, value: Double) {
+ check(_isActive) { "No active row" }
+ when (index) {
+ COL_CPU_CAPACITY -> _cpuCapacity = value
+ COL_MEM_CAPACITY -> _memCapacity = value
+ else -> throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+ }
override fun flush() {
// Not available
@@ -99,10 +107,19 @@ internal class OdcVmResourceTableWriter(
writer.close()
}
- /**
- * Columns with special behavior.
- */
- private val COL_START_TIME = resolve(RESOURCE_START_TIME)
- private val COL_STOP_TIME = resolve(RESOURCE_STOP_TIME)
- private val COL_MEM_CAPACITY = resolve(RESOURCE_MEM_CAPACITY)
+ private val COL_ID = 0
+ private val COL_START_TIME = 1
+ private val COL_STOP_TIME = 2
+ private val COL_CPU_COUNT = 3
+ private val COL_CPU_CAPACITY = 4
+ private val COL_MEM_CAPACITY = 5
+
+ private val columns = mapOf(
+ RESOURCE_ID to COL_ID,
+ RESOURCE_START_TIME to COL_START_TIME,
+ RESOURCE_STOP_TIME to COL_STOP_TIME,
+ RESOURCE_CPU_COUNT to COL_CPU_COUNT,
+ RESOURCE_CPU_CAPACITY to COL_CPU_CAPACITY,
+ RESOURCE_MEM_CAPACITY to COL_MEM_CAPACITY,
+ )
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
index 36a1b4a0..d45910c6 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
@@ -22,19 +22,19 @@
package org.opendc.trace.opendc
-import org.apache.avro.Schema
-import org.apache.avro.SchemaBuilder
-import org.apache.avro.generic.GenericRecord
-import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.column.ParquetProperties
import org.apache.parquet.hadoop.ParquetFileWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.opendc.trace.*
import org.opendc.trace.conv.*
+import org.opendc.trace.opendc.parquet.ResourceReadSupport
+import org.opendc.trace.opendc.parquet.ResourceStateReadSupport
+import org.opendc.trace.opendc.parquet.ResourceStateWriteSupport
+import org.opendc.trace.opendc.parquet.ResourceWriteSupport
import org.opendc.trace.spi.TableDetails
import org.opendc.trace.spi.TraceFormat
-import org.opendc.trace.util.parquet.LocalOutputFile
import org.opendc.trace.util.parquet.LocalParquetReader
-import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA
+import org.opendc.trace.util.parquet.LocalParquetWriter
import shaded.parquet.com.fasterxml.jackson.core.JsonEncoding
import shaded.parquet.com.fasterxml.jackson.core.JsonFactory
import java.nio.file.Files
@@ -102,14 +102,14 @@ public class OdcVmTraceFormat : TraceFormat {
}
}
- override fun newReader(path: Path, table: String): TableReader {
+ override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader {
return when (table) {
TABLE_RESOURCES -> {
- val reader = LocalParquetReader<GenericRecord>(path.resolve("meta.parquet"))
+ val reader = LocalParquetReader(path.resolve("meta.parquet"), ResourceReadSupport(projection))
OdcVmResourceTableReader(reader)
}
TABLE_RESOURCE_STATES -> {
- val reader = LocalParquetReader<GenericRecord>(path.resolve("trace.parquet"))
+ val reader = LocalParquetReader(path.resolve("trace.parquet"), ResourceStateReadSupport(projection))
OdcVmResourceStateTableReader(reader)
}
TABLE_INTERFERENCE_GROUPS -> {
@@ -128,24 +128,24 @@ public class OdcVmTraceFormat : TraceFormat {
override fun newWriter(path: Path, table: String): TableWriter {
return when (table) {
TABLE_RESOURCES -> {
- val schema = RESOURCES_SCHEMA
- val writer = AvroParquetWriter.builder<GenericRecord>(LocalOutputFile(path.resolve("meta.parquet")))
- .withSchema(schema)
+ val writer = LocalParquetWriter.builder(path.resolve("meta.parquet"), ResourceWriteSupport())
.withCompressionCodec(CompressionCodecName.ZSTD)
+ .withPageWriteChecksumEnabled(true)
+ .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
.build()
- OdcVmResourceTableWriter(writer, schema)
+ OdcVmResourceTableWriter(writer)
}
TABLE_RESOURCE_STATES -> {
- val schema = RESOURCE_STATES_SCHEMA
- val writer = AvroParquetWriter.builder<GenericRecord>(LocalOutputFile(path.resolve("trace.parquet")))
- .withSchema(schema)
+ val writer = LocalParquetWriter.builder(path.resolve("trace.parquet"), ResourceStateWriteSupport())
.withCompressionCodec(CompressionCodecName.ZSTD)
.withDictionaryEncoding("id", true)
.withBloomFilterEnabled("id", true)
+ .withPageWriteChecksumEnabled(true)
+ .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
.build()
- OdcVmResourceStateTableWriter(writer, schema)
+ OdcVmResourceStateTableWriter(writer)
}
TABLE_INTERFERENCE_GROUPS -> {
val generator = jsonFactory.createGenerator(path.resolve("interference-model.json").toFile(), JsonEncoding.UTF8)
@@ -154,37 +154,4 @@ public class OdcVmTraceFormat : TraceFormat {
else -> throw IllegalArgumentException("Table $table not supported")
}
}
-
- public companion object {
- /**
- * Schema for the resources table in the trace.
- */
- @JvmStatic
- public val RESOURCES_SCHEMA: Schema = SchemaBuilder
- .record("resource")
- .namespace("org.opendc.trace.opendc")
- .fields()
- .requiredString("id")
- .name("start_time").type(TIMESTAMP_SCHEMA).noDefault()
- .name("stop_time").type(TIMESTAMP_SCHEMA).noDefault()
- .requiredInt("cpu_count")
- .requiredDouble("cpu_capacity")
- .requiredLong("mem_capacity")
- .endRecord()
-
- /**
- * Schema for the resource states table in the trace.
- */
- @JvmStatic
- public val RESOURCE_STATES_SCHEMA: Schema = SchemaBuilder
- .record("resource_state")
- .namespace("org.opendc.trace.opendc")
- .fields()
- .requiredString("id")
- .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault()
- .requiredLong("duration")
- .requiredInt("cpu_count")
- .requiredDouble("cpu_usage")
- .endRecord()
- }
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/Resource.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/Resource.kt
new file mode 100644
index 00000000..c6db45b5
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/Resource.kt
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.opendc.parquet
+
+import java.time.Instant
+
+/**
+ * A description of a resource in a trace.
+ */
+internal data class Resource(
+ val id: String,
+ val startTime: Instant,
+ val stopTime: Instant,
+ val cpuCount: Int,
+ val cpuCapacity: Double,
+ val memCapacity: Double
+)
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt
new file mode 100644
index 00000000..0d70446d
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt
@@ -0,0 +1,147 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.opendc.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.api.InitContext
+import org.apache.parquet.hadoop.api.ReadSupport
+import org.apache.parquet.io.api.RecordMaterializer
+import org.apache.parquet.schema.*
+import org.opendc.trace.TableColumn
+import org.opendc.trace.conv.*
+
/**
 * A [ReadSupport] instance for [Resource] objects.
 */
internal class ResourceReadSupport(private val projection: List<TableColumn<*>>?) : ReadSupport<Resource>() {
    /**
     * Mapping from field names to [TableColumn]s.
     *
     * Both the v2.0 (camelCase) and v2.1 (snake_case) field names are mapped, so a
     * projection works regardless of the trace version being read.
     */
    private val fieldMap = mapOf<String, TableColumn<*>>(
        "id" to RESOURCE_ID,
        "submissionTime" to RESOURCE_START_TIME,
        "start_time" to RESOURCE_START_TIME,
        "endTime" to RESOURCE_STOP_TIME,
        "stop_time" to RESOURCE_STOP_TIME,
        "maxCores" to RESOURCE_CPU_COUNT,
        "cpu_count" to RESOURCE_CPU_COUNT,
        "cpu_capacity" to RESOURCE_CPU_CAPACITY,
        "requiredMemory" to RESOURCE_MEM_CAPACITY,
        "mem_capacity" to RESOURCE_MEM_CAPACITY,
    )

    override fun init(context: InitContext): ReadContext {
        // Without a projection, request the full (union) schema.
        val requested = projection ?: return ReadContext(READ_SCHEMA)

        val wanted = requested.toSet()
        // Keep only the fields whose column is part of the projection; fields
        // without a known column mapping are dropped.
        val fields = READ_SCHEMA.fields.filter { field ->
            val column = fieldMap[field.name]
            column != null && column in wanted
        }

        return ReadContext(
            Types.buildMessage()
                .addFields(*fields.toTypedArray())
                .named(READ_SCHEMA.name)
        )
    }

    override fun prepareForRead(
        configuration: Configuration,
        keyValueMetaData: Map<String, String>,
        fileSchema: MessageType,
        readContext: ReadContext
    ): RecordMaterializer<Resource> = ResourceRecordMaterializer(readContext.requestedSchema)

    companion object {
        /** Shorthand for a required UTF-8 string field. */
        private fun utf8(name: String) = Types
            .required(PrimitiveType.PrimitiveTypeName.BINARY)
            .`as`(LogicalTypeAnnotation.stringType())
            .named(name)

        /** Shorthand for a required millisecond-precision UTC timestamp field. */
        private fun timestampMillis(name: String) = Types
            .required(PrimitiveType.PrimitiveTypeName.INT64)
            .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
            .named(name)

        /**
         * Parquet read schema (version 2.0) for the "resources" table in the trace.
         */
        @JvmStatic
        val READ_SCHEMA_V2_0: MessageType = Types.buildMessage()
            .addFields(
                utf8("id"),
                timestampMillis("submissionTime"),
                timestampMillis("endTime"),
                Types.required(PrimitiveType.PrimitiveTypeName.INT32).named("maxCores"),
                Types.required(PrimitiveType.PrimitiveTypeName.INT64).named("requiredMemory"),
            )
            .named("resource")

        /**
         * Parquet read schema (version 2.1) for the "resources" table in the trace.
         */
        @JvmStatic
        val READ_SCHEMA_V2_1: MessageType = Types.buildMessage()
            .addFields(
                utf8("id"),
                timestampMillis("start_time"),
                timestampMillis("stop_time"),
                Types.required(PrimitiveType.PrimitiveTypeName.INT32).named("cpu_count"),
                Types.required(PrimitiveType.PrimitiveTypeName.DOUBLE).named("cpu_capacity"),
                Types.required(PrimitiveType.PrimitiveTypeName.INT64).named("mem_capacity"),
            )
            .named("resource")

        /**
         * Parquet read schema for the "resources" table: the union of all supported versions.
         */
        @JvmStatic
        val READ_SCHEMA: MessageType = READ_SCHEMA_V2_0.union(READ_SCHEMA_V2_1)
    }
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceRecordMaterializer.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceRecordMaterializer.kt
new file mode 100644
index 00000000..3adb0709
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceRecordMaterializer.kt
@@ -0,0 +1,107 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.opendc.parquet
+
+import org.apache.parquet.io.api.*
+import org.apache.parquet.schema.MessageType
+import java.time.Instant
+
/**
 * A [RecordMaterializer] for [Resource] records.
 */
internal class ResourceRecordMaterializer(schema: MessageType) : RecordMaterializer<Resource>() {
    // State of the record currently being assembled; cleared before every record.
    private var _id = ""
    private var _startTime = Instant.MIN
    private var _stopTime = Instant.MIN
    private var _cpuCount = 0
    private var _cpuCapacity = 0.0
    private var _memCapacity = 0.0

    /** Reset the per-record state to its defaults. */
    private fun reset() {
        _id = ""
        _startTime = Instant.MIN
        _stopTime = Instant.MIN
        _cpuCount = 0
        _cpuCapacity = 0.0
        _memCapacity = 0.0
    }

    /**
     * Root converter for the record.
     */
    private val root = object : GroupConverter() {
        // One converter per field of the (possibly projected) schema, in field order.
        // Both v2.0 (camelCase) and v2.1 (snake_case) field names are handled.
        private val converters = schema.fields.map { type ->
            when (type.name) {
                "id" -> object : PrimitiveConverter() {
                    override fun addBinary(value: Binary) {
                        _id = value.toStringUsingUTF8()
                    }
                }
                "submissionTime", "start_time" -> object : PrimitiveConverter() {
                    override fun addLong(value: Long) {
                        _startTime = Instant.ofEpochMilli(value)
                    }
                }
                "endTime", "stop_time" -> object : PrimitiveConverter() {
                    override fun addLong(value: Long) {
                        _stopTime = Instant.ofEpochMilli(value)
                    }
                }
                "maxCores", "cpu_count" -> object : PrimitiveConverter() {
                    override fun addInt(value: Int) {
                        _cpuCount = value
                    }
                }
                "cpu_capacity" -> object : PrimitiveConverter() {
                    override fun addDouble(value: Double) {
                        _cpuCapacity = value
                    }
                }
                "requiredMemory", "mem_capacity" -> object : PrimitiveConverter() {
                    // NOTE(review): both read schemas declare memory as INT64, so the
                    // addDouble overload appears defensive — confirm it is reachable.
                    override fun addDouble(value: Double) {
                        _memCapacity = value
                    }

                    override fun addLong(value: Long) {
                        _memCapacity = value.toDouble()
                    }
                }
                else -> error("Unknown column $type")
            }
        }

        override fun start() = reset()

        override fun end() {}

        override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex]
    }

    override fun getCurrentRecord(): Resource =
        Resource(_id, _startTime, _stopTime, _cpuCount, _cpuCapacity, _memCapacity)

    override fun getRootConverter(): GroupConverter = root
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceState.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceState.kt
new file mode 100644
index 00000000..9ad58764
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceState.kt
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.opendc.parquet
+
+import java.time.Duration
+import java.time.Instant
+
/**
 * A single sample of the state of a resource in a trace.
 *
 * Declared as a `data class` for value-based equality and `copy`, consistent with
 * the sibling [Resource] model. Instances are materialized from the
 * "resource states" table by [ResourceStateRecordMaterializer] and written back
 * by [ResourceStateWriteSupport].
 *
 * @property id Identifier of the resource this sample belongs to.
 * @property timestamp The instant at which this sample was taken.
 * @property duration The period of time covered by this sample.
 * @property cpuCount The number of CPU cores during this sample.
 * @property cpuUsage CPU usage during this sample (units not defined here — presumably MHz; TODO confirm against the trace format docs).
 */
internal data class ResourceState(
    val id: String,
    val timestamp: Instant,
    val duration: Duration,
    val cpuCount: Int,
    val cpuUsage: Double
)
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt
new file mode 100644
index 00000000..97aa00b2
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt
@@ -0,0 +1,139 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.opendc.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.api.InitContext
+import org.apache.parquet.hadoop.api.ReadSupport
+import org.apache.parquet.io.api.RecordMaterializer
+import org.apache.parquet.schema.*
+import org.opendc.trace.TableColumn
+import org.opendc.trace.conv.*
+
/**
 * A [ReadSupport] instance for [ResourceState] objects.
 */
internal class ResourceStateReadSupport(private val projection: List<TableColumn<*>>?) : ReadSupport<ResourceState>() {
    /**
     * Mapping from field names to [TableColumn]s.
     *
     * Covers both the v2.0 (camelCase) and v2.1 (snake_case) naming schemes.
     */
    private val fieldMap = mapOf<String, TableColumn<*>>(
        "id" to RESOURCE_ID,
        "time" to RESOURCE_STATE_TIMESTAMP,
        "timestamp" to RESOURCE_STATE_TIMESTAMP,
        "duration" to RESOURCE_STATE_DURATION,
        "cores" to RESOURCE_CPU_COUNT,
        "cpu_count" to RESOURCE_CPU_COUNT,
        "cpuUsage" to RESOURCE_STATE_CPU_USAGE,
        "cpu_usage" to RESOURCE_STATE_CPU_USAGE,
    )

    override fun init(context: InitContext): ReadContext {
        val schema = when (val requested = projection) {
            // No projection: request the full (union) schema.
            null -> READ_SCHEMA
            else -> {
                val wanted = requested.toSet()
                // Keep only the fields whose column is requested by the projection.
                val fields = READ_SCHEMA.fields.filter { field ->
                    val column = fieldMap[field.name]
                    column != null && column in wanted
                }
                Types.buildMessage()
                    .addFields(*fields.toTypedArray())
                    .named(READ_SCHEMA.name)
            }
        }

        return ReadContext(schema)
    }

    override fun prepareForRead(
        configuration: Configuration,
        keyValueMetaData: Map<String, String>,
        fileSchema: MessageType,
        readContext: ReadContext
    ): RecordMaterializer<ResourceState> = ResourceStateRecordMaterializer(readContext.requestedSchema)

    companion object {
        /** Shorthand for a required UTF-8 string field. */
        private fun utf8(name: String) = Types
            .required(PrimitiveType.PrimitiveTypeName.BINARY)
            .`as`(LogicalTypeAnnotation.stringType())
            .named(name)

        /** Shorthand for a required millisecond-precision UTC timestamp field. */
        private fun timestampMillis(name: String) = Types
            .required(PrimitiveType.PrimitiveTypeName.INT64)
            .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
            .named(name)

        /**
         * Parquet read schema (version 2.0) for the "resource states" table in the trace.
         */
        @JvmStatic
        val READ_SCHEMA_V2_0: MessageType = Types.buildMessage()
            .addFields(
                utf8("id"),
                timestampMillis("time"),
                Types.required(PrimitiveType.PrimitiveTypeName.INT64).named("duration"),
                Types.required(PrimitiveType.PrimitiveTypeName.INT32).named("cores"),
                Types.required(PrimitiveType.PrimitiveTypeName.DOUBLE).named("cpuUsage")
            )
            .named("resource_state")

        /**
         * Parquet read schema (version 2.1) for the "resource states" table in the trace.
         */
        @JvmStatic
        val READ_SCHEMA_V2_1: MessageType = Types.buildMessage()
            .addFields(
                utf8("id"),
                timestampMillis("timestamp"),
                Types.required(PrimitiveType.PrimitiveTypeName.INT64).named("duration"),
                Types.required(PrimitiveType.PrimitiveTypeName.INT32).named("cpu_count"),
                Types.required(PrimitiveType.PrimitiveTypeName.DOUBLE).named("cpu_usage")
            )
            .named("resource_state")

        /**
         * Parquet read schema for the "resource states" table: the union of all supported versions.
         */
        @JvmStatic
        val READ_SCHEMA: MessageType = READ_SCHEMA_V2_0.union(READ_SCHEMA_V2_1)
    }
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateRecordMaterializer.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateRecordMaterializer.kt
new file mode 100644
index 00000000..f8b0c3c2
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateRecordMaterializer.kt
@@ -0,0 +1,102 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.opendc.parquet
+
+import org.apache.parquet.io.api.*
+import org.apache.parquet.schema.MessageType
+import java.time.Duration
+import java.time.Instant
+
/**
 * A [RecordMaterializer] for [ResourceState] records.
 */
internal class ResourceStateRecordMaterializer(schema: MessageType) : RecordMaterializer<ResourceState>() {
    // State of the record currently being assembled; cleared before every record.
    private var _id = ""
    private var _timestamp = Instant.MIN
    private var _duration = Duration.ZERO
    private var _cpuCount = 0
    private var _cpuUsage = 0.0

    /** Reset the per-record state to its defaults. */
    private fun reset() {
        _id = ""
        _timestamp = Instant.MIN
        _duration = Duration.ZERO
        _cpuCount = 0
        _cpuUsage = 0.0
    }

    /**
     * Root converter for the record.
     */
    private val root = object : GroupConverter() {
        // One converter per field of the (possibly projected) schema, in field order.
        // Both v2.0 (camelCase) and v2.1 (snake_case) field names are handled.
        private val converters = schema.fields.map { type ->
            when (type.name) {
                "id" -> object : PrimitiveConverter() {
                    override fun addBinary(value: Binary) {
                        _id = value.toStringUsingUTF8()
                    }
                }
                "time", "timestamp" -> object : PrimitiveConverter() {
                    override fun addLong(value: Long) {
                        _timestamp = Instant.ofEpochMilli(value)
                    }
                }
                "duration" -> object : PrimitiveConverter() {
                    override fun addLong(value: Long) {
                        _duration = Duration.ofMillis(value)
                    }
                }
                "cores", "cpu_count" -> object : PrimitiveConverter() {
                    override fun addInt(value: Int) {
                        _cpuCount = value
                    }
                }
                "cpuUsage", "cpu_usage" -> object : PrimitiveConverter() {
                    override fun addDouble(value: Double) {
                        _cpuUsage = value
                    }
                }
                "flops" -> object : PrimitiveConverter() {
                    override fun addLong(value: Long) {
                        // Deliberately ignored: kept only so v1-format traces remain readable.
                    }
                }
                else -> error("Unknown column $type")
            }
        }

        override fun start() = reset()

        override fun end() {}

        override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex]
    }

    override fun getCurrentRecord(): ResourceState =
        ResourceState(_id, _timestamp, _duration, _cpuCount, _cpuUsage)

    override fun getRootConverter(): GroupConverter = root
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateWriteSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateWriteSupport.kt
new file mode 100644
index 00000000..e2f3df31
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateWriteSupport.kt
@@ -0,0 +1,105 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.opendc.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.io.api.Binary
+import org.apache.parquet.io.api.RecordConsumer
+import org.apache.parquet.schema.*
+
/**
 * Support for writing [ResourceState] instances to Parquet format.
 */
internal class ResourceStateWriteSupport : WriteSupport<ResourceState>() {
    /**
     * The current active record consumer, installed by [prepareForWrite].
     */
    private lateinit var recordConsumer: RecordConsumer

    override fun init(configuration: Configuration): WriteContext {
        return WriteContext(WRITE_SCHEMA, emptyMap())
    }

    override fun prepareForWrite(recordConsumer: RecordConsumer) {
        this.recordConsumer = recordConsumer
    }

    override fun write(record: ResourceState) {
        write(recordConsumer, record)
    }

    // Emits a single record; fields are written in schema order (see WRITE_SCHEMA).
    private fun write(consumer: RecordConsumer, record: ResourceState) {
        consumer.startMessage()

        consumer.startField("id", 0)
        consumer.addBinary(Binary.fromCharSequence(record.id))
        consumer.endField("id", 0)

        // Timestamp and duration are persisted with millisecond precision.
        consumer.startField("timestamp", 1)
        consumer.addLong(record.timestamp.toEpochMilli())
        consumer.endField("timestamp", 1)

        consumer.startField("duration", 2)
        consumer.addLong(record.duration.toMillis())
        consumer.endField("duration", 2)

        consumer.startField("cpu_count", 3)
        consumer.addInteger(record.cpuCount)
        consumer.endField("cpu_count", 3)

        consumer.startField("cpu_usage", 4)
        consumer.addDouble(record.cpuUsage)
        consumer.endField("cpu_usage", 4)

        consumer.endMessage()
    }

    companion object {
        /**
         * Parquet schema for the "resource states" table in the trace
         * (matches the v2.1 snake_case naming).
         */
        @JvmStatic
        val WRITE_SCHEMA: MessageType = Types.buildMessage()
            .addFields(
                Types
                    .required(PrimitiveType.PrimitiveTypeName.BINARY)
                    .`as`(LogicalTypeAnnotation.stringType())
                    .named("id"),
                Types
                    .required(PrimitiveType.PrimitiveTypeName.INT64)
                    .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
                    .named("timestamp"),
                Types
                    .required(PrimitiveType.PrimitiveTypeName.INT64)
                    .named("duration"),
                Types
                    .required(PrimitiveType.PrimitiveTypeName.INT32)
                    .named("cpu_count"),
                Types
                    .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
                    .named("cpu_usage")
            )
            .named("resource_state")
    }
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceWriteSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceWriteSupport.kt
new file mode 100644
index 00000000..14cadabb
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceWriteSupport.kt
@@ -0,0 +1,114 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.opendc.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.io.api.Binary
+import org.apache.parquet.io.api.RecordConsumer
+import org.apache.parquet.schema.*
+import kotlin.math.roundToLong
+
/**
 * Support for writing [Resource] instances to Parquet format.
 */
internal class ResourceWriteSupport : WriteSupport<Resource>() {
    /**
     * The record consumer installed by [prepareForWrite].
     */
    private lateinit var recordConsumer: RecordConsumer

    override fun init(configuration: Configuration): WriteContext = WriteContext(WRITE_SCHEMA, emptyMap())

    override fun prepareForWrite(recordConsumer: RecordConsumer) {
        this.recordConsumer = recordConsumer
    }

    override fun write(record: Resource) {
        val consumer = recordConsumer

        consumer.startMessage()

        consumer.writeField("id", 0) { addBinary(Binary.fromCharSequence(record.id)) }
        consumer.writeField("start_time", 1) { addLong(record.startTime.toEpochMilli()) }
        consumer.writeField("stop_time", 2) { addLong(record.stopTime.toEpochMilli()) }
        consumer.writeField("cpu_count", 3) { addInteger(record.cpuCount) }
        consumer.writeField("cpu_capacity", 4) { addDouble(record.cpuCapacity) }
        // Memory capacity is persisted as INT64, so round the double value.
        consumer.writeField("mem_capacity", 5) { addLong(record.memCapacity.roundToLong()) }

        consumer.endMessage()
    }

    /** Writes one field as the startField / value / endField triple the Parquet API requires. */
    private inline fun RecordConsumer.writeField(name: String, index: Int, value: RecordConsumer.() -> Unit) {
        startField(name, index)
        value()
        endField(name, index)
    }

    companion object {
        /**
         * Parquet schema for the "resources" table in the trace
         * (matches the v2.1 snake_case naming).
         */
        @JvmStatic
        val WRITE_SCHEMA: MessageType = Types.buildMessage()
            .addFields(
                Types.required(PrimitiveType.PrimitiveTypeName.BINARY)
                    .`as`(LogicalTypeAnnotation.stringType())
                    .named("id"),
                Types.required(PrimitiveType.PrimitiveTypeName.INT64)
                    .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
                    .named("start_time"),
                Types.required(PrimitiveType.PrimitiveTypeName.INT64)
                    .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
                    .named("stop_time"),
                Types.required(PrimitiveType.PrimitiveTypeName.INT32)
                    .named("cpu_count"),
                Types.required(PrimitiveType.PrimitiveTypeName.DOUBLE)
                    .named("cpu_capacity"),
                Types.required(PrimitiveType.PrimitiveTypeName.INT64)
                    .named("mem_capacity"),
            )
            .named("resource")
    }
}
diff --git a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt
index c8742624..1f4f6195 100644
--- a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt
@@ -29,7 +29,9 @@ import org.junit.jupiter.api.assertThrows
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.opendc.trace.conv.*
+import java.nio.file.Files
import java.nio.file.Paths
+import java.time.Instant
/**
* Test suite for the [OdcVmTraceFormat] implementation.
@@ -61,11 +63,12 @@ internal class OdcVmTraceFormatTest {
@ValueSource(strings = ["trace-v2.0", "trace-v2.1"])
fun testResources(name: String) {
val path = Paths.get("src/test/resources/$name")
- val reader = format.newReader(path, TABLE_RESOURCES)
+ val reader = format.newReader(path, TABLE_RESOURCES, listOf(RESOURCE_ID, RESOURCE_START_TIME))
assertAll(
{ assertTrue(reader.nextRow()) },
{ assertEquals("1019", reader.get(RESOURCE_ID)) },
+ { assertEquals(Instant.ofEpochMilli(1376314846000), reader.get(RESOURCE_START_TIME)) },
{ assertTrue(reader.nextRow()) },
{ assertEquals("1023", reader.get(RESOURCE_ID)) },
{ assertTrue(reader.nextRow()) },
@@ -78,11 +81,46 @@ internal class OdcVmTraceFormatTest {
reader.close()
}
    @Test
    fun testResourcesWrite() {
        // Round-trip: write a single resource row to a fresh directory and read it back.
        val path = Files.createTempDirectory("opendc")
        val writer = format.newWriter(path, TABLE_RESOURCES)

        writer.startRow()
        writer.set(RESOURCE_ID, "1019")
        writer.set(RESOURCE_START_TIME, Instant.EPOCH)
        writer.set(RESOURCE_STOP_TIME, Instant.EPOCH)
        writer.setInt(RESOURCE_CPU_COUNT, 1)
        writer.setDouble(RESOURCE_CPU_CAPACITY, 1024.0)
        writer.setDouble(RESOURCE_MEM_CAPACITY, 1024.0)
        writer.endRow()
        writer.close()

        // A null projection requests all columns.
        val reader = format.newReader(path, TABLE_RESOURCES, null)

        // Every written value must survive the round-trip, and no extra rows may appear.
        assertAll(
            { assertTrue(reader.nextRow()) },
            { assertEquals("1019", reader.get(RESOURCE_ID)) },
            { assertEquals(Instant.EPOCH, reader.get(RESOURCE_START_TIME)) },
            { assertEquals(Instant.EPOCH, reader.get(RESOURCE_STOP_TIME)) },
            { assertEquals(1, reader.getInt(RESOURCE_CPU_COUNT)) },
            { assertEquals(1024.0, reader.getDouble(RESOURCE_CPU_CAPACITY)) },
            { assertEquals(1024.0, reader.getDouble(RESOURCE_MEM_CAPACITY)) },
            { assertFalse(reader.nextRow()) },
        )

        reader.close()
    }
+
@ParameterizedTest
@ValueSource(strings = ["trace-v2.0", "trace-v2.1"])
fun testSmoke(name: String) {
val path = Paths.get("src/test/resources/$name")
- val reader = format.newReader(path, TABLE_RESOURCE_STATES)
+ val reader = format.newReader(
+ path,
+ TABLE_RESOURCE_STATES,
+ listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP, RESOURCE_STATE_CPU_USAGE)
+ )
assertAll(
{ assertTrue(reader.nextRow()) },
@@ -95,9 +133,40 @@ internal class OdcVmTraceFormatTest {
}
@Test
    fun testResourceStatesWrite() {
        // Round-trip: write a single resource-state row and verify the values read back.
        val path = Files.createTempDirectory("opendc")
        val writer = format.newWriter(path, TABLE_RESOURCE_STATES)

        writer.startRow()
        writer.set(RESOURCE_ID, "1019")
        writer.set(RESOURCE_STATE_TIMESTAMP, Instant.EPOCH)
        writer.setDouble(RESOURCE_STATE_CPU_USAGE, 23.0)
        writer.setInt(RESOURCE_CPU_COUNT, 1)
        writer.endRow()
        writer.close()

        // A null projection requests all columns.
        val reader = format.newReader(path, TABLE_RESOURCE_STATES, null)

        // Every written value must survive the round-trip, and no extra rows may appear.
        assertAll(
            { assertTrue(reader.nextRow()) },
            { assertEquals("1019", reader.get(RESOURCE_ID)) },
            { assertEquals(Instant.EPOCH, reader.get(RESOURCE_STATE_TIMESTAMP)) },
            { assertEquals(1, reader.getInt(RESOURCE_CPU_COUNT)) },
            { assertEquals(23.0, reader.getDouble(RESOURCE_STATE_CPU_USAGE)) },
            { assertFalse(reader.nextRow()) },
        )

        reader.close()
    }
+
+ @Test
fun testInterferenceGroups() {
val path = Paths.get("src/test/resources/trace-v2.1")
- val reader = format.newReader(path, TABLE_INTERFERENCE_GROUPS)
+ val reader = format.newReader(
+ path,
+ TABLE_INTERFERENCE_GROUPS,
+ listOf(INTERFERENCE_GROUP_MEMBERS, INTERFERENCE_GROUP_TARGET, INTERFERENCE_GROUP_SCORE)
+ )
assertAll(
{ assertTrue(reader.nextRow()) },
@@ -117,7 +186,7 @@ internal class OdcVmTraceFormatTest {
@Test
fun testInterferenceGroupsEmpty() {
val path = Paths.get("src/test/resources/trace-v2.0")
- val reader = format.newReader(path, TABLE_INTERFERENCE_GROUPS)
+ val reader = format.newReader(path, TABLE_INTERFERENCE_GROUPS, listOf(INTERFERENCE_GROUP_MEMBERS))
assertFalse(reader.nextRow())
reader.close()