author    Fabian Mastenbroek <mail.fabianm@gmail.com>    2022-05-01 22:54:08 +0200
committer Fabian Mastenbroek <mail.fabianm@gmail.com>    2022-05-02 15:37:03 +0200
commit    ea5e79fc77072e6151ee7952581b97e35a2027fb (patch)
tree      e2a0ab7d3efbcd29dce0e8fabb44dc6c26610804 /opendc-trace
parent    ee057033b4c534fdd3e8a9d2320d75035d30f27a (diff)
perf(trace/opendc): Read records using low-level API
This change updates the OpenDC VM trace format implementation to read and write records using the low-level record APIs provided by the `parquet-mr` library, for improved performance. Previously, we used the `parquet-avro` library to read and write Avro records in Parquet format, but that library carries considerable overhead.
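For illustration, a minimal sketch of the new read path. It assumes code living inside the `opendc-trace-opendc` module (the `Resource` and `ResourceReadSupport` types introduced below are internal), and the `readResources` helper is illustrative rather than part of this commit:

    import org.opendc.trace.opendc.parquet.Resource
    import org.opendc.trace.opendc.parquet.ResourceReadSupport
    import org.opendc.trace.util.parquet.LocalParquetReader
    import java.nio.file.Path

    // Read all resources from a trace directory by materializing records directly
    // into Resource objects, bypassing the intermediate Avro representation.
    fun readResources(tracePath: Path): List<Resource> =
        LocalParquetReader(tracePath.resolve("meta.parquet"), LocalParquetReader.custom(ResourceReadSupport()))
            .use { reader -> generateSequence { reader.read() }.toList() }

Instead of resolving field positions from an Avro schema at runtime, the custom `ReadSupport` hands `parquet-mr` a `RecordMaterializer` whose converters write primitive values straight into the record fields.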
Diffstat (limited to 'opendc-trace')
-rw-r--r-- opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt | 74
-rw-r--r-- opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt | 124
-rw-r--r-- opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt | 65
-rw-r--r-- opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt | 117
-rw-r--r-- opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt | 65
-rw-r--r-- opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/Resource.kt | 37
-rw-r--r-- opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt | 111
-rw-r--r-- opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceRecordMaterializer.kt | 107
-rw-r--r-- opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceState.kt | 34
-rw-r--r-- opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt | 105
-rw-r--r-- opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateRecordMaterializer.kt | 102
-rw-r--r-- opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateWriteSupport.kt | 105
-rw-r--r-- opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceWriteSupport.kt | 114
-rw-r--r-- opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt | 60
-rw-r--r-- opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt | 6
-rw-r--r-- opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt | 55
16 files changed, 1022 insertions(+), 259 deletions(-)
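On the write side, the Avro schemas and `AvroParquetWriter` are replaced by `WriteSupport` implementations wired up through the new `LocalParquetWriter` helper, as the diff below shows. A minimal sketch of the new write path, again assuming module-internal access; the `writeResource` name and the sample values are illustrative:

    import org.apache.parquet.hadoop.ParquetFileWriter
    import org.apache.parquet.hadoop.metadata.CompressionCodecName
    import org.opendc.trace.opendc.parquet.Resource
    import org.opendc.trace.opendc.parquet.ResourceWriteSupport
    import org.opendc.trace.util.parquet.LocalParquetWriter
    import java.nio.file.Path
    import java.time.Instant

    // Write a single resource record via the custom WriteSupport instead of parquet-avro.
    fun writeResource(tracePath: Path) {
        LocalParquetWriter.builder(tracePath.resolve("meta.parquet"), ResourceWriteSupport())
            .withCompressionCodec(CompressionCodecName.ZSTD)
            .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
            .build()
            .use { writer ->
                writer.write(Resource("vm-0", Instant.EPOCH, Instant.EPOCH, 1, 1024.0, 2048.0))
            }
    }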
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
index b82da888..7a01b881 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
@@ -22,38 +22,30 @@
package org.opendc.trace.opendc
-import org.apache.avro.Schema
-import org.apache.avro.generic.GenericRecord
import org.opendc.trace.*
import org.opendc.trace.conv.*
+import org.opendc.trace.opendc.parquet.ResourceState
import org.opendc.trace.util.parquet.LocalParquetReader
-import java.time.Duration
-import java.time.Instant
/**
* A [TableReader] implementation for the OpenDC virtual machine trace format.
*/
-internal class OdcVmResourceStateTableReader(private val reader: LocalParquetReader<GenericRecord>) : TableReader {
+internal class OdcVmResourceStateTableReader(private val reader: LocalParquetReader<ResourceState>) : TableReader {
/**
* The current record.
*/
- private var record: GenericRecord? = null
-
- /**
- * A flag to indicate that the columns have been initialized.
- */
- private var hasInitializedColumns = false
+ private var record: ResourceState? = null
override fun nextRow(): Boolean {
- val record = reader.read()
- this.record = record
+ try {
+ val record = reader.read()
+ this.record = record
- if (!hasInitializedColumns && record != null) {
- initColumns(record.schema)
- hasInitializedColumns = true
+ return record != null
+ } catch (e: Throwable) {
+ this.record = null
+ throw e
}
-
- return record != null
}
override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
@@ -67,36 +59,36 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_ID -> record[AVRO_COL_ID].toString()
- COL_TIMESTAMP -> Instant.ofEpochMilli(record[AVRO_COL_TIMESTAMP] as Long)
- COL_DURATION -> Duration.ofMillis(record[AVRO_COL_DURATION] as Long)
- COL_CPU_COUNT -> getInt(index)
- COL_CPU_USAGE -> getDouble(index)
- else -> throw IllegalArgumentException("Invalid column")
+ COL_ID -> record.id
+ COL_TIMESTAMP -> record.timestamp
+ COL_DURATION -> record.duration
+ COL_CPU_COUNT -> record.cpuCount
+ COL_CPU_USAGE -> record.cpuUsage
+ else -> throw IllegalArgumentException("Invalid column index $index")
}
}
override fun getBoolean(index: Int): Boolean {
- throw IllegalArgumentException("Invalid column")
+ throw IllegalArgumentException("Invalid column or type [index $index]")
}
override fun getInt(index: Int): Int {
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_CPU_COUNT -> record[AVRO_COL_CPU_COUNT] as Int
- else -> throw IllegalArgumentException("Invalid column")
+ COL_CPU_COUNT -> record.cpuCount
+ else -> throw IllegalArgumentException("Invalid column or type [index $index]")
}
}
override fun getLong(index: Int): Long {
- throw IllegalArgumentException("Invalid column")
+ throw IllegalArgumentException("Invalid column or type [index $index]")
}
override fun getDouble(index: Int): Double {
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_CPU_USAGE -> (record[AVRO_COL_CPU_USAGE] as Number).toDouble()
- else -> throw IllegalArgumentException("Invalid column")
+ COL_CPU_USAGE -> record.cpuUsage
+ else -> throw IllegalArgumentException("Invalid column or type [index $index]")
}
}
@@ -106,28 +98,6 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
override fun toString(): String = "OdcVmResourceStateTableReader"
- /**
- * Initialize the columns for the reader based on [schema].
- */
- private fun initColumns(schema: Schema) {
- try {
- AVRO_COL_ID = schema.getField("id").pos()
- AVRO_COL_TIMESTAMP = (schema.getField("timestamp") ?: schema.getField("time")).pos()
- AVRO_COL_DURATION = schema.getField("duration").pos()
- AVRO_COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("cores")).pos()
- AVRO_COL_CPU_USAGE = (schema.getField("cpu_usage") ?: schema.getField("cpuUsage")).pos()
- } catch (e: NullPointerException) {
- // This happens when the field we are trying to access does not exist
- throw IllegalArgumentException("Invalid schema", e)
- }
- }
-
- private var AVRO_COL_ID = -1
- private var AVRO_COL_TIMESTAMP = -1
- private var AVRO_COL_DURATION = -1
- private var AVRO_COL_CPU_COUNT = -1
- private var AVRO_COL_CPU_USAGE = -1
-
private val COL_ID = 0
private val COL_TIMESTAMP = 1
private val COL_DURATION = 2
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt
index 01b9750c..97af5b59 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt
@@ -22,84 +22,85 @@
package org.opendc.trace.opendc
-import org.apache.avro.Schema
-import org.apache.avro.generic.GenericRecord
-import org.apache.avro.generic.GenericRecordBuilder
import org.apache.parquet.hadoop.ParquetWriter
import org.opendc.trace.*
import org.opendc.trace.conv.*
+import org.opendc.trace.opendc.parquet.ResourceState
import java.time.Duration
import java.time.Instant
/**
* A [TableWriter] implementation for the OpenDC virtual machine trace format.
*/
-internal class OdcVmResourceStateTableWriter(
- private val writer: ParquetWriter<GenericRecord>,
- private val schema: Schema
-) : TableWriter {
+internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter<ResourceState>) : TableWriter {
/**
- * The current builder for the record that is being written.
+ * The current state for the record that is being written.
*/
- private var builder: GenericRecordBuilder? = null
-
- /**
- * The fields belonging to the resource state schema.
- */
- private val fields = schema.fields
+ private var _isActive = false
+ private var _id: String = ""
+ private var _timestamp: Instant = Instant.MIN
+ private var _duration: Duration = Duration.ZERO
+ private var _cpuCount: Int = 0
+ private var _cpuUsage: Double = Double.NaN
override fun startRow() {
- builder = GenericRecordBuilder(schema)
+ _isActive = true
+ _id = ""
+ _timestamp = Instant.MIN
+ _duration = Duration.ZERO
+ _cpuCount = 0
+ _cpuUsage = Double.NaN
}
override fun endRow() {
- val builder = checkNotNull(builder) { "No active row" }
- this.builder = null
-
- val record = builder.build()
- val id = record[COL_ID] as String
- val timestamp = record[COL_TIMESTAMP] as Long
+ check(_isActive) { "No active row" }
+ _isActive = false
- check(lastId != id || timestamp >= lastTimestamp) { "Records need to be ordered by (id, timestamp)" }
+ check(lastId != _id || _timestamp >= lastTimestamp) { "Records need to be ordered by (id, timestamp)" }
- writer.write(builder.build())
+ writer.write(ResourceState(_id, _timestamp, _duration, _cpuCount, _cpuUsage))
- lastId = id
- lastTimestamp = timestamp
+ lastId = _id
+ lastTimestamp = _timestamp
}
- override fun resolve(column: TableColumn<*>): Int {
- val schema = schema
- return when (column) {
- RESOURCE_ID -> schema.getField("id").pos()
- RESOURCE_STATE_TIMESTAMP -> (schema.getField("timestamp") ?: schema.getField("time")).pos()
- RESOURCE_STATE_DURATION -> schema.getField("duration").pos()
- RESOURCE_CPU_COUNT -> (schema.getField("cpu_count") ?: schema.getField("cores")).pos()
- RESOURCE_STATE_CPU_USAGE -> (schema.getField("cpu_usage") ?: schema.getField("cpuUsage")).pos()
- else -> -1
- }
- }
+ override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
override fun set(index: Int, value: Any) {
- val builder = checkNotNull(builder) { "No active row" }
-
- builder.set(
- fields[index],
- when (index) {
- COL_TIMESTAMP -> (value as Instant).toEpochMilli()
- COL_DURATION -> (value as Duration).toMillis()
- else -> value
- }
- )
+ check(_isActive) { "No active row" }
+
+ when (index) {
+ COL_ID -> _id = value as String
+ COL_TIMESTAMP -> _timestamp = value as Instant
+ COL_DURATION -> _duration = value as Duration
+ COL_CPU_COUNT -> _cpuCount = value as Int
+ COL_CPU_USAGE -> _cpuUsage = value as Double
+ }
}
- override fun setBoolean(index: Int, value: Boolean) = set(index, value)
+ override fun setBoolean(index: Int, value: Boolean) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
- override fun setInt(index: Int, value: Int) = set(index, value)
+ override fun setInt(index: Int, value: Int) {
+ check(_isActive) { "No active row" }
+ when (index) {
+ COL_CPU_COUNT -> _cpuCount = value
+ else -> throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+ }
- override fun setLong(index: Int, value: Long) = set(index, value)
+ override fun setLong(index: Int, value: Long) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
- override fun setDouble(index: Int, value: Double) = set(index, value)
+ override fun setDouble(index: Int, value: Double) {
+ check(_isActive) { "No active row" }
+ when (index) {
+ COL_CPU_USAGE -> _cpuUsage = value
+ else -> throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+ }
override fun flush() {
// Not available
@@ -113,12 +114,19 @@ internal class OdcVmResourceStateTableWriter(
* Last column values that are used to check for correct partitioning.
*/
private var lastId: String? = null
- private var lastTimestamp: Long = Long.MIN_VALUE
-
- /**
- * Columns with special behavior.
- */
- private val COL_ID = resolve(RESOURCE_ID)
- private val COL_TIMESTAMP = resolve(RESOURCE_STATE_TIMESTAMP)
- private val COL_DURATION = resolve(RESOURCE_STATE_DURATION)
+ private var lastTimestamp: Instant = Instant.MAX
+
+ private val COL_ID = 0
+ private val COL_TIMESTAMP = 1
+ private val COL_DURATION = 2
+ private val COL_CPU_COUNT = 3
+ private val COL_CPU_USAGE = 4
+
+ private val columns = mapOf(
+ RESOURCE_ID to COL_ID,
+ RESOURCE_STATE_TIMESTAMP to COL_TIMESTAMP,
+ RESOURCE_STATE_DURATION to COL_DURATION,
+ RESOURCE_CPU_COUNT to COL_CPU_COUNT,
+ RESOURCE_STATE_CPU_USAGE to COL_CPU_USAGE,
+ )
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
index 4909e70e..6102332f 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
@@ -22,37 +22,30 @@
package org.opendc.trace.opendc
-import org.apache.avro.Schema
-import org.apache.avro.generic.GenericRecord
import org.opendc.trace.*
import org.opendc.trace.conv.*
+import org.opendc.trace.opendc.parquet.Resource
import org.opendc.trace.util.parquet.LocalParquetReader
-import java.time.Instant
/**
- * A [TableReader] implementation for the resources table in the OpenDC virtual machine trace format.
+ * A [TableReader] implementation for the "resources table" in the OpenDC virtual machine trace format.
*/
-internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<GenericRecord>) : TableReader {
+internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<Resource>) : TableReader {
/**
* The current record.
*/
- private var record: GenericRecord? = null
-
- /**
- * A flag to indicate that the columns have been initialized.
- */
- private var hasInitializedColumns = false
+ private var record: Resource? = null
override fun nextRow(): Boolean {
- val record = reader.read()
- this.record = record
+ try {
+ val record = reader.read()
+ this.record = record
- if (!hasInitializedColumns && record != null) {
- initColumns(record.schema)
- hasInitializedColumns = true
+ return record != null
+ } catch (e: Throwable) {
+ this.record = null
+ throw e
}
-
- return record != null
}
override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
@@ -66,9 +59,9 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_ID -> record[AVRO_COL_ID].toString()
- COL_START_TIME -> Instant.ofEpochMilli(record[AVRO_COL_START_TIME] as Long)
- COL_STOP_TIME -> Instant.ofEpochMilli(record[AVRO_COL_STOP_TIME] as Long)
+ COL_ID -> record.id
+ COL_START_TIME -> record.startTime
+ COL_STOP_TIME -> record.stopTime
COL_CPU_COUNT -> getInt(index)
COL_CPU_CAPACITY -> getDouble(index)
COL_MEM_CAPACITY -> getDouble(index)
@@ -84,7 +77,7 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_CPU_COUNT -> record[AVRO_COL_CPU_COUNT] as Int
+ COL_CPU_COUNT -> record.cpuCount
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -97,8 +90,8 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_CPU_CAPACITY -> if (AVRO_COL_CPU_CAPACITY >= 0) (record[AVRO_COL_CPU_CAPACITY] as Number).toDouble() else 0.0
- COL_MEM_CAPACITY -> (record[AVRO_COL_MEM_CAPACITY] as Number).toDouble()
+ COL_CPU_CAPACITY -> record.cpuCapacity
+ COL_MEM_CAPACITY -> record.memCapacity
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -109,30 +102,6 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G
override fun toString(): String = "OdcVmResourceTableReader"
- /**
- * Initialize the columns for the reader based on [schema].
- */
- private fun initColumns(schema: Schema) {
- try {
- AVRO_COL_ID = schema.getField("id").pos()
- AVRO_COL_START_TIME = (schema.getField("start_time") ?: schema.getField("submissionTime")).pos()
- AVRO_COL_STOP_TIME = (schema.getField("stop_time") ?: schema.getField("endTime")).pos()
- AVRO_COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("maxCores")).pos()
- AVRO_COL_CPU_CAPACITY = schema.getField("cpu_capacity")?.pos() ?: -1
- AVRO_COL_MEM_CAPACITY = (schema.getField("mem_capacity") ?: schema.getField("requiredMemory")).pos()
- } catch (e: NullPointerException) {
- // This happens when the field we are trying to access does not exist
- throw IllegalArgumentException("Invalid schema")
- }
- }
-
- private var AVRO_COL_ID = -1
- private var AVRO_COL_START_TIME = -1
- private var AVRO_COL_STOP_TIME = -1
- private var AVRO_COL_CPU_COUNT = -1
- private var AVRO_COL_CPU_CAPACITY = -1
- private var AVRO_COL_MEM_CAPACITY = -1
-
private val COL_ID = 0
private val COL_START_TIME = 1
private val COL_STOP_TIME = 2
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt
index edc89ee6..cae65faa 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt
@@ -22,74 +22,82 @@
package org.opendc.trace.opendc
-import org.apache.avro.Schema
-import org.apache.avro.generic.GenericRecord
-import org.apache.avro.generic.GenericRecordBuilder
import org.apache.parquet.hadoop.ParquetWriter
import org.opendc.trace.*
import org.opendc.trace.conv.*
+import org.opendc.trace.opendc.parquet.Resource
import java.time.Instant
-import kotlin.math.roundToLong
/**
* A [TableWriter] implementation for the OpenDC virtual machine trace format.
*/
-internal class OdcVmResourceTableWriter(
- private val writer: ParquetWriter<GenericRecord>,
- private val schema: Schema
-) : TableWriter {
+internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resource>) : TableWriter {
/**
- * The current builder for the record that is being written.
+ * The current state for the record that is being written.
*/
- private var builder: GenericRecordBuilder? = null
-
- /**
- * The fields belonging to the resource schema.
- */
- private val fields = schema.fields
+ private var _isActive = false
+ private var _id: String = ""
+ private var _startTime: Instant = Instant.MIN
+ private var _stopTime: Instant = Instant.MIN
+ private var _cpuCount: Int = 0
+ private var _cpuCapacity: Double = Double.NaN
+ private var _memCapacity: Double = Double.NaN
override fun startRow() {
- builder = GenericRecordBuilder(schema)
+ _isActive = true
+ _id = ""
+ _startTime = Instant.MIN
+ _stopTime = Instant.MIN
+ _cpuCount = 0
+ _cpuCapacity = Double.NaN
+ _memCapacity = Double.NaN
}
override fun endRow() {
- val builder = checkNotNull(builder) { "No active row" }
- this.builder = null
- writer.write(builder.build())
+ check(_isActive) { "No active row" }
+ _isActive = false
+ writer.write(Resource(_id, _startTime, _stopTime, _cpuCount, _cpuCapacity, _memCapacity))
}
- override fun resolve(column: TableColumn<*>): Int {
- val schema = schema
- return when (column) {
- RESOURCE_ID -> schema.getField("id").pos()
- RESOURCE_START_TIME -> (schema.getField("start_time") ?: schema.getField("submissionTime")).pos()
- RESOURCE_STOP_TIME -> (schema.getField("stop_time") ?: schema.getField("endTime")).pos()
- RESOURCE_CPU_COUNT -> (schema.getField("cpu_count") ?: schema.getField("maxCores")).pos()
- RESOURCE_CPU_CAPACITY -> schema.getField("cpu_capacity").pos()
- RESOURCE_MEM_CAPACITY -> (schema.getField("mem_capacity") ?: schema.getField("requiredMemory")).pos()
- else -> -1
- }
- }
+ override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
override fun set(index: Int, value: Any) {
- val builder = checkNotNull(builder) { "No active row" }
- builder.set(
- fields[index],
- when (index) {
- COL_START_TIME, COL_STOP_TIME -> (value as Instant).toEpochMilli()
- COL_MEM_CAPACITY -> (value as Double).roundToLong()
- else -> value
- }
- )
+ check(_isActive) { "No active row" }
+ when (index) {
+ COL_ID -> _id = value as String
+ COL_START_TIME -> _startTime = value as Instant
+ COL_STOP_TIME -> _stopTime = value as Instant
+ COL_CPU_COUNT -> _cpuCount = value as Int
+ COL_CPU_CAPACITY -> _cpuCapacity = value as Double
+ COL_MEM_CAPACITY -> _memCapacity = value as Double
+ else -> throw IllegalArgumentException("Invalid column index $index")
+ }
}
- override fun setBoolean(index: Int, value: Boolean) = set(index, value)
+ override fun setBoolean(index: Int, value: Boolean) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
- override fun setInt(index: Int, value: Int) = set(index, value)
+ override fun setInt(index: Int, value: Int) {
+ check(_isActive) { "No active row" }
+ when (index) {
+ COL_CPU_COUNT -> _cpuCount = value
+ else -> throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+ }
- override fun setLong(index: Int, value: Long) = set(index, value)
+ override fun setLong(index: Int, value: Long) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
- override fun setDouble(index: Int, value: Double) = set(index, value)
+ override fun setDouble(index: Int, value: Double) {
+ check(_isActive) { "No active row" }
+ when (index) {
+ COL_CPU_CAPACITY -> _cpuCapacity = value
+ COL_MEM_CAPACITY -> _memCapacity = value
+ else -> throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+ }
override fun flush() {
// Not available
@@ -99,10 +107,19 @@ internal class OdcVmResourceTableWriter(
writer.close()
}
- /**
- * Columns with special behavior.
- */
- private val COL_START_TIME = resolve(RESOURCE_START_TIME)
- private val COL_STOP_TIME = resolve(RESOURCE_STOP_TIME)
- private val COL_MEM_CAPACITY = resolve(RESOURCE_MEM_CAPACITY)
+ private val COL_ID = 0
+ private val COL_START_TIME = 1
+ private val COL_STOP_TIME = 2
+ private val COL_CPU_COUNT = 3
+ private val COL_CPU_CAPACITY = 4
+ private val COL_MEM_CAPACITY = 5
+
+ private val columns = mapOf(
+ RESOURCE_ID to COL_ID,
+ RESOURCE_START_TIME to COL_START_TIME,
+ RESOURCE_STOP_TIME to COL_STOP_TIME,
+ RESOURCE_CPU_COUNT to COL_CPU_COUNT,
+ RESOURCE_CPU_CAPACITY to COL_CPU_CAPACITY,
+ RESOURCE_MEM_CAPACITY to COL_MEM_CAPACITY,
+ )
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
index 1a15c7b3..155f8cf3 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
@@ -22,19 +22,19 @@
package org.opendc.trace.opendc
-import org.apache.avro.Schema
-import org.apache.avro.SchemaBuilder
-import org.apache.avro.generic.GenericRecord
-import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.column.ParquetProperties
import org.apache.parquet.hadoop.ParquetFileWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.opendc.trace.*
import org.opendc.trace.conv.*
+import org.opendc.trace.opendc.parquet.ResourceReadSupport
+import org.opendc.trace.opendc.parquet.ResourceStateReadSupport
+import org.opendc.trace.opendc.parquet.ResourceStateWriteSupport
+import org.opendc.trace.opendc.parquet.ResourceWriteSupport
import org.opendc.trace.spi.TableDetails
import org.opendc.trace.spi.TraceFormat
-import org.opendc.trace.util.parquet.LocalOutputFile
import org.opendc.trace.util.parquet.LocalParquetReader
-import org.opendc.trace.util.avro.TIMESTAMP_SCHEMA
+import org.opendc.trace.util.parquet.LocalParquetWriter
import shaded.parquet.com.fasterxml.jackson.core.JsonEncoding
import shaded.parquet.com.fasterxml.jackson.core.JsonFactory
import java.nio.file.Files
@@ -105,11 +105,11 @@ public class OdcVmTraceFormat : TraceFormat {
override fun newReader(path: Path, table: String): TableReader {
return when (table) {
TABLE_RESOURCES -> {
- val reader = LocalParquetReader<GenericRecord>(path.resolve("meta.parquet"))
+ val reader = LocalParquetReader(path.resolve("meta.parquet"), LocalParquetReader.custom(ResourceReadSupport()))
OdcVmResourceTableReader(reader)
}
TABLE_RESOURCE_STATES -> {
- val reader = LocalParquetReader<GenericRecord>(path.resolve("trace.parquet"))
+ val reader = LocalParquetReader(path.resolve("trace.parquet"), LocalParquetReader.custom(ResourceStateReadSupport()))
OdcVmResourceStateTableReader(reader)
}
TABLE_INTERFERENCE_GROUPS -> {
@@ -128,24 +128,24 @@ public class OdcVmTraceFormat : TraceFormat {
override fun newWriter(path: Path, table: String): TableWriter {
return when (table) {
TABLE_RESOURCES -> {
- val schema = RESOURCES_SCHEMA
- val writer = AvroParquetWriter.builder<GenericRecord>(LocalOutputFile(path.resolve("meta.parquet")))
- .withSchema(schema)
+ val writer = LocalParquetWriter.builder(path.resolve("meta.parquet"), ResourceWriteSupport())
.withCompressionCodec(CompressionCodecName.ZSTD)
+ .withPageWriteChecksumEnabled(true)
+ .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
.build()
- OdcVmResourceTableWriter(writer, schema)
+ OdcVmResourceTableWriter(writer)
}
TABLE_RESOURCE_STATES -> {
- val schema = RESOURCE_STATES_SCHEMA
- val writer = AvroParquetWriter.builder<GenericRecord>(LocalOutputFile(path.resolve("trace.parquet")))
- .withSchema(schema)
+ val writer = LocalParquetWriter.builder(path.resolve("trace.parquet"), ResourceStateWriteSupport())
.withCompressionCodec(CompressionCodecName.ZSTD)
.withDictionaryEncoding("id", true)
.withBloomFilterEnabled("id", true)
+ .withPageWriteChecksumEnabled(true)
+ .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
.build()
- OdcVmResourceStateTableWriter(writer, schema)
+ OdcVmResourceStateTableWriter(writer)
}
TABLE_INTERFERENCE_GROUPS -> {
val generator = jsonFactory.createGenerator(path.resolve("interference-model.json").toFile(), JsonEncoding.UTF8)
@@ -154,37 +154,4 @@ public class OdcVmTraceFormat : TraceFormat {
else -> throw IllegalArgumentException("Table $table not supported")
}
}
-
- public companion object {
- /**
- * Schema for the resources table in the trace.
- */
- @JvmStatic
- public val RESOURCES_SCHEMA: Schema = SchemaBuilder
- .record("resource")
- .namespace("org.opendc.trace.opendc")
- .fields()
- .requiredString("id")
- .name("start_time").type(TIMESTAMP_SCHEMA).noDefault()
- .name("stop_time").type(TIMESTAMP_SCHEMA).noDefault()
- .requiredInt("cpu_count")
- .requiredDouble("cpu_capacity")
- .requiredLong("mem_capacity")
- .endRecord()
-
- /**
- * Schema for the resource states table in the trace.
- */
- @JvmStatic
- public val RESOURCE_STATES_SCHEMA: Schema = SchemaBuilder
- .record("resource_state")
- .namespace("org.opendc.trace.opendc")
- .fields()
- .requiredString("id")
- .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault()
- .requiredLong("duration")
- .requiredInt("cpu_count")
- .requiredDouble("cpu_usage")
- .endRecord()
- }
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/Resource.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/Resource.kt
new file mode 100644
index 00000000..c6db45b5
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/Resource.kt
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.opendc.parquet
+
+import java.time.Instant
+
+/**
+ * A description of a resource in a trace.
+ */
+internal data class Resource(
+ val id: String,
+ val startTime: Instant,
+ val stopTime: Instant,
+ val cpuCount: Int,
+ val cpuCapacity: Double,
+ val memCapacity: Double
+)
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt
new file mode 100644
index 00000000..47cce914
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt
@@ -0,0 +1,111 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.opendc.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.api.InitContext
+import org.apache.parquet.hadoop.api.ReadSupport
+import org.apache.parquet.io.api.RecordMaterializer
+import org.apache.parquet.schema.*
+
+/**
+ * A [ReadSupport] instance for [Resource] objects.
+ */
+internal class ResourceReadSupport : ReadSupport<Resource>() {
+ override fun init(context: InitContext): ReadContext {
+ return ReadContext(READ_SCHEMA)
+ }
+
+ override fun prepareForRead(
+ configuration: Configuration,
+ keyValueMetaData: Map<String, String>,
+ fileSchema: MessageType,
+ readContext: ReadContext
+ ): RecordMaterializer<Resource> = ResourceRecordMaterializer(readContext.requestedSchema)
+
+ companion object {
+ /**
+ * Parquet read schema (version 2.0) for the "resources" table in the trace.
+ */
+ @JvmStatic
+ val READ_SCHEMA_V2_0: MessageType = Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("submissionTime"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("endTime"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("maxCores"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("requiredMemory"),
+ )
+ .named("resource")
+
+ /**
+ * Parquet read schema (version 2.1) for the "resources" table in the trace.
+ */
+ @JvmStatic
+ val READ_SCHEMA_V2_1: MessageType = Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("start_time"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("stop_time"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("cpu_count"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpu_capacity"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("mem_capacity"),
+ )
+ .named("resource")
+
+ /**
+ * Parquet read schema for the "resources" table in the trace.
+ */
+ @JvmStatic
+ val READ_SCHEMA: MessageType = READ_SCHEMA_V2_0
+ .union(READ_SCHEMA_V2_1)
+ }
+}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceRecordMaterializer.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceRecordMaterializer.kt
new file mode 100644
index 00000000..3adb0709
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceRecordMaterializer.kt
@@ -0,0 +1,107 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.opendc.parquet
+
+import org.apache.parquet.io.api.*
+import org.apache.parquet.schema.MessageType
+import java.time.Instant
+
+/**
+ * A [RecordMaterializer] for [Resource] records.
+ */
+internal class ResourceRecordMaterializer(schema: MessageType) : RecordMaterializer<Resource>() {
+ /**
+ * State of current record being read.
+ */
+ private var _id = ""
+ private var _startTime = Instant.MIN
+ private var _stopTime = Instant.MIN
+ private var _cpuCount = 0
+ private var _cpuCapacity = 0.0
+ private var _memCapacity = 0.0
+
+ /**
+ * Root converter for the record.
+ */
+ private val root = object : GroupConverter() {
+ /**
+ * The converters for the columns of the schema.
+ */
+ private val converters = schema.fields.map { type ->
+ when (type.name) {
+ "id" -> object : PrimitiveConverter() {
+ override fun addBinary(value: Binary) {
+ _id = value.toStringUsingUTF8()
+ }
+ }
+ "start_time", "submissionTime" -> object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ _startTime = Instant.ofEpochMilli(value)
+ }
+ }
+ "stop_time", "endTime" -> object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ _stopTime = Instant.ofEpochMilli(value)
+ }
+ }
+ "cpu_count", "maxCores" -> object : PrimitiveConverter() {
+ override fun addInt(value: Int) {
+ _cpuCount = value
+ }
+ }
+ "cpu_capacity" -> object : PrimitiveConverter() {
+ override fun addDouble(value: Double) {
+ _cpuCapacity = value
+ }
+ }
+ "mem_capacity", "requiredMemory" -> object : PrimitiveConverter() {
+ override fun addDouble(value: Double) {
+ _memCapacity = value
+ }
+
+ override fun addLong(value: Long) {
+ _memCapacity = value.toDouble()
+ }
+ }
+ else -> error("Unknown column $type")
+ }
+ }
+
+ override fun start() {
+ _id = ""
+ _startTime = Instant.MIN
+ _stopTime = Instant.MIN
+ _cpuCount = 0
+ _cpuCapacity = 0.0
+ _memCapacity = 0.0
+ }
+
+ override fun end() {}
+
+ override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex]
+ }
+
+ override fun getCurrentRecord(): Resource = Resource(_id, _startTime, _stopTime, _cpuCount, _cpuCapacity, _memCapacity)
+
+ override fun getRootConverter(): GroupConverter = root
+}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceState.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceState.kt
new file mode 100644
index 00000000..9ad58764
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceState.kt
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.opendc.parquet
+
+import java.time.Duration
+import java.time.Instant
+
+internal class ResourceState(
+ val id: String,
+ val timestamp: Instant,
+ val duration: Duration,
+ val cpuCount: Int,
+ val cpuUsage: Double
+)
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt
new file mode 100644
index 00000000..17840ceb
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt
@@ -0,0 +1,105 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.opendc.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.api.InitContext
+import org.apache.parquet.hadoop.api.ReadSupport
+import org.apache.parquet.io.api.RecordMaterializer
+import org.apache.parquet.schema.*
+
+/**
+ * A [ReadSupport] instance for [ResourceState] objects.
+ */
+internal class ResourceStateReadSupport : ReadSupport<ResourceState>() {
+ override fun init(context: InitContext): ReadContext {
+ return ReadContext(READ_SCHEMA)
+ }
+
+ override fun prepareForRead(
+ configuration: Configuration,
+ keyValueMetaData: Map<String, String>,
+ fileSchema: MessageType,
+ readContext: ReadContext
+ ): RecordMaterializer<ResourceState> = ResourceStateRecordMaterializer(readContext.requestedSchema)
+
+ companion object {
+ /**
+ * Parquet read schema (version 2.0) for the "resource states" table in the trace.
+ */
+ @JvmStatic
+ val READ_SCHEMA_V2_0: MessageType = Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("time"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("duration"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("cores"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpuUsage")
+ )
+ .named("resource_state")
+
+ /**
+ * Parquet read schema (version 2.1) for the "resource states" table in the trace.
+ */
+ @JvmStatic
+ val READ_SCHEMA_V2_1: MessageType = Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("timestamp"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("duration"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("cpu_count"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpu_usage")
+ )
+ .named("resource_state")
+
+ /**
+ * Parquet read schema for the "resource states" table in the trace.
+ */
+ @JvmStatic
+ val READ_SCHEMA: MessageType = READ_SCHEMA_V2_0.union(READ_SCHEMA_V2_1)
+ }
+}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateRecordMaterializer.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateRecordMaterializer.kt
new file mode 100644
index 00000000..f8b0c3c2
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateRecordMaterializer.kt
@@ -0,0 +1,102 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.opendc.parquet
+
+import org.apache.parquet.io.api.*
+import org.apache.parquet.schema.MessageType
+import java.time.Duration
+import java.time.Instant
+
+/**
+ * A [RecordMaterializer] for [ResourceState] records.
+ */
+internal class ResourceStateRecordMaterializer(schema: MessageType) : RecordMaterializer<ResourceState>() {
+ /**
+ * State of current record being read.
+ */
+ private var _id = ""
+ private var _timestamp = Instant.MIN
+ private var _duration = Duration.ZERO
+ private var _cpuCount = 0
+ private var _cpuUsage = 0.0
+
+ /**
+ * Root converter for the record.
+ */
+ private val root = object : GroupConverter() {
+ /**
+ * The converters for the columns of the schema.
+ */
+ private val converters = schema.fields.map { type ->
+ when (type.name) {
+ "id" -> object : PrimitiveConverter() {
+ override fun addBinary(value: Binary) {
+ _id = value.toStringUsingUTF8()
+ }
+ }
+ "timestamp", "time" -> object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ _timestamp = Instant.ofEpochMilli(value)
+ }
+ }
+ "duration" -> object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ _duration = Duration.ofMillis(value)
+ }
+ }
+ "cpu_count", "cores" -> object : PrimitiveConverter() {
+ override fun addInt(value: Int) {
+ _cpuCount = value
+ }
+ }
+ "cpu_usage", "cpuUsage" -> object : PrimitiveConverter() {
+ override fun addDouble(value: Double) {
+ _cpuUsage = value
+ }
+ }
+ "flops" -> object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ // Ignore to support v1 format
+ }
+ }
+ else -> error("Unknown column $type")
+ }
+ }
+
+ override fun start() {
+ _id = ""
+ _timestamp = Instant.MIN
+ _duration = Duration.ZERO
+ _cpuCount = 0
+ _cpuUsage = 0.0
+ }
+
+ override fun end() {}
+
+ override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex]
+ }
+
+ override fun getCurrentRecord(): ResourceState = ResourceState(_id, _timestamp, _duration, _cpuCount, _cpuUsage)
+
+ override fun getRootConverter(): GroupConverter = root
+}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateWriteSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateWriteSupport.kt
new file mode 100644
index 00000000..e2f3df31
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateWriteSupport.kt
@@ -0,0 +1,105 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.opendc.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.io.api.Binary
+import org.apache.parquet.io.api.RecordConsumer
+import org.apache.parquet.schema.*
+
+/**
+ * Support for writing [ResourceState] instances to Parquet format.
+ */
+internal class ResourceStateWriteSupport : WriteSupport<ResourceState>() {
+ /**
+ * The current active record consumer.
+ */
+ private lateinit var recordConsumer: RecordConsumer
+
+ override fun init(configuration: Configuration): WriteContext {
+ return WriteContext(WRITE_SCHEMA, emptyMap())
+ }
+
+ override fun prepareForWrite(recordConsumer: RecordConsumer) {
+ this.recordConsumer = recordConsumer
+ }
+
+ override fun write(record: ResourceState) {
+ write(recordConsumer, record)
+ }
+
+ private fun write(consumer: RecordConsumer, record: ResourceState) {
+ consumer.startMessage()
+
+ consumer.startField("id", 0)
+ consumer.addBinary(Binary.fromCharSequence(record.id))
+ consumer.endField("id", 0)
+
+ consumer.startField("timestamp", 1)
+ consumer.addLong(record.timestamp.toEpochMilli())
+ consumer.endField("timestamp", 1)
+
+ consumer.startField("duration", 2)
+ consumer.addLong(record.duration.toMillis())
+ consumer.endField("duration", 2)
+
+ consumer.startField("cpu_count", 3)
+ consumer.addInteger(record.cpuCount)
+ consumer.endField("cpu_count", 3)
+
+ consumer.startField("cpu_usage", 4)
+ consumer.addDouble(record.cpuUsage)
+ consumer.endField("cpu_usage", 4)
+
+ consumer.endMessage()
+ }
+
+ companion object {
+ /**
+ * Parquet schema for the "resource states" table in the trace.
+ */
+ @JvmStatic
+ val WRITE_SCHEMA: MessageType = Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("timestamp"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("duration"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("cpu_count"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpu_usage")
+ )
+ .named("resource_state")
+ }
+}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceWriteSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceWriteSupport.kt
new file mode 100644
index 00000000..14cadabb
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceWriteSupport.kt
@@ -0,0 +1,114 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.opendc.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.io.api.Binary
+import org.apache.parquet.io.api.RecordConsumer
+import org.apache.parquet.schema.*
+import kotlin.math.roundToLong
+
+/**
+ * Support for writing [Resource] instances to Parquet format.
+ */
+internal class ResourceWriteSupport : WriteSupport<Resource>() {
+ /**
+ * The current active record consumer.
+ */
+ private lateinit var recordConsumer: RecordConsumer
+
+ override fun init(configuration: Configuration): WriteContext {
+ return WriteContext(WRITE_SCHEMA, emptyMap())
+ }
+
+ override fun prepareForWrite(recordConsumer: RecordConsumer) {
+ this.recordConsumer = recordConsumer
+ }
+
+ override fun write(record: Resource) {
+ write(recordConsumer, record)
+ }
+
+ private fun write(consumer: RecordConsumer, record: Resource) {
+ consumer.startMessage()
+
+ consumer.startField("id", 0)
+ consumer.addBinary(Binary.fromCharSequence(record.id))
+ consumer.endField("id", 0)
+
+ consumer.startField("start_time", 1)
+ consumer.addLong(record.startTime.toEpochMilli())
+ consumer.endField("start_time", 1)
+
+ consumer.startField("stop_time", 2)
+ consumer.addLong(record.stopTime.toEpochMilli())
+ consumer.endField("stop_time", 2)
+
+ consumer.startField("cpu_count", 3)
+ consumer.addInteger(record.cpuCount)
+ consumer.endField("cpu_count", 3)
+
+ consumer.startField("cpu_capacity", 4)
+ consumer.addDouble(record.cpuCapacity)
+ consumer.endField("cpu_capacity", 4)
+
+ consumer.startField("mem_capacity", 5)
+ consumer.addLong(record.memCapacity.roundToLong())
+ consumer.endField("mem_capacity", 5)
+
+ consumer.endMessage()
+ }
+
+ companion object {
+ /**
+ * Parquet schema for the "resources" table in the trace.
+ */
+ @JvmStatic
+ val WRITE_SCHEMA: MessageType = Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("start_time"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("stop_time"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("cpu_count"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpu_capacity"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("mem_capacity"),
+ )
+ .named("resource")
+ }
+}
diff --git a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt
index c8742624..dec0fef9 100644
--- a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt
@@ -29,7 +29,9 @@ import org.junit.jupiter.api.assertThrows
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.opendc.trace.conv.*
+import java.nio.file.Files
import java.nio.file.Paths
+import java.time.Instant
/**
* Test suite for the [OdcVmTraceFormat] implementation.
@@ -78,6 +80,37 @@ internal class OdcVmTraceFormatTest {
reader.close()
}
+ @Test
+ fun testResourcesWrite() {
+ val path = Files.createTempDirectory("opendc")
+ val writer = format.newWriter(path, TABLE_RESOURCES)
+
+ writer.startRow()
+ writer.set(RESOURCE_ID, "1019")
+ writer.set(RESOURCE_START_TIME, Instant.EPOCH)
+ writer.set(RESOURCE_STOP_TIME, Instant.EPOCH)
+ writer.setInt(RESOURCE_CPU_COUNT, 1)
+ writer.setDouble(RESOURCE_CPU_CAPACITY, 1024.0)
+ writer.setDouble(RESOURCE_MEM_CAPACITY, 1024.0)
+ writer.endRow()
+ writer.close()
+
+ val reader = format.newReader(path, TABLE_RESOURCES)
+
+ assertAll(
+ { assertTrue(reader.nextRow()) },
+ { assertEquals("1019", reader.get(RESOURCE_ID)) },
+ { assertEquals(Instant.EPOCH, reader.get(RESOURCE_START_TIME)) },
+ { assertEquals(Instant.EPOCH, reader.get(RESOURCE_STOP_TIME)) },
+ { assertEquals(1, reader.getInt(RESOURCE_CPU_COUNT)) },
+ { assertEquals(1024.0, reader.getDouble(RESOURCE_CPU_CAPACITY)) },
+ { assertEquals(1024.0, reader.getDouble(RESOURCE_MEM_CAPACITY)) },
+ { assertFalse(reader.nextRow()) },
+ )
+
+ reader.close()
+ }
+
@ParameterizedTest
@ValueSource(strings = ["trace-v2.0", "trace-v2.1"])
fun testSmoke(name: String) {
@@ -95,6 +128,33 @@ internal class OdcVmTraceFormatTest {
}
@Test
+ fun testResourceStatesWrite() {
+ val path = Files.createTempDirectory("opendc")
+ val writer = format.newWriter(path, TABLE_RESOURCE_STATES)
+
+ writer.startRow()
+ writer.set(RESOURCE_ID, "1019")
+ writer.set(RESOURCE_STATE_TIMESTAMP, Instant.EPOCH)
+ writer.setDouble(RESOURCE_STATE_CPU_USAGE, 23.0)
+ writer.setInt(RESOURCE_CPU_COUNT, 1)
+ writer.endRow()
+ writer.close()
+
+ val reader = format.newReader(path, TABLE_RESOURCE_STATES)
+
+ assertAll(
+ { assertTrue(reader.nextRow()) },
+ { assertEquals("1019", reader.get(RESOURCE_ID)) },
+ { assertEquals(Instant.EPOCH, reader.get(RESOURCE_STATE_TIMESTAMP)) },
+ { assertEquals(1, reader.getInt(RESOURCE_CPU_COUNT)) },
+ { assertEquals(23.0, reader.getDouble(RESOURCE_STATE_CPU_USAGE)) },
+ { assertFalse(reader.nextRow()) },
+ )
+
+ reader.close()
+ }
+
+ @Test
fun testInterferenceGroups() {
val path = Paths.get("src/test/resources/trace-v2.1")
val reader = format.newReader(path, TABLE_INTERFERENCE_GROUPS)
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
index bb2bb10d..3e6f19a2 100644
--- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
@@ -40,8 +40,10 @@ import kotlin.io.path.isDirectory
* @param path The path to the Parquet file or directory to read.
* @param factory Function to construct a [ParquetReader] for a local [InputFile].
*/
-public class LocalParquetReader<out T>(path: Path,
- private val factory: (InputFile) -> ParquetReader<T> = avro()) : AutoCloseable {
+public class LocalParquetReader<out T>(
+ path: Path,
+ private val factory: (InputFile) -> ParquetReader<T> = avro()
+) : AutoCloseable {
/**
* The input files to process.
*/
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt
new file mode 100644
index 00000000..b5eb1deb
--- /dev/null
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt
@@ -0,0 +1,55 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.util.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.ParquetWriter
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.io.OutputFile
+import java.nio.file.Path
+
+/**
+ * Helper class for writing Parquet records to local disk.
+ */
+public class LocalParquetWriter {
+ /**
+ * A [ParquetWriter.Builder] implementation supporting custom [OutputFile]s and [WriteSupport] implementations.
+ */
+ public class Builder<T> internal constructor(
+ output: OutputFile,
+ private val writeSupport: WriteSupport<T>
+ ) : ParquetWriter.Builder<T, Builder<T>>(output) {
+ override fun self(): Builder<T> = this
+
+ override fun getWriteSupport(conf: Configuration): WriteSupport<T> = writeSupport
+ }
+
+ public companion object {
+ /**
+ * Create a [Builder] instance that writes a Parquet file at the specified [path].
+ */
+ @JvmStatic
+ public fun <T> builder(path: Path, writeSupport: WriteSupport<T>): Builder<T> =
+ Builder(LocalOutputFile(path), writeSupport)
+ }
+}