summaryrefslogtreecommitdiff
path: root/opendc-compute/opendc-compute-telemetry
diff options
context:
space:
mode:
authorDante Niewenhuis <d.niewenhuis@hotmail.com>2024-03-05 13:23:57 +0100
committerGitHub <noreply@github.com>2024-03-05 13:23:57 +0100
commit5864cbcbfe2eb8c36ca05c3a39c7e5916aeecaec (patch)
tree5b2773b8dc21c2e1b526fb70f829c376dd80532a /opendc-compute/opendc-compute-telemetry
parentd28002a3c151d198298574312f32f1cb43f3a660 (diff)
Updated package versions, updated web server tests. (#207)
* Updated all package versions including kotlin. Updated all web-server tests to run. * Changed the java version of the tests. OpenDC now only supports java 19. * small update * test update * new update * updated docker version to 19 * updated docker version to 19
Diffstat (limited to 'opendc-compute/opendc-compute-telemetry')
-rw-r--r--opendc-compute/opendc-compute-telemetry/build.gradle.kts2
-rw-r--r--opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/ComputeMetricReader.kt49
-rw-r--r--opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetComputeMonitor.kt27
-rw-r--r--opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetDataWriter.kt61
-rw-r--r--opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetHostDataWriter.kt145
-rw-r--r--opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServerDataWriter.kt112
-rw-r--r--opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServiceDataWriter.kt63
-rw-r--r--opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostTableReader.kt1
-rw-r--r--opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerInfo.kt2
-rw-r--r--opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerTableReader.kt1
-rw-r--r--opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceData.kt4
-rw-r--r--opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceTableReader.kt1
12 files changed, 240 insertions, 228 deletions
diff --git a/opendc-compute/opendc-compute-telemetry/build.gradle.kts b/opendc-compute/opendc-compute-telemetry/build.gradle.kts
index c403ccb9..f7af3877 100644
--- a/opendc-compute/opendc-compute-telemetry/build.gradle.kts
+++ b/opendc-compute/opendc-compute-telemetry/build.gradle.kts
@@ -22,7 +22,7 @@
description = "OpenDC Compute Service implementation"
-/* Build configuration */
+// Build configuration
plugins {
`kotlin-library-conventions`
}
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/ComputeMetricReader.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/ComputeMetricReader.kt
index db875449..830101ef 100644
--- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/ComputeMetricReader.kt
+++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/ComputeMetricReader.kt
@@ -53,7 +53,7 @@ public class ComputeMetricReader(
dispatcher: Dispatcher,
private val service: ComputeService,
private val monitor: ComputeMonitor,
- private val exportInterval: Duration = Duration.ofMinutes(5)
+ private val exportInterval: Duration = Duration.ofMinutes(5),
) : AutoCloseable {
private val logger = KotlinLogging.logger {}
private val scope = CoroutineScope(dispatcher.asCoroutineDispatcher())
@@ -77,22 +77,23 @@ public class ComputeMetricReader(
/**
* The background job that is responsible for collecting the metrics every cycle.
*/
- private val job = scope.launch {
- val intervalMs = exportInterval.toMillis()
- try {
- while (isActive) {
- delay(intervalMs)
-
+ private val job =
+ scope.launch {
+ val intervalMs = exportInterval.toMillis()
+ try {
+ while (isActive) {
+ delay(intervalMs)
+
+ loggState()
+ }
+ } finally {
loggState()
- }
- } finally {
- loggState()
- if (monitor is AutoCloseable) {
- monitor.close()
+ if (monitor is AutoCloseable) {
+ monitor.close()
+ }
}
}
- }
private fun loggState() {
try {
@@ -127,7 +128,6 @@ public class ComputeMetricReader(
* An aggregator for service metrics before they are reported.
*/
private class ServiceTableReaderImpl(private val service: ComputeService) : ServiceTableReader {
-
override fun copy(): ServiceTableReader {
val newServiceTable = ServiceTableReaderImpl(service)
newServiceTable.setValues(this)
@@ -402,16 +402,17 @@ public class ComputeMetricReader(
/**
* The static information about this server.
*/
- override val server = ServerInfo(
- server.uid.toString(),
- server.name,
- "vm",
- "x86",
- server.image.uid.toString(),
- server.image.name,
- server.flavor.cpuCount,
- server.flavor.memorySize
- )
+ override val server =
+ ServerInfo(
+ server.uid.toString(),
+ server.name,
+ "vm",
+ "x86",
+ server.image.uid.toString(),
+ server.image.name,
+ server.flavor.cpuCount,
+ server.flavor.memorySize,
+ )
/**
* The [HostInfo] of the host on which the server is hosted.
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetComputeMonitor.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetComputeMonitor.kt
index f60fbe6d..1c910497 100644
--- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetComputeMonitor.kt
+++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetComputeMonitor.kt
@@ -32,20 +32,23 @@ import java.io.File
* A [ComputeMonitor] that logs the events to a Parquet file.
*/
public class ParquetComputeMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable {
- private val serverWriter = ParquetServerDataWriter(
- File(base, "$partition/server.parquet").also { it.parentFile.mkdirs() },
- bufferSize
- )
+ private val serverWriter =
+ ParquetServerDataWriter(
+ File(base, "$partition/server.parquet").also { it.parentFile.mkdirs() },
+ bufferSize,
+ )
- private val hostWriter = ParquetHostDataWriter(
- File(base, "$partition/host.parquet").also { it.parentFile.mkdirs() },
- bufferSize
- )
+ private val hostWriter =
+ ParquetHostDataWriter(
+ File(base, "$partition/host.parquet").also { it.parentFile.mkdirs() },
+ bufferSize,
+ )
- private val serviceWriter = ParquetServiceDataWriter(
- File(base, "$partition/service.parquet").also { it.parentFile.mkdirs() },
- bufferSize
- )
+ private val serviceWriter =
+ ParquetServiceDataWriter(
+ File(base, "$partition/service.parquet").also { it.parentFile.mkdirs() },
+ bufferSize,
+ )
override fun record(reader: ServerTableReader) {
serverWriter.write(reader)
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetDataWriter.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetDataWriter.kt
index 34a75d75..b96ee28b 100644
--- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetDataWriter.kt
+++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetDataWriter.kt
@@ -43,7 +43,7 @@ import kotlin.concurrent.thread
public abstract class ParquetDataWriter<in T>(
path: File,
private val writeSupport: WriteSupport<T>,
- bufferSize: Int = 4096
+ bufferSize: Int = 4096,
) : AutoCloseable {
/**
* The logging instance to use.
@@ -63,41 +63,44 @@ public abstract class ParquetDataWriter<in T>(
/**
* The thread that is responsible for writing the Parquet records.
*/
- private val writerThread = thread(start = false, name = this.toString()) {
- val writer = let {
- val builder = LocalParquetWriter.builder(path.toPath(), writeSupport)
- .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
- .withCompressionCodec(CompressionCodecName.ZSTD)
- .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
- buildWriter(builder)
- }
+ private val writerThread =
+ thread(start = false, name = this.toString()) {
+ val writer =
+ let {
+ val builder =
+ LocalParquetWriter.builder(path.toPath(), writeSupport)
+ .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
+ .withCompressionCodec(CompressionCodecName.ZSTD)
+ .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+ buildWriter(builder)
+ }
- val queue = queue
- val buf = mutableListOf<T>()
- var shouldStop = false
+ val queue = queue
+ val buf = mutableListOf<T>()
+ var shouldStop = false
- try {
- while (!shouldStop) {
- try {
- writer.write(queue.take())
- } catch (e: InterruptedException) {
- shouldStop = true
- }
+ try {
+ while (!shouldStop) {
+ try {
+ writer.write(queue.take())
+ } catch (e: InterruptedException) {
+ shouldStop = true
+ }
- if (queue.drainTo(buf) > 0) {
- for (data in buf) {
- writer.write(data)
+ if (queue.drainTo(buf) > 0) {
+ for (data in buf) {
+ writer.write(data)
+ }
+ buf.clear()
}
- buf.clear()
}
+ } catch (e: Throwable) {
+ logger.error(e) { "Failure in Parquet data writer" }
+ exception = e
+ } finally {
+ writer.close()
}
- } catch (e: Throwable) {
- logger.error(e) { "Failure in Parquet data writer" }
- exception = e
- } finally {
- writer.close()
}
- }
/**
* Build the [ParquetWriter] used to write the Parquet files.
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetHostDataWriter.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetHostDataWriter.kt
index a6799ef8..b789e44f 100644
--- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetHostDataWriter.kt
+++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetHostDataWriter.kt
@@ -40,7 +40,6 @@ import java.io.File
*/
public class ParquetHostDataWriter(path: File, bufferSize: Int) :
ParquetDataWriter<HostTableReader>(path, HostDataWriteSupport(), bufferSize) {
-
override fun buildWriter(builder: LocalParquetWriter.Builder<HostTableReader>): ParquetWriter<HostTableReader> {
return builder
.withDictionaryEncoding("host_id", true)
@@ -67,7 +66,10 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) :
write(recordConsumer, record)
}
- private fun write(consumer: RecordConsumer, data: HostTableReader) {
+ private fun write(
+ consumer: RecordConsumer,
+ data: HostTableReader,
+ ) {
consumer.startMessage()
consumer.startField("timestamp", 0)
@@ -165,76 +167,77 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) :
/**
* The schema of the host data.
*/
- val SCHEMA: MessageType = Types
- .buildMessage()
- .addFields(
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
+ val SCHEMA: MessageType =
+ Types
+ .buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
// .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("timestamp"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.BINARY)
- .`as`(LogicalTypeAnnotation.stringType())
- .named("host_id"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("cpu_count"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("mem_capacity"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("guests_terminated"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("guests_running"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("guests_error"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("guests_invalid"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("cpu_limit"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("cpu_usage"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("cpu_demand"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("cpu_utilization"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("cpu_time_active"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("cpu_time_idle"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("cpu_time_steal"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("cpu_time_lost"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("power_draw"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("energy_usage"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("uptime"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("downtime"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("timestamp"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("host_id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("cpu_count"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("mem_capacity"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("guests_terminated"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("guests_running"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("guests_error"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("guests_invalid"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpu_limit"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpu_usage"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpu_demand"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpu_utilization"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("cpu_time_active"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("cpu_time_idle"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("cpu_time_steal"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("cpu_time_lost"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("power_draw"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("energy_usage"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("uptime"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("downtime"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
// .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("boot_time")
- )
- .named("host")
+ .named("boot_time"),
+ )
+ .named("host")
}
}
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServerDataWriter.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServerDataWriter.kt
index e8a28016..bcae6805 100644
--- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServerDataWriter.kt
+++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServerDataWriter.kt
@@ -40,7 +40,6 @@ import java.io.File
*/
public class ParquetServerDataWriter(path: File, bufferSize: Int) :
ParquetDataWriter<ServerTableReader>(path, ServerDataWriteSupport(), bufferSize) {
-
override fun buildWriter(builder: LocalParquetWriter.Builder<ServerTableReader>): ParquetWriter<ServerTableReader> {
return builder
.withDictionaryEncoding("server_id", true)
@@ -68,7 +67,10 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) :
write(recordConsumer, record)
}
- private fun write(consumer: RecordConsumer, data: ServerTableReader) {
+ private fun write(
+ consumer: RecordConsumer,
+ data: ServerTableReader,
+ ) {
consumer.startMessage()
consumer.startField("timestamp", 0)
@@ -148,61 +150,61 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) :
/**
* The schema of the server data.
*/
- val SCHEMA: MessageType = Types.buildMessage()
- .addFields(
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
+ val SCHEMA: MessageType =
+ Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
// .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("timestamp"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.BINARY)
- .`as`(LogicalTypeAnnotation.stringType())
- .named("server_id"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.BINARY)
- .`as`(LogicalTypeAnnotation.stringType())
- .named("server_name"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.BINARY)
- .`as`(LogicalTypeAnnotation.stringType())
- .named("host_id"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("mem_capacity"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("cpu_count"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
- .named("cpu_limit"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("cpu_time_active"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("cpu_time_idle"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("cpu_time_steal"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("cpu_time_lost"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("uptime"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
- .named("downtime"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("timestamp"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("server_id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("server_name"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("host_id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("mem_capacity"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("cpu_count"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpu_limit"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("cpu_time_active"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("cpu_time_idle"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("cpu_time_steal"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("cpu_time_lost"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("uptime"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("downtime"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
// .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("provision_time"),
- Types
- .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("provision_time"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
// .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("boot_time")
-
- )
- .named("server")
+ .named("boot_time"),
+ )
+ .named("server")
}
}
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServiceDataWriter.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServiceDataWriter.kt
index a487203e..21247ef3 100644
--- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServiceDataWriter.kt
+++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServiceDataWriter.kt
@@ -36,7 +36,6 @@ import java.io.File
*/
public class ParquetServiceDataWriter(path: File, bufferSize: Int) :
ParquetDataWriter<ServiceTableReader>(path, ServiceDataWriteSupport(), bufferSize) {
-
override fun toString(): String = "service-writer"
/**
@@ -57,7 +56,10 @@ public class ParquetServiceDataWriter(path: File, bufferSize: Int) :
write(recordConsumer, record)
}
- private fun write(consumer: RecordConsumer, data: ServiceTableReader) {
+ private fun write(
+ consumer: RecordConsumer,
+ data: ServiceTableReader,
+ ) {
consumer.startMessage()
consumer.startField("timestamp", 0)
@@ -97,34 +99,35 @@ public class ParquetServiceDataWriter(path: File, bufferSize: Int) :
}
private companion object {
- private val SCHEMA: MessageType = Types.buildMessage()
- .addFields(
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT64)
+ private val SCHEMA: MessageType =
+ Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
// .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
- .named("timestamp"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("hosts_up"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("hosts_down"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("servers_pending"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("servers_active"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("attempts_success"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("attempts_failure"),
- Types
- .required(PrimitiveType.PrimitiveTypeName.INT32)
- .named("attempts_error")
- )
- .named("service")
+ .named("timestamp"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("hosts_up"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("hosts_down"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("servers_pending"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("servers_active"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("attempts_success"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("attempts_failure"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("attempts_error"),
+ )
+ .named("service")
}
}
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostTableReader.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostTableReader.kt
index bfe2f281..f9fff3e5 100644
--- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostTableReader.kt
+++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostTableReader.kt
@@ -28,7 +28,6 @@ import java.time.Instant
* An interface that is used to read a row of a host trace entry.
*/
public interface HostTableReader {
-
public fun copy(): HostTableReader
public fun setValues(table: HostTableReader)
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerInfo.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerInfo.kt
index 96c5bb13..fb83bf06 100644
--- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerInfo.kt
+++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerInfo.kt
@@ -33,5 +33,5 @@ public data class ServerInfo(
val imageId: String,
val imageName: String,
val cpuCount: Int,
- val memCapacity: Long
+ val memCapacity: Long,
)
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerTableReader.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerTableReader.kt
index ec9743d8..0ebf9d2f 100644
--- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerTableReader.kt
+++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerTableReader.kt
@@ -28,7 +28,6 @@ import java.time.Instant
* An interface that is used to read a row of a server trace entry.
*/
public interface ServerTableReader {
-
public fun copy(): ServerTableReader
public fun setValues(table: ServerTableReader)
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceData.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceData.kt
index 0d8b2abd..ad4b3d49 100644
--- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceData.kt
+++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceData.kt
@@ -36,7 +36,7 @@ public data class ServiceData(
val serversActive: Int,
val attemptsSuccess: Int,
val attemptsFailure: Int,
- val attemptsError: Int
+ val attemptsError: Int,
)
/**
@@ -52,6 +52,6 @@ public fun ServiceTableReader.toServiceData(): ServiceData {
serversActive,
attemptsSuccess,
attemptsFailure,
- attemptsError
+ attemptsError,
)
}
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceTableReader.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceTableReader.kt
index 501e317c..10757a27 100644
--- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceTableReader.kt
+++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceTableReader.kt
@@ -28,7 +28,6 @@ import java.time.Instant
* An interface that is used to read a row of a service trace entry.
*/
public interface ServiceTableReader {
-
public fun copy(): ServiceTableReader
public fun setValues(table: ServiceTableReader)