path: root/opendc-compute/opendc-compute-workload/src
author     Fabian Mastenbroek <mail.fabianm@gmail.com>    2022-05-01 23:33:39 +0200
committer  Fabian Mastenbroek <mail.fabianm@gmail.com>    2022-05-02 15:37:03 +0200
commit     a1374a63f81fafc5da565072bae2ecae2e0fed28 (patch)
tree       bb43b4112dda862f72ba8afb49cec3336d99e628 /opendc-compute/opendc-compute-workload/src
parent     ea5e79fc77072e6151ee7952581b97e35a2027fb (diff)
refactor(compute): Do not use Avro when exporting experiment data
This change updates the `ParquetDataWriter` class so that it no longer uses the `parquet-avro` library for exporting experiment data. Instead, it uses Parquet's low-level `WriteSupport` API to write the data directly in Parquet format.
Diffstat (limited to 'opendc-compute/opendc-compute-workload/src')
-rw-r--r--  opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt | 32
-rw-r--r--  opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt | 214
-rw-r--r--  opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt | 196
-rw-r--r--  opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt | 121
-rw-r--r--  opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/Utils.kt | 38
-rw-r--r--  opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/HostDataWriterTest.kt | 79
-rw-r--r--  opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServerDataWriterTest.kt | 73
-rw-r--r--  opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServiceDataWriterTest.kt | 67
8 files changed, 673 insertions(+), 147 deletions(-)
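
The writer changes below all follow the same pattern: each writer now ships a `WriteSupport` implementation that declares a Parquet `MessageType` schema and streams every record through a `RecordConsumer`, instead of building Avro `GenericData.Record`s. A minimal sketch of that pattern, using a hypothetical single-column record type rather than one of the schemas in this commit:

    import org.apache.hadoop.conf.Configuration
    import org.apache.parquet.hadoop.api.WriteSupport
    import org.apache.parquet.io.api.RecordConsumer
    import org.apache.parquet.schema.LogicalTypeAnnotation
    import org.apache.parquet.schema.MessageType
    import org.apache.parquet.schema.PrimitiveType
    import org.apache.parquet.schema.Types

    /** Hypothetical record type, standing in for HostTableReader and friends. */
    data class Sample(val timestamp: Long)

    /** Minimal WriteSupport: declare the schema once, then emit each record field by field. */
    class SampleWriteSupport : WriteSupport<Sample>() {
        private lateinit var consumer: RecordConsumer

        private val schema: MessageType = Types.buildMessage()
            .addFields(
                Types.required(PrimitiveType.PrimitiveTypeName.INT64)
                    .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
                    .named("timestamp")
            )
            .named("sample")

        override fun init(configuration: Configuration): WriteContext = WriteContext(schema, emptyMap())

        override fun prepareForWrite(recordConsumer: RecordConsumer) {
            consumer = recordConsumer
        }

        override fun write(record: Sample) {
            consumer.startMessage()
            consumer.startField("timestamp", 0)
            consumer.addLong(record.timestamp)
            consumer.endField("timestamp", 0)
            consumer.endMessage()
        }
    }

The concrete write supports in this commit (host, server, service) differ only in the schema they declare and the fields they emit per record; `ParquetDataWriter` wires such a `WriteSupport` to an output file, as the first diff below shows.
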
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt
index 84387bbc..c854d874 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt
@@ -23,14 +23,12 @@
package org.opendc.compute.workload.export.parquet
import mu.KotlinLogging
-import org.apache.avro.Schema
-import org.apache.avro.generic.GenericData
-import org.apache.avro.generic.GenericRecordBuilder
-import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.column.ParquetProperties
import org.apache.parquet.hadoop.ParquetFileWriter
import org.apache.parquet.hadoop.ParquetWriter
+import org.apache.parquet.hadoop.api.WriteSupport
import org.apache.parquet.hadoop.metadata.CompressionCodecName
-import org.opendc.trace.util.parquet.LocalOutputFile
+import org.opendc.trace.util.parquet.LocalParquetWriter
import java.io.File
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.BlockingQueue
@@ -38,10 +36,13 @@ import kotlin.concurrent.thread
/**
* A writer that writes data in Parquet format.
+ *
+ * @param path The path to the file to write the data to.
+ * @param writeSupport The [WriteSupport] implementation for converting the records to Parquet format.
*/
public abstract class ParquetDataWriter<in T>(
path: File,
- private val schema: Schema,
+ private val writeSupport: WriteSupport<T>,
bufferSize: Int = 4096
) : AutoCloseable {
/**
@@ -52,7 +53,7 @@ public abstract class ParquetDataWriter<in T>(
/**
* The queue of records to process.
*/
- private val queue: BlockingQueue<GenericData.Record> = ArrayBlockingQueue(bufferSize)
+ private val queue: BlockingQueue<T> = ArrayBlockingQueue(bufferSize)
/**
* An exception to be propagated to the actual writer.
@@ -64,15 +65,15 @@ public abstract class ParquetDataWriter<in T>(
*/
private val writerThread = thread(start = false, name = this.toString()) {
val writer = let {
- val builder = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(path))
- .withSchema(schema)
+ val builder = LocalParquetWriter.builder(path.toPath(), writeSupport)
+ .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
.withCompressionCodec(CompressionCodecName.ZSTD)
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
buildWriter(builder)
}
val queue = queue
- val buf = mutableListOf<GenericData.Record>()
+ val buf = mutableListOf<T>()
var shouldStop = false
try {
@@ -101,16 +102,11 @@ public abstract class ParquetDataWriter<in T>(
/**
* Build the [ParquetWriter] used to write the Parquet files.
*/
- protected open fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> {
+ protected open fun buildWriter(builder: LocalParquetWriter.Builder<@UnsafeVariance T>): ParquetWriter<@UnsafeVariance T> {
return builder.build()
}
/**
- * Convert the specified [data] into a Parquet record.
- */
- protected abstract fun convert(builder: GenericRecordBuilder, data: T)
-
- /**
* Write the specified metrics to the database.
*/
public fun write(data: T) {
@@ -119,9 +115,7 @@ public abstract class ParquetDataWriter<in T>(
throw IllegalStateException("Writer thread failed", exception)
}
- val builder = GenericRecordBuilder(schema)
- convert(builder, data)
- queue.put(builder.build())
+ queue.put(data)
}
/**
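
From the caller's perspective the refactored `ParquetDataWriter` keeps its old contract: `write` enqueues a record (rethrowing any failure from the background writer thread), and the writer is closed when done. A short usage sketch against one of the subclasses defined below; the output path and record source are assumptions, not part of this commit:

    import org.opendc.telemetry.compute.table.HostTableReader
    import java.io.File

    // Hypothetical call site: export a batch of host records to a Parquet file.
    fun exportHosts(records: List<HostTableReader>, output: File) {
        val writer = ParquetHostDataWriter(output, bufferSize = 4096)
        try {
            for (record in records) {
                writer.write(record) // enqueues; the writer thread converts it via the WriteSupport
            }
        } finally {
            writer.close() // finish writing; ParquetDataWriter implements AutoCloseable
        }
    }
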
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt
index 72dbba90..0d5b6b34 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt
@@ -22,81 +22,189 @@
package org.opendc.compute.workload.export.parquet
-import org.apache.avro.Schema
-import org.apache.avro.SchemaBuilder
-import org.apache.avro.generic.GenericData
-import org.apache.avro.generic.GenericRecordBuilder
-import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.ParquetWriter
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.io.api.RecordConsumer
+import org.apache.parquet.schema.*
import org.opendc.telemetry.compute.table.HostTableReader
-import org.opendc.trace.util.avro.TIMESTAMP_SCHEMA
-import org.opendc.trace.util.avro.UUID_SCHEMA
-import org.opendc.trace.util.avro.optional
+import org.opendc.trace.util.parquet.LocalParquetWriter
import java.io.File
+import java.util.*
/**
* A Parquet event writer for [HostTableReader]s.
*/
public class ParquetHostDataWriter(path: File, bufferSize: Int) :
- ParquetDataWriter<HostTableReader>(path, SCHEMA, bufferSize) {
+ ParquetDataWriter<HostTableReader>(path, HostDataWriteSupport(), bufferSize) {
- override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> {
+ override fun buildWriter(builder: LocalParquetWriter.Builder<HostTableReader>): ParquetWriter<HostTableReader> {
return builder
.withDictionaryEncoding("host_id", true)
.build()
}
- override fun convert(builder: GenericRecordBuilder, data: HostTableReader) {
- builder["timestamp"] = data.timestamp.toEpochMilli()
+ override fun toString(): String = "host-writer"
- builder["host_id"] = data.host.id
+ /**
+ * A [WriteSupport] implementation for a [HostTableReader].
+ */
+ private class HostDataWriteSupport : WriteSupport<HostTableReader>() {
+ lateinit var recordConsumer: RecordConsumer
- builder["uptime"] = data.uptime
- builder["downtime"] = data.downtime
- val bootTime = data.bootTime
- builder["boot_time"] = bootTime?.toEpochMilli()
+ override fun init(configuration: Configuration): WriteContext {
+ return WriteContext(SCHEMA, emptyMap())
+ }
- builder["cpu_count"] = data.host.cpuCount
- builder["cpu_limit"] = data.cpuLimit
- builder["cpu_time_active"] = data.cpuActiveTime
- builder["cpu_time_idle"] = data.cpuIdleTime
- builder["cpu_time_steal"] = data.cpuStealTime
- builder["cpu_time_lost"] = data.cpuLostTime
+ override fun prepareForWrite(recordConsumer: RecordConsumer) {
+ this.recordConsumer = recordConsumer
+ }
- builder["mem_limit"] = data.host.memCapacity
+ override fun write(record: HostTableReader) {
+ write(recordConsumer, record)
+ }
- builder["power_total"] = data.powerTotal
+ private fun write(consumer: RecordConsumer, data: HostTableReader) {
+ consumer.startMessage()
- builder["guests_terminated"] = data.guestsTerminated
- builder["guests_running"] = data.guestsRunning
- builder["guests_error"] = data.guestsError
- builder["guests_invalid"] = data.guestsInvalid
- }
+ consumer.startField("timestamp", 0)
+ consumer.addLong(data.timestamp.toEpochMilli())
+ consumer.endField("timestamp", 0)
- override fun toString(): String = "host-writer"
+ consumer.startField("host_id", 1)
+ consumer.addBinary(UUID.fromString(data.host.id).toBinary())
+ consumer.endField("host_id", 1)
+
+ consumer.startField("uptime", 2)
+ consumer.addLong(data.uptime)
+ consumer.endField("uptime", 2)
+
+ consumer.startField("downtime", 3)
+ consumer.addLong(data.downtime)
+ consumer.endField("downtime", 3)
+
+ val bootTime = data.bootTime
+ if (bootTime != null) {
+ consumer.startField("boot_time", 4)
+ consumer.addLong(bootTime.toEpochMilli())
+ consumer.endField("boot_time", 4)
+ }
+
+ consumer.startField("cpu_count", 5)
+ consumer.addInteger(data.host.cpuCount)
+ consumer.endField("cpu_count", 5)
+
+ consumer.startField("cpu_limit", 6)
+ consumer.addDouble(data.cpuLimit)
+ consumer.endField("cpu_limit", 6)
+
+ consumer.startField("cpu_time_active", 7)
+ consumer.addLong(data.cpuActiveTime)
+ consumer.endField("cpu_time_active", 7)
+
+ consumer.startField("cpu_time_idle", 8)
+ consumer.addLong(data.cpuIdleTime)
+ consumer.endField("cpu_time_idle", 8)
+
+ consumer.startField("cpu_time_steal", 9)
+ consumer.addLong(data.cpuStealTime)
+ consumer.endField("cpu_time_steal", 9)
+
+ consumer.startField("cpu_time_lost", 10)
+ consumer.addLong(data.cpuLostTime)
+ consumer.endField("cpu_time_lost", 10)
+
+ consumer.startField("mem_limit", 11)
+ consumer.addLong(data.host.memCapacity)
+ consumer.endField("mem_limit", 11)
+
+ consumer.startField("power_total", 12)
+ consumer.addDouble(data.powerTotal)
+ consumer.endField("power_total", 12)
+
+ consumer.startField("guests_terminated", 13)
+ consumer.addInteger(data.guestsTerminated)
+ consumer.endField("guests_terminated", 13)
+
+ consumer.startField("guests_running", 14)
+ consumer.addInteger(data.guestsRunning)
+ consumer.endField("guests_running", 14)
+
+ consumer.startField("guests_error", 15)
+ consumer.addInteger(data.guestsError)
+ consumer.endField("guests_error", 15)
+
+ consumer.startField("guests_invalid", 16)
+ consumer.addInteger(data.guestsInvalid)
+ consumer.endField("guests_invalid", 16)
+
+ consumer.endMessage()
+ }
+ }
private companion object {
- private val SCHEMA: Schema = SchemaBuilder
- .record("host")
- .namespace("org.opendc.telemetry.compute")
- .fields()
- .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault()
- .name("host_id").type(UUID_SCHEMA).noDefault()
- .requiredLong("uptime")
- .requiredLong("downtime")
- .name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault()
- .requiredInt("cpu_count")
- .requiredDouble("cpu_limit")
- .requiredLong("cpu_time_active")
- .requiredLong("cpu_time_idle")
- .requiredLong("cpu_time_steal")
- .requiredLong("cpu_time_lost")
- .requiredLong("mem_limit")
- .requiredDouble("power_total")
- .requiredInt("guests_terminated")
- .requiredInt("guests_running")
- .requiredInt("guests_error")
- .requiredInt("guests_invalid")
- .endRecord()
+ /**
+ * The schema of the host data.
+ */
+ val SCHEMA: MessageType = Types
+ .buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("timestamp"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY)
+ .length(16)
+ .`as`(LogicalTypeAnnotation.uuidType())
+ .named("host_id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("uptime"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("downtime"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("boot_time"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("cpu_count"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpu_limit"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("cpu_time_active"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("cpu_time_idle"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("cpu_time_steal"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("cpu_time_lost"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("mem_limit"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("power_total"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("guests_terminated"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("guests_running"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("guests_error"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("guests_invalid"),
+ )
+ .named("host")
}
}
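
Since the schema is now declared by hand rather than derived from an Avro schema, it can be worth verifying what actually ends up in a written file. A sketch (not part of this change; the file path is an assumption) that prints the Parquet schema of an output file using parquet-mr's file reader:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.parquet.hadoop.ParquetFileReader
    import org.apache.parquet.hadoop.util.HadoopInputFile
    import java.io.File

    // Open the file footer and print the MessageType that was declared above.
    fun printSchema(file: File) {
        val input = HadoopInputFile.fromPath(Path(file.toURI()), Configuration())
        ParquetFileReader.open(input).use { reader ->
            println(reader.footer.fileMetaData.schema)
        }
    }
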
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt
index aac6115f..5d11629b 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt
@@ -22,73 +22,177 @@
package org.opendc.compute.workload.export.parquet
-import org.apache.avro.Schema
-import org.apache.avro.SchemaBuilder
-import org.apache.avro.generic.GenericData
-import org.apache.avro.generic.GenericRecordBuilder
-import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.ParquetWriter
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.io.api.RecordConsumer
+import org.apache.parquet.schema.*
import org.opendc.telemetry.compute.table.ServerTableReader
-import org.opendc.trace.util.avro.TIMESTAMP_SCHEMA
-import org.opendc.trace.util.avro.UUID_SCHEMA
-import org.opendc.trace.util.avro.optional
+import org.opendc.trace.util.parquet.LocalParquetWriter
import java.io.File
+import java.util.*
/**
* A Parquet event writer for [ServerTableReader]s.
*/
public class ParquetServerDataWriter(path: File, bufferSize: Int) :
- ParquetDataWriter<ServerTableReader>(path, SCHEMA, bufferSize) {
+ ParquetDataWriter<ServerTableReader>(path, ServerDataWriteSupport(), bufferSize) {
- override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> {
+ override fun buildWriter(builder: LocalParquetWriter.Builder<ServerTableReader>): ParquetWriter<ServerTableReader> {
return builder
.withDictionaryEncoding("server_id", true)
.withDictionaryEncoding("host_id", true)
.build()
}
- override fun convert(builder: GenericRecordBuilder, data: ServerTableReader) {
- builder["timestamp"] = data.timestamp.toEpochMilli()
+ override fun toString(): String = "server-writer"
- builder["server_id"] = data.server.id
- builder["host_id"] = data.host?.id
+ /**
+ * A [WriteSupport] implementation for a [ServerTableReader].
+ */
+ private class ServerDataWriteSupport : WriteSupport<ServerTableReader>() {
+ lateinit var recordConsumer: RecordConsumer
- builder["uptime"] = data.uptime
- builder["downtime"] = data.downtime
- builder["boot_time"] = data.bootTime?.toEpochMilli()
- builder["provision_time"] = data.provisionTime?.toEpochMilli()
+ override fun init(configuration: Configuration): WriteContext {
+ return WriteContext(SCHEMA, emptyMap())
+ }
- builder["cpu_count"] = data.server.cpuCount
- builder["cpu_limit"] = data.cpuLimit
- builder["cpu_time_active"] = data.cpuActiveTime
- builder["cpu_time_idle"] = data.cpuIdleTime
- builder["cpu_time_steal"] = data.cpuStealTime
- builder["cpu_time_lost"] = data.cpuLostTime
+ override fun prepareForWrite(recordConsumer: RecordConsumer) {
+ this.recordConsumer = recordConsumer
+ }
- builder["mem_limit"] = data.server.memCapacity
- }
+ override fun write(record: ServerTableReader) {
+ write(recordConsumer, record)
+ }
- override fun toString(): String = "server-writer"
+ private fun write(consumer: RecordConsumer, data: ServerTableReader) {
+ consumer.startMessage()
+
+ consumer.startField("timestamp", 0)
+ consumer.addLong(data.timestamp.toEpochMilli())
+ consumer.endField("timestamp", 0)
+
+ consumer.startField("server_id", 1)
+ consumer.addBinary(UUID.fromString(data.server.id).toBinary())
+ consumer.endField("server_id", 1)
+
+ val hostId = data.host?.id
+ if (hostId != null) {
+ consumer.startField("host_id", 2)
+ consumer.addBinary(UUID.fromString(hostId).toBinary())
+ consumer.endField("host_id", 2)
+ }
+
+ consumer.startField("uptime", 3)
+ consumer.addLong(data.uptime)
+ consumer.endField("uptime", 3)
+
+ consumer.startField("downtime", 4)
+ consumer.addLong(data.downtime)
+ consumer.endField("downtime", 4)
+
+ val bootTime = data.bootTime
+ if (bootTime != null) {
+ consumer.startField("boot_time", 5)
+ consumer.addLong(bootTime.toEpochMilli())
+ consumer.endField("boot_time", 5)
+ }
+
+ val provisionTime = data.provisionTime
+ if (provisionTime != null) {
+ consumer.startField("provision_time", 6)
+ consumer.addLong(provisionTime.toEpochMilli())
+ consumer.endField("provision_time", 6)
+ }
+
+ consumer.startField("cpu_count", 7)
+ consumer.addInteger(data.server.cpuCount)
+ consumer.endField("cpu_count", 7)
+
+ consumer.startField("cpu_limit", 8)
+ consumer.addDouble(data.cpuLimit)
+ consumer.endField("cpu_limit", 8)
+
+ consumer.startField("cpu_time_active", 9)
+ consumer.addLong(data.cpuActiveTime)
+ consumer.endField("cpu_time_active", 9)
+
+ consumer.startField("cpu_time_idle", 10)
+ consumer.addLong(data.cpuIdleTime)
+ consumer.endField("cpu_time_idle", 10)
+
+ consumer.startField("cpu_time_steal", 11)
+ consumer.addLong(data.cpuStealTime)
+ consumer.endField("cpu_time_steal", 11)
+
+ consumer.startField("cpu_time_lost", 12)
+ consumer.addLong(data.cpuLostTime)
+ consumer.endField("cpu_time_lost", 12)
+
+ consumer.startField("mem_limit", 13)
+ consumer.addLong(data.server.memCapacity)
+ consumer.endField("mem_limit", 13)
+
+ consumer.endMessage()
+ }
+ }
private companion object {
- private val SCHEMA: Schema = SchemaBuilder
- .record("server")
- .namespace("org.opendc.telemetry.compute")
- .fields()
- .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault()
- .name("server_id").type(UUID_SCHEMA).noDefault()
- .name("host_id").type(UUID_SCHEMA.optional()).noDefault()
- .requiredLong("uptime")
- .requiredLong("downtime")
- .name("provision_time").type(TIMESTAMP_SCHEMA.optional()).noDefault()
- .name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault()
- .requiredInt("cpu_count")
- .requiredDouble("cpu_limit")
- .requiredLong("cpu_time_active")
- .requiredLong("cpu_time_idle")
- .requiredLong("cpu_time_steal")
- .requiredLong("cpu_time_lost")
- .requiredLong("mem_limit")
- .endRecord()
+ /**
+ * The schema of the server data.
+ */
+ val SCHEMA: MessageType = Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("timestamp"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY)
+ .length(16)
+ .`as`(LogicalTypeAnnotation.uuidType())
+ .named("server_id"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY)
+ .length(16)
+ .`as`(LogicalTypeAnnotation.uuidType())
+ .named("host_id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("uptime"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("downtime"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("provision_time"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("boot_time"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("cpu_count"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpu_limit"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("cpu_time_active"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("cpu_time_idle"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("cpu_time_steal"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("cpu_time_lost"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("mem_limit")
+ )
+ .named("server")
}
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt
index 2db30bc4..5ad3b95e 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt
@@ -22,45 +22,108 @@
package org.opendc.compute.workload.export.parquet
-import org.apache.avro.Schema
-import org.apache.avro.SchemaBuilder
-import org.apache.avro.generic.GenericRecordBuilder
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.io.api.RecordConsumer
+import org.apache.parquet.schema.*
import org.opendc.telemetry.compute.table.ServiceTableReader
-import org.opendc.trace.util.avro.TIMESTAMP_SCHEMA
import java.io.File
/**
* A Parquet event writer for [ServiceTableReader]s.
*/
public class ParquetServiceDataWriter(path: File, bufferSize: Int) :
- ParquetDataWriter<ServiceTableReader>(path, SCHEMA, bufferSize) {
-
- override fun convert(builder: GenericRecordBuilder, data: ServiceTableReader) {
- builder["timestamp"] = data.timestamp.toEpochMilli()
- builder["hosts_up"] = data.hostsUp
- builder["hosts_down"] = data.hostsDown
- builder["servers_pending"] = data.serversPending
- builder["servers_active"] = data.serversActive
- builder["attempts_success"] = data.attemptsSuccess
- builder["attempts_failure"] = data.attemptsFailure
- builder["attempts_error"] = data.attemptsError
- }
+ ParquetDataWriter<ServiceTableReader>(path, ServiceDataWriteSupport(), bufferSize) {
override fun toString(): String = "service-writer"
+ /**
+ * A [WriteSupport] implementation for a [ServiceTableReader].
+ */
+ private class ServiceDataWriteSupport : WriteSupport<ServiceTableReader>() {
+ lateinit var recordConsumer: RecordConsumer
+
+ override fun init(configuration: Configuration): WriteContext {
+ return WriteContext(SCHEMA, emptyMap())
+ }
+
+ override fun prepareForWrite(recordConsumer: RecordConsumer) {
+ this.recordConsumer = recordConsumer
+ }
+
+ override fun write(record: ServiceTableReader) {
+ write(recordConsumer, record)
+ }
+
+ private fun write(consumer: RecordConsumer, data: ServiceTableReader) {
+ consumer.startMessage()
+
+ consumer.startField("timestamp", 0)
+ consumer.addLong(data.timestamp.toEpochMilli())
+ consumer.endField("timestamp", 0)
+
+ consumer.startField("hosts_up", 1)
+ consumer.addInteger(data.hostsUp)
+ consumer.endField("hosts_up", 1)
+
+ consumer.startField("hosts_down", 2)
+ consumer.addInteger(data.hostsDown)
+ consumer.endField("hosts_down", 2)
+
+ consumer.startField("servers_pending", 3)
+ consumer.addInteger(data.serversPending)
+ consumer.endField("servers_pending", 3)
+
+ consumer.startField("servers_active", 4)
+ consumer.addInteger(data.serversActive)
+ consumer.endField("servers_active", 4)
+
+ consumer.startField("attempts_success", 5)
+ consumer.addInteger(data.attemptsSuccess)
+ consumer.endField("attempts_pending", 5)
+
+ consumer.startField("attempts_failure", 6)
+ consumer.addInteger(data.attemptsFailure)
+ consumer.endField("attempts_failure", 6)
+
+ consumer.startField("attempts_error", 7)
+ consumer.addInteger(data.attemptsError)
+ consumer.endField("attempts_error", 7)
+
+ consumer.endMessage()
+ }
+ }
+
private companion object {
- private val SCHEMA: Schema = SchemaBuilder
- .record("service")
- .namespace("org.opendc.telemetry.compute")
- .fields()
- .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault()
- .requiredInt("hosts_up")
- .requiredInt("hosts_down")
- .requiredInt("servers_pending")
- .requiredInt("servers_active")
- .requiredInt("attempts_success")
- .requiredInt("attempts_failure")
- .requiredInt("attempts_error")
- .endRecord()
+ private val SCHEMA: MessageType = Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("timestamp"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("hosts_up"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("hosts_down"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("servers_pending"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("servers_active"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("attempts_success"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("attempts_failure"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("attempts_error"),
+ )
+ .named("service")
}
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/Utils.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/Utils.kt
new file mode 100644
index 00000000..9921f5b8
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/Utils.kt
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.export.parquet
+
+import org.apache.parquet.io.api.Binary
+import java.nio.ByteBuffer
+import java.util.UUID
+
+/**
+ * Convert this [UUID] into a fixed-length 16-byte [Binary] for Parquet's FIXED_LEN_BYTE_ARRAY columns.
+ * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
+ */
+internal fun UUID.toBinary(): Binary {
+ val bb = ByteBuffer.allocate(16)
+ bb.putLong(mostSignificantBits)
+ bb.putLong(leastSignificantBits)
+ // Hand the backing array to Parquet so the buffer position does not truncate the value.
+ return Binary.fromConstantByteArray(bb.array())
+}
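
The helper above only covers the write path. When these files are read back, the 16-byte FIXED_LEN_BYTE_ARRAY values have to be decoded again; a hypothetical inverse helper (not part of this commit) could look like:

    import org.apache.parquet.io.api.Binary
    import java.nio.ByteBuffer
    import java.util.UUID

    /** Hypothetical inverse of UUID.toBinary(): rebuild a UUID from a 16-byte Parquet Binary. */
    internal fun Binary.toUuid(): UUID {
        val bb = ByteBuffer.wrap(bytes) // Binary.getBytes() yields the 16 raw bytes
        val msb = bb.long               // first 8 bytes: most significant bits
        val lsb = bb.long               // last 8 bytes: least significant bits
        return UUID(msb, lsb)
    }
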
diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/HostDataWriterTest.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/HostDataWriterTest.kt
new file mode 100644
index 00000000..dae03513
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/HostDataWriterTest.kt
@@ -0,0 +1,79 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.export.parquet
+
+import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertDoesNotThrow
+import org.opendc.telemetry.compute.table.HostInfo
+import org.opendc.telemetry.compute.table.HostTableReader
+import java.nio.file.Files
+import java.time.Instant
+
+/**
+ * Test suite for [ParquetHostDataWriter]
+ */
+class HostDataWriterTest {
+ /**
+ * The path to write the data file to.
+ */
+ private val path = Files.createTempFile("opendc", "parquet")
+
+ /**
+ * The writer used to write the data.
+ */
+ private val writer = ParquetHostDataWriter(path.toFile(), bufferSize = 4096)
+
+ @AfterEach
+ fun tearDown() {
+ writer.close()
+ Files.deleteIfExists(path)
+ }
+
+ @Test
+ fun testSmoke() {
+ assertDoesNotThrow {
+ writer.write(object : HostTableReader {
+ override val timestamp: Instant = Instant.now()
+ override val host: HostInfo = HostInfo("id", "test", "x86", 4, 4096)
+ override val guestsTerminated: Int = 0
+ override val guestsRunning: Int = 0
+ override val guestsError: Int = 0
+ override val guestsInvalid: Int = 0
+ override val cpuLimit: Double = 4096.0
+ override val cpuUsage: Double = 1.0
+ override val cpuDemand: Double = 1.0
+ override val cpuUtilization: Double = 0.0
+ override val cpuActiveTime: Long = 1
+ override val cpuIdleTime: Long = 1
+ override val cpuStealTime: Long = 1
+ override val cpuLostTime: Long = 1
+ override val powerUsage: Double = 1.0
+ override val powerTotal: Double = 1.0
+ override val uptime: Long = 1
+ override val downtime: Long = 1
+ override val bootTime: Instant? = null
+ })
+ }
+ }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServerDataWriterTest.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServerDataWriterTest.kt
new file mode 100644
index 00000000..280f5ef8
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServerDataWriterTest.kt
@@ -0,0 +1,73 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.export.parquet
+
+import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertDoesNotThrow
+import org.opendc.telemetry.compute.table.HostInfo
+import org.opendc.telemetry.compute.table.ServerInfo
+import org.opendc.telemetry.compute.table.ServerTableReader
+import java.nio.file.Files
+import java.time.Instant
+
+/**
+ * Test suite for [ParquetServerDataWriter]
+ */
+class ServerDataWriterTest {
+ /**
+ * The path to write the data file to.
+ */
+ private val path = Files.createTempFile("opendc", "parquet")
+
+ /**
+ * The writer used to write the data.
+ */
+ private val writer = ParquetServerDataWriter(path.toFile(), bufferSize = 4096)
+
+ @AfterEach
+ fun tearDown() {
+ writer.close()
+ Files.deleteIfExists(path)
+ }
+
+ @Test
+ fun testSmoke() {
+ assertDoesNotThrow {
+ writer.write(object : ServerTableReader {
+ override val timestamp: Instant = Instant.now()
+ override val server: ServerInfo = ServerInfo("id", "test", "vm", "x86", "test", "test", 2, 4096)
+ override val host: HostInfo = HostInfo("id", "test", "x86", 4, 4096)
+ override val cpuLimit: Double = 4096.0
+ override val cpuActiveTime: Long = 1
+ override val cpuIdleTime: Long = 1
+ override val cpuStealTime: Long = 1
+ override val cpuLostTime: Long = 1
+ override val uptime: Long = 1
+ override val downtime: Long = 1
+ override val provisionTime: Instant = timestamp
+ override val bootTime: Instant? = null
+ })
+ }
+ }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServiceDataWriterTest.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServiceDataWriterTest.kt
new file mode 100644
index 00000000..7ffa7186
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServiceDataWriterTest.kt
@@ -0,0 +1,67 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.export.parquet
+
+import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertDoesNotThrow
+import org.opendc.telemetry.compute.table.ServiceTableReader
+import java.nio.file.Files
+import java.time.Instant
+
+/**
+ * Test suite for [ParquetServiceDataWriter]
+ */
+class ServiceDataWriterTest {
+ /**
+ * The path to write the data file to.
+ */
+ private val path = Files.createTempFile("opendc", "parquet")
+
+ /**
+ * The writer used to write the data.
+ */
+ private val writer = ParquetServiceDataWriter(path.toFile(), bufferSize = 4096)
+
+ @AfterEach
+ fun tearDown() {
+ writer.close()
+ Files.deleteIfExists(path)
+ }
+
+ @Test
+ fun testSmoke() {
+ assertDoesNotThrow {
+ writer.write(object : ServiceTableReader {
+ override val timestamp: Instant = Instant.now()
+ override val hostsUp: Int = 1
+ override val hostsDown: Int = 0
+ override val serversPending: Int = 1
+ override val serversActive: Int = 1
+ override val attemptsSuccess: Int = 1
+ override val attemptsFailure: Int = 0
+ override val attemptsError: Int = 0
+ })
+ }
+ }
+}