author     Fabian Mastenbroek <mail.fabianm@gmail.com>    2021-09-28 11:23:13 +0200
committer  GitHub <noreply@github.com>                    2021-09-28 11:23:13 +0200
commit     6196895bfd0334052afa4fb91b00adb259a661b6 (patch)
tree       8a14988b30f6f5758b1f9f982d0086296eb5d416
parent     993c65d9c287d8db2db9ff1f95abb414803a502c (diff)
parent     94d8ee69e52dcd375a662a08c198aa29670362fb (diff)
merge: Simplify usage of ComputeMetricExporter
This pull request addresses some issues with the current implementation of the `ComputeMetricExporter` class. In particular, constructing a `ComputeMetricExporter` no longer requires a `Clock`.

- Ensure shutdown of exporter is called
- Do not require clock for ComputeMetricExporter
- Do not recover guests in non-error state
- Write null values explicitly in Parquet exporter
- Report cause of compute exporter failure

**Breaking API Changes**

- `ComputeMetricExporter` is now an abstract class that can be extended to collect metrics (see the sketch after this list)
- `ParquetExportMonitor` has been renamed to `ParquetComputeMetricExporter` and extends `ComputeMetricExporter`
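For downstream code, the breaking change mostly amounts to subclassing the exporter instead of wrapping a separate `ComputeMonitor`. Below is a minimal sketch of the new pattern, modelled on the updated `SimHostTest` and `ParquetComputeMetricExporter` in this diff; the class name `InMemoryComputeMetricExporter` and the choice of aggregated fields are illustrative, not part of the API.

```kotlin
import io.opentelemetry.sdk.common.CompletableResultCode
import org.opendc.telemetry.compute.ComputeMetricExporter
import org.opendc.telemetry.compute.table.HostData
import org.opendc.telemetry.compute.table.ServerData

/**
 * A [ComputeMetricExporter] that aggregates a few host and server metrics in memory.
 * No [java.time.Clock] is required anymore; timestamps are derived from the metric points.
 */
class InMemoryComputeMetricExporter : ComputeMetricExporter() {
    var activeTime = 0L
        private set
    var idleTime = 0L
        private set
    var guestUptime = 0L
        private set

    override fun record(data: HostData) {
        activeTime += data.cpuActiveTime
        idleTime += data.cpuIdleTime
    }

    override fun record(data: ServerData) {
        guestUptime += data.uptime
    }

    /** Release any resources held by the exporter; called when the metric reader stops. */
    override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess()
}
```

Overriding `shutdown` is only needed when the exporter holds resources (as the Parquet exporter does); the anonymous exporters in the updated tests skip it.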
-rw-r--r--  opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt | 4
-rw-r--r--  opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt | 15
-rw-r--r--  opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt | 41
-rw-r--r--  opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt (renamed from opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetExportMonitor.kt) | 8
-rw-r--r--  opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt | 4
-rw-r--r--  opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt | 4
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt | 10
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt | 59
-rw-r--r--  opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt | 54
-rw-r--r--  opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt | 13
-rw-r--r--  opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt | 15
-rw-r--r--  opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt | 28
-rw-r--r--  opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt | 13
-rw-r--r--  opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt | 2
-rw-r--r--  opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMetricExporter.kt (renamed from opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt) | 5
15 files changed, 156 insertions, 119 deletions
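At the call sites, the exporter is now passed directly to `CoroutineMetricReader`, which also calls `shutdown()` on it when the export loop ends (see the `CoroutineMetricReader` change further down). The sketch below follows the constructor usage visible in the Capelin and web-runner diffs; `startMetricExport`, the five-minute interval and the reuse of the `InMemoryComputeMetricExporter` sketched above are placeholders, not part of this change.

```kotlin
import java.time.Duration
import io.opentelemetry.sdk.metrics.export.MetricProducer
import kotlinx.coroutines.CoroutineScope
import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader

/**
 * Starts periodic metric export for the given producers. The exporter is passed in
 * directly; there is no longer a wrapping ComputeMetricExporter(clock, monitor) call.
 */
fun startMetricExport(scope: CoroutineScope, producers: List<MetricProducer>): CoroutineMetricReader =
    CoroutineMetricReader(
        scope,
        producers,
        InMemoryComputeMetricExporter(), // the exporter sketched above
        exportInterval = Duration.ofMinutes(5)
    )
```

Because the reader now shuts the exporter down in its `finally` block, the explicit `monitor.close()` calls disappear from `Portfolio.kt` and the integration tests, as shown in the diffs below.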
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
index ff55c585..fdb3f1dc 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -262,7 +262,7 @@ public class SimHost(
}
override suspend fun delete(server: Server) {
- val guest = guests.remove(server) ?: return
+ val guest = guests[server] ?: return
guest.terminate()
}
@@ -296,7 +296,7 @@ public class SimHost(
_bootTime = clock.millis()
for (guest in guests.values) {
- guest.start()
+ guest.recover()
}
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
index 90562e2f..7f33154a 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
@@ -96,8 +96,8 @@ internal class Guest(
}
ServerState.RUNNING -> return
ServerState.DELETED -> {
- logger.warn { "User tried to start terminated server" }
- throw IllegalArgumentException("Server is terminated")
+ logger.warn { "User tried to start deleted server" }
+ throw IllegalArgumentException("Server is deleted")
}
else -> assert(false) { "Invalid state transition" }
}
@@ -144,6 +144,17 @@ internal class Guest(
}
/**
+ * Recover the guest if it is in an error state.
+ */
+ suspend fun recover() {
+ if (state != ServerState.ERROR) {
+ return
+ }
+
+ doStart()
+ }
+
+ /**
* The [Job] representing the current active virtual machine instance or `null` if no virtual machine is active.
*/
private var job: Job? = null
diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
index 9c879e5e..e75c31a0 100644
--- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
@@ -43,7 +43,6 @@ import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.simulator.resources.SimResourceInterpreter
import org.opendc.telemetry.compute.ComputeMetricExporter
-import org.opendc.telemetry.compute.ComputeMonitor
import org.opendc.telemetry.compute.HOST_ID
import org.opendc.telemetry.compute.table.HostData
import org.opendc.telemetry.compute.table.ServerData
@@ -138,16 +137,13 @@ internal class SimHostTest {
// Setup metric reader
val reader = CoroutineMetricReader(
this, listOf(meterProvider as MetricProducer),
- ComputeMetricExporter(
- clock,
- object : ComputeMonitor {
- override fun record(data: HostData) {
- activeTime += data.cpuActiveTime
- idleTime += data.cpuIdleTime
- stealTime += data.cpuStealTime
- }
+ object : ComputeMetricExporter() {
+ override fun record(data: HostData) {
+ activeTime += data.cpuActiveTime
+ idleTime += data.cpuIdleTime
+ stealTime += data.cpuStealTime
}
- ),
+ },
exportInterval = Duration.ofSeconds(duration)
)
@@ -237,22 +233,19 @@ internal class SimHostTest {
// Setup metric reader
val reader = CoroutineMetricReader(
this, listOf(meterProvider as MetricProducer),
- ComputeMetricExporter(
- clock,
- object : ComputeMonitor {
- override fun record(data: HostData) {
- activeTime += data.cpuActiveTime
- idleTime += data.cpuIdleTime
- uptime += data.uptime
- downtime += data.downtime
- }
+ object : ComputeMetricExporter() {
+ override fun record(data: HostData) {
+ activeTime += data.cpuActiveTime
+ idleTime += data.cpuIdleTime
+ uptime += data.uptime
+ downtime += data.downtime
+ }
- override fun record(data: ServerData) {
- guestUptime += data.uptime
- guestDowntime += data.downtime
- }
+ override fun record(data: ServerData) {
+ guestUptime += data.uptime
+ guestDowntime += data.downtime
}
- ),
+ },
exportInterval = Duration.ofSeconds(duration)
)
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetExportMonitor.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt
index f41a2241..ad182d67 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetExportMonitor.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMetricExporter.kt
@@ -22,6 +22,8 @@
package org.opendc.compute.workload.export.parquet
+import io.opentelemetry.sdk.common.CompletableResultCode
+import org.opendc.telemetry.compute.ComputeMetricExporter
import org.opendc.telemetry.compute.ComputeMonitor
import org.opendc.telemetry.compute.table.HostData
import org.opendc.telemetry.compute.table.ServerData
@@ -31,7 +33,7 @@ import java.io.File
/**
* A [ComputeMonitor] that logs the events to a Parquet file.
*/
-public class ParquetExportMonitor(base: File, partition: String, bufferSize: Int) : ComputeMonitor, AutoCloseable {
+public class ParquetComputeMetricExporter(base: File, partition: String, bufferSize: Int) : ComputeMetricExporter() {
private val serverWriter = ParquetServerDataWriter(
File(base, "server/$partition/data.parquet").also { it.parentFile.mkdirs() },
bufferSize
@@ -59,9 +61,11 @@ public class ParquetExportMonitor(base: File, partition: String, bufferSize: Int
serviceWriter.write(data)
}
- override fun close() {
+ override fun shutdown(): CompletableResultCode {
hostWriter.close()
serviceWriter.close()
serverWriter.close()
+
+ return CompletableResultCode.ofSuccess()
}
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt
index 37066a0d..98a0739e 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt
@@ -54,9 +54,7 @@ public class ParquetHostDataWriter(path: File, bufferSize: Int) :
builder["uptime"] = data.uptime
builder["downtime"] = data.downtime
val bootTime = data.bootTime
- if (bootTime != null) {
- builder["boot_time"] = bootTime.toEpochMilli()
- }
+ builder["boot_time"] = bootTime?.toEpochMilli()
builder["cpu_count"] = data.host.cpuCount
builder["cpu_limit"] = data.cpuLimit
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt
index bea23d32..0d11ec23 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt
@@ -56,9 +56,7 @@ public class ParquetServerDataWriter(path: File, bufferSize: Int) :
builder["uptime"] = data.uptime
builder["downtime"] = data.downtime
val bootTime = data.bootTime
- if (bootTime != null) {
- builder["boot_time"] = bootTime.toEpochMilli()
- }
+ builder["boot_time"] = bootTime?.toEpochMilli()
builder["scheduling_latency"] = data.schedulingLatency
builder["cpu_count"] = data.server.cpuCount
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index 2201a6b4..21ff3ab0 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -27,7 +27,7 @@ import mu.KotlinLogging
import org.opendc.compute.workload.ComputeWorkloadLoader
import org.opendc.compute.workload.ComputeWorkloadRunner
import org.opendc.compute.workload.createComputeScheduler
-import org.opendc.compute.workload.export.parquet.ParquetExportMonitor
+import org.opendc.compute.workload.export.parquet.ParquetComputeMetricExporter
import org.opendc.compute.workload.grid5000
import org.opendc.compute.workload.topology.apply
import org.opendc.compute.workload.util.PerformanceInterferenceReader
@@ -39,7 +39,6 @@ import org.opendc.harness.dsl.Experiment
import org.opendc.harness.dsl.anyOf
import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.telemetry.compute.ComputeMetricExporter
import org.opendc.telemetry.compute.collectServiceMetrics
import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import java.io.File
@@ -120,12 +119,12 @@ abstract class Portfolio(name: String) : Experiment(name) {
performanceInterferenceModel
)
- val monitor = ParquetExportMonitor(
+ val exporter = ParquetComputeMetricExporter(
File(config.getString("output-path")),
"portfolio_id=$name/scenario_id=$id/run_id=$repeat",
4096
)
- val metricReader = CoroutineMetricReader(this, runner.producers, ComputeMetricExporter(clock, monitor))
+ val metricReader = CoroutineMetricReader(this, runner.producers, exporter)
val topology = clusterTopology(File(config.getString("env-path"), "${topology.name}.txt"))
try {
@@ -137,10 +136,9 @@ abstract class Portfolio(name: String) : Experiment(name) {
} finally {
runner.close()
metricReader.close()
- monitor.close()
}
- val monitorResults = collectServiceMetrics(clock.instant(), runner.producers[0])
+ val monitorResults = collectServiceMetrics(runner.producers[0])
logger.debug {
"Scheduler " +
"Success=${monitorResults.attemptsSuccess} " +
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index ac2ea646..30cc1466 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -39,7 +39,6 @@ import org.opendc.experiments.capelin.topology.clusterTopology
import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.telemetry.compute.ComputeMetricExporter
-import org.opendc.telemetry.compute.ComputeMonitor
import org.opendc.telemetry.compute.collectServiceMetrics
import org.opendc.telemetry.compute.table.HostData
import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
@@ -54,7 +53,7 @@ class CapelinIntegrationTest {
/**
* The monitor used to keep track of the metrics.
*/
- private lateinit var monitor: TestExperimentReporter
+ private lateinit var exporter: TestComputeMetricExporter
/**
* The [FilterScheduler] to use for all experiments.
@@ -71,7 +70,7 @@ class CapelinIntegrationTest {
*/
@BeforeEach
fun setUp() {
- monitor = TestExperimentReporter()
+ exporter = TestComputeMetricExporter()
computeScheduler = FilterScheduler(
filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
weighers = listOf(CoreRamWeigher(multiplier = 1.0))
@@ -91,7 +90,7 @@ class CapelinIntegrationTest {
computeScheduler
)
val topology = createTopology()
- val metricReader = CoroutineMetricReader(this, runner.producers, ComputeMetricExporter(clock, monitor))
+ val metricReader = CoroutineMetricReader(this, runner.producers, exporter)
try {
runner.apply(topology)
@@ -101,7 +100,7 @@ class CapelinIntegrationTest {
metricReader.close()
}
- val serviceMetrics = collectServiceMetrics(clock.instant(), runner.producers[0])
+ val serviceMetrics = collectServiceMetrics(runner.producers[0])
println(
"Scheduler " +
"Success=${serviceMetrics.attemptsSuccess} " +
@@ -117,11 +116,11 @@ class CapelinIntegrationTest {
{ assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") },
{ assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") },
{ assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") },
- { assertEquals(223331032, monitor.idleTime) { "Incorrect idle time" } },
- { assertEquals(67006568, monitor.activeTime) { "Incorrect active time" } },
- { assertEquals(3159379, monitor.stealTime) { "Incorrect steal time" } },
- { assertEquals(0, monitor.lostTime) { "Incorrect lost time" } },
- { assertEquals(5.841120890240688E9, monitor.energyUsage, 0.01) { "Incorrect power draw" } },
+ { assertEquals(223331032, this@CapelinIntegrationTest.exporter.idleTime) { "Incorrect idle time" } },
+ { assertEquals(67006568, this@CapelinIntegrationTest.exporter.activeTime) { "Incorrect active time" } },
+ { assertEquals(3159379, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } },
+ { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Incorrect lost time" } },
+ { assertEquals(5.841120890240688E9, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } },
)
}
@@ -139,7 +138,7 @@ class CapelinIntegrationTest {
computeScheduler
)
val topology = createTopology("single")
- val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
+ val metricReader = CoroutineMetricReader(this, simulator.producers, exporter)
try {
simulator.apply(topology)
@@ -149,7 +148,7 @@ class CapelinIntegrationTest {
metricReader.close()
}
- val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0])
+ val serviceMetrics = collectServiceMetrics(simulator.producers[0])
println(
"Scheduler " +
"Success=${serviceMetrics.attemptsSuccess} " +
@@ -161,10 +160,10 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(10998110, monitor.idleTime) { "Idle time incorrect" } },
- { assertEquals(9740290, monitor.activeTime) { "Active time incorrect" } },
- { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } },
- { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } }
+ { assertEquals(10998110, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } },
+ { assertEquals(9740290, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } },
+ { assertEquals(0, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } },
+ { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } }
)
}
@@ -188,7 +187,7 @@ class CapelinIntegrationTest {
interferenceModel = performanceInterferenceModel
)
val topology = createTopology("single")
- val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
+ val metricReader = CoroutineMetricReader(this, simulator.producers, exporter)
try {
simulator.apply(topology)
@@ -198,7 +197,7 @@ class CapelinIntegrationTest {
metricReader.close()
}
- val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0])
+ val serviceMetrics = collectServiceMetrics(simulator.producers[0])
println(
"Scheduler " +
"Success=${serviceMetrics.attemptsSuccess} " +
@@ -210,10 +209,10 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(6013899, monitor.idleTime) { "Idle time incorrect" } },
- { assertEquals(14724501, monitor.activeTime) { "Active time incorrect" } },
- { assertEquals(12530742, monitor.stealTime) { "Steal time incorrect" } },
- { assertEquals(473394, monitor.lostTime) { "Lost time incorrect" } }
+ { assertEquals(6013899, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } },
+ { assertEquals(14724501, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } },
+ { assertEquals(12530742, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } },
+ { assertEquals(473394, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } }
)
}
@@ -231,7 +230,7 @@ class CapelinIntegrationTest {
)
val topology = createTopology("single")
val workload = createTestWorkload(0.25, seed)
- val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor))
+ val metricReader = CoroutineMetricReader(this, simulator.producers, exporter)
try {
simulator.apply(topology)
@@ -241,7 +240,7 @@ class CapelinIntegrationTest {
metricReader.close()
}
- val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0])
+ val serviceMetrics = collectServiceMetrics(simulator.producers[0])
println(
"Scheduler " +
"Success=${serviceMetrics.attemptsSuccess} " +
@@ -253,11 +252,11 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(11134319, monitor.idleTime) { "Idle time incorrect" } },
- { assertEquals(9604081, monitor.activeTime) { "Active time incorrect" } },
- { assertEquals(0, monitor.stealTime) { "Steal time incorrect" } },
- { assertEquals(0, monitor.lostTime) { "Lost time incorrect" } },
- { assertEquals(2559005056, monitor.uptime) { "Uptime incorrect" } }
+ { assertEquals(11134319, exporter.idleTime) { "Idle time incorrect" } },
+ { assertEquals(9604081, exporter.activeTime) { "Active time incorrect" } },
+ { assertEquals(0, exporter.stealTime) { "Steal time incorrect" } },
+ { assertEquals(0, exporter.lostTime) { "Lost time incorrect" } },
+ { assertEquals(2559005056, exporter.uptime) { "Uptime incorrect" } }
)
}
@@ -277,7 +276,7 @@ class CapelinIntegrationTest {
return stream.use { clusterTopology(stream) }
}
- class TestExperimentReporter : ComputeMonitor {
+ class TestComputeMetricExporter : ComputeMetricExporter() {
var idleTime = 0L
var activeTime = 0L
var stealTime = 0L
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt
index e9449634..738ec38b 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt
@@ -55,6 +55,9 @@ public class ComputeMetricAggregator {
// ComputeService
"scheduler.hosts" -> {
for (point in metric.longSumData.points) {
+ // Record the timestamp for the service
+ service.recordTimestamp(point)
+
when (point.attributes[STATE_KEY]) {
"up" -> service.hostsUp = point.value.toInt()
"down" -> service.hostsDown = point.value.toInt()
@@ -163,12 +166,16 @@ public class ComputeMetricAggregator {
val server = getServer(servers, point)
if (server != null) {
+ server.recordTimestamp(point)
+
when (point.attributes[STATE_KEY]) {
"up" -> server.uptime = point.value
"down" -> server.downtime = point.value
}
server.host = agg.host
} else {
+ agg.recordTimestamp(point)
+
when (point.attributes[STATE_KEY]) {
"up" -> agg.uptime = point.value
"down" -> agg.downtime = point.value
@@ -197,15 +204,15 @@ public class ComputeMetricAggregator {
/**
* Collect the data via the [monitor].
*/
- public fun collect(now: Instant, monitor: ComputeMonitor) {
- monitor.record(_service.collect(now))
+ public fun collect(monitor: ComputeMonitor) {
+ monitor.record(_service.collect())
for (host in _hosts.values) {
- monitor.record(host.collect(now))
+ monitor.record(host.collect())
}
for (server in _servers.values) {
- monitor.record(server.collect(now))
+ monitor.record(server.collect())
}
}
@@ -237,6 +244,8 @@ public class ComputeMetricAggregator {
* An aggregator for service metrics before they are reported.
*/
internal class ServiceAggregator {
+ private var timestamp = Long.MIN_VALUE
+
@JvmField var hostsUp = 0
@JvmField var hostsDown = 0
@@ -250,7 +259,10 @@ public class ComputeMetricAggregator {
/**
* Finish the aggregation for this cycle.
*/
- fun collect(now: Instant): ServiceData = toServiceData(now)
+ fun collect(): ServiceData {
+ val now = Instant.ofEpochMilli(timestamp)
+ return toServiceData(now)
+ }
/**
* Convert the aggregator state to an immutable [ServiceData].
@@ -258,6 +270,13 @@ public class ComputeMetricAggregator {
private fun toServiceData(now: Instant): ServiceData {
return ServiceData(now, hostsUp, hostsDown, serversPending, serversActive, attemptsSuccess, attemptsFailure, attemptsError)
}
+
+ /**
+ * Record the timestamp of a [point] for this aggregator.
+ */
+ fun recordTimestamp(point: PointData) {
+ timestamp = point.epochNanos / 1_000_000L // ns to ms
+ }
}
/**
@@ -275,6 +294,8 @@ public class ComputeMetricAggregator {
resource.attributes[HOST_MEM_CAPACITY] ?: 0,
)
+ private var timestamp = Long.MIN_VALUE
+
@JvmField var guestsTerminated = 0
@JvmField var guestsRunning = 0
@JvmField var guestsError = 0
@@ -307,7 +328,8 @@ public class ComputeMetricAggregator {
/**
* Finish the aggregation for this cycle.
*/
- fun collect(now: Instant): HostData {
+ fun collect(): HostData {
+ val now = Instant.ofEpochMilli(timestamp)
val data = toHostData(now)
// Reset intermediate state for next aggregation
@@ -360,6 +382,13 @@ public class ComputeMetricAggregator {
if (bootTime != Long.MIN_VALUE) Instant.ofEpochMilli(bootTime) else null
)
}
+
+ /**
+ * Record the timestamp of a [point] for this aggregator.
+ */
+ fun recordTimestamp(point: PointData) {
+ timestamp = point.epochNanos / 1_000_000L // ns to ms
+ }
}
/**
@@ -383,8 +412,9 @@ public class ComputeMetricAggregator {
/**
* The [HostInfo] of the host on which the server is hosted.
*/
- var host: HostInfo? = null
+ @JvmField var host: HostInfo? = null
+ private var timestamp = Long.MIN_VALUE
@JvmField var uptime: Long = 0
private var previousUptime = 0L
@JvmField var downtime: Long = 0
@@ -404,7 +434,8 @@ public class ComputeMetricAggregator {
/**
* Finish the aggregation for this cycle.
*/
- fun collect(now: Instant): ServerData {
+ fun collect(): ServerData {
+ val now = Instant.ofEpochMilli(timestamp)
val data = toServerData(now)
previousUptime = uptime
@@ -439,6 +470,13 @@ public class ComputeMetricAggregator {
cpuLostTime - previousCpuLostTime
)
}
+
+ /**
+ * Record the timestamp of a [point] for this aggregator.
+ */
+ fun recordTimestamp(point: PointData) {
+ timestamp = point.epochNanos / 1_000_000L // ns to ms
+ }
}
private companion object {
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt
index ea96f721..3ab6c7b2 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt
@@ -25,12 +25,17 @@ package org.opendc.telemetry.compute
import io.opentelemetry.sdk.common.CompletableResultCode
import io.opentelemetry.sdk.metrics.data.*
import io.opentelemetry.sdk.metrics.export.MetricExporter
-import java.time.Clock
+import mu.KotlinLogging
/**
* A [MetricExporter] that redirects data to a [ComputeMonitor] implementation.
*/
-public class ComputeMetricExporter(private val clock: Clock, private val monitor: ComputeMonitor) : MetricExporter {
+public abstract class ComputeMetricExporter : MetricExporter, ComputeMonitor {
+ /**
+ * The logging instance for this exporter.
+ */
+ private val logger = KotlinLogging.logger {}
+
/**
* A [ComputeMetricAggregator] that actually performs the aggregation.
*/
@@ -39,9 +44,11 @@ public class ComputeMetricExporter(private val clock: Clock, private val monitor
override fun export(metrics: Collection<MetricData>): CompletableResultCode {
return try {
agg.process(metrics)
- agg.collect(clock.instant(), monitor)
+ agg.collect(this)
+
CompletableResultCode.ofSuccess()
} catch (e: Throwable) {
+ logger.warn(e) { "Failed to export results" }
CompletableResultCode.ofFailure()
}
}
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt
index 25d346fb..ce89061b 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt
@@ -22,22 +22,13 @@
package org.opendc.telemetry.compute
-import io.opentelemetry.sdk.metrics.data.MetricData
import io.opentelemetry.sdk.metrics.export.MetricProducer
import org.opendc.telemetry.compute.table.ServiceData
-import java.time.Instant
/**
* Collect the metrics of the compute service.
*/
-public fun collectServiceMetrics(timestamp: Instant, metricProducer: MetricProducer): ServiceData {
- return extractServiceMetrics(timestamp, metricProducer.collectAllMetrics())
-}
-
-/**
- * Extract a [ServiceData] object from the specified list of metric data.
- */
-public fun extractServiceMetrics(timestamp: Instant, metrics: Collection<MetricData>): ServiceData {
+public fun collectServiceMetrics(metricProducer: MetricProducer): ServiceData {
lateinit var serviceData: ServiceData
val agg = ComputeMetricAggregator()
val monitor = object : ComputeMonitor {
@@ -46,7 +37,7 @@ public fun extractServiceMetrics(timestamp: Instant, metrics: Collection<MetricD
}
}
- agg.process(metrics)
- agg.collect(timestamp, monitor)
+ agg.process(metricProducer.collectAllMetrics())
+ agg.collect(monitor)
return serviceData
}
diff --git a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt b/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt
index 07f0ff7f..1de235e7 100644
--- a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt
+++ b/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt
@@ -22,7 +22,6 @@
package org.opendc.telemetry.sdk.metrics.export
-import io.opentelemetry.sdk.metrics.data.MetricData
import io.opentelemetry.sdk.metrics.export.MetricExporter
import io.opentelemetry.sdk.metrics.export.MetricProducer
import kotlinx.coroutines.*
@@ -54,24 +53,25 @@ public class CoroutineMetricReader(
private val job = scope.launch {
val intervalMs = exportInterval.toMillis()
- while (isActive) {
- delay(intervalMs)
+ try {
+ while (isActive) {
+ delay(intervalMs)
- val metrics = mutableListOf<MetricData>()
- for (producer in producers) {
- metrics.addAll(producer.collectAllMetrics())
- }
+ val metrics = producers.flatMap(MetricProducer::collectAllMetrics)
- try {
- val result = exporter.export(metrics)
- result.whenComplete {
- if (!result.isSuccess) {
- logger.trace { "Exporter failed" }
+ try {
+ val result = exporter.export(metrics)
+ result.whenComplete {
+ if (!result.isSuccess) {
+ logger.warn { "Exporter failed" }
+ }
}
+ } catch (cause: Throwable) {
+ logger.warn(cause) { "Exporter threw an Exception" }
}
- } catch (cause: Throwable) {
- logger.warn(cause) { "Exporter threw an Exception" }
}
+ } finally {
+ exporter.shutdown()
}
}
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
index 40a7ea62..96b300d7 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
@@ -41,7 +41,6 @@ import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.power.LinearPowerModel
import org.opendc.simulator.compute.power.SimplePowerDriver
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.telemetry.compute.ComputeMetricExporter
import org.opendc.telemetry.compute.collectServiceMetrics
import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import org.opendc.web.client.ApiClient
@@ -126,7 +125,7 @@ class RunnerCli : CliktCommand(name = "runner") {
/**
* Run a single scenario.
*/
- private suspend fun runScenario(portfolio: ClientPortfolio, scenario: Scenario, topology: Topology): List<WebComputeMonitor.Result> {
+ private suspend fun runScenario(portfolio: ClientPortfolio, scenario: Scenario, topology: Topology): List<WebComputeMetricExporter.Result> {
val id = scenario.id
logger.info { "Constructing performance interference model" }
@@ -167,8 +166,8 @@ class RunnerCli : CliktCommand(name = "runner") {
topology: Topology,
workloadLoader: ComputeWorkloadLoader,
interferenceModel: VmInterferenceModel?
- ): WebComputeMonitor.Result {
- val monitor = WebComputeMonitor()
+ ): WebComputeMetricExporter.Result {
+ val exporter = WebComputeMetricExporter()
try {
runBlockingSimulation {
@@ -195,7 +194,7 @@ class RunnerCli : CliktCommand(name = "runner") {
interferenceModel.takeIf { operational.performanceInterferenceEnabled }
)
- val metricReader = CoroutineMetricReader(this, simulator.producers, ComputeMetricExporter(clock, monitor), exportInterval = Duration.ofHours(1))
+ val metricReader = CoroutineMetricReader(this, simulator.producers, exporter, exportInterval = Duration.ofHours(1))
try {
// Instantiate the topology onto the simulator
@@ -207,7 +206,7 @@ class RunnerCli : CliktCommand(name = "runner") {
metricReader.close()
}
- val serviceMetrics = collectServiceMetrics(clock.instant(), simulator.producers[0])
+ val serviceMetrics = collectServiceMetrics(simulator.producers[0])
logger.debug {
"Scheduler " +
"Success=${serviceMetrics.attemptsSuccess} " +
@@ -221,7 +220,7 @@ class RunnerCli : CliktCommand(name = "runner") {
logger.warn(cause) { "Experiment failed" }
}
- return monitor.getResult()
+ return exporter.getResult()
}
private val POLL_INTERVAL = 30000L // ms = 30 s
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt
index a0c281e8..1ee835a6 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/ScenarioManager.kt
@@ -61,7 +61,7 @@ public class ScenarioManager(private val client: ApiClient) {
/**
* Persist the specified results.
*/
- public suspend fun finish(id: String, results: List<WebComputeMonitor.Result>) {
+ public suspend fun finish(id: String, results: List<WebComputeMetricExporter.Result>) {
client.updateJob(
id, SimulationState.FINISHED,
mapOf(
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMetricExporter.kt
index bb412738..7913660d 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMonitor.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebComputeMetricExporter.kt
@@ -22,6 +22,7 @@
package org.opendc.web.runner
+import org.opendc.telemetry.compute.ComputeMetricExporter
import org.opendc.telemetry.compute.ComputeMonitor
import org.opendc.telemetry.compute.table.HostData
import org.opendc.telemetry.compute.table.ServiceData
@@ -31,7 +32,7 @@ import kotlin.math.roundToLong
/**
* A [ComputeMonitor] that tracks the aggregate metrics for each repeat.
*/
-class WebComputeMonitor : ComputeMonitor {
+class WebComputeMetricExporter : ComputeMetricExporter() {
override fun record(data: HostData) {
val slices = data.downtime / SLICE_LENGTH
@@ -57,7 +58,7 @@ class WebComputeMonitor : ComputeMonitor {
private var hostAggregateMetrics: AggregateHostMetrics = AggregateHostMetrics()
private val hostMetrics: MutableMap<String, HostMetrics> = mutableMapOf()
- private val SLICE_LENGTH: Long = 5 * 60
+ private val SLICE_LENGTH: Long = 5 * 60L
data class AggregateHostMetrics(
val totalActiveTime: Long = 0L,