diff options
| author | Dante Niewenhuis <d.niewenhuis@hotmail.com> | 2024-04-11 11:37:03 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-04-11 11:37:03 +0200 |
| commit | a7b0afbb5b7059274962ade234a50240677008fd (patch) | |
| tree | 554e3ca84168bd6c40b979437ccdf9d4e660ddad /opendc-compute/opendc-compute-telemetry | |
| parent | 48cde8e59f376c7249e51d8e45e22134d60956d9 (diff) | |
Added absolute timestamp based on the given workload to the output files (#215)
Diffstat (limited to 'opendc-compute/opendc-compute-telemetry')
7 files changed, 160 insertions, 94 deletions
diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/ComputeMetricReader.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/ComputeMetricReader.kt index 21cd93d6..91f7cecf 100644 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/ComputeMetricReader.kt +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/ComputeMetricReader.kt @@ -54,6 +54,7 @@ public class ComputeMetricReader( private val service: ComputeService, private val monitor: ComputeMonitor, private val exportInterval: Duration = Duration.ofMinutes(5), + private val startTime: Duration = Duration.ofMillis(0), ) : AutoCloseable { private val logger = KotlinLogging.logger {} private val scope = CoroutineScope(dispatcher.asCoroutineDispatcher()) @@ -62,7 +63,7 @@ public class ComputeMetricReader( /** * Aggregator for service metrics. */ - private val serviceTableReader = ServiceTableReaderImpl(service) + private val serviceTableReader = ServiceTableReaderImpl(service, startTime) /** * Mapping from [Host] instances to [HostTableReaderImpl] @@ -100,14 +101,14 @@ public class ComputeMetricReader( val now = this.clock.instant() for (host in this.service.hosts) { - val reader = this.hostTableReaders.computeIfAbsent(host) { HostTableReaderImpl(it) } + val reader = this.hostTableReaders.computeIfAbsent(host) { HostTableReaderImpl(it, startTime) } reader.record(now) this.monitor.record(reader.copy()) reader.reset() } for (server in this.service.servers) { - val reader = this.serverTableReaders.computeIfAbsent(server) { ServerTableReaderImpl(service, it) } + val reader = this.serverTableReaders.computeIfAbsent(server) { ServerTableReaderImpl(service, it, startTime) } reader.record(now) this.monitor.record(reader.copy()) reader.reset() @@ -127,7 +128,10 @@ public class ComputeMetricReader( /** * An aggregator for service metrics before they are reported. */ - private class ServiceTableReaderImpl(private val service: ComputeService) : ServiceTableReader { + private class ServiceTableReaderImpl( + private val service: ComputeService, + private val startTime: Duration = Duration.ofMillis(0), + ) : ServiceTableReader { override fun copy(): ServiceTableReader { val newServiceTable = ServiceTableReaderImpl(service) newServiceTable.setValues(this) @@ -137,6 +141,7 @@ public class ComputeMetricReader( override fun setValues(table: ServiceTableReader) { _timestamp = table.timestamp + _absoluteTimestamp = table.absoluteTimestamp _hostsUp = table.hostsUp _hostsDown = table.hostsDown @@ -152,6 +157,10 @@ public class ComputeMetricReader( override val timestamp: Instant get() = _timestamp + private var _absoluteTimestamp: Instant = Instant.MIN + override val absoluteTimestamp: Instant + get() = _absoluteTimestamp + override val hostsUp: Int get() = _hostsUp private var _hostsUp = 0 @@ -189,6 +198,7 @@ public class ComputeMetricReader( */ fun record(now: Instant) { _timestamp = now + _absoluteTimestamp = now + startTime val stats = service.getSchedulerStats() _hostsUp = stats.hostsAvailable @@ -205,7 +215,10 @@ public class ComputeMetricReader( /** * An aggregator for host metrics before they are reported. */ - private class HostTableReaderImpl(host: Host) : HostTableReader { + private class HostTableReaderImpl( + host: Host, + private val startTime: Duration = Duration.ofMillis(0), + ) : HostTableReader { override fun copy(): HostTableReader { val newHostTable = HostTableReaderImpl(_host) newHostTable.setValues(this) @@ -215,6 +228,8 @@ public class ComputeMetricReader( override fun setValues(table: HostTableReader) { _timestamp = table.timestamp + _absoluteTimestamp = table.absoluteTimestamp + _guestsTerminated = table.guestsTerminated _guestsRunning = table.guestsRunning _guestsError = table.guestsError @@ -242,6 +257,10 @@ public class ComputeMetricReader( get() = _timestamp private var _timestamp = Instant.MIN + override val absoluteTimestamp: Instant + get() = _absoluteTimestamp + private var _absoluteTimestamp = Instant.MIN + override val guestsTerminated: Int get() = _guestsTerminated private var _guestsTerminated = 0 @@ -325,6 +344,8 @@ public class ComputeMetricReader( val hostSysStats = _host.getSystemStats() _timestamp = now + _absoluteTimestamp = now + startTime + _guestsTerminated = hostSysStats.guestsTerminated _guestsRunning = hostSysStats.guestsRunning _guestsError = hostSysStats.guestsError @@ -374,7 +395,11 @@ public class ComputeMetricReader( /** * An aggregator for server metrics before they are reported. */ - private class ServerTableReaderImpl(private val service: ComputeService, server: Server) : ServerTableReader { + private class ServerTableReaderImpl( + private val service: ComputeService, + server: Server, + private val startTime: Duration = Duration.ofMillis(0), + ) : ServerTableReader { override fun copy(): ServerTableReader { val newServerTable = ServerTableReaderImpl(service, _server) newServerTable.setValues(this) @@ -386,6 +411,8 @@ public class ComputeMetricReader( host = table.host _timestamp = table.timestamp + _absoluteTimestamp = table.absoluteTimestamp + _cpuLimit = table.cpuLimit _cpuActiveTime = table.cpuActiveTime _cpuIdleTime = table.cpuIdleTime @@ -424,6 +451,10 @@ public class ComputeMetricReader( override val timestamp: Instant get() = _timestamp + private var _absoluteTimestamp = Instant.MIN + override val absoluteTimestamp: Instant + get() = _absoluteTimestamp + override val uptime: Long get() = _uptime - previousUptime private var _uptime: Long = 0 @@ -480,6 +511,8 @@ public class ComputeMetricReader( val sysStats = _host?.getSystemStats(_server) _timestamp = now + _absoluteTimestamp = now + startTime + _cpuLimit = cpuStats?.capacity ?: 0.0 _cpuActiveTime = cpuStats?.activeTime ?: 0 _cpuIdleTime = cpuStats?.idleTime ?: 0 diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetHostDataWriter.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetHostDataWriter.kt index b789e44f..4fc5a078 100644 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetHostDataWriter.kt +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetHostDataWriter.kt @@ -51,7 +51,7 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) : /** * A [WriteSupport] implementation for a [HostTableReader]. */ - private class HostDataWriteSupport : WriteSupport<HostTableReader>() { + private class HostDataWriteSupport() : WriteSupport<HostTableReader>() { lateinit var recordConsumer: RecordConsumer override fun init(configuration: Configuration): WriteContext { @@ -76,87 +76,91 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) : consumer.addLong(data.timestamp.toEpochMilli()) consumer.endField("timestamp", 0) - consumer.startField("host_id", 1) + consumer.startField("absolute_timestamp", 1) + consumer.addLong(data.absoluteTimestamp.toEpochMilli()) + consumer.endField("absolute_timestamp", 1) + + consumer.startField("host_id", 2) consumer.addBinary(Binary.fromString(data.host.id)) - consumer.endField("host_id", 1) + consumer.endField("host_id", 2) - consumer.startField("cpu_count", 2) + consumer.startField("cpu_count", 3) consumer.addInteger(data.host.cpuCount) - consumer.endField("cpu_count", 2) + consumer.endField("cpu_count", 3) - consumer.startField("mem_capacity", 3) + consumer.startField("mem_capacity", 4) consumer.addLong(data.host.memCapacity) - consumer.endField("mem_capacity", 3) + consumer.endField("mem_capacity", 4) - consumer.startField("guests_terminated", 4) + consumer.startField("guests_terminated", 5) consumer.addInteger(data.guestsTerminated) - consumer.endField("guests_terminated", 4) + consumer.endField("guests_terminated", 5) - consumer.startField("guests_running", 5) + consumer.startField("guests_running", 6) consumer.addInteger(data.guestsRunning) - consumer.endField("guests_running", 5) + consumer.endField("guests_running", 6) - consumer.startField("guests_error", 6) + consumer.startField("guests_error", 7) consumer.addInteger(data.guestsError) - consumer.endField("guests_error", 6) + consumer.endField("guests_error", 7) - consumer.startField("guests_invalid", 7) + consumer.startField("guests_invalid", 8) consumer.addInteger(data.guestsInvalid) - consumer.endField("guests_invalid", 7) + consumer.endField("guests_invalid", 8) - consumer.startField("cpu_limit", 8) + consumer.startField("cpu_limit", 9) consumer.addDouble(data.cpuLimit) - consumer.endField("cpu_limit", 8) + consumer.endField("cpu_limit", 9) - consumer.startField("cpu_usage", 9) + consumer.startField("cpu_usage", 10) consumer.addDouble(data.cpuUsage) - consumer.endField("cpu_usage", 9) + consumer.endField("cpu_usage", 10) - consumer.startField("cpu_demand", 10) + consumer.startField("cpu_demand", 11) consumer.addDouble(data.cpuUsage) - consumer.endField("cpu_demand", 10) + consumer.endField("cpu_demand", 11) - consumer.startField("cpu_utilization", 11) + consumer.startField("cpu_utilization", 12) consumer.addDouble(data.cpuUtilization) - consumer.endField("cpu_utilization", 11) + consumer.endField("cpu_utilization", 12) - consumer.startField("cpu_time_active", 12) + consumer.startField("cpu_time_active", 13) consumer.addLong(data.cpuActiveTime) - consumer.endField("cpu_time_active", 12) + consumer.endField("cpu_time_active", 13) - consumer.startField("cpu_time_idle", 13) + consumer.startField("cpu_time_idle", 14) consumer.addLong(data.cpuIdleTime) - consumer.endField("cpu_time_idle", 13) + consumer.endField("cpu_time_idle", 14) - consumer.startField("cpu_time_steal", 14) + consumer.startField("cpu_time_steal", 15) consumer.addLong(data.cpuStealTime) - consumer.endField("cpu_time_steal", 14) + consumer.endField("cpu_time_steal", 15) - consumer.startField("cpu_time_lost", 15) + consumer.startField("cpu_time_lost", 16) consumer.addLong(data.cpuLostTime) - consumer.endField("cpu_time_lost", 15) + consumer.endField("cpu_time_lost", 16) - consumer.startField("power_draw", 16) + consumer.startField("power_draw", 17) consumer.addDouble(data.powerDraw) - consumer.endField("power_draw", 16) + consumer.endField("power_draw", 17) - consumer.startField("energy_usage", 17) + consumer.startField("energy_usage", 18) consumer.addDouble(data.energyUsage) - consumer.endField("energy_usage", 17) + consumer.endField("energy_usage", 18) - consumer.startField("uptime", 18) + consumer.startField("uptime", 19) consumer.addLong(data.uptime) - consumer.endField("uptime", 18) + consumer.endField("uptime", 19) - consumer.startField("downtime", 19) + consumer.startField("downtime", 20) consumer.addLong(data.downtime) - consumer.endField("downtime", 19) + consumer.endField("downtime", 20) val bootTime = data.bootTime if (bootTime != null) { - consumer.startField("boot_time", 20) + consumer.startField("boot_time", 21) consumer.addLong(bootTime.toEpochMilli()) - consumer.endField("boot_time", 20) + consumer.endField("boot_time", 21) } consumer.endMessage() @@ -173,9 +177,11 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) : .addFields( Types .required(PrimitiveType.PrimitiveTypeName.INT64) -// .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) .named("timestamp"), Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("absolute_timestamp"), + Types .required(PrimitiveType.PrimitiveTypeName.BINARY) .`as`(LogicalTypeAnnotation.stringType()) .named("host_id"), diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServerDataWriter.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServerDataWriter.kt index bcae6805..4045d070 100644 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServerDataWriter.kt +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServerDataWriter.kt @@ -52,7 +52,7 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) : /** * A [WriteSupport] implementation for a [ServerTableReader]. */ - private class ServerDataWriteSupport : WriteSupport<ServerTableReader>() { + private class ServerDataWriteSupport() : WriteSupport<ServerTableReader>() { lateinit var recordConsumer: RecordConsumer override fun init(configuration: Configuration): WriteContext { @@ -77,69 +77,73 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) : consumer.addLong(data.timestamp.toEpochMilli()) consumer.endField("timestamp", 0) - consumer.startField("server_id", 1) + consumer.startField("absolute_timestamp", 1) + consumer.addLong(data.absoluteTimestamp.toEpochMilli()) + consumer.endField("absolute_timestamp", 1) + + consumer.startField("server_id", 2) consumer.addBinary(Binary.fromString(data.server.id)) - consumer.endField("server_id", 1) + consumer.endField("server_id", 2) - consumer.startField("server_name", 2) + consumer.startField("server_name", 3) consumer.addBinary(Binary.fromString(data.server.name)) - consumer.endField("server_name", 2) + consumer.endField("server_name", 3) val hostId = data.host?.id if (hostId != null) { - consumer.startField("host_id", 3) + consumer.startField("host_id", 4) consumer.addBinary(Binary.fromString(hostId)) - consumer.endField("host_id", 3) + consumer.endField("host_id", 4) } - consumer.startField("mem_capacity", 4) + consumer.startField("mem_capacity", 5) consumer.addLong(data.server.memCapacity) - consumer.endField("mem_capacity", 4) + consumer.endField("mem_capacity", 5) - consumer.startField("cpu_count", 5) + consumer.startField("cpu_count", 6) consumer.addInteger(data.server.cpuCount) - consumer.endField("cpu_count", 5) + consumer.endField("cpu_count", 6) - consumer.startField("cpu_limit", 6) + consumer.startField("cpu_limit", 7) consumer.addDouble(data.cpuLimit) - consumer.endField("cpu_limit", 6) + consumer.endField("cpu_limit", 7) - consumer.startField("cpu_time_active", 7) + consumer.startField("cpu_time_active", 8) consumer.addLong(data.cpuActiveTime) - consumer.endField("cpu_time_active", 7) + consumer.endField("cpu_time_active", 8) - consumer.startField("cpu_time_idle", 8) + consumer.startField("cpu_time_idle", 9) consumer.addLong(data.cpuIdleTime) - consumer.endField("cpu_time_idle", 8) + consumer.endField("cpu_time_idle", 9) - consumer.startField("cpu_time_steal", 9) + consumer.startField("cpu_time_steal", 10) consumer.addLong(data.cpuStealTime) - consumer.endField("cpu_time_steal", 9) + consumer.endField("cpu_time_steal", 10) - consumer.startField("cpu_time_lost", 10) + consumer.startField("cpu_time_lost", 11) consumer.addLong(data.cpuLostTime) - consumer.endField("cpu_time_lost", 10) + consumer.endField("cpu_time_lost", 11) - consumer.startField("uptime", 11) + consumer.startField("uptime", 12) consumer.addLong(data.uptime) - consumer.endField("uptime", 11) + consumer.endField("uptime", 12) - consumer.startField("downtime", 12) + consumer.startField("downtime", 13) consumer.addLong(data.downtime) - consumer.endField("downtime", 12) + consumer.endField("downtime", 13) val provisionTime = data.provisionTime if (provisionTime != null) { - consumer.startField("provision_time", 13) + consumer.startField("provision_time", 14) consumer.addLong(provisionTime.toEpochMilli()) - consumer.endField("provision_time", 13) + consumer.endField("provision_time", 14) } val bootTime = data.bootTime if (bootTime != null) { - consumer.startField("boot_time", 14) + consumer.startField("boot_time", 15) consumer.addLong(bootTime.toEpochMilli()) - consumer.endField("boot_time", 14) + consumer.endField("boot_time", 15) } consumer.endMessage() @@ -155,9 +159,11 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) : .addFields( Types .required(PrimitiveType.PrimitiveTypeName.INT64) -// .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) .named("timestamp"), Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("absolute_timestamp"), + Types .required(PrimitiveType.PrimitiveTypeName.BINARY) .`as`(LogicalTypeAnnotation.stringType()) .named("server_id"), diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServiceDataWriter.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServiceDataWriter.kt index 21247ef3..068f82ba 100644 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServiceDataWriter.kt +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/export/parquet/ParquetServiceDataWriter.kt @@ -41,7 +41,7 @@ public class ParquetServiceDataWriter(path: File, bufferSize: Int) : /** * A [WriteSupport] implementation for a [ServiceTableReader]. */ - private class ServiceDataWriteSupport : WriteSupport<ServiceTableReader>() { + private class ServiceDataWriteSupport() : WriteSupport<ServiceTableReader>() { lateinit var recordConsumer: RecordConsumer override fun init(configuration: Configuration): WriteContext { @@ -66,33 +66,37 @@ public class ParquetServiceDataWriter(path: File, bufferSize: Int) : consumer.addLong(data.timestamp.toEpochMilli()) consumer.endField("timestamp", 0) - consumer.startField("hosts_up", 1) + consumer.startField("absolute_timestamp", 1) + consumer.addLong(data.absoluteTimestamp.toEpochMilli()) + consumer.endField("absolute_timestamp", 1) + + consumer.startField("hosts_up", 2) consumer.addInteger(data.hostsUp) - consumer.endField("hosts_up", 1) + consumer.endField("hosts_up", 2) - consumer.startField("hosts_down", 2) + consumer.startField("hosts_down", 3) consumer.addInteger(data.hostsDown) - consumer.endField("hosts_down", 2) + consumer.endField("hosts_down", 3) - consumer.startField("servers_pending", 3) + consumer.startField("servers_pending", 4) consumer.addInteger(data.serversPending) - consumer.endField("servers_pending", 3) + consumer.endField("servers_pending", 4) - consumer.startField("servers_active", 4) + consumer.startField("servers_active", 5) consumer.addInteger(data.serversActive) - consumer.endField("servers_active", 4) + consumer.endField("servers_active", 5) - consumer.startField("attempts_success", 5) + consumer.startField("attempts_success", 6) consumer.addInteger(data.attemptsSuccess) - consumer.endField("attempts_pending", 5) + consumer.endField("attempts_pending", 6) - consumer.startField("attempts_failure", 6) + consumer.startField("attempts_failure", 7) consumer.addInteger(data.attemptsFailure) - consumer.endField("attempts_failure", 6) + consumer.endField("attempts_failure", 7) - consumer.startField("attempts_error", 7) + consumer.startField("attempts_error", 8) consumer.addInteger(data.attemptsError) - consumer.endField("attempts_error", 7) + consumer.endField("attempts_error", 8) consumer.endMessage() } @@ -104,9 +108,11 @@ public class ParquetServiceDataWriter(path: File, bufferSize: Int) : .addFields( Types .required(PrimitiveType.PrimitiveTypeName.INT64) -// .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) .named("timestamp"), Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("absolute_timestamp"), + Types .required(PrimitiveType.PrimitiveTypeName.INT32) .named("hosts_up"), Types diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostTableReader.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostTableReader.kt index f9fff3e5..7246ef55 100644 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostTableReader.kt +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/HostTableReader.kt @@ -43,6 +43,11 @@ public interface HostTableReader { public val timestamp: Instant /** + * The timestamp of the current entry of the reader. + */ + public val absoluteTimestamp: Instant + + /** * The number of guests that are in a terminated state. */ public val guestsTerminated: Int diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerTableReader.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerTableReader.kt index 0ebf9d2f..baac1142 100644 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerTableReader.kt +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServerTableReader.kt @@ -38,6 +38,11 @@ public interface ServerTableReader { public val timestamp: Instant /** + * The timestamp of the current entry of the reader. + */ + public val absoluteTimestamp: Instant + + /** * The [ServerInfo] of the server to which the row belongs to. */ public val server: ServerInfo diff --git a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceTableReader.kt b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceTableReader.kt index 10757a27..3b184913 100644 --- a/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceTableReader.kt +++ b/opendc-compute/opendc-compute-telemetry/src/main/kotlin/org/opendc/compute/telemetry/table/ServiceTableReader.kt @@ -38,6 +38,11 @@ public interface ServiceTableReader { public val timestamp: Instant /** + * The timestamp of the current entry of the reader. + */ + public val absoluteTimestamp: Instant + + /** * The number of hosts that are up at this instant. */ public val hostsUp: Int |
