5 files changed, 225 insertions, 107 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt
index c5cb80e2..5684bde9 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt
@@ -25,11 +25,13 @@ package org.opendc.experiments.capelin.export.parquet
 import mu.KotlinLogging
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericData
+import org.apache.avro.generic.GenericRecordBuilder
 import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.example.Paper.schema
 import org.apache.parquet.hadoop.ParquetFileWriter
+import org.apache.parquet.hadoop.ParquetWriter
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.opendc.trace.util.parquet.LocalOutputFile
-import java.io.Closeable
 import java.io.File
 import java.util.concurrent.ArrayBlockingQueue
 import java.util.concurrent.BlockingQueue
@@ -38,50 +40,94 @@ import kotlin.concurrent.thread
 /**
  * A writer that writes data in Parquet format.
  */
-public open class ParquetDataWriter<in T>(
-    private val path: File,
+abstract class ParquetDataWriter<in T>(
+    path: File,
     private val schema: Schema,
-    private val converter: (T, GenericData.Record) -> Unit,
-    private val bufferSize: Int = 4096
-) : Runnable, Closeable {
+    bufferSize: Int = 4096
+) : AutoCloseable {
     /**
      * The logging instance to use.
      */
     private val logger = KotlinLogging.logger {}
 
     /**
-     * The writer to write the Parquet file.
+     * The queue of commands to process.
      */
-    private val writer = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(path))
-        .withSchema(schema)
-        .withCompressionCodec(CompressionCodecName.SNAPPY)
-        .withPageSize(4 * 1024 * 1024) // For compression
-        .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
-        .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
-        .build()
+    private val queue: BlockingQueue<T> = ArrayBlockingQueue(bufferSize)
 
     /**
-     * The queue of commands to process.
+     * An exception to be propagated to the actual writer.
      */
-    private val queue: BlockingQueue<Action> = ArrayBlockingQueue(bufferSize)
+    private var exception: Throwable? = null
 
     /**
      * The thread that is responsible for writing the Parquet records.
      */
-    private val writerThread = thread(start = false, name = this.toString()) { run() }
+    private val writerThread = thread(start = false, name = this.toString()) {
+        val writer = let {
+            val builder = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(path))
+                .withSchema(schema)
+                .withCompressionCodec(CompressionCodecName.ZSTD)
+                .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+            buildWriter(builder)
+        }
+
+        val queue = queue
+        val buf = mutableListOf<T>()
+        var shouldStop = false
+
+        try {
+            while (!shouldStop) {
+                try {
+                    process(writer, queue.take())
+                } catch (e: InterruptedException) {
+                    shouldStop = true
+                }
+
+                if (queue.drainTo(buf) > 0) {
+                    for (data in buf) {
+                        process(writer, data)
+                    }
+                    buf.clear()
+                }
+            }
+        } catch (e: Throwable) {
+            logger.error(e) { "Failure in Parquet data writer" }
+            exception = e
+        } finally {
+            writer.close()
+        }
+    }
+
+    /**
+     * Build the [ParquetWriter] used to write the Parquet files.
+     */
+    protected open fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> {
+        return builder.build()
+    }
+
+    /**
+     * Convert the specified [data] into a Parquet record.
+     */
+    protected abstract fun convert(builder: GenericRecordBuilder, data: T)
 
     /**
      * Write the specified metrics to the database.
      */
-    public fun write(event: T) {
-        queue.put(Action.Write(event))
+    fun write(data: T) {
+        val exception = exception
+        if (exception != null) {
+            throw IllegalStateException("Writer thread failed", exception)
+        }
+
+        queue.put(data)
     }
 
     /**
      * Signal the writer to stop.
      */
-    public override fun close() {
-        queue.put(Action.Stop)
+    override fun close() {
+        writerThread.interrupt()
         writerThread.join()
     }
 
@@ -90,38 +136,11 @@ public open class ParquetDataWriter<in T>(
     }
 
     /**
-     * Start the writer thread.
+     * Process the specified [data] to be written to the Parquet file.
      */
-    override fun run() {
-        try {
-            loop@ while (true) {
-                val action = queue.take()
-                when (action) {
-                    is Action.Stop -> break@loop
-                    is Action.Write<*> -> {
-                        val record = GenericData.Record(schema)
-                        @Suppress("UNCHECKED_CAST")
-                        converter(action.data as T, record)
-                        writer.write(record)
-                    }
-                }
-            }
-        } catch (e: Throwable) {
-            logger.error("Writer failed", e)
-        } finally {
-            writer.close()
-        }
-    }
-
-    public sealed class Action {
-        /**
-         * A poison pill that will stop the writer thread.
-         */
-        public object Stop : Action()
-
-        /**
-         * Write the specified metrics to the database.
-         */
-        public data class Write<out T>(val data: T) : Action()
+    private fun process(writer: ParquetWriter<GenericData.Record>, data: T) {
+        val builder = GenericRecordBuilder(schema)
+        convert(builder, data)
+        writer.write(builder.build())
     }
 }
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt
index 79b84e9d..b057e932 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt
@@ -24,22 +24,33 @@ package org.opendc.experiments.capelin.export.parquet
 
 import org.opendc.telemetry.compute.ComputeMonitor
 import org.opendc.telemetry.compute.table.HostData
+import org.opendc.telemetry.compute.table.ServerData
 import org.opendc.telemetry.compute.table.ServiceData
 import java.io.File
 
 /**
  * A [ComputeMonitor] that logs the events to a Parquet file.
  */
-public class ParquetExportMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable {
+class ParquetExportMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable {
+    private val serverWriter = ParquetServerDataWriter(
+        File(base, "server/$partition/data.parquet").also { it.parentFile.mkdirs() },
+        bufferSize
+    )
+
     private val hostWriter = ParquetHostDataWriter(
         File(base, "host/$partition/data.parquet").also { it.parentFile.mkdirs() },
         bufferSize
     )
+
     private val serviceWriter = ParquetServiceDataWriter(
         File(base, "service/$partition/data.parquet").also { it.parentFile.mkdirs() },
         bufferSize
     )
 
+    override fun record(data: ServerData) {
+        serverWriter.write(data)
+    }
+
     override fun record(data: HostData) {
         hostWriter.write(data)
     }
@@ -51,5 +62,6 @@ public class ParquetExportMonitor(base: File, partition: String, bufferSize: Int
     override fun close() {
         hostWriter.close()
         serviceWriter.close()
+        serverWriter.close()
     }
 }
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt
index 8912c12e..7062a275 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt
@@ -25,6 +25,10 @@ package org.opendc.experiments.capelin.export.parquet
 import org.apache.avro.Schema
 import org.apache.avro.SchemaBuilder
 import org.apache.avro.generic.GenericData
+import org.apache.avro.generic.GenericRecordBuilder
+import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.hadoop.ParquetWriter
+import org.opendc.compute.service.driver.HostState
 import org.opendc.telemetry.compute.table.HostData
 import java.io.File
 
@@ -32,42 +36,52 @@ import java.io.File
  * A Parquet event writer for [HostData]s.
  */
 public class ParquetHostDataWriter(path: File, bufferSize: Int) :
-    ParquetDataWriter<HostData>(path, schema, convert, bufferSize) {
+    ParquetDataWriter<HostData>(path, SCHEMA, bufferSize) {
 
-    override fun toString(): String = "host-writer"
+    override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> {
+        return builder
+            .withDictionaryEncoding("host_id", true)
+            .build()
+    }
 
-    public companion object {
-        private val convert: (HostData, GenericData.Record) -> Unit = { data, record ->
-            record.put("host_id", data.host.name)
-            record.put("state", data.host.state.name)
-            record.put("timestamp", data.timestamp)
-            record.put("total_work", data.totalWork)
-            record.put("granted_work", data.grantedWork)
-            record.put("overcommitted_work", data.overcommittedWork)
-            record.put("interfered_work", data.interferedWork)
-            record.put("cpu_usage", data.cpuUsage)
-            record.put("cpu_demand", data.cpuDemand)
-            record.put("power_draw", data.powerDraw)
-            record.put("instance_count", data.instanceCount)
-            record.put("cores", data.host.model.cpuCount)
-        }
+    override fun convert(builder: GenericRecordBuilder, data: HostData) {
+        builder["timestamp"] = data.timestamp
+        builder["host_id"] = data.host.name
+        builder["powered_on"] = data.host.state == HostState.UP
+        builder["uptime"] = data.uptime
+        builder["downtime"] = data.downtime
+        builder["total_work"] = data.totalWork
+        builder["granted_work"] = data.grantedWork
+        builder["overcommitted_work"] = data.overcommittedWork
+        builder["interfered_work"] = data.interferedWork
+        builder["cpu_usage"] = data.cpuUsage
+        builder["cpu_demand"] = data.cpuDemand
+        builder["power_draw"] = data.powerDraw
+        builder["num_instances"] = data.instanceCount
+        builder["num_cpus"] = data.host.model.cpuCount
+    }
+
+    override fun toString(): String = "host-writer"
 
-        private val schema: Schema = SchemaBuilder
+    companion object {
+        private val SCHEMA: Schema = SchemaBuilder
            .record("host")
            .namespace("org.opendc.telemetry.compute")
            .fields()
-           .name("timestamp").type().longType().noDefault()
-           .name("host_id").type().stringType().noDefault()
-           .name("state").type().stringType().noDefault()
-           .name("requested_work").type().longType().noDefault()
-           .name("granted_work").type().longType().noDefault()
-           .name("overcommitted_work").type().longType().noDefault()
-           .name("interfered_work").type().longType().noDefault()
-           .name("cpu_usage").type().doubleType().noDefault()
-           .name("cpu_demand").type().doubleType().noDefault()
-           .name("power_draw").type().doubleType().noDefault()
-           .name("instance_count").type().intType().noDefault()
-           .name("cores").type().intType().noDefault()
+           .requiredLong("timestamp")
+           .requiredString("host_id")
+           .requiredBoolean("powered_on")
+           .requiredLong("uptime")
+           .requiredLong("downtime")
+           .requiredDouble("total_work")
+           .requiredDouble("granted_work")
+           .requiredDouble("overcommitted_work")
+           .requiredDouble("interfered_work")
+           .requiredDouble("cpu_usage")
+           .requiredDouble("cpu_demand")
+           .requiredDouble("power_draw")
+           .requiredInt("num_instances")
+           .requiredInt("num_cpus")
            .endRecord()
     }
 }
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt
new file mode 100644
index 00000000..9904adde
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt
@@ -0,0 +1,73 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.export.parquet
+
+import org.apache.avro.Schema
+import org.apache.avro.SchemaBuilder
+import org.apache.avro.generic.GenericData
+import org.apache.avro.generic.GenericRecordBuilder
+import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.hadoop.ParquetWriter
+import org.opendc.telemetry.compute.table.ServerData
+import java.io.File
+
+/**
+ * A Parquet event writer for [ServerData]s.
+ */
+public class ParquetServerDataWriter(path: File, bufferSize: Int) :
+    ParquetDataWriter<ServerData>(path, SCHEMA, bufferSize) {
+
+    override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> {
+        return builder
+            .withDictionaryEncoding("server_id", true)
+            .withDictionaryEncoding("state", true)
+            .build()
+    }
+
+    override fun convert(builder: GenericRecordBuilder, data: ServerData) {
+        builder["timestamp"] = data.timestamp
+        builder["server_id"] = data.server.uid.toString()
+        builder["state"] = data.server.state
+        builder["uptime"] = data.uptime
+        builder["downtime"] = data.downtime
+        builder["num_vcpus"] = data.server.flavor.cpuCount
+        builder["mem_capacity"] = data.server.flavor.memorySize
+    }
+
+    override fun toString(): String = "server-writer"
+
+    companion object {
+        private val SCHEMA: Schema = SchemaBuilder
+            .record("server")
+            .namespace("org.opendc.telemetry.compute")
+            .fields()
+            .requiredLong("timestamp")
+            .requiredString("server_id")
+            .requiredString("state")
+            .requiredLong("uptime")
+            .requiredLong("downtime")
+            .requiredInt("num_vcpus")
+            .requiredLong("mem_capacity")
+            .endRecord()
+    }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt
index 36d630f3..e1428fe7 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt
@@ -24,7 +24,7 @@ package org.opendc.experiments.capelin.export.parquet
 
 import org.apache.avro.Schema
 import org.apache.avro.SchemaBuilder
-import org.apache.avro.generic.GenericData
+import org.apache.avro.generic.GenericRecordBuilder
 import org.opendc.telemetry.compute.table.ServiceData
 import java.io.File
 
@@ -32,34 +32,34 @@ import java.io.File
  * A Parquet event writer for [ServiceData]s.
 */
 public class ParquetServiceDataWriter(path: File, bufferSize: Int) :
-    ParquetDataWriter<ServiceData>(path, schema, convert, bufferSize) {
+    ParquetDataWriter<ServiceData>(path, SCHEMA, bufferSize) {
 
-    override fun toString(): String = "service-writer"
+    override fun convert(builder: GenericRecordBuilder, data: ServiceData) {
+        builder["timestamp"] = data.timestamp
+        builder["host_total_count"] = data.hostCount
+        builder["host_available_count"] = data.activeHostCount
+        builder["instance_total_count"] = data.instanceCount
+        builder["instance_active_count"] = data.runningInstanceCount
+        builder["instance_inactive_count"] = data.finishedInstanceCount
+        builder["instance_waiting_count"] = data.queuedInstanceCount
+        builder["instance_failed_count"] = data.failedInstanceCount
+    }
 
-    public companion object {
-        private val convert: (ServiceData, GenericData.Record) -> Unit = { data, record ->
-            record.put("timestamp", data.timestamp)
-            record.put("host_total_count", data.hostCount)
-            record.put("host_available_count", data.activeHostCount)
-            record.put("instance_total_count", data.instanceCount)
-            record.put("instance_active_count", data.runningInstanceCount)
-            record.put("instance_inactive_count", data.finishedInstanceCount)
-            record.put("instance_waiting_count", data.queuedInstanceCount)
-            record.put("instance_failed_count", data.failedInstanceCount)
-        }
+    override fun toString(): String = "service-writer"
 
-        private val schema: Schema = SchemaBuilder
+    companion object {
+        private val SCHEMA: Schema = SchemaBuilder
            .record("service")
            .namespace("org.opendc.telemetry.compute")
            .fields()
-           .name("timestamp").type().longType().noDefault()
-           .name("host_total_count").type().intType().noDefault()
-           .name("host_available_count").type().intType().noDefault()
-           .name("instance_total_count").type().intType().noDefault()
-           .name("instance_active_count").type().intType().noDefault()
-           .name("instance_inactive_count").type().intType().noDefault()
-           .name("instance_waiting_count").type().intType().noDefault()
-           .name("instance_failed_count").type().intType().noDefault()
+           .requiredLong("timestamp")
+           .requiredInt("host_total_count")
+           .requiredInt("host_available_count")
+           .requiredInt("instance_total_count")
+           .requiredInt("instance_active_count")
+           .requiredInt("instance_inactive_count")
+           .requiredInt("instance_waiting_count")
+           .requiredInt("instance_failed_count")
            .endRecord()
     }
 }
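
As a usage illustration (not part of the change set above), here is a minimal sketch of how an additional writer would plug into the refactored ParquetDataWriter API: a subclass now only supplies an Avro schema and a convert implementation, while buffering, the background writer thread, and the Parquet writer setup are inherited from the base class. The ExampleData type, field names, and schema below are hypothetical and exist only for this sketch.

package org.opendc.experiments.capelin.export.parquet

import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericRecordBuilder
import java.io.File

/** A hypothetical record type, used only for this illustration. */
data class ExampleData(val timestamp: Long, val value: Double)

/**
 * A sketch of a Parquet event writer for [ExampleData]s, mirroring the pattern of
 * ParquetServiceDataWriter in the diff above.
 */
class ParquetExampleDataWriter(path: File, bufferSize: Int) :
    ParquetDataWriter<ExampleData>(path, SCHEMA, bufferSize) {

    // Map a single ExampleData record onto the fields declared in SCHEMA.
    override fun convert(builder: GenericRecordBuilder, data: ExampleData) {
        builder["timestamp"] = data.timestamp
        builder["value"] = data.value
    }

    override fun toString(): String = "example-writer"

    companion object {
        private val SCHEMA: Schema = SchemaBuilder
            .record("example")
            .namespace("org.opendc.telemetry.compute")
            .fields()
            .requiredLong("timestamp")
            .requiredDouble("value")
            .endRecord()
    }
}

Records handed to write() are placed on the blocking queue and flushed by the writer thread; close() interrupts that thread and joins it, which in turn closes the underlying ParquetWriter.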
