author     Fabian Mastenbroek <mail.fabianm@gmail.com>    2021-09-13 12:25:51 +0200
committer  Fabian Mastenbroek <mail.fabianm@gmail.com>    2021-09-17 16:48:02 +0200
commit     eef8ea3ab40a4e4a12ba96f839c35c5804884bc1
tree       6fa2460ff5593d625a80efe614c02079085eb157 /opendc-experiments/opendc-experiments-capelin
parent     7960791430b0536df4704493c01d32e38f37f022
refactor(capelin): Improve ParquetDataWriter implementation
This change improves the ParquetDataWriter class to support more complex use cases. The writer is now an abstract base class: subclasses override buildWriter to customize the Parquet writer options and convert to map their data onto Parquet records. In addition, a subclass for writing server data is added.
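For illustration, a minimal sketch of a subclass written against the new extension points. The ParquetVmDataWriter name, the VmData payload type and the vm schema are hypothetical and not part of this commit; only the buildWriter/convert hooks and the builder calls mirror the change below.

package org.opendc.experiments.capelin.export.parquet

import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecordBuilder
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import java.io.File

// Hypothetical payload type used only for this sketch.
data class VmData(val timestamp: Long, val id: String)

class ParquetVmDataWriter(path: File, bufferSize: Int) :
    ParquetDataWriter<VmData>(path, SCHEMA, bufferSize) {

    // New hook: subclasses may tune the writer options, e.g. per-column dictionary encoding.
    override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> {
        return builder
            .withDictionaryEncoding("vm_id", true)
            .build()
    }

    // New hook: map the payload onto the Avro record for the schema above.
    override fun convert(builder: GenericRecordBuilder, data: VmData) {
        builder["timestamp"] = data.timestamp
        builder["vm_id"] = data.id
    }

    override fun toString(): String = "vm-writer"

    companion object {
        private val SCHEMA: Schema = SchemaBuilder
            .record("vm")
            .namespace("org.opendc.telemetry.compute")
            .fields()
            .requiredLong("timestamp")
            .requiredString("vm_id")
            .endRecord()
    }
}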
Diffstat (limited to 'opendc-experiments/opendc-experiments-capelin')
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt     | 125
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt  |  14
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt |  74
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt |  73
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt |  46
5 files changed, 225 insertions(+), 107 deletions(-)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt
index c5cb80e2..5684bde9 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetDataWriter.kt
@@ -25,11 +25,13 @@ package org.opendc.experiments.capelin.export.parquet
import mu.KotlinLogging
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
+import org.apache.avro.generic.GenericRecordBuilder
import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.example.Paper.schema
import org.apache.parquet.hadoop.ParquetFileWriter
+import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.opendc.trace.util.parquet.LocalOutputFile
-import java.io.Closeable
import java.io.File
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.BlockingQueue
@@ -38,50 +40,94 @@ import kotlin.concurrent.thread
/**
* A writer that writes data in Parquet format.
*/
-public open class ParquetDataWriter<in T>(
- private val path: File,
+abstract class ParquetDataWriter<in T>(
+ path: File,
private val schema: Schema,
- private val converter: (T, GenericData.Record) -> Unit,
- private val bufferSize: Int = 4096
-) : Runnable, Closeable {
+ bufferSize: Int = 4096
+) : AutoCloseable {
/**
* The logging instance to use.
*/
private val logger = KotlinLogging.logger {}
/**
- * The writer to write the Parquet file.
+ * The queue of commands to process.
*/
- private val writer = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(path))
- .withSchema(schema)
- .withCompressionCodec(CompressionCodecName.SNAPPY)
- .withPageSize(4 * 1024 * 1024) // For compression
- .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
- .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
- .build()
+ private val queue: BlockingQueue<T> = ArrayBlockingQueue(bufferSize)
/**
- * The queue of commands to process.
+ * An exception to be propagated to the actual writer.
*/
- private val queue: BlockingQueue<Action> = ArrayBlockingQueue(bufferSize)
+ private var exception: Throwable? = null
/**
* The thread that is responsible for writing the Parquet records.
*/
- private val writerThread = thread(start = false, name = this.toString()) { run() }
+ private val writerThread = thread(start = false, name = this.toString()) {
+ val writer = let {
+ val builder = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(path))
+ .withSchema(schema)
+ .withCompressionCodec(CompressionCodecName.ZSTD)
+ .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+ buildWriter(builder)
+ }
+
+ val queue = queue
+ val buf = mutableListOf<T>()
+ var shouldStop = false
+
+ try {
+ while (!shouldStop) {
+ try {
+ process(writer, queue.take())
+ } catch (e: InterruptedException) {
+ shouldStop = true
+ }
+
+ if (queue.drainTo(buf) > 0) {
+ for (data in buf) {
+ process(writer, data)
+ }
+ buf.clear()
+ }
+ }
+ } catch (e: Throwable) {
+ logger.error(e) { "Failure in Parquet data writer" }
+ exception = e
+ } finally {
+ writer.close()
+ }
+ }
+
+ /**
+ * Build the [ParquetWriter] used to write the Parquet files.
+ */
+ protected open fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> {
+ return builder.build()
+ }
+
+ /**
+ * Convert the specified [data] into a Parquet record.
+ */
+ protected abstract fun convert(builder: GenericRecordBuilder, data: T)
/**
* Write the specified metrics to the database.
*/
- public fun write(event: T) {
- queue.put(Action.Write(event))
+ fun write(data: T) {
+ val exception = exception
+ if (exception != null) {
+ throw IllegalStateException("Writer thread failed", exception)
+ }
+
+ queue.put(data)
}
/**
* Signal the writer to stop.
*/
- public override fun close() {
- queue.put(Action.Stop)
+ override fun close() {
+ writerThread.interrupt()
writerThread.join()
}
@@ -90,38 +136,11 @@ public open class ParquetDataWriter<in T>(
}
/**
- * Start the writer thread.
+ * Process the specified [data] to be written to the Parquet file.
*/
- override fun run() {
- try {
- loop@ while (true) {
- val action = queue.take()
- when (action) {
- is Action.Stop -> break@loop
- is Action.Write<*> -> {
- val record = GenericData.Record(schema)
- @Suppress("UNCHECKED_CAST")
- converter(action.data as T, record)
- writer.write(record)
- }
- }
- }
- } catch (e: Throwable) {
- logger.error("Writer failed", e)
- } finally {
- writer.close()
- }
- }
-
- public sealed class Action {
- /**
- * A poison pill that will stop the writer thread.
- */
- public object Stop : Action()
-
- /**
- * Write the specified metrics to the database.
- */
- public data class Write<out T>(val data: T) : Action()
+ private fun process(writer: ParquetWriter<GenericData.Record>, data: T) {
+ val builder = GenericRecordBuilder(schema)
+ convert(builder, data)
+ writer.write(builder.build())
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt
index 79b84e9d..b057e932 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetExportMonitor.kt
@@ -24,22 +24,33 @@ package org.opendc.experiments.capelin.export.parquet
import org.opendc.telemetry.compute.ComputeMonitor
import org.opendc.telemetry.compute.table.HostData
+import org.opendc.telemetry.compute.table.ServerData
import org.opendc.telemetry.compute.table.ServiceData
import java.io.File
/**
* A [ComputeMonitor] that logs the events to a Parquet file.
*/
-public class ParquetExportMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable {
+class ParquetExportMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable {
+ private val serverWriter = ParquetServerDataWriter(
+ File(base, "server/$partition/data.parquet").also { it.parentFile.mkdirs() },
+ bufferSize
+ )
+
private val hostWriter = ParquetHostDataWriter(
File(base, "host/$partition/data.parquet").also { it.parentFile.mkdirs() },
bufferSize
)
+
private val serviceWriter = ParquetServiceDataWriter(
File(base, "service/$partition/data.parquet").also { it.parentFile.mkdirs() },
bufferSize
)
+ override fun record(data: ServerData) {
+ serverWriter.write(data)
+ }
+
override fun record(data: HostData) {
hostWriter.write(data)
}
@@ -51,5 +62,6 @@ public class ParquetExportMonitor(base: File, partition: String, bufferSize: Int
override fun close() {
hostWriter.close()
serviceWriter.close()
+ serverWriter.close()
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt
index 8912c12e..7062a275 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetHostDataWriter.kt
@@ -25,6 +25,10 @@ package org.opendc.experiments.capelin.export.parquet
import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericData
+import org.apache.avro.generic.GenericRecordBuilder
+import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.hadoop.ParquetWriter
+import org.opendc.compute.service.driver.HostState
import org.opendc.telemetry.compute.table.HostData
import java.io.File
@@ -32,42 +36,52 @@ import java.io.File
* A Parquet event writer for [HostData]s.
*/
public class ParquetHostDataWriter(path: File, bufferSize: Int) :
- ParquetDataWriter<HostData>(path, schema, convert, bufferSize) {
+ ParquetDataWriter<HostData>(path, SCHEMA, bufferSize) {
- override fun toString(): String = "host-writer"
+ override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> {
+ return builder
+ .withDictionaryEncoding("host_id", true)
+ .build()
+ }
- public companion object {
- private val convert: (HostData, GenericData.Record) -> Unit = { data, record ->
- record.put("host_id", data.host.name)
- record.put("state", data.host.state.name)
- record.put("timestamp", data.timestamp)
- record.put("total_work", data.totalWork)
- record.put("granted_work", data.grantedWork)
- record.put("overcommitted_work", data.overcommittedWork)
- record.put("interfered_work", data.interferedWork)
- record.put("cpu_usage", data.cpuUsage)
- record.put("cpu_demand", data.cpuDemand)
- record.put("power_draw", data.powerDraw)
- record.put("instance_count", data.instanceCount)
- record.put("cores", data.host.model.cpuCount)
- }
+ override fun convert(builder: GenericRecordBuilder, data: HostData) {
+ builder["timestamp"] = data.timestamp
+ builder["host_id"] = data.host.name
+ builder["powered_on"] = data.host.state == HostState.UP
+ builder["uptime"] = data.uptime
+ builder["downtime"] = data.downtime
+ builder["total_work"] = data.totalWork
+ builder["granted_work"] = data.grantedWork
+ builder["overcommitted_work"] = data.overcommittedWork
+ builder["interfered_work"] = data.interferedWork
+ builder["cpu_usage"] = data.cpuUsage
+ builder["cpu_demand"] = data.cpuDemand
+ builder["power_draw"] = data.powerDraw
+ builder["num_instances"] = data.instanceCount
+ builder["num_cpus"] = data.host.model.cpuCount
+ }
+
+ override fun toString(): String = "host-writer"
- private val schema: Schema = SchemaBuilder
+ companion object {
+ private val SCHEMA: Schema = SchemaBuilder
.record("host")
.namespace("org.opendc.telemetry.compute")
.fields()
- .name("timestamp").type().longType().noDefault()
- .name("host_id").type().stringType().noDefault()
- .name("state").type().stringType().noDefault()
- .name("requested_work").type().longType().noDefault()
- .name("granted_work").type().longType().noDefault()
- .name("overcommitted_work").type().longType().noDefault()
- .name("interfered_work").type().longType().noDefault()
- .name("cpu_usage").type().doubleType().noDefault()
- .name("cpu_demand").type().doubleType().noDefault()
- .name("power_draw").type().doubleType().noDefault()
- .name("instance_count").type().intType().noDefault()
- .name("cores").type().intType().noDefault()
+ .requiredLong("timestamp")
+ .requiredString("host_id")
+ .requiredBoolean("powered_on")
+ .requiredLong("uptime")
+ .requiredLong("downtime")
+ .requiredDouble("total_work")
+ .requiredDouble("granted_work")
+ .requiredDouble("overcommitted_work")
+ .requiredDouble("interfered_work")
+ .requiredDouble("cpu_usage")
+ .requiredDouble("cpu_demand")
+ .requiredDouble("power_draw")
+ .requiredInt("num_instances")
+ .requiredInt("num_cpus")
.endRecord()
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt
new file mode 100644
index 00000000..9904adde
--- /dev/null
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServerDataWriter.kt
@@ -0,0 +1,73 @@
+/*
+ * Copyright (c) 2020 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.capelin.export.parquet
+
+import org.apache.avro.Schema
+import org.apache.avro.SchemaBuilder
+import org.apache.avro.generic.GenericData
+import org.apache.avro.generic.GenericRecordBuilder
+import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.hadoop.ParquetWriter
+import org.opendc.telemetry.compute.table.ServerData
+import java.io.File
+
+/**
+ * A Parquet event writer for [ServerData]s.
+ */
+public class ParquetServerDataWriter(path: File, bufferSize: Int) :
+ ParquetDataWriter<ServerData>(path, SCHEMA, bufferSize) {
+
+ override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> {
+ return builder
+ .withDictionaryEncoding("server_id", true)
+ .withDictionaryEncoding("state", true)
+ .build()
+ }
+
+ override fun convert(builder: GenericRecordBuilder, data: ServerData) {
+ builder["timestamp"] = data.timestamp
+ builder["server_id"] = data.server.uid.toString()
+ builder["state"] = data.server.state
+ builder["uptime"] = data.uptime
+ builder["downtime"] = data.downtime
+ builder["num_vcpus"] = data.server.flavor.cpuCount
+ builder["mem_capacity"] = data.server.flavor.memorySize
+ }
+
+ override fun toString(): String = "server-writer"
+
+ companion object {
+ private val SCHEMA: Schema = SchemaBuilder
+ .record("server")
+ .namespace("org.opendc.telemetry.compute")
+ .fields()
+ .requiredLong("timestamp")
+ .requiredString("server_id")
+ .requiredString("state")
+ .requiredLong("uptime")
+ .requiredLong("downtime")
+ .requiredInt("num_vcpus")
+ .requiredLong("mem_capacity")
+ .endRecord()
+ }
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt
index 36d630f3..e1428fe7 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/export/parquet/ParquetServiceDataWriter.kt
@@ -24,7 +24,7 @@ package org.opendc.experiments.capelin.export.parquet
import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
-import org.apache.avro.generic.GenericData
+import org.apache.avro.generic.GenericRecordBuilder
import org.opendc.telemetry.compute.table.ServiceData
import java.io.File
@@ -32,34 +32,34 @@ import java.io.File
* A Parquet event writer for [ServiceData]s.
*/
public class ParquetServiceDataWriter(path: File, bufferSize: Int) :
- ParquetDataWriter<ServiceData>(path, schema, convert, bufferSize) {
+ ParquetDataWriter<ServiceData>(path, SCHEMA, bufferSize) {
- override fun toString(): String = "service-writer"
+ override fun convert(builder: GenericRecordBuilder, data: ServiceData) {
+ builder["timestamp"] = data.timestamp
+ builder["host_total_count"] = data.hostCount
+ builder["host_available_count"] = data.activeHostCount
+ builder["instance_total_count"] = data.instanceCount
+ builder["instance_active_count"] = data.runningInstanceCount
+ builder["instance_inactive_count"] = data.finishedInstanceCount
+ builder["instance_waiting_count"] = data.queuedInstanceCount
+ builder["instance_failed_count"] = data.failedInstanceCount
+ }
- public companion object {
- private val convert: (ServiceData, GenericData.Record) -> Unit = { data, record ->
- record.put("timestamp", data.timestamp)
- record.put("host_total_count", data.hostCount)
- record.put("host_available_count", data.activeHostCount)
- record.put("instance_total_count", data.instanceCount)
- record.put("instance_active_count", data.runningInstanceCount)
- record.put("instance_inactive_count", data.finishedInstanceCount)
- record.put("instance_waiting_count", data.queuedInstanceCount)
- record.put("instance_failed_count", data.failedInstanceCount)
- }
+ override fun toString(): String = "service-writer"
- private val schema: Schema = SchemaBuilder
+ companion object {
+ private val SCHEMA: Schema = SchemaBuilder
.record("service")
.namespace("org.opendc.telemetry.compute")
.fields()
- .name("timestamp").type().longType().noDefault()
- .name("host_total_count").type().intType().noDefault()
- .name("host_available_count").type().intType().noDefault()
- .name("instance_total_count").type().intType().noDefault()
- .name("instance_active_count").type().intType().noDefault()
- .name("instance_inactive_count").type().intType().noDefault()
- .name("instance_waiting_count").type().intType().noDefault()
- .name("instance_failed_count").type().intType().noDefault()
+ .requiredLong("timestamp")
+ .requiredInt("host_total_count")
+ .requiredInt("host_available_count")
+ .requiredInt("instance_total_count")
+ .requiredInt("instance_active_count")
+ .requiredInt("instance_inactive_count")
+ .requiredInt("instance_waiting_count")
+ .requiredInt("instance_failed_count")
.endRecord()
}
}
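For context, a minimal usage sketch of the monitor with the new server writer wired in. The output directory, partition name and buffer size are assumptions; the record() calls are indicated by comments because constructing the telemetry table objects is out of scope here.

import java.io.File

fun main() {
    // Creates writers under <base>/server/<partition>/data.parquet, host/... and service/...
    val monitor = ParquetExportMonitor(File("results"), "run-0", bufferSize = 4096)
    monitor.use {
        // it.record(serverData); it.record(hostData); it.record(serviceData)
    }
}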