9 files changed, 675 insertions, 147 deletions
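The diff below replaces the Avro GenericRecord path in the compute exporters with plain Parquet WriteSupport implementations: each writer now declares its schema directly as a Parquet MessageType and streams every table reader straight into a RecordConsumer, instead of building an intermediate Avro record per sample. A minimal sketch of that pattern, using only the Parquet APIs that appear in the diff (the SampleRecord type and its two fields are invented for illustration and are not part of the change):

import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.api.WriteSupport
import org.apache.parquet.io.api.RecordConsumer
import org.apache.parquet.schema.LogicalTypeAnnotation
import org.apache.parquet.schema.MessageType
import org.apache.parquet.schema.PrimitiveType
import org.apache.parquet.schema.Types

/* Hypothetical record type, not part of the change. */
data class SampleRecord(val timestamp: Long, val value: Double)

class SampleWriteSupport : WriteSupport<SampleRecord>() {
    private lateinit var recordConsumer: RecordConsumer

    /* The schema is declared once as a Parquet MessageType instead of an Avro Schema. */
    private val schema: MessageType = Types.buildMessage()
        .addFields(
            Types.required(PrimitiveType.PrimitiveTypeName.INT64)
                .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
                .named("timestamp"),
            Types.required(PrimitiveType.PrimitiveTypeName.DOUBLE)
                .named("value")
        )
        .named("sample")

    override fun init(configuration: Configuration): WriteContext = WriteContext(schema, emptyMap())

    override fun prepareForWrite(recordConsumer: RecordConsumer) {
        this.recordConsumer = recordConsumer
    }

    /* Each record is written field by field; an optional field is simply skipped when its value is null. */
    override fun write(record: SampleRecord) {
        val consumer = recordConsumer
        consumer.startMessage()
        consumer.startField("timestamp", 0)
        consumer.addLong(record.timestamp)
        consumer.endField("timestamp", 0)
        consumer.startField("value", 1)
        consumer.addDouble(record.value)
        consumer.endField("value", 1)
        consumer.endMessage()
    }
}

Dropping the GenericRecordBuilder step means the values go from the table readers to the Parquet column writers without an intermediate Avro object per sample.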
diff --git a/opendc-compute/opendc-compute-workload/build.gradle.kts b/opendc-compute/opendc-compute-workload/build.gradle.kts index 9ced95a7..319b2ae3 100644 --- a/opendc-compute/opendc-compute-workload/build.gradle.kts +++ b/opendc-compute/opendc-compute-workload/build.gradle.kts @@ -39,4 +39,6 @@ dependencies { implementation(libs.opentelemetry.semconv) implementation(libs.kotlin.logging) + + testImplementation(libs.slf4j.simple) } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt index 84387bbc..c854d874 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt @@ -23,14 +23,12 @@ package org.opendc.compute.workload.export.parquet import mu.KotlinLogging -import org.apache.avro.Schema -import org.apache.avro.generic.GenericData -import org.apache.avro.generic.GenericRecordBuilder -import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.column.ParquetProperties import org.apache.parquet.hadoop.ParquetFileWriter import org.apache.parquet.hadoop.ParquetWriter +import org.apache.parquet.hadoop.api.WriteSupport import org.apache.parquet.hadoop.metadata.CompressionCodecName -import org.opendc.trace.util.parquet.LocalOutputFile +import org.opendc.trace.util.parquet.LocalParquetWriter import java.io.File import java.util.concurrent.ArrayBlockingQueue import java.util.concurrent.BlockingQueue @@ -38,10 +36,13 @@ import kotlin.concurrent.thread /** * A writer that writes data in Parquet format. + * + * @param path The path to the file to write the data to. + * @param writeSupport The [WriteSupport] implementation for converting the records to Parquet format. */ public abstract class ParquetDataWriter<in T>( path: File, - private val schema: Schema, + private val writeSupport: WriteSupport<T>, bufferSize: Int = 4096 ) : AutoCloseable { /** @@ -52,7 +53,7 @@ public abstract class ParquetDataWriter<in T>( /** * The queue of records to process. */ - private val queue: BlockingQueue<GenericData.Record> = ArrayBlockingQueue(bufferSize) + private val queue: BlockingQueue<T> = ArrayBlockingQueue(bufferSize) /** * An exception to be propagated to the actual writer. @@ -64,15 +65,15 @@ public abstract class ParquetDataWriter<in T>( */ private val writerThread = thread(start = false, name = this.toString()) { val writer = let { - val builder = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(path)) - .withSchema(schema) + val builder = LocalParquetWriter.builder(path.toPath(), writeSupport) + .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0) .withCompressionCodec(CompressionCodecName.ZSTD) .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) buildWriter(builder) } val queue = queue - val buf = mutableListOf<GenericData.Record>() + val buf = mutableListOf<T>() var shouldStop = false try { @@ -101,16 +102,11 @@ public abstract class ParquetDataWriter<in T>( /** * Build the [ParquetWriter] used to write the Parquet files. 
*/ - protected open fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> { + protected open fun buildWriter(builder: LocalParquetWriter.Builder<@UnsafeVariance T>): ParquetWriter<@UnsafeVariance T> { return builder.build() } /** - * Convert the specified [data] into a Parquet record. - */ - protected abstract fun convert(builder: GenericRecordBuilder, data: T) - - /** * Write the specified metrics to the database. */ public fun write(data: T) { @@ -119,9 +115,7 @@ public abstract class ParquetDataWriter<in T>( throw IllegalStateException("Writer thread failed", exception) } - val builder = GenericRecordBuilder(schema) - convert(builder, data) - queue.put(builder.build()) + queue.put(data) } /** diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt index 72dbba90..0d5b6b34 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt @@ -22,81 +22,189 @@ package org.opendc.compute.workload.export.parquet -import org.apache.avro.Schema -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericData -import org.apache.avro.generic.GenericRecordBuilder -import org.apache.parquet.avro.AvroParquetWriter +import org.apache.hadoop.conf.Configuration import org.apache.parquet.hadoop.ParquetWriter +import org.apache.parquet.hadoop.api.WriteSupport +import org.apache.parquet.io.api.RecordConsumer +import org.apache.parquet.schema.* import org.opendc.telemetry.compute.table.HostTableReader -import org.opendc.trace.util.avro.TIMESTAMP_SCHEMA -import org.opendc.trace.util.avro.UUID_SCHEMA -import org.opendc.trace.util.avro.optional +import org.opendc.trace.util.parquet.LocalParquetWriter import java.io.File +import java.util.* /** * A Parquet event writer for [HostTableReader]s. */ public class ParquetHostDataWriter(path: File, bufferSize: Int) : - ParquetDataWriter<HostTableReader>(path, SCHEMA, bufferSize) { + ParquetDataWriter<HostTableReader>(path, HostDataWriteSupport(), bufferSize) { - override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> { + override fun buildWriter(builder: LocalParquetWriter.Builder<HostTableReader>): ParquetWriter<HostTableReader> { return builder .withDictionaryEncoding("host_id", true) .build() } - override fun convert(builder: GenericRecordBuilder, data: HostTableReader) { - builder["timestamp"] = data.timestamp.toEpochMilli() + override fun toString(): String = "host-writer" - builder["host_id"] = data.host.id + /** + * A [WriteSupport] implementation for a [HostTableReader]. 
+ */ + private class HostDataWriteSupport : WriteSupport<HostTableReader>() { + lateinit var recordConsumer: RecordConsumer - builder["uptime"] = data.uptime - builder["downtime"] = data.downtime - val bootTime = data.bootTime - builder["boot_time"] = bootTime?.toEpochMilli() + override fun init(configuration: Configuration): WriteContext { + return WriteContext(SCHEMA, emptyMap()) + } - builder["cpu_count"] = data.host.cpuCount - builder["cpu_limit"] = data.cpuLimit - builder["cpu_time_active"] = data.cpuActiveTime - builder["cpu_time_idle"] = data.cpuIdleTime - builder["cpu_time_steal"] = data.cpuStealTime - builder["cpu_time_lost"] = data.cpuLostTime + override fun prepareForWrite(recordConsumer: RecordConsumer) { + this.recordConsumer = recordConsumer + } - builder["mem_limit"] = data.host.memCapacity + override fun write(record: HostTableReader) { + write(recordConsumer, record) + } - builder["power_total"] = data.powerTotal + private fun write(consumer: RecordConsumer, data: HostTableReader) { + consumer.startMessage() - builder["guests_terminated"] = data.guestsTerminated - builder["guests_running"] = data.guestsRunning - builder["guests_error"] = data.guestsError - builder["guests_invalid"] = data.guestsInvalid - } + consumer.startField("timestamp", 0) + consumer.addLong(data.timestamp.toEpochMilli()) + consumer.endField("timestamp", 0) - override fun toString(): String = "host-writer" + consumer.startField("host_id", 1) + consumer.addBinary(UUID.fromString(data.host.id).toBinary()) + consumer.endField("host_id", 1) + + consumer.startField("uptime", 2) + consumer.addLong(data.uptime) + consumer.endField("uptime", 2) + + consumer.startField("downtime", 3) + consumer.addLong(data.downtime) + consumer.endField("downtime", 3) + + val bootTime = data.bootTime + if (bootTime != null) { + consumer.startField("boot_time", 4) + consumer.addLong(bootTime.toEpochMilli()) + consumer.endField("boot_time", 4) + } + + consumer.startField("cpu_count", 5) + consumer.addInteger(data.host.cpuCount) + consumer.endField("cpu_count", 5) + + consumer.startField("cpu_limit", 6) + consumer.addDouble(data.cpuLimit) + consumer.endField("cpu_limit", 6) + + consumer.startField("cpu_time_active", 7) + consumer.addLong(data.cpuActiveTime) + consumer.endField("cpu_time_active", 7) + + consumer.startField("cpu_time_idle", 8) + consumer.addLong(data.cpuIdleTime) + consumer.endField("cpu_time_idle", 8) + + consumer.startField("cpu_time_steal", 9) + consumer.addLong(data.cpuStealTime) + consumer.endField("cpu_time_steal", 9) + + consumer.startField("cpu_time_lost", 10) + consumer.addLong(data.cpuLostTime) + consumer.endField("cpu_time_lost", 10) + + consumer.startField("mem_limit", 11) + consumer.addLong(data.host.memCapacity) + consumer.endField("mem_limit", 11) + + consumer.startField("power_total", 12) + consumer.addDouble(data.powerTotal) + consumer.endField("power_total", 12) + + consumer.startField("guests_terminated", 13) + consumer.addInteger(data.guestsTerminated) + consumer.endField("guests_terminated", 13) + + consumer.startField("guests_running", 14) + consumer.addInteger(data.guestsRunning) + consumer.endField("guests_running", 14) + + consumer.startField("guests_error", 15) + consumer.addInteger(data.guestsError) + consumer.endField("guests_error", 15) + + consumer.startField("guests_invalid", 16) + consumer.addInteger(data.guestsInvalid) + consumer.endField("guests_invalid", 16) + + consumer.endMessage() + } + } private companion object { - private val SCHEMA: Schema = SchemaBuilder - .record("host") - 
.namespace("org.opendc.telemetry.compute") - .fields() - .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault() - .name("host_id").type(UUID_SCHEMA).noDefault() - .requiredLong("uptime") - .requiredLong("downtime") - .name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault() - .requiredInt("cpu_count") - .requiredDouble("cpu_limit") - .requiredLong("cpu_time_active") - .requiredLong("cpu_time_idle") - .requiredLong("cpu_time_steal") - .requiredLong("cpu_time_lost") - .requiredLong("mem_limit") - .requiredDouble("power_total") - .requiredInt("guests_terminated") - .requiredInt("guests_running") - .requiredInt("guests_error") - .requiredInt("guests_invalid") - .endRecord() + /** + * The schema of the host data. + */ + val SCHEMA: MessageType = Types + .buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("timestamp"), + Types + .required(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) + .length(16) + .`as`(LogicalTypeAnnotation.uuidType()) + .named("host_id"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("uptime"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("downtime"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("boot_time"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("cpu_count"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpu_limit"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("cpu_time_active"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("cpu_time_idle"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("cpu_time_steal"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("cpu_time_lost"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("mem_limit"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("power_total"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("guests_terminated"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("guests_running"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("guests_error"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("guests_invalid"), + ) + .named("host") } } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt index aac6115f..5d11629b 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt @@ -22,73 +22,177 @@ package org.opendc.compute.workload.export.parquet -import org.apache.avro.Schema -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericData -import org.apache.avro.generic.GenericRecordBuilder -import org.apache.parquet.avro.AvroParquetWriter +import org.apache.hadoop.conf.Configuration import org.apache.parquet.hadoop.ParquetWriter +import org.apache.parquet.hadoop.api.WriteSupport +import org.apache.parquet.io.api.RecordConsumer +import 
org.apache.parquet.schema.* import org.opendc.telemetry.compute.table.ServerTableReader -import org.opendc.trace.util.avro.TIMESTAMP_SCHEMA -import org.opendc.trace.util.avro.UUID_SCHEMA -import org.opendc.trace.util.avro.optional +import org.opendc.trace.util.parquet.LocalParquetWriter import java.io.File +import java.util.* /** * A Parquet event writer for [ServerTableReader]s. */ public class ParquetServerDataWriter(path: File, bufferSize: Int) : - ParquetDataWriter<ServerTableReader>(path, SCHEMA, bufferSize) { + ParquetDataWriter<ServerTableReader>(path, ServerDataWriteSupport(), bufferSize) { - override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> { + override fun buildWriter(builder: LocalParquetWriter.Builder<ServerTableReader>): ParquetWriter<ServerTableReader> { return builder .withDictionaryEncoding("server_id", true) .withDictionaryEncoding("host_id", true) .build() } - override fun convert(builder: GenericRecordBuilder, data: ServerTableReader) { - builder["timestamp"] = data.timestamp.toEpochMilli() + override fun toString(): String = "server-writer" - builder["server_id"] = data.server.id - builder["host_id"] = data.host?.id + /** + * A [WriteSupport] implementation for a [ServerTableReader]. + */ + private class ServerDataWriteSupport : WriteSupport<ServerTableReader>() { + lateinit var recordConsumer: RecordConsumer - builder["uptime"] = data.uptime - builder["downtime"] = data.downtime - builder["boot_time"] = data.bootTime?.toEpochMilli() - builder["provision_time"] = data.provisionTime?.toEpochMilli() + override fun init(configuration: Configuration): WriteContext { + return WriteContext(SCHEMA, emptyMap()) + } - builder["cpu_count"] = data.server.cpuCount - builder["cpu_limit"] = data.cpuLimit - builder["cpu_time_active"] = data.cpuActiveTime - builder["cpu_time_idle"] = data.cpuIdleTime - builder["cpu_time_steal"] = data.cpuStealTime - builder["cpu_time_lost"] = data.cpuLostTime + override fun prepareForWrite(recordConsumer: RecordConsumer) { + this.recordConsumer = recordConsumer + } - builder["mem_limit"] = data.server.memCapacity - } + override fun write(record: ServerTableReader) { + write(recordConsumer, record) + } - override fun toString(): String = "server-writer" + private fun write(consumer: RecordConsumer, data: ServerTableReader) { + consumer.startMessage() + + consumer.startField("timestamp", 0) + consumer.addLong(data.timestamp.toEpochMilli()) + consumer.endField("timestamp", 0) + + consumer.startField("server_id", 1) + consumer.addBinary(UUID.fromString(data.server.id).toBinary()) + consumer.endField("server_id", 1) + + val hostId = data.host?.id + if (hostId != null) { + consumer.startField("host_id", 2) + consumer.addBinary(UUID.fromString(hostId).toBinary()) + consumer.endField("host_id", 2) + } + + consumer.startField("uptime", 3) + consumer.addLong(data.uptime) + consumer.endField("uptime", 3) + + consumer.startField("downtime", 4) + consumer.addLong(data.downtime) + consumer.endField("downtime", 4) + + val provisionTime = data.provisionTime + if (provisionTime != null) { + consumer.startField("provision_time", 5) + consumer.addLong(provisionTime.toEpochMilli()) + consumer.endField("provision_time", 5) + } + + val bootTime = data.bootTime + if (bootTime != null) { + consumer.startField("boot_time", 6) + consumer.addLong(bootTime.toEpochMilli()) + consumer.endField("boot_time", 6) + } + + consumer.startField("cpu_count", 7) + consumer.addInteger(data.server.cpuCount) +
consumer.endField("cpu_count", 7) + + consumer.startField("cpu_limit", 8) + consumer.addDouble(data.cpuLimit) + consumer.endField("cpu_limit", 8) + + consumer.startField("cpu_time_active", 9) + consumer.addLong(data.cpuActiveTime) + consumer.endField("cpu_time_active", 9) + + consumer.startField("cpu_time_idle", 10) + consumer.addLong(data.cpuIdleTime) + consumer.endField("cpu_time_idle", 10) + + consumer.startField("cpu_time_steal", 11) + consumer.addLong(data.cpuStealTime) + consumer.endField("cpu_time_steal", 11) + + consumer.startField("cpu_time_lost", 12) + consumer.addLong(data.cpuLostTime) + consumer.endField("cpu_time_lost", 12) + + consumer.startField("mem_limit", 13) + consumer.addLong(data.server.memCapacity) + consumer.endField("mem_limit", 13) + + consumer.endMessage() + } + } private companion object { - private val SCHEMA: Schema = SchemaBuilder - .record("server") - .namespace("org.opendc.telemetry.compute") - .fields() - .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault() - .name("server_id").type(UUID_SCHEMA).noDefault() - .name("host_id").type(UUID_SCHEMA.optional()).noDefault() - .requiredLong("uptime") - .requiredLong("downtime") - .name("provision_time").type(TIMESTAMP_SCHEMA.optional()).noDefault() - .name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault() - .requiredInt("cpu_count") - .requiredDouble("cpu_limit") - .requiredLong("cpu_time_active") - .requiredLong("cpu_time_idle") - .requiredLong("cpu_time_steal") - .requiredLong("cpu_time_lost") - .requiredLong("mem_limit") - .endRecord() + /** + * The schema of the server data. + */ + val SCHEMA: MessageType = Types.buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("timestamp"), + Types + .required(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) + .length(16) + .`as`(LogicalTypeAnnotation.uuidType()) + .named("server_id"), + Types + .optional(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) + .length(16) + .`as`(LogicalTypeAnnotation.uuidType()) + .named("host_id"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("uptime"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("downtime"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("provision_time"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("boot_time"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("cpu_count"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpu_limit"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("cpu_time_active"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("cpu_time_idle"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("cpu_time_steal"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("cpu_time_lost"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("mem_limit") + ) + .named("server") } } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt index 2db30bc4..5ad3b95e 
100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt @@ -22,45 +22,108 @@ package org.opendc.compute.workload.export.parquet -import org.apache.avro.Schema -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericRecordBuilder +import io.opentelemetry.context.ContextKey.named +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.hadoop.api.WriteSupport +import org.apache.parquet.io.api.RecordConsumer +import org.apache.parquet.schema.* import org.opendc.telemetry.compute.table.ServiceTableReader -import org.opendc.trace.util.avro.TIMESTAMP_SCHEMA import java.io.File /** * A Parquet event writer for [ServiceTableReader]s. */ public class ParquetServiceDataWriter(path: File, bufferSize: Int) : - ParquetDataWriter<ServiceTableReader>(path, SCHEMA, bufferSize) { - - override fun convert(builder: GenericRecordBuilder, data: ServiceTableReader) { - builder["timestamp"] = data.timestamp.toEpochMilli() - builder["hosts_up"] = data.hostsUp - builder["hosts_down"] = data.hostsDown - builder["servers_pending"] = data.serversPending - builder["servers_active"] = data.serversActive - builder["attempts_success"] = data.attemptsSuccess - builder["attempts_failure"] = data.attemptsFailure - builder["attempts_error"] = data.attemptsError - } + ParquetDataWriter<ServiceTableReader>(path, ServiceDataWriteSupport(), bufferSize) { override fun toString(): String = "service-writer" + /** + * A [WriteSupport] implementation for a [ServiceTableReader]. + */ + private class ServiceDataWriteSupport : WriteSupport<ServiceTableReader>() { + lateinit var recordConsumer: RecordConsumer + + override fun init(configuration: Configuration): WriteContext { + return WriteContext(SCHEMA, emptyMap()) + } + + override fun prepareForWrite(recordConsumer: RecordConsumer) { + this.recordConsumer = recordConsumer + } + + override fun write(record: ServiceTableReader) { + write(recordConsumer, record) + } + + private fun write(consumer: RecordConsumer, data: ServiceTableReader) { + consumer.startMessage() + + consumer.startField("timestamp", 0) + consumer.addLong(data.timestamp.toEpochMilli()) + consumer.endField("timestamp", 0) + + consumer.startField("hosts_up", 1) + consumer.addInteger(data.hostsUp) + consumer.endField("hosts_up", 1) + + consumer.startField("hosts_down", 2) + consumer.addInteger(data.hostsDown) + consumer.endField("hosts_down", 2) + + consumer.startField("servers_pending", 3) + consumer.addInteger(data.serversPending) + consumer.endField("servers_pending", 3) + + consumer.startField("servers_active", 4) + consumer.addInteger(data.serversActive) + consumer.endField("servers_active", 4) + + consumer.startField("attempts_success", 5) + consumer.addInteger(data.attemptsSuccess) + consumer.endField("attempts_success", 5) + + consumer.startField("attempts_failure", 6) + consumer.addInteger(data.attemptsFailure) + consumer.endField("attempts_failure", 6) + + consumer.startField("attempts_error", 7) + consumer.addInteger(data.attemptsError) + consumer.endField("attempts_error", 7) + + consumer.endMessage() + } + } + private companion object { - private val SCHEMA: Schema = SchemaBuilder - .record("service") - .namespace("org.opendc.telemetry.compute") - .fields() - .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault() - .requiredInt("hosts_up") -
.requiredInt("hosts_down") - .requiredInt("servers_pending") - .requiredInt("servers_active") - .requiredInt("attempts_success") - .requiredInt("attempts_failure") - .requiredInt("attempts_error") - .endRecord() + private val SCHEMA: MessageType = Types.buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("timestamp"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("hosts_up"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("hosts_down"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("servers_pending"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("servers_active"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("attempts_success"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("attempts_failure"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("attempts_error"), + ) + .named("service") } } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/Utils.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/Utils.kt new file mode 100644 index 00000000..9921f5b8 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/Utils.kt @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package org.opendc.compute.workload.export.parquet + +import org.apache.parquet.io.api.Binary +import java.nio.ByteBuffer +import java.util.UUID + +/** + * + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +internal fun UUID.toBinary(): Binary { + val bb = ByteBuffer.wrap(ByteArray(16)) + bb.putLong(mostSignificantBits) + bb.putLong(leastSignificantBits) + return Binary.fromConstantByteBuffer(bb) +} diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/HostDataWriterTest.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/HostDataWriterTest.kt new file mode 100644 index 00000000..dae03513 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/HostDataWriterTest.kt @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.export.parquet + +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertDoesNotThrow +import org.opendc.telemetry.compute.table.HostInfo +import org.opendc.telemetry.compute.table.HostTableReader +import java.nio.file.Files +import java.time.Instant + +/** + * Test suite for [ParquetHostDataWriter] + */ +class HostDataWriterTest { + /** + * The path to write the data file to. + */ + private val path = Files.createTempFile("opendc", "parquet") + + /** + * The writer used to write the data. 
+ */ + private val writer = ParquetHostDataWriter(path.toFile(), bufferSize = 4096) + + @AfterEach + fun tearDown() { + writer.close() + Files.deleteIfExists(path) + } + + @Test + fun testSmoke() { + assertDoesNotThrow { + writer.write(object : HostTableReader { + override val timestamp: Instant = Instant.now() + override val host: HostInfo = HostInfo("id", "test", "x86", 4, 4096) + override val guestsTerminated: Int = 0 + override val guestsRunning: Int = 0 + override val guestsError: Int = 0 + override val guestsInvalid: Int = 0 + override val cpuLimit: Double = 4096.0 + override val cpuUsage: Double = 1.0 + override val cpuDemand: Double = 1.0 + override val cpuUtilization: Double = 0.0 + override val cpuActiveTime: Long = 1 + override val cpuIdleTime: Long = 1 + override val cpuStealTime: Long = 1 + override val cpuLostTime: Long = 1 + override val powerUsage: Double = 1.0 + override val powerTotal: Double = 1.0 + override val uptime: Long = 1 + override val downtime: Long = 1 + override val bootTime: Instant? = null + }) + } + } +} diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServerDataWriterTest.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServerDataWriterTest.kt new file mode 100644 index 00000000..280f5ef8 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServerDataWriterTest.kt @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.export.parquet + +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertDoesNotThrow +import org.opendc.telemetry.compute.table.HostInfo +import org.opendc.telemetry.compute.table.ServerInfo +import org.opendc.telemetry.compute.table.ServerTableReader +import java.nio.file.Files +import java.time.Instant + +/** + * Test suite for [ParquetServerDataWriter] + */ +class ServerDataWriterTest { + /** + * The path to write the data file to. + */ + private val path = Files.createTempFile("opendc", "parquet") + + /** + * The writer used to write the data. 
+ */ + private val writer = ParquetServerDataWriter(path.toFile(), bufferSize = 4096) + + @AfterEach + fun tearDown() { + writer.close() + Files.deleteIfExists(path) + } + + @Test + fun testSmoke() { + assertDoesNotThrow { + writer.write(object : ServerTableReader { + override val timestamp: Instant = Instant.now() + override val server: ServerInfo = ServerInfo("id", "test", "vm", "x86", "test", "test", 2, 4096) + override val host: HostInfo = HostInfo("id", "test", "x86", 4, 4096) + override val cpuLimit: Double = 4096.0 + override val cpuActiveTime: Long = 1 + override val cpuIdleTime: Long = 1 + override val cpuStealTime: Long = 1 + override val cpuLostTime: Long = 1 + override val uptime: Long = 1 + override val downtime: Long = 1 + override val provisionTime: Instant = timestamp + override val bootTime: Instant? = null + }) + } + } +} diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServiceDataWriterTest.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServiceDataWriterTest.kt new file mode 100644 index 00000000..7ffa7186 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServiceDataWriterTest.kt @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.export.parquet + +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertDoesNotThrow +import org.opendc.telemetry.compute.table.ServiceTableReader +import java.nio.file.Files +import java.time.Instant + +/** + * Test suite for [ParquetServiceDataWriter] + */ +class ServiceDataWriterTest { + /** + * The path to write the data file to. + */ + private val path = Files.createTempFile("opendc", "parquet") + + /** + * The writer used to write the data. 
+ */ + private val writer = ParquetServiceDataWriter(path.toFile(), bufferSize = 4096) + + @AfterEach + fun tearDown() { + writer.close() + Files.deleteIfExists(path) + } + + @Test + fun testSmoke() { + assertDoesNotThrow { + writer.write(object : ServiceTableReader { + override val timestamp: Instant = Instant.now() + override val hostsUp: Int = 1 + override val hostsDown: Int = 0 + override val serversPending: Int = 1 + override val serversActive: Int = 1 + override val attemptsSuccess: Int = 1 + override val attemptsFailure: Int = 0 + override val attemptsError: Int = 0 + }) + } + } +} |
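For reference, the new writers keep the old calling convention shown in the smoke tests above: construct with a target file and buffer size, call write() per sample, and close() to drain the background queue and finish the file. A condensed usage sketch (the exportHostMetrics helper and the output file name are illustrative, not part of the change):

import org.opendc.compute.workload.export.parquet.ParquetHostDataWriter
import org.opendc.telemetry.compute.table.HostTableReader
import java.io.File

/* Illustrative helper, not part of this change. */
fun exportHostMetrics(outputDir: File, samples: Sequence<HostTableReader>) {
    // The writer buffers records in a queue and writes them to Parquet on a background thread.
    ParquetHostDataWriter(File(outputDir, "host.parquet"), bufferSize = 4096).use { writer ->
        samples.forEach { writer.write(it) }
    } // use {} closes the writer, which drains the queue and finishes the Parquet file.
}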
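Host and server identifiers are now stored as 16-byte FIXED_LEN_BYTE_ARRAY columns carrying the UUID logical type, produced by the UUID.toBinary() helper added in Utils.kt. A consumer reading these columns back can invert that layout along these lines (toUUID() is a hypothetical counterpart written for illustration, not part of this change):

import org.apache.parquet.io.api.Binary
import java.nio.ByteBuffer
import java.util.UUID

/* Hypothetical inverse of UUID.toBinary(): the first 8 bytes carry the most
   significant bits, the last 8 the least significant bits. */
internal fun Binary.toUUID(): UUID {
    val bb = ByteBuffer.wrap(bytes)
    return UUID(bb.long, bb.long)
}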
