diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-05-06 19:04:03 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2022-05-06 19:04:03 +0200 |
| commit | c3d8d967f82f39f1ef461d5687eb68fb867336c5 (patch) | |
| tree | 2e9938f63c42e5d02fe203e049377d1d17b5d782 | |
| parent | a9657e4fa3b15e2c1c11884b5a250b0861bcc21d (diff) | |
| parent | 260e2228afea08868e8f7f07233b1861b2d7f0c7 (diff) | |
merge: Move OpenTelemetry integration outside core modules (#81)
This change removes the OpenTelemetry integration from the OpenDC modules.
Previously, we chose to integrate OpenTelemetry to provide a unified way to
report metrics to the users.
Although this worked as expected, the overhead of the OpenTelemetry when
collecting metrics during simulation was considerable and lacked more
optimization opportunities (other than providing a separate API
implementation). Furthermore, since we were tied to OpenTelemetry's SDK
implementation, we experienced issues with throttling and registering
multiple instruments.
We will instead use another approach, where we expose the core metrics
in OpenDC via specialized interfaces (see #80) such that
access is fast and can be done without having to interface with
OpenTelemetry. In addition, we will provide an adapter to that is able
to forward these metrics to OpenTelemetry implementations, so we can
still integrate with the wider ecosystem.
## Implementation Notes :hammer_and_pick:
* Remove OpenTelemetry from "compute" modules
* Remove OpenTelemetry from "workflow" modules
* Remove OpenTelemetry from "FaaS" modules
* Remove OpenTelemetry from TF20 experiment
* Remove dependency on OpenTelemetry SDK
## External Dependencies :four_leaf_clover:
* N/A
## Breaking API Changes :warning:
* Metrics are not anymore directly exposed via OpenTelemetry. Instead, an adapter needs to be used to access the data via OpenTelemetry.
70 files changed, 121 insertions, 2012 deletions
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index b05af368..55975919 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -21,9 +21,6 @@ ktlint-gradle = "10.2.1" log4j = "2.17.2" microprofile-openapi = "3.0" mockk = "1.12.3" -opentelemetry-main = "1.12.0" -opentelemetry-metrics = "1.12.0-alpha" -opentelemetry-semconv = "1.12.0-alpha" parquet = "1.12.2" progressbar = "0.9.2" quarkus = "2.8.1.Final" @@ -45,12 +42,6 @@ slf4j-simple = { module = "org.slf4j:slf4j-simple", version.ref = "slf4j" } log4j-slf4j = { module = "org.apache.logging.log4j:log4j-slf4j-impl", version.ref = "log4j" } sentry-log4j2 = { module = "io.sentry:sentry-log4j2", version.ref = "sentry" } -# Telemetry -opentelemetry-api = { module = "io.opentelemetry:opentelemetry-api", version.ref = "opentelemetry-main" } -opentelemetry-sdk-main = { module = "io.opentelemetry:opentelemetry-sdk", version.ref = "opentelemetry-main" } -opentelemetry-sdk-metrics = { module = "io.opentelemetry:opentelemetry-sdk-metrics", version.ref = "opentelemetry-metrics" } -opentelemetry-semconv = { module = "io.opentelemetry:opentelemetry-semconv", version.ref = "opentelemetry-semconv" } - # Testing junit-jupiter-api = { module = "org.junit.jupiter:junit-jupiter-api", version.ref = "junit-jupiter" } junit-jupiter-engine = { module = "org.junit.jupiter:junit-jupiter-engine", version.ref = "junit-jupiter" } diff --git a/opendc-compute/opendc-compute-service/build.gradle.kts b/opendc-compute/opendc-compute-service/build.gradle.kts index b42c2919..fd15b6e7 100644 --- a/opendc-compute/opendc-compute-service/build.gradle.kts +++ b/opendc-compute/opendc-compute-service/build.gradle.kts @@ -29,10 +29,8 @@ plugins { dependencies { api(projects.opendcCompute.opendcComputeApi) - api(projects.opendcTelemetry.opendcTelemetryApi) implementation(projects.opendcCommon) implementation(libs.kotlin.logging) - implementation(libs.opentelemetry.semconv) testImplementation(projects.opendcSimulator.opendcSimulatorCore) testRuntimeOnly(libs.log4j.slf4j) diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt index 3a6baaa1..c0b70268 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt @@ -22,8 +22,6 @@ package org.opendc.compute.service -import io.opentelemetry.api.metrics.Meter -import io.opentelemetry.api.metrics.MeterProvider import org.opendc.compute.api.ComputeClient import org.opendc.compute.api.Server import org.opendc.compute.service.driver.Host @@ -79,18 +77,16 @@ public interface ComputeService : AutoCloseable { * * @param context The [CoroutineContext] to use in the service. * @param clock The clock instance to use. - * @param meterProvider The [MeterProvider] for creating a [Meter] for the service. * @param scheduler The scheduler implementation to use. * @param schedulingQuantum The interval between scheduling cycles. */ public operator fun invoke( context: CoroutineContext, clock: Clock, - meterProvider: MeterProvider, scheduler: ComputeScheduler, schedulingQuantum: Duration = Duration.ofMinutes(5), ): ComputeService { - return ComputeServiceImpl(context, clock, meterProvider, scheduler, schedulingQuantum) + return ComputeServiceImpl(context, clock, scheduler, schedulingQuantum) } } } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/GuestSystemStats.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/GuestSystemStats.kt index b3958473..6fec5175 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/GuestSystemStats.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/GuestSystemStats.kt @@ -35,5 +35,5 @@ import java.time.Instant public data class GuestSystemStats( val uptime: Duration, val downtime: Duration, - val bootTime: Instant + val bootTime: Instant? ) diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/HostSystemStats.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/HostSystemStats.kt index 1c07023f..9d34a5ce 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/HostSystemStats.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/HostSystemStats.kt @@ -41,7 +41,7 @@ import java.time.Instant public data class HostSystemStats( val uptime: Duration, val downtime: Duration, - val bootTime: Instant, + val bootTime: Instant?, val powerUsage: Double, val energyUsage: Double, val guestsTerminated: Int, diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index e8664e5c..21aaa19e 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -22,11 +22,6 @@ package org.opendc.compute.service.internal -import io.opentelemetry.api.common.AttributeKey -import io.opentelemetry.api.common.Attributes -import io.opentelemetry.api.metrics.Meter -import io.opentelemetry.api.metrics.MeterProvider -import io.opentelemetry.api.metrics.ObservableLongMeasurement import kotlinx.coroutines.* import mu.KotlinLogging import org.opendc.common.util.Pacer @@ -49,14 +44,12 @@ import kotlin.math.max * * @param context The [CoroutineContext] to use in the service. * @param clock The clock instance to use. - * @param meterProvider The [MeterProvider] for creating a [Meter] for the service. * @param scheduler The scheduler implementation to use. * @param schedulingQuantum The interval between scheduling cycles. */ internal class ComputeServiceImpl( private val context: CoroutineContext, private val clock: Clock, - meterProvider: MeterProvider, private val scheduler: ComputeScheduler, schedulingQuantum: Duration ) : ComputeService, HostListener { @@ -71,11 +64,6 @@ internal class ComputeServiceImpl( private val logger = KotlinLogging.logger {} /** - * The [Meter] to track metrics of the [ComputeService]. - */ - private val meter = meterProvider.get("org.opendc.compute.service") - - /** * The [Random] instance used to generate unique identifiers for the objects. */ private val random = Random(0) @@ -117,72 +105,20 @@ internal class ComputeServiceImpl( private var maxCores = 0 private var maxMemory = 0L - - /** - * The number of scheduling attempts. - */ - private val _schedulingAttempts = meter.counterBuilder("scheduler.attempts") - .setDescription("Number of scheduling attempts") - .setUnit("1") - .build() - private val _schedulingAttemptsSuccessAttr = Attributes.of(AttributeKey.stringKey("result"), "success") - private val _schedulingAttemptsFailureAttr = Attributes.of(AttributeKey.stringKey("result"), "failure") - private val _schedulingAttemptsErrorAttr = Attributes.of(AttributeKey.stringKey("result"), "error") private var _attemptsSuccess = 0L private var _attemptsFailure = 0L private var _attemptsError = 0L - - /** - * The response time of the service. - */ - private val _schedulingLatency = meter.histogramBuilder("scheduler.latency") - .setDescription("End to end latency for a server to be scheduled (in multiple attempts)") - .ofLongs() - .setUnit("ms") - .build() - - /** - * The number of servers that are pending. - */ - private val _servers = meter.upDownCounterBuilder("scheduler.servers") - .setDescription("Number of servers managed by the scheduler") - .setUnit("1") - .build() - private val _serversPendingAttr = Attributes.of(AttributeKey.stringKey("state"), "pending") - private val _serversActiveAttr = Attributes.of(AttributeKey.stringKey("state"), "active") private var _serversPending = 0 private var _serversActive = 0 /** * The [Pacer] to use for scheduling the scheduler cycles. */ - private val pacer = Pacer(scope.coroutineContext, clock, schedulingQuantum.toMillis(), ::doSchedule) + private val pacer = Pacer(scope.coroutineContext, clock, schedulingQuantum.toMillis()) { doSchedule() } override val hosts: Set<Host> get() = hostToView.keys - init { - val upState = Attributes.of(AttributeKey.stringKey("state"), "up") - val downState = Attributes.of(AttributeKey.stringKey("state"), "down") - - meter.upDownCounterBuilder("scheduler.hosts") - .setDescription("Number of hosts registered with the scheduler") - .setUnit("1") - .buildWithCallback { result -> - val total = hosts.size - val available = availableHosts.size.toLong() - - result.record(available, upState) - result.record(total - available, downState) - } - - meter.gaugeBuilder("system.time.provision") - .setDescription("The most recent timestamp where the server entered a provisioned state") - .setUnit("1") - .ofLongs() - .buildWithCallback(::collectProvisionTime) - } - override fun newClient(): ComputeClient { check(scope.isActive) { "Service is already closed" } return object : ComputeClient { @@ -355,7 +291,6 @@ internal class ComputeServiceImpl( server.launchedAt = Instant.ofEpochMilli(now) queue.add(request) _serversPending++ - _servers.add(1, _serversPendingAttr) requestSchedulingCycle() return request } @@ -387,14 +322,13 @@ internal class ComputeServiceImpl( /** * Run a single scheduling iteration. */ - private fun doSchedule(now: Long) { + private fun doSchedule() { while (queue.isNotEmpty()) { val request = queue.peek() if (request.isCancelled) { queue.poll() _serversPending-- - _servers.add(-1, _serversPendingAttr) continue } @@ -407,9 +341,7 @@ internal class ComputeServiceImpl( // Remove the incoming image queue.poll() _serversPending-- - _servers.add(-1, _serversPendingAttr) _attemptsFailure++ - _schedulingAttempts.add(1, _schedulingAttemptsFailureAttr) logger.warn { "Failed to spawn $server: does not fit [${clock.instant()}]" } @@ -425,8 +357,6 @@ internal class ComputeServiceImpl( // Remove request from queue queue.poll() _serversPending-- - _servers.add(-1, _serversPendingAttr) - _schedulingLatency.record(now - request.submitTime, server.attributes) logger.info { "Assigned server $server to host $host." } @@ -442,10 +372,8 @@ internal class ComputeServiceImpl( host.spawn(server) activeServers[server] = host - _servers.add(1, _serversActiveAttr) _serversActive++ _attemptsSuccess++ - _schedulingAttempts.add(1, _schedulingAttemptsSuccessAttr) } catch (e: Throwable) { logger.error(e) { "Failed to deploy VM" } @@ -454,7 +382,6 @@ internal class ComputeServiceImpl( hv.availableMemory += server.flavor.memorySize _attemptsError++ - _schedulingAttempts.add(1, _schedulingAttemptsErrorAttr) } } } @@ -511,7 +438,6 @@ internal class ComputeServiceImpl( if (activeServers.remove(server) != null) { _serversActive-- - _servers.add(-1, _serversActiveAttr) } val hv = hostToView[host] @@ -527,14 +453,4 @@ internal class ComputeServiceImpl( requestSchedulingCycle() } } - - /** - * Collect the timestamp when each server entered its provisioning state most recently. - */ - private fun collectProvisionTime(result: ObservableLongMeasurement) { - for ((_, server) in servers) { - val launchedAt = server.launchedAt ?: continue - result.record(launchedAt.toEpochMilli(), server.attributes) - } - } } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt index d2a2d896..f9da24d8 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt @@ -22,9 +22,6 @@ package org.opendc.compute.service.internal -import io.opentelemetry.api.common.AttributeKey -import io.opentelemetry.api.common.Attributes -import io.opentelemetry.semconv.resource.attributes.ResourceAttributes import mu.KotlinLogging import org.opendc.compute.api.* import org.opendc.compute.service.driver.Host @@ -54,21 +51,6 @@ internal class InternalServer( private val watchers = mutableListOf<ServerWatcher>() /** - * The attributes of a server. - */ - @JvmField internal val attributes: Attributes = Attributes.builder() - .put(ResourceAttributes.HOST_NAME, name) - .put(ResourceAttributes.HOST_ID, uid.toString()) - .put(ResourceAttributes.HOST_TYPE, flavor.name) - .put(AttributeKey.longKey("host.num_cpus"), flavor.cpuCount.toLong()) - .put(AttributeKey.longKey("host.mem_capacity"), flavor.memorySize) - .put(AttributeKey.stringArrayKey("host.labels"), labels.map { (k, v) -> "$k:$v" }) - .put(ResourceAttributes.HOST_ARCH, ResourceAttributes.HostArchValues.AMD64) - .put(ResourceAttributes.HOST_IMAGE_NAME, image.name) - .put(ResourceAttributes.HOST_IMAGE_ID, image.uid.toString()) - .build() - - /** * The [Host] that has been assigned to host the server. */ @JvmField internal var host: Host? = null diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt index eb106817..cc7be4a8 100644 --- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt +++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt @@ -23,7 +23,6 @@ package org.opendc.compute.service import io.mockk.* -import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.delay import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertNull @@ -59,7 +58,7 @@ internal class ComputeServiceTest { filters = listOf(ComputeFilter(), VCpuFilter(allocationRatio = 1.0), RamFilter(allocationRatio = 1.0)), weighers = listOf(RamWeigher()) ) - service = ComputeService(scope.coroutineContext, clock, MeterProvider.noop(), computeScheduler) + service = ComputeService(scope.coroutineContext, clock, computeScheduler) } @Test diff --git a/opendc-compute/opendc-compute-simulator/build.gradle.kts b/opendc-compute/opendc-compute-simulator/build.gradle.kts index e81d87ec..72962147 100644 --- a/opendc-compute/opendc-compute-simulator/build.gradle.kts +++ b/opendc-compute/opendc-compute-simulator/build.gradle.kts @@ -32,11 +32,8 @@ dependencies { api(projects.opendcSimulator.opendcSimulatorCompute) api(libs.commons.math3) implementation(projects.opendcCommon) - implementation(libs.opentelemetry.semconv) implementation(libs.kotlin.logging) testImplementation(projects.opendcSimulator.opendcSimulatorCore) - testImplementation(projects.opendcTelemetry.opendcTelemetrySdk) - testImplementation(projects.opendcTelemetry.opendcTelemetryCompute) testRuntimeOnly(libs.slf4j.simple) } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index 323ae4fe..c28239b4 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -22,12 +22,6 @@ package org.opendc.compute.simulator -import io.opentelemetry.api.common.AttributeKey -import io.opentelemetry.api.common.Attributes -import io.opentelemetry.api.metrics.Meter -import io.opentelemetry.api.metrics.MeterProvider -import io.opentelemetry.api.metrics.ObservableDoubleMeasurement -import io.opentelemetry.api.metrics.ObservableLongMeasurement import kotlinx.coroutines.* import org.opendc.compute.api.Flavor import org.opendc.compute.api.Server @@ -67,7 +61,6 @@ public class SimHost( override val meta: Map<String, Any>, context: CoroutineContext, engine: FlowEngine, - meterProvider: MeterProvider, hypervisorProvider: SimHypervisorProvider, scalingGovernor: ScalingGovernor = PerformanceScalingGovernor(), powerDriver: PowerDriver = SimplePowerDriver(ConstantPowerModel(0.0)), @@ -86,11 +79,6 @@ public class SimHost( private val clock = engine.clock /** - * The [Meter] to track metrics of the simulated host. - */ - private val meter = meterProvider.get("org.opendc.compute.simulator") - - /** * The event listeners registered with this host. */ private val listeners = mutableListOf<HostListener>() @@ -142,48 +130,6 @@ public class SimHost( init { launch() - - meter.upDownCounterBuilder("system.guests") - .setDescription("Number of guests on this host") - .setUnit("1") - .buildWithCallback(::collectGuests) - meter.gaugeBuilder("system.cpu.limit") - .setDescription("Amount of CPU resources available to the host") - .buildWithCallback(::collectCpuLimit) - meter.gaugeBuilder("system.cpu.demand") - .setDescription("Amount of CPU resources the guests would use if there were no CPU contention or CPU limits") - .setUnit("MHz") - .buildWithCallback { result -> result.record(hypervisor.cpuDemand) } - meter.gaugeBuilder("system.cpu.usage") - .setDescription("Amount of CPU resources used by the host") - .setUnit("MHz") - .buildWithCallback { result -> result.record(hypervisor.cpuUsage) } - meter.gaugeBuilder("system.cpu.utilization") - .setDescription("Utilization of the CPU resources of the host") - .setUnit("%") - .buildWithCallback { result -> result.record(hypervisor.cpuUsage / _cpuLimit) } - meter.counterBuilder("system.cpu.time") - .setDescription("Amount of CPU time spent by the host") - .setUnit("s") - .buildWithCallback(::collectCpuTime) - meter.gaugeBuilder("system.power.usage") - .setDescription("Power usage of the host ") - .setUnit("W") - .buildWithCallback { result -> result.record(machine.powerUsage) } - meter.counterBuilder("system.power.total") - .setDescription("Amount of energy used by the CPU") - .setUnit("J") - .ofDoubles() - .buildWithCallback { result -> result.record(machine.energyUsage) } - meter.counterBuilder("system.time") - .setDescription("The uptime of the host") - .setUnit("s") - .buildWithCallback(::collectUptime) - meter.gaugeBuilder("system.time.boot") - .setDescription("The boot time of the host") - .setUnit("1") - .ofLongs() - .buildWithCallback(::collectBootTime) } override fun canFit(server: Server): Boolean { @@ -278,7 +224,7 @@ public class SimHost( return HostSystemStats( Duration.ofMillis(_uptime), Duration.ofMillis(_downtime), - Instant.ofEpochMilli(_bootTime), + _bootTime, machine.powerUsage, machine.energyUsage, terminated, @@ -358,7 +304,7 @@ public class SimHost( _ctx = machine.startWorkload(object : SimWorkload { override fun onStart(ctx: SimMachineContext) { try { - _bootTime = clock.millis() + _bootTime = clock.instant() _state = HostState.UP hypervisor.onStart(ctx) } catch (cause: Throwable) { @@ -422,80 +368,11 @@ public class SimHost( return MachineModel(processingUnits, memoryUnits) } - private val STATE_KEY = AttributeKey.stringKey("state") - - private val terminatedState = Attributes.of(STATE_KEY, "terminated") - private val runningState = Attributes.of(STATE_KEY, "running") - private val errorState = Attributes.of(STATE_KEY, "error") - private val invalidState = Attributes.of(STATE_KEY, "invalid") - - /** - * Helper function to collect the guest counts on this host. - */ - private fun collectGuests(result: ObservableLongMeasurement) { - var terminated = 0L - var running = 0L - var error = 0L - var invalid = 0L - - val guests = _guests.listIterator() - for (guest in guests) { - when (guest.state) { - ServerState.TERMINATED -> terminated++ - ServerState.RUNNING -> running++ - ServerState.ERROR -> error++ - ServerState.DELETED -> { - // Remove guests that have been deleted - this.guests.remove(guest.server) - guests.remove() - } - else -> invalid++ - } - } - - result.record(terminated, terminatedState) - result.record(running, runningState) - result.record(error, errorState) - result.record(invalid, invalidState) - } - - private val _cpuLimit = machine.model.cpus.sumOf { it.frequency } - - /** - * Helper function to collect the CPU limits of a machine. - */ - private fun collectCpuLimit(result: ObservableDoubleMeasurement) { - result.record(_cpuLimit) - - val guests = _guests - for (i in guests.indices) { - guests[i].collectCpuLimit(result) - } - } - - private val _activeState = Attributes.of(STATE_KEY, "active") - private val _stealState = Attributes.of(STATE_KEY, "steal") - private val _lostState = Attributes.of(STATE_KEY, "lost") - private val _idleState = Attributes.of(STATE_KEY, "idle") - - /** - * Helper function to track the CPU time of a machine. - */ - private fun collectCpuTime(result: ObservableLongMeasurement) { - val stats = getCpuStats() - - result.record(stats.activeTime, _activeState) - result.record(stats.idleTime, _idleState) - result.record(stats.stealTime, _stealState) - result.record(stats.lostTime, _lostState) - - val guests = _guests - for (i in guests.indices) { - guests[i].collectCpuTime(result) - } - } - private var _lastReport = clock.millis() + private var _uptime = 0L + private var _downtime = 0L + private var _bootTime: Instant? = null + private val _cpuLimit = machine.model.cpus.sumOf { it.frequency } /** * Helper function to track the uptime of a machine. @@ -517,40 +394,4 @@ public class SimHost( guests[i].updateUptime() } } - - private var _uptime = 0L - private var _downtime = 0L - private val _upState = Attributes.of(STATE_KEY, "up") - private val _downState = Attributes.of(STATE_KEY, "down") - - /** - * Helper function to track the uptime of a machine. - */ - private fun collectUptime(result: ObservableLongMeasurement) { - updateUptime() - - result.record(_uptime, _upState) - result.record(_downtime, _downState) - - val guests = _guests - for (i in guests.indices) { - guests[i].collectUptime(result) - } - } - - private var _bootTime = Long.MIN_VALUE - - /** - * Helper function to track the boot time of a machine. - */ - private fun collectBootTime(result: ObservableLongMeasurement) { - if (_bootTime != Long.MIN_VALUE) { - result.record(_bootTime) - } - - val guests = _guests - for (i in guests.indices) { - guests[i].collectBootTime(result) - } - } } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt index 0d4c550d..ea3c6549 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt @@ -22,12 +22,6 @@ package org.opendc.compute.simulator.internal -import io.opentelemetry.api.common.AttributeKey -import io.opentelemetry.api.common.Attributes -import io.opentelemetry.api.common.AttributesBuilder -import io.opentelemetry.api.metrics.ObservableDoubleMeasurement -import io.opentelemetry.api.metrics.ObservableLongMeasurement -import io.opentelemetry.semconv.resource.attributes.ResourceAttributes import kotlinx.coroutines.* import mu.KotlinLogging import org.opendc.compute.api.Server @@ -77,11 +71,6 @@ internal class Guest( var state: ServerState = ServerState.TERMINATED /** - * The attributes of the guest. - */ - val attributes: Attributes = GuestAttributes(this) - - /** * Start the guest. */ suspend fun start() { @@ -158,7 +147,7 @@ internal class Guest( return GuestSystemStats( Duration.ofMillis(_uptime), Duration.ofMillis(_downtime), - Instant.ofEpochMilli(_bootTime) + _bootTime ) } @@ -235,7 +224,7 @@ internal class Guest( * This method is invoked when the guest was started on the host and has booted into a running state. */ private fun onStart() { - _bootTime = clock.millis() + _bootTime = clock.instant() state = ServerState.RUNNING listener.onStart(this) } @@ -250,18 +239,11 @@ internal class Guest( listener.onStop(this) } - private val STATE_KEY = AttributeKey.stringKey("state") - private var _uptime = 0L private var _downtime = 0L - private val _upState = attributes.toBuilder() - .put(STATE_KEY, "up") - .build() - private val _downState = attributes.toBuilder() - .put(STATE_KEY, "down") - .build() - private var _lastReport = clock.millis() + private var _bootTime: Instant? = null + private val _cpuLimit = machine.model.cpus.sumOf { it.frequency } /** * Helper function to track the uptime and downtime of the guest. @@ -277,122 +259,4 @@ internal class Guest( _downtime += duration } } - - /** - * Helper function to track the uptime of the guest. - */ - fun collectUptime(result: ObservableLongMeasurement) { - updateUptime() - - result.record(_uptime, _upState) - result.record(_downtime, _downState) - } - - private var _bootTime = Long.MIN_VALUE - - /** - * Helper function to track the boot time of the guest. - */ - fun collectBootTime(result: ObservableLongMeasurement) { - if (_bootTime != Long.MIN_VALUE) { - result.record(_bootTime, attributes) - } - } - - private val _activeState = attributes.toBuilder() - .put(STATE_KEY, "active") - .build() - private val _stealState = attributes.toBuilder() - .put(STATE_KEY, "steal") - .build() - private val _lostState = attributes.toBuilder() - .put(STATE_KEY, "lost") - .build() - private val _idleState = attributes.toBuilder() - .put(STATE_KEY, "idle") - .build() - - /** - * Helper function to track the CPU time of a machine. - */ - fun collectCpuTime(result: ObservableLongMeasurement) { - val counters = machine.counters - counters.flush() - - result.record(counters.cpuActiveTime / 1000, _activeState) - result.record(counters.cpuIdleTime / 1000, _idleState) - result.record(counters.cpuStealTime / 1000, _stealState) - result.record(counters.cpuLostTime / 1000, _lostState) - } - - private val _cpuLimit = machine.model.cpus.sumOf { it.frequency } - - /** - * Helper function to collect the CPU limits of a machine. - */ - fun collectCpuLimit(result: ObservableDoubleMeasurement) { - result.record(_cpuLimit, attributes) - } - - /** - * An optimized [Attributes] implementation. - */ - private class GuestAttributes(private val uid: String, private val attributes: Attributes) : Attributes by attributes { - /** - * Construct a [GuestAttributes] instance from a [Guest]. - */ - constructor(guest: Guest) : this( - guest.server.uid.toString(), - Attributes.builder() - .put(ResourceAttributes.HOST_NAME, guest.server.name) - .put(ResourceAttributes.HOST_ID, guest.server.uid.toString()) - .put(ResourceAttributes.HOST_TYPE, guest.server.flavor.name) - .put(AttributeKey.longKey("host.num_cpus"), guest.server.flavor.cpuCount.toLong()) - .put(AttributeKey.longKey("host.mem_capacity"), guest.server.flavor.memorySize) - .put(AttributeKey.stringArrayKey("host.labels"), guest.server.labels.map { (k, v) -> "$k:$v" }) - .put(ResourceAttributes.HOST_ARCH, ResourceAttributes.HostArchValues.AMD64) - .put(ResourceAttributes.HOST_IMAGE_NAME, guest.server.image.name) - .put(ResourceAttributes.HOST_IMAGE_ID, guest.server.image.uid.toString()) - .build() - ) - - override fun <T : Any?> get(key: AttributeKey<T>): T? { - // Optimize access to the HOST_ID key which is accessed quite often - if (key == ResourceAttributes.HOST_ID) { - @Suppress("UNCHECKED_CAST") - return uid as T? - } - return attributes.get(key) - } - - override fun toBuilder(): AttributesBuilder { - val delegate = attributes.toBuilder() - return object : AttributesBuilder { - - override fun putAll(attributes: Attributes): AttributesBuilder { - delegate.putAll(attributes) - return this - } - - override fun <T : Any?> put(key: AttributeKey<Long>, value: Int): AttributesBuilder { - delegate.put<T>(key, value) - return this - } - - override fun <T : Any?> put(key: AttributeKey<T>, value: T): AttributesBuilder { - delegate.put(key, value) - return this - } - - override fun build(): Attributes = GuestAttributes(uid, delegate.build()) - } - } - - override fun equals(other: Any?): Boolean = attributes == other - - // Cache hash code - private val _hash = attributes.hashCode() - - override fun hashCode(): Int = _hash - } } diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index fd54ad1d..5ba4a667 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -22,7 +22,6 @@ package org.opendc.compute.simulator -import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.* import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach @@ -75,7 +74,6 @@ internal class SimHostTest { meta = emptyMap(), coroutineContext, engine, - MeterProvider.noop(), SimFairShareHypervisorProvider() ) val vmImageA = MockImage( @@ -158,7 +156,6 @@ internal class SimHostTest { meta = emptyMap(), coroutineContext, engine, - MeterProvider.noop(), SimFairShareHypervisorProvider() ) val image = MockImage( diff --git a/opendc-compute/opendc-compute-workload/build.gradle.kts b/opendc-compute/opendc-compute-workload/build.gradle.kts index 319b2ae3..7b5fe6c1 100644 --- a/opendc-compute/opendc-compute-workload/build.gradle.kts +++ b/opendc-compute/opendc-compute-workload/build.gradle.kts @@ -34,9 +34,6 @@ dependencies { implementation(projects.opendcTrace.opendcTraceParquet) implementation(projects.opendcSimulator.opendcSimulatorCore) implementation(projects.opendcSimulator.opendcSimulatorCompute) - implementation(projects.opendcTelemetry.opendcTelemetrySdk) - implementation(projects.opendcTelemetry.opendcTelemetryCompute) - implementation(libs.opentelemetry.semconv) implementation(libs.kotlin.logging) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt index 21cfdad2..fddb4890 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt @@ -30,7 +30,6 @@ import org.opendc.compute.api.Server import org.opendc.compute.service.ComputeService import org.opendc.compute.service.scheduler.ComputeScheduler import org.opendc.compute.simulator.SimHost -import org.opendc.compute.workload.telemetry.TelemetryManager import org.opendc.compute.workload.topology.HostSpec import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.workload.SimTraceWorkload @@ -46,7 +45,6 @@ import kotlin.math.max * * @param context [CoroutineContext] to run the simulation in. * @param clock [Clock] instance tracking simulation time. - * @param telemetry Helper class for managing telemetry. * @param scheduler [ComputeScheduler] implementation to use for the service. * @param failureModel A failure model to use for injecting failures. * @param interferenceModel The model to use for performance interference. @@ -55,7 +53,6 @@ import kotlin.math.max public class ComputeServiceHelper( private val context: CoroutineContext, private val clock: Clock, - private val telemetry: TelemetryManager, scheduler: ComputeScheduler, private val failureModel: FailureModel? = null, private val interferenceModel: VmInterferenceModel? = null, @@ -167,7 +164,6 @@ public class ComputeServiceHelper( * @return The [SimHost] that has been constructed by the runner. */ public fun registerHost(spec: HostSpec, optimize: Boolean = false): SimHost { - val meterProvider = telemetry.createMeterProvider(spec) val host = SimHost( spec.uid, spec.name, @@ -175,7 +171,6 @@ public class ComputeServiceHelper( spec.meta, context, _engine, - meterProvider, spec.hypervisor, powerDriver = spec.powerDriver, interferenceDomain = interferenceModel?.newDomain(), @@ -202,7 +197,6 @@ public class ComputeServiceHelper( * Construct a [ComputeService] instance. */ private fun createService(scheduler: ComputeScheduler, schedulingQuantum: Duration): ComputeService { - val meterProvider = telemetry.createMeterProvider(scheduler) - return ComputeService(context, clock, meterProvider, scheduler, schedulingQuantum) + return ComputeService(context, clock, scheduler, schedulingQuantum) } } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMonitor.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMonitor.kt index 6c515118..af4dad44 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMonitor.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMonitor.kt @@ -22,10 +22,10 @@ package org.opendc.compute.workload.export.parquet -import org.opendc.telemetry.compute.ComputeMonitor -import org.opendc.telemetry.compute.table.HostTableReader -import org.opendc.telemetry.compute.table.ServerTableReader -import org.opendc.telemetry.compute.table.ServiceTableReader +import org.opendc.compute.workload.telemetry.ComputeMonitor +import org.opendc.compute.workload.telemetry.table.HostTableReader +import org.opendc.compute.workload.telemetry.table.ServerTableReader +import org.opendc.compute.workload.telemetry.table.ServiceTableReader import java.io.File /** diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt index 0d5b6b34..e6e7e42d 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt @@ -27,7 +27,7 @@ import org.apache.parquet.hadoop.ParquetWriter import org.apache.parquet.hadoop.api.WriteSupport import org.apache.parquet.io.api.RecordConsumer import org.apache.parquet.schema.* -import org.opendc.telemetry.compute.table.HostTableReader +import org.opendc.compute.workload.telemetry.table.HostTableReader import org.opendc.trace.util.parquet.LocalParquetWriter import java.io.File import java.util.* diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt index 5d11629b..082c7c88 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt @@ -27,7 +27,7 @@ import org.apache.parquet.hadoop.ParquetWriter import org.apache.parquet.hadoop.api.WriteSupport import org.apache.parquet.io.api.RecordConsumer import org.apache.parquet.schema.* -import org.opendc.telemetry.compute.table.ServerTableReader +import org.opendc.compute.workload.telemetry.table.ServerTableReader import org.opendc.trace.util.parquet.LocalParquetWriter import java.io.File import java.util.* diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt index 5ad3b95e..2a0fdca1 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt @@ -22,12 +22,11 @@ package org.opendc.compute.workload.export.parquet -import io.opentelemetry.context.ContextKey.named import org.apache.hadoop.conf.Configuration import org.apache.parquet.hadoop.api.WriteSupport import org.apache.parquet.io.api.RecordConsumer import org.apache.parquet.schema.* -import org.opendc.telemetry.compute.table.ServiceTableReader +import org.opendc.compute.workload.telemetry.table.ServiceTableReader import java.io.File /** diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMetricReader.kt index 593203fc..45bd9ab1 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricReader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMetricReader.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.telemetry.compute +package org.opendc.compute.workload.telemetry import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.delay @@ -30,7 +30,7 @@ import mu.KotlinLogging import org.opendc.compute.api.Server import org.opendc.compute.service.ComputeService import org.opendc.compute.service.driver.Host -import org.opendc.telemetry.compute.table.* +import org.opendc.compute.workload.telemetry.table.* import java.time.Clock import java.time.Duration import java.time.Instant diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMonitor.kt index 64b5f337..36a2079a 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMonitor.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 AtLarge Research + * Copyright (c) 2022 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,11 +20,11 @@ * SOFTWARE. */ -package org.opendc.telemetry.compute +package org.opendc.compute.workload.telemetry -import org.opendc.telemetry.compute.table.HostTableReader -import org.opendc.telemetry.compute.table.ServerTableReader -import org.opendc.telemetry.compute.table.ServiceTableReader +import org.opendc.compute.workload.telemetry.table.HostTableReader +import org.opendc.compute.workload.telemetry.table.ServerTableReader +import org.opendc.compute.workload.telemetry.table.ServiceTableReader /** * A monitor that tracks the metrics and events of the OpenDC Compute service. diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/NoopTelemetryManager.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/NoopTelemetryManager.kt deleted file mode 100644 index 4e7d0b75..00000000 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/NoopTelemetryManager.kt +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.workload.telemetry - -import io.opentelemetry.api.metrics.MeterProvider -import org.opendc.compute.service.scheduler.ComputeScheduler -import org.opendc.compute.workload.topology.HostSpec - -/** - * A [TelemetryManager] that does nothing. - */ -public class NoopTelemetryManager : TelemetryManager { - override fun createMeterProvider(host: HostSpec): MeterProvider = MeterProvider.noop() - - override fun createMeterProvider(scheduler: ComputeScheduler): MeterProvider = MeterProvider.noop() -} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/SdkTelemetryManager.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/SdkTelemetryManager.kt deleted file mode 100644 index 478c0609..00000000 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/SdkTelemetryManager.kt +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.workload.telemetry - -import io.opentelemetry.api.metrics.MeterProvider -import io.opentelemetry.sdk.common.CompletableResultCode -import io.opentelemetry.sdk.metrics.SdkMeterProvider -import io.opentelemetry.sdk.metrics.data.AggregationTemporality -import io.opentelemetry.sdk.metrics.data.MetricData -import io.opentelemetry.sdk.metrics.export.MetricProducer -import io.opentelemetry.sdk.metrics.export.MetricReader -import io.opentelemetry.sdk.metrics.export.MetricReaderFactory -import io.opentelemetry.sdk.resources.Resource -import io.opentelemetry.semconv.resource.attributes.ResourceAttributes -import org.opendc.compute.service.scheduler.ComputeScheduler -import org.opendc.compute.workload.topology.HostSpec -import org.opendc.telemetry.compute.* -import org.opendc.telemetry.sdk.toOtelClock -import java.time.Clock - -/** - * A [TelemetryManager] using the OpenTelemetry Java SDK. - */ -public class SdkTelemetryManager(private val clock: Clock) : TelemetryManager, AutoCloseable { - /** - * The [SdkMeterProvider]s that belong to the workload runner. - */ - private val _meterProviders = mutableListOf<SdkMeterProvider>() - - /** - * The internal [MetricProducer] registered with the runner. - */ - private val _metricProducers = mutableListOf<MetricProducer>() - - /** - * The list of [MetricReader]s that have been registered with the runner. - */ - private val _metricReaders = mutableListOf<MetricReader>() - - /** - * A [MetricProducer] that combines all the other metric producers. - */ - public val metricProducer: MetricProducer = object : MetricProducer { - private val producers = _metricProducers - - override fun collectAllMetrics(): Collection<MetricData> = producers.flatMap(MetricProducer::collectAllMetrics) - - override fun toString(): String = "SdkTelemetryManager.AggregateMetricProducer" - } - - /** - * Register a [MetricReader] for this manager. - * - * @param factory The factory for the reader to register. - */ - public fun registerMetricReader(factory: MetricReaderFactory) { - val reader = factory.apply(metricProducer) - _metricReaders.add(reader) - } - - override fun createMeterProvider(scheduler: ComputeScheduler): MeterProvider { - val resource = Resource.builder() - .put(ResourceAttributes.SERVICE_NAME, "opendc-compute") - .build() - - return createMeterProvider(resource) - } - - override fun createMeterProvider(host: HostSpec): MeterProvider { - val resource = Resource.builder() - .put(HOST_ID, host.uid.toString()) - .put(HOST_NAME, host.name) - .put(HOST_ARCH, ResourceAttributes.HostArchValues.AMD64) - .put(HOST_NCPUS, host.model.cpus.size) - .put(HOST_MEM_CAPACITY, host.model.memory.sumOf { it.size }) - .build() - - return createMeterProvider(resource) - } - - /** - * Construct a [SdkMeterProvider] for the specified [resource]. - */ - private fun createMeterProvider(resource: Resource): SdkMeterProvider { - val meterProvider = SdkMeterProvider.builder() - .setClock(clock.toOtelClock()) - .setResource(resource) - .registerMetricReader { producer -> - _metricProducers.add(producer) - object : MetricReader { - override fun getPreferredTemporality(): AggregationTemporality = AggregationTemporality.CUMULATIVE - override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess() - override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess() - } - } - .build() - _meterProviders.add(meterProvider) - return meterProvider - } - - override fun close() { - for (meterProvider in _meterProviders) { - meterProvider.close() - } - - _meterProviders.clear() - - for (metricReader in _metricReaders) { - metricReader.shutdown() - } - - _metricReaders.clear() - _metricProducers.clear() - } -} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/TelemetryManager.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/TelemetryManager.kt deleted file mode 100644 index b67050ce..00000000 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/TelemetryManager.kt +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.workload.telemetry - -import io.opentelemetry.api.metrics.MeterProvider -import org.opendc.compute.service.scheduler.ComputeScheduler -import org.opendc.compute.workload.topology.HostSpec - -/** - * Helper class to manage the telemetry for a [ComputeServiceHelper] instance. - */ -public interface TelemetryManager { - /** - * Construct a [MeterProvider] for the specified [ComputeScheduler]. - */ - public fun createMeterProvider(scheduler: ComputeScheduler): MeterProvider - - /** - * Construct a [MeterProvider] for the specified [HostSpec]. - */ - public fun createMeterProvider(host: HostSpec): MeterProvider -} diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostInfo.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/HostInfo.kt index d9a5906b..5d383e40 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostInfo.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/HostInfo.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.telemetry.compute.table +package org.opendc.compute.workload.telemetry.table /** * Information about a host exposed to the telemetry service. diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostTableReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/HostTableReader.kt index 1e1ad94e..8f6f0d01 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostTableReader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/HostTableReader.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.telemetry.compute.table +package org.opendc.compute.workload.telemetry.table import java.time.Instant diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerInfo.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServerInfo.kt index b16e5f3d..111135b7 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerInfo.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServerInfo.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.telemetry.compute.table +package org.opendc.compute.workload.telemetry.table /** * Static information about a server exposed to the telemetry service. diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerTableReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServerTableReader.kt index c23d1467..bccccd01 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerTableReader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServerTableReader.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.telemetry.compute.table +package org.opendc.compute.workload.telemetry.table import java.time.Instant diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServiceData.kt index 39bf96f4..a1df6ea7 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServiceData.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.telemetry.compute.table +package org.opendc.compute.workload.telemetry.table import java.time.Instant diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceTableReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServiceTableReader.kt index 908f6748..4211ab15 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceTableReader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServiceTableReader.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.telemetry.compute.table +package org.opendc.compute.workload.telemetry.table import java.time.Instant diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/HostDataWriterTest.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/HostDataWriterTest.kt index dae03513..4344bb08 100644 --- a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/HostDataWriterTest.kt +++ b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/HostDataWriterTest.kt @@ -25,8 +25,8 @@ package org.opendc.compute.workload.export.parquet import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertDoesNotThrow -import org.opendc.telemetry.compute.table.HostInfo -import org.opendc.telemetry.compute.table.HostTableReader +import org.opendc.compute.workload.telemetry.table.HostInfo +import org.opendc.compute.workload.telemetry.table.HostTableReader import java.nio.file.Files import java.time.Instant diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServerDataWriterTest.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServerDataWriterTest.kt index 280f5ef8..8465871d 100644 --- a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServerDataWriterTest.kt +++ b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServerDataWriterTest.kt @@ -25,9 +25,9 @@ package org.opendc.compute.workload.export.parquet import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertDoesNotThrow -import org.opendc.telemetry.compute.table.HostInfo -import org.opendc.telemetry.compute.table.ServerInfo -import org.opendc.telemetry.compute.table.ServerTableReader +import org.opendc.compute.workload.telemetry.table.HostInfo +import org.opendc.compute.workload.telemetry.table.ServerInfo +import org.opendc.compute.workload.telemetry.table.ServerTableReader import java.nio.file.Files import java.time.Instant diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServiceDataWriterTest.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServiceDataWriterTest.kt index 7ffa7186..d91982bc 100644 --- a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServiceDataWriterTest.kt +++ b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServiceDataWriterTest.kt @@ -25,7 +25,7 @@ package org.opendc.compute.workload.export.parquet import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertDoesNotThrow -import org.opendc.telemetry.compute.table.ServiceTableReader +import org.opendc.compute.workload.telemetry.table.ServiceTableReader import java.nio.file.Files import java.time.Instant diff --git a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts index 9495f4ca..39cf101d 100644 --- a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts @@ -37,8 +37,6 @@ dependencies { implementation(projects.opendcSimulator.opendcSimulatorCore) implementation(projects.opendcSimulator.opendcSimulatorCompute) implementation(projects.opendcCompute.opendcComputeSimulator) - implementation(projects.opendcTelemetry.opendcTelemetrySdk) - implementation(projects.opendcTelemetry.opendcTelemetryCompute) implementation(libs.config) implementation(libs.kotlin.logging) @@ -46,7 +44,6 @@ dependencies { implementation(libs.jackson.module.kotlin) implementation(libs.jackson.dataformat.csv) implementation(kotlin("reflect")) - implementation(libs.opentelemetry.semconv) runtimeOnly(projects.opendcTrace.opendcTraceOpendc) diff --git a/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt b/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt index 83b8c0c6..fd2c26f0 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt @@ -22,14 +22,12 @@ package org.opendc.experiments.capelin -import kotlinx.coroutines.ExperimentalCoroutinesApi import org.opendc.compute.service.scheduler.FilterScheduler import org.opendc.compute.service.scheduler.filters.ComputeFilter import org.opendc.compute.service.scheduler.filters.RamFilter import org.opendc.compute.service.scheduler.filters.VCpuFilter import org.opendc.compute.service.scheduler.weights.CoreRamWeigher import org.opendc.compute.workload.* -import org.opendc.compute.workload.telemetry.NoopTelemetryManager import org.opendc.compute.workload.topology.Topology import org.opendc.compute.workload.topology.apply import org.opendc.experiments.capelin.topology.clusterTopology @@ -46,7 +44,6 @@ import java.util.concurrent.TimeUnit @Fork(1) @Warmup(iterations = 2, time = 5, timeUnit = TimeUnit.SECONDS) @Measurement(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS) -@OptIn(ExperimentalCoroutinesApi::class) class CapelinBenchmarks { private lateinit var vms: List<VirtualMachine> private lateinit var topology: Topology @@ -59,7 +56,7 @@ class CapelinBenchmarks { val loader = ComputeWorkloadLoader(File("src/test/resources/trace")) val source = trace("bitbrains-small") vms = source.resolve(loader, Random(1L)).vms - topology = checkNotNull(object {}.javaClass.getResourceAsStream("/env/topology.txt")).use { clusterTopology(it) } + topology = checkNotNull(object {}.javaClass.getResourceAsStream("/topology.txt")).use { clusterTopology(it) } } @Benchmark @@ -71,7 +68,6 @@ class CapelinBenchmarks { val runner = ComputeServiceHelper( coroutineContext, clock, - NoopTelemetryManager(), computeScheduler ) diff --git a/opendc-experiments/opendc-experiments-capelin/src/jmh/resources/topology.txt b/opendc-experiments/opendc-experiments-capelin/src/jmh/resources/topology.txt new file mode 100644 index 00000000..6b347bff --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/jmh/resources/topology.txt @@ -0,0 +1,5 @@ +ClusterID;ClusterName;Cores;Speed;Memory;numberOfHosts;memoryCapacityPerHost;coreCountPerHost +A01;A01;32;3.2;2048;1;256;32 +B01;B01;48;2.93;1256;6;64;8 +C01;C01;32;3.2;2048;2;128;16 + diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index 6fd85e8c..0de8aa7b 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -30,7 +30,7 @@ import org.opendc.compute.workload.ComputeWorkloadLoader import org.opendc.compute.workload.createComputeScheduler import org.opendc.compute.workload.export.parquet.ParquetComputeMonitor import org.opendc.compute.workload.grid5000 -import org.opendc.compute.workload.telemetry.NoopTelemetryManager +import org.opendc.compute.workload.telemetry.ComputeMetricReader import org.opendc.compute.workload.topology.apply import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Topology @@ -39,7 +39,6 @@ import org.opendc.experiments.capelin.topology.clusterTopology import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.telemetry.compute.ComputeMetricReader import java.io.File import java.time.Duration import java.util.* @@ -99,11 +98,9 @@ abstract class Portfolio(name: String) : Experiment(name) { else null val (vms, interferenceModel) = workload.source.resolve(workloadLoader, seeder) - val telemetry = NoopTelemetryManager() val runner = ComputeServiceHelper( coroutineContext, clock, - telemetry, computeScheduler, failureModel, interferenceModel?.withSeed(repeat.toLong()) diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 62cdf123..fa2cd9c8 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -33,14 +33,13 @@ import org.opendc.compute.service.scheduler.filters.RamFilter import org.opendc.compute.service.scheduler.filters.VCpuFilter import org.opendc.compute.service.scheduler.weights.CoreRamWeigher import org.opendc.compute.workload.* -import org.opendc.compute.workload.telemetry.NoopTelemetryManager +import org.opendc.compute.workload.telemetry.ComputeMetricReader +import org.opendc.compute.workload.telemetry.ComputeMonitor +import org.opendc.compute.workload.telemetry.table.HostTableReader import org.opendc.compute.workload.topology.Topology import org.opendc.compute.workload.topology.apply import org.opendc.experiments.capelin.topology.clusterTopology import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.telemetry.compute.ComputeMetricReader -import org.opendc.telemetry.compute.ComputeMonitor -import org.opendc.telemetry.compute.table.HostTableReader import java.io.File import java.time.Duration import java.util.* @@ -86,7 +85,6 @@ class CapelinIntegrationTest { val runner = ComputeServiceHelper( coroutineContext, clock, - NoopTelemetryManager(), computeScheduler ) val topology = createTopology() @@ -136,7 +134,6 @@ class CapelinIntegrationTest { val runner = ComputeServiceHelper( coroutineContext, clock, - NoopTelemetryManager(), computeScheduler ) val topology = createTopology("single") @@ -182,7 +179,6 @@ class CapelinIntegrationTest { val simulator = ComputeServiceHelper( coroutineContext, clock, - NoopTelemetryManager(), computeScheduler, interferenceModel = interferenceModel?.withSeed(seed.toLong()) ) @@ -226,7 +222,6 @@ class CapelinIntegrationTest { val simulator = ComputeServiceHelper( coroutineContext, clock, - NoopTelemetryManager(), computeScheduler, grid5000(Duration.ofDays(7)) ) diff --git a/opendc-experiments/opendc-experiments-serverless20/build.gradle.kts b/opendc-experiments/opendc-experiments-serverless20/build.gradle.kts index b96647a6..a6391986 100644 --- a/opendc-experiments/opendc-experiments-serverless20/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-serverless20/build.gradle.kts @@ -33,7 +33,6 @@ dependencies { implementation(projects.opendcSimulator.opendcSimulatorCore) implementation(projects.opendcFaas.opendcFaasService) implementation(projects.opendcFaas.opendcFaasSimulator) - implementation(projects.opendcTelemetry.opendcTelemetrySdk) implementation(libs.kotlin.logging) implementation(libs.config) } diff --git a/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt b/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt index 3312d6c0..1c357f67 100644 --- a/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt +++ b/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt @@ -23,8 +23,6 @@ package org.opendc.experiments.serverless import com.typesafe.config.ConfigFactory -import io.opentelemetry.api.metrics.MeterProvider -import io.opentelemetry.sdk.metrics.SdkMeterProvider import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.launch @@ -44,7 +42,6 @@ import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.telemetry.sdk.toOtelClock import java.io.File import java.time.Duration import java.util.* @@ -76,17 +73,18 @@ public class ServerlessExperiment : Experiment("Serverless") { private val coldStartModel by anyOf(ColdStartModel.LAMBDA, ColdStartModel.AZURE, ColdStartModel.GOOGLE) override fun doRun(repeat: Int): Unit = runBlockingSimulation { - val meterProvider: MeterProvider = SdkMeterProvider - .builder() - .setClock(clock.toOtelClock()) - .build() - val trace = ServerlessTraceReader().parse(File(config.getString("trace-path"))) val traceById = trace.associateBy { it.id } val delayInjector = StochasticDelayInjector(coldStartModel, Random()) val deployer = SimFunctionDeployer(clock, this, createMachineModel(), delayInjector) { FunctionTraceWorkload(traceById.getValue(it.name)) } val service = - FaaSService(coroutineContext, clock, meterProvider, deployer, routingPolicy, FunctionTerminationPolicyFixed(coroutineContext, clock, timeout = Duration.ofMinutes(10))) + FaaSService( + coroutineContext, + clock, + deployer, + routingPolicy, + FunctionTerminationPolicyFixed(coroutineContext, clock, timeout = Duration.ofMinutes(10)) + ) val client = service.newClient() coroutineScope { diff --git a/opendc-experiments/opendc-experiments-tf20/build.gradle.kts b/opendc-experiments/opendc-experiments-tf20/build.gradle.kts index 5762ce64..f61c8fef 100644 --- a/opendc-experiments/opendc-experiments-tf20/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-tf20/build.gradle.kts @@ -32,7 +32,6 @@ dependencies { api(projects.opendcHarness.opendcHarnessApi) implementation(projects.opendcSimulator.opendcSimulatorCore) implementation(projects.opendcSimulator.opendcSimulatorCompute) - implementation(projects.opendcTelemetry.opendcTelemetrySdk) implementation(projects.opendcCommon) implementation(libs.kotlin.logging) diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/TensorFlowExperiment.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/TensorFlowExperiment.kt index 2153a862..19236029 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/TensorFlowExperiment.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/TensorFlowExperiment.kt @@ -22,8 +22,6 @@ package org.opendc.experiments.tf20 -import io.opentelemetry.api.metrics.MeterProvider -import io.opentelemetry.sdk.metrics.SdkMeterProvider import org.opendc.experiments.tf20.core.SimTFDevice import org.opendc.experiments.tf20.distribute.* import org.opendc.experiments.tf20.keras.AlexNet @@ -32,7 +30,6 @@ import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf import org.opendc.simulator.compute.power.LinearPowerModel import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.telemetry.sdk.toOtelClock /** * Experiments with the TensorFlow simulation model. @@ -49,17 +46,11 @@ public class TensorFlowExperiment : Experiment(name = "tf20") { private val batchSize by anyOf(16, 32, 64, 128) override fun doRun(repeat: Int): Unit = runBlockingSimulation { - val meterProvider: MeterProvider = SdkMeterProvider - .builder() - .setClock(clock.toOtelClock()) - .build() - val meter = meterProvider.get("opendc-tf20") - val envInput = checkNotNull(TensorFlowExperiment::class.java.getResourceAsStream(environmentFile)) val def = MLEnvironmentReader().readEnvironment(envInput).first() val device = SimTFDevice( - def.uid, def.meta["gpu"] as Boolean, coroutineContext, clock, meter, def.model.cpus[0], - def.model.memory[0], LinearPowerModel(250.0, 60.0) + def.uid, def.meta["gpu"] as Boolean, coroutineContext, clock, def.model.cpus[0], def.model.memory[0], + LinearPowerModel(250.0, 60.0) ) val strategy = OneDeviceStrategy(device) diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt index 99948c8e..d2105196 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt @@ -22,7 +22,6 @@ package org.opendc.experiments.tf20.core -import io.opentelemetry.api.metrics.Meter import kotlinx.coroutines.* import org.opendc.simulator.compute.SimBareMetalMachine import org.opendc.simulator.compute.SimMachine @@ -50,7 +49,6 @@ public class SimTFDevice( override val isGpu: Boolean, context: CoroutineContext, clock: Clock, - meter: Meter, pu: ProcessingUnit, private val memory: MemoryUnit, powerModel: PowerModel @@ -69,21 +67,9 @@ public class SimTFDevice( ) /** - * The usage of the device. + * Metrics collected by the device. */ - private val _usage = meter.histogramBuilder("device.usage") - .setDescription("The amount of device resources used") - .setUnit("MHz") - .build() private var _resourceUsage = 0.0 - - /** - * The power draw of the device. - */ - private val _power = meter.histogramBuilder("device.power") - .setDescription("The power draw of the device") - .setUnit("W") - .build() private var _powerUsage = 0.0 private var _energyUsage = 0.0 @@ -171,9 +157,7 @@ public class SimTFDevice( } override fun onConverge(conn: FlowConnection, now: Long) { - _usage.record(conn.rate) _resourceUsage = conn.rate - _power.record(machine.psu.powerDraw) _powerUsage = machine.powerUsage _energyUsage = machine.energyUsage } diff --git a/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt b/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt index 0d5fbebb..fd18a3a7 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt @@ -22,7 +22,6 @@ package org.opendc.experiments.tf20.core -import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch import org.junit.jupiter.api.Assertions.assertAll @@ -41,14 +40,19 @@ import java.util.* internal class SimTFDeviceTest { @Test fun testSmoke() = runBlockingSimulation { - val meterProvider: MeterProvider = MeterProvider.noop() - val meter = meterProvider.get("opendc-tf20") - val puNode = ProcessingNode("NVIDIA", "Tesla V100", "unknown", 1) val pu = ProcessingUnit(puNode, 0, 960 * 1230.0) val memory = MemoryUnit("NVIDIA", "Tesla V100", 877.0, 32_000) - val device = SimTFDevice(UUID.randomUUID(), isGpu = true, coroutineContext, clock, meter, pu, memory, LinearPowerModel(250.0, 100.0)) + val device = SimTFDevice( + UUID.randomUUID(), + isGpu = true, + coroutineContext, + clock, + pu, + memory, + LinearPowerModel(250.0, 100.0) + ) // Load 1 GiB into GPU memory device.load(1000) diff --git a/opendc-faas/opendc-faas-service/build.gradle.kts b/opendc-faas/opendc-faas-service/build.gradle.kts index 1803ae69..34f5b7ea 100644 --- a/opendc-faas/opendc-faas-service/build.gradle.kts +++ b/opendc-faas/opendc-faas-service/build.gradle.kts @@ -29,11 +29,9 @@ plugins { dependencies { api(projects.opendcFaas.opendcFaasApi) - api(projects.opendcTelemetry.opendcTelemetryApi) api(libs.commons.math3) implementation(projects.opendcCommon) implementation(libs.kotlin.logging) - implementation(libs.opentelemetry.semconv) testImplementation(projects.opendcSimulator.opendcSimulatorCore) testRuntimeOnly(libs.log4j.slf4j) diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt index f7dc3c1f..7b40d867 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt @@ -22,8 +22,6 @@ package org.opendc.faas.service -import io.opentelemetry.api.metrics.Meter -import io.opentelemetry.api.metrics.MeterProvider import org.opendc.faas.api.FaaSClient import org.opendc.faas.api.FaaSFunction import org.opendc.faas.service.autoscaler.FunctionTerminationPolicy @@ -33,6 +31,7 @@ import org.opendc.faas.service.router.RoutingPolicy import org.opendc.faas.service.telemetry.FunctionStats import org.opendc.faas.service.telemetry.SchedulerStats import java.time.Clock +import java.time.Duration import kotlin.coroutines.CoroutineContext /** @@ -65,20 +64,20 @@ public interface FaaSService : AutoCloseable { * * @param context The [CoroutineContext] to use in the service. * @param clock The clock instance to use. - * @param meterProvider The [MeterProvider] to create a [Meter] with. * @param deployer the [FunctionDeployer] to use for deploying function instances. * @param routingPolicy The policy to route function invocations. * @param terminationPolicy The policy for terminating function instances. + * @param quantum The scheduling quantum of the service (100 ms default) */ public operator fun invoke( context: CoroutineContext, clock: Clock, - meterProvider: MeterProvider, deployer: FunctionDeployer, routingPolicy: RoutingPolicy, terminationPolicy: FunctionTerminationPolicy, + quantum: Duration = Duration.ofMillis(100) ): FaaSService { - return FaaSServiceImpl(context, clock, meterProvider, deployer, routingPolicy, terminationPolicy) + return FaaSServiceImpl(context, clock, deployer, routingPolicy, terminationPolicy, quantum) } } } diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt index 52fcffa1..1cc33f6f 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt @@ -22,13 +22,6 @@ package org.opendc.faas.service -import io.opentelemetry.api.common.AttributeKey -import io.opentelemetry.api.common.Attributes -import io.opentelemetry.api.metrics.LongCounter -import io.opentelemetry.api.metrics.LongHistogram -import io.opentelemetry.api.metrics.LongUpDownCounter -import io.opentelemetry.api.metrics.Meter -import io.opentelemetry.semconv.resource.attributes.ResourceAttributes import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics import org.opendc.faas.service.deployer.FunctionInstance import org.opendc.faas.service.telemetry.FunctionStats @@ -38,7 +31,6 @@ import java.util.* * An [FunctionObject] represents the service's view of a serverless function. */ public class FunctionObject( - meter: Meter, public val uid: UUID, name: String, allocatedMemory: Long, @@ -46,88 +38,16 @@ public class FunctionObject( meta: Map<String, Any> ) : AutoCloseable { /** - * The attributes of this function. + * Metrics tracked per function. */ - private val attributes: Attributes = Attributes.builder() - .put(ResourceAttributes.FAAS_ID, uid.toString()) - .put(ResourceAttributes.FAAS_NAME, name) - .put(ResourceAttributes.FAAS_MAX_MEMORY, allocatedMemory) - .put(AttributeKey.stringArrayKey("faas.labels"), labels.map { (k, v) -> "$k:$v" }) - .build() - - /** - * The total amount of function invocations received by the function. - */ - private val invocations: LongCounter = meter.counterBuilder("function.invocations.total") - .setDescription("Number of function invocations") - .setUnit("1") - .build() private var _invocations = 0L - - /** - * The amount of function invocations that could be handled directly. - */ - private val timelyInvocations: LongCounter = meter.counterBuilder("function.invocations.warm") - .setDescription("Number of function invocations handled directly") - .setUnit("1") - .build() private var _timelyInvocations = 0L - - /** - * The amount of function invocations that were delayed due to function deployment. - */ - private val delayedInvocations: LongCounter = meter.counterBuilder("function.invocations.cold") - .setDescription("Number of function invocations that are delayed") - .setUnit("1") - .build() private var _delayedInvocations = 0L - - /** - * The amount of function invocations that failed. - */ - private val failedInvocations: LongCounter = meter.counterBuilder("function.invocations.failed") - .setDescription("Number of function invocations that failed") - .setUnit("1") - .build() private var _failedInvocations = 0L - - /** - * The amount of instances for this function. - */ - private val activeInstances: LongUpDownCounter = meter.upDownCounterBuilder("function.instances.active") - .setDescription("Number of active function instances") - .setUnit("1") - .build() private var _activeInstances = 0 - - /** - * The amount of idle instances for this function. - */ - private val idleInstances: LongUpDownCounter = meter.upDownCounterBuilder("function.instances.idle") - .setDescription("Number of idle function instances") - .setUnit("1") - .build() private var _idleInstances = 0 - - /** - * The time that the function waited. - */ - private val waitTime: LongHistogram = meter.histogramBuilder("function.time.wait") - .ofLongs() - .setDescription("Time the function has to wait before being started") - .setUnit("ms") - .build() private val _waitTime = DescriptiveStatistics() .apply { windowSize = 100 } - - /** - * The time that the function was running. - */ - private val activeTime: LongHistogram = meter.histogramBuilder("function.time.active") - .ofLongs() - .setDescription("Time the function was running") - .setUnit("ms") - .build() private val _activeTime = DescriptiveStatistics() .apply { windowSize = 100 } @@ -150,7 +70,6 @@ public class FunctionObject( * Report a scheduled invocation. */ internal fun reportSubmission() { - invocations.add(1, attributes) _invocations++ } @@ -159,13 +78,9 @@ public class FunctionObject( */ internal fun reportDeployment(isDelayed: Boolean) { if (isDelayed) { - delayedInvocations.add(1, attributes) _delayedInvocations++ - - idleInstances.add(1, attributes) _idleInstances++ } else { - timelyInvocations.add(1, attributes) _timelyInvocations++ } } @@ -175,12 +90,9 @@ public class FunctionObject( */ internal fun reportStart(start: Long, submitTime: Long) { val wait = start - submitTime - waitTime.record(wait, attributes) _waitTime.addValue(wait.toDouble()) - idleInstances.add(-1, attributes) _idleInstances-- - activeInstances.add(1, attributes) _activeInstances++ } @@ -188,7 +100,6 @@ public class FunctionObject( * Report the failure of a function invocation. */ internal fun reportFailure() { - failedInvocations.add(1, attributes) _failedInvocations++ } @@ -196,11 +107,8 @@ public class FunctionObject( * Report the end of a function invocation. */ internal fun reportEnd(duration: Long) { - activeTime.record(duration, attributes) _activeTime.addValue(duration.toDouble()) - idleInstances.add(1, attributes) _idleInstances++ - activeInstances.add(-1, attributes) _activeInstances-- } diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt index ce3b2b98..4ee55dea 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt @@ -22,8 +22,6 @@ package org.opendc.faas.service.internal -import io.opentelemetry.api.metrics.Meter -import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.* import kotlinx.coroutines.intrinsics.startCoroutineCancellable import mu.KotlinLogging @@ -42,6 +40,7 @@ import org.opendc.faas.service.telemetry.FunctionStats import org.opendc.faas.service.telemetry.SchedulerStats import java.lang.IllegalStateException import java.time.Clock +import java.time.Duration import java.util.* import kotlin.coroutines.Continuation import kotlin.coroutines.CoroutineContext @@ -57,10 +56,10 @@ import kotlin.coroutines.resumeWithException internal class FaaSServiceImpl( context: CoroutineContext, private val clock: Clock, - meterProvider: MeterProvider, private val deployer: FunctionDeployer, private val routingPolicy: RoutingPolicy, - private val terminationPolicy: FunctionTerminationPolicy + private val terminationPolicy: FunctionTerminationPolicy, + quantum: Duration ) : FaaSService, FunctionInstanceListener { /** * The [CoroutineScope] of the service bounded by the lifecycle of the service. @@ -73,14 +72,9 @@ internal class FaaSServiceImpl( private val logger = KotlinLogging.logger {} /** - * The [Meter] that collects the metrics of this service. - */ - private val meter = meterProvider.get("org.opendc.faas.service") - - /** * The [Pacer] to use for scheduling the scheduler cycles. */ - private val pacer = Pacer(scope.coroutineContext, clock, quantum = 100) { doSchedule() } + private val pacer = Pacer(scope.coroutineContext, clock, quantum = quantum.toMillis()) { doSchedule() } /** * The [Random] instance used to generate unique identifiers for the objects. @@ -99,30 +93,10 @@ internal class FaaSServiceImpl( private val queue = ArrayDeque<InvocationRequest>() /** - * The total amount of function invocations received by the service. + * Metrics tracked by the service. */ - private val _invocations = meter.counterBuilder("service.invocations.total") - .setDescription("Number of function invocations") - .setUnit("1") - .build() private var totalInvocations = 0L - - /** - * The amount of function invocations that could be handled directly. - */ - private val _timelyInvocations = meter.counterBuilder("service.invocations.warm") - .setDescription("Number of function invocations handled directly") - .setUnit("1") - .build() private var timelyInvocations = 0L - - /** - * The amount of function invocations that were delayed due to function deployment. - */ - private val _delayedInvocations = meter.counterBuilder("service.invocations.cold") - .setDescription("Number of function invocations that are delayed") - .setUnit("1") - .build() private var delayedInvocations = 0L override fun newClient(): FaaSClient { @@ -165,7 +139,6 @@ internal class FaaSServiceImpl( val uid = UUID(clock.millis(), random.nextLong()) val function = FunctionObject( - meter, uid, name, memorySize, @@ -232,7 +205,6 @@ internal class FaaSServiceImpl( } val instance = if (activeInstance != null) { - _timelyInvocations.add(1) timelyInvocations++ function.reportDeployment(isDelayed = false) @@ -242,7 +214,6 @@ internal class FaaSServiceImpl( instances.add(instance) terminationPolicy.enqueue(instance) - _delayedInvocations.add(1) delayedInvocations++ function.reportDeployment(isDelayed = true) @@ -271,7 +242,6 @@ internal class FaaSServiceImpl( suspend fun invoke(function: FunctionObject) { check(function.uid in functions) { "Function does not exist (anymore)" } - _invocations.add(1) totalInvocations++ function.reportSubmission() diff --git a/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt b/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt index 1612e10b..560039c1 100644 --- a/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt +++ b/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt @@ -23,8 +23,6 @@ package org.opendc.faas.service import io.mockk.* -import io.opentelemetry.api.metrics.MeterProvider -import kotlinx.coroutines.ExperimentalCoroutinesApi import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertDoesNotThrow @@ -39,12 +37,11 @@ import java.util.* /** * Test suite for the [FaaSService] implementation. */ -@OptIn(ExperimentalCoroutinesApi::class) internal class FaaSServiceTest { @Test fun testClientState() = runBlockingSimulation { - val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) val client = assertDoesNotThrow { service.newClient() } assertDoesNotThrow { client.close() } @@ -58,7 +55,7 @@ internal class FaaSServiceTest { @Test fun testClientInvokeUnknown() = runBlockingSimulation { - val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) val client = service.newClient() @@ -67,7 +64,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionCreation() = runBlockingSimulation { - val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) val client = service.newClient() @@ -78,7 +75,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionQuery() = runBlockingSimulation { - val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) val client = service.newClient() @@ -91,7 +88,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionFindById() = runBlockingSimulation { - val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) val client = service.newClient() @@ -104,7 +101,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionFindByName() = runBlockingSimulation { - val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) val client = service.newClient() @@ -117,7 +114,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionDuplicateName() = runBlockingSimulation { - val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) val client = service.newClient() @@ -128,7 +125,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionDelete() = runBlockingSimulation { - val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) val client = service.newClient() val function = client.newFunction("test", 128) @@ -142,7 +139,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionCannotInvokeDeleted() = runBlockingSimulation { - val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) val client = service.newClient() val function = client.newFunction("test", 128) @@ -155,7 +152,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionInvoke() = runBlockingSimulation { val deployer = mockk<FunctionDeployer>() - val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), deployer, mockk(), mockk(relaxUnitFun = true)) + val service = FaaSService(coroutineContext, clock, deployer, mockk(), mockk(relaxUnitFun = true)) every { deployer.deploy(any(), any()) } answers { object : FunctionInstance { diff --git a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt index 792a8584..d528558c 100644 --- a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt +++ b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt @@ -24,7 +24,6 @@ package org.opendc.faas.simulator import io.mockk.coVerify import io.mockk.spyk -import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.delay import kotlinx.coroutines.yield @@ -78,7 +77,10 @@ internal class SimFaaSServiceTest { val delayInjector = StochasticDelayInjector(ColdStartModel.GOOGLE, random) val deployer = SimFunctionDeployer(clock, this, machineModel, delayInjector) { workload } val service = FaaSService( - coroutineContext, clock, MeterProvider.noop(), deployer, RandomRoutingPolicy(), + coroutineContext, + clock, + deployer, + RandomRoutingPolicy(), FunctionTerminationPolicyFixed(coroutineContext, clock, timeout = Duration.ofMillis(10000)) ) diff --git a/opendc-telemetry/build.gradle.kts b/opendc-telemetry/build.gradle.kts deleted file mode 100644 index 6473a29e..00000000 --- a/opendc-telemetry/build.gradle.kts +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -description = "Telemetry processing for OpenDC" - -subprojects { - group = "org.opendc.telemetry" -} diff --git a/opendc-telemetry/opendc-telemetry-api/build.gradle.kts b/opendc-telemetry/opendc-telemetry-api/build.gradle.kts deleted file mode 100644 index 32a36d68..00000000 --- a/opendc-telemetry/opendc-telemetry-api/build.gradle.kts +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -description = "Telemetry API for OpenDC" - -/* Build configuration */ -plugins { - `kotlin-library-conventions` -} - -dependencies { - api(libs.opentelemetry.api) -} diff --git a/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts b/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts deleted file mode 100644 index b476a669..00000000 --- a/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -description = "Telemetry for OpenDC Compute" - -/* Build configuration */ -plugins { - `kotlin-library-conventions` -} - -dependencies { - api(projects.opendcTelemetry.opendcTelemetrySdk) - - implementation(projects.opendcCompute.opendcComputeService) - implementation(libs.opentelemetry.semconv) - implementation(libs.kotlin.logging) -} diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt deleted file mode 100644 index 9557f680..00000000 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt +++ /dev/null @@ -1,517 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -@file:Suppress("PropertyName") - -package org.opendc.telemetry.compute - -import io.opentelemetry.api.common.AttributeKey -import io.opentelemetry.api.common.Attributes -import io.opentelemetry.sdk.metrics.data.MetricData -import io.opentelemetry.sdk.metrics.data.PointData -import io.opentelemetry.sdk.resources.Resource -import io.opentelemetry.semconv.resource.attributes.ResourceAttributes -import org.opendc.telemetry.compute.table.* -import java.time.Instant - -/** - * Helper class responsible for aggregating [MetricData] into [ServiceTableReader], [HostTableReader] and [ServerTableReader]. - */ -public class ComputeMetricAggregator { - private val _service = ServiceAggregator() - private val _hosts = mutableMapOf<String, HostAggregator>() - private val _servers = mutableMapOf<String, ServerAggregator>() - - /** - * Process the specified [metrics] for this cycle. - */ - public fun process(metrics: Collection<MetricData>) { - val service = _service - val hosts = _hosts - val servers = _servers - - for (metric in metrics) { - val resource = metric.resource - - when (metric.name) { - // ComputeService - "scheduler.hosts" -> { - for (point in metric.longSumData.points) { - // Record the timestamp for the service - service.recordTimestamp(point) - - when (point.attributes[STATE_KEY]) { - "up" -> service._hostsUp = point.value.toInt() - "down" -> service._hostsDown = point.value.toInt() - } - } - } - "scheduler.servers" -> { - for (point in metric.longSumData.points) { - when (point.attributes[STATE_KEY]) { - "pending" -> service._serversPending = point.value.toInt() - "active" -> service._serversActive = point.value.toInt() - } - } - } - "scheduler.attempts" -> { - for (point in metric.longSumData.points) { - when (point.attributes[RESULT_KEY]) { - "success" -> service._attemptsSuccess = point.value.toInt() - "failure" -> service._attemptsFailure = point.value.toInt() - "error" -> service._attemptsError = point.value.toInt() - } - } - } - - // SimHost - "system.guests" -> { - val agg = getHost(hosts, resource) ?: continue - - for (point in metric.longSumData.points) { - when (point.attributes[STATE_KEY]) { - "terminated" -> agg._guestsTerminated = point.value.toInt() - "running" -> agg._guestsRunning = point.value.toInt() - "error" -> agg._guestsRunning = point.value.toInt() - "invalid" -> agg._guestsInvalid = point.value.toInt() - } - } - } - "system.cpu.limit" -> { - val agg = getHost(hosts, resource) ?: continue - - for (point in metric.doubleGaugeData.points) { - val server = getServer(servers, point) - - if (server != null) { - server._cpuLimit = point.value - server._host = agg.host - } else { - agg._cpuLimit = point.value - } - } - } - "system.cpu.usage" -> { - val agg = getHost(hosts, resource) ?: continue - agg._cpuUsage = metric.doubleGaugeData.points.first().value - } - "system.cpu.demand" -> { - val agg = getHost(hosts, resource) ?: continue - agg._cpuDemand = metric.doubleGaugeData.points.first().value - } - "system.cpu.utilization" -> { - val agg = getHost(hosts, resource) ?: continue - agg._cpuUtilization = metric.doubleGaugeData.points.first().value - } - "system.cpu.time" -> { - val agg = getHost(hosts, resource) ?: continue - - for (point in metric.longSumData.points) { - val server = getServer(servers, point) - val state = point.attributes[STATE_KEY] - if (server != null) { - when (state) { - "active" -> server._cpuActiveTime = point.value - "idle" -> server._cpuIdleTime = point.value - "steal" -> server._cpuStealTime = point.value - "lost" -> server._cpuLostTime = point.value - } - server._host = agg.host - } else { - when (state) { - "active" -> agg._cpuActiveTime = point.value - "idle" -> agg._cpuIdleTime = point.value - "steal" -> agg._cpuStealTime = point.value - "lost" -> agg._cpuLostTime = point.value - } - } - } - } - "system.power.usage" -> { - val agg = getHost(hosts, resource) ?: continue - agg._powerUsage = metric.doubleGaugeData.points.first().value - } - "system.power.total" -> { - val agg = getHost(hosts, resource) ?: continue - agg._powerTotal = metric.doubleSumData.points.first().value - } - "system.time" -> { - val agg = getHost(hosts, resource) ?: continue - - for (point in metric.longSumData.points) { - val server = getServer(servers, point) - - if (server != null) { - server.recordTimestamp(point) - - when (point.attributes[STATE_KEY]) { - "up" -> server._uptime = point.value - "down" -> server._downtime = point.value - } - server._host = agg.host - } else { - agg.recordTimestamp(point) - - when (point.attributes[STATE_KEY]) { - "up" -> agg._uptime = point.value - "down" -> agg._downtime = point.value - } - } - } - } - "system.time.boot" -> { - val agg = getHost(hosts, resource) ?: continue - - for (point in metric.longGaugeData.points) { - val server = getServer(servers, point) - - if (server != null) { - server._bootTime = Instant.ofEpochMilli(point.value) - server._host = agg.host - } else { - agg._bootTime = Instant.ofEpochMilli(point.value) - } - } - } - "system.time.provision" -> { - for (point in metric.longGaugeData.points) { - val server = getServer(servers, point) ?: continue - server.recordTimestamp(point) - server._provisionTime = Instant.ofEpochMilli(point.value) - } - } - } - } - } - - /** - * Collect the data via the [monitor]. - */ - public fun collect(monitor: ComputeMonitor) { - monitor.record(_service) - - for (host in _hosts.values) { - monitor.record(host) - host.reset() - } - - for (server in _servers.values) { - monitor.record(server) - server.reset() - } - } - - /** - * Obtain the [HostAggregator] for the specified [resource]. - */ - private fun getHost(hosts: MutableMap<String, HostAggregator>, resource: Resource): HostAggregator? { - val id = resource.attributes[HOST_ID] - return if (id != null) { - hosts.getOrPut(id) { HostAggregator(resource) } - } else { - null - } - } - - /** - * Obtain the [ServerAggregator] for the specified [point]. - */ - private fun getServer(servers: MutableMap<String, ServerAggregator>, point: PointData): ServerAggregator? { - val id = point.attributes[ResourceAttributes.HOST_ID] - return if (id != null) { - servers.getOrPut(id) { ServerAggregator(point.attributes) } - } else { - null - } - } - - /** - * An aggregator for service metrics before they are reported. - */ - internal class ServiceAggregator : ServiceTableReader { - private var _timestamp: Instant = Instant.MIN - override val timestamp: Instant - get() = _timestamp - - override val hostsUp: Int - get() = _hostsUp - @JvmField var _hostsUp = 0 - - override val hostsDown: Int - get() = _hostsDown - @JvmField var _hostsDown = 0 - - override val serversPending: Int - get() = _serversPending - @JvmField var _serversPending = 0 - - override val serversActive: Int - get() = _serversActive - @JvmField var _serversActive = 0 - - override val attemptsSuccess: Int - get() = _attemptsSuccess - @JvmField var _attemptsSuccess = 0 - - override val attemptsFailure: Int - get() = _attemptsFailure - @JvmField var _attemptsFailure = 0 - - override val attemptsError: Int - get() = _attemptsError - @JvmField var _attemptsError = 0 - - /** - * Record the timestamp of a [point] for this aggregator. - */ - fun recordTimestamp(point: PointData) { - _timestamp = Instant.ofEpochMilli(point.epochNanos / 1_000_000L) // ns to ms - } - } - - /** - * An aggregator for host metrics before they are reported. - */ - internal class HostAggregator(resource: Resource) : HostTableReader { - /** - * The static information about this host. - */ - override val host = HostInfo( - resource.attributes[HOST_ID]!!, - resource.attributes[HOST_NAME] ?: "", - resource.attributes[HOST_ARCH] ?: "", - resource.attributes[HOST_NCPUS]?.toInt() ?: 0, - resource.attributes[HOST_MEM_CAPACITY] ?: 0, - ) - - override val timestamp: Instant - get() = _timestamp - private var _timestamp = Instant.MIN - - override val guestsTerminated: Int - get() = _guestsTerminated - @JvmField var _guestsTerminated = 0 - - override val guestsRunning: Int - get() = _guestsRunning - @JvmField var _guestsRunning = 0 - - override val guestsError: Int - get() = _guestsError - @JvmField var _guestsError = 0 - - override val guestsInvalid: Int - get() = _guestsInvalid - @JvmField var _guestsInvalid = 0 - - override val cpuLimit: Double - get() = _cpuLimit - @JvmField var _cpuLimit = 0.0 - - override val cpuUsage: Double - get() = _cpuUsage - @JvmField var _cpuUsage = 0.0 - - override val cpuDemand: Double - get() = _cpuDemand - @JvmField var _cpuDemand = 0.0 - - override val cpuUtilization: Double - get() = _cpuUtilization - @JvmField var _cpuUtilization = 0.0 - - override val cpuActiveTime: Long - get() = _cpuActiveTime - previousCpuActiveTime - @JvmField var _cpuActiveTime = 0L - private var previousCpuActiveTime = 0L - - override val cpuIdleTime: Long - get() = _cpuIdleTime - previousCpuIdleTime - @JvmField var _cpuIdleTime = 0L - private var previousCpuIdleTime = 0L - - override val cpuStealTime: Long - get() = _cpuStealTime - previousCpuStealTime - @JvmField var _cpuStealTime = 0L - private var previousCpuStealTime = 0L - - override val cpuLostTime: Long - get() = _cpuLostTime - previousCpuLostTime - @JvmField var _cpuLostTime = 0L - private var previousCpuLostTime = 0L - - override val powerUsage: Double - get() = _powerUsage - @JvmField var _powerUsage = 0.0 - - override val powerTotal: Double - get() = _powerTotal - previousPowerTotal - @JvmField var _powerTotal = 0.0 - private var previousPowerTotal = 0.0 - - override val uptime: Long - get() = _uptime - previousUptime - @JvmField var _uptime = 0L - private var previousUptime = 0L - - override val downtime: Long - get() = _downtime - previousDowntime - @JvmField var _downtime = 0L - private var previousDowntime = 0L - - override val bootTime: Instant? - get() = _bootTime - @JvmField var _bootTime: Instant? = null - - /** - * Finish the aggregation for this cycle. - */ - fun reset() { - // Reset intermediate state for next aggregation - previousCpuActiveTime = _cpuActiveTime - previousCpuIdleTime = _cpuIdleTime - previousCpuStealTime = _cpuStealTime - previousCpuLostTime = _cpuLostTime - previousPowerTotal = _powerTotal - previousUptime = _uptime - previousDowntime = _downtime - - _guestsTerminated = 0 - _guestsRunning = 0 - _guestsError = 0 - _guestsInvalid = 0 - - _cpuLimit = 0.0 - _cpuUsage = 0.0 - _cpuDemand = 0.0 - _cpuUtilization = 0.0 - - _powerUsage = 0.0 - } - - /** - * Record the timestamp of a [point] for this aggregator. - */ - fun recordTimestamp(point: PointData) { - _timestamp = Instant.ofEpochMilli(point.epochNanos / 1_000_000L) // ns to ms - } - } - - /** - * An aggregator for server metrics before they are reported. - */ - internal class ServerAggregator(attributes: Attributes) : ServerTableReader { - /** - * The static information about this server. - */ - override val server = ServerInfo( - attributes[ResourceAttributes.HOST_ID]!!, - attributes[ResourceAttributes.HOST_NAME]!!, - attributes[ResourceAttributes.HOST_TYPE]!!, - attributes[ResourceAttributes.HOST_ARCH]!!, - attributes[ResourceAttributes.HOST_IMAGE_ID]!!, - attributes[ResourceAttributes.HOST_IMAGE_NAME]!!, - attributes[AttributeKey.longKey("host.num_cpus")]!!.toInt(), - attributes[AttributeKey.longKey("host.mem_capacity")]!!, - ) - - /** - * The [HostInfo] of the host on which the server is hosted. - */ - override val host: HostInfo? - get() = _host - @JvmField var _host: HostInfo? = null - - private var _timestamp = Instant.MIN - override val timestamp: Instant - get() = _timestamp - - override val uptime: Long - get() = _uptime - previousUptime - @JvmField var _uptime: Long = 0 - private var previousUptime = 0L - - override val downtime: Long - get() = _downtime - previousDowntime - @JvmField var _downtime: Long = 0 - private var previousDowntime = 0L - - override val provisionTime: Instant? - get() = _provisionTime - @JvmField var _provisionTime: Instant? = null - - override val bootTime: Instant? - get() = _bootTime - @JvmField var _bootTime: Instant? = null - - override val cpuLimit: Double - get() = _cpuLimit - @JvmField var _cpuLimit = 0.0 - - override val cpuActiveTime: Long - get() = _cpuActiveTime - previousCpuActiveTime - @JvmField var _cpuActiveTime = 0L - private var previousCpuActiveTime = 0L - - override val cpuIdleTime: Long - get() = _cpuIdleTime - previousCpuIdleTime - @JvmField var _cpuIdleTime = 0L - private var previousCpuIdleTime = 0L - - override val cpuStealTime: Long - get() = _cpuStealTime - previousCpuStealTime - @JvmField var _cpuStealTime = 0L - private var previousCpuStealTime = 0L - - override val cpuLostTime: Long - get() = _cpuLostTime - previousCpuLostTime - @JvmField var _cpuLostTime = 0L - private var previousCpuLostTime = 0L - - /** - * Finish the aggregation for this cycle. - */ - fun reset() { - previousUptime = _uptime - previousDowntime = _downtime - previousCpuActiveTime = _cpuActiveTime - previousCpuIdleTime = _cpuIdleTime - previousCpuStealTime = _cpuStealTime - previousCpuLostTime = _cpuLostTime - - _host = null - _cpuLimit = 0.0 - } - - /** - * Record the timestamp of a [point] for this aggregator. - */ - fun recordTimestamp(point: PointData) { - _timestamp = Instant.ofEpochMilli(point.epochNanos / 1_000_000L) // ns to ms - } - } - - private companion object { - private val STATE_KEY = AttributeKey.stringKey("state") - private val RESULT_KEY = AttributeKey.stringKey("result") - } -} diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt deleted file mode 100644 index 3ab6c7b2..00000000 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.telemetry.compute - -import io.opentelemetry.sdk.common.CompletableResultCode -import io.opentelemetry.sdk.metrics.data.* -import io.opentelemetry.sdk.metrics.export.MetricExporter -import mu.KotlinLogging - -/** - * A [MetricExporter] that redirects data to a [ComputeMonitor] implementation. - */ -public abstract class ComputeMetricExporter : MetricExporter, ComputeMonitor { - /** - * The logging instance for this exporter. - */ - private val logger = KotlinLogging.logger {} - - /** - * A [ComputeMetricAggregator] that actually performs the aggregation. - */ - private val agg = ComputeMetricAggregator() - - override fun export(metrics: Collection<MetricData>): CompletableResultCode { - return try { - agg.process(metrics) - agg.collect(this) - - CompletableResultCode.ofSuccess() - } catch (e: Throwable) { - logger.warn(e) { "Failed to export results" } - CompletableResultCode.ofFailure() - } - } - - override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess() - - override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess() -} diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt deleted file mode 100644 index 41315b15..00000000 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.telemetry.compute - -import io.opentelemetry.sdk.metrics.export.MetricProducer -import org.opendc.telemetry.compute.table.ServiceData -import org.opendc.telemetry.compute.table.ServiceTableReader -import org.opendc.telemetry.compute.table.toServiceData - -/** - * Collect the metrics of the compute service. - */ -public fun collectServiceMetrics(metricProducer: MetricProducer): ServiceData { - lateinit var serviceData: ServiceData - val agg = ComputeMetricAggregator() - val monitor = object : ComputeMonitor { - override fun record(reader: ServiceTableReader) { - serviceData = reader.toServiceData() - } - } - - agg.process(metricProducer.collectAllMetrics()) - agg.collect(monitor) - return serviceData -} diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/HostAttributes.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/HostAttributes.kt deleted file mode 100644 index 7dca6186..00000000 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/HostAttributes.kt +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -@file:JvmName("HostAttributes") -package org.opendc.telemetry.compute - -import io.opentelemetry.api.common.AttributeKey - -/** - * The identifier of the node hosting virtual machines. - */ -public val HOST_ID: AttributeKey<String> = AttributeKey.stringKey("node.id") - -/** - * The name of the node hosting virtual machines. - */ -public val HOST_NAME: AttributeKey<String> = AttributeKey.stringKey("node.name") - -/** - * The CPU architecture of the host node. - */ -public val HOST_ARCH: AttributeKey<String> = AttributeKey.stringKey("node.arch") - -/** - * The number of CPUs in the host node. - */ -public val HOST_NCPUS: AttributeKey<Long> = AttributeKey.longKey("node.num_cpus") - -/** - * The amount of memory installed in the host node in MiB. - */ -public val HOST_MEM_CAPACITY: AttributeKey<Long> = AttributeKey.longKey("node.mem_capacity") diff --git a/opendc-telemetry/opendc-telemetry-sdk/build.gradle.kts b/opendc-telemetry/opendc-telemetry-sdk/build.gradle.kts deleted file mode 100644 index 4b3241bc..00000000 --- a/opendc-telemetry/opendc-telemetry-sdk/build.gradle.kts +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -description = "Telemetry SDK for OpenDC" - -/* Build configuration */ -plugins { - `kotlin-library-conventions` -} - -dependencies { - api(projects.opendcTelemetry.opendcTelemetryApi) - api(libs.kotlinx.coroutines) - api(libs.opentelemetry.sdk.main) - api(libs.opentelemetry.sdk.metrics) - - implementation(libs.kotlin.logging) -} diff --git a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/OtelClockAdapter.kt b/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/OtelClockAdapter.kt deleted file mode 100644 index cd191652..00000000 --- a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/OtelClockAdapter.kt +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.telemetry.sdk - -import io.opentelemetry.sdk.common.Clock - -/** - * An adapter class that bridges a [java.time.Clock] to a [Clock] - */ -public class OtelClockAdapter(private val clock: java.time.Clock) : Clock { - override fun now(): Long = nanoTime() - - override fun nanoTime(): Long = clock.millis() * 1_000_000L -} - -/** - * Convert the specified [java.time.Clock] to a [Clock]. - */ -public fun java.time.Clock.toOtelClock(): Clock = OtelClockAdapter(this) diff --git a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt b/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt deleted file mode 100644 index ca5da079..00000000 --- a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.telemetry.sdk.metrics.export - -import io.opentelemetry.sdk.common.CompletableResultCode -import io.opentelemetry.sdk.metrics.data.AggregationTemporality -import io.opentelemetry.sdk.metrics.export.MetricExporter -import io.opentelemetry.sdk.metrics.export.MetricProducer -import io.opentelemetry.sdk.metrics.export.MetricReader -import io.opentelemetry.sdk.metrics.export.MetricReaderFactory -import kotlinx.coroutines.* -import mu.KotlinLogging -import java.time.Duration - -/** - * A helper class to read the metrics from a list of [MetricProducer]s and automatically export the metrics every - * export interval. - * - * The reader runs in a [CoroutineScope] which enables collection of metrics in environments with a custom clock. - * - * @param scope The [CoroutineScope] to run the reader in. - * @param producer The metric producer to gather metrics from. - * @param exporter The export to export the metrics to. - * @param exportInterval The export interval. - */ -public class CoroutineMetricReader private constructor( - scope: CoroutineScope, - private val producer: MetricProducer, - private val exporter: MetricExporter, - private val exportInterval: Duration -) : MetricReader { - private val logger = KotlinLogging.logger {} - - /** - * The background job that is responsible for collecting the metrics every cycle. - */ - private val job = scope.launch { - val intervalMs = exportInterval.toMillis() - - try { - while (isActive) { - delay(intervalMs) - - try { - val metrics = producer.collectAllMetrics() - val result = exporter.export(metrics) - result.whenComplete { - if (!result.isSuccess) { - logger.warn { "Exporter failed" } - } - } - } catch (cause: Throwable) { - logger.warn(cause) { "Exporter threw an Exception" } - } - } - } finally { - exporter.shutdown() - } - } - - override fun getPreferredTemporality(): AggregationTemporality = AggregationTemporality.CUMULATIVE - - override fun flush(): CompletableResultCode { - return exporter.flush() - } - - override fun shutdown(): CompletableResultCode { - job.cancel() - return CompletableResultCode.ofSuccess() - } - - public companion object { - /** - * Construct a [MetricReaderFactory] for this metric reader. - */ - public operator fun invoke( - scope: CoroutineScope, - exporter: MetricExporter, - exportInterval: Duration = Duration.ofMinutes(5) - ): MetricReaderFactory { - return MetricReaderFactory { producer -> - CoroutineMetricReader(scope, producer, exporter, exportInterval) - } - } - } -} diff --git a/opendc-web/opendc-web-runner/build.gradle.kts b/opendc-web/opendc-web-runner/build.gradle.kts index 3c80f605..c1e3b976 100644 --- a/opendc-web/opendc-web-runner/build.gradle.kts +++ b/opendc-web/opendc-web-runner/build.gradle.kts @@ -37,8 +37,6 @@ dependencies { implementation(projects.opendcCompute.opendcComputeSimulator) implementation(projects.opendcCompute.opendcComputeWorkload) implementation(projects.opendcSimulator.opendcSimulatorCore) - implementation(projects.opendcTelemetry.opendcTelemetrySdk) - implementation(projects.opendcTelemetry.opendcTelemetryCompute) implementation(projects.opendcTrace.opendcTraceApi) implementation(projects.opendcWeb.opendcWebClient) diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt index 7c0c43ed..9c9a866d 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt @@ -25,7 +25,7 @@ package org.opendc.web.runner import mu.KotlinLogging import org.opendc.compute.api.Server import org.opendc.compute.workload.* -import org.opendc.compute.workload.telemetry.NoopTelemetryManager +import org.opendc.compute.workload.telemetry.ComputeMetricReader import org.opendc.compute.workload.topology.HostSpec import org.opendc.compute.workload.topology.Topology import org.opendc.compute.workload.topology.apply @@ -36,7 +36,6 @@ import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.LinearPowerModel import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.telemetry.compute.ComputeMetricReader import org.opendc.web.client.runner.OpenDCRunnerClient import org.opendc.web.proto.runner.Job import org.opendc.web.proto.runner.Scenario @@ -209,7 +208,6 @@ public class OpenDCRunner( val simulator = ComputeServiceHelper( coroutineContext, clock, - NoopTelemetryManager(), computeScheduler, failureModel, interferenceModel.takeIf { phenomena.interference } diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt index 69350d8c..01002c70 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt @@ -22,9 +22,9 @@ package org.opendc.web.runner.internal -import org.opendc.telemetry.compute.ComputeMonitor -import org.opendc.telemetry.compute.table.HostTableReader -import org.opendc.telemetry.compute.table.ServiceTableReader +import org.opendc.compute.workload.telemetry.ComputeMonitor +import org.opendc.compute.workload.telemetry.table.HostTableReader +import org.opendc.compute.workload.telemetry.table.ServiceTableReader import kotlin.math.max import kotlin.math.roundToLong diff --git a/opendc-workflow/opendc-workflow-service/build.gradle.kts b/opendc-workflow/opendc-workflow-service/build.gradle.kts index 60b5eb13..b6365885 100644 --- a/opendc-workflow/opendc-workflow-service/build.gradle.kts +++ b/opendc-workflow/opendc-workflow-service/build.gradle.kts @@ -30,7 +30,6 @@ plugins { dependencies { api(projects.opendcWorkflow.opendcWorkflowApi) api(projects.opendcCompute.opendcComputeApi) - api(projects.opendcTelemetry.opendcTelemetryApi) implementation(projects.opendcCommon) implementation(libs.kotlin.logging) @@ -38,7 +37,6 @@ dependencies { testImplementation(projects.opendcCompute.opendcComputeWorkload) testImplementation(projects.opendcSimulator.opendcSimulatorCore) testImplementation(projects.opendcTrace.opendcTraceApi) - testImplementation(projects.opendcTelemetry.opendcTelemetrySdk) testRuntimeOnly(projects.opendcTrace.opendcTraceGwf) testRuntimeOnly(libs.log4j.slf4j) } diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt index b8bc0e33..2436c387 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt @@ -22,7 +22,6 @@ package org.opendc.workflow.service -import io.opentelemetry.api.metrics.MeterProvider import org.opendc.compute.api.ComputeClient import org.opendc.workflow.api.Job import org.opendc.workflow.service.internal.WorkflowServiceImpl @@ -30,7 +29,7 @@ import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy import org.opendc.workflow.service.scheduler.job.JobOrderPolicy import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy -import org.opendc.workflow.service.telemetry.SchedulerStats +import org.opendc.workflow.service.scheduler.telemetry.SchedulerStats import java.time.Clock import java.time.Duration import kotlin.coroutines.CoroutineContext @@ -63,7 +62,7 @@ public interface WorkflowService : AutoCloseable { * @param context The [CoroutineContext] to use in the service. * @param clock The clock instance to use. * @param meterProvider The meter provider to use. - * @param compute The compute client to use. + * @param compute The "Compute" client to use. * @param schedulingQuantum The scheduling quantum to use (minimum duration between scheduling cycles). * @param jobAdmissionPolicy The job admission policy to use. * @param jobOrderPolicy The job order policy to use. @@ -73,7 +72,6 @@ public interface WorkflowService : AutoCloseable { public operator fun invoke( context: CoroutineContext, clock: Clock, - meterProvider: MeterProvider, compute: ComputeClient, schedulingQuantum: Duration, jobAdmissionPolicy: JobAdmissionPolicy, @@ -84,7 +82,6 @@ public interface WorkflowService : AutoCloseable { return WorkflowServiceImpl( context, clock, - meterProvider, compute, schedulingQuantum, jobAdmissionPolicy, diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt index 9c7f18a2..899810a2 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt @@ -22,8 +22,6 @@ package org.opendc.workflow.service.internal -import io.opentelemetry.api.metrics.Meter -import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.* import org.opendc.common.util.Pacer import org.opendc.compute.api.* @@ -34,7 +32,7 @@ import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy import org.opendc.workflow.service.scheduler.job.JobOrderPolicy import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy -import org.opendc.workflow.service.telemetry.SchedulerStats +import org.opendc.workflow.service.scheduler.telemetry.SchedulerStats import java.time.Clock import java.time.Duration import java.util.* @@ -48,9 +46,8 @@ import kotlin.coroutines.resume public class WorkflowServiceImpl( context: CoroutineContext, private val clock: Clock, - meterProvider: MeterProvider, private val computeClient: ComputeClient, - private val schedulingQuantum: Duration, + schedulingQuantum: Duration, jobAdmissionPolicy: JobAdmissionPolicy, jobOrderPolicy: JobOrderPolicy, taskEligibilityPolicy: TaskEligibilityPolicy, @@ -62,11 +59,6 @@ public class WorkflowServiceImpl( private val scope = CoroutineScope(context + Job()) /** - * The [Meter] to collect metrics of this service. - */ - private val meter = meterProvider.get("org.opendc.workflow.service") - - /** * The incoming jobs ready to be processed by the scheduler. */ private val incomingJobs: MutableSet<JobState> = linkedSetOf() @@ -139,58 +131,11 @@ public class WorkflowServiceImpl( } } - /** - * The number of jobs that have been submitted to the service. - */ - private val submittedJobs = meter.counterBuilder("jobs.submitted") - .setDescription("Number of submitted jobs") - .setUnit("1") - .build() private var _workflowsSubmitted: Int = 0 - - /** - * The number of jobs that are running. - */ - private val runningJobs = meter.upDownCounterBuilder("jobs.active") - .setDescription("Number of jobs running") - .setUnit("1") - .build() private var _workflowsRunning: Int = 0 - - /** - * The number of jobs that have finished running. - */ - private val finishedJobs = meter.counterBuilder("jobs.finished") - .setDescription("Number of jobs that finished running") - .setUnit("1") - .build() private var _workflowsFinished: Int = 0 - - /** - * The number of tasks that have been submitted to the service. - */ - private val submittedTasks = meter.counterBuilder("tasks.submitted") - .setDescription("Number of submitted tasks") - .setUnit("1") - .build() private var _tasksSubmitted: Int = 0 - - /** - * The number of jobs that are running. - */ - private val runningTasks = meter.upDownCounterBuilder("tasks.active") - .setDescription("Number of tasks running") - .setUnit("1") - .build() private var _tasksRunning: Int = 0 - - /** - * The number of jobs that have finished running. - */ - private val finishedTasks = meter.counterBuilder("tasks.finished") - .setDescription("Number of tasks that finished running") - .setUnit("1") - .build() private var _tasksFinished: Int = 0 /** @@ -229,14 +174,12 @@ public class WorkflowServiceImpl( instance.state = TaskStatus.READY } - submittedTasks.add(1) _tasksSubmitted++ } instances.values.toCollection(jobInstance.tasks) incomingJobs += jobInstance rootListener.jobSubmitted(jobInstance) - submittedJobs.add(1) _workflowsSubmitted++ pacer.enqueue() @@ -283,7 +226,6 @@ public class WorkflowServiceImpl( jobQueue.add(jobInstance) activeJobs += jobInstance - runningJobs.add(1) _workflowsRunning++ rootListener.jobStarted(jobInstance) } @@ -363,7 +305,6 @@ public class WorkflowServiceImpl( ServerState.RUNNING -> { val task = taskByServer.getValue(server) task.startedAt = clock.millis() - runningTasks.add(1) _tasksRunning++ rootListener.taskStarted(task) } @@ -381,8 +322,6 @@ public class WorkflowServiceImpl( job.tasks.remove(task) activeTasks -= task - runningTasks.add(-1) - finishedTasks.add(1) _tasksRunning-- _tasksFinished++ rootListener.taskFinished(task) @@ -410,8 +349,6 @@ public class WorkflowServiceImpl( private fun finishJob(job: JobState) { activeJobs -= job - runningJobs.add(-1) - finishedJobs.add(1) _workflowsRunning-- _workflowsFinished++ rootListener.jobFinished(job) diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/telemetry/SchedulerStats.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/telemetry/SchedulerStats.kt index 7c7d7c4d..608e82df 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/telemetry/SchedulerStats.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/telemetry/SchedulerStats.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.workflow.service.telemetry +package org.opendc.workflow.service.scheduler.telemetry /** * Statistics about the workflow scheduler. diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt index d5f06587..73d1b23b 100644 --- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt @@ -32,7 +32,6 @@ import org.opendc.compute.service.scheduler.filters.RamFilter import org.opendc.compute.service.scheduler.filters.VCpuFilter import org.opendc.compute.service.scheduler.weights.VCpuWeigher import org.opendc.compute.workload.ComputeServiceHelper -import org.opendc.compute.workload.telemetry.NoopTelemetryManager import org.opendc.compute.workload.topology.HostSpec import org.opendc.simulator.compute.kernel.SimSpaceSharedHypervisorProvider import org.opendc.simulator.compute.model.MachineModel @@ -70,7 +69,12 @@ internal class WorkflowServiceTest { weighers = listOf(VCpuWeigher(1.0, multiplier = 1.0)) ) - val computeHelper = ComputeServiceHelper(coroutineContext, clock, NoopTelemetryManager(), computeScheduler, schedulingQuantum = Duration.ofSeconds(1)) + val computeHelper = ComputeServiceHelper( + coroutineContext, + clock, + computeScheduler, + schedulingQuantum = Duration.ofSeconds(1) + ) val hostCount = 4 repeat(hostCount) { computeHelper.registerHost(createHostSpec(it)) } diff --git a/opendc-workflow/opendc-workflow-workload/build.gradle.kts b/opendc-workflow/opendc-workflow-workload/build.gradle.kts index b725a69c..17eadf29 100644 --- a/opendc-workflow/opendc-workflow-workload/build.gradle.kts +++ b/opendc-workflow/opendc-workflow-workload/build.gradle.kts @@ -32,6 +32,4 @@ dependencies { implementation(projects.opendcSimulator.opendcSimulatorCompute) implementation(projects.opendcTrace.opendcTraceApi) - implementation(projects.opendcTelemetry.opendcTelemetrySdk) - implementation(libs.opentelemetry.semconv) } diff --git a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt index a7d0ed6c..435d0190 100644 --- a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt +++ b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt @@ -22,24 +22,13 @@ package org.opendc.workflow.workload -import io.opentelemetry.api.metrics.MeterProvider -import io.opentelemetry.sdk.common.CompletableResultCode -import io.opentelemetry.sdk.metrics.SdkMeterProvider -import io.opentelemetry.sdk.metrics.data.AggregationTemporality -import io.opentelemetry.sdk.metrics.export.MetricProducer -import io.opentelemetry.sdk.metrics.export.MetricReader -import io.opentelemetry.sdk.metrics.export.MetricReaderFactory -import io.opentelemetry.sdk.resources.Resource -import io.opentelemetry.semconv.resource.attributes.ResourceAttributes import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.launch import org.opendc.compute.api.ComputeClient -import org.opendc.telemetry.sdk.toOtelClock import org.opendc.workflow.api.Job import org.opendc.workflow.service.WorkflowService import java.time.Clock -import java.util.* import kotlin.coroutines.CoroutineContext /** @@ -59,60 +48,16 @@ public class WorkflowServiceHelper( /** * The [WorkflowService] that is constructed by this runner. */ - public val service: WorkflowService - - /** - * The [MetricProducer] exposed by the [WorkflowService]. - */ - public lateinit var metricProducer: MetricProducer - private set - - /** - * The [MeterProvider] used for the service. - */ - private val _meterProvider: SdkMeterProvider - - /** - * The list of [MetricReader]s that have been registered with the runner. - */ - private val _metricReaders = mutableListOf<MetricReader>() - - init { - val resource = Resource.builder() - .put(ResourceAttributes.SERVICE_NAME, "opendc-workflow") - .build() - - _meterProvider = SdkMeterProvider.builder() - .setClock(clock.toOtelClock()) - .setResource(resource) - .registerMetricReader { producer -> - metricProducer = producer - - val metricReaders = _metricReaders - object : MetricReader { - override fun getPreferredTemporality(): AggregationTemporality = AggregationTemporality.CUMULATIVE - override fun flush(): CompletableResultCode { - return CompletableResultCode.ofAll(metricReaders.map { it.flush() }) - } - override fun shutdown(): CompletableResultCode { - return CompletableResultCode.ofAll(metricReaders.map { it.shutdown() }) - } - } - } - .build() - - service = WorkflowService( - context, - clock, - _meterProvider, - computeClient, - schedulerSpec.schedulingQuantum, - jobAdmissionPolicy = schedulerSpec.jobAdmissionPolicy, - jobOrderPolicy = schedulerSpec.jobOrderPolicy, - taskEligibilityPolicy = schedulerSpec.taskEligibilityPolicy, - taskOrderPolicy = schedulerSpec.taskOrderPolicy, - ) - } + public val service: WorkflowService = WorkflowService( + context, + clock, + computeClient, + schedulerSpec.schedulingQuantum, + jobAdmissionPolicy = schedulerSpec.jobAdmissionPolicy, + jobOrderPolicy = schedulerSpec.jobOrderPolicy, + taskEligibilityPolicy = schedulerSpec.taskEligibilityPolicy, + taskOrderPolicy = schedulerSpec.taskOrderPolicy, + ) /** * Run the specified list of [jobs] using the workflow service and suspend execution until all jobs have @@ -146,19 +91,8 @@ public class WorkflowServiceHelper( } } - /** - * Register a [MetricReader] for this helper. - * - * @param factory The factory for the reader to register. - */ - public fun registerMetricReader(factory: MetricReaderFactory) { - val reader = factory.apply(metricProducer) - _metricReaders.add(reader) - } - override fun close() { computeClient.close() service.close() - _meterProvider.close() } } diff --git a/settings.gradle.kts b/settings.gradle.kts index a779edcc..f651f4c1 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -47,9 +47,6 @@ include(":opendc-simulator:opendc-simulator-flow") include(":opendc-simulator:opendc-simulator-power") include(":opendc-simulator:opendc-simulator-network") include(":opendc-simulator:opendc-simulator-compute") -include(":opendc-telemetry:opendc-telemetry-api") -include(":opendc-telemetry:opendc-telemetry-sdk") -include(":opendc-telemetry:opendc-telemetry-compute") include(":opendc-trace:opendc-trace-api") include(":opendc-trace:opendc-trace-gwf") include(":opendc-trace:opendc-trace-swf") |
