summaryrefslogtreecommitdiff
path: root/opendc-experiments/opendc-experiments-capelin
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-08-25 14:06:39 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-08-25 18:04:36 +0200
commitf111081627280d4e7e1d7147c56cdce708e32433 (patch)
tree509c005e0b94c89eeb61cc8df0aee360c1540943 /opendc-experiments/opendc-experiments-capelin
parentac48fa12f36180de31154a7c828b4dc281dac94b (diff)
build: Upgrade to OpenTelemetry 1.5
This change upgrades the OpenTelemetry dependency to version 1.5, which contains various breaking changes in the metrics API.
Diffstat (limited to 'opendc-experiments/opendc-experiments-capelin')
-rw-r--r--opendc-experiments/opendc-experiments-capelin/build.gradle.kts1
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt13
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt77
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt3
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt8
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt34
6 files changed, 59 insertions, 77 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
index 324cae3e..53643aba 100644
--- a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
+++ b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
@@ -37,6 +37,7 @@ dependencies {
implementation(projects.opendcSimulator.opendcSimulatorFailures)
implementation(projects.opendcCompute.opendcComputeSimulator)
implementation(projects.opendcTelemetry.opendcTelemetrySdk)
+ implementation(libs.opentelemetry.semconv)
implementation(libs.kotlin.logging)
implementation(libs.config)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
index d7df4454..4cffb8d3 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
@@ -24,11 +24,7 @@ package org.opendc.experiments.capelin
import io.opentelemetry.api.metrics.MeterProvider
import io.opentelemetry.sdk.metrics.SdkMeterProvider
-import io.opentelemetry.sdk.metrics.aggregator.AggregatorFactory
-import io.opentelemetry.sdk.metrics.common.InstrumentType
import io.opentelemetry.sdk.metrics.export.MetricProducer
-import io.opentelemetry.sdk.metrics.view.InstrumentSelector
-import io.opentelemetry.sdk.metrics.view.View
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import mu.KotlinLogging
@@ -298,18 +294,9 @@ suspend fun processTrace(
* Create a [MeterProvider] instance for the experiment.
*/
fun createMeterProvider(clock: Clock): MeterProvider {
- val powerSelector = InstrumentSelector.builder()
- .setInstrumentNameRegex("power\\.usage")
- .setInstrumentType(InstrumentType.VALUE_RECORDER)
- .build()
- val powerView = View.builder()
- .setAggregatorFactory(AggregatorFactory.lastValue())
- .build()
-
return SdkMeterProvider
.builder()
.setClock(clock.toOtelClock())
- .registerView(powerSelector, powerView)
.build()
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt
index 7fb2f83c..16358817 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt
@@ -22,10 +22,10 @@
package org.opendc.experiments.capelin.monitor
-import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.sdk.common.CompletableResultCode
import io.opentelemetry.sdk.metrics.data.MetricData
import io.opentelemetry.sdk.metrics.export.MetricExporter
+import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
import org.opendc.compute.service.driver.Host
import java.time.Clock
@@ -37,7 +37,7 @@ public class ExperimentMetricExporter(
private val clock: Clock,
private val hosts: Map<String, Host>
) : MetricExporter {
- private val hostKey = AttributeKey.stringKey("host")
+ private val hostKey = ResourceAttributes.HOST_ID
override fun export(metrics: Collection<MetricData>): CompletableResultCode {
val metricsByName = metrics.associateBy { it.name }
@@ -46,50 +46,31 @@ public class ExperimentMetricExporter(
return CompletableResultCode.ofSuccess()
}
+ private var lastHostMetrics: Map<String, HostMetrics> = emptyMap()
+ private val hostMetricsSingleton = HostMetrics()
+
private fun reportHostMetrics(metrics: Map<String, MetricData>) {
val hostMetrics = mutableMapOf<String, HostMetrics>()
hosts.mapValuesTo(hostMetrics) { HostMetrics() }
- mapDoubleSummary(metrics["cpu.demand"], hostMetrics) { m, v ->
- m.cpuDemand = v
- }
-
- mapDoubleSummary(metrics["cpu.usage"], hostMetrics) { m, v ->
- m.cpuUsage = v
- }
-
- mapDoubleGauge(metrics["power.usage"], hostMetrics) { m, v ->
- m.powerDraw = v
- }
-
- mapDoubleSummary(metrics["cpu.work.total"], hostMetrics) { m, v ->
- m.requestedBurst = v.toLong()
- }
-
- mapDoubleSummary(metrics["cpu.work.granted"], hostMetrics) { m, v ->
- m.grantedBurst = v.toLong()
- }
-
- mapDoubleSummary(metrics["cpu.work.overcommit"], hostMetrics) { m, v ->
- m.overcommissionedBurst = v.toLong()
- }
-
- mapDoubleSummary(metrics["cpu.work.interference"], hostMetrics) { m, v ->
- m.interferedBurst = v.toLong()
- }
-
- mapLongSum(metrics["guests.active"], hostMetrics) { m, v ->
- m.numberOfDeployedImages = v.toInt()
- }
+ mapDoubleSummary(metrics["cpu.demand"], hostMetrics) { m, v -> m.cpuDemand = v }
+ mapDoubleSummary(metrics["cpu.usage"], hostMetrics) { m, v -> m.cpuUsage = v }
+ mapDoubleGauge(metrics["power.usage"], hostMetrics) { m, v -> m.powerDraw = v }
+ mapDoubleSum(metrics["cpu.work.total"], hostMetrics) { m, v -> m.requestedBurst = v }
+ mapDoubleSum(metrics["cpu.work.granted"], hostMetrics) { m, v -> m.grantedBurst = v }
+ mapDoubleSum(metrics["cpu.work.overcommit"], hostMetrics) { m, v -> m.overcommissionedBurst = v }
+ mapDoubleSum(metrics["cpu.work.interference"], hostMetrics) { m, v -> m.interferedBurst = v }
+ mapLongSum(metrics["guests.active"], hostMetrics) { m, v -> m.numberOfDeployedImages = v.toInt() }
for ((id, hostMetric) in hostMetrics) {
+ val lastHostMetric = lastHostMetrics.getOrDefault(id, hostMetricsSingleton)
val host = hosts.getValue(id)
monitor.reportHostSlice(
clock.millis(),
- hostMetric.requestedBurst,
- hostMetric.grantedBurst,
- hostMetric.overcommissionedBurst,
- hostMetric.interferedBurst,
+ (hostMetric.requestedBurst - lastHostMetric.requestedBurst).toLong(),
+ (hostMetric.grantedBurst - lastHostMetric.grantedBurst).toLong(),
+ (hostMetric.overcommissionedBurst - lastHostMetric.overcommissionedBurst).toLong(),
+ (hostMetric.interferedBurst - lastHostMetric.interferedBurst).toLong(),
hostMetric.cpuUsage,
hostMetric.cpuDemand,
hostMetric.powerDraw,
@@ -97,6 +78,8 @@ public class ExperimentMetricExporter(
host
)
}
+
+ lastHostMetrics = hostMetrics
}
private fun mapDoubleSummary(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Double) -> Unit) {
@@ -137,6 +120,18 @@ public class ExperimentMetricExporter(
}
}
+ private fun mapDoubleSum(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Double) -> Unit) {
+ val points = data?.doubleSumData?.points ?: emptyList()
+ for (point in points) {
+ val uid = point.attributes[hostKey]
+ val hostMetric = hostMetrics[uid]
+
+ if (hostMetric != null) {
+ block(hostMetric, point.value)
+ }
+ }
+ }
+
private fun reportProvisionerMetrics(metrics: Map<String, MetricData>) {
val submittedVms = metrics["servers.submitted"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
val queuedVms = metrics["servers.waiting"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
@@ -159,10 +154,10 @@ public class ExperimentMetricExporter(
}
private class HostMetrics {
- var requestedBurst: Long = 0
- var grantedBurst: Long = 0
- var overcommissionedBurst: Long = 0
- var interferedBurst: Long = 0
+ var requestedBurst: Double = 0.0
+ var grantedBurst: Double = 0.0
+ var overcommissionedBurst: Double = 0.0
+ var interferedBurst: Double = 0.0
var cpuUsage: Double = 0.0
var cpuDemand: Double = 0.0
var numberOfDeployedImages: Int = 0
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
index 68631dee..79af88fe 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt
@@ -55,8 +55,7 @@ public interface ExperimentMonitor : AutoCloseable {
powerDraw: Double,
numberOfDeployedImages: Int,
host: Host
- ) {
- }
+ ) {}
/**
* This method is invoked for a provisioner event.
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
index bfdf5f3e..d314c6f5 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt
@@ -75,10 +75,10 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize:
5 * 60 * 1000L,
host,
numberOfDeployedImages,
- requestedBurst,
- grantedBurst,
- overcommissionedBurst,
- interferedBurst,
+ requestedBurst.toLong(),
+ grantedBurst.toLong(),
+ overcommissionedBurst.toLong(),
+ interferedBurst.toLong(),
cpuUsage,
cpuDemand,
powerDraw,
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index a3300b71..9b98b329 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -120,9 +120,9 @@ class CapelinIntegrationTest {
{ assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") },
{ assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") },
{ assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") },
- { assertEquals(219751355711, monitor.totalRequestedBurst) { "Incorrect requested burst" } },
- { assertEquals(206351165081, monitor.totalGrantedBurst) { "Incorrect granted burst" } },
- { assertEquals(1148906334, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } },
+ { assertEquals(220346369672, monitor.totalRequestedBurst) { "Incorrect requested burst" } },
+ { assertEquals(206667809431, monitor.totalGrantedBurst) { "Incorrect granted burst" } },
+ { assertEquals(1151611104, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } },
{ assertEquals(0, monitor.totalInterferedBurst) { "Incorrect interfered burst" } }
)
}
@@ -160,9 +160,9 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(37954956986, monitor.totalRequestedBurst) { "Total requested work incorrect" } },
- { assertEquals(34840774250, monitor.totalGrantedBurst) { "Total granted work incorrect" } },
- { assertEquals(971076806, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } },
+ { assertEquals(38051879542, monitor.totalRequestedBurst) { "Total requested work incorrect" } },
+ { assertEquals(34888186396, monitor.totalGrantedBurst) { "Total granted work incorrect" } },
+ { assertEquals(971668973, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } },
{ assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } }
)
}
@@ -204,10 +204,10 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(37954956986, monitor.totalRequestedBurst) { "Total requested work incorrect" } },
- { assertEquals(34840774250, monitor.totalGrantedBurst) { "Total granted work incorrect" } },
- { assertEquals(971076806, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } },
- { assertEquals(13885404, monitor.totalInterferedBurst) { "Total interfered work incorrect" } }
+ { assertEquals(38051879542, monitor.totalRequestedBurst) { "Total requested work incorrect" } },
+ { assertEquals(34888186396, monitor.totalGrantedBurst) { "Total granted work incorrect" } },
+ { assertEquals(971668973, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } },
+ { assertEquals(13910799, monitor.totalInterferedBurst) { "Total interfered work incorrect" } }
)
}
@@ -256,9 +256,9 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(25336984869, monitor.totalRequestedBurst) { "Total requested work incorrect" } },
- { assertEquals(23668547517, monitor.totalGrantedBurst) { "Total granted work incorrect" } },
- { assertEquals(368151656, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } },
+ { assertEquals(25412073100, monitor.totalRequestedBurst) { "Total requested work incorrect" } },
+ { assertEquals(23695061847, monitor.totalGrantedBurst) { "Total granted work incorrect" } },
+ { assertEquals(368502468, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } },
{ assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } }
)
}
@@ -300,10 +300,10 @@ class CapelinIntegrationTest {
numberOfDeployedImages: Int,
host: Host,
) {
- totalRequestedBurst += requestedBurst
- totalGrantedBurst += grantedBurst
- totalOvercommissionedBurst += overcommissionedBurst
- totalInterferedBurst += interferedBurst
+ totalRequestedBurst += requestedBurst.toLong()
+ totalGrantedBurst += grantedBurst.toLong()
+ totalOvercommissionedBurst += overcommissionedBurst.toLong()
+ totalInterferedBurst += interferedBurst.toLong()
}
override fun close() {}