summaryrefslogtreecommitdiff
path: root/opendc-telemetry
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-10-15 15:32:11 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-10-25 17:58:54 +0200
commit17951889c6d805b907d936d54e7e66efb7376879 (patch)
treee1c54d4a381b708da5fe964ecd5897b55a63fa42 /opendc-telemetry
parentf565afb1ef7b940804af62aa73b6859dcb78a847 (diff)
perf(telemetry): Prevent allocations during collection cycle
This change redesigns the ComputeMonitor interface to reduce the number of memory allocations necessary during a collection cycle.
Diffstat (limited to 'opendc-telemetry')
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt393
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt18
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt6
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostTableReader.kt125
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt43
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerTableReader.kt90
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt7
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceTableReader.kt (renamed from opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt)64
8 files changed, 489 insertions, 257 deletions
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt
index b293f7b5..418dc201 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt
@@ -20,6 +20,8 @@
* SOFTWARE.
*/
+@file:Suppress("PropertyName")
+
package org.opendc.telemetry.compute
import io.opentelemetry.api.common.AttributeKey
@@ -32,7 +34,7 @@ import org.opendc.telemetry.compute.table.*
import java.time.Instant
/**
- * Helper class responsible for aggregating [MetricData] into [ServiceData], [HostData] and [ServerData].
+ * Helper class responsible for aggregating [MetricData] into [ServiceTableReader], [HostTableReader] and [ServerTableReader].
*/
public class ComputeMetricAggregator {
private val _service = ServiceAggregator()
@@ -58,25 +60,25 @@ public class ComputeMetricAggregator {
service.recordTimestamp(point)
when (point.attributes[STATE_KEY]) {
- "up" -> service.hostsUp = point.value.toInt()
- "down" -> service.hostsDown = point.value.toInt()
+ "up" -> service._hostsUp = point.value.toInt()
+ "down" -> service._hostsDown = point.value.toInt()
}
}
}
"scheduler.servers" -> {
for (point in metric.longSumData.points) {
when (point.attributes[STATE_KEY]) {
- "pending" -> service.serversPending = point.value.toInt()
- "active" -> service.serversActive = point.value.toInt()
+ "pending" -> service._serversPending = point.value.toInt()
+ "active" -> service._serversActive = point.value.toInt()
}
}
}
"scheduler.attempts" -> {
for (point in metric.longSumData.points) {
when (point.attributes[RESULT_KEY]) {
- "success" -> service.attemptsSuccess = point.value.toInt()
- "failure" -> service.attemptsFailure = point.value.toInt()
- "error" -> service.attemptsError = point.value.toInt()
+ "success" -> service._attemptsSuccess = point.value.toInt()
+ "failure" -> service._attemptsFailure = point.value.toInt()
+ "error" -> service._attemptsError = point.value.toInt()
}
}
}
@@ -87,10 +89,10 @@ public class ComputeMetricAggregator {
for (point in metric.longSumData.points) {
when (point.attributes[STATE_KEY]) {
- "terminated" -> agg.guestsTerminated = point.value.toInt()
- "running" -> agg.guestsRunning = point.value.toInt()
- "error" -> agg.guestsRunning = point.value.toInt()
- "invalid" -> agg.guestsInvalid = point.value.toInt()
+ "terminated" -> agg._guestsTerminated = point.value.toInt()
+ "running" -> agg._guestsRunning = point.value.toInt()
+ "error" -> agg._guestsRunning = point.value.toInt()
+ "invalid" -> agg._guestsInvalid = point.value.toInt()
}
}
}
@@ -101,24 +103,24 @@ public class ComputeMetricAggregator {
val server = getServer(servers, point)
if (server != null) {
- server.cpuLimit = point.value
- server.host = agg.host
+ server._cpuLimit = point.value
+ server._host = agg.host
} else {
- agg.cpuLimit = point.value
+ agg._cpuLimit = point.value
}
}
}
"system.cpu.usage" -> {
val agg = getHost(hosts, resource) ?: continue
- agg.cpuUsage = metric.doubleGaugeData.points.first().value
+ agg._cpuUsage = metric.doubleGaugeData.points.first().value
}
"system.cpu.demand" -> {
val agg = getHost(hosts, resource) ?: continue
- agg.cpuDemand = metric.doubleGaugeData.points.first().value
+ agg._cpuDemand = metric.doubleGaugeData.points.first().value
}
"system.cpu.utilization" -> {
val agg = getHost(hosts, resource) ?: continue
- agg.cpuUtilization = metric.doubleGaugeData.points.first().value
+ agg._cpuUtilization = metric.doubleGaugeData.points.first().value
}
"system.cpu.time" -> {
val agg = getHost(hosts, resource) ?: continue
@@ -128,29 +130,29 @@ public class ComputeMetricAggregator {
val state = point.attributes[STATE_KEY]
if (server != null) {
when (state) {
- "active" -> server.cpuActiveTime = point.value
- "idle" -> server.cpuIdleTime = point.value
- "steal" -> server.cpuStealTime = point.value
- "lost" -> server.cpuLostTime = point.value
+ "active" -> server._cpuActiveTime = point.value
+ "idle" -> server._cpuIdleTime = point.value
+ "steal" -> server._cpuStealTime = point.value
+ "lost" -> server._cpuLostTime = point.value
}
- server.host = agg.host
+ server._host = agg.host
} else {
when (state) {
- "active" -> agg.cpuActiveTime = point.value
- "idle" -> agg.cpuIdleTime = point.value
- "steal" -> agg.cpuStealTime = point.value
- "lost" -> agg.cpuLostTime = point.value
+ "active" -> agg._cpuActiveTime = point.value
+ "idle" -> agg._cpuIdleTime = point.value
+ "steal" -> agg._cpuStealTime = point.value
+ "lost" -> agg._cpuLostTime = point.value
}
}
}
}
"system.power.usage" -> {
val agg = getHost(hosts, resource) ?: continue
- agg.powerUsage = metric.doubleGaugeData.points.first().value
+ agg._powerUsage = metric.doubleGaugeData.points.first().value
}
"system.power.total" -> {
val agg = getHost(hosts, resource) ?: continue
- agg.powerTotal = metric.doubleSumData.points.first().value
+ agg._powerTotal = metric.doubleSumData.points.first().value
}
"system.time" -> {
val agg = getHost(hosts, resource) ?: continue
@@ -162,16 +164,16 @@ public class ComputeMetricAggregator {
server.recordTimestamp(point)
when (point.attributes[STATE_KEY]) {
- "up" -> server.uptime = point.value
- "down" -> server.downtime = point.value
+ "up" -> server._uptime = point.value
+ "down" -> server._downtime = point.value
}
- server.host = agg.host
+ server._host = agg.host
} else {
agg.recordTimestamp(point)
when (point.attributes[STATE_KEY]) {
- "up" -> agg.uptime = point.value
- "down" -> agg.downtime = point.value
+ "up" -> agg._uptime = point.value
+ "down" -> agg._downtime = point.value
}
}
}
@@ -183,10 +185,10 @@ public class ComputeMetricAggregator {
val server = getServer(servers, point)
if (server != null) {
- server.bootTime = Instant.ofEpochMilli(point.value)
- server.host = agg.host
+ server._bootTime = Instant.ofEpochMilli(point.value)
+ server._host = agg.host
} else {
- agg.bootTime = Instant.ofEpochMilli(point.value)
+ agg._bootTime = Instant.ofEpochMilli(point.value)
}
}
}
@@ -194,7 +196,7 @@ public class ComputeMetricAggregator {
for (point in metric.longGaugeData.points) {
val server = getServer(servers, point) ?: continue
server.recordTimestamp(point)
- server.provisionTime = Instant.ofEpochMilli(point.value)
+ server._provisionTime = Instant.ofEpochMilli(point.value)
}
}
}
@@ -205,14 +207,16 @@ public class ComputeMetricAggregator {
* Collect the data via the [monitor].
*/
public fun collect(monitor: ComputeMonitor) {
- monitor.record(_service.collect())
+ monitor.record(_service)
for (host in _hosts.values) {
- monitor.record(host.collect())
+ monitor.record(host)
+ host.reset()
}
for (server in _servers.values) {
- monitor.record(server.collect())
+ monitor.record(server)
+ server.reset()
}
}
@@ -243,50 +247,55 @@ public class ComputeMetricAggregator {
/**
* An aggregator for service metrics before they are reported.
*/
- internal class ServiceAggregator {
- private var timestamp = Long.MIN_VALUE
+ internal class ServiceAggregator : ServiceTableReader {
+ private var _timestamp: Instant = Instant.MIN
+ override val timestamp: Instant
+ get() = _timestamp
- @JvmField var hostsUp = 0
- @JvmField var hostsDown = 0
+ override val hostsUp: Int
+ get() = _hostsUp
+ @JvmField var _hostsUp = 0
- @JvmField var serversPending = 0
- @JvmField var serversActive = 0
+ override val hostsDown: Int
+ get() = _hostsDown
+ @JvmField var _hostsDown = 0
- @JvmField var attemptsSuccess = 0
- @JvmField var attemptsFailure = 0
- @JvmField var attemptsError = 0
+ override val serversPending: Int
+ get() = _serversPending
+ @JvmField var _serversPending = 0
- /**
- * Finish the aggregation for this cycle.
- */
- fun collect(): ServiceData {
- val now = Instant.ofEpochMilli(timestamp)
- return toServiceData(now)
- }
+ override val serversActive: Int
+ get() = _serversActive
+ @JvmField var _serversActive = 0
- /**
- * Convert the aggregator state to an immutable [ServiceData].
- */
- private fun toServiceData(now: Instant): ServiceData {
- return ServiceData(now, hostsUp, hostsDown, serversPending, serversActive, attemptsSuccess, attemptsFailure, attemptsError)
- }
+ override val attemptsSuccess: Int
+ get() = _attemptsSuccess
+ @JvmField var _attemptsSuccess = 0
+
+ override val attemptsFailure: Int
+ get() = _attemptsFailure
+ @JvmField var _attemptsFailure = 0
+
+ override val attemptsError: Int
+ get() = _attemptsError
+ @JvmField var _attemptsError = 0
/**
* Record the timestamp of a [point] for this aggregator.
*/
fun recordTimestamp(point: PointData) {
- timestamp = point.epochNanos / 1_000_000L // ns to ms
+ _timestamp = Instant.ofEpochMilli(point.epochNanos / 1_000_000L) // ns to ms
}
}
/**
* An aggregator for host metrics before they are reported.
*/
- internal class HostAggregator(resource: Resource) {
+ internal class HostAggregator(resource: Resource) : HostTableReader {
/**
* The static information about this host.
*/
- val host = HostInfo(
+ override val host = HostInfo(
resource.attributes[HOST_ID]!!,
resource.attributes[HOST_NAME] ?: "",
resource.attributes[HOST_ARCH] ?: "",
@@ -294,111 +303,127 @@ public class ComputeMetricAggregator {
resource.attributes[HOST_MEM_CAPACITY] ?: 0,
)
- private var timestamp = Long.MIN_VALUE
+ override val timestamp: Instant
+ get() = _timestamp
+ private var _timestamp = Instant.MIN
+
+ override val guestsTerminated: Int
+ get() = _guestsTerminated
+ @JvmField var _guestsTerminated = 0
+
+ override val guestsRunning: Int
+ get() = _guestsRunning
+ @JvmField var _guestsRunning = 0
+
+ override val guestsError: Int
+ get() = _guestsError
+ @JvmField var _guestsError = 0
- @JvmField var guestsTerminated = 0
- @JvmField var guestsRunning = 0
- @JvmField var guestsError = 0
- @JvmField var guestsInvalid = 0
+ override val guestsInvalid: Int
+ get() = _guestsInvalid
+ @JvmField var _guestsInvalid = 0
- @JvmField var cpuLimit = 0.0
- @JvmField var cpuUsage = 0.0
- @JvmField var cpuDemand = 0.0
- @JvmField var cpuUtilization = 0.0
+ override val cpuLimit: Double
+ get() = _cpuLimit
+ @JvmField var _cpuLimit = 0.0
- @JvmField var cpuActiveTime = 0L
- @JvmField var cpuIdleTime = 0L
- @JvmField var cpuStealTime = 0L
- @JvmField var cpuLostTime = 0L
+ override val cpuUsage: Double
+ get() = _cpuUsage
+ @JvmField var _cpuUsage = 0.0
+
+ override val cpuDemand: Double
+ get() = _cpuDemand
+ @JvmField var _cpuDemand = 0.0
+
+ override val cpuUtilization: Double
+ get() = _cpuUtilization
+ @JvmField var _cpuUtilization = 0.0
+
+ override val cpuActiveTime: Long
+ get() = _cpuActiveTime - previousCpuActiveTime
+ @JvmField var _cpuActiveTime = 0L
private var previousCpuActiveTime = 0L
+
+ override val cpuIdleTime: Long
+ get() = _cpuIdleTime - previousCpuIdleTime
+ @JvmField var _cpuIdleTime = 0L
private var previousCpuIdleTime = 0L
+
+ override val cpuStealTime: Long
+ get() = _cpuStealTime - previousCpuStealTime
+ @JvmField var _cpuStealTime = 0L
private var previousCpuStealTime = 0L
+
+ override val cpuLostTime: Long
+ get() = _cpuLostTime - previousCpuLostTime
+ @JvmField var _cpuLostTime = 0L
private var previousCpuLostTime = 0L
- @JvmField var powerUsage = 0.0
- @JvmField var powerTotal = 0.0
+ override val powerUsage: Double
+ get() = _powerUsage
+ @JvmField var _powerUsage = 0.0
+
+ override val powerTotal: Double
+ get() = _powerTotal - previousPowerTotal
+ @JvmField var _powerTotal = 0.0
private var previousPowerTotal = 0.0
- @JvmField var uptime = 0L
+ override val uptime: Long
+ get() = _uptime - previousUptime
+ @JvmField var _uptime = 0L
private var previousUptime = 0L
- @JvmField var downtime = 0L
+
+ override val downtime: Long
+ get() = _downtime - previousDowntime
+ @JvmField var _downtime = 0L
private var previousDowntime = 0L
- @JvmField var bootTime: Instant? = null
+
+ override val bootTime: Instant?
+ get() = _bootTime
+ @JvmField var _bootTime: Instant? = null
/**
* Finish the aggregation for this cycle.
*/
- fun collect(): HostData {
- val now = Instant.ofEpochMilli(timestamp)
- val data = toHostData(now)
-
+ fun reset() {
// Reset intermediate state for next aggregation
- previousCpuActiveTime = cpuActiveTime
- previousCpuIdleTime = cpuIdleTime
- previousCpuStealTime = cpuStealTime
- previousCpuLostTime = cpuLostTime
- previousPowerTotal = powerTotal
- previousUptime = uptime
- previousDowntime = downtime
-
- guestsTerminated = 0
- guestsRunning = 0
- guestsError = 0
- guestsInvalid = 0
-
- cpuLimit = 0.0
- cpuUsage = 0.0
- cpuDemand = 0.0
- cpuUtilization = 0.0
-
- powerUsage = 0.0
-
- return data
- }
-
- /**
- * Convert the aggregator state to an immutable [HostData] instance.
- */
- private fun toHostData(now: Instant): HostData {
- return HostData(
- now,
- host,
- guestsTerminated,
- guestsRunning,
- guestsError,
- guestsInvalid,
- cpuLimit,
- cpuUsage,
- cpuDemand,
- cpuUtilization,
- cpuActiveTime - previousCpuActiveTime,
- cpuIdleTime - previousCpuIdleTime,
- cpuStealTime - previousCpuStealTime,
- cpuLostTime - previousCpuLostTime,
- powerUsage,
- powerTotal - previousPowerTotal,
- uptime - previousUptime,
- downtime - previousDowntime,
- bootTime
- )
+ previousCpuActiveTime = _cpuActiveTime
+ previousCpuIdleTime = _cpuIdleTime
+ previousCpuStealTime = _cpuStealTime
+ previousCpuLostTime = _cpuLostTime
+ previousPowerTotal = _powerTotal
+ previousUptime = _uptime
+ previousDowntime = _downtime
+
+ _guestsTerminated = 0
+ _guestsRunning = 0
+ _guestsError = 0
+ _guestsInvalid = 0
+
+ _cpuLimit = 0.0
+ _cpuUsage = 0.0
+ _cpuDemand = 0.0
+ _cpuUtilization = 0.0
+
+ _powerUsage = 0.0
}
/**
* Record the timestamp of a [point] for this aggregator.
*/
fun recordTimestamp(point: PointData) {
- timestamp = point.epochNanos / 1_000_000L // ns to ms
+ _timestamp = Instant.ofEpochMilli(point.epochNanos / 1_000_000L) // ns to ms
}
}
/**
* An aggregator for server metrics before they are reported.
*/
- internal class ServerAggregator(attributes: Attributes) {
+ internal class ServerAggregator(attributes: Attributes) : ServerTableReader {
/**
* The static information about this server.
*/
- val server = ServerInfo(
+ override val server = ServerInfo(
attributes[ResourceAttributes.HOST_ID]!!,
attributes[ResourceAttributes.HOST_NAME]!!,
attributes[ResourceAttributes.HOST_TYPE]!!,
@@ -412,70 +437,76 @@ public class ComputeMetricAggregator {
/**
* The [HostInfo] of the host on which the server is hosted.
*/
- @JvmField var host: HostInfo? = null
+ override val host: HostInfo?
+ get() = _host
+ @JvmField var _host: HostInfo? = null
- private var timestamp = Long.MIN_VALUE
- @JvmField var uptime: Long = 0
+ private var _timestamp = Instant.MIN
+ override val timestamp: Instant
+ get() = _timestamp
+
+ override val uptime: Long
+ get() = _uptime - previousUptime
+ @JvmField var _uptime: Long = 0
private var previousUptime = 0L
- @JvmField var downtime: Long = 0
+
+ override val downtime: Long
+ get() = _downtime - previousDowntime
+ @JvmField var _downtime: Long = 0
private var previousDowntime = 0L
- @JvmField var provisionTime: Instant? = null
- @JvmField var bootTime: Instant? = null
- @JvmField var cpuLimit = 0.0
- @JvmField var cpuActiveTime = 0L
- @JvmField var cpuIdleTime = 0L
- @JvmField var cpuStealTime = 0L
- @JvmField var cpuLostTime = 0L
+
+ override val provisionTime: Instant?
+ get() = _provisionTime
+ @JvmField var _provisionTime: Instant? = null
+
+ override val bootTime: Instant?
+ get() = _bootTime
+ @JvmField var _bootTime: Instant? = null
+
+ override val cpuLimit: Double
+ get() = _cpuLimit
+ @JvmField var _cpuLimit = 0.0
+
+ override val cpuActiveTime: Long
+ get() = _cpuActiveTime - previousCpuActiveTime
+ @JvmField var _cpuActiveTime = 0L
private var previousCpuActiveTime = 0L
+
+ override val cpuIdleTime: Long
+ get() = _cpuIdleTime - previousCpuIdleTime
+ @JvmField var _cpuIdleTime = 0L
private var previousCpuIdleTime = 0L
+
+ override val cpuStealTime: Long
+ get() = _cpuStealTime - previousCpuStealTime
+ @JvmField var _cpuStealTime = 0L
private var previousCpuStealTime = 0L
+
+ override val cpuLostTime: Long
+ get() = _cpuLostTime - previousCpuLostTime
+ @JvmField var _cpuLostTime = 0L
private var previousCpuLostTime = 0L
/**
* Finish the aggregation for this cycle.
*/
- fun collect(): ServerData {
- val now = Instant.ofEpochMilli(timestamp)
- val data = toServerData(now)
-
- previousUptime = uptime
- previousDowntime = downtime
+ fun reset() {
+ previousUptime = _uptime
+ previousDowntime = _downtime
previousCpuActiveTime = cpuActiveTime
previousCpuIdleTime = cpuIdleTime
previousCpuStealTime = cpuStealTime
previousCpuLostTime = cpuLostTime
- host = null
- cpuLimit = 0.0
-
- return data
- }
-
- /**
- * Convert the aggregator state into an immutable [ServerData].
- */
- private fun toServerData(now: Instant): ServerData {
- return ServerData(
- now,
- server,
- host,
- uptime - previousUptime,
- downtime - previousDowntime,
- provisionTime,
- bootTime,
- cpuLimit,
- cpuActiveTime - previousCpuActiveTime,
- cpuIdleTime - previousCpuIdleTime,
- cpuStealTime - previousCpuStealTime,
- cpuLostTime - previousCpuLostTime
- )
+ _host = null
+ _cpuLimit = 0.0
}
/**
* Record the timestamp of a [point] for this aggregator.
*/
fun recordTimestamp(point: PointData) {
- timestamp = point.epochNanos / 1_000_000L // ns to ms
+ _timestamp = Instant.ofEpochMilli(point.epochNanos / 1_000_000L) // ns to ms
}
}
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt
index d51bcab4..64b5f337 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt
@@ -22,26 +22,26 @@
package org.opendc.telemetry.compute
-import org.opendc.telemetry.compute.table.HostData
-import org.opendc.telemetry.compute.table.ServerData
-import org.opendc.telemetry.compute.table.ServiceData
+import org.opendc.telemetry.compute.table.HostTableReader
+import org.opendc.telemetry.compute.table.ServerTableReader
+import org.opendc.telemetry.compute.table.ServiceTableReader
/**
* A monitor that tracks the metrics and events of the OpenDC Compute service.
*/
public interface ComputeMonitor {
/**
- * Record the specified [data].
+ * Record an entry with the specified [reader].
*/
- public fun record(data: ServerData) {}
+ public fun record(reader: ServerTableReader) {}
/**
- * Record the specified [data].
+ * Record an entry with the specified [reader].
*/
- public fun record(data: HostData) {}
+ public fun record(reader: HostTableReader) {}
/**
- * Record the specified [data].
+ * Record an entry with the specified [reader].
*/
- public fun record(data: ServiceData) {}
+ public fun record(reader: ServiceTableReader) {}
}
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt
index ce89061b..41315b15 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt
@@ -24,6 +24,8 @@ package org.opendc.telemetry.compute
import io.opentelemetry.sdk.metrics.export.MetricProducer
import org.opendc.telemetry.compute.table.ServiceData
+import org.opendc.telemetry.compute.table.ServiceTableReader
+import org.opendc.telemetry.compute.table.toServiceData
/**
* Collect the metrics of the compute service.
@@ -32,8 +34,8 @@ public fun collectServiceMetrics(metricProducer: MetricProducer): ServiceData {
lateinit var serviceData: ServiceData
val agg = ComputeMetricAggregator()
val monitor = object : ComputeMonitor {
- override fun record(data: ServiceData) {
- serviceData = data
+ override fun record(reader: ServiceTableReader) {
+ serviceData = reader.toServiceData()
}
}
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostTableReader.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostTableReader.kt
new file mode 100644
index 00000000..1e1ad94e
--- /dev/null
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostTableReader.kt
@@ -0,0 +1,125 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.telemetry.compute.table
+
+import java.time.Instant
+
+/**
+ * An interface that is used to read a row of a host trace entry.
+ */
+public interface HostTableReader {
+ /**
+ * The timestamp of the current entry of the reader.
+ */
+ public val timestamp: Instant
+
+ /**
+ * The [HostInfo] of the host to which the row belongs to.
+ */
+ public val host: HostInfo
+
+ /**
+ * The number of guests that are in a terminated state.
+ */
+ public val guestsTerminated: Int
+
+ /**
+ * The number of guests that are in a running state.
+ */
+ public val guestsRunning: Int
+
+ /**
+ * The number of guests that are in an error state.
+ */
+ public val guestsError: Int
+
+ /**
+ * The number of guests that are in an unknown state.
+ */
+ public val guestsInvalid: Int
+
+ /**
+ * The capacity of the CPUs in the host (in MHz).
+ */
+ public val cpuLimit: Double
+
+ /**
+ * The usage of all CPUs in the host (in MHz).
+ */
+ public val cpuUsage: Double
+
+ /**
+ * The demand of all vCPUs of the guests (in MHz)
+ */
+ public val cpuDemand: Double
+
+ /**
+ * The CPU utilization of the host.
+ */
+ public val cpuUtilization: Double
+
+ /**
+ * The duration (in seconds) that a CPU was active in the host.
+ */
+ public val cpuActiveTime: Long
+
+ /**
+ * The duration (in seconds) that a CPU was idle in the host.
+ */
+ public val cpuIdleTime: Long
+
+ /**
+ * The duration (in seconds) that a vCPU wanted to run, but no capacity was available.
+ */
+ public val cpuStealTime: Long
+
+ /**
+ * The duration (in seconds) of CPU time that was lost due to interference.
+ */
+ public val cpuLostTime: Long
+
+ /**
+ * The current power usage of the host in W.
+ */
+ public val powerUsage: Double
+
+ /**
+ * The total power consumption of the host since last time in J.
+ */
+ public val powerTotal: Double
+
+ /**
+ * The uptime of the host since last time in ms.
+ */
+ public val uptime: Long
+
+ /**
+ * The downtime of the host since last time in ms.
+ */
+ public val downtime: Long
+
+ /**
+ * The [Instant] at which the host booted.
+ */
+ public val bootTime: Instant?
+}
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt
deleted file mode 100644
index 6fd2a81b..00000000
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.telemetry.compute.table
-
-import java.time.Instant
-
-/**
- * A trace entry for a particular server.
- */
-public data class ServerData(
- val timestamp: Instant,
- val server: ServerInfo,
- val host: HostInfo?,
- val uptime: Long,
- val downtime: Long,
- val provisionTime: Instant?,
- val bootTime: Instant?,
- val cpuLimit: Double,
- val cpuActiveTime: Long,
- val cpuIdleTime: Long,
- val cpuStealTime: Long,
- val cpuLostTime: Long,
-)
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerTableReader.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerTableReader.kt
new file mode 100644
index 00000000..c23d1467
--- /dev/null
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerTableReader.kt
@@ -0,0 +1,90 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.telemetry.compute.table
+
+import java.time.Instant
+
+/**
+ * An interface that is used to read a row of a server trace entry.
+ */
+public interface ServerTableReader {
+ /**
+ * The timestamp of the current entry of the reader.
+ */
+ public val timestamp: Instant
+
+ /**
+ * The [ServerInfo] of the server to which the row belongs to.
+ */
+ public val server: ServerInfo
+
+ /**
+ * The [HostInfo] of the host on which the server is hosted or `null` if it has no host.
+ */
+ public val host: HostInfo?
+
+ /**
+ * The uptime of the host since last time in ms.
+ */
+ public val uptime: Long
+
+ /**
+ * The downtime of the host since last time in ms.
+ */
+ public val downtime: Long
+
+ /**
+ * The [Instant] at which the server was enqueued for the scheduler.
+ */
+ public val provisionTime: Instant?
+
+ /**
+ * The [Instant] at which the server booted.
+ */
+ public val bootTime: Instant?
+
+ /**
+ * The capacity of the CPUs of the servers (in MHz).
+ */
+ public val cpuLimit: Double
+
+ /**
+ * The duration (in seconds) that a CPU was active in the server.
+ */
+ public val cpuActiveTime: Long
+
+ /**
+ * The duration (in seconds) that a CPU was idle in the server.
+ */
+ public val cpuIdleTime: Long
+
+ /**
+ * The duration (in seconds) that a vCPU wanted to run, but no capacity was available.
+ */
+ public val cpuStealTime: Long
+
+ /**
+ * The duration (in seconds) of CPU time that was lost due to interference.
+ */
+ public val cpuLostTime: Long
+}
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt
index 6db1399d..39bf96f4 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt
@@ -37,3 +37,10 @@ public data class ServiceData(
val attemptsFailure: Int,
val attemptsError: Int
)
+
+/**
+ * Convert a [ServiceTableReader] into a persistent object.
+ */
+public fun ServiceTableReader.toServiceData(): ServiceData {
+ return ServiceData(timestamp, hostsUp, hostsDown, serversPending, serversActive, attemptsSuccess, attemptsFailure, attemptsError)
+}
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceTableReader.kt
index 8e787b97..908f6748 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceTableReader.kt
@@ -25,26 +25,46 @@ package org.opendc.telemetry.compute.table
import java.time.Instant
/**
- * A trace entry for a particular host.
+ * An interface that is used to read a row of a service trace entry.
*/
-public data class HostData(
- val timestamp: Instant,
- val host: HostInfo,
- val guestsTerminated: Int,
- val guestsRunning: Int,
- val guestsError: Int,
- val guestsInvalid: Int,
- val cpuLimit: Double,
- val cpuUsage: Double,
- val cpuDemand: Double,
- val cpuUtilization: Double,
- val cpuActiveTime: Long,
- val cpuIdleTime: Long,
- val cpuStealTime: Long,
- val cpuLostTime: Long,
- val powerUsage: Double,
- val powerTotal: Double,
- val uptime: Long,
- val downtime: Long,
- val bootTime: Instant?
-)
+public interface ServiceTableReader {
+ /**
+ * The timestamp of the current entry of the reader.
+ */
+ public val timestamp: Instant
+
+ /**
+ * The number of hosts that are up at this instant.
+ */
+ public val hostsUp: Int
+
+ /**
+ * The number of hosts that are down at this instant.
+ */
+ public val hostsDown: Int
+
+ /**
+ * The number of servers that are pending to be scheduled.
+ */
+ public val serversPending: Int
+
+ /**
+ * The number of servers that are currently active.
+ */
+ public val serversActive: Int
+
+ /**
+ * The scheduling attempts that were successful.
+ */
+ public val attemptsSuccess: Int
+
+ /**
+ * The scheduling attempts that were unsuccessful due to client error.
+ */
+ public val attemptsFailure: Int
+
+ /**
+ * The scheduling attempts that were unsuccessful due to scheduler error.
+ */
+ public val attemptsError: Int
+}