author    Fabian Mastenbroek <mail.fabianm@gmail.com>    2021-10-15 15:32:11 +0200
committer Fabian Mastenbroek <mail.fabianm@gmail.com>    2021-10-25 17:58:54 +0200
commit    17951889c6d805b907d936d54e7e66efb7376879 (patch)
tree      e1c54d4a381b708da5fe964ecd5897b55a63fa42 /opendc-compute/opendc-compute-workload
parent    f565afb1ef7b940804af62aa73b6859dcb78a847 (diff)
perf(telemetry): Prevent allocations during collection cycle
This change redesigns the ComputeMonitor interface to reduce the number of memory allocations necessary during a collection cycle.
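Note: the old interface handed the monitor freshly allocated HostData/ServerData/ServiceData objects on every cycle; the redesign passes reader views instead. A minimal sketch of the reader-based ComputeMonitor, inferred from the record() overrides in this patch (the actual declarations live in opendc-telemetry and may differ, including the empty default bodies assumed here):

    // Sketch only: the method set is inferred from the call sites below.
    public interface ComputeMonitor {
        public fun record(reader: HostTableReader) {}
        public fun record(reader: ServerTableReader) {}
        public fun record(reader: ServiceTableReader) {}
    }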
Diffstat (limited to 'opendc-compute/opendc-compute-workload')
-rw-r--r-- opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt | 18
-rw-r--r-- opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt            | 23
-rw-r--r-- opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt        |  8
-rw-r--r-- opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt      |  8
-rw-r--r-- opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt     |  8
5 files changed, 29 insertions(+), 36 deletions(-)
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt
index ad182d67..a46885f4 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt
@@ -25,9 +25,9 @@ package org.opendc.compute.workload.export.parquet
import io.opentelemetry.sdk.common.CompletableResultCode
import org.opendc.telemetry.compute.ComputeMetricExporter
import org.opendc.telemetry.compute.ComputeMonitor
-import org.opendc.telemetry.compute.table.HostData
-import org.opendc.telemetry.compute.table.ServerData
-import org.opendc.telemetry.compute.table.ServiceData
+import org.opendc.telemetry.compute.table.HostTableReader
+import org.opendc.telemetry.compute.table.ServerTableReader
+import org.opendc.telemetry.compute.table.ServiceTableReader
import java.io.File
/**
@@ -49,16 +49,16 @@ public class ParquetComputeMetricExporter(base: File, partition: String, bufferS
bufferSize
)
- override fun record(data: ServerData) {
- serverWriter.write(data)
+ override fun record(reader: ServerTableReader) {
+ serverWriter.write(reader)
}
- override fun record(data: HostData) {
- hostWriter.write(data)
+ override fun record(reader: HostTableReader) {
+ hostWriter.write(reader)
}
- override fun record(data: ServiceData) {
- serviceWriter.write(data)
+ override fun record(reader: ServiceTableReader) {
+ serviceWriter.write(reader)
}
override fun shutdown(): CompletableResultCode {
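Note: for reference, a hypothetical construction of the exporter; the parameter names follow the hunk header above, while the concrete values are invented:

    val exporter = ParquetComputeMetricExporter(
        base = File("output"),   // output directory (assumed semantics)
        partition = "run_id=0",  // hypothetical partition string
        bufferSize = 4096        // hypothetical queue capacity
    )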
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt
index 4172d729..84387bbc 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt
@@ -50,9 +50,9 @@ public abstract class ParquetDataWriter<in T>(
private val logger = KotlinLogging.logger {}
/**
- * The queue of commands to process.
+ * The queue of records to process.
*/
- private val queue: BlockingQueue<T> = ArrayBlockingQueue(bufferSize)
+ private val queue: BlockingQueue<GenericData.Record> = ArrayBlockingQueue(bufferSize)
/**
* An exception to be propagated to the actual writer.
@@ -72,20 +72,20 @@ public abstract class ParquetDataWriter<in T>(
}
val queue = queue
- val buf = mutableListOf<T>()
+ val buf = mutableListOf<GenericData.Record>()
var shouldStop = false
try {
while (!shouldStop) {
try {
- process(writer, queue.take())
+ writer.write(queue.take())
} catch (e: InterruptedException) {
shouldStop = true
}
if (queue.drainTo(buf) > 0) {
for (data in buf) {
- process(writer, data)
+ writer.write(data)
}
buf.clear()
}
@@ -119,7 +119,9 @@ public abstract class ParquetDataWriter<in T>(
throw IllegalStateException("Writer thread failed", exception)
}
- queue.put(data)
+ val builder = GenericRecordBuilder(schema)
+ convert(builder, data)
+ queue.put(builder.build())
}
/**
@@ -133,13 +135,4 @@ public abstract class ParquetDataWriter<in T>(
init {
writerThread.start()
}
-
- /**
- * Process the specified [data] to be written to the Parquet file.
- */
- private fun process(writer: ParquetWriter<GenericData.Record>, data: T) {
- val builder = GenericRecordBuilder(schema)
- convert(builder, data)
- writer.write(builder.build())
- }
}
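Note: dropping process() moves record conversion onto the calling thread: write() now builds an immutable GenericData.Record before enqueueing, so the queue never holds the caller's (presumably reusable) reader. A hypothetical illustration of the hazard this avoids, with invented names MutableHostTableReader/update():

    // One reader reused across the whole collection cycle, zero
    // per-record allocations on the monitor side.
    val reader = MutableHostTableReader()
    for (host in hosts) {
        reader.update(host)       // overwrite fields in place
        hostWriter.write(reader)  // converted synchronously; enqueueing
                                  // `reader` itself would race with the
                                  // next update()
    }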
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt
index 98a0739e..2b7cac8f 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt
@@ -28,17 +28,17 @@ import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecordBuilder
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
-import org.opendc.telemetry.compute.table.HostData
+import org.opendc.telemetry.compute.table.HostTableReader
import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA
import org.opendc.trace.util.parquet.UUID_SCHEMA
import org.opendc.trace.util.parquet.optional
import java.io.File
/**
- * A Parquet event writer for [HostData]s.
+ * A Parquet event writer for [HostTableReader]s.
*/
public class ParquetHostDataWriter(path: File, bufferSize: Int) :
- ParquetDataWriter<HostData>(path, SCHEMA, bufferSize) {
+ ParquetDataWriter<HostTableReader>(path, SCHEMA, bufferSize) {
override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> {
return builder
@@ -46,7 +46,7 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) :
.build()
}
- override fun convert(builder: GenericRecordBuilder, data: HostData) {
+ override fun convert(builder: GenericRecordBuilder, data: HostTableReader) {
builder["timestamp"] = data.timestamp.toEpochMilli()
builder["host_id"] = data.host.id
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt
index 4ebf8c62..144b6624 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt
@@ -28,17 +28,17 @@ import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecordBuilder
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
-import org.opendc.telemetry.compute.table.ServerData
+import org.opendc.telemetry.compute.table.ServerTableReader
import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA
import org.opendc.trace.util.parquet.UUID_SCHEMA
import org.opendc.trace.util.parquet.optional
import java.io.File
/**
- * A Parquet event writer for [ServerData]s.
+ * A Parquet event writer for [ServerTableReader]s.
*/
public class ParquetServerDataWriter(path: File, bufferSize: Int) :
- ParquetDataWriter<ServerData>(path, SCHEMA, bufferSize) {
+ ParquetDataWriter<ServerTableReader>(path, SCHEMA, bufferSize) {
override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> {
return builder
@@ -47,7 +47,7 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) :
.build()
}
- override fun convert(builder: GenericRecordBuilder, data: ServerData) {
+ override fun convert(builder: GenericRecordBuilder, data: ServerTableReader) {
builder["timestamp"] = data.timestamp.toEpochMilli()
builder["server_id"] = data.server.id
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt
index 47824b29..ec8a2b65 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt
@@ -25,17 +25,17 @@ package org.opendc.compute.workload.export.parquet
import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericRecordBuilder
-import org.opendc.telemetry.compute.table.ServiceData
+import org.opendc.telemetry.compute.table.ServiceTableReader
import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA
import java.io.File
/**
- * A Parquet event writer for [ServiceData]s.
+ * A Parquet event writer for [ServiceTableReader]s.
*/
public class ParquetServiceDataWriter(path: File, bufferSize: Int) :
- ParquetDataWriter<ServiceData>(path, SCHEMA, bufferSize) {
+ ParquetDataWriter<ServiceTableReader>(path, SCHEMA, bufferSize) {
- override fun convert(builder: GenericRecordBuilder, data: ServiceData) {
+ override fun convert(builder: GenericRecordBuilder, data: ServiceTableReader) {
builder["timestamp"] = data.timestamp.toEpochMilli()
builder["hosts_up"] = data.hostsUp
builder["hosts_down"] = data.hostsDown