| field | value | date |
|---|---|---|
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-05-01 22:54:08 +0200 |
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-05-02 15:37:03 +0200 |
| commit | ea5e79fc77072e6151ee7952581b97e35a2027fb (patch) | |
| tree | e2a0ab7d3efbcd29dce0e8fabb44dc6c26610804 /opendc-trace/opendc-trace-opendc | |
| parent | ee057033b4c534fdd3e8a9d2320d75035d30f27a (diff) | |
perf(trace/opendc): Read records using low-level API
This change updates the OpenDC VM trace format reader and writer
implementations to use the low-level record read/write APIs provided by
the `parquet-mr` library, for improved performance. Previously, we used
the `parquet-avro` module to read and write Avro records in Parquet
format, but that extra layer of indirection carries considerable overhead.
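The essence of the new read path is a custom `ReadSupport` that materializes records through per-column converter callbacks. Below is a minimal sketch, condensed from the `ResourceStateReadSupport`/`ResourceStateRecordMaterializer` classes added in this commit; the two-column `Sample` record is a made-up stand-in for illustration, not part of the change:

```kotlin
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.api.InitContext
import org.apache.parquet.hadoop.api.ReadSupport
import org.apache.parquet.io.api.*
import org.apache.parquet.schema.*

// Hypothetical record type used only for this sketch.
data class Sample(val id: String, val cpuUsage: Double)

class SampleReadSupport : ReadSupport<Sample>() {
    override fun init(context: InitContext): ReadContext = ReadContext(SCHEMA)

    override fun prepareForRead(
        configuration: Configuration,
        keyValueMetaData: Map<String, String>,
        fileSchema: MessageType,
        readContext: ReadContext
    ): RecordMaterializer<Sample> = object : RecordMaterializer<Sample>() {
        private var id = ""
        private var cpuUsage = 0.0

        // One PrimitiveConverter per column: decoded values are pushed straight
        // into typed local fields instead of an intermediate GenericRecord.
        private val root = object : GroupConverter() {
            private val converters = arrayOf<Converter>(
                object : PrimitiveConverter() {
                    override fun addBinary(value: Binary) { id = value.toStringUsingUTF8() }
                },
                object : PrimitiveConverter() {
                    override fun addDouble(value: Double) { cpuUsage = value }
                }
            )

            override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex]
            override fun start() { id = ""; cpuUsage = 0.0 } // reset per record
            override fun end() {}
        }

        override fun getCurrentRecord(): Sample = Sample(id, cpuUsage)
        override fun getRootConverter(): GroupConverter = root
    }

    companion object {
        val SCHEMA: MessageType = Types.buildMessage()
            .addFields(
                Types.required(PrimitiveType.PrimitiveTypeName.BINARY)
                    .`as`(LogicalTypeAnnotation.stringType()).named("id"),
                Types.required(PrimitiveType.PrimitiveTypeName.DOUBLE).named("cpu_usage")
            )
            .named("sample")
    }
}
```

Compared to `parquet-avro`, this avoids allocating a `GenericRecord` (and its schema-driven field lookups) for every row; values land directly in typed fields.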
Diffstat (limited to 'opendc-trace/opendc-trace-opendc')
14 files changed, 963 insertions, 257 deletions
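The write side is symmetric: a `WriteSupport` streams each record's fields directly to the `RecordConsumer`, again with no Avro intermediary. A companion sketch under the same assumptions, reusing the hypothetical `Sample` record from above (the real implementations are `ResourceWriteSupport` and `ResourceStateWriteSupport` in the diff below):

```kotlin
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.api.WriteSupport
import org.apache.parquet.io.api.Binary
import org.apache.parquet.io.api.RecordConsumer
import org.apache.parquet.schema.*

class SampleWriteSupport : WriteSupport<Sample>() {
    private lateinit var consumer: RecordConsumer

    override fun init(configuration: Configuration): WriteContext =
        WriteContext(WRITE_SCHEMA, emptyMap())

    override fun prepareForWrite(recordConsumer: RecordConsumer) {
        consumer = recordConsumer
    }

    override fun write(record: Sample) {
        // Each field is emitted in schema order, bracketed by start/end calls.
        consumer.startMessage()
        consumer.startField("id", 0)
        consumer.addBinary(Binary.fromCharSequence(record.id))
        consumer.endField("id", 0)
        consumer.startField("cpu_usage", 1)
        consumer.addDouble(record.cpuUsage)
        consumer.endField("cpu_usage", 1)
        consumer.endMessage()
    }

    companion object {
        val WRITE_SCHEMA: MessageType = Types.buildMessage()
            .addFields(
                Types.required(PrimitiveType.PrimitiveTypeName.BINARY)
                    .`as`(LogicalTypeAnnotation.stringType()).named("id"),
                Types.required(PrimitiveType.PrimitiveTypeName.DOUBLE).named("cpu_usage")
            )
            .named("sample")
    }
}
```

With these in place, the trace format wires the read/write supports into the `LocalParquetReader`/`LocalParquetWriter` builders, as the `OdcVmTraceFormat` hunk further down shows.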
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt index b82da888..7a01b881 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt @@ -22,38 +22,30 @@ package org.opendc.trace.opendc -import org.apache.avro.Schema -import org.apache.avro.generic.GenericRecord import org.opendc.trace.* import org.opendc.trace.conv.* +import org.opendc.trace.opendc.parquet.ResourceState import org.opendc.trace.util.parquet.LocalParquetReader -import java.time.Duration -import java.time.Instant /** * A [TableReader] implementation for the OpenDC virtual machine trace format. */ -internal class OdcVmResourceStateTableReader(private val reader: LocalParquetReader<GenericRecord>) : TableReader { +internal class OdcVmResourceStateTableReader(private val reader: LocalParquetReader<ResourceState>) : TableReader { /** * The current record. */ - private var record: GenericRecord? = null - - /** - * A flag to indicate that the columns have been initialized. - */ - private var hasInitializedColumns = false + private var record: ResourceState? = null override fun nextRow(): Boolean { - val record = reader.read() - this.record = record + try { + val record = reader.read() + this.record = record - if (!hasInitializedColumns && record != null) { - initColumns(record.schema) - hasInitializedColumns = true + return record != null + } catch (e: Throwable) { + this.record = null + throw e } - - return record != null } override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 @@ -67,36 +59,36 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea val record = checkNotNull(record) { "Reader in invalid state" } return when (index) { - COL_ID -> record[AVRO_COL_ID].toString() - COL_TIMESTAMP -> Instant.ofEpochMilli(record[AVRO_COL_TIMESTAMP] as Long) - COL_DURATION -> Duration.ofMillis(record[AVRO_COL_DURATION] as Long) - COL_CPU_COUNT -> getInt(index) - COL_CPU_USAGE -> getDouble(index) - else -> throw IllegalArgumentException("Invalid column") + COL_ID -> record.id + COL_TIMESTAMP -> record.timestamp + COL_DURATION -> record.duration + COL_CPU_COUNT -> record.cpuCount + COL_CPU_USAGE -> record.cpuUsage + else -> throw IllegalArgumentException("Invalid column index $index") } } override fun getBoolean(index: Int): Boolean { - throw IllegalArgumentException("Invalid column") + throw IllegalArgumentException("Invalid column or type [index $index]") } override fun getInt(index: Int): Int { val record = checkNotNull(record) { "Reader in invalid state" } return when (index) { - COL_CPU_COUNT -> record[AVRO_COL_CPU_COUNT] as Int - else -> throw IllegalArgumentException("Invalid column") + COL_CPU_COUNT -> record.cpuCount + else -> throw IllegalArgumentException("Invalid column or type [index $index]") } } override fun getLong(index: Int): Long { - throw IllegalArgumentException("Invalid column") + throw IllegalArgumentException("Invalid column or type [index $index]") } override fun getDouble(index: Int): Double { val record = checkNotNull(record) { "Reader in invalid state" } return when (index) { - COL_CPU_USAGE -> (record[AVRO_COL_CPU_USAGE] as Number).toDouble() - else -> throw IllegalArgumentException("Invalid column") + COL_CPU_USAGE -> 
record.cpuUsage + else -> throw IllegalArgumentException("Invalid column or type [index $index]") } } @@ -106,28 +98,6 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea override fun toString(): String = "OdcVmResourceStateTableReader" - /** - * Initialize the columns for the reader based on [schema]. - */ - private fun initColumns(schema: Schema) { - try { - AVRO_COL_ID = schema.getField("id").pos() - AVRO_COL_TIMESTAMP = (schema.getField("timestamp") ?: schema.getField("time")).pos() - AVRO_COL_DURATION = schema.getField("duration").pos() - AVRO_COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("cores")).pos() - AVRO_COL_CPU_USAGE = (schema.getField("cpu_usage") ?: schema.getField("cpuUsage")).pos() - } catch (e: NullPointerException) { - // This happens when the field we are trying to access does not exist - throw IllegalArgumentException("Invalid schema", e) - } - } - - private var AVRO_COL_ID = -1 - private var AVRO_COL_TIMESTAMP = -1 - private var AVRO_COL_DURATION = -1 - private var AVRO_COL_CPU_COUNT = -1 - private var AVRO_COL_CPU_USAGE = -1 - private val COL_ID = 0 private val COL_TIMESTAMP = 1 private val COL_DURATION = 2 diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt index 01b9750c..97af5b59 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt @@ -22,84 +22,85 @@ package org.opendc.trace.opendc -import org.apache.avro.Schema -import org.apache.avro.generic.GenericRecord -import org.apache.avro.generic.GenericRecordBuilder import org.apache.parquet.hadoop.ParquetWriter import org.opendc.trace.* import org.opendc.trace.conv.* +import org.opendc.trace.opendc.parquet.ResourceState import java.time.Duration import java.time.Instant /** * A [TableWriter] implementation for the OpenDC virtual machine trace format. */ -internal class OdcVmResourceStateTableWriter( - private val writer: ParquetWriter<GenericRecord>, - private val schema: Schema -) : TableWriter { +internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter<ResourceState>) : TableWriter { /** - * The current builder for the record that is being written. + * The current state for the record that is being written. */ - private var builder: GenericRecordBuilder? = null - - /** - * The fields belonging to the resource state schema. 
- */ - private val fields = schema.fields + private var _isActive = false + private var _id: String = "" + private var _timestamp: Instant = Instant.MIN + private var _duration: Duration = Duration.ZERO + private var _cpuCount: Int = 0 + private var _cpuUsage: Double = Double.NaN override fun startRow() { - builder = GenericRecordBuilder(schema) + _isActive = true + _id = "" + _timestamp = Instant.MIN + _duration = Duration.ZERO + _cpuCount = 0 + _cpuUsage = Double.NaN } override fun endRow() { - val builder = checkNotNull(builder) { "No active row" } - this.builder = null - - val record = builder.build() - val id = record[COL_ID] as String - val timestamp = record[COL_TIMESTAMP] as Long + check(_isActive) { "No active row" } + _isActive = false - check(lastId != id || timestamp >= lastTimestamp) { "Records need to be ordered by (id, timestamp)" } + check(lastId != _id || _timestamp >= lastTimestamp) { "Records need to be ordered by (id, timestamp)" } - writer.write(builder.build()) + writer.write(ResourceState(_id, _timestamp, _duration, _cpuCount, _cpuUsage)) - lastId = id - lastTimestamp = timestamp + lastId = _id + lastTimestamp = _timestamp } - override fun resolve(column: TableColumn<*>): Int { - val schema = schema - return when (column) { - RESOURCE_ID -> schema.getField("id").pos() - RESOURCE_STATE_TIMESTAMP -> (schema.getField("timestamp") ?: schema.getField("time")).pos() - RESOURCE_STATE_DURATION -> schema.getField("duration").pos() - RESOURCE_CPU_COUNT -> (schema.getField("cpu_count") ?: schema.getField("cores")).pos() - RESOURCE_STATE_CPU_USAGE -> (schema.getField("cpu_usage") ?: schema.getField("cpuUsage")).pos() - else -> -1 - } - } + override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 override fun set(index: Int, value: Any) { - val builder = checkNotNull(builder) { "No active row" } - - builder.set( - fields[index], - when (index) { - COL_TIMESTAMP -> (value as Instant).toEpochMilli() - COL_DURATION -> (value as Duration).toMillis() - else -> value - } - ) + check(_isActive) { "No active row" } + + when (index) { + COL_ID -> _id = value as String + COL_TIMESTAMP -> _timestamp = value as Instant + COL_DURATION -> _duration = value as Duration + COL_CPU_COUNT -> _cpuCount = value as Int + COL_CPU_USAGE -> _cpuUsage = value as Double + } } - override fun setBoolean(index: Int, value: Boolean) = set(index, value) + override fun setBoolean(index: Int, value: Boolean) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } - override fun setInt(index: Int, value: Int) = set(index, value) + override fun setInt(index: Int, value: Int) { + check(_isActive) { "No active row" } + when (index) { + COL_CPU_COUNT -> _cpuCount = value + else -> throw IllegalArgumentException("Invalid column or type [index $index]") + } + } - override fun setLong(index: Int, value: Long) = set(index, value) + override fun setLong(index: Int, value: Long) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } - override fun setDouble(index: Int, value: Double) = set(index, value) + override fun setDouble(index: Int, value: Double) { + check(_isActive) { "No active row" } + when (index) { + COL_CPU_USAGE -> _cpuUsage = value + else -> throw IllegalArgumentException("Invalid column or type [index $index]") + } + } override fun flush() { // Not available @@ -113,12 +114,19 @@ internal class OdcVmResourceStateTableWriter( * Last column values that are used to check for correct partitioning. */ private var lastId: String? 
= null - private var lastTimestamp: Long = Long.MIN_VALUE - - /** - * Columns with special behavior. - */ - private val COL_ID = resolve(RESOURCE_ID) - private val COL_TIMESTAMP = resolve(RESOURCE_STATE_TIMESTAMP) - private val COL_DURATION = resolve(RESOURCE_STATE_DURATION) + private var lastTimestamp: Instant = Instant.MAX + + private val COL_ID = 0 + private val COL_TIMESTAMP = 1 + private val COL_DURATION = 2 + private val COL_CPU_COUNT = 3 + private val COL_CPU_USAGE = 4 + + private val columns = mapOf( + RESOURCE_ID to COL_ID, + RESOURCE_STATE_TIMESTAMP to COL_TIMESTAMP, + RESOURCE_STATE_DURATION to COL_DURATION, + RESOURCE_CPU_COUNT to COL_CPU_COUNT, + RESOURCE_STATE_CPU_USAGE to COL_CPU_USAGE, + ) } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt index 4909e70e..6102332f 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt @@ -22,37 +22,30 @@ package org.opendc.trace.opendc -import org.apache.avro.Schema -import org.apache.avro.generic.GenericRecord import org.opendc.trace.* import org.opendc.trace.conv.* +import org.opendc.trace.opendc.parquet.Resource import org.opendc.trace.util.parquet.LocalParquetReader -import java.time.Instant /** - * A [TableReader] implementation for the resources table in the OpenDC virtual machine trace format. + * A [TableReader] implementation for the "resources table" in the OpenDC virtual machine trace format. */ -internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<GenericRecord>) : TableReader { +internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<Resource>) : TableReader { /** * The current record. */ - private var record: GenericRecord? = null - - /** - * A flag to indicate that the columns have been initialized. - */ - private var hasInitializedColumns = false + private var record: Resource? 
= null override fun nextRow(): Boolean { - val record = reader.read() - this.record = record + try { + val record = reader.read() + this.record = record - if (!hasInitializedColumns && record != null) { - initColumns(record.schema) - hasInitializedColumns = true + return record != null + } catch (e: Throwable) { + this.record = null + throw e } - - return record != null } override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 @@ -66,9 +59,9 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G val record = checkNotNull(record) { "Reader in invalid state" } return when (index) { - COL_ID -> record[AVRO_COL_ID].toString() - COL_START_TIME -> Instant.ofEpochMilli(record[AVRO_COL_START_TIME] as Long) - COL_STOP_TIME -> Instant.ofEpochMilli(record[AVRO_COL_STOP_TIME] as Long) + COL_ID -> record.id + COL_START_TIME -> record.startTime + COL_STOP_TIME -> record.stopTime COL_CPU_COUNT -> getInt(index) COL_CPU_CAPACITY -> getDouble(index) COL_MEM_CAPACITY -> getDouble(index) @@ -84,7 +77,7 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G val record = checkNotNull(record) { "Reader in invalid state" } return when (index) { - COL_CPU_COUNT -> record[AVRO_COL_CPU_COUNT] as Int + COL_CPU_COUNT -> record.cpuCount else -> throw IllegalArgumentException("Invalid column") } } @@ -97,8 +90,8 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G val record = checkNotNull(record) { "Reader in invalid state" } return when (index) { - COL_CPU_CAPACITY -> if (AVRO_COL_CPU_CAPACITY >= 0) (record[AVRO_COL_CPU_CAPACITY] as Number).toDouble() else 0.0 - COL_MEM_CAPACITY -> (record[AVRO_COL_MEM_CAPACITY] as Number).toDouble() + COL_CPU_CAPACITY -> record.cpuCapacity + COL_MEM_CAPACITY -> record.memCapacity else -> throw IllegalArgumentException("Invalid column") } } @@ -109,30 +102,6 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G override fun toString(): String = "OdcVmResourceTableReader" - /** - * Initialize the columns for the reader based on [schema]. 
- */ - private fun initColumns(schema: Schema) { - try { - AVRO_COL_ID = schema.getField("id").pos() - AVRO_COL_START_TIME = (schema.getField("start_time") ?: schema.getField("submissionTime")).pos() - AVRO_COL_STOP_TIME = (schema.getField("stop_time") ?: schema.getField("endTime")).pos() - AVRO_COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("maxCores")).pos() - AVRO_COL_CPU_CAPACITY = schema.getField("cpu_capacity")?.pos() ?: -1 - AVRO_COL_MEM_CAPACITY = (schema.getField("mem_capacity") ?: schema.getField("requiredMemory")).pos() - } catch (e: NullPointerException) { - // This happens when the field we are trying to access does not exist - throw IllegalArgumentException("Invalid schema") - } - } - - private var AVRO_COL_ID = -1 - private var AVRO_COL_START_TIME = -1 - private var AVRO_COL_STOP_TIME = -1 - private var AVRO_COL_CPU_COUNT = -1 - private var AVRO_COL_CPU_CAPACITY = -1 - private var AVRO_COL_MEM_CAPACITY = -1 - private val COL_ID = 0 private val COL_START_TIME = 1 private val COL_STOP_TIME = 2 diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt index edc89ee6..cae65faa 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt @@ -22,74 +22,82 @@ package org.opendc.trace.opendc -import org.apache.avro.Schema -import org.apache.avro.generic.GenericRecord -import org.apache.avro.generic.GenericRecordBuilder import org.apache.parquet.hadoop.ParquetWriter import org.opendc.trace.* import org.opendc.trace.conv.* +import org.opendc.trace.opendc.parquet.Resource import java.time.Instant -import kotlin.math.roundToLong /** * A [TableWriter] implementation for the OpenDC virtual machine trace format. */ -internal class OdcVmResourceTableWriter( - private val writer: ParquetWriter<GenericRecord>, - private val schema: Schema -) : TableWriter { +internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resource>) : TableWriter { /** - * The current builder for the record that is being written. + * The current state for the record that is being written. */ - private var builder: GenericRecordBuilder? = null - - /** - * The fields belonging to the resource schema. 
- */ - private val fields = schema.fields + private var _isActive = false + private var _id: String = "" + private var _startTime: Instant = Instant.MIN + private var _stopTime: Instant = Instant.MIN + private var _cpuCount: Int = 0 + private var _cpuCapacity: Double = Double.NaN + private var _memCapacity: Double = Double.NaN override fun startRow() { - builder = GenericRecordBuilder(schema) + _isActive = true + _id = "" + _startTime = Instant.MIN + _stopTime = Instant.MIN + _cpuCount = 0 + _cpuCapacity = Double.NaN + _memCapacity = Double.NaN } override fun endRow() { - val builder = checkNotNull(builder) { "No active row" } - this.builder = null - writer.write(builder.build()) + check(_isActive) { "No active row" } + _isActive = false + writer.write(Resource(_id, _startTime, _stopTime, _cpuCount, _cpuCapacity, _memCapacity)) } - override fun resolve(column: TableColumn<*>): Int { - val schema = schema - return when (column) { - RESOURCE_ID -> schema.getField("id").pos() - RESOURCE_START_TIME -> (schema.getField("start_time") ?: schema.getField("submissionTime")).pos() - RESOURCE_STOP_TIME -> (schema.getField("stop_time") ?: schema.getField("endTime")).pos() - RESOURCE_CPU_COUNT -> (schema.getField("cpu_count") ?: schema.getField("maxCores")).pos() - RESOURCE_CPU_CAPACITY -> schema.getField("cpu_capacity").pos() - RESOURCE_MEM_CAPACITY -> (schema.getField("mem_capacity") ?: schema.getField("requiredMemory")).pos() - else -> -1 - } - } + override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 override fun set(index: Int, value: Any) { - val builder = checkNotNull(builder) { "No active row" } - builder.set( - fields[index], - when (index) { - COL_START_TIME, COL_STOP_TIME -> (value as Instant).toEpochMilli() - COL_MEM_CAPACITY -> (value as Double).roundToLong() - else -> value - } - ) + check(_isActive) { "No active row" } + when (index) { + COL_ID -> _id = value as String + COL_START_TIME -> _startTime = value as Instant + COL_STOP_TIME -> _stopTime = value as Instant + COL_CPU_COUNT -> _cpuCount = value as Int + COL_CPU_CAPACITY -> _cpuCapacity = value as Double + COL_MEM_CAPACITY -> _memCapacity = value as Double + else -> throw IllegalArgumentException("Invalid column index $index") + } } - override fun setBoolean(index: Int, value: Boolean) = set(index, value) + override fun setBoolean(index: Int, value: Boolean) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } - override fun setInt(index: Int, value: Int) = set(index, value) + override fun setInt(index: Int, value: Int) { + check(_isActive) { "No active row" } + when (index) { + COL_CPU_COUNT -> _cpuCount = value + else -> throw IllegalArgumentException("Invalid column or type [index $index]") + } + } - override fun setLong(index: Int, value: Long) = set(index, value) + override fun setLong(index: Int, value: Long) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } - override fun setDouble(index: Int, value: Double) = set(index, value) + override fun setDouble(index: Int, value: Double) { + check(_isActive) { "No active row" } + when (index) { + COL_CPU_CAPACITY -> _cpuCapacity = value + COL_MEM_CAPACITY -> _memCapacity = value + else -> throw IllegalArgumentException("Invalid column or type [index $index]") + } + } override fun flush() { // Not available @@ -99,10 +107,19 @@ internal class OdcVmResourceTableWriter( writer.close() } - /** - * Columns with special behavior. 
- */ - private val COL_START_TIME = resolve(RESOURCE_START_TIME) - private val COL_STOP_TIME = resolve(RESOURCE_STOP_TIME) - private val COL_MEM_CAPACITY = resolve(RESOURCE_MEM_CAPACITY) + private val COL_ID = 0 + private val COL_START_TIME = 1 + private val COL_STOP_TIME = 2 + private val COL_CPU_COUNT = 3 + private val COL_CPU_CAPACITY = 4 + private val COL_MEM_CAPACITY = 5 + + private val columns = mapOf( + RESOURCE_ID to COL_ID, + RESOURCE_START_TIME to COL_START_TIME, + RESOURCE_STOP_TIME to COL_STOP_TIME, + RESOURCE_CPU_COUNT to COL_CPU_COUNT, + RESOURCE_CPU_CAPACITY to COL_CPU_CAPACITY, + RESOURCE_MEM_CAPACITY to COL_MEM_CAPACITY, + ) } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt index 1a15c7b3..155f8cf3 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt @@ -22,19 +22,19 @@ package org.opendc.trace.opendc -import org.apache.avro.Schema -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericRecord -import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.column.ParquetProperties import org.apache.parquet.hadoop.ParquetFileWriter import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.opendc.trace.* import org.opendc.trace.conv.* +import org.opendc.trace.opendc.parquet.ResourceReadSupport +import org.opendc.trace.opendc.parquet.ResourceStateReadSupport +import org.opendc.trace.opendc.parquet.ResourceStateWriteSupport +import org.opendc.trace.opendc.parquet.ResourceWriteSupport import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat -import org.opendc.trace.util.parquet.LocalOutputFile import org.opendc.trace.util.parquet.LocalParquetReader -import org.opendc.trace.util.avro.TIMESTAMP_SCHEMA +import org.opendc.trace.util.parquet.LocalParquetWriter import shaded.parquet.com.fasterxml.jackson.core.JsonEncoding import shaded.parquet.com.fasterxml.jackson.core.JsonFactory import java.nio.file.Files @@ -105,11 +105,11 @@ public class OdcVmTraceFormat : TraceFormat { override fun newReader(path: Path, table: String): TableReader { return when (table) { TABLE_RESOURCES -> { - val reader = LocalParquetReader<GenericRecord>(path.resolve("meta.parquet")) + val reader = LocalParquetReader(path.resolve("meta.parquet"), LocalParquetReader.custom(ResourceReadSupport())) OdcVmResourceTableReader(reader) } TABLE_RESOURCE_STATES -> { - val reader = LocalParquetReader<GenericRecord>(path.resolve("trace.parquet")) + val reader = LocalParquetReader(path.resolve("trace.parquet"), LocalParquetReader.custom(ResourceStateReadSupport())) OdcVmResourceStateTableReader(reader) } TABLE_INTERFERENCE_GROUPS -> { @@ -128,24 +128,24 @@ public class OdcVmTraceFormat : TraceFormat { override fun newWriter(path: Path, table: String): TableWriter { return when (table) { TABLE_RESOURCES -> { - val schema = RESOURCES_SCHEMA - val writer = AvroParquetWriter.builder<GenericRecord>(LocalOutputFile(path.resolve("meta.parquet"))) - .withSchema(schema) + val writer = LocalParquetWriter.builder(path.resolve("meta.parquet"), ResourceWriteSupport()) .withCompressionCodec(CompressionCodecName.ZSTD) + .withPageWriteChecksumEnabled(true) + .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0) 
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE) .build() - OdcVmResourceTableWriter(writer, schema) + OdcVmResourceTableWriter(writer) } TABLE_RESOURCE_STATES -> { - val schema = RESOURCE_STATES_SCHEMA - val writer = AvroParquetWriter.builder<GenericRecord>(LocalOutputFile(path.resolve("trace.parquet"))) - .withSchema(schema) + val writer = LocalParquetWriter.builder(path.resolve("trace.parquet"), ResourceStateWriteSupport()) .withCompressionCodec(CompressionCodecName.ZSTD) .withDictionaryEncoding("id", true) .withBloomFilterEnabled("id", true) + .withPageWriteChecksumEnabled(true) + .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0) .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) .build() - OdcVmResourceStateTableWriter(writer, schema) + OdcVmResourceStateTableWriter(writer) } TABLE_INTERFERENCE_GROUPS -> { val generator = jsonFactory.createGenerator(path.resolve("interference-model.json").toFile(), JsonEncoding.UTF8) @@ -154,37 +154,4 @@ public class OdcVmTraceFormat : TraceFormat { else -> throw IllegalArgumentException("Table $table not supported") } } - - public companion object { - /** - * Schema for the resources table in the trace. - */ - @JvmStatic - public val RESOURCES_SCHEMA: Schema = SchemaBuilder - .record("resource") - .namespace("org.opendc.trace.opendc") - .fields() - .requiredString("id") - .name("start_time").type(TIMESTAMP_SCHEMA).noDefault() - .name("stop_time").type(TIMESTAMP_SCHEMA).noDefault() - .requiredInt("cpu_count") - .requiredDouble("cpu_capacity") - .requiredLong("mem_capacity") - .endRecord() - - /** - * Schema for the resource states table in the trace. - */ - @JvmStatic - public val RESOURCE_STATES_SCHEMA: Schema = SchemaBuilder - .record("resource_state") - .namespace("org.opendc.trace.opendc") - .fields() - .requiredString("id") - .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault() - .requiredLong("duration") - .requiredInt("cpu_count") - .requiredDouble("cpu_usage") - .endRecord() - } } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/Resource.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/Resource.kt new file mode 100644 index 00000000..c6db45b5 --- /dev/null +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/Resource.kt @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package org.opendc.trace.opendc.parquet + +import java.time.Instant + +/** + * A description of a resource in a trace. + */ +internal data class Resource( + val id: String, + val startTime: Instant, + val stopTime: Instant, + val cpuCount: Int, + val cpuCapacity: Double, + val memCapacity: Double +) diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt new file mode 100644 index 00000000..47cce914 --- /dev/null +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt @@ -0,0 +1,111 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.opendc.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.hadoop.api.InitContext +import org.apache.parquet.hadoop.api.ReadSupport +import org.apache.parquet.io.api.RecordMaterializer +import org.apache.parquet.schema.* + +/** + * A [ReadSupport] instance for [Resource] objects. + */ +internal class ResourceReadSupport : ReadSupport<Resource>() { + override fun init(context: InitContext): ReadContext { + return ReadContext(READ_SCHEMA) + } + + override fun prepareForRead( + configuration: Configuration, + keyValueMetaData: Map<String, String>, + fileSchema: MessageType, + readContext: ReadContext + ): RecordMaterializer<Resource> = ResourceRecordMaterializer(readContext.requestedSchema) + + companion object { + /** + * Parquet read schema (version 2.0) for the "resources" table in the trace. 
+ */ + @JvmStatic + val READ_SCHEMA_V2_0: MessageType = Types.buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("id"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("submissionTime"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("endTime"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("maxCores"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("requiredMemory"), + ) + .named("resource") + + /** + * Parquet read schema (version 2.1) for the "resources" table in the trace. + */ + @JvmStatic + val READ_SCHEMA_V2_1: MessageType = Types.buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("id"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("start_time"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("stop_time"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("cpu_count"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpu_capacity"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("mem_capacity"), + ) + .named("resource") + + /** + * Parquet read schema for the "resources" table in the trace. + */ + @JvmStatic + val READ_SCHEMA: MessageType = READ_SCHEMA_V2_0 + .union(READ_SCHEMA_V2_1) + } +} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceRecordMaterializer.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceRecordMaterializer.kt new file mode 100644 index 00000000..3adb0709 --- /dev/null +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceRecordMaterializer.kt @@ -0,0 +1,107 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package org.opendc.trace.opendc.parquet + +import org.apache.parquet.io.api.* +import org.apache.parquet.schema.MessageType +import java.time.Instant + +/** + * A [RecordMaterializer] for [Resource] records. + */ +internal class ResourceRecordMaterializer(schema: MessageType) : RecordMaterializer<Resource>() { + /** + * State of current record being read. + */ + private var _id = "" + private var _startTime = Instant.MIN + private var _stopTime = Instant.MIN + private var _cpuCount = 0 + private var _cpuCapacity = 0.0 + private var _memCapacity = 0.0 + + /** + * Root converter for the record. + */ + private val root = object : GroupConverter() { + /** + * The converters for the columns of the schema. + */ + private val converters = schema.fields.map { type -> + when (type.name) { + "id" -> object : PrimitiveConverter() { + override fun addBinary(value: Binary) { + _id = value.toStringUsingUTF8() + } + } + "start_time", "submissionTime" -> object : PrimitiveConverter() { + override fun addLong(value: Long) { + _startTime = Instant.ofEpochMilli(value) + } + } + "stop_time", "endTime" -> object : PrimitiveConverter() { + override fun addLong(value: Long) { + _stopTime = Instant.ofEpochMilli(value) + } + } + "cpu_count", "maxCores" -> object : PrimitiveConverter() { + override fun addInt(value: Int) { + _cpuCount = value + } + } + "cpu_capacity" -> object : PrimitiveConverter() { + override fun addDouble(value: Double) { + _cpuCapacity = value + } + } + "mem_capacity", "requiredMemory" -> object : PrimitiveConverter() { + override fun addDouble(value: Double) { + _memCapacity = value + } + + override fun addLong(value: Long) { + _memCapacity = value.toDouble() + } + } + else -> error("Unknown column $type") + } + } + + override fun start() { + _id = "" + _startTime = Instant.MIN + _stopTime = Instant.MIN + _cpuCount = 0 + _cpuCapacity = 0.0 + _memCapacity = 0.0 + } + + override fun end() {} + + override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex] + } + + override fun getCurrentRecord(): Resource = Resource(_id, _startTime, _stopTime, _cpuCount, _cpuCapacity, _memCapacity) + + override fun getRootConverter(): GroupConverter = root +} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceState.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceState.kt new file mode 100644 index 00000000..9ad58764 --- /dev/null +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceState.kt @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.opendc.parquet + +import java.time.Duration +import java.time.Instant + +internal class ResourceState( + val id: String, + val timestamp: Instant, + val duration: Duration, + val cpuCount: Int, + val cpuUsage: Double +) diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt new file mode 100644 index 00000000..17840ceb --- /dev/null +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt @@ -0,0 +1,105 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.opendc.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.hadoop.api.InitContext +import org.apache.parquet.hadoop.api.ReadSupport +import org.apache.parquet.io.api.RecordMaterializer +import org.apache.parquet.schema.* + +/** + * A [ReadSupport] instance for [ResourceState] objects. + */ +internal class ResourceStateReadSupport : ReadSupport<ResourceState>() { + override fun init(context: InitContext): ReadContext { + return ReadContext(READ_SCHEMA) + } + + override fun prepareForRead( + configuration: Configuration, + keyValueMetaData: Map<String, String>, + fileSchema: MessageType, + readContext: ReadContext + ): RecordMaterializer<ResourceState> = ResourceStateRecordMaterializer(readContext.requestedSchema) + + companion object { + /** + * Parquet read schema (version 2.0) for the "resource states" table in the trace. 
+ */ + @JvmStatic + val READ_SCHEMA_V2_0: MessageType = Types.buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("id"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("time"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("duration"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("cores"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpuUsage") + ) + .named("resource_state") + + /** + * Parquet read schema (version 2.1) for the "resource states" table in the trace. + */ + @JvmStatic + val READ_SCHEMA_V2_1: MessageType = Types.buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("id"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("timestamp"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("duration"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("cpu_count"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpu_usage") + ) + .named("resource_state") + + /** + * Parquet read schema for the "resource states" table in the trace. + */ + @JvmStatic + val READ_SCHEMA: MessageType = READ_SCHEMA_V2_0.union(READ_SCHEMA_V2_1) + } +} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateRecordMaterializer.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateRecordMaterializer.kt new file mode 100644 index 00000000..f8b0c3c2 --- /dev/null +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateRecordMaterializer.kt @@ -0,0 +1,102 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.opendc.parquet + +import org.apache.parquet.io.api.* +import org.apache.parquet.schema.MessageType +import java.time.Duration +import java.time.Instant + +/** + * A [RecordMaterializer] for [ResourceState] records. 
+ */ +internal class ResourceStateRecordMaterializer(schema: MessageType) : RecordMaterializer<ResourceState>() { + /** + * State of current record being read. + */ + private var _id = "" + private var _timestamp = Instant.MIN + private var _duration = Duration.ZERO + private var _cpuCount = 0 + private var _cpuUsage = 0.0 + + /** + * Root converter for the record. + */ + private val root = object : GroupConverter() { + /** + * The converters for the columns of the schema. + */ + private val converters = schema.fields.map { type -> + when (type.name) { + "id" -> object : PrimitiveConverter() { + override fun addBinary(value: Binary) { + _id = value.toStringUsingUTF8() + } + } + "timestamp", "time" -> object : PrimitiveConverter() { + override fun addLong(value: Long) { + _timestamp = Instant.ofEpochMilli(value) + } + } + "duration" -> object : PrimitiveConverter() { + override fun addLong(value: Long) { + _duration = Duration.ofMillis(value) + } + } + "cpu_count", "cores" -> object : PrimitiveConverter() { + override fun addInt(value: Int) { + _cpuCount = value + } + } + "cpu_usage", "cpuUsage" -> object : PrimitiveConverter() { + override fun addDouble(value: Double) { + _cpuUsage = value + } + } + "flops" -> object : PrimitiveConverter() { + override fun addLong(value: Long) { + // Ignore to support v1 format + } + } + else -> error("Unknown column $type") + } + } + + override fun start() { + _id = "" + _timestamp = Instant.MIN + _duration = Duration.ZERO + _cpuCount = 0 + _cpuUsage = 0.0 + } + + override fun end() {} + + override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex] + } + + override fun getCurrentRecord(): ResourceState = ResourceState(_id, _timestamp, _duration, _cpuCount, _cpuUsage) + + override fun getRootConverter(): GroupConverter = root +} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateWriteSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateWriteSupport.kt new file mode 100644 index 00000000..e2f3df31 --- /dev/null +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateWriteSupport.kt @@ -0,0 +1,105 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package org.opendc.trace.opendc.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.hadoop.api.WriteSupport +import org.apache.parquet.io.api.Binary +import org.apache.parquet.io.api.RecordConsumer +import org.apache.parquet.schema.* + +/** + * Support for writing [Resource] instances to Parquet format. + */ +internal class ResourceStateWriteSupport : WriteSupport<ResourceState>() { + /** + * The current active record consumer. + */ + private lateinit var recordConsumer: RecordConsumer + + override fun init(configuration: Configuration): WriteContext { + return WriteContext(WRITE_SCHEMA, emptyMap()) + } + + override fun prepareForWrite(recordConsumer: RecordConsumer) { + this.recordConsumer = recordConsumer + } + + override fun write(record: ResourceState) { + write(recordConsumer, record) + } + + private fun write(consumer: RecordConsumer, record: ResourceState) { + consumer.startMessage() + + consumer.startField("id", 0) + consumer.addBinary(Binary.fromCharSequence(record.id)) + consumer.endField("id", 0) + + consumer.startField("timestamp", 1) + consumer.addLong(record.timestamp.toEpochMilli()) + consumer.endField("timestamp", 1) + + consumer.startField("duration", 2) + consumer.addLong(record.duration.toMillis()) + consumer.endField("duration", 2) + + consumer.startField("cpu_count", 3) + consumer.addInteger(record.cpuCount) + consumer.endField("cpu_count", 3) + + consumer.startField("cpu_usage", 4) + consumer.addDouble(record.cpuUsage) + consumer.endField("cpu_usage", 4) + + consumer.endMessage() + } + + companion object { + /** + * Parquet schema for the "resource states" table in the trace. + */ + @JvmStatic + val WRITE_SCHEMA: MessageType = Types.buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("id"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("timestamp"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("duration"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("cpu_count"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpu_usage") + ) + .named("resource_state") + } +} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceWriteSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceWriteSupport.kt new file mode 100644 index 00000000..14cadabb --- /dev/null +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceWriteSupport.kt @@ -0,0 +1,114 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.opendc.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.hadoop.api.WriteSupport +import org.apache.parquet.io.api.Binary +import org.apache.parquet.io.api.RecordConsumer +import org.apache.parquet.schema.* +import kotlin.math.roundToLong + +/** + * Support for writing [Resource] instances to Parquet format. + */ +internal class ResourceWriteSupport : WriteSupport<Resource>() { + /** + * The current active record consumer. + */ + private lateinit var recordConsumer: RecordConsumer + + override fun init(configuration: Configuration): WriteContext { + return WriteContext(WRITE_SCHEMA, emptyMap()) + } + + override fun prepareForWrite(recordConsumer: RecordConsumer) { + this.recordConsumer = recordConsumer + } + + override fun write(record: Resource) { + write(recordConsumer, record) + } + + private fun write(consumer: RecordConsumer, record: Resource) { + consumer.startMessage() + + consumer.startField("id", 0) + consumer.addBinary(Binary.fromCharSequence(record.id)) + consumer.endField("id", 0) + + consumer.startField("start_time", 1) + consumer.addLong(record.startTime.toEpochMilli()) + consumer.endField("start_time", 1) + + consumer.startField("stop_time", 2) + consumer.addLong(record.stopTime.toEpochMilli()) + consumer.endField("stop_time", 2) + + consumer.startField("cpu_count", 3) + consumer.addInteger(record.cpuCount) + consumer.endField("cpu_count", 3) + + consumer.startField("cpu_capacity", 4) + consumer.addDouble(record.cpuCapacity) + consumer.endField("cpu_capacity", 4) + + consumer.startField("mem_capacity", 5) + consumer.addLong(record.memCapacity.roundToLong()) + consumer.endField("mem_capacity", 5) + + consumer.endMessage() + } + + companion object { + /** + * Parquet schema for the "resources" table in the trace. 
+ */ + @JvmStatic + val WRITE_SCHEMA: MessageType = Types.buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("id"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("start_time"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("stop_time"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("cpu_count"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpu_capacity"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("mem_capacity"), + ) + .named("resource") + } +} diff --git a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt index c8742624..dec0fef9 100644 --- a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt @@ -29,7 +29,9 @@ import org.junit.jupiter.api.assertThrows import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource import org.opendc.trace.conv.* +import java.nio.file.Files import java.nio.file.Paths +import java.time.Instant /** * Test suite for the [OdcVmTraceFormat] implementation. @@ -78,6 +80,37 @@ internal class OdcVmTraceFormatTest { reader.close() } + @Test + fun testResourcesWrite() { + val path = Files.createTempDirectory("opendc") + val writer = format.newWriter(path, TABLE_RESOURCES) + + writer.startRow() + writer.set(RESOURCE_ID, "1019") + writer.set(RESOURCE_START_TIME, Instant.EPOCH) + writer.set(RESOURCE_STOP_TIME, Instant.EPOCH) + writer.setInt(RESOURCE_CPU_COUNT, 1) + writer.setDouble(RESOURCE_CPU_CAPACITY, 1024.0) + writer.setDouble(RESOURCE_MEM_CAPACITY, 1024.0) + writer.endRow() + writer.close() + + val reader = format.newReader(path, TABLE_RESOURCES) + + assertAll( + { assertTrue(reader.nextRow()) }, + { assertEquals("1019", reader.get(RESOURCE_ID)) }, + { assertEquals(Instant.EPOCH, reader.get(RESOURCE_START_TIME)) }, + { assertEquals(Instant.EPOCH, reader.get(RESOURCE_STOP_TIME)) }, + { assertEquals(1, reader.getInt(RESOURCE_CPU_COUNT)) }, + { assertEquals(1024.0, reader.getDouble(RESOURCE_CPU_CAPACITY)) }, + { assertEquals(1024.0, reader.getDouble(RESOURCE_MEM_CAPACITY)) }, + { assertFalse(reader.nextRow()) }, + ) + + reader.close() + } + @ParameterizedTest @ValueSource(strings = ["trace-v2.0", "trace-v2.1"]) fun testSmoke(name: String) { @@ -95,6 +128,33 @@ internal class OdcVmTraceFormatTest { } @Test + fun testResourceStatesWrite() { + val path = Files.createTempDirectory("opendc") + val writer = format.newWriter(path, TABLE_RESOURCE_STATES) + + writer.startRow() + writer.set(RESOURCE_ID, "1019") + writer.set(RESOURCE_STATE_TIMESTAMP, Instant.EPOCH) + writer.setDouble(RESOURCE_STATE_CPU_USAGE, 23.0) + writer.setInt(RESOURCE_CPU_COUNT, 1) + writer.endRow() + writer.close() + + val reader = format.newReader(path, TABLE_RESOURCE_STATES) + + assertAll( + { assertTrue(reader.nextRow()) }, + { assertEquals("1019", reader.get(RESOURCE_ID)) }, + { assertEquals(Instant.EPOCH, reader.get(RESOURCE_STATE_TIMESTAMP)) }, + { assertEquals(1, 
reader.getInt(RESOURCE_CPU_COUNT)) }, + { assertEquals(23.0, reader.getDouble(RESOURCE_STATE_CPU_USAGE)) }, + { assertFalse(reader.nextRow()) }, + ) + + reader.close() + } + + @Test fun testInterferenceGroups() { val path = Paths.get("src/test/resources/trace-v2.1") val reader = format.newReader(path, TABLE_INTERFERENCE_GROUPS) |
