summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt28
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt18
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt23
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt8
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt8
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt8
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt16
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt393
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt18
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt6
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostTableReader.kt125
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt43
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerTableReader.kt90
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt7
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceTableReader.kt (renamed from opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt)64
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMetricExporter.kt38
16 files changed, 559 insertions, 334 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
index a0ff9228..799a8cf0 100644
--- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
@@ -46,8 +46,8 @@ import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.flow.FlowEngine
import org.opendc.telemetry.compute.ComputeMetricExporter
import org.opendc.telemetry.compute.HOST_ID
-import org.opendc.telemetry.compute.table.HostData
-import org.opendc.telemetry.compute.table.ServerData
+import org.opendc.telemetry.compute.table.HostTableReader
+import org.opendc.telemetry.compute.table.ServerTableReader
import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import org.opendc.telemetry.sdk.toOtelClock
import java.time.Duration
@@ -140,10 +140,10 @@ internal class SimHostTest {
val reader = CoroutineMetricReader(
this, listOf(meterProvider as MetricProducer),
object : ComputeMetricExporter() {
- override fun record(data: HostData) {
- activeTime += data.cpuActiveTime
- idleTime += data.cpuIdleTime
- stealTime += data.cpuStealTime
+ override fun record(reader: HostTableReader) {
+ activeTime += reader.cpuActiveTime
+ idleTime += reader.cpuIdleTime
+ stealTime += reader.cpuStealTime
}
},
exportInterval = Duration.ofSeconds(duration)
@@ -236,16 +236,16 @@ internal class SimHostTest {
val reader = CoroutineMetricReader(
this, listOf(meterProvider as MetricProducer),
object : ComputeMetricExporter() {
- override fun record(data: HostData) {
- activeTime += data.cpuActiveTime
- idleTime += data.cpuIdleTime
- uptime += data.uptime
- downtime += data.downtime
+ override fun record(reader: HostTableReader) {
+ activeTime += reader.cpuActiveTime
+ idleTime += reader.cpuIdleTime
+ uptime += reader.uptime
+ downtime += reader.downtime
}
- override fun record(data: ServerData) {
- guestUptime += data.uptime
- guestDowntime += data.downtime
+ override fun record(reader: ServerTableReader) {
+ guestUptime += reader.uptime
+ guestDowntime += reader.downtime
}
},
exportInterval = Duration.ofSeconds(duration)
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt
index ad182d67..a46885f4 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt
@@ -25,9 +25,9 @@ package org.opendc.compute.workload.export.parquet
import io.opentelemetry.sdk.common.CompletableResultCode
import org.opendc.telemetry.compute.ComputeMetricExporter
import org.opendc.telemetry.compute.ComputeMonitor
-import org.opendc.telemetry.compute.table.HostData
-import org.opendc.telemetry.compute.table.ServerData
-import org.opendc.telemetry.compute.table.ServiceData
+import org.opendc.telemetry.compute.table.HostTableReader
+import org.opendc.telemetry.compute.table.ServerTableReader
+import org.opendc.telemetry.compute.table.ServiceTableReader
import java.io.File
/**
@@ -49,16 +49,16 @@ public class ParquetComputeMetricExporter(base: File, partition: String, bufferS
bufferSize
)
- override fun record(data: ServerData) {
- serverWriter.write(data)
+ override fun record(reader: ServerTableReader) {
+ serverWriter.write(reader)
}
- override fun record(data: HostData) {
- hostWriter.write(data)
+ override fun record(reader: HostTableReader) {
+ hostWriter.write(reader)
}
- override fun record(data: ServiceData) {
- serviceWriter.write(data)
+ override fun record(reader: ServiceTableReader) {
+ serviceWriter.write(reader)
}
override fun shutdown(): CompletableResultCode {
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt
index 4172d729..84387bbc 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt
@@ -50,9 +50,9 @@ public abstract class ParquetDataWriter<in T>(
private val logger = KotlinLogging.logger {}
/**
- * The queue of commands to process.
+ * The queue of records to process.
*/
- private val queue: BlockingQueue<T> = ArrayBlockingQueue(bufferSize)
+ private val queue: BlockingQueue<GenericData.Record> = ArrayBlockingQueue(bufferSize)
/**
* An exception to be propagated to the actual writer.
@@ -72,20 +72,20 @@ public abstract class ParquetDataWriter<in T>(
}
val queue = queue
- val buf = mutableListOf<T>()
+ val buf = mutableListOf<GenericData.Record>()
var shouldStop = false
try {
while (!shouldStop) {
try {
- process(writer, queue.take())
+ writer.write(queue.take())
} catch (e: InterruptedException) {
shouldStop = true
}
if (queue.drainTo(buf) > 0) {
for (data in buf) {
- process(writer, data)
+ writer.write(data)
}
buf.clear()
}
@@ -119,7 +119,9 @@ public abstract class ParquetDataWriter<in T>(
throw IllegalStateException("Writer thread failed", exception)
}
- queue.put(data)
+ val builder = GenericRecordBuilder(schema)
+ convert(builder, data)
+ queue.put(builder.build())
}
/**
@@ -133,13 +135,4 @@ public abstract class ParquetDataWriter<in T>(
init {
writerThread.start()
}
-
- /**
- * Process the specified [data] to be written to the Parquet file.
- */
- private fun process(writer: ParquetWriter<GenericData.Record>, data: T) {
- val builder = GenericRecordBuilder(schema)
- convert(builder, data)
- writer.write(builder.build())
- }
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt
index 98a0739e..2b7cac8f 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt
@@ -28,17 +28,17 @@ import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecordBuilder
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
-import org.opendc.telemetry.compute.table.HostData
+import org.opendc.telemetry.compute.table.HostTableReader
import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA
import org.opendc.trace.util.parquet.UUID_SCHEMA
import org.opendc.trace.util.parquet.optional
import java.io.File
/**
- * A Parquet event writer for [HostData]s.
+ * A Parquet event writer for [HostTableReader]s.
*/
public class ParquetHostDataWriter(path: File, bufferSize: Int) :
- ParquetDataWriter<HostData>(path, SCHEMA, bufferSize) {
+ ParquetDataWriter<HostTableReader>(path, SCHEMA, bufferSize) {
override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> {
return builder
@@ -46,7 +46,7 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) :
.build()
}
- override fun convert(builder: GenericRecordBuilder, data: HostData) {
+ override fun convert(builder: GenericRecordBuilder, data: HostTableReader) {
builder["timestamp"] = data.timestamp.toEpochMilli()
builder["host_id"] = data.host.id
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt
index 4ebf8c62..144b6624 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt
@@ -28,17 +28,17 @@ import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecordBuilder
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
-import org.opendc.telemetry.compute.table.ServerData
+import org.opendc.telemetry.compute.table.ServerTableReader
import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA
import org.opendc.trace.util.parquet.UUID_SCHEMA
import org.opendc.trace.util.parquet.optional
import java.io.File
/**
- * A Parquet event writer for [ServerData]s.
+ * A Parquet event writer for [ServerTableReader]s.
*/
public class ParquetServerDataWriter(path: File, bufferSize: Int) :
- ParquetDataWriter<ServerData>(path, SCHEMA, bufferSize) {
+ ParquetDataWriter<ServerTableReader>(path, SCHEMA, bufferSize) {
override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> {
return builder
@@ -47,7 +47,7 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) :
.build()
}
- override fun convert(builder: GenericRecordBuilder, data: ServerData) {
+ override fun convert(builder: GenericRecordBuilder, data: ServerTableReader) {
builder["timestamp"] = data.timestamp.toEpochMilli()
builder["server_id"] = data.server.id
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt
index 47824b29..ec8a2b65 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt
@@ -25,17 +25,17 @@ package org.opendc.compute.workload.export.parquet
import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericRecordBuilder
-import org.opendc.telemetry.compute.table.ServiceData
+import org.opendc.telemetry.compute.table.ServiceTableReader
import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA
import java.io.File
/**
- * A Parquet event writer for [ServiceData]s.
+ * A Parquet event writer for [ServiceTableReader]s.
*/
public class ParquetServiceDataWriter(path: File, bufferSize: Int) :
- ParquetDataWriter<ServiceData>(path, SCHEMA, bufferSize) {
+ ParquetDataWriter<ServiceTableReader>(path, SCHEMA, bufferSize) {
- override fun convert(builder: GenericRecordBuilder, data: ServiceData) {
+ override fun convert(builder: GenericRecordBuilder, data: ServiceTableReader) {
builder["timestamp"] = data.timestamp.toEpochMilli()
builder["hosts_up"] = data.hostsUp
builder["hosts_down"] = data.hostsDown
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index 56ba9cfe..f3a6ed1a 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -39,7 +39,7 @@ import org.opendc.experiments.capelin.topology.clusterTopology
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.telemetry.compute.ComputeMetricExporter
import org.opendc.telemetry.compute.collectServiceMetrics
-import org.opendc.telemetry.compute.table.HostData
+import org.opendc.telemetry.compute.table.HostTableReader
import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import java.io.File
import java.time.Duration
@@ -284,13 +284,13 @@ class CapelinIntegrationTest {
var energyUsage = 0.0
var uptime = 0L
- override fun record(data: HostData) {
- idleTime += data.cpuIdleTime
- activeTime += data.cpuActiveTime
- stealTime += data.cpuStealTime
- lostTime += data.cpuLostTime
- energyUsage += data.powerTotal
- uptime += data.uptime
+ override fun record(reader: HostTableReader) {
+ idleTime += reader.cpuIdleTime
+ activeTime += reader.cpuActiveTime
+ stealTime += reader.cpuStealTime
+ lostTime += reader.cpuLostTime
+ energyUsage += reader.powerTotal
+ uptime += reader.uptime
}
}
}
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt
index b293f7b5..418dc201 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt
@@ -20,6 +20,8 @@
* SOFTWARE.
*/
+@file:Suppress("PropertyName")
+
package org.opendc.telemetry.compute
import io.opentelemetry.api.common.AttributeKey
@@ -32,7 +34,7 @@ import org.opendc.telemetry.compute.table.*
import java.time.Instant
/**
- * Helper class responsible for aggregating [MetricData] into [ServiceData], [HostData] and [ServerData].
+ * Helper class responsible for aggregating [MetricData] into [ServiceTableReader], [HostTableReader] and [ServerTableReader].
*/
public class ComputeMetricAggregator {
private val _service = ServiceAggregator()
@@ -58,25 +60,25 @@ public class ComputeMetricAggregator {
service.recordTimestamp(point)
when (point.attributes[STATE_KEY]) {
- "up" -> service.hostsUp = point.value.toInt()
- "down" -> service.hostsDown = point.value.toInt()
+ "up" -> service._hostsUp = point.value.toInt()
+ "down" -> service._hostsDown = point.value.toInt()
}
}
}
"scheduler.servers" -> {
for (point in metric.longSumData.points) {
when (point.attributes[STATE_KEY]) {
- "pending" -> service.serversPending = point.value.toInt()
- "active" -> service.serversActive = point.value.toInt()
+ "pending" -> service._serversPending = point.value.toInt()
+ "active" -> service._serversActive = point.value.toInt()
}
}
}
"scheduler.attempts" -> {
for (point in metric.longSumData.points) {
when (point.attributes[RESULT_KEY]) {
- "success" -> service.attemptsSuccess = point.value.toInt()
- "failure" -> service.attemptsFailure = point.value.toInt()
- "error" -> service.attemptsError = point.value.toInt()
+ "success" -> service._attemptsSuccess = point.value.toInt()
+ "failure" -> service._attemptsFailure = point.value.toInt()
+ "error" -> service._attemptsError = point.value.toInt()
}
}
}
@@ -87,10 +89,10 @@ public class ComputeMetricAggregator {
for (point in metric.longSumData.points) {
when (point.attributes[STATE_KEY]) {
- "terminated" -> agg.guestsTerminated = point.value.toInt()
- "running" -> agg.guestsRunning = point.value.toInt()
- "error" -> agg.guestsRunning = point.value.toInt()
- "invalid" -> agg.guestsInvalid = point.value.toInt()
+ "terminated" -> agg._guestsTerminated = point.value.toInt()
+ "running" -> agg._guestsRunning = point.value.toInt()
+ "error" -> agg._guestsError = point.value.toInt()
+ "invalid" -> agg._guestsInvalid = point.value.toInt()
}
}
}
@@ -101,24 +103,24 @@ public class ComputeMetricAggregator {
val server = getServer(servers, point)
if (server != null) {
- server.cpuLimit = point.value
- server.host = agg.host
+ server._cpuLimit = point.value
+ server._host = agg.host
} else {
- agg.cpuLimit = point.value
+ agg._cpuLimit = point.value
}
}
}
"system.cpu.usage" -> {
val agg = getHost(hosts, resource) ?: continue
- agg.cpuUsage = metric.doubleGaugeData.points.first().value
+ agg._cpuUsage = metric.doubleGaugeData.points.first().value
}
"system.cpu.demand" -> {
val agg = getHost(hosts, resource) ?: continue
- agg.cpuDemand = metric.doubleGaugeData.points.first().value
+ agg._cpuDemand = metric.doubleGaugeData.points.first().value
}
"system.cpu.utilization" -> {
val agg = getHost(hosts, resource) ?: continue
- agg.cpuUtilization = metric.doubleGaugeData.points.first().value
+ agg._cpuUtilization = metric.doubleGaugeData.points.first().value
}
"system.cpu.time" -> {
val agg = getHost(hosts, resource) ?: continue
@@ -128,29 +130,29 @@ public class ComputeMetricAggregator {
val state = point.attributes[STATE_KEY]
if (server != null) {
when (state) {
- "active" -> server.cpuActiveTime = point.value
- "idle" -> server.cpuIdleTime = point.value
- "steal" -> server.cpuStealTime = point.value
- "lost" -> server.cpuLostTime = point.value
+ "active" -> server._cpuActiveTime = point.value
+ "idle" -> server._cpuIdleTime = point.value
+ "steal" -> server._cpuStealTime = point.value
+ "lost" -> server._cpuLostTime = point.value
}
- server.host = agg.host
+ server._host = agg.host
} else {
when (state) {
- "active" -> agg.cpuActiveTime = point.value
- "idle" -> agg.cpuIdleTime = point.value
- "steal" -> agg.cpuStealTime = point.value
- "lost" -> agg.cpuLostTime = point.value
+ "active" -> agg._cpuActiveTime = point.value
+ "idle" -> agg._cpuIdleTime = point.value
+ "steal" -> agg._cpuStealTime = point.value
+ "lost" -> agg._cpuLostTime = point.value
}
}
}
}
"system.power.usage" -> {
val agg = getHost(hosts, resource) ?: continue
- agg.powerUsage = metric.doubleGaugeData.points.first().value
+ agg._powerUsage = metric.doubleGaugeData.points.first().value
}
"system.power.total" -> {
val agg = getHost(hosts, resource) ?: continue
- agg.powerTotal = metric.doubleSumData.points.first().value
+ agg._powerTotal = metric.doubleSumData.points.first().value
}
"system.time" -> {
val agg = getHost(hosts, resource) ?: continue
@@ -162,16 +164,16 @@ public class ComputeMetricAggregator {
server.recordTimestamp(point)
when (point.attributes[STATE_KEY]) {
- "up" -> server.uptime = point.value
- "down" -> server.downtime = point.value
+ "up" -> server._uptime = point.value
+ "down" -> server._downtime = point.value
}
- server.host = agg.host
+ server._host = agg.host
} else {
agg.recordTimestamp(point)
when (point.attributes[STATE_KEY]) {
- "up" -> agg.uptime = point.value
- "down" -> agg.downtime = point.value
+ "up" -> agg._uptime = point.value
+ "down" -> agg._downtime = point.value
}
}
}
@@ -183,10 +185,10 @@ public class ComputeMetricAggregator {
val server = getServer(servers, point)
if (server != null) {
- server.bootTime = Instant.ofEpochMilli(point.value)
- server.host = agg.host
+ server._bootTime = Instant.ofEpochMilli(point.value)
+ server._host = agg.host
} else {
- agg.bootTime = Instant.ofEpochMilli(point.value)
+ agg._bootTime = Instant.ofEpochMilli(point.value)
}
}
}
@@ -194,7 +196,7 @@ public class ComputeMetricAggregator {
for (point in metric.longGaugeData.points) {
val server = getServer(servers, point) ?: continue
server.recordTimestamp(point)
- server.provisionTime = Instant.ofEpochMilli(point.value)
+ server._provisionTime = Instant.ofEpochMilli(point.value)
}
}
}
@@ -205,14 +207,16 @@ public class ComputeMetricAggregator {
* Collect the data via the [monitor].
*/
public fun collect(monitor: ComputeMonitor) {
- monitor.record(_service.collect())
+ monitor.record(_service)
for (host in _hosts.values) {
- monitor.record(host.collect())
+ monitor.record(host)
+ host.reset()
}
for (server in _servers.values) {
- monitor.record(server.collect())
+ monitor.record(server)
+ server.reset()
}
}
@@ -243,50 +247,55 @@ public class ComputeMetricAggregator {
/**
* An aggregator for service metrics before they are reported.
*/
- internal class ServiceAggregator {
- private var timestamp = Long.MIN_VALUE
+ internal class ServiceAggregator : ServiceTableReader {
+ private var _timestamp: Instant = Instant.MIN
+ override val timestamp: Instant
+ get() = _timestamp
- @JvmField var hostsUp = 0
- @JvmField var hostsDown = 0
+ override val hostsUp: Int
+ get() = _hostsUp
+ @JvmField var _hostsUp = 0
- @JvmField var serversPending = 0
- @JvmField var serversActive = 0
+ override val hostsDown: Int
+ get() = _hostsDown
+ @JvmField var _hostsDown = 0
- @JvmField var attemptsSuccess = 0
- @JvmField var attemptsFailure = 0
- @JvmField var attemptsError = 0
+ override val serversPending: Int
+ get() = _serversPending
+ @JvmField var _serversPending = 0
- /**
- * Finish the aggregation for this cycle.
- */
- fun collect(): ServiceData {
- val now = Instant.ofEpochMilli(timestamp)
- return toServiceData(now)
- }
+ override val serversActive: Int
+ get() = _serversActive
+ @JvmField var _serversActive = 0
- /**
- * Convert the aggregator state to an immutable [ServiceData].
- */
- private fun toServiceData(now: Instant): ServiceData {
- return ServiceData(now, hostsUp, hostsDown, serversPending, serversActive, attemptsSuccess, attemptsFailure, attemptsError)
- }
+ override val attemptsSuccess: Int
+ get() = _attemptsSuccess
+ @JvmField var _attemptsSuccess = 0
+
+ override val attemptsFailure: Int
+ get() = _attemptsFailure
+ @JvmField var _attemptsFailure = 0
+
+ override val attemptsError: Int
+ get() = _attemptsError
+ @JvmField var _attemptsError = 0
/**
* Record the timestamp of a [point] for this aggregator.
*/
fun recordTimestamp(point: PointData) {
- timestamp = point.epochNanos / 1_000_000L // ns to ms
+ _timestamp = Instant.ofEpochMilli(point.epochNanos / 1_000_000L) // ns to ms
}
}
/**
* An aggregator for host metrics before they are reported.
*/
- internal class HostAggregator(resource: Resource) {
+ internal class HostAggregator(resource: Resource) : HostTableReader {
/**
* The static information about this host.
*/
- val host = HostInfo(
+ override val host = HostInfo(
resource.attributes[HOST_ID]!!,
resource.attributes[HOST_NAME] ?: "",
resource.attributes[HOST_ARCH] ?: "",
@@ -294,111 +303,127 @@ public class ComputeMetricAggregator {
resource.attributes[HOST_MEM_CAPACITY] ?: 0,
)
- private var timestamp = Long.MIN_VALUE
+ override val timestamp: Instant
+ get() = _timestamp
+ private var _timestamp = Instant.MIN
+
+ override val guestsTerminated: Int
+ get() = _guestsTerminated
+ @JvmField var _guestsTerminated = 0
+
+ override val guestsRunning: Int
+ get() = _guestsRunning
+ @JvmField var _guestsRunning = 0
+
+ override val guestsError: Int
+ get() = _guestsError
+ @JvmField var _guestsError = 0
- @JvmField var guestsTerminated = 0
- @JvmField var guestsRunning = 0
- @JvmField var guestsError = 0
- @JvmField var guestsInvalid = 0
+ override val guestsInvalid: Int
+ get() = _guestsInvalid
+ @JvmField var _guestsInvalid = 0
- @JvmField var cpuLimit = 0.0
- @JvmField var cpuUsage = 0.0
- @JvmField var cpuDemand = 0.0
- @JvmField var cpuUtilization = 0.0
+ override val cpuLimit: Double
+ get() = _cpuLimit
+ @JvmField var _cpuLimit = 0.0
- @JvmField var cpuActiveTime = 0L
- @JvmField var cpuIdleTime = 0L
- @JvmField var cpuStealTime = 0L
- @JvmField var cpuLostTime = 0L
+ override val cpuUsage: Double
+ get() = _cpuUsage
+ @JvmField var _cpuUsage = 0.0
+
+ override val cpuDemand: Double
+ get() = _cpuDemand
+ @JvmField var _cpuDemand = 0.0
+
+ override val cpuUtilization: Double
+ get() = _cpuUtilization
+ @JvmField var _cpuUtilization = 0.0
+
+ override val cpuActiveTime: Long
+ get() = _cpuActiveTime - previousCpuActiveTime
+ @JvmField var _cpuActiveTime = 0L
private var previousCpuActiveTime = 0L
+
+ override val cpuIdleTime: Long
+ get() = _cpuIdleTime - previousCpuIdleTime
+ @JvmField var _cpuIdleTime = 0L
private var previousCpuIdleTime = 0L
+
+ override val cpuStealTime: Long
+ get() = _cpuStealTime - previousCpuStealTime
+ @JvmField var _cpuStealTime = 0L
private var previousCpuStealTime = 0L
+
+ override val cpuLostTime: Long
+ get() = _cpuLostTime - previousCpuLostTime
+ @JvmField var _cpuLostTime = 0L
private var previousCpuLostTime = 0L
- @JvmField var powerUsage = 0.0
- @JvmField var powerTotal = 0.0
+ override val powerUsage: Double
+ get() = _powerUsage
+ @JvmField var _powerUsage = 0.0
+
+ override val powerTotal: Double
+ get() = _powerTotal - previousPowerTotal
+ @JvmField var _powerTotal = 0.0
private var previousPowerTotal = 0.0
- @JvmField var uptime = 0L
+ override val uptime: Long
+ get() = _uptime - previousUptime
+ @JvmField var _uptime = 0L
private var previousUptime = 0L
- @JvmField var downtime = 0L
+
+ override val downtime: Long
+ get() = _downtime - previousDowntime
+ @JvmField var _downtime = 0L
private var previousDowntime = 0L
- @JvmField var bootTime: Instant? = null
+
+ override val bootTime: Instant?
+ get() = _bootTime
+ @JvmField var _bootTime: Instant? = null
/**
* Finish the aggregation for this cycle.
*/
- fun collect(): HostData {
- val now = Instant.ofEpochMilli(timestamp)
- val data = toHostData(now)
-
+ fun reset() {
// Reset intermediate state for next aggregation
- previousCpuActiveTime = cpuActiveTime
- previousCpuIdleTime = cpuIdleTime
- previousCpuStealTime = cpuStealTime
- previousCpuLostTime = cpuLostTime
- previousPowerTotal = powerTotal
- previousUptime = uptime
- previousDowntime = downtime
-
- guestsTerminated = 0
- guestsRunning = 0
- guestsError = 0
- guestsInvalid = 0
-
- cpuLimit = 0.0
- cpuUsage = 0.0
- cpuDemand = 0.0
- cpuUtilization = 0.0
-
- powerUsage = 0.0
-
- return data
- }
-
- /**
- * Convert the aggregator state to an immutable [HostData] instance.
- */
- private fun toHostData(now: Instant): HostData {
- return HostData(
- now,
- host,
- guestsTerminated,
- guestsRunning,
- guestsError,
- guestsInvalid,
- cpuLimit,
- cpuUsage,
- cpuDemand,
- cpuUtilization,
- cpuActiveTime - previousCpuActiveTime,
- cpuIdleTime - previousCpuIdleTime,
- cpuStealTime - previousCpuStealTime,
- cpuLostTime - previousCpuLostTime,
- powerUsage,
- powerTotal - previousPowerTotal,
- uptime - previousUptime,
- downtime - previousDowntime,
- bootTime
- )
+ previousCpuActiveTime = _cpuActiveTime
+ previousCpuIdleTime = _cpuIdleTime
+ previousCpuStealTime = _cpuStealTime
+ previousCpuLostTime = _cpuLostTime
+ previousPowerTotal = _powerTotal
+ previousUptime = _uptime
+ previousDowntime = _downtime
+
+ _guestsTerminated = 0
+ _guestsRunning = 0
+ _guestsError = 0
+ _guestsInvalid = 0
+
+ _cpuLimit = 0.0
+ _cpuUsage = 0.0
+ _cpuDemand = 0.0
+ _cpuUtilization = 0.0
+
+ _powerUsage = 0.0
}
/**
* Record the timestamp of a [point] for this aggregator.
*/
fun recordTimestamp(point: PointData) {
- timestamp = point.epochNanos / 1_000_000L // ns to ms
+ _timestamp = Instant.ofEpochMilli(point.epochNanos / 1_000_000L) // ns to ms
}
}
/**
* An aggregator for server metrics before they are reported.
*/
- internal class ServerAggregator(attributes: Attributes) {
+ internal class ServerAggregator(attributes: Attributes) : ServerTableReader {
/**
* The static information about this server.
*/
- val server = ServerInfo(
+ override val server = ServerInfo(
attributes[ResourceAttributes.HOST_ID]!!,
attributes[ResourceAttributes.HOST_NAME]!!,
attributes[ResourceAttributes.HOST_TYPE]!!,
@@ -412,70 +437,76 @@ public class ComputeMetricAggregator {
/**
* The [HostInfo] of the host on which the server is hosted.
*/
- @JvmField var host: HostInfo? = null
+ override val host: HostInfo?
+ get() = _host
+ @JvmField var _host: HostInfo? = null
- private var timestamp = Long.MIN_VALUE
- @JvmField var uptime: Long = 0
+ private var _timestamp = Instant.MIN
+ override val timestamp: Instant
+ get() = _timestamp
+
+ override val uptime: Long
+ get() = _uptime - previousUptime
+ @JvmField var _uptime: Long = 0
private var previousUptime = 0L
- @JvmField var downtime: Long = 0
+
+ override val downtime: Long
+ get() = _downtime - previousDowntime
+ @JvmField var _downtime: Long = 0
private var previousDowntime = 0L
- @JvmField var provisionTime: Instant? = null
- @JvmField var bootTime: Instant? = null
- @JvmField var cpuLimit = 0.0
- @JvmField var cpuActiveTime = 0L
- @JvmField var cpuIdleTime = 0L
- @JvmField var cpuStealTime = 0L
- @JvmField var cpuLostTime = 0L
+
+ override val provisionTime: Instant?
+ get() = _provisionTime
+ @JvmField var _provisionTime: Instant? = null
+
+ override val bootTime: Instant?
+ get() = _bootTime
+ @JvmField var _bootTime: Instant? = null
+
+ override val cpuLimit: Double
+ get() = _cpuLimit
+ @JvmField var _cpuLimit = 0.0
+
+ override val cpuActiveTime: Long
+ get() = _cpuActiveTime - previousCpuActiveTime
+ @JvmField var _cpuActiveTime = 0L
private var previousCpuActiveTime = 0L
+
+ override val cpuIdleTime: Long
+ get() = _cpuIdleTime - previousCpuIdleTime
+ @JvmField var _cpuIdleTime = 0L
private var previousCpuIdleTime = 0L
+
+ override val cpuStealTime: Long
+ get() = _cpuStealTime - previousCpuStealTime
+ @JvmField var _cpuStealTime = 0L
private var previousCpuStealTime = 0L
+
+ override val cpuLostTime: Long
+ get() = _cpuLostTime - previousCpuLostTime
+ @JvmField var _cpuLostTime = 0L
private var previousCpuLostTime = 0L
/**
* Finish the aggregation for this cycle.
*/
- fun collect(): ServerData {
- val now = Instant.ofEpochMilli(timestamp)
- val data = toServerData(now)
-
- previousUptime = uptime
- previousDowntime = downtime
+ fun reset() {
+ previousUptime = _uptime
+ previousDowntime = _downtime
- previousCpuActiveTime = cpuActiveTime
- previousCpuIdleTime = cpuIdleTime
- previousCpuStealTime = cpuStealTime
- previousCpuLostTime = cpuLostTime
+ previousCpuActiveTime = _cpuActiveTime
+ previousCpuIdleTime = _cpuIdleTime
+ previousCpuStealTime = _cpuStealTime
+ previousCpuLostTime = _cpuLostTime
- host = null
- cpuLimit = 0.0
-
- return data
- }
-
- /**
- * Convert the aggregator state into an immutable [ServerData].
- */
- private fun toServerData(now: Instant): ServerData {
- return ServerData(
- now,
- server,
- host,
- uptime - previousUptime,
- downtime - previousDowntime,
- provisionTime,
- bootTime,
- cpuLimit,
- cpuActiveTime - previousCpuActiveTime,
- cpuIdleTime - previousCpuIdleTime,
- cpuStealTime - previousCpuStealTime,
- cpuLostTime - previousCpuLostTime
- )
+ _host = null
+ _cpuLimit = 0.0
}
/**
* Record the timestamp of a [point] for this aggregator.
*/
fun recordTimestamp(point: PointData) {
- timestamp = point.epochNanos / 1_000_000L // ns to ms
+ _timestamp = Instant.ofEpochMilli(point.epochNanos / 1_000_000L) // ns to ms
}
}
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt
index d51bcab4..64b5f337 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt
@@ -22,26 +22,26 @@
package org.opendc.telemetry.compute
-import org.opendc.telemetry.compute.table.HostData
-import org.opendc.telemetry.compute.table.ServerData
-import org.opendc.telemetry.compute.table.ServiceData
+import org.opendc.telemetry.compute.table.HostTableReader
+import org.opendc.telemetry.compute.table.ServerTableReader
+import org.opendc.telemetry.compute.table.ServiceTableReader
/**
* A monitor that tracks the metrics and events of the OpenDC Compute service.
*/
public interface ComputeMonitor {
/**
- * Record the specified [data].
+ * Record an entry with the specified [reader].
*/
- public fun record(data: ServerData) {}
+ public fun record(reader: ServerTableReader) {}
/**
- * Record the specified [data].
+ * Record an entry with the specified [reader].
*/
- public fun record(data: HostData) {}
+ public fun record(reader: HostTableReader) {}
/**
- * Record the specified [data].
+ * Record an entry with the specified [reader].
*/
- public fun record(data: ServiceData) {}
+ public fun record(reader: ServiceTableReader) {}
}
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt
index ce89061b..41315b15 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt
@@ -24,6 +24,8 @@ package org.opendc.telemetry.compute
import io.opentelemetry.sdk.metrics.export.MetricProducer
import org.opendc.telemetry.compute.table.ServiceData
+import org.opendc.telemetry.compute.table.ServiceTableReader
+import org.opendc.telemetry.compute.table.toServiceData
/**
* Collect the metrics of the compute service.
@@ -32,8 +34,8 @@ public fun collectServiceMetrics(metricProducer: MetricProducer): ServiceData {
lateinit var serviceData: ServiceData
val agg = ComputeMetricAggregator()
val monitor = object : ComputeMonitor {
- override fun record(data: ServiceData) {
- serviceData = data
+ override fun record(reader: ServiceTableReader) {
+ serviceData = reader.toServiceData()
}
}
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostTableReader.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostTableReader.kt
new file mode 100644
index 00000000..1e1ad94e
--- /dev/null
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostTableReader.kt
@@ -0,0 +1,125 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.telemetry.compute.table
+
+import java.time.Instant
+
+/**
+ * An interface that is used to read a row of a host trace entry.
+ */
+public interface HostTableReader {
+ /**
+ * The timestamp of the current entry of the reader.
+ */
+ public val timestamp: Instant
+
+ /**
+ * The [HostInfo] of the host to which the row belongs.
+ */
+ public val host: HostInfo
+
+ /**
+ * The number of guests that are in a terminated state.
+ */
+ public val guestsTerminated: Int
+
+ /**
+ * The number of guests that are in a running state.
+ */
+ public val guestsRunning: Int
+
+ /**
+ * The number of guests that are in an error state.
+ */
+ public val guestsError: Int
+
+ /**
+ * The number of guests that are in an unknown state.
+ */
+ public val guestsInvalid: Int
+
+ /**
+ * The capacity of the CPUs in the host (in MHz).
+ */
+ public val cpuLimit: Double
+
+ /**
+ * The usage of all CPUs in the host (in MHz).
+ */
+ public val cpuUsage: Double
+
+ /**
+ * The demand of all vCPUs of the guests (in MHz).
+ */
+ public val cpuDemand: Double
+
+ /**
+ * The CPU utilization of the host.
+ */
+ public val cpuUtilization: Double
+
+ /**
+ * The duration (in seconds) that a CPU was active in the host.
+ */
+ public val cpuActiveTime: Long
+
+ /**
+ * The duration (in seconds) that a CPU was idle in the host.
+ */
+ public val cpuIdleTime: Long
+
+ /**
+ * The duration (in seconds) that a vCPU wanted to run, but no capacity was available.
+ */
+ public val cpuStealTime: Long
+
+ /**
+ * The duration (in seconds) of CPU time that was lost due to interference.
+ */
+ public val cpuLostTime: Long
+
+ /**
+ * The current power usage of the host in W.
+ */
+ public val powerUsage: Double
+
+ /**
+ * The total power consumption of the host since last time in J.
+ */
+ public val powerTotal: Double
+
+ /**
+ * The uptime of the host since last time in ms.
+ */
+ public val uptime: Long
+
+ /**
+ * The downtime of the host since last time in ms.
+ */
+ public val downtime: Long
+
+ /**
+ * The [Instant] at which the host booted.
+ */
+ public val bootTime: Instant?
+}
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt
deleted file mode 100644
index 6fd2a81b..00000000
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.telemetry.compute.table
-
-import java.time.Instant
-
-/**
- * A trace entry for a particular server.
- */
-public data class ServerData(
- val timestamp: Instant,
- val server: ServerInfo,
- val host: HostInfo?,
- val uptime: Long,
- val downtime: Long,
- val provisionTime: Instant?,
- val bootTime: Instant?,
- val cpuLimit: Double,
- val cpuActiveTime: Long,
- val cpuIdleTime: Long,
- val cpuStealTime: Long,
- val cpuLostTime: Long,
-)
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerTableReader.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerTableReader.kt
new file mode 100644
index 00000000..c23d1467
--- /dev/null
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerTableReader.kt
@@ -0,0 +1,90 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.telemetry.compute.table
+
+import java.time.Instant
+
+/**
+ * An interface that is used to read a row of a server trace entry.
+ */
+public interface ServerTableReader {
+ /**
+ * The timestamp of the current entry of the reader.
+ */
+ public val timestamp: Instant
+
+ /**
+ * The [ServerInfo] of the server to which the row belongs.
+ */
+ public val server: ServerInfo
+
+ /**
+ * The [HostInfo] of the host on which the server is hosted or `null` if it has no host.
+ */
+ public val host: HostInfo?
+
+ /**
+ * The uptime of the server since last time in ms.
+ */
+ public val uptime: Long
+
+ /**
+ * The downtime of the server since last time in ms.
+ */
+ public val downtime: Long
+
+ /**
+ * The [Instant] at which the server was enqueued for the scheduler.
+ */
+ public val provisionTime: Instant?
+
+ /**
+ * The [Instant] at which the server booted.
+ */
+ public val bootTime: Instant?
+
+ /**
+ * The capacity of the CPUs of the servers (in MHz).
+ */
+ public val cpuLimit: Double
+
+ /**
+ * The duration (in seconds) that a CPU was active in the server.
+ */
+ public val cpuActiveTime: Long
+
+ /**
+ * The duration (in seconds) that a CPU was idle in the server.
+ */
+ public val cpuIdleTime: Long
+
+ /**
+ * The duration (in seconds) that a vCPU wanted to run, but no capacity was available.
+ */
+ public val cpuStealTime: Long
+
+ /**
+ * The duration (in seconds) of CPU time that was lost due to interference.
+ */
+ public val cpuLostTime: Long
+}
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt
index 6db1399d..39bf96f4 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt
@@ -37,3 +37,10 @@ public data class ServiceData(
val attemptsFailure: Int,
val attemptsError: Int
)
+
+/**
+ * Convert a [ServiceTableReader] into a persistent object.
+ */
+public fun ServiceTableReader.toServiceData(): ServiceData {
+ return ServiceData(timestamp, hostsUp, hostsDown, serversPending, serversActive, attemptsSuccess, attemptsFailure, attemptsError)
+}
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceTableReader.kt
index 8e787b97..908f6748 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceTableReader.kt
@@ -25,26 +25,46 @@ package org.opendc.telemetry.compute.table
import java.time.Instant
/**
- * A trace entry for a particular host.
+ * An interface that is used to read a row of a service trace entry.
*/
-public data class HostData(
- val timestamp: Instant,
- val host: HostInfo,
- val guestsTerminated: Int,
- val guestsRunning: Int,
- val guestsError: Int,
- val guestsInvalid: Int,
- val cpuLimit: Double,
- val cpuUsage: Double,
- val cpuDemand: Double,
- val cpuUtilization: Double,
- val cpuActiveTime: Long,
- val cpuIdleTime: Long,
- val cpuStealTime: Long,
- val cpuLostTime: Long,
- val powerUsage: Double,
- val powerTotal: Double,
- val uptime: Long,
- val downtime: Long,
- val bootTime: Instant?
-)
+public interface ServiceTableReader {
+ /**
+ * The timestamp of the current entry of the reader.
+ */
+ public val timestamp: Instant
+
+ /**
+ * The number of hosts that are up at this instant.
+ */
+ public val hostsUp: Int
+
+ /**
+ * The number of hosts that are down at this instant.
+ */
+ public val hostsDown: Int
+
+ /**
+ * The number of servers that are pending to be scheduled.
+ */
+ public val serversPending: Int
+
+ /**
+ * The number of servers that are currently active.
+ */
+ public val serversActive: Int
+
+ /**
+ * The scheduling attempts that were successful.
+ */
+ public val attemptsSuccess: Int
+
+ /**
+ * The scheduling attempts that were unsuccessful due to client error.
+ */
+ public val attemptsFailure: Int
+
+ /**
+ * The scheduling attempts that were unsuccessful due to scheduler error.
+ */
+ public val attemptsError: Int
+}
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMetricExporter.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMetricExporter.kt
index 7913660d..d39a0c74 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMetricExporter.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMetricExporter.kt
@@ -24,8 +24,8 @@ package org.opendc.web.runner
import org.opendc.telemetry.compute.ComputeMetricExporter
import org.opendc.telemetry.compute.ComputeMonitor
-import org.opendc.telemetry.compute.table.HostData
-import org.opendc.telemetry.compute.table.ServiceData
+import org.opendc.telemetry.compute.table.HostTableReader
+import org.opendc.telemetry.compute.table.ServiceTableReader
import kotlin.math.max
import kotlin.math.roundToLong
@@ -33,24 +33,24 @@ import kotlin.math.roundToLong
* A [ComputeMonitor] that tracks the aggregate metrics for each repeat.
*/
class WebComputeMetricExporter : ComputeMetricExporter() {
- override fun record(data: HostData) {
- val slices = data.downtime / SLICE_LENGTH
+ override fun record(reader: HostTableReader) {
+ val slices = reader.downtime / SLICE_LENGTH
hostAggregateMetrics = AggregateHostMetrics(
- hostAggregateMetrics.totalActiveTime + data.cpuActiveTime,
- hostAggregateMetrics.totalIdleTime + data.cpuIdleTime,
- hostAggregateMetrics.totalStealTime + data.cpuStealTime,
- hostAggregateMetrics.totalLostTime + data.cpuLostTime,
- hostAggregateMetrics.totalPowerDraw + data.powerTotal,
+ hostAggregateMetrics.totalActiveTime + reader.cpuActiveTime,
+ hostAggregateMetrics.totalIdleTime + reader.cpuIdleTime,
+ hostAggregateMetrics.totalStealTime + reader.cpuStealTime,
+ hostAggregateMetrics.totalLostTime + reader.cpuLostTime,
+ hostAggregateMetrics.totalPowerDraw + reader.powerTotal,
hostAggregateMetrics.totalFailureSlices + slices,
- hostAggregateMetrics.totalFailureVmSlices + data.guestsRunning * slices
+ hostAggregateMetrics.totalFailureVmSlices + reader.guestsRunning * slices
)
- hostMetrics.compute(data.host.id) { _, prev ->
+ hostMetrics.compute(reader.host.id) { _, prev ->
HostMetrics(
- data.cpuUsage + (prev?.cpuUsage ?: 0.0),
- data.cpuDemand + (prev?.cpuDemand ?: 0.0),
- data.guestsRunning + (prev?.instanceCount ?: 0),
+ reader.cpuUsage + (prev?.cpuUsage ?: 0.0),
+ reader.cpuDemand + (prev?.cpuDemand ?: 0.0),
+ reader.guestsRunning + (prev?.instanceCount ?: 0),
1 + (prev?.count ?: 0)
)
}
@@ -79,13 +79,13 @@ class WebComputeMetricExporter : ComputeMetricExporter() {
private var serviceMetrics: AggregateServiceMetrics = AggregateServiceMetrics()
- override fun record(data: ServiceData) {
+ override fun record(reader: ServiceTableReader) {
serviceMetrics = AggregateServiceMetrics(
- max(data.attemptsSuccess, serviceMetrics.vmTotalCount),
- max(data.serversPending, serviceMetrics.vmWaitingCount),
- max(data.serversActive, serviceMetrics.vmActiveCount),
+ max(reader.attemptsSuccess, serviceMetrics.vmTotalCount),
+ max(reader.serversPending, serviceMetrics.vmWaitingCount),
+ max(reader.serversActive, serviceMetrics.vmActiveCount),
max(0, serviceMetrics.vmInactiveCount),
- max(data.attemptsFailure, serviceMetrics.vmFailedCount),
+ max(reader.attemptsFailure, serviceMetrics.vmFailedCount),
)
}