summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-10-09 16:10:00 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2022-02-15 18:08:37 +0100
commit02c215ad57e1e4d56c54d22be58e1845bdeebf25 (patch)
tree7794b53ca3bb6fa197a118cee92114135be15def
parent48c04fb74ee170f58f292b077c62b4da237f507e (diff)
refactor: Update OpenTelemetry to version 1.11
This change updates the OpenDC codebase to use OpenTelemetry v1.11, which stabilizes the metrics API. This stabilization brings quite a few breaking changes, so significant changes are necessary inside the OpenDC codebase.
-rw-r--r--gradle/libs.versions.toml9
-rw-r--r--opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt37
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt34
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt16
-rw-r--r--opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt82
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt61
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/NoopTelemetryManager.kt36
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/SdkTelemetryManager.kt135
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/TelemetryManager.kt42
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt2
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt18
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt159
-rw-r--r--opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt3
-rw-r--r--opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt30
-rw-r--r--opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt22
-rw-r--r--opendc-telemetry/opendc-telemetry-api/build.gradle.kts3
-rw-r--r--opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt42
-rw-r--r--opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt29
-rw-r--r--opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt4
-rw-r--r--opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt49
20 files changed, 535 insertions, 278 deletions
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 16bf22dc..5fe5cc33 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -12,9 +12,9 @@ kotlinx-coroutines = "1.6.0"
ktor = "1.6.7"
log4j = "2.17.1"
mockk = "1.12.2"
-opentelemetry-main = "1.6.0"
-opentelemetry-metrics = "1.6.0-alpha"
-opentelemetry-semconv = "1.6.0-alpha"
+opentelemetry-main = "1.11.0"
+opentelemetry-metrics = "1.10.1-alpha"
+opentelemetry-semconv = "1.10.1-alpha"
parquet = "1.12.2"
progressbar = "0.9.2"
sentry = "5.5.2"
@@ -31,9 +31,8 @@ log4j-slf4j = { module = "org.apache.logging.log4j:log4j-slf4j-impl", version.re
sentry-log4j2 = { module = "io.sentry:sentry-log4j2", version.ref = "sentry" }
# Telemetry
-opentelemetry-api-main = { module = "io.opentelemetry:opentelemetry-api", version.ref = "opentelemetry-main" }
+opentelemetry-api = { module = "io.opentelemetry:opentelemetry-api", version.ref = "opentelemetry-main" }
opentelemetry-sdk-main = { module = "io.opentelemetry:opentelemetry-sdk", version.ref = "opentelemetry-main" }
-opentelemetry-api-metrics = { module = "io.opentelemetry:opentelemetry-api-metrics", version.ref = "opentelemetry-metrics" }
opentelemetry-sdk-metrics = { module = "io.opentelemetry:opentelemetry-sdk-metrics", version.ref = "opentelemetry-metrics" }
opentelemetry-semconv = { module = "io.opentelemetry:opentelemetry-semconv", version.ref = "opentelemetry-semconv" }
diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
index 292feabe..27a6ecae 100644
--- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
+++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt
@@ -123,12 +123,9 @@ internal class ComputeServiceImpl(
.setDescription("Number of scheduling attempts")
.setUnit("1")
.build()
- private val _schedulingAttemptsSuccess = _schedulingAttempts
- .bind(Attributes.of(AttributeKey.stringKey("result"), "success"))
- private val _schedulingAttemptsFailure = _schedulingAttempts
- .bind(Attributes.of(AttributeKey.stringKey("result"), "failure"))
- private val _schedulingAttemptsError = _schedulingAttempts
- .bind(Attributes.of(AttributeKey.stringKey("result"), "error"))
+ private val _schedulingAttemptsSuccessAttr = Attributes.of(AttributeKey.stringKey("result"), "success")
+ private val _schedulingAttemptsFailureAttr = Attributes.of(AttributeKey.stringKey("result"), "failure")
+ private val _schedulingAttemptsErrorAttr = Attributes.of(AttributeKey.stringKey("result"), "error")
/**
* The response time of the service.
@@ -146,8 +143,8 @@ internal class ComputeServiceImpl(
.setDescription("Number of servers managed by the scheduler")
.setUnit("1")
.build()
- private val _serversPending = _servers.bind(Attributes.of(AttributeKey.stringKey("state"), "pending"))
- private val _serversActive = _servers.bind(Attributes.of(AttributeKey.stringKey("state"), "active"))
+ private val _serversPendingAttr = Attributes.of(AttributeKey.stringKey("state"), "pending")
+ private val _serversActiveAttr = Attributes.of(AttributeKey.stringKey("state"), "active")
/**
* The [TimerScheduler] to use for scheduling the scheduler cycles.
@@ -171,8 +168,8 @@ internal class ComputeServiceImpl(
val total = hostCount
val available = availableHosts.size.toLong()
- result.observe(available, upState)
- result.observe(total - available, downState)
+ result.record(available, upState)
+ result.record(total - available, downState)
}
meter.gaugeBuilder("system.time.provision")
@@ -336,7 +333,7 @@ internal class ComputeServiceImpl(
server.lastProvisioningTimestamp = now
queue.add(request)
- _serversPending.add(1)
+ _servers.add(1, _serversPendingAttr)
requestSchedulingCycle()
return request
}
@@ -384,7 +381,7 @@ internal class ComputeServiceImpl(
if (request.isCancelled) {
queue.poll()
- _serversPending.add(-1)
+ _servers.add(-1, _serversPendingAttr)
continue
}
@@ -396,8 +393,8 @@ internal class ComputeServiceImpl(
if (server.flavor.memorySize > maxMemory || server.flavor.cpuCount > maxCores) {
// Remove the incoming image
queue.poll()
- _serversPending.add(-1)
- _schedulingAttemptsFailure.add(1)
+ _servers.add(-1, _serversPendingAttr)
+ _schedulingAttempts.add(1, _schedulingAttemptsFailureAttr)
logger.warn { "Failed to spawn $server: does not fit [${clock.instant()}]" }
@@ -412,7 +409,7 @@ internal class ComputeServiceImpl(
// Remove request from queue
queue.poll()
- _serversPending.add(-1)
+ _servers.add(-1, _serversPendingAttr)
_schedulingLatency.record(now - request.submitTime, server.attributes)
logger.info { "Assigned server $server to host $host." }
@@ -429,8 +426,8 @@ internal class ComputeServiceImpl(
host.spawn(server)
activeServers[server] = host
- _serversActive.add(1)
- _schedulingAttemptsSuccess.add(1)
+ _servers.add(1, _serversActiveAttr)
+ _schedulingAttempts.add(1, _schedulingAttemptsSuccessAttr)
} catch (e: Throwable) {
logger.error(e) { "Failed to deploy VM" }
@@ -438,7 +435,7 @@ internal class ComputeServiceImpl(
hv.provisionedCores -= server.flavor.cpuCount
hv.availableMemory += server.flavor.memorySize
- _schedulingAttemptsError.add(1)
+ _schedulingAttempts.add(1, _schedulingAttemptsErrorAttr)
}
}
}
@@ -494,7 +491,7 @@ internal class ComputeServiceImpl(
logger.info { "[${clock.instant()}] Server ${server.uid} ${server.name} ${server.flavor} finished." }
if (activeServers.remove(server) != null) {
- _serversActive.add(-1)
+ _servers.add(-1, _serversActiveAttr)
}
val hv = hostToView[host]
@@ -516,7 +513,7 @@ internal class ComputeServiceImpl(
*/
private fun collectProvisionTime(result: ObservableLongMeasurement) {
for ((_, server) in servers) {
- result.observe(server.lastProvisioningTimestamp, server.attributes)
+ result.record(server.lastProvisioningTimestamp, server.attributes)
}
}
}
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
index 908a58e9..95921e8b 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -150,15 +150,15 @@ public class SimHost(
meter.gaugeBuilder("system.cpu.demand")
.setDescription("Amount of CPU resources the guests would use if there were no CPU contention or CPU limits")
.setUnit("MHz")
- .buildWithCallback { result -> result.observe(hypervisor.cpuDemand) }
+ .buildWithCallback { result -> result.record(hypervisor.cpuDemand) }
meter.gaugeBuilder("system.cpu.usage")
.setDescription("Amount of CPU resources used by the host")
.setUnit("MHz")
- .buildWithCallback { result -> result.observe(hypervisor.cpuUsage) }
+ .buildWithCallback { result -> result.record(hypervisor.cpuUsage) }
meter.gaugeBuilder("system.cpu.utilization")
.setDescription("Utilization of the CPU resources of the host")
.setUnit("%")
- .buildWithCallback { result -> result.observe(hypervisor.cpuUsage / _cpuLimit) }
+ .buildWithCallback { result -> result.record(hypervisor.cpuUsage / _cpuLimit) }
meter.counterBuilder("system.cpu.time")
.setDescription("Amount of CPU time spent by the host")
.setUnit("s")
@@ -166,12 +166,12 @@ public class SimHost(
meter.gaugeBuilder("system.power.usage")
.setDescription("Power usage of the host ")
.setUnit("W")
- .buildWithCallback { result -> result.observe(machine.powerUsage) }
+ .buildWithCallback { result -> result.record(machine.powerUsage) }
meter.counterBuilder("system.power.total")
.setDescription("Amount of energy used by the CPU")
.setUnit("J")
.ofDoubles()
- .buildWithCallback { result -> result.observe(machine.energyUsage) }
+ .buildWithCallback { result -> result.record(machine.energyUsage) }
meter.counterBuilder("system.time")
.setDescription("The uptime of the host")
.setUnit("s")
@@ -382,10 +382,10 @@ public class SimHost(
}
}
- result.observe(terminated, terminatedState)
- result.observe(running, runningState)
- result.observe(error, errorState)
- result.observe(invalid, invalidState)
+ result.record(terminated, terminatedState)
+ result.record(running, runningState)
+ result.record(error, errorState)
+ result.record(invalid, invalidState)
}
private val _cpuLimit = machine.model.cpus.sumOf { it.frequency }
@@ -394,7 +394,7 @@ public class SimHost(
* Helper function to collect the CPU limits of a machine.
*/
private fun collectCpuLimit(result: ObservableDoubleMeasurement) {
- result.observe(_cpuLimit)
+ result.record(_cpuLimit)
val guests = _guests
for (i in guests.indices) {
@@ -413,10 +413,10 @@ public class SimHost(
private fun collectCpuTime(result: ObservableLongMeasurement) {
val counters = hypervisor.counters
- result.observe(counters.cpuActiveTime / 1000L, _activeState)
- result.observe(counters.cpuIdleTime / 1000L, _idleState)
- result.observe(counters.cpuStealTime / 1000L, _stealState)
- result.observe(counters.cpuLostTime / 1000L, _lostState)
+ result.record(counters.cpuActiveTime / 1000L, _activeState)
+ result.record(counters.cpuIdleTime / 1000L, _idleState)
+ result.record(counters.cpuStealTime / 1000L, _stealState)
+ result.record(counters.cpuLostTime / 1000L, _lostState)
val guests = _guests
for (i in guests.indices) {
@@ -458,8 +458,8 @@ public class SimHost(
private fun collectUptime(result: ObservableLongMeasurement) {
updateUptime()
- result.observe(_uptime, _upState)
- result.observe(_downtime, _downState)
+ result.record(_uptime, _upState)
+ result.record(_downtime, _downState)
val guests = _guests
for (i in guests.indices) {
@@ -474,7 +474,7 @@ public class SimHost(
*/
private fun collectBootTime(result: ObservableLongMeasurement) {
if (_bootTime != Long.MIN_VALUE) {
- result.observe(_bootTime)
+ result.record(_bootTime)
}
val guests = _guests
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
index 9f3122db..f49c2824 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt
@@ -239,8 +239,8 @@ internal class Guest(
* Helper function to track the uptime of the guest.
*/
fun collectUptime(result: ObservableLongMeasurement) {
- result.observe(_uptime, _upState)
- result.observe(_downtime, _downState)
+ result.record(_uptime, _upState)
+ result.record(_downtime, _downState)
}
private var _bootTime = Long.MIN_VALUE
@@ -250,7 +250,7 @@ internal class Guest(
*/
fun collectBootTime(result: ObservableLongMeasurement) {
if (_bootTime != Long.MIN_VALUE) {
- result.observe(_bootTime, attributes)
+ result.record(_bootTime, attributes)
}
}
@@ -273,10 +273,10 @@ internal class Guest(
fun collectCpuTime(result: ObservableLongMeasurement) {
val counters = machine.counters
- result.observe(counters.cpuActiveTime / 1000, _activeState)
- result.observe(counters.cpuIdleTime / 1000, _idleState)
- result.observe(counters.cpuStealTime / 1000, _stealState)
- result.observe(counters.cpuLostTime / 1000, _lostState)
+ result.record(counters.cpuActiveTime / 1000, _activeState)
+ result.record(counters.cpuIdleTime / 1000, _idleState)
+ result.record(counters.cpuStealTime / 1000, _stealState)
+ result.record(counters.cpuLostTime / 1000, _lostState)
}
private val _cpuLimit = machine.model.cpus.sumOf { it.frequency }
@@ -285,7 +285,7 @@ internal class Guest(
* Helper function to collect the CPU limits of a machine.
*/
fun collectCpuLimit(result: ObservableDoubleMeasurement) {
- result.observe(_cpuLimit, attributes)
+ result.record(_cpuLimit, attributes)
}
/**
diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
index 799a8cf0..dd13b60c 100644
--- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
+++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
@@ -22,9 +22,7 @@
package org.opendc.compute.simulator
-import io.opentelemetry.api.metrics.MeterProvider
import io.opentelemetry.sdk.metrics.SdkMeterProvider
-import io.opentelemetry.sdk.metrics.export.MetricProducer
import io.opentelemetry.sdk.resources.Resource
import kotlinx.coroutines.*
import org.junit.jupiter.api.Assertions.assertEquals
@@ -83,10 +81,26 @@ internal class SimHostTest {
val hostResource = Resource.builder()
.put(HOST_ID, hostId.toString())
.build()
- val meterProvider: MeterProvider = SdkMeterProvider
+
+ // Setup metric reader
+ val duration = 5 * 60L
+ val reader = CoroutineMetricReader(
+ this,
+ object : ComputeMetricExporter() {
+ override fun record(reader: HostTableReader) {
+ activeTime += reader.cpuActiveTime
+ idleTime += reader.cpuIdleTime
+ stealTime += reader.cpuStealTime
+ }
+ },
+ exportInterval = Duration.ofSeconds(duration)
+ )
+
+ val meterProvider = SdkMeterProvider
.builder()
.setResource(hostResource)
.setClock(clock.toOtelClock())
+ .registerMetricReader(reader)
.build()
val engine = FlowEngine(coroutineContext, clock)
@@ -100,7 +114,6 @@ internal class SimHostTest {
meterProvider,
SimFairShareHypervisorProvider()
)
- val duration = 5 * 60L
val vmImageA = MockImage(
UUID.randomUUID(),
"<unnamed>",
@@ -136,19 +149,6 @@ internal class SimHostTest {
val flavor = MockFlavor(2, 0)
- // Setup metric reader
- val reader = CoroutineMetricReader(
- this, listOf(meterProvider as MetricProducer),
- object : ComputeMetricExporter() {
- override fun record(reader: HostTableReader) {
- activeTime += reader.cpuActiveTime
- idleTime += reader.cpuIdleTime
- stealTime += reader.cpuStealTime
- }
- },
- exportInterval = Duration.ofSeconds(duration)
- )
-
coroutineScope {
launch { virtDriver.spawn(MockServer(UUID.randomUUID(), "a", flavor, vmImageA)) }
launch { virtDriver.spawn(MockServer(UUID.randomUUID(), "b", flavor, vmImageB)) }
@@ -169,7 +169,7 @@ internal class SimHostTest {
// Ensure last cycle is collected
delay(1000L * duration)
virtDriver.close()
- reader.close()
+ meterProvider.close()
assertAll(
{ assertEquals(658, activeTime, "Active time does not match") },
@@ -195,10 +195,32 @@ internal class SimHostTest {
val hostResource = Resource.builder()
.put(HOST_ID, hostId.toString())
.build()
- val meterProvider: MeterProvider = SdkMeterProvider
+
+ // Setup metric reader
+ val duration = 5 * 60L
+ val reader = CoroutineMetricReader(
+ this,
+ object : ComputeMetricExporter() {
+ override fun record(reader: HostTableReader) {
+ activeTime += reader.cpuActiveTime
+ idleTime += reader.cpuIdleTime
+ uptime += reader.uptime
+ downtime += reader.downtime
+ }
+
+ override fun record(reader: ServerTableReader) {
+ guestUptime += reader.uptime
+ guestDowntime += reader.downtime
+ }
+ },
+ exportInterval = Duration.ofSeconds(duration)
+ )
+
+ val meterProvider = SdkMeterProvider
.builder()
.setResource(hostResource)
.setClock(clock.toOtelClock())
+ .registerMetricReader(reader)
.build()
val engine = FlowEngine(coroutineContext, clock)
@@ -212,7 +234,6 @@ internal class SimHostTest {
meterProvider,
SimFairShareHypervisorProvider()
)
- val duration = 5 * 60L
val image = MockImage(
UUID.randomUUID(),
"<unnamed>",
@@ -232,25 +253,6 @@ internal class SimHostTest {
val flavor = MockFlavor(2, 0)
val server = MockServer(UUID.randomUUID(), "a", flavor, image)
- // Setup metric reader
- val reader = CoroutineMetricReader(
- this, listOf(meterProvider as MetricProducer),
- object : ComputeMetricExporter() {
- override fun record(reader: HostTableReader) {
- activeTime += reader.cpuActiveTime
- idleTime += reader.cpuIdleTime
- uptime += reader.uptime
- downtime += reader.downtime
- }
-
- override fun record(reader: ServerTableReader) {
- guestUptime += reader.uptime
- guestDowntime += reader.downtime
- }
- },
- exportInterval = Duration.ofSeconds(duration)
- )
-
coroutineScope {
host.spawn(server)
delay(5000L)
@@ -273,7 +275,7 @@ internal class SimHostTest {
// Ensure last cycle is collected
delay(1000L * duration)
- reader.close()
+ meterProvider.close()
assertAll(
{ assertEquals(1175, idleTime, "Idle time does not match") },
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt
index 59203b66..a1a65da3 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt
@@ -22,10 +22,6 @@
package org.opendc.compute.workload
-import io.opentelemetry.sdk.metrics.SdkMeterProvider
-import io.opentelemetry.sdk.metrics.export.MetricProducer
-import io.opentelemetry.sdk.resources.Resource
-import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
@@ -33,12 +29,11 @@ import kotlinx.coroutines.yield
import org.opendc.compute.service.ComputeService
import org.opendc.compute.service.scheduler.ComputeScheduler
import org.opendc.compute.simulator.SimHost
+import org.opendc.compute.workload.telemetry.TelemetryManager
import org.opendc.compute.workload.topology.HostSpec
import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.flow.FlowEngine
-import org.opendc.telemetry.compute.*
-import org.opendc.telemetry.sdk.toOtelClock
import java.time.Clock
import java.time.Duration
import java.util.*
@@ -50,6 +45,7 @@ import kotlin.math.max
*
* @param context [CoroutineContext] to run the simulation in.
* @param clock [Clock] instance tracking simulation time.
+ * @param telemetry Helper class for managing telemetry.
* @param scheduler [ComputeScheduler] implementation to use for the service.
* @param failureModel A failure model to use for injecting failures.
* @param interferenceModel The model to use for performance interference.
@@ -58,6 +54,7 @@ import kotlin.math.max
public class ComputeServiceHelper(
private val context: CoroutineContext,
private val clock: Clock,
+ private val telemetry: TelemetryManager,
scheduler: ComputeScheduler,
private val failureModel: FailureModel? = null,
private val interferenceModel: VmInterferenceModel? = null,
@@ -69,25 +66,17 @@ public class ComputeServiceHelper(
public val service: ComputeService
/**
- * The [MetricProducer] that are used by the [ComputeService] and the simulated hosts.
- */
- public val producers: List<MetricProducer>
- get() = _metricProducers
- private val _metricProducers = mutableListOf<MetricProducer>()
-
- /**
* The [FlowEngine] to simulate the hosts.
*/
- private val engine = FlowEngine(context, clock)
+ private val _engine = FlowEngine(context, clock)
/**
* The hosts that belong to this class.
*/
- private val hosts = mutableSetOf<SimHost>()
+ private val _hosts = mutableSetOf<SimHost>()
init {
- val (service, serviceMeterProvider) = createService(scheduler, schedulingQuantum)
- this._metricProducers.add(serviceMeterProvider)
+ val service = createService(scheduler, schedulingQuantum)
this.service = service
}
@@ -165,27 +154,14 @@ public class ComputeServiceHelper(
* @return The [SimHost] that has been constructed by the runner.
*/
public fun registerHost(spec: HostSpec, optimize: Boolean = false): SimHost {
- val resource = Resource.builder()
- .put(HOST_ID, spec.uid.toString())
- .put(HOST_NAME, spec.name)
- .put(HOST_ARCH, ResourceAttributes.HostArchValues.AMD64)
- .put(HOST_NCPUS, spec.model.cpus.size)
- .put(HOST_MEM_CAPACITY, spec.model.memory.sumOf { it.size })
- .build()
-
- val meterProvider = SdkMeterProvider.builder()
- .setClock(clock.toOtelClock())
- .setResource(resource)
- .build()
- _metricProducers.add(meterProvider)
-
+ val meterProvider = telemetry.createMeterProvider(spec)
val host = SimHost(
spec.uid,
spec.name,
spec.model,
spec.meta,
context,
- engine,
+ _engine,
meterProvider,
spec.hypervisor,
powerDriver = spec.powerDriver,
@@ -193,7 +169,7 @@ public class ComputeServiceHelper(
optimize = optimize
)
- hosts.add(host)
+ _hosts.add(host)
service.addHost(host)
return host
@@ -202,27 +178,18 @@ public class ComputeServiceHelper(
override fun close() {
service.close()
- for (host in hosts) {
+ for (host in _hosts) {
host.close()
}
- hosts.clear()
+ _hosts.clear()
}
/**
* Construct a [ComputeService] instance.
*/
- private fun createService(scheduler: ComputeScheduler, schedulingQuantum: Duration): Pair<ComputeService, SdkMeterProvider> {
- val resource = Resource.builder()
- .put(ResourceAttributes.SERVICE_NAME, "opendc-compute")
- .build()
-
- val meterProvider = SdkMeterProvider.builder()
- .setClock(clock.toOtelClock())
- .setResource(resource)
- .build()
-
- val service = ComputeService(context, clock, meterProvider, scheduler, schedulingQuantum)
- return service to meterProvider
+ private fun createService(scheduler: ComputeScheduler, schedulingQuantum: Duration): ComputeService {
+ val meterProvider = telemetry.createMeterProvider(scheduler)
+ return ComputeService(context, clock, meterProvider, scheduler, schedulingQuantum)
}
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/NoopTelemetryManager.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/NoopTelemetryManager.kt
new file mode 100644
index 00000000..4e7d0b75
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/NoopTelemetryManager.kt
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.telemetry
+
+import io.opentelemetry.api.metrics.MeterProvider
+import org.opendc.compute.service.scheduler.ComputeScheduler
+import org.opendc.compute.workload.topology.HostSpec
+
+/**
+ * A [TelemetryManager] that does nothing.
+ */
+public class NoopTelemetryManager : TelemetryManager {
+ override fun createMeterProvider(host: HostSpec): MeterProvider = MeterProvider.noop()
+
+ override fun createMeterProvider(scheduler: ComputeScheduler): MeterProvider = MeterProvider.noop()
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/SdkTelemetryManager.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/SdkTelemetryManager.kt
new file mode 100644
index 00000000..478c0609
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/SdkTelemetryManager.kt
@@ -0,0 +1,135 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.telemetry
+
+import io.opentelemetry.api.metrics.MeterProvider
+import io.opentelemetry.sdk.common.CompletableResultCode
+import io.opentelemetry.sdk.metrics.SdkMeterProvider
+import io.opentelemetry.sdk.metrics.data.AggregationTemporality
+import io.opentelemetry.sdk.metrics.data.MetricData
+import io.opentelemetry.sdk.metrics.export.MetricProducer
+import io.opentelemetry.sdk.metrics.export.MetricReader
+import io.opentelemetry.sdk.metrics.export.MetricReaderFactory
+import io.opentelemetry.sdk.resources.Resource
+import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
+import org.opendc.compute.service.scheduler.ComputeScheduler
+import org.opendc.compute.workload.topology.HostSpec
+import org.opendc.telemetry.compute.*
+import org.opendc.telemetry.sdk.toOtelClock
+import java.time.Clock
+
+/**
+ * A [TelemetryManager] using the OpenTelemetry Java SDK.
+ */
+public class SdkTelemetryManager(private val clock: Clock) : TelemetryManager, AutoCloseable {
+ /**
+ * The [SdkMeterProvider]s that belong to the workload runner.
+ */
+ private val _meterProviders = mutableListOf<SdkMeterProvider>()
+
+ /**
+ * The internal [MetricProducer] registered with the runner.
+ */
+ private val _metricProducers = mutableListOf<MetricProducer>()
+
+ /**
+ * The list of [MetricReader]s that have been registered with the runner.
+ */
+ private val _metricReaders = mutableListOf<MetricReader>()
+
+ /**
+ * A [MetricProducer] that combines all the other metric producers.
+ */
+ public val metricProducer: MetricProducer = object : MetricProducer {
+ private val producers = _metricProducers
+
+ override fun collectAllMetrics(): Collection<MetricData> = producers.flatMap(MetricProducer::collectAllMetrics)
+
+ override fun toString(): String = "SdkTelemetryManager.AggregateMetricProducer"
+ }
+
+ /**
+ * Register a [MetricReader] for this manager.
+ *
+ * @param factory The factory for the reader to register.
+ */
+ public fun registerMetricReader(factory: MetricReaderFactory) {
+ val reader = factory.apply(metricProducer)
+ _metricReaders.add(reader)
+ }
+
+ override fun createMeterProvider(scheduler: ComputeScheduler): MeterProvider {
+ val resource = Resource.builder()
+ .put(ResourceAttributes.SERVICE_NAME, "opendc-compute")
+ .build()
+
+ return createMeterProvider(resource)
+ }
+
+ override fun createMeterProvider(host: HostSpec): MeterProvider {
+ val resource = Resource.builder()
+ .put(HOST_ID, host.uid.toString())
+ .put(HOST_NAME, host.name)
+ .put(HOST_ARCH, ResourceAttributes.HostArchValues.AMD64)
+ .put(HOST_NCPUS, host.model.cpus.size)
+ .put(HOST_MEM_CAPACITY, host.model.memory.sumOf { it.size })
+ .build()
+
+ return createMeterProvider(resource)
+ }
+
+ /**
+ * Construct a [SdkMeterProvider] for the specified [resource].
+ */
+ private fun createMeterProvider(resource: Resource): SdkMeterProvider {
+ val meterProvider = SdkMeterProvider.builder()
+ .setClock(clock.toOtelClock())
+ .setResource(resource)
+ .registerMetricReader { producer ->
+ _metricProducers.add(producer)
+ object : MetricReader {
+ override fun getPreferredTemporality(): AggregationTemporality = AggregationTemporality.CUMULATIVE
+ override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess()
+ override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess()
+ }
+ }
+ .build()
+ _meterProviders.add(meterProvider)
+ return meterProvider
+ }
+
+ override fun close() {
+ for (meterProvider in _meterProviders) {
+ meterProvider.close()
+ }
+
+ _meterProviders.clear()
+
+ for (metricReader in _metricReaders) {
+ metricReader.shutdown()
+ }
+
+ _metricReaders.clear()
+ _metricProducers.clear()
+ }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/TelemetryManager.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/TelemetryManager.kt
new file mode 100644
index 00000000..b67050ce
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/TelemetryManager.kt
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.telemetry
+
+import io.opentelemetry.api.metrics.MeterProvider
+import org.opendc.compute.service.scheduler.ComputeScheduler
+import org.opendc.compute.workload.topology.HostSpec
+
+/**
+ * Helper interface to manage the telemetry for a [ComputeServiceHelper] instance.
+ */
+public interface TelemetryManager {
+ /**
+ * Construct a [MeterProvider] for the specified [ComputeScheduler].
+ */
+ public fun createMeterProvider(scheduler: ComputeScheduler): MeterProvider
+
+ /**
+ * Construct a [MeterProvider] for the specified [HostSpec].
+ */
+ public fun createMeterProvider(host: HostSpec): MeterProvider
+}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt b/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt
index 4b35de95..bb9cb201 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt
@@ -29,6 +29,7 @@ import org.opendc.compute.service.scheduler.filters.RamFilter
import org.opendc.compute.service.scheduler.filters.VCpuFilter
import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
import org.opendc.compute.workload.*
+import org.opendc.compute.workload.telemetry.NoopTelemetryManager
import org.opendc.compute.workload.topology.Topology
import org.opendc.compute.workload.topology.apply
import org.opendc.experiments.capelin.topology.clusterTopology
@@ -70,6 +71,7 @@ class CapelinBenchmarks {
val runner = ComputeServiceHelper(
coroutineContext,
clock,
+ NoopTelemetryManager(),
computeScheduler
)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index b548ae58..6604a190 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -29,6 +29,7 @@ import org.opendc.compute.workload.ComputeWorkloadLoader
import org.opendc.compute.workload.createComputeScheduler
import org.opendc.compute.workload.export.parquet.ParquetComputeMetricExporter
import org.opendc.compute.workload.grid5000
+import org.opendc.compute.workload.telemetry.SdkTelemetryManager
import org.opendc.compute.workload.topology.apply
import org.opendc.compute.workload.util.VmInterferenceModelReader
import org.opendc.experiments.capelin.model.OperationalPhenomena
@@ -38,7 +39,6 @@ import org.opendc.experiments.capelin.topology.clusterTopology
import org.opendc.harness.dsl.Experiment
import org.opendc.harness.dsl.anyOf
import org.opendc.simulator.core.runBlockingSimulation
-import org.opendc.telemetry.compute.collectServiceMetrics
import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import java.io.File
import java.time.Duration
@@ -109,9 +109,11 @@ abstract class Portfolio(name: String) : Experiment(name) {
grid5000(Duration.ofSeconds((operationalPhenomena.failureFrequency * 60).roundToLong()))
else
null
+ val telemetry = SdkTelemetryManager(clock)
val runner = ComputeServiceHelper(
coroutineContext,
clock,
+ telemetry,
computeScheduler,
failureModel,
performanceInterferenceModel?.withSeed(repeat.toLong())
@@ -122,7 +124,8 @@ abstract class Portfolio(name: String) : Experiment(name) {
"portfolio_id=$name/scenario_id=$id/run_id=$repeat",
4096
)
- val metricReader = CoroutineMetricReader(this, runner.producers, exporter)
+ telemetry.registerMetricReader(CoroutineMetricReader(this, exporter))
+
val topology = clusterTopology(File(config.getString("env-path"), "${topology.name}.txt"))
try {
@@ -133,17 +136,6 @@ abstract class Portfolio(name: String) : Experiment(name) {
runner.run(workload.source.resolve(workloadLoader, seeder), seeder.nextLong())
} finally {
runner.close()
- metricReader.close()
- }
-
- val monitorResults = collectServiceMetrics(runner.producers[0])
- logger.debug {
- "Scheduler " +
- "Success=${monitorResults.attemptsSuccess} " +
- "Failure=${monitorResults.attemptsFailure} " +
- "Error=${monitorResults.attemptsError} " +
- "Pending=${monitorResults.serversPending} " +
- "Active=${monitorResults.serversActive}"
}
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index eedc3131..aefd8304 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -32,17 +32,20 @@ import org.opendc.compute.service.scheduler.filters.RamFilter
import org.opendc.compute.service.scheduler.filters.VCpuFilter
import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
import org.opendc.compute.workload.*
+import org.opendc.compute.workload.telemetry.SdkTelemetryManager
import org.opendc.compute.workload.topology.Topology
import org.opendc.compute.workload.topology.apply
import org.opendc.compute.workload.util.VmInterferenceModelReader
import org.opendc.experiments.capelin.topology.clusterTopology
import org.opendc.simulator.core.runBlockingSimulation
import org.opendc.telemetry.compute.ComputeMetricExporter
-import org.opendc.telemetry.compute.collectServiceMetrics
import org.opendc.telemetry.compute.table.HostTableReader
+import org.opendc.telemetry.compute.table.ServiceData
+import org.opendc.telemetry.compute.table.ServiceTableReader
import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import java.io.File
import java.time.Duration
+import java.time.Instant
import java.util.*
/**
@@ -83,44 +86,47 @@ class CapelinIntegrationTest {
@Test
fun testLarge() = runBlockingSimulation {
val workload = createTestWorkload(1.0)
+ val telemetry = SdkTelemetryManager(clock)
val runner = ComputeServiceHelper(
coroutineContext,
clock,
+ telemetry,
computeScheduler
)
val topology = createTopology()
- val metricReader = CoroutineMetricReader(this, runner.producers, exporter)
+
+ telemetry.registerMetricReader(CoroutineMetricReader(this, exporter))
try {
runner.apply(topology)
runner.run(workload, 0)
+
+ val serviceMetrics = exporter.serviceMetrics
+ println(
+ "Scheduler " +
+ "Success=${serviceMetrics.attemptsSuccess} " +
+ "Failure=${serviceMetrics.attemptsFailure} " +
+ "Error=${serviceMetrics.attemptsError} " +
+ "Pending=${serviceMetrics.serversPending} " +
+ "Active=${serviceMetrics.serversActive}"
+ )
+
+ // Note that these values have been verified beforehand
+ assertAll(
+ { assertEquals(50, serviceMetrics.attemptsSuccess, "The scheduler should schedule 50 VMs") },
+ { assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") },
+ { assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") },
+ { assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") },
+ { assertEquals(223388307, this@CapelinIntegrationTest.exporter.idleTime) { "Incorrect idle time" } },
+ { assertEquals(66977508, this@CapelinIntegrationTest.exporter.activeTime) { "Incorrect active time" } },
+ { assertEquals(3160381, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } },
+ { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Incorrect lost time" } },
+ { assertEquals(5.840845430827075E9, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } },
+ )
} finally {
runner.close()
- metricReader.close()
+ telemetry.close()
}
-
- val serviceMetrics = collectServiceMetrics(runner.producers[0])
- println(
- "Scheduler " +
- "Success=${serviceMetrics.attemptsSuccess} " +
- "Failure=${serviceMetrics.attemptsFailure} " +
- "Error=${serviceMetrics.attemptsError} " +
- "Pending=${serviceMetrics.serversPending} " +
- "Active=${serviceMetrics.serversActive}"
- )
-
- // Note that these values have been verified beforehand
- assertAll(
- { assertEquals(50, serviceMetrics.attemptsSuccess, "The scheduler should schedule 50 VMs") },
- { assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") },
- { assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") },
- { assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") },
- { assertEquals(223388307, this@CapelinIntegrationTest.exporter.idleTime) { "Incorrect idle time" } },
- { assertEquals(66977508, this@CapelinIntegrationTest.exporter.activeTime) { "Incorrect active time" } },
- { assertEquals(3160381, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } },
- { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Incorrect lost time" } },
- { assertEquals(5.840845430827075E9, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } },
- )
}
/**
@@ -130,33 +136,34 @@ class CapelinIntegrationTest {
fun testSmall() = runBlockingSimulation {
val seed = 1
val workload = createTestWorkload(0.25, seed)
-
- val simulator = ComputeServiceHelper(
+ val telemetry = SdkTelemetryManager(clock)
+ val runner = ComputeServiceHelper(
coroutineContext,
clock,
+ telemetry,
computeScheduler
)
val topology = createTopology("single")
- val metricReader = CoroutineMetricReader(this, simulator.producers, exporter)
+
+ telemetry.registerMetricReader(CoroutineMetricReader(this, exporter))
try {
- simulator.apply(topology)
- simulator.run(workload, seed.toLong())
+ runner.apply(topology)
+ runner.run(workload, seed.toLong())
+
+ println(
+ "Scheduler " +
+ "Success=${exporter.serviceMetrics.attemptsSuccess} " +
+ "Failure=${exporter.serviceMetrics.attemptsFailure} " +
+ "Error=${exporter.serviceMetrics.attemptsError} " +
+ "Pending=${exporter.serviceMetrics.serversPending} " +
+ "Active=${exporter.serviceMetrics.serversActive}"
+ )
} finally {
- simulator.close()
- metricReader.close()
+ runner.close()
+ telemetry.close()
}
- val serviceMetrics = collectServiceMetrics(simulator.producers[0])
- println(
- "Scheduler " +
- "Success=${serviceMetrics.attemptsSuccess} " +
- "Failure=${serviceMetrics.attemptsFailure} " +
- "Error=${serviceMetrics.attemptsError} " +
- "Pending=${serviceMetrics.serversPending} " +
- "Active=${serviceMetrics.serversActive}"
- )
-
// Note that these values have been verified beforehand
assertAll(
{ assertEquals(10999208, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } },
@@ -180,33 +187,35 @@ class CapelinIntegrationTest {
.read(perfInterferenceInput)
.withSeed(seed.toLong())
+ val telemetry = SdkTelemetryManager(clock)
val simulator = ComputeServiceHelper(
coroutineContext,
clock,
+ telemetry,
computeScheduler,
interferenceModel = performanceInterferenceModel
)
val topology = createTopology("single")
- val metricReader = CoroutineMetricReader(this, simulator.producers, exporter)
+
+ telemetry.registerMetricReader(CoroutineMetricReader(this, exporter))
try {
simulator.apply(topology)
simulator.run(workload, seed.toLong())
+
+ println(
+ "Scheduler " +
+ "Success=${exporter.serviceMetrics.attemptsSuccess} " +
+ "Failure=${exporter.serviceMetrics.attemptsFailure} " +
+ "Error=${exporter.serviceMetrics.attemptsError} " +
+ "Pending=${exporter.serviceMetrics.serversPending} " +
+ "Active=${exporter.serviceMetrics.serversActive}"
+ )
} finally {
simulator.close()
- metricReader.close()
+ telemetry.close()
}
- val serviceMetrics = collectServiceMetrics(simulator.producers[0])
- println(
- "Scheduler " +
- "Success=${serviceMetrics.attemptsSuccess} " +
- "Failure=${serviceMetrics.attemptsFailure} " +
- "Error=${serviceMetrics.attemptsError} " +
- "Pending=${serviceMetrics.serversPending} " +
- "Active=${serviceMetrics.serversActive}"
- )
-
// Note that these values have been verified beforehand
assertAll(
{ assertEquals(6027666, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } },
@@ -222,34 +231,36 @@ class CapelinIntegrationTest {
@Test
fun testFailures() = runBlockingSimulation {
val seed = 1
+ val telemetry = SdkTelemetryManager(clock)
val simulator = ComputeServiceHelper(
coroutineContext,
clock,
+ telemetry,
computeScheduler,
grid5000(Duration.ofDays(7))
)
val topology = createTopology("single")
val workload = createTestWorkload(0.25, seed)
- val metricReader = CoroutineMetricReader(this, simulator.producers, exporter)
+
+ telemetry.registerMetricReader(CoroutineMetricReader(this, exporter))
try {
simulator.apply(topology)
simulator.run(workload, seed.toLong())
+
+ println(
+ "Scheduler " +
+ "Success=${exporter.serviceMetrics.attemptsSuccess} " +
+ "Failure=${exporter.serviceMetrics.attemptsFailure} " +
+ "Error=${exporter.serviceMetrics.attemptsError} " +
+ "Pending=${exporter.serviceMetrics.serversPending} " +
+ "Active=${exporter.serviceMetrics.serversActive}"
+ )
} finally {
simulator.close()
- metricReader.close()
+ telemetry.close()
}
- val serviceMetrics = collectServiceMetrics(simulator.producers[0])
- println(
- "Scheduler " +
- "Success=${serviceMetrics.attemptsSuccess} " +
- "Failure=${serviceMetrics.attemptsFailure} " +
- "Error=${serviceMetrics.attemptsError} " +
- "Pending=${serviceMetrics.serversPending} " +
- "Active=${serviceMetrics.serversActive}"
- )
-
// Note that these values have been verified beforehand
assertAll(
{ assertEquals(10866961, exporter.idleTime) { "Idle time incorrect" } },
@@ -277,6 +288,7 @@ class CapelinIntegrationTest {
}
class TestComputeMetricExporter : ComputeMetricExporter() {
+ var serviceMetrics: ServiceData = ServiceData(Instant.ofEpochMilli(0), 0, 0, 0, 0, 0, 0, 0)
var idleTime = 0L
var activeTime = 0L
var stealTime = 0L
@@ -284,6 +296,19 @@ class CapelinIntegrationTest {
var energyUsage = 0.0
var uptime = 0L
+ override fun record(reader: ServiceTableReader) {
+ serviceMetrics = ServiceData(
+ reader.timestamp,
+ reader.hostsUp,
+ reader.hostsDown,
+ reader.serversPending,
+ reader.serversActive,
+ reader.attemptsSuccess,
+ reader.attemptsFailure,
+ reader.attemptsError
+ )
+ }
+
override fun record(reader: HostTableReader) {
idleTime += reader.cpuIdleTime
activeTime += reader.cpuActiveTime
diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
index 1752802f..c751463d 100644
--- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
+++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
@@ -23,7 +23,6 @@
package org.opendc.experiments.tf20.core
import io.opentelemetry.api.common.AttributeKey
-import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.metrics.Meter
import kotlinx.coroutines.*
import org.opendc.simulator.compute.SimBareMetalMachine
@@ -82,7 +81,6 @@ public class SimTFDevice(
.setDescription("The amount of device resources used")
.setUnit("MHz")
.build()
- .bind(Attributes.of(deviceId, uid.toString()))
/**
* The power draw of the device.
@@ -91,7 +89,6 @@ public class SimTFDevice(
.setDescription("The power draw of the device")
.setUnit("W")
.build()
- .bind(Attributes.of(deviceId, uid.toString()))
/**
* The workload that will be run by the device.
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt
index 54df2b59..836231c8 100644
--- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt
+++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt
@@ -24,9 +24,9 @@ package org.opendc.faas.service
import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.api.common.Attributes
-import io.opentelemetry.api.metrics.BoundLongCounter
-import io.opentelemetry.api.metrics.BoundLongHistogram
-import io.opentelemetry.api.metrics.BoundLongUpDownCounter
+import io.opentelemetry.api.metrics.LongCounter
+import io.opentelemetry.api.metrics.LongHistogram
+import io.opentelemetry.api.metrics.LongUpDownCounter
import io.opentelemetry.api.metrics.Meter
import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
import org.opendc.faas.service.deployer.FunctionInstance
@@ -56,76 +56,68 @@ public class FunctionObject(
/**
* The total amount of function invocations received by the function.
*/
- public val invocations: BoundLongCounter = meter.counterBuilder("function.invocations.total")
+ public val invocations: LongCounter = meter.counterBuilder("function.invocations.total")
.setDescription("Number of function invocations")
.setUnit("1")
.build()
- .bind(attributes)
/**
* The amount of function invocations that could be handled directly.
*/
- public val timelyInvocations: BoundLongCounter = meter.counterBuilder("function.invocations.warm")
+ public val timelyInvocations: LongCounter = meter.counterBuilder("function.invocations.warm")
.setDescription("Number of function invocations handled directly")
.setUnit("1")
.build()
- .bind(attributes)
/**
* The amount of function invocations that were delayed due to function deployment.
*/
- public val delayedInvocations: BoundLongCounter = meter.counterBuilder("function.invocations.cold")
+ public val delayedInvocations: LongCounter = meter.counterBuilder("function.invocations.cold")
.setDescription("Number of function invocations that are delayed")
.setUnit("1")
.build()
- .bind(attributes)
/**
* The amount of function invocations that failed.
*/
- public val failedInvocations: BoundLongCounter = meter.counterBuilder("function.invocations.failed")
+ public val failedInvocations: LongCounter = meter.counterBuilder("function.invocations.failed")
.setDescription("Number of function invocations that failed")
.setUnit("1")
.build()
- .bind(attributes)
/**
* The amount of instances for this function.
*/
- public val activeInstances: BoundLongUpDownCounter = meter.upDownCounterBuilder("function.instances.active")
+ public val activeInstances: LongUpDownCounter = meter.upDownCounterBuilder("function.instances.active")
.setDescription("Number of active function instances")
.setUnit("1")
.build()
- .bind(attributes)
/**
* The amount of idle instances for this function.
*/
- public val idleInstances: BoundLongUpDownCounter = meter.upDownCounterBuilder("function.instances.idle")
+ public val idleInstances: LongUpDownCounter = meter.upDownCounterBuilder("function.instances.idle")
.setDescription("Number of idle function instances")
.setUnit("1")
.build()
- .bind(attributes)
/**
* The time that the function waited.
*/
- public val waitTime: BoundLongHistogram = meter.histogramBuilder("function.time.wait")
+ public val waitTime: LongHistogram = meter.histogramBuilder("function.time.wait")
.ofLongs()
.setDescription("Time the function has to wait before being started")
.setUnit("ms")
.build()
- .bind(attributes)
/**
* The time that the function was running.
*/
- public val activeTime: BoundLongHistogram = meter.histogramBuilder("function.time.active")
+ public val activeTime: LongHistogram = meter.histogramBuilder("function.time.active")
.ofLongs()
.setDescription("Time the function was running")
.setUnit("ms")
.build()
- .bind(attributes)
/**
* The instances associated with this function.
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt
index 3b560cd3..c285585a 100644
--- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt
+++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt
@@ -226,7 +226,7 @@ internal class FaaSServiceImpl(
val instance = if (activeInstance != null) {
_timelyInvocations.add(1)
- function.timelyInvocations.add(1)
+ function.timelyInvocations.add(1, function.attributes)
activeInstance
} else {
@@ -234,29 +234,29 @@ internal class FaaSServiceImpl(
instances.add(instance)
terminationPolicy.enqueue(instance)
- function.idleInstances.add(1)
+ function.idleInstances.add(1, function.attributes)
_delayedInvocations.add(1)
- function.delayedInvocations.add(1)
+ function.delayedInvocations.add(1, function.attributes)
instance
}
suspend {
val start = clock.millis()
- function.waitTime.record(start - submitTime)
- function.idleInstances.add(-1)
- function.activeInstances.add(1)
+ function.waitTime.record(start - submitTime, function.attributes)
+ function.idleInstances.add(-1, function.attributes)
+ function.activeInstances.add(1, function.attributes)
try {
instance.invoke()
} catch (e: Throwable) {
logger.debug(e) { "Function invocation failed" }
- function.failedInvocations.add(1)
+ function.failedInvocations.add(1, function.attributes)
} finally {
val end = clock.millis()
- function.activeTime.record(end - start)
- function.idleInstances.add(1)
- function.activeInstances.add(-1)
+ function.activeTime.record(end - start, function.attributes)
+ function.idleInstances.add(1, function.attributes)
+ function.activeInstances.add(-1, function.attributes)
}
}.startCoroutineCancellable(cont)
}
@@ -269,7 +269,7 @@ internal class FaaSServiceImpl(
check(function.uid in functions) { "Function does not exist (anymore)" }
_invocations.add(1)
- function.invocations.add(1)
+ function.invocations.add(1, function.attributes)
return suspendCancellableCoroutine { cont ->
if (!queue.add(InvocationRequest(clock.millis(), function, cont))) {
diff --git a/opendc-telemetry/opendc-telemetry-api/build.gradle.kts b/opendc-telemetry/opendc-telemetry-api/build.gradle.kts
index c544b7d6..5492fc14 100644
--- a/opendc-telemetry/opendc-telemetry-api/build.gradle.kts
+++ b/opendc-telemetry/opendc-telemetry-api/build.gradle.kts
@@ -29,6 +29,5 @@ plugins {
dependencies {
api(platform(projects.opendcPlatform))
- api(libs.opentelemetry.api.main)
- api(libs.opentelemetry.api.metrics)
+ api(libs.opentelemetry.api)
}
diff --git a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt b/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt
index 1de235e7..a9290c47 100644
--- a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt
+++ b/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt
@@ -22,11 +22,16 @@
package org.opendc.telemetry.sdk.metrics.export
+import io.opentelemetry.sdk.common.CompletableResultCode
+import io.opentelemetry.sdk.metrics.data.AggregationTemporality
import io.opentelemetry.sdk.metrics.export.MetricExporter
import io.opentelemetry.sdk.metrics.export.MetricProducer
+import io.opentelemetry.sdk.metrics.export.MetricReader
+import io.opentelemetry.sdk.metrics.export.MetricReaderFactory
import kotlinx.coroutines.*
import mu.KotlinLogging
import java.time.Duration
+import java.util.*
/**
* A helper class to read the metrics from a list of [MetricProducer]s and automatically export the metrics every
@@ -35,16 +40,16 @@ import java.time.Duration
* The reader runs in a [CoroutineScope] which enables collection of metrics in environments with a custom clock.
*
* @param scope The [CoroutineScope] to run the reader in.
- * @param producers The metric producers to gather metrics from.
+ * @param producer The metric producer to gather metrics from.
* @param exporter The export to export the metrics to.
* @param exportInterval The export interval.
*/
-public class CoroutineMetricReader(
+public class CoroutineMetricReader private constructor(
scope: CoroutineScope,
- private val producers: List<MetricProducer>,
+ private val producer: MetricProducer,
private val exporter: MetricExporter,
- private val exportInterval: Duration = Duration.ofMinutes(5)
-) : AutoCloseable {
+ private val exportInterval: Duration
+) : MetricReader {
private val logger = KotlinLogging.logger {}
/**
@@ -57,9 +62,8 @@ public class CoroutineMetricReader(
while (isActive) {
delay(intervalMs)
- val metrics = producers.flatMap(MetricProducer::collectAllMetrics)
-
try {
+ val metrics = producer.collectAllMetrics()
val result = exporter.export(metrics)
result.whenComplete {
if (!result.isSuccess) {
@@ -75,7 +79,29 @@ public class CoroutineMetricReader(
}
}
- override fun close() {
+ override fun getPreferredTemporality(): AggregationTemporality = AggregationTemporality.CUMULATIVE
+
+ override fun flush(): CompletableResultCode {
+ return exporter.flush()
+ }
+
+ override fun shutdown(): CompletableResultCode {
job.cancel()
+ return CompletableResultCode.ofSuccess()
+ }
+
+ public companion object {
+ /**
+ * Construct a [MetricReaderFactory] for this metric reader.
+ */
+ public operator fun invoke(
+ scope: CoroutineScope,
+ exporter: MetricExporter,
+ exportInterval: Duration = Duration.ofMinutes(5)
+ ): MetricReaderFactory {
+ return MetricReaderFactory { producer ->
+ CoroutineMetricReader(scope, producer, exporter, exportInterval)
+ }
+ }
}
}
diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
index 5d6bc37f..8f4e9d6d 100644
--- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
+++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt
@@ -29,6 +29,7 @@ import com.github.ajalt.clikt.parameters.types.long
import kotlinx.coroutines.*
import mu.KotlinLogging
import org.opendc.compute.workload.*
+import org.opendc.compute.workload.telemetry.SdkTelemetryManager
import org.opendc.compute.workload.topology.HostSpec
import org.opendc.compute.workload.topology.Topology
import org.opendc.compute.workload.topology.apply
@@ -185,34 +186,36 @@ class RunnerCli : CliktCommand(name = "runner") {
else
null
+ val telemetry = SdkTelemetryManager(clock)
val simulator = ComputeServiceHelper(
coroutineContext,
clock,
+ telemetry,
computeScheduler,
failureModel,
interferenceModel.takeIf { operational.performanceInterferenceEnabled }
)
- val metricReader = CoroutineMetricReader(this, simulator.producers, exporter, exportInterval = Duration.ofHours(1))
+ telemetry.registerMetricReader(CoroutineMetricReader(this, exporter, exportInterval = Duration.ofHours(1)))
try {
// Instantiate the topology onto the simulator
simulator.apply(topology)
- // Converge workload trace
+ // Run workload trace
simulator.run(workload.resolve(workloadLoader, seeder), seeder.nextLong())
+
+ val serviceMetrics = collectServiceMetrics(telemetry.metricProducer)
+ logger.debug {
+ "Scheduler " +
+ "Success=${serviceMetrics.attemptsSuccess} " +
+ "Failure=${serviceMetrics.attemptsFailure} " +
+ "Error=${serviceMetrics.attemptsError} " +
+ "Pending=${serviceMetrics.serversPending} " +
+ "Active=${serviceMetrics.serversActive}"
+ }
} finally {
simulator.close()
- metricReader.close()
- }
-
- val serviceMetrics = collectServiceMetrics(simulator.producers[0])
- logger.debug {
- "Scheduler " +
- "Success=${serviceMetrics.attemptsSuccess} " +
- "Failure=${serviceMetrics.attemptsFailure} " +
- "Error=${serviceMetrics.attemptsError} " +
- "Pending=${serviceMetrics.serversPending} " +
- "Active=${serviceMetrics.serversActive}"
+ telemetry.close()
}
}
} catch (cause: Throwable) {
diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
index 214d5135..1fd332b9 100644
--- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
+++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt
@@ -33,6 +33,7 @@ import org.opendc.compute.service.scheduler.filters.RamFilter
import org.opendc.compute.service.scheduler.filters.VCpuFilter
import org.opendc.compute.service.scheduler.weights.VCpuWeigher
import org.opendc.compute.workload.ComputeServiceHelper
+import org.opendc.compute.workload.telemetry.NoopTelemetryManager
import org.opendc.compute.workload.topology.HostSpec
import org.opendc.simulator.compute.kernel.SimSpaceSharedHypervisorProvider
import org.opendc.simulator.compute.model.MachineModel
@@ -70,7 +71,8 @@ internal class WorkflowServiceTest {
filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)),
weighers = listOf(VCpuWeigher(1.0, multiplier = 1.0))
)
- val computeHelper = ComputeServiceHelper(coroutineContext, clock, computeScheduler, schedulingQuantum = Duration.ofSeconds(1))
+
+ val computeHelper = ComputeServiceHelper(coroutineContext, clock, NoopTelemetryManager(), computeScheduler, schedulingQuantum = Duration.ofSeconds(1))
repeat(HOST_COUNT) { computeHelper.registerHost(createHostSpec(it)) }
diff --git a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt
index 0198900f..a7d0ed6c 100644
--- a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt
+++ b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt
@@ -22,8 +22,13 @@
package org.opendc.workflow.workload
+import io.opentelemetry.api.metrics.MeterProvider
+import io.opentelemetry.sdk.common.CompletableResultCode
import io.opentelemetry.sdk.metrics.SdkMeterProvider
+import io.opentelemetry.sdk.metrics.data.AggregationTemporality
import io.opentelemetry.sdk.metrics.export.MetricProducer
+import io.opentelemetry.sdk.metrics.export.MetricReader
+import io.opentelemetry.sdk.metrics.export.MetricReaderFactory
import io.opentelemetry.sdk.resources.Resource
import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
import kotlinx.coroutines.coroutineScope
@@ -34,6 +39,7 @@ import org.opendc.telemetry.sdk.toOtelClock
import org.opendc.workflow.api.Job
import org.opendc.workflow.service.WorkflowService
import java.time.Clock
+import java.util.*
import kotlin.coroutines.CoroutineContext
/**
@@ -58,23 +64,47 @@ public class WorkflowServiceHelper(
/**
* The [MetricProducer] exposed by the [WorkflowService].
*/
- public val metricProducer: MetricProducer
+ public lateinit var metricProducer: MetricProducer
+ private set
+
+ /**
+ * The [MeterProvider] used for the service.
+ */
+ private val _meterProvider: SdkMeterProvider
+
+ /**
+ * The list of [MetricReader]s that have been registered with this helper.
+ */
+ private val _metricReaders = mutableListOf<MetricReader>()
init {
val resource = Resource.builder()
.put(ResourceAttributes.SERVICE_NAME, "opendc-workflow")
.build()
- val meterProvider = SdkMeterProvider.builder()
+ _meterProvider = SdkMeterProvider.builder()
.setClock(clock.toOtelClock())
.setResource(resource)
+ .registerMetricReader { producer ->
+ metricProducer = producer
+
+ val metricReaders = _metricReaders
+ object : MetricReader {
+ override fun getPreferredTemporality(): AggregationTemporality = AggregationTemporality.CUMULATIVE
+ override fun flush(): CompletableResultCode {
+ return CompletableResultCode.ofAll(metricReaders.map { it.flush() })
+ }
+ override fun shutdown(): CompletableResultCode {
+ return CompletableResultCode.ofAll(metricReaders.map { it.shutdown() })
+ }
+ }
+ }
.build()
- metricProducer = meterProvider
service = WorkflowService(
context,
clock,
- meterProvider,
+ _meterProvider,
computeClient,
schedulerSpec.schedulingQuantum,
jobAdmissionPolicy = schedulerSpec.jobAdmissionPolicy,
@@ -116,8 +146,19 @@ public class WorkflowServiceHelper(
}
}
+ /**
+ * Register a [MetricReader] for this helper.
+ *
+ * @param factory The factory for the reader to register.
+ */
+ public fun registerMetricReader(factory: MetricReaderFactory) {
+ val reader = factory.apply(metricProducer)
+ _metricReaders.add(reader)
+ }
+
override fun close() {
computeClient.close()
service.close()
+ _meterProvider.close()
}
}