diff options
16 files changed, 559 insertions, 334 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index a0ff9228..799a8cf0 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -46,8 +46,8 @@ import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.flow.FlowEngine import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.HOST_ID -import org.opendc.telemetry.compute.table.HostData -import org.opendc.telemetry.compute.table.ServerData +import org.opendc.telemetry.compute.table.HostTableReader +import org.opendc.telemetry.compute.table.ServerTableReader import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import org.opendc.telemetry.sdk.toOtelClock import java.time.Duration @@ -140,10 +140,10 @@ internal class SimHostTest { val reader = CoroutineMetricReader( this, listOf(meterProvider as MetricProducer), object : ComputeMetricExporter() { - override fun record(data: HostData) { - activeTime += data.cpuActiveTime - idleTime += data.cpuIdleTime - stealTime += data.cpuStealTime + override fun record(reader: HostTableReader) { + activeTime += reader.cpuActiveTime + idleTime += reader.cpuIdleTime + stealTime += reader.cpuStealTime } }, exportInterval = Duration.ofSeconds(duration) @@ -236,16 +236,16 @@ internal class SimHostTest { val reader = CoroutineMetricReader( this, listOf(meterProvider as MetricProducer), object : ComputeMetricExporter() { - override fun record(data: HostData) { - activeTime += data.cpuActiveTime - idleTime += data.cpuIdleTime - uptime += data.uptime - downtime += data.downtime + override fun record(reader: HostTableReader) { + activeTime += reader.cpuActiveTime + idleTime += reader.cpuIdleTime + uptime += reader.uptime + downtime += reader.downtime } - override fun record(data: ServerData) { - guestUptime += data.uptime - guestDowntime += data.downtime + override fun record(reader: ServerTableReader) { + guestUptime += reader.uptime + guestDowntime += reader.downtime } }, exportInterval = Duration.ofSeconds(duration) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt index ad182d67..a46885f4 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt @@ -25,9 +25,9 @@ package org.opendc.compute.workload.export.parquet import io.opentelemetry.sdk.common.CompletableResultCode import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.ComputeMonitor -import org.opendc.telemetry.compute.table.HostData -import org.opendc.telemetry.compute.table.ServerData -import org.opendc.telemetry.compute.table.ServiceData +import org.opendc.telemetry.compute.table.HostTableReader +import org.opendc.telemetry.compute.table.ServerTableReader +import org.opendc.telemetry.compute.table.ServiceTableReader import java.io.File /** @@ -49,16 +49,16 @@ public class ParquetComputeMetricExporter(base: File, partition: String, bufferS bufferSize ) - override fun record(data: ServerData) { - serverWriter.write(data) + override fun record(reader: ServerTableReader) { + serverWriter.write(reader) } - override fun record(data: HostData) { - hostWriter.write(data) + override fun record(reader: HostTableReader) { + hostWriter.write(reader) } - override fun record(data: ServiceData) { - serviceWriter.write(data) + override fun record(reader: ServiceTableReader) { + serviceWriter.write(reader) } override fun shutdown(): CompletableResultCode { diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt index 4172d729..84387bbc 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt @@ -50,9 +50,9 @@ public abstract class ParquetDataWriter<in T>( private val logger = KotlinLogging.logger {} /** - * The queue of commands to process. + * The queue of records to process. */ - private val queue: BlockingQueue<T> = ArrayBlockingQueue(bufferSize) + private val queue: BlockingQueue<GenericData.Record> = ArrayBlockingQueue(bufferSize) /** * An exception to be propagated to the actual writer. @@ -72,20 +72,20 @@ public abstract class ParquetDataWriter<in T>( } val queue = queue - val buf = mutableListOf<T>() + val buf = mutableListOf<GenericData.Record>() var shouldStop = false try { while (!shouldStop) { try { - process(writer, queue.take()) + writer.write(queue.take()) } catch (e: InterruptedException) { shouldStop = true } if (queue.drainTo(buf) > 0) { for (data in buf) { - process(writer, data) + writer.write(data) } buf.clear() } @@ -119,7 +119,9 @@ public abstract class ParquetDataWriter<in T>( throw IllegalStateException("Writer thread failed", exception) } - queue.put(data) + val builder = GenericRecordBuilder(schema) + convert(builder, data) + queue.put(builder.build()) } /** @@ -133,13 +135,4 @@ public abstract class ParquetDataWriter<in T>( init { writerThread.start() } - - /** - * Process the specified [data] to be written to the Parquet file. - */ - private fun process(writer: ParquetWriter<GenericData.Record>, data: T) { - val builder = GenericRecordBuilder(schema) - convert(builder, data) - writer.write(builder.build()) - } } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt index 98a0739e..2b7cac8f 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt @@ -28,17 +28,17 @@ import org.apache.avro.generic.GenericData import org.apache.avro.generic.GenericRecordBuilder import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.ParquetWriter -import org.opendc.telemetry.compute.table.HostData +import org.opendc.telemetry.compute.table.HostTableReader import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA import org.opendc.trace.util.parquet.UUID_SCHEMA import org.opendc.trace.util.parquet.optional import java.io.File /** - * A Parquet event writer for [HostData]s. + * A Parquet event writer for [HostTableReader]s. */ public class ParquetHostDataWriter(path: File, bufferSize: Int) : - ParquetDataWriter<HostData>(path, SCHEMA, bufferSize) { + ParquetDataWriter<HostTableReader>(path, SCHEMA, bufferSize) { override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> { return builder @@ -46,7 +46,7 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) : .build() } - override fun convert(builder: GenericRecordBuilder, data: HostData) { + override fun convert(builder: GenericRecordBuilder, data: HostTableReader) { builder["timestamp"] = data.timestamp.toEpochMilli() builder["host_id"] = data.host.id diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt index 4ebf8c62..144b6624 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt @@ -28,17 +28,17 @@ import org.apache.avro.generic.GenericData import org.apache.avro.generic.GenericRecordBuilder import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.ParquetWriter -import org.opendc.telemetry.compute.table.ServerData +import org.opendc.telemetry.compute.table.ServerTableReader import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA import org.opendc.trace.util.parquet.UUID_SCHEMA import org.opendc.trace.util.parquet.optional import java.io.File /** - * A Parquet event writer for [ServerData]s. + * A Parquet event writer for [ServerTableReader]s. */ public class ParquetServerDataWriter(path: File, bufferSize: Int) : - ParquetDataWriter<ServerData>(path, SCHEMA, bufferSize) { + ParquetDataWriter<ServerTableReader>(path, SCHEMA, bufferSize) { override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> { return builder @@ -47,7 +47,7 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) : .build() } - override fun convert(builder: GenericRecordBuilder, data: ServerData) { + override fun convert(builder: GenericRecordBuilder, data: ServerTableReader) { builder["timestamp"] = data.timestamp.toEpochMilli() builder["server_id"] = data.server.id diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt index 47824b29..ec8a2b65 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt @@ -25,17 +25,17 @@ package org.opendc.compute.workload.export.parquet import org.apache.avro.Schema import org.apache.avro.SchemaBuilder import org.apache.avro.generic.GenericRecordBuilder -import org.opendc.telemetry.compute.table.ServiceData +import org.opendc.telemetry.compute.table.ServiceTableReader import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA import java.io.File /** - * A Parquet event writer for [ServiceData]s. + * A Parquet event writer for [ServiceTableReader]s. */ public class ParquetServiceDataWriter(path: File, bufferSize: Int) : - ParquetDataWriter<ServiceData>(path, SCHEMA, bufferSize) { + ParquetDataWriter<ServiceTableReader>(path, SCHEMA, bufferSize) { - override fun convert(builder: GenericRecordBuilder, data: ServiceData) { + override fun convert(builder: GenericRecordBuilder, data: ServiceTableReader) { builder["timestamp"] = data.timestamp.toEpochMilli() builder["hosts_up"] = data.hostsUp builder["hosts_down"] = data.hostsDown diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 56ba9cfe..f3a6ed1a 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -39,7 +39,7 @@ import org.opendc.experiments.capelin.topology.clusterTopology import org.opendc.simulator.core.runBlockingSimulation import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.collectServiceMetrics -import org.opendc.telemetry.compute.table.HostData +import org.opendc.telemetry.compute.table.HostTableReader import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import java.io.File import java.time.Duration @@ -284,13 +284,13 @@ class CapelinIntegrationTest { var energyUsage = 0.0 var uptime = 0L - override fun record(data: HostData) { - idleTime += data.cpuIdleTime - activeTime += data.cpuActiveTime - stealTime += data.cpuStealTime - lostTime += data.cpuLostTime - energyUsage += data.powerTotal - uptime += data.uptime + override fun record(reader: HostTableReader) { + idleTime += reader.cpuIdleTime + activeTime += reader.cpuActiveTime + stealTime += reader.cpuStealTime + lostTime += reader.cpuLostTime + energyUsage += reader.powerTotal + uptime += reader.uptime } } } diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt index b293f7b5..418dc201 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt @@ -20,6 +20,8 @@ * SOFTWARE. */ +@file:Suppress("PropertyName") + package org.opendc.telemetry.compute import io.opentelemetry.api.common.AttributeKey @@ -32,7 +34,7 @@ import org.opendc.telemetry.compute.table.* import java.time.Instant /** - * Helper class responsible for aggregating [MetricData] into [ServiceData], [HostData] and [ServerData]. + * Helper class responsible for aggregating [MetricData] into [ServiceTableReader], [HostTableReader] and [ServerTableReader]. */ public class ComputeMetricAggregator { private val _service = ServiceAggregator() @@ -58,25 +60,25 @@ public class ComputeMetricAggregator { service.recordTimestamp(point) when (point.attributes[STATE_KEY]) { - "up" -> service.hostsUp = point.value.toInt() - "down" -> service.hostsDown = point.value.toInt() + "up" -> service._hostsUp = point.value.toInt() + "down" -> service._hostsDown = point.value.toInt() } } } "scheduler.servers" -> { for (point in metric.longSumData.points) { when (point.attributes[STATE_KEY]) { - "pending" -> service.serversPending = point.value.toInt() - "active" -> service.serversActive = point.value.toInt() + "pending" -> service._serversPending = point.value.toInt() + "active" -> service._serversActive = point.value.toInt() } } } "scheduler.attempts" -> { for (point in metric.longSumData.points) { when (point.attributes[RESULT_KEY]) { - "success" -> service.attemptsSuccess = point.value.toInt() - "failure" -> service.attemptsFailure = point.value.toInt() - "error" -> service.attemptsError = point.value.toInt() + "success" -> service._attemptsSuccess = point.value.toInt() + "failure" -> service._attemptsFailure = point.value.toInt() + "error" -> service._attemptsError = point.value.toInt() } } } @@ -87,10 +89,10 @@ public class ComputeMetricAggregator { for (point in metric.longSumData.points) { when (point.attributes[STATE_KEY]) { - "terminated" -> agg.guestsTerminated = point.value.toInt() - "running" -> agg.guestsRunning = point.value.toInt() - "error" -> agg.guestsRunning = point.value.toInt() - "invalid" -> agg.guestsInvalid = point.value.toInt() + "terminated" -> agg._guestsTerminated = point.value.toInt() + "running" -> agg._guestsRunning = point.value.toInt() + "error" -> agg._guestsRunning = point.value.toInt() + "invalid" -> agg._guestsInvalid = point.value.toInt() } } } @@ -101,24 +103,24 @@ public class ComputeMetricAggregator { val server = getServer(servers, point) if (server != null) { - server.cpuLimit = point.value - server.host = agg.host + server._cpuLimit = point.value + server._host = agg.host } else { - agg.cpuLimit = point.value + agg._cpuLimit = point.value } } } "system.cpu.usage" -> { val agg = getHost(hosts, resource) ?: continue - agg.cpuUsage = metric.doubleGaugeData.points.first().value + agg._cpuUsage = metric.doubleGaugeData.points.first().value } "system.cpu.demand" -> { val agg = getHost(hosts, resource) ?: continue - agg.cpuDemand = metric.doubleGaugeData.points.first().value + agg._cpuDemand = metric.doubleGaugeData.points.first().value } "system.cpu.utilization" -> { val agg = getHost(hosts, resource) ?: continue - agg.cpuUtilization = metric.doubleGaugeData.points.first().value + agg._cpuUtilization = metric.doubleGaugeData.points.first().value } "system.cpu.time" -> { val agg = getHost(hosts, resource) ?: continue @@ -128,29 +130,29 @@ public class ComputeMetricAggregator { val state = point.attributes[STATE_KEY] if (server != null) { when (state) { - "active" -> server.cpuActiveTime = point.value - "idle" -> server.cpuIdleTime = point.value - "steal" -> server.cpuStealTime = point.value - "lost" -> server.cpuLostTime = point.value + "active" -> server._cpuActiveTime = point.value + "idle" -> server._cpuIdleTime = point.value + "steal" -> server._cpuStealTime = point.value + "lost" -> server._cpuLostTime = point.value } - server.host = agg.host + server._host = agg.host } else { when (state) { - "active" -> agg.cpuActiveTime = point.value - "idle" -> agg.cpuIdleTime = point.value - "steal" -> agg.cpuStealTime = point.value - "lost" -> agg.cpuLostTime = point.value + "active" -> agg._cpuActiveTime = point.value + "idle" -> agg._cpuIdleTime = point.value + "steal" -> agg._cpuStealTime = point.value + "lost" -> agg._cpuLostTime = point.value } } } } "system.power.usage" -> { val agg = getHost(hosts, resource) ?: continue - agg.powerUsage = metric.doubleGaugeData.points.first().value + agg._powerUsage = metric.doubleGaugeData.points.first().value } "system.power.total" -> { val agg = getHost(hosts, resource) ?: continue - agg.powerTotal = metric.doubleSumData.points.first().value + agg._powerTotal = metric.doubleSumData.points.first().value } "system.time" -> { val agg = getHost(hosts, resource) ?: continue @@ -162,16 +164,16 @@ public class ComputeMetricAggregator { server.recordTimestamp(point) when (point.attributes[STATE_KEY]) { - "up" -> server.uptime = point.value - "down" -> server.downtime = point.value + "up" -> server._uptime = point.value + "down" -> server._downtime = point.value } - server.host = agg.host + server._host = agg.host } else { agg.recordTimestamp(point) when (point.attributes[STATE_KEY]) { - "up" -> agg.uptime = point.value - "down" -> agg.downtime = point.value + "up" -> agg._uptime = point.value + "down" -> agg._downtime = point.value } } } @@ -183,10 +185,10 @@ public class ComputeMetricAggregator { val server = getServer(servers, point) if (server != null) { - server.bootTime = Instant.ofEpochMilli(point.value) - server.host = agg.host + server._bootTime = Instant.ofEpochMilli(point.value) + server._host = agg.host } else { - agg.bootTime = Instant.ofEpochMilli(point.value) + agg._bootTime = Instant.ofEpochMilli(point.value) } } } @@ -194,7 +196,7 @@ public class ComputeMetricAggregator { for (point in metric.longGaugeData.points) { val server = getServer(servers, point) ?: continue server.recordTimestamp(point) - server.provisionTime = Instant.ofEpochMilli(point.value) + server._provisionTime = Instant.ofEpochMilli(point.value) } } } @@ -205,14 +207,16 @@ public class ComputeMetricAggregator { * Collect the data via the [monitor]. */ public fun collect(monitor: ComputeMonitor) { - monitor.record(_service.collect()) + monitor.record(_service) for (host in _hosts.values) { - monitor.record(host.collect()) + monitor.record(host) + host.reset() } for (server in _servers.values) { - monitor.record(server.collect()) + monitor.record(server) + server.reset() } } @@ -243,50 +247,55 @@ public class ComputeMetricAggregator { /** * An aggregator for service metrics before they are reported. */ - internal class ServiceAggregator { - private var timestamp = Long.MIN_VALUE + internal class ServiceAggregator : ServiceTableReader { + private var _timestamp: Instant = Instant.MIN + override val timestamp: Instant + get() = _timestamp - @JvmField var hostsUp = 0 - @JvmField var hostsDown = 0 + override val hostsUp: Int + get() = _hostsUp + @JvmField var _hostsUp = 0 - @JvmField var serversPending = 0 - @JvmField var serversActive = 0 + override val hostsDown: Int + get() = _hostsDown + @JvmField var _hostsDown = 0 - @JvmField var attemptsSuccess = 0 - @JvmField var attemptsFailure = 0 - @JvmField var attemptsError = 0 + override val serversPending: Int + get() = _serversPending + @JvmField var _serversPending = 0 - /** - * Finish the aggregation for this cycle. - */ - fun collect(): ServiceData { - val now = Instant.ofEpochMilli(timestamp) - return toServiceData(now) - } + override val serversActive: Int + get() = _serversActive + @JvmField var _serversActive = 0 - /** - * Convert the aggregator state to an immutable [ServiceData]. - */ - private fun toServiceData(now: Instant): ServiceData { - return ServiceData(now, hostsUp, hostsDown, serversPending, serversActive, attemptsSuccess, attemptsFailure, attemptsError) - } + override val attemptsSuccess: Int + get() = _attemptsSuccess + @JvmField var _attemptsSuccess = 0 + + override val attemptsFailure: Int + get() = _attemptsFailure + @JvmField var _attemptsFailure = 0 + + override val attemptsError: Int + get() = _attemptsError + @JvmField var _attemptsError = 0 /** * Record the timestamp of a [point] for this aggregator. */ fun recordTimestamp(point: PointData) { - timestamp = point.epochNanos / 1_000_000L // ns to ms + _timestamp = Instant.ofEpochMilli(point.epochNanos / 1_000_000L) // ns to ms } } /** * An aggregator for host metrics before they are reported. */ - internal class HostAggregator(resource: Resource) { + internal class HostAggregator(resource: Resource) : HostTableReader { /** * The static information about this host. */ - val host = HostInfo( + override val host = HostInfo( resource.attributes[HOST_ID]!!, resource.attributes[HOST_NAME] ?: "", resource.attributes[HOST_ARCH] ?: "", @@ -294,111 +303,127 @@ public class ComputeMetricAggregator { resource.attributes[HOST_MEM_CAPACITY] ?: 0, ) - private var timestamp = Long.MIN_VALUE + override val timestamp: Instant + get() = _timestamp + private var _timestamp = Instant.MIN + + override val guestsTerminated: Int + get() = _guestsTerminated + @JvmField var _guestsTerminated = 0 + + override val guestsRunning: Int + get() = _guestsRunning + @JvmField var _guestsRunning = 0 + + override val guestsError: Int + get() = _guestsError + @JvmField var _guestsError = 0 - @JvmField var guestsTerminated = 0 - @JvmField var guestsRunning = 0 - @JvmField var guestsError = 0 - @JvmField var guestsInvalid = 0 + override val guestsInvalid: Int + get() = _guestsInvalid + @JvmField var _guestsInvalid = 0 - @JvmField var cpuLimit = 0.0 - @JvmField var cpuUsage = 0.0 - @JvmField var cpuDemand = 0.0 - @JvmField var cpuUtilization = 0.0 + override val cpuLimit: Double + get() = _cpuLimit + @JvmField var _cpuLimit = 0.0 - @JvmField var cpuActiveTime = 0L - @JvmField var cpuIdleTime = 0L - @JvmField var cpuStealTime = 0L - @JvmField var cpuLostTime = 0L + override val cpuUsage: Double + get() = _cpuUsage + @JvmField var _cpuUsage = 0.0 + + override val cpuDemand: Double + get() = _cpuDemand + @JvmField var _cpuDemand = 0.0 + + override val cpuUtilization: Double + get() = _cpuUtilization + @JvmField var _cpuUtilization = 0.0 + + override val cpuActiveTime: Long + get() = _cpuActiveTime - previousCpuActiveTime + @JvmField var _cpuActiveTime = 0L private var previousCpuActiveTime = 0L + + override val cpuIdleTime: Long + get() = _cpuIdleTime - previousCpuIdleTime + @JvmField var _cpuIdleTime = 0L private var previousCpuIdleTime = 0L + + override val cpuStealTime: Long + get() = _cpuStealTime - previousCpuStealTime + @JvmField var _cpuStealTime = 0L private var previousCpuStealTime = 0L + + override val cpuLostTime: Long + get() = _cpuLostTime - previousCpuLostTime + @JvmField var _cpuLostTime = 0L private var previousCpuLostTime = 0L - @JvmField var powerUsage = 0.0 - @JvmField var powerTotal = 0.0 + override val powerUsage: Double + get() = _powerUsage + @JvmField var _powerUsage = 0.0 + + override val powerTotal: Double + get() = _powerTotal - previousPowerTotal + @JvmField var _powerTotal = 0.0 private var previousPowerTotal = 0.0 - @JvmField var uptime = 0L + override val uptime: Long + get() = _uptime - previousUptime + @JvmField var _uptime = 0L private var previousUptime = 0L - @JvmField var downtime = 0L + + override val downtime: Long + get() = _downtime - previousDowntime + @JvmField var _downtime = 0L private var previousDowntime = 0L - @JvmField var bootTime: Instant? = null + + override val bootTime: Instant? + get() = _bootTime + @JvmField var _bootTime: Instant? = null /** * Finish the aggregation for this cycle. */ - fun collect(): HostData { - val now = Instant.ofEpochMilli(timestamp) - val data = toHostData(now) - + fun reset() { // Reset intermediate state for next aggregation - previousCpuActiveTime = cpuActiveTime - previousCpuIdleTime = cpuIdleTime - previousCpuStealTime = cpuStealTime - previousCpuLostTime = cpuLostTime - previousPowerTotal = powerTotal - previousUptime = uptime - previousDowntime = downtime - - guestsTerminated = 0 - guestsRunning = 0 - guestsError = 0 - guestsInvalid = 0 - - cpuLimit = 0.0 - cpuUsage = 0.0 - cpuDemand = 0.0 - cpuUtilization = 0.0 - - powerUsage = 0.0 - - return data - } - - /** - * Convert the aggregator state to an immutable [HostData] instance. - */ - private fun toHostData(now: Instant): HostData { - return HostData( - now, - host, - guestsTerminated, - guestsRunning, - guestsError, - guestsInvalid, - cpuLimit, - cpuUsage, - cpuDemand, - cpuUtilization, - cpuActiveTime - previousCpuActiveTime, - cpuIdleTime - previousCpuIdleTime, - cpuStealTime - previousCpuStealTime, - cpuLostTime - previousCpuLostTime, - powerUsage, - powerTotal - previousPowerTotal, - uptime - previousUptime, - downtime - previousDowntime, - bootTime - ) + previousCpuActiveTime = _cpuActiveTime + previousCpuIdleTime = _cpuIdleTime + previousCpuStealTime = _cpuStealTime + previousCpuLostTime = _cpuLostTime + previousPowerTotal = _powerTotal + previousUptime = _uptime + previousDowntime = _downtime + + _guestsTerminated = 0 + _guestsRunning = 0 + _guestsError = 0 + _guestsInvalid = 0 + + _cpuLimit = 0.0 + _cpuUsage = 0.0 + _cpuDemand = 0.0 + _cpuUtilization = 0.0 + + _powerUsage = 0.0 } /** * Record the timestamp of a [point] for this aggregator. */ fun recordTimestamp(point: PointData) { - timestamp = point.epochNanos / 1_000_000L // ns to ms + _timestamp = Instant.ofEpochMilli(point.epochNanos / 1_000_000L) // ns to ms } } /** * An aggregator for server metrics before they are reported. */ - internal class ServerAggregator(attributes: Attributes) { + internal class ServerAggregator(attributes: Attributes) : ServerTableReader { /** * The static information about this server. */ - val server = ServerInfo( + override val server = ServerInfo( attributes[ResourceAttributes.HOST_ID]!!, attributes[ResourceAttributes.HOST_NAME]!!, attributes[ResourceAttributes.HOST_TYPE]!!, @@ -412,70 +437,76 @@ public class ComputeMetricAggregator { /** * The [HostInfo] of the host on which the server is hosted. */ - @JvmField var host: HostInfo? = null + override val host: HostInfo? + get() = _host + @JvmField var _host: HostInfo? = null - private var timestamp = Long.MIN_VALUE - @JvmField var uptime: Long = 0 + private var _timestamp = Instant.MIN + override val timestamp: Instant + get() = _timestamp + + override val uptime: Long + get() = _uptime - previousUptime + @JvmField var _uptime: Long = 0 private var previousUptime = 0L - @JvmField var downtime: Long = 0 + + override val downtime: Long + get() = _downtime - previousDowntime + @JvmField var _downtime: Long = 0 private var previousDowntime = 0L - @JvmField var provisionTime: Instant? = null - @JvmField var bootTime: Instant? = null - @JvmField var cpuLimit = 0.0 - @JvmField var cpuActiveTime = 0L - @JvmField var cpuIdleTime = 0L - @JvmField var cpuStealTime = 0L - @JvmField var cpuLostTime = 0L + + override val provisionTime: Instant? + get() = _provisionTime + @JvmField var _provisionTime: Instant? = null + + override val bootTime: Instant? + get() = _bootTime + @JvmField var _bootTime: Instant? = null + + override val cpuLimit: Double + get() = _cpuLimit + @JvmField var _cpuLimit = 0.0 + + override val cpuActiveTime: Long + get() = _cpuActiveTime - previousCpuActiveTime + @JvmField var _cpuActiveTime = 0L private var previousCpuActiveTime = 0L + + override val cpuIdleTime: Long + get() = _cpuIdleTime - previousCpuIdleTime + @JvmField var _cpuIdleTime = 0L private var previousCpuIdleTime = 0L + + override val cpuStealTime: Long + get() = _cpuStealTime - previousCpuStealTime + @JvmField var _cpuStealTime = 0L private var previousCpuStealTime = 0L + + override val cpuLostTime: Long + get() = _cpuLostTime - previousCpuLostTime + @JvmField var _cpuLostTime = 0L private var previousCpuLostTime = 0L /** * Finish the aggregation for this cycle. */ - fun collect(): ServerData { - val now = Instant.ofEpochMilli(timestamp) - val data = toServerData(now) - - previousUptime = uptime - previousDowntime = downtime + fun reset() { + previousUptime = _uptime + previousDowntime = _downtime previousCpuActiveTime = cpuActiveTime previousCpuIdleTime = cpuIdleTime previousCpuStealTime = cpuStealTime previousCpuLostTime = cpuLostTime - host = null - cpuLimit = 0.0 - - return data - } - - /** - * Convert the aggregator state into an immutable [ServerData]. - */ - private fun toServerData(now: Instant): ServerData { - return ServerData( - now, - server, - host, - uptime - previousUptime, - downtime - previousDowntime, - provisionTime, - bootTime, - cpuLimit, - cpuActiveTime - previousCpuActiveTime, - cpuIdleTime - previousCpuIdleTime, - cpuStealTime - previousCpuStealTime, - cpuLostTime - previousCpuLostTime - ) + _host = null + _cpuLimit = 0.0 } /** * Record the timestamp of a [point] for this aggregator. */ fun recordTimestamp(point: PointData) { - timestamp = point.epochNanos / 1_000_000L // ns to ms + _timestamp = Instant.ofEpochMilli(point.epochNanos / 1_000_000L) // ns to ms } } diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt index d51bcab4..64b5f337 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt @@ -22,26 +22,26 @@ package org.opendc.telemetry.compute -import org.opendc.telemetry.compute.table.HostData -import org.opendc.telemetry.compute.table.ServerData -import org.opendc.telemetry.compute.table.ServiceData +import org.opendc.telemetry.compute.table.HostTableReader +import org.opendc.telemetry.compute.table.ServerTableReader +import org.opendc.telemetry.compute.table.ServiceTableReader /** * A monitor that tracks the metrics and events of the OpenDC Compute service. */ public interface ComputeMonitor { /** - * Record the specified [data]. + * Record an entry with the specified [reader]. */ - public fun record(data: ServerData) {} + public fun record(reader: ServerTableReader) {} /** - * Record the specified [data]. + * Record an entry with the specified [reader]. */ - public fun record(data: HostData) {} + public fun record(reader: HostTableReader) {} /** - * Record the specified [data]. + * Record an entry with the specified [reader]. */ - public fun record(data: ServiceData) {} + public fun record(reader: ServiceTableReader) {} } diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt index ce89061b..41315b15 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt @@ -24,6 +24,8 @@ package org.opendc.telemetry.compute import io.opentelemetry.sdk.metrics.export.MetricProducer import org.opendc.telemetry.compute.table.ServiceData +import org.opendc.telemetry.compute.table.ServiceTableReader +import org.opendc.telemetry.compute.table.toServiceData /** * Collect the metrics of the compute service. @@ -32,8 +34,8 @@ public fun collectServiceMetrics(metricProducer: MetricProducer): ServiceData { lateinit var serviceData: ServiceData val agg = ComputeMetricAggregator() val monitor = object : ComputeMonitor { - override fun record(data: ServiceData) { - serviceData = data + override fun record(reader: ServiceTableReader) { + serviceData = reader.toServiceData() } } diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostTableReader.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostTableReader.kt new file mode 100644 index 00000000..1e1ad94e --- /dev/null +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostTableReader.kt @@ -0,0 +1,125 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.telemetry.compute.table + +import java.time.Instant + +/** + * An interface that is used to read a row of a host trace entry. + */ +public interface HostTableReader { + /** + * The timestamp of the current entry of the reader. + */ + public val timestamp: Instant + + /** + * The [HostInfo] of the host to which the row belongs to. + */ + public val host: HostInfo + + /** + * The number of guests that are in a terminated state. + */ + public val guestsTerminated: Int + + /** + * The number of guests that are in a running state. + */ + public val guestsRunning: Int + + /** + * The number of guests that are in an error state. + */ + public val guestsError: Int + + /** + * The number of guests that are in an unknown state. + */ + public val guestsInvalid: Int + + /** + * The capacity of the CPUs in the host (in MHz). + */ + public val cpuLimit: Double + + /** + * The usage of all CPUs in the host (in MHz). + */ + public val cpuUsage: Double + + /** + * The demand of all vCPUs of the guests (in MHz) + */ + public val cpuDemand: Double + + /** + * The CPU utilization of the host. + */ + public val cpuUtilization: Double + + /** + * The duration (in seconds) that a CPU was active in the host. + */ + public val cpuActiveTime: Long + + /** + * The duration (in seconds) that a CPU was idle in the host. + */ + public val cpuIdleTime: Long + + /** + * The duration (in seconds) that a vCPU wanted to run, but no capacity was available. + */ + public val cpuStealTime: Long + + /** + * The duration (in seconds) of CPU time that was lost due to interference. + */ + public val cpuLostTime: Long + + /** + * The current power usage of the host in W. + */ + public val powerUsage: Double + + /** + * The total power consumption of the host since last time in J. + */ + public val powerTotal: Double + + /** + * The uptime of the host since last time in ms. + */ + public val uptime: Long + + /** + * The downtime of the host since last time in ms. + */ + public val downtime: Long + + /** + * The [Instant] at which the host booted. + */ + public val bootTime: Instant? +} diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt deleted file mode 100644 index 6fd2a81b..00000000 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.telemetry.compute.table - -import java.time.Instant - -/** - * A trace entry for a particular server. - */ -public data class ServerData( - val timestamp: Instant, - val server: ServerInfo, - val host: HostInfo?, - val uptime: Long, - val downtime: Long, - val provisionTime: Instant?, - val bootTime: Instant?, - val cpuLimit: Double, - val cpuActiveTime: Long, - val cpuIdleTime: Long, - val cpuStealTime: Long, - val cpuLostTime: Long, -) diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerTableReader.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerTableReader.kt new file mode 100644 index 00000000..c23d1467 --- /dev/null +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerTableReader.kt @@ -0,0 +1,90 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.telemetry.compute.table + +import java.time.Instant + +/** + * An interface that is used to read a row of a server trace entry. + */ +public interface ServerTableReader { + /** + * The timestamp of the current entry of the reader. + */ + public val timestamp: Instant + + /** + * The [ServerInfo] of the server to which the row belongs to. + */ + public val server: ServerInfo + + /** + * The [HostInfo] of the host on which the server is hosted or `null` if it has no host. + */ + public val host: HostInfo? + + /** + * The uptime of the host since last time in ms. + */ + public val uptime: Long + + /** + * The downtime of the host since last time in ms. + */ + public val downtime: Long + + /** + * The [Instant] at which the server was enqueued for the scheduler. + */ + public val provisionTime: Instant? + + /** + * The [Instant] at which the server booted. + */ + public val bootTime: Instant? + + /** + * The capacity of the CPUs of the servers (in MHz). + */ + public val cpuLimit: Double + + /** + * The duration (in seconds) that a CPU was active in the server. + */ + public val cpuActiveTime: Long + + /** + * The duration (in seconds) that a CPU was idle in the server. + */ + public val cpuIdleTime: Long + + /** + * The duration (in seconds) that a vCPU wanted to run, but no capacity was available. + */ + public val cpuStealTime: Long + + /** + * The duration (in seconds) of CPU time that was lost due to interference. + */ + public val cpuLostTime: Long +} diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt index 6db1399d..39bf96f4 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt @@ -37,3 +37,10 @@ public data class ServiceData( val attemptsFailure: Int, val attemptsError: Int ) + +/** + * Convert a [ServiceTableReader] into a persistent object. + */ +public fun ServiceTableReader.toServiceData(): ServiceData { + return ServiceData(timestamp, hostsUp, hostsDown, serversPending, serversActive, attemptsSuccess, attemptsFailure, attemptsError) +} diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceTableReader.kt index 8e787b97..908f6748 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceTableReader.kt @@ -25,26 +25,46 @@ package org.opendc.telemetry.compute.table import java.time.Instant /** - * A trace entry for a particular host. + * An interface that is used to read a row of a service trace entry. */ -public data class HostData( - val timestamp: Instant, - val host: HostInfo, - val guestsTerminated: Int, - val guestsRunning: Int, - val guestsError: Int, - val guestsInvalid: Int, - val cpuLimit: Double, - val cpuUsage: Double, - val cpuDemand: Double, - val cpuUtilization: Double, - val cpuActiveTime: Long, - val cpuIdleTime: Long, - val cpuStealTime: Long, - val cpuLostTime: Long, - val powerUsage: Double, - val powerTotal: Double, - val uptime: Long, - val downtime: Long, - val bootTime: Instant? -) +public interface ServiceTableReader { + /** + * The timestamp of the current entry of the reader. + */ + public val timestamp: Instant + + /** + * The number of hosts that are up at this instant. + */ + public val hostsUp: Int + + /** + * The number of hosts that are down at this instant. + */ + public val hostsDown: Int + + /** + * The number of servers that are pending to be scheduled. + */ + public val serversPending: Int + + /** + * The number of servers that are currently active. + */ + public val serversActive: Int + + /** + * The scheduling attempts that were successful. + */ + public val attemptsSuccess: Int + + /** + * The scheduling attempts that were unsuccessful due to client error. + */ + public val attemptsFailure: Int + + /** + * The scheduling attempts that were unsuccessful due to scheduler error. + */ + public val attemptsError: Int +} diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMetricExporter.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMetricExporter.kt index 7913660d..d39a0c74 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMetricExporter.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMetricExporter.kt @@ -24,8 +24,8 @@ package org.opendc.web.runner import org.opendc.telemetry.compute.ComputeMetricExporter import org.opendc.telemetry.compute.ComputeMonitor -import org.opendc.telemetry.compute.table.HostData -import org.opendc.telemetry.compute.table.ServiceData +import org.opendc.telemetry.compute.table.HostTableReader +import org.opendc.telemetry.compute.table.ServiceTableReader import kotlin.math.max import kotlin.math.roundToLong @@ -33,24 +33,24 @@ import kotlin.math.roundToLong * A [ComputeMonitor] that tracks the aggregate metrics for each repeat. */ class WebComputeMetricExporter : ComputeMetricExporter() { - override fun record(data: HostData) { - val slices = data.downtime / SLICE_LENGTH + override fun record(reader: HostTableReader) { + val slices = reader.downtime / SLICE_LENGTH hostAggregateMetrics = AggregateHostMetrics( - hostAggregateMetrics.totalActiveTime + data.cpuActiveTime, - hostAggregateMetrics.totalIdleTime + data.cpuIdleTime, - hostAggregateMetrics.totalStealTime + data.cpuStealTime, - hostAggregateMetrics.totalLostTime + data.cpuLostTime, - hostAggregateMetrics.totalPowerDraw + data.powerTotal, + hostAggregateMetrics.totalActiveTime + reader.cpuActiveTime, + hostAggregateMetrics.totalIdleTime + reader.cpuIdleTime, + hostAggregateMetrics.totalStealTime + reader.cpuStealTime, + hostAggregateMetrics.totalLostTime + reader.cpuLostTime, + hostAggregateMetrics.totalPowerDraw + reader.powerTotal, hostAggregateMetrics.totalFailureSlices + slices, - hostAggregateMetrics.totalFailureVmSlices + data.guestsRunning * slices + hostAggregateMetrics.totalFailureVmSlices + reader.guestsRunning * slices ) - hostMetrics.compute(data.host.id) { _, prev -> + hostMetrics.compute(reader.host.id) { _, prev -> HostMetrics( - data.cpuUsage + (prev?.cpuUsage ?: 0.0), - data.cpuDemand + (prev?.cpuDemand ?: 0.0), - data.guestsRunning + (prev?.instanceCount ?: 0), + reader.cpuUsage + (prev?.cpuUsage ?: 0.0), + reader.cpuDemand + (prev?.cpuDemand ?: 0.0), + reader.guestsRunning + (prev?.instanceCount ?: 0), 1 + (prev?.count ?: 0) ) } @@ -79,13 +79,13 @@ class WebComputeMetricExporter : ComputeMetricExporter() { private var serviceMetrics: AggregateServiceMetrics = AggregateServiceMetrics() - override fun record(data: ServiceData) { + override fun record(reader: ServiceTableReader) { serviceMetrics = AggregateServiceMetrics( - max(data.attemptsSuccess, serviceMetrics.vmTotalCount), - max(data.serversPending, serviceMetrics.vmWaitingCount), - max(data.serversActive, serviceMetrics.vmActiveCount), + max(reader.attemptsSuccess, serviceMetrics.vmTotalCount), + max(reader.serversPending, serviceMetrics.vmWaitingCount), + max(reader.serversActive, serviceMetrics.vmActiveCount), max(0, serviceMetrics.vmInactiveCount), - max(data.attemptsFailure, serviceMetrics.vmFailedCount), + max(reader.attemptsFailure, serviceMetrics.vmFailedCount), ) } |
