diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-10-09 16:10:00 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-02-15 18:08:37 +0100 |
| commit | 02c215ad57e1e4d56c54d22be58e1845bdeebf25 (patch) | |
| tree | 7794b53ca3bb6fa197a118cee92114135be15def | |
| parent | 48c04fb74ee170f58f292b077c62b4da237f507e (diff) | |
refactor: Update OpenTelemetry to version 1.11
This change updates the OpenDC codebase to use OpenTelemetry v1.11,
which stabilizes the metrics API. This stabilization introduces quite a
few breaking API changes, requiring significant updates throughout the
OpenDC codebase.
20 files changed, 535 insertions, 278 deletions
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 16bf22dc..5fe5cc33 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -12,9 +12,9 @@ kotlinx-coroutines = "1.6.0" ktor = "1.6.7" log4j = "2.17.1" mockk = "1.12.2" -opentelemetry-main = "1.6.0" -opentelemetry-metrics = "1.6.0-alpha" -opentelemetry-semconv = "1.6.0-alpha" +opentelemetry-main = "1.11.0" +opentelemetry-metrics = "1.10.1-alpha" +opentelemetry-semconv = "1.10.1-alpha" parquet = "1.12.2" progressbar = "0.9.2" sentry = "5.5.2" @@ -31,9 +31,8 @@ log4j-slf4j = { module = "org.apache.logging.log4j:log4j-slf4j-impl", version.re sentry-log4j2 = { module = "io.sentry:sentry-log4j2", version.ref = "sentry" } # Telemetry -opentelemetry-api-main = { module = "io.opentelemetry:opentelemetry-api", version.ref = "opentelemetry-main" } +opentelemetry-api = { module = "io.opentelemetry:opentelemetry-api", version.ref = "opentelemetry-main" } opentelemetry-sdk-main = { module = "io.opentelemetry:opentelemetry-sdk", version.ref = "opentelemetry-main" } -opentelemetry-api-metrics = { module = "io.opentelemetry:opentelemetry-api-metrics", version.ref = "opentelemetry-metrics" } opentelemetry-sdk-metrics = { module = "io.opentelemetry:opentelemetry-sdk-metrics", version.ref = "opentelemetry-metrics" } opentelemetry-semconv = { module = "io.opentelemetry:opentelemetry-semconv", version.ref = "opentelemetry-semconv" } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index 292feabe..27a6ecae 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -123,12 +123,9 @@ internal class ComputeServiceImpl( 
.setDescription("Number of scheduling attempts") .setUnit("1") .build() - private val _schedulingAttemptsSuccess = _schedulingAttempts - .bind(Attributes.of(AttributeKey.stringKey("result"), "success")) - private val _schedulingAttemptsFailure = _schedulingAttempts - .bind(Attributes.of(AttributeKey.stringKey("result"), "failure")) - private val _schedulingAttemptsError = _schedulingAttempts - .bind(Attributes.of(AttributeKey.stringKey("result"), "error")) + private val _schedulingAttemptsSuccessAttr = Attributes.of(AttributeKey.stringKey("result"), "success") + private val _schedulingAttemptsFailureAttr = Attributes.of(AttributeKey.stringKey("result"), "failure") + private val _schedulingAttemptsErrorAttr = Attributes.of(AttributeKey.stringKey("result"), "error") /** * The response time of the service. @@ -146,8 +143,8 @@ internal class ComputeServiceImpl( .setDescription("Number of servers managed by the scheduler") .setUnit("1") .build() - private val _serversPending = _servers.bind(Attributes.of(AttributeKey.stringKey("state"), "pending")) - private val _serversActive = _servers.bind(Attributes.of(AttributeKey.stringKey("state"), "active")) + private val _serversPendingAttr = Attributes.of(AttributeKey.stringKey("state"), "pending") + private val _serversActiveAttr = Attributes.of(AttributeKey.stringKey("state"), "active") /** * The [TimerScheduler] to use for scheduling the scheduler cycles. 
@@ -171,8 +168,8 @@ internal class ComputeServiceImpl( val total = hostCount val available = availableHosts.size.toLong() - result.observe(available, upState) - result.observe(total - available, downState) + result.record(available, upState) + result.record(total - available, downState) } meter.gaugeBuilder("system.time.provision") @@ -336,7 +333,7 @@ internal class ComputeServiceImpl( server.lastProvisioningTimestamp = now queue.add(request) - _serversPending.add(1) + _servers.add(1, _serversPendingAttr) requestSchedulingCycle() return request } @@ -384,7 +381,7 @@ internal class ComputeServiceImpl( if (request.isCancelled) { queue.poll() - _serversPending.add(-1) + _servers.add(-1, _serversPendingAttr) continue } @@ -396,8 +393,8 @@ internal class ComputeServiceImpl( if (server.flavor.memorySize > maxMemory || server.flavor.cpuCount > maxCores) { // Remove the incoming image queue.poll() - _serversPending.add(-1) - _schedulingAttemptsFailure.add(1) + _servers.add(-1, _serversPendingAttr) + _schedulingAttempts.add(1, _schedulingAttemptsFailureAttr) logger.warn { "Failed to spawn $server: does not fit [${clock.instant()}]" } @@ -412,7 +409,7 @@ internal class ComputeServiceImpl( // Remove request from queue queue.poll() - _serversPending.add(-1) + _servers.add(-1, _serversPendingAttr) _schedulingLatency.record(now - request.submitTime, server.attributes) logger.info { "Assigned server $server to host $host." 
} @@ -429,8 +426,8 @@ internal class ComputeServiceImpl( host.spawn(server) activeServers[server] = host - _serversActive.add(1) - _schedulingAttemptsSuccess.add(1) + _servers.add(1, _serversActiveAttr) + _schedulingAttempts.add(1, _schedulingAttemptsSuccessAttr) } catch (e: Throwable) { logger.error(e) { "Failed to deploy VM" } @@ -438,7 +435,7 @@ internal class ComputeServiceImpl( hv.provisionedCores -= server.flavor.cpuCount hv.availableMemory += server.flavor.memorySize - _schedulingAttemptsError.add(1) + _schedulingAttempts.add(1, _schedulingAttemptsErrorAttr) } } } @@ -494,7 +491,7 @@ internal class ComputeServiceImpl( logger.info { "[${clock.instant()}] Server ${server.uid} ${server.name} ${server.flavor} finished." } if (activeServers.remove(server) != null) { - _serversActive.add(-1) + _servers.add(-1, _serversActiveAttr) } val hv = hostToView[host] @@ -516,7 +513,7 @@ internal class ComputeServiceImpl( */ private fun collectProvisionTime(result: ObservableLongMeasurement) { for ((_, server) in servers) { - result.observe(server.lastProvisioningTimestamp, server.attributes) + result.record(server.lastProvisioningTimestamp, server.attributes) } } } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index 908a58e9..95921e8b 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -150,15 +150,15 @@ public class SimHost( meter.gaugeBuilder("system.cpu.demand") .setDescription("Amount of CPU resources the guests would use if there were no CPU contention or CPU limits") .setUnit("MHz") - .buildWithCallback { result -> result.observe(hypervisor.cpuDemand) } + .buildWithCallback { result -> result.record(hypervisor.cpuDemand) } 
meter.gaugeBuilder("system.cpu.usage") .setDescription("Amount of CPU resources used by the host") .setUnit("MHz") - .buildWithCallback { result -> result.observe(hypervisor.cpuUsage) } + .buildWithCallback { result -> result.record(hypervisor.cpuUsage) } meter.gaugeBuilder("system.cpu.utilization") .setDescription("Utilization of the CPU resources of the host") .setUnit("%") - .buildWithCallback { result -> result.observe(hypervisor.cpuUsage / _cpuLimit) } + .buildWithCallback { result -> result.record(hypervisor.cpuUsage / _cpuLimit) } meter.counterBuilder("system.cpu.time") .setDescription("Amount of CPU time spent by the host") .setUnit("s") @@ -166,12 +166,12 @@ public class SimHost( meter.gaugeBuilder("system.power.usage") .setDescription("Power usage of the host ") .setUnit("W") - .buildWithCallback { result -> result.observe(machine.powerUsage) } + .buildWithCallback { result -> result.record(machine.powerUsage) } meter.counterBuilder("system.power.total") .setDescription("Amount of energy used by the CPU") .setUnit("J") .ofDoubles() - .buildWithCallback { result -> result.observe(machine.energyUsage) } + .buildWithCallback { result -> result.record(machine.energyUsage) } meter.counterBuilder("system.time") .setDescription("The uptime of the host") .setUnit("s") @@ -382,10 +382,10 @@ public class SimHost( } } - result.observe(terminated, terminatedState) - result.observe(running, runningState) - result.observe(error, errorState) - result.observe(invalid, invalidState) + result.record(terminated, terminatedState) + result.record(running, runningState) + result.record(error, errorState) + result.record(invalid, invalidState) } private val _cpuLimit = machine.model.cpus.sumOf { it.frequency } @@ -394,7 +394,7 @@ public class SimHost( * Helper function to collect the CPU limits of a machine. 
*/ private fun collectCpuLimit(result: ObservableDoubleMeasurement) { - result.observe(_cpuLimit) + result.record(_cpuLimit) val guests = _guests for (i in guests.indices) { @@ -413,10 +413,10 @@ public class SimHost( private fun collectCpuTime(result: ObservableLongMeasurement) { val counters = hypervisor.counters - result.observe(counters.cpuActiveTime / 1000L, _activeState) - result.observe(counters.cpuIdleTime / 1000L, _idleState) - result.observe(counters.cpuStealTime / 1000L, _stealState) - result.observe(counters.cpuLostTime / 1000L, _lostState) + result.record(counters.cpuActiveTime / 1000L, _activeState) + result.record(counters.cpuIdleTime / 1000L, _idleState) + result.record(counters.cpuStealTime / 1000L, _stealState) + result.record(counters.cpuLostTime / 1000L, _lostState) val guests = _guests for (i in guests.indices) { @@ -458,8 +458,8 @@ public class SimHost( private fun collectUptime(result: ObservableLongMeasurement) { updateUptime() - result.observe(_uptime, _upState) - result.observe(_downtime, _downState) + result.record(_uptime, _upState) + result.record(_downtime, _downState) val guests = _guests for (i in guests.indices) { @@ -474,7 +474,7 @@ public class SimHost( */ private fun collectBootTime(result: ObservableLongMeasurement) { if (_bootTime != Long.MIN_VALUE) { - result.observe(_bootTime) + result.record(_bootTime) } val guests = _guests diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt index 9f3122db..f49c2824 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt @@ -239,8 +239,8 @@ internal class Guest( * Helper function to track the uptime of the guest. 
*/ fun collectUptime(result: ObservableLongMeasurement) { - result.observe(_uptime, _upState) - result.observe(_downtime, _downState) + result.record(_uptime, _upState) + result.record(_downtime, _downState) } private var _bootTime = Long.MIN_VALUE @@ -250,7 +250,7 @@ internal class Guest( */ fun collectBootTime(result: ObservableLongMeasurement) { if (_bootTime != Long.MIN_VALUE) { - result.observe(_bootTime, attributes) + result.record(_bootTime, attributes) } } @@ -273,10 +273,10 @@ internal class Guest( fun collectCpuTime(result: ObservableLongMeasurement) { val counters = machine.counters - result.observe(counters.cpuActiveTime / 1000, _activeState) - result.observe(counters.cpuIdleTime / 1000, _idleState) - result.observe(counters.cpuStealTime / 1000, _stealState) - result.observe(counters.cpuLostTime / 1000, _lostState) + result.record(counters.cpuActiveTime / 1000, _activeState) + result.record(counters.cpuIdleTime / 1000, _idleState) + result.record(counters.cpuStealTime / 1000, _stealState) + result.record(counters.cpuLostTime / 1000, _lostState) } private val _cpuLimit = machine.model.cpus.sumOf { it.frequency } @@ -285,7 +285,7 @@ internal class Guest( * Helper function to collect the CPU limits of a machine. 
*/ fun collectCpuLimit(result: ObservableDoubleMeasurement) { - result.observe(_cpuLimit, attributes) + result.record(_cpuLimit, attributes) } /** diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index 799a8cf0..dd13b60c 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -22,9 +22,7 @@ package org.opendc.compute.simulator -import io.opentelemetry.api.metrics.MeterProvider import io.opentelemetry.sdk.metrics.SdkMeterProvider -import io.opentelemetry.sdk.metrics.export.MetricProducer import io.opentelemetry.sdk.resources.Resource import kotlinx.coroutines.* import org.junit.jupiter.api.Assertions.assertEquals @@ -83,10 +81,26 @@ internal class SimHostTest { val hostResource = Resource.builder() .put(HOST_ID, hostId.toString()) .build() - val meterProvider: MeterProvider = SdkMeterProvider + + // Setup metric reader + val duration = 5 * 60L + val reader = CoroutineMetricReader( + this, + object : ComputeMetricExporter() { + override fun record(reader: HostTableReader) { + activeTime += reader.cpuActiveTime + idleTime += reader.cpuIdleTime + stealTime += reader.cpuStealTime + } + }, + exportInterval = Duration.ofSeconds(duration) + ) + + val meterProvider = SdkMeterProvider .builder() .setResource(hostResource) .setClock(clock.toOtelClock()) + .registerMetricReader(reader) .build() val engine = FlowEngine(coroutineContext, clock) @@ -100,7 +114,6 @@ internal class SimHostTest { meterProvider, SimFairShareHypervisorProvider() ) - val duration = 5 * 60L val vmImageA = MockImage( UUID.randomUUID(), "<unnamed>", @@ -136,19 +149,6 @@ internal class SimHostTest { val flavor = MockFlavor(2, 0) - // Setup metric reader - val reader = 
CoroutineMetricReader( - this, listOf(meterProvider as MetricProducer), - object : ComputeMetricExporter() { - override fun record(reader: HostTableReader) { - activeTime += reader.cpuActiveTime - idleTime += reader.cpuIdleTime - stealTime += reader.cpuStealTime - } - }, - exportInterval = Duration.ofSeconds(duration) - ) - coroutineScope { launch { virtDriver.spawn(MockServer(UUID.randomUUID(), "a", flavor, vmImageA)) } launch { virtDriver.spawn(MockServer(UUID.randomUUID(), "b", flavor, vmImageB)) } @@ -169,7 +169,7 @@ internal class SimHostTest { // Ensure last cycle is collected delay(1000L * duration) virtDriver.close() - reader.close() + meterProvider.close() assertAll( { assertEquals(658, activeTime, "Active time does not match") }, @@ -195,10 +195,32 @@ internal class SimHostTest { val hostResource = Resource.builder() .put(HOST_ID, hostId.toString()) .build() - val meterProvider: MeterProvider = SdkMeterProvider + + // Setup metric reader + val duration = 5 * 60L + val reader = CoroutineMetricReader( + this, + object : ComputeMetricExporter() { + override fun record(reader: HostTableReader) { + activeTime += reader.cpuActiveTime + idleTime += reader.cpuIdleTime + uptime += reader.uptime + downtime += reader.downtime + } + + override fun record(reader: ServerTableReader) { + guestUptime += reader.uptime + guestDowntime += reader.downtime + } + }, + exportInterval = Duration.ofSeconds(duration) + ) + + val meterProvider = SdkMeterProvider .builder() .setResource(hostResource) .setClock(clock.toOtelClock()) + .registerMetricReader(reader) .build() val engine = FlowEngine(coroutineContext, clock) @@ -212,7 +234,6 @@ internal class SimHostTest { meterProvider, SimFairShareHypervisorProvider() ) - val duration = 5 * 60L val image = MockImage( UUID.randomUUID(), "<unnamed>", @@ -232,25 +253,6 @@ internal class SimHostTest { val flavor = MockFlavor(2, 0) val server = MockServer(UUID.randomUUID(), "a", flavor, image) - // Setup metric reader - val reader = 
CoroutineMetricReader( - this, listOf(meterProvider as MetricProducer), - object : ComputeMetricExporter() { - override fun record(reader: HostTableReader) { - activeTime += reader.cpuActiveTime - idleTime += reader.cpuIdleTime - uptime += reader.uptime - downtime += reader.downtime - } - - override fun record(reader: ServerTableReader) { - guestUptime += reader.uptime - guestDowntime += reader.downtime - } - }, - exportInterval = Duration.ofSeconds(duration) - ) - coroutineScope { host.spawn(server) delay(5000L) @@ -273,7 +275,7 @@ internal class SimHostTest { // Ensure last cycle is collected delay(1000L * duration) - reader.close() + meterProvider.close() assertAll( { assertEquals(1175, idleTime, "Idle time does not match") }, diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt index 59203b66..a1a65da3 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt @@ -22,10 +22,6 @@ package org.opendc.compute.workload -import io.opentelemetry.sdk.metrics.SdkMeterProvider -import io.opentelemetry.sdk.metrics.export.MetricProducer -import io.opentelemetry.sdk.resources.Resource -import io.opentelemetry.semconv.resource.attributes.ResourceAttributes import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.launch @@ -33,12 +29,11 @@ import kotlinx.coroutines.yield import org.opendc.compute.service.ComputeService import org.opendc.compute.service.scheduler.ComputeScheduler import org.opendc.compute.simulator.SimHost +import org.opendc.compute.workload.telemetry.TelemetryManager import org.opendc.compute.workload.topology.HostSpec import 
org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.flow.FlowEngine -import org.opendc.telemetry.compute.* -import org.opendc.telemetry.sdk.toOtelClock import java.time.Clock import java.time.Duration import java.util.* @@ -50,6 +45,7 @@ import kotlin.math.max * * @param context [CoroutineContext] to run the simulation in. * @param clock [Clock] instance tracking simulation time. + * @param telemetry Helper class for managing telemetry. * @param scheduler [ComputeScheduler] implementation to use for the service. * @param failureModel A failure model to use for injecting failures. * @param interferenceModel The model to use for performance interference. @@ -58,6 +54,7 @@ import kotlin.math.max public class ComputeServiceHelper( private val context: CoroutineContext, private val clock: Clock, + private val telemetry: TelemetryManager, scheduler: ComputeScheduler, private val failureModel: FailureModel? = null, private val interferenceModel: VmInterferenceModel? = null, @@ -69,25 +66,17 @@ public class ComputeServiceHelper( public val service: ComputeService /** - * The [MetricProducer] that are used by the [ComputeService] and the simulated hosts. - */ - public val producers: List<MetricProducer> - get() = _metricProducers - private val _metricProducers = mutableListOf<MetricProducer>() - - /** * The [FlowEngine] to simulate the hosts. */ - private val engine = FlowEngine(context, clock) + private val _engine = FlowEngine(context, clock) /** * The hosts that belong to this class. 
*/ - private val hosts = mutableSetOf<SimHost>() + private val _hosts = mutableSetOf<SimHost>() init { - val (service, serviceMeterProvider) = createService(scheduler, schedulingQuantum) - this._metricProducers.add(serviceMeterProvider) + val service = createService(scheduler, schedulingQuantum) this.service = service } @@ -165,27 +154,14 @@ public class ComputeServiceHelper( * @return The [SimHost] that has been constructed by the runner. */ public fun registerHost(spec: HostSpec, optimize: Boolean = false): SimHost { - val resource = Resource.builder() - .put(HOST_ID, spec.uid.toString()) - .put(HOST_NAME, spec.name) - .put(HOST_ARCH, ResourceAttributes.HostArchValues.AMD64) - .put(HOST_NCPUS, spec.model.cpus.size) - .put(HOST_MEM_CAPACITY, spec.model.memory.sumOf { it.size }) - .build() - - val meterProvider = SdkMeterProvider.builder() - .setClock(clock.toOtelClock()) - .setResource(resource) - .build() - _metricProducers.add(meterProvider) - + val meterProvider = telemetry.createMeterProvider(spec) val host = SimHost( spec.uid, spec.name, spec.model, spec.meta, context, - engine, + _engine, meterProvider, spec.hypervisor, powerDriver = spec.powerDriver, @@ -193,7 +169,7 @@ public class ComputeServiceHelper( optimize = optimize ) - hosts.add(host) + _hosts.add(host) service.addHost(host) return host @@ -202,27 +178,18 @@ public class ComputeServiceHelper( override fun close() { service.close() - for (host in hosts) { + for (host in _hosts) { host.close() } - hosts.clear() + _hosts.clear() } /** * Construct a [ComputeService] instance. 
*/ - private fun createService(scheduler: ComputeScheduler, schedulingQuantum: Duration): Pair<ComputeService, SdkMeterProvider> { - val resource = Resource.builder() - .put(ResourceAttributes.SERVICE_NAME, "opendc-compute") - .build() - - val meterProvider = SdkMeterProvider.builder() - .setClock(clock.toOtelClock()) - .setResource(resource) - .build() - - val service = ComputeService(context, clock, meterProvider, scheduler, schedulingQuantum) - return service to meterProvider + private fun createService(scheduler: ComputeScheduler, schedulingQuantum: Duration): ComputeService { + val meterProvider = telemetry.createMeterProvider(scheduler) + return ComputeService(context, clock, meterProvider, scheduler, schedulingQuantum) } } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/NoopTelemetryManager.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/NoopTelemetryManager.kt new file mode 100644 index 00000000..4e7d0b75 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/NoopTelemetryManager.kt @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.telemetry + +import io.opentelemetry.api.metrics.MeterProvider +import org.opendc.compute.service.scheduler.ComputeScheduler +import org.opendc.compute.workload.topology.HostSpec + +/** + * A [TelemetryManager] that does nothing. + */ +public class NoopTelemetryManager : TelemetryManager { + override fun createMeterProvider(host: HostSpec): MeterProvider = MeterProvider.noop() + + override fun createMeterProvider(scheduler: ComputeScheduler): MeterProvider = MeterProvider.noop() +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/SdkTelemetryManager.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/SdkTelemetryManager.kt new file mode 100644 index 00000000..478c0609 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/SdkTelemetryManager.kt @@ -0,0 +1,135 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above 
copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.telemetry + +import io.opentelemetry.api.metrics.MeterProvider +import io.opentelemetry.sdk.common.CompletableResultCode +import io.opentelemetry.sdk.metrics.SdkMeterProvider +import io.opentelemetry.sdk.metrics.data.AggregationTemporality +import io.opentelemetry.sdk.metrics.data.MetricData +import io.opentelemetry.sdk.metrics.export.MetricProducer +import io.opentelemetry.sdk.metrics.export.MetricReader +import io.opentelemetry.sdk.metrics.export.MetricReaderFactory +import io.opentelemetry.sdk.resources.Resource +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes +import org.opendc.compute.service.scheduler.ComputeScheduler +import org.opendc.compute.workload.topology.HostSpec +import org.opendc.telemetry.compute.* +import org.opendc.telemetry.sdk.toOtelClock +import java.time.Clock + +/** + * A [TelemetryManager] using the OpenTelemetry Java SDK. + */ +public class SdkTelemetryManager(private val clock: Clock) : TelemetryManager, AutoCloseable { + /** + * The [SdkMeterProvider]s that belong to the workload runner. + */ + private val _meterProviders = mutableListOf<SdkMeterProvider>() + + /** + * The internal [MetricProducer] registered with the runner. 
+ */ + private val _metricProducers = mutableListOf<MetricProducer>() + + /** + * The list of [MetricReader]s that have been registered with the runner. + */ + private val _metricReaders = mutableListOf<MetricReader>() + + /** + * A [MetricProducer] that combines all the other metric producers. + */ + public val metricProducer: MetricProducer = object : MetricProducer { + private val producers = _metricProducers + + override fun collectAllMetrics(): Collection<MetricData> = producers.flatMap(MetricProducer::collectAllMetrics) + + override fun toString(): String = "SdkTelemetryManager.AggregateMetricProducer" + } + + /** + * Register a [MetricReader] for this manager. + * + * @param factory The factory for the reader to register. + */ + public fun registerMetricReader(factory: MetricReaderFactory) { + val reader = factory.apply(metricProducer) + _metricReaders.add(reader) + } + + override fun createMeterProvider(scheduler: ComputeScheduler): MeterProvider { + val resource = Resource.builder() + .put(ResourceAttributes.SERVICE_NAME, "opendc-compute") + .build() + + return createMeterProvider(resource) + } + + override fun createMeterProvider(host: HostSpec): MeterProvider { + val resource = Resource.builder() + .put(HOST_ID, host.uid.toString()) + .put(HOST_NAME, host.name) + .put(HOST_ARCH, ResourceAttributes.HostArchValues.AMD64) + .put(HOST_NCPUS, host.model.cpus.size) + .put(HOST_MEM_CAPACITY, host.model.memory.sumOf { it.size }) + .build() + + return createMeterProvider(resource) + } + + /** + * Construct a [SdkMeterProvider] for the specified [resource]. 
+ */ + private fun createMeterProvider(resource: Resource): SdkMeterProvider { + val meterProvider = SdkMeterProvider.builder() + .setClock(clock.toOtelClock()) + .setResource(resource) + .registerMetricReader { producer -> + _metricProducers.add(producer) + object : MetricReader { + override fun getPreferredTemporality(): AggregationTemporality = AggregationTemporality.CUMULATIVE + override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess() + override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess() + } + } + .build() + _meterProviders.add(meterProvider) + return meterProvider + } + + override fun close() { + for (meterProvider in _meterProviders) { + meterProvider.close() + } + + _meterProviders.clear() + + for (metricReader in _metricReaders) { + metricReader.shutdown() + } + + _metricReaders.clear() + _metricProducers.clear() + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/TelemetryManager.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/TelemetryManager.kt new file mode 100644 index 00000000..b67050ce --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/TelemetryManager.kt @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.telemetry + +import io.opentelemetry.api.metrics.MeterProvider +import org.opendc.compute.service.scheduler.ComputeScheduler +import org.opendc.compute.workload.topology.HostSpec + +/** + * Helper class to manage the telemetry for a [ComputeServiceHelper] instance. + */ +public interface TelemetryManager { + /** + * Construct a [MeterProvider] for the specified [ComputeScheduler]. + */ + public fun createMeterProvider(scheduler: ComputeScheduler): MeterProvider + + /** + * Construct a [MeterProvider] for the specified [HostSpec]. 
+ */ + public fun createMeterProvider(host: HostSpec): MeterProvider +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt b/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt index 4b35de95..bb9cb201 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt @@ -29,6 +29,7 @@ import org.opendc.compute.service.scheduler.filters.RamFilter import org.opendc.compute.service.scheduler.filters.VCpuFilter import org.opendc.compute.service.scheduler.weights.CoreRamWeigher import org.opendc.compute.workload.* +import org.opendc.compute.workload.telemetry.NoopTelemetryManager import org.opendc.compute.workload.topology.Topology import org.opendc.compute.workload.topology.apply import org.opendc.experiments.capelin.topology.clusterTopology @@ -70,6 +71,7 @@ class CapelinBenchmarks { val runner = ComputeServiceHelper( coroutineContext, clock, + NoopTelemetryManager(), computeScheduler ) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index b548ae58..6604a190 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -29,6 +29,7 @@ import org.opendc.compute.workload.ComputeWorkloadLoader import org.opendc.compute.workload.createComputeScheduler import org.opendc.compute.workload.export.parquet.ParquetComputeMetricExporter import org.opendc.compute.workload.grid5000 +import 
org.opendc.compute.workload.telemetry.SdkTelemetryManager import org.opendc.compute.workload.topology.apply import org.opendc.compute.workload.util.VmInterferenceModelReader import org.opendc.experiments.capelin.model.OperationalPhenomena @@ -38,7 +39,6 @@ import org.opendc.experiments.capelin.topology.clusterTopology import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.telemetry.compute.collectServiceMetrics import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import java.io.File import java.time.Duration @@ -109,9 +109,11 @@ abstract class Portfolio(name: String) : Experiment(name) { grid5000(Duration.ofSeconds((operationalPhenomena.failureFrequency * 60).roundToLong())) else null + val telemetry = SdkTelemetryManager(clock) val runner = ComputeServiceHelper( coroutineContext, clock, + telemetry, computeScheduler, failureModel, performanceInterferenceModel?.withSeed(repeat.toLong()) @@ -122,7 +124,8 @@ abstract class Portfolio(name: String) : Experiment(name) { "portfolio_id=$name/scenario_id=$id/run_id=$repeat", 4096 ) - val metricReader = CoroutineMetricReader(this, runner.producers, exporter) + telemetry.registerMetricReader(CoroutineMetricReader(this, exporter)) + val topology = clusterTopology(File(config.getString("env-path"), "${topology.name}.txt")) try { @@ -133,17 +136,6 @@ abstract class Portfolio(name: String) : Experiment(name) { runner.run(workload.source.resolve(workloadLoader, seeder), seeder.nextLong()) } finally { runner.close() - metricReader.close() - } - - val monitorResults = collectServiceMetrics(runner.producers[0]) - logger.debug { - "Scheduler " + - "Success=${monitorResults.attemptsSuccess} " + - "Failure=${monitorResults.attemptsFailure} " + - "Error=${monitorResults.attemptsError} " + - "Pending=${monitorResults.serversPending} " + - "Active=${monitorResults.serversActive}" } } } diff --git 
a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index eedc3131..aefd8304 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -32,17 +32,20 @@ import org.opendc.compute.service.scheduler.filters.RamFilter import org.opendc.compute.service.scheduler.filters.VCpuFilter import org.opendc.compute.service.scheduler.weights.CoreRamWeigher import org.opendc.compute.workload.* +import org.opendc.compute.workload.telemetry.SdkTelemetryManager import org.opendc.compute.workload.topology.Topology import org.opendc.compute.workload.topology.apply import org.opendc.compute.workload.util.VmInterferenceModelReader import org.opendc.experiments.capelin.topology.clusterTopology import org.opendc.simulator.core.runBlockingSimulation import org.opendc.telemetry.compute.ComputeMetricExporter -import org.opendc.telemetry.compute.collectServiceMetrics import org.opendc.telemetry.compute.table.HostTableReader +import org.opendc.telemetry.compute.table.ServiceData +import org.opendc.telemetry.compute.table.ServiceTableReader import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import java.io.File import java.time.Duration +import java.time.Instant import java.util.* /** @@ -83,44 +86,47 @@ class CapelinIntegrationTest { @Test fun testLarge() = runBlockingSimulation { val workload = createTestWorkload(1.0) + val telemetry = SdkTelemetryManager(clock) val runner = ComputeServiceHelper( coroutineContext, clock, + telemetry, computeScheduler ) val topology = createTopology() - val metricReader = CoroutineMetricReader(this, runner.producers, exporter) + + 
telemetry.registerMetricReader(CoroutineMetricReader(this, exporter)) try { runner.apply(topology) runner.run(workload, 0) + + val serviceMetrics = exporter.serviceMetrics + println( + "Scheduler " + + "Success=${serviceMetrics.attemptsSuccess} " + + "Failure=${serviceMetrics.attemptsFailure} " + + "Error=${serviceMetrics.attemptsError} " + + "Pending=${serviceMetrics.serversPending} " + + "Active=${serviceMetrics.serversActive}" + ) + + // Note that these values have been verified beforehand + assertAll( + { assertEquals(50, serviceMetrics.attemptsSuccess, "The scheduler should schedule 50 VMs") }, + { assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") }, + { assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") }, + { assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") }, + { assertEquals(223388307, this@CapelinIntegrationTest.exporter.idleTime) { "Incorrect idle time" } }, + { assertEquals(66977508, this@CapelinIntegrationTest.exporter.activeTime) { "Incorrect active time" } }, + { assertEquals(3160381, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } }, + { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Incorrect lost time" } }, + { assertEquals(5.840845430827075E9, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } }, + ) } finally { runner.close() - metricReader.close() + telemetry.close() } - - val serviceMetrics = collectServiceMetrics(runner.producers[0]) - println( - "Scheduler " + - "Success=${serviceMetrics.attemptsSuccess} " + - "Failure=${serviceMetrics.attemptsFailure} " + - "Error=${serviceMetrics.attemptsError} " + - "Pending=${serviceMetrics.serversPending} " + - "Active=${serviceMetrics.serversActive}" - ) - - // Note that these values have been verified beforehand - assertAll( - { assertEquals(50, serviceMetrics.attemptsSuccess, "The scheduler should schedule 50 VMs") }, - { 
assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") }, - { assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") }, - { assertEquals(0, serviceMetrics.serversPending, "No VM should not be in the queue") }, - { assertEquals(223388307, this@CapelinIntegrationTest.exporter.idleTime) { "Incorrect idle time" } }, - { assertEquals(66977508, this@CapelinIntegrationTest.exporter.activeTime) { "Incorrect active time" } }, - { assertEquals(3160381, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } }, - { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Incorrect lost time" } }, - { assertEquals(5.840845430827075E9, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } }, - ) } /** @@ -130,33 +136,34 @@ class CapelinIntegrationTest { fun testSmall() = runBlockingSimulation { val seed = 1 val workload = createTestWorkload(0.25, seed) - - val simulator = ComputeServiceHelper( + val telemetry = SdkTelemetryManager(clock) + val runner = ComputeServiceHelper( coroutineContext, clock, + telemetry, computeScheduler ) val topology = createTopology("single") - val metricReader = CoroutineMetricReader(this, simulator.producers, exporter) + + telemetry.registerMetricReader(CoroutineMetricReader(this, exporter)) try { - simulator.apply(topology) - simulator.run(workload, seed.toLong()) + runner.apply(topology) + runner.run(workload, seed.toLong()) + + println( + "Scheduler " + + "Success=${exporter.serviceMetrics.attemptsSuccess} " + + "Failure=${exporter.serviceMetrics.attemptsFailure} " + + "Error=${exporter.serviceMetrics.attemptsError} " + + "Pending=${exporter.serviceMetrics.serversPending} " + + "Active=${exporter.serviceMetrics.serversActive}" + ) } finally { - simulator.close() - metricReader.close() + runner.close() + telemetry.close() } - val serviceMetrics = collectServiceMetrics(simulator.producers[0]) - println( - "Scheduler " + - 
"Success=${serviceMetrics.attemptsSuccess} " + - "Failure=${serviceMetrics.attemptsFailure} " + - "Error=${serviceMetrics.attemptsError} " + - "Pending=${serviceMetrics.serversPending} " + - "Active=${serviceMetrics.serversActive}" - ) - // Note that these values have been verified beforehand assertAll( { assertEquals(10999208, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } }, @@ -180,33 +187,35 @@ class CapelinIntegrationTest { .read(perfInterferenceInput) .withSeed(seed.toLong()) + val telemetry = SdkTelemetryManager(clock) val simulator = ComputeServiceHelper( coroutineContext, clock, + telemetry, computeScheduler, interferenceModel = performanceInterferenceModel ) val topology = createTopology("single") - val metricReader = CoroutineMetricReader(this, simulator.producers, exporter) + + telemetry.registerMetricReader(CoroutineMetricReader(this, exporter)) try { simulator.apply(topology) simulator.run(workload, seed.toLong()) + + println( + "Scheduler " + + "Success=${exporter.serviceMetrics.attemptsSuccess} " + + "Failure=${exporter.serviceMetrics.attemptsFailure} " + + "Error=${exporter.serviceMetrics.attemptsError} " + + "Pending=${exporter.serviceMetrics.serversPending} " + + "Active=${exporter.serviceMetrics.serversActive}" + ) } finally { simulator.close() - metricReader.close() + telemetry.close() } - val serviceMetrics = collectServiceMetrics(simulator.producers[0]) - println( - "Scheduler " + - "Success=${serviceMetrics.attemptsSuccess} " + - "Failure=${serviceMetrics.attemptsFailure} " + - "Error=${serviceMetrics.attemptsError} " + - "Pending=${serviceMetrics.serversPending} " + - "Active=${serviceMetrics.serversActive}" - ) - // Note that these values have been verified beforehand assertAll( { assertEquals(6027666, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } }, @@ -222,34 +231,36 @@ class CapelinIntegrationTest { @Test fun testFailures() = runBlockingSimulation { val seed = 1 + val telemetry = 
SdkTelemetryManager(clock) val simulator = ComputeServiceHelper( coroutineContext, clock, + telemetry, computeScheduler, grid5000(Duration.ofDays(7)) ) val topology = createTopology("single") val workload = createTestWorkload(0.25, seed) - val metricReader = CoroutineMetricReader(this, simulator.producers, exporter) + + telemetry.registerMetricReader(CoroutineMetricReader(this, exporter)) try { simulator.apply(topology) simulator.run(workload, seed.toLong()) + + println( + "Scheduler " + + "Success=${exporter.serviceMetrics.attemptsSuccess} " + + "Failure=${exporter.serviceMetrics.attemptsFailure} " + + "Error=${exporter.serviceMetrics.attemptsError} " + + "Pending=${exporter.serviceMetrics.serversPending} " + + "Active=${exporter.serviceMetrics.serversActive}" + ) } finally { simulator.close() - metricReader.close() + telemetry.close() } - val serviceMetrics = collectServiceMetrics(simulator.producers[0]) - println( - "Scheduler " + - "Success=${serviceMetrics.attemptsSuccess} " + - "Failure=${serviceMetrics.attemptsFailure} " + - "Error=${serviceMetrics.attemptsError} " + - "Pending=${serviceMetrics.serversPending} " + - "Active=${serviceMetrics.serversActive}" - ) - // Note that these values have been verified beforehand assertAll( { assertEquals(10866961, exporter.idleTime) { "Idle time incorrect" } }, @@ -277,6 +288,7 @@ class CapelinIntegrationTest { } class TestComputeMetricExporter : ComputeMetricExporter() { + var serviceMetrics: ServiceData = ServiceData(Instant.ofEpochMilli(0), 0, 0, 0, 0, 0, 0, 0) var idleTime = 0L var activeTime = 0L var stealTime = 0L @@ -284,6 +296,19 @@ class CapelinIntegrationTest { var energyUsage = 0.0 var uptime = 0L + override fun record(reader: ServiceTableReader) { + serviceMetrics = ServiceData( + reader.timestamp, + reader.hostsUp, + reader.hostsDown, + reader.serversPending, + reader.serversActive, + reader.attemptsSuccess, + reader.attemptsFailure, + reader.attemptsError + ) + } + override fun record(reader: 
HostTableReader) { idleTime += reader.cpuIdleTime activeTime += reader.cpuActiveTime diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt index 1752802f..c751463d 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt @@ -23,7 +23,6 @@ package org.opendc.experiments.tf20.core import io.opentelemetry.api.common.AttributeKey -import io.opentelemetry.api.common.Attributes import io.opentelemetry.api.metrics.Meter import kotlinx.coroutines.* import org.opendc.simulator.compute.SimBareMetalMachine @@ -82,7 +81,6 @@ public class SimTFDevice( .setDescription("The amount of device resources used") .setUnit("MHz") .build() - .bind(Attributes.of(deviceId, uid.toString())) /** * The power draw of the device. @@ -91,7 +89,6 @@ public class SimTFDevice( .setDescription("The power draw of the device") .setUnit("W") .build() - .bind(Attributes.of(deviceId, uid.toString())) /** * The workload that will be run by the device. 
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt index 54df2b59..836231c8 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt @@ -24,9 +24,9 @@ package org.opendc.faas.service import io.opentelemetry.api.common.AttributeKey import io.opentelemetry.api.common.Attributes -import io.opentelemetry.api.metrics.BoundLongCounter -import io.opentelemetry.api.metrics.BoundLongHistogram -import io.opentelemetry.api.metrics.BoundLongUpDownCounter +import io.opentelemetry.api.metrics.LongCounter +import io.opentelemetry.api.metrics.LongHistogram +import io.opentelemetry.api.metrics.LongUpDownCounter import io.opentelemetry.api.metrics.Meter import io.opentelemetry.semconv.resource.attributes.ResourceAttributes import org.opendc.faas.service.deployer.FunctionInstance @@ -56,76 +56,68 @@ public class FunctionObject( /** * The total amount of function invocations received by the function. */ - public val invocations: BoundLongCounter = meter.counterBuilder("function.invocations.total") + public val invocations: LongCounter = meter.counterBuilder("function.invocations.total") .setDescription("Number of function invocations") .setUnit("1") .build() - .bind(attributes) /** * The amount of function invocations that could be handled directly. */ - public val timelyInvocations: BoundLongCounter = meter.counterBuilder("function.invocations.warm") + public val timelyInvocations: LongCounter = meter.counterBuilder("function.invocations.warm") .setDescription("Number of function invocations handled directly") .setUnit("1") .build() - .bind(attributes) /** * The amount of function invocations that were delayed due to function deployment. 
*/ - public val delayedInvocations: BoundLongCounter = meter.counterBuilder("function.invocations.cold") + public val delayedInvocations: LongCounter = meter.counterBuilder("function.invocations.cold") .setDescription("Number of function invocations that are delayed") .setUnit("1") .build() - .bind(attributes) /** * The amount of function invocations that failed. */ - public val failedInvocations: BoundLongCounter = meter.counterBuilder("function.invocations.failed") + public val failedInvocations: LongCounter = meter.counterBuilder("function.invocations.failed") .setDescription("Number of function invocations that failed") .setUnit("1") .build() - .bind(attributes) /** * The amount of instances for this function. */ - public val activeInstances: BoundLongUpDownCounter = meter.upDownCounterBuilder("function.instances.active") + public val activeInstances: LongUpDownCounter = meter.upDownCounterBuilder("function.instances.active") .setDescription("Number of active function instances") .setUnit("1") .build() - .bind(attributes) /** * The amount of idle instances for this function. */ - public val idleInstances: BoundLongUpDownCounter = meter.upDownCounterBuilder("function.instances.idle") + public val idleInstances: LongUpDownCounter = meter.upDownCounterBuilder("function.instances.idle") .setDescription("Number of idle function instances") .setUnit("1") .build() - .bind(attributes) /** * The time that the function waited. */ - public val waitTime: BoundLongHistogram = meter.histogramBuilder("function.time.wait") + public val waitTime: LongHistogram = meter.histogramBuilder("function.time.wait") .ofLongs() .setDescription("Time the function has to wait before being started") .setUnit("ms") .build() - .bind(attributes) /** * The time that the function was running. 
*/ - public val activeTime: BoundLongHistogram = meter.histogramBuilder("function.time.active") + public val activeTime: LongHistogram = meter.histogramBuilder("function.time.active") .ofLongs() .setDescription("Time the function was running") .setUnit("ms") .build() - .bind(attributes) /** * The instances associated with this function. diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt index 3b560cd3..c285585a 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt @@ -226,7 +226,7 @@ internal class FaaSServiceImpl( val instance = if (activeInstance != null) { _timelyInvocations.add(1) - function.timelyInvocations.add(1) + function.timelyInvocations.add(1, function.attributes) activeInstance } else { @@ -234,29 +234,29 @@ internal class FaaSServiceImpl( instances.add(instance) terminationPolicy.enqueue(instance) - function.idleInstances.add(1) + function.idleInstances.add(1, function.attributes) _delayedInvocations.add(1) - function.delayedInvocations.add(1) + function.delayedInvocations.add(1, function.attributes) instance } suspend { val start = clock.millis() - function.waitTime.record(start - submitTime) - function.idleInstances.add(-1) - function.activeInstances.add(1) + function.waitTime.record(start - submitTime, function.attributes) + function.idleInstances.add(-1, function.attributes) + function.activeInstances.add(1, function.attributes) try { instance.invoke() } catch (e: Throwable) { logger.debug(e) { "Function invocation failed" } - function.failedInvocations.add(1) + function.failedInvocations.add(1, function.attributes) } finally { val end = clock.millis() - function.activeTime.record(end - start) - function.idleInstances.add(1) 
- function.activeInstances.add(-1) + function.activeTime.record(end - start, function.attributes) + function.idleInstances.add(1, function.attributes) + function.activeInstances.add(-1, function.attributes) } }.startCoroutineCancellable(cont) } @@ -269,7 +269,7 @@ internal class FaaSServiceImpl( check(function.uid in functions) { "Function does not exist (anymore)" } _invocations.add(1) - function.invocations.add(1) + function.invocations.add(1, function.attributes) return suspendCancellableCoroutine { cont -> if (!queue.add(InvocationRequest(clock.millis(), function, cont))) { diff --git a/opendc-telemetry/opendc-telemetry-api/build.gradle.kts b/opendc-telemetry/opendc-telemetry-api/build.gradle.kts index c544b7d6..5492fc14 100644 --- a/opendc-telemetry/opendc-telemetry-api/build.gradle.kts +++ b/opendc-telemetry/opendc-telemetry-api/build.gradle.kts @@ -29,6 +29,5 @@ plugins { dependencies { api(platform(projects.opendcPlatform)) - api(libs.opentelemetry.api.main) - api(libs.opentelemetry.api.metrics) + api(libs.opentelemetry.api) } diff --git a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt b/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt index 1de235e7..a9290c47 100644 --- a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt +++ b/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt @@ -22,11 +22,16 @@ package org.opendc.telemetry.sdk.metrics.export +import io.opentelemetry.sdk.common.CompletableResultCode +import io.opentelemetry.sdk.metrics.data.AggregationTemporality import io.opentelemetry.sdk.metrics.export.MetricExporter import io.opentelemetry.sdk.metrics.export.MetricProducer +import io.opentelemetry.sdk.metrics.export.MetricReader +import 
io.opentelemetry.sdk.metrics.export.MetricReaderFactory import kotlinx.coroutines.* import mu.KotlinLogging import java.time.Duration +import java.util.* /** * A helper class to read the metrics from a list of [MetricProducer]s and automatically export the metrics every @@ -35,16 +40,16 @@ import java.time.Duration * The reader runs in a [CoroutineScope] which enables collection of metrics in environments with a custom clock. * * @param scope The [CoroutineScope] to run the reader in. - * @param producers The metric producers to gather metrics from. + * @param producer The metric producer to gather metrics from. * @param exporter The export to export the metrics to. * @param exportInterval The export interval. */ -public class CoroutineMetricReader( +public class CoroutineMetricReader private constructor( scope: CoroutineScope, - private val producers: List<MetricProducer>, + private val producer: MetricProducer, private val exporter: MetricExporter, - private val exportInterval: Duration = Duration.ofMinutes(5) -) : AutoCloseable { + private val exportInterval: Duration +) : MetricReader { private val logger = KotlinLogging.logger {} /** @@ -57,9 +62,8 @@ public class CoroutineMetricReader( while (isActive) { delay(intervalMs) - val metrics = producers.flatMap(MetricProducer::collectAllMetrics) - try { + val metrics = producer.collectAllMetrics() val result = exporter.export(metrics) result.whenComplete { if (!result.isSuccess) { @@ -75,7 +79,29 @@ public class CoroutineMetricReader( } } - override fun close() { + override fun getPreferredTemporality(): AggregationTemporality = AggregationTemporality.CUMULATIVE + + override fun flush(): CompletableResultCode { + return exporter.flush() + } + + override fun shutdown(): CompletableResultCode { job.cancel() + return CompletableResultCode.ofSuccess() + } + + public companion object { + /** + * Construct a [MetricReaderFactory] for this metric reader. 
+ */ + public operator fun invoke( + scope: CoroutineScope, + exporter: MetricExporter, + exportInterval: Duration = Duration.ofMinutes(5) + ): MetricReaderFactory { + return MetricReaderFactory { producer -> + CoroutineMetricReader(scope, producer, exporter, exportInterval) + } + } } } diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt index 5d6bc37f..8f4e9d6d 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/Main.kt @@ -29,6 +29,7 @@ import com.github.ajalt.clikt.parameters.types.long import kotlinx.coroutines.* import mu.KotlinLogging import org.opendc.compute.workload.* +import org.opendc.compute.workload.telemetry.SdkTelemetryManager import org.opendc.compute.workload.topology.HostSpec import org.opendc.compute.workload.topology.Topology import org.opendc.compute.workload.topology.apply @@ -185,34 +186,36 @@ class RunnerCli : CliktCommand(name = "runner") { else null + val telemetry = SdkTelemetryManager(clock) val simulator = ComputeServiceHelper( coroutineContext, clock, + telemetry, computeScheduler, failureModel, interferenceModel.takeIf { operational.performanceInterferenceEnabled } ) - val metricReader = CoroutineMetricReader(this, simulator.producers, exporter, exportInterval = Duration.ofHours(1)) + telemetry.registerMetricReader(CoroutineMetricReader(this, exporter, exportInterval = Duration.ofHours(1))) try { // Instantiate the topology onto the simulator simulator.apply(topology) - // Converge workload trace + // Run workload trace simulator.run(workload.resolve(workloadLoader, seeder), seeder.nextLong()) + + val serviceMetrics = collectServiceMetrics(telemetry.metricProducer) + logger.debug { + "Scheduler " + + "Success=${serviceMetrics.attemptsSuccess} " + + "Failure=${serviceMetrics.attemptsFailure} " + + 
"Error=${serviceMetrics.attemptsError} " + + "Pending=${serviceMetrics.serversPending} " + + "Active=${serviceMetrics.serversActive}" + } } finally { simulator.close() - metricReader.close() - } - - val serviceMetrics = collectServiceMetrics(simulator.producers[0]) - logger.debug { - "Scheduler " + - "Success=${serviceMetrics.attemptsSuccess} " + - "Failure=${serviceMetrics.attemptsFailure} " + - "Error=${serviceMetrics.attemptsError} " + - "Pending=${serviceMetrics.serversPending} " + - "Active=${serviceMetrics.serversActive}" + telemetry.close() } } } catch (cause: Throwable) { diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt index 214d5135..1fd332b9 100644 --- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt @@ -33,6 +33,7 @@ import org.opendc.compute.service.scheduler.filters.RamFilter import org.opendc.compute.service.scheduler.filters.VCpuFilter import org.opendc.compute.service.scheduler.weights.VCpuWeigher import org.opendc.compute.workload.ComputeServiceHelper +import org.opendc.compute.workload.telemetry.NoopTelemetryManager import org.opendc.compute.workload.topology.HostSpec import org.opendc.simulator.compute.kernel.SimSpaceSharedHypervisorProvider import org.opendc.simulator.compute.model.MachineModel @@ -70,7 +71,8 @@ internal class WorkflowServiceTest { filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)), weighers = listOf(VCpuWeigher(1.0, multiplier = 1.0)) ) - val computeHelper = ComputeServiceHelper(coroutineContext, clock, computeScheduler, schedulingQuantum = Duration.ofSeconds(1)) + + val computeHelper = ComputeServiceHelper(coroutineContext, clock, NoopTelemetryManager(), 
computeScheduler, schedulingQuantum = Duration.ofSeconds(1)) repeat(HOST_COUNT) { computeHelper.registerHost(createHostSpec(it)) } diff --git a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt index 0198900f..a7d0ed6c 100644 --- a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt +++ b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt @@ -22,8 +22,13 @@ package org.opendc.workflow.workload +import io.opentelemetry.api.metrics.MeterProvider +import io.opentelemetry.sdk.common.CompletableResultCode import io.opentelemetry.sdk.metrics.SdkMeterProvider +import io.opentelemetry.sdk.metrics.data.AggregationTemporality import io.opentelemetry.sdk.metrics.export.MetricProducer +import io.opentelemetry.sdk.metrics.export.MetricReader +import io.opentelemetry.sdk.metrics.export.MetricReaderFactory import io.opentelemetry.sdk.resources.Resource import io.opentelemetry.semconv.resource.attributes.ResourceAttributes import kotlinx.coroutines.coroutineScope @@ -34,6 +39,7 @@ import org.opendc.telemetry.sdk.toOtelClock import org.opendc.workflow.api.Job import org.opendc.workflow.service.WorkflowService import java.time.Clock +import java.util.* import kotlin.coroutines.CoroutineContext /** @@ -58,23 +64,47 @@ public class WorkflowServiceHelper( /** * The [MetricProducer] exposed by the [WorkflowService]. */ - public val metricProducer: MetricProducer + public lateinit var metricProducer: MetricProducer + private set + + /** + * The [MeterProvider] used for the service. + */ + private val _meterProvider: SdkMeterProvider + + /** + * The list of [MetricReader]s that have been registered with the runner. 
+ */ + private val _metricReaders = mutableListOf<MetricReader>() init { val resource = Resource.builder() .put(ResourceAttributes.SERVICE_NAME, "opendc-workflow") .build() - val meterProvider = SdkMeterProvider.builder() + _meterProvider = SdkMeterProvider.builder() .setClock(clock.toOtelClock()) .setResource(resource) + .registerMetricReader { producer -> + metricProducer = producer + + val metricReaders = _metricReaders + object : MetricReader { + override fun getPreferredTemporality(): AggregationTemporality = AggregationTemporality.CUMULATIVE + override fun flush(): CompletableResultCode { + return CompletableResultCode.ofAll(metricReaders.map { it.flush() }) + } + override fun shutdown(): CompletableResultCode { + return CompletableResultCode.ofAll(metricReaders.map { it.shutdown() }) + } + } + } .build() - metricProducer = meterProvider service = WorkflowService( context, clock, - meterProvider, + _meterProvider, computeClient, schedulerSpec.schedulingQuantum, jobAdmissionPolicy = schedulerSpec.jobAdmissionPolicy, @@ -116,8 +146,19 @@ public class WorkflowServiceHelper( } } + /** + * Register a [MetricReader] for this helper. + * + * @param factory The factory for the reader to register. + */ + public fun registerMetricReader(factory: MetricReaderFactory) { + val reader = factory.apply(metricProducer) + _metricReaders.add(reader) + } + override fun close() { computeClient.close() service.close() + _meterProvider.close() } } |
