From c7eec7904e08029b3ab31d3e7b21afa1ea9ab7e6 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 4 May 2022 16:24:53 +0200 Subject: refactor(compute/service): Remove OpenTelemetry from "compute" modules This change removes the OpenTelemetry integration from the OpenDC Compute modules. Previously, we chose to integrate OpenTelemetry to provide a unified way to report metrics to the users. Although this worked as expected, the overhead of OpenTelemetry when collecting metrics during simulation was considerable, and there were few further optimization opportunities (other than providing a separate API implementation). Furthermore, since we were tied to OpenTelemetry's SDK implementation, we experienced issues with throttling and registering multiple instruments. We will instead use another approach, where we expose the core metrics in OpenDC via specialized interfaces (see the preceding commits) such that access is fast and can be done without having to interface with OpenTelemetry. In addition, we will provide an adapter that is able to forward these metrics to OpenTelemetry implementations, so we can still integrate with the wider ecosystem. 
--- .../opendc-compute-service/build.gradle.kts | 2 - .../org/opendc/compute/service/ComputeService.kt | 6 +- .../service/driver/telemetry/GuestSystemStats.kt | 2 +- .../service/driver/telemetry/HostSystemStats.kt | 2 +- .../compute/service/internal/ComputeServiceImpl.kt | 88 +--- .../compute/service/internal/InternalServer.kt | 18 - .../opendc/compute/service/ComputeServiceTest.kt | 3 +- .../opendc-compute-simulator/build.gradle.kts | 3 - .../kotlin/org/opendc/compute/simulator/SimHost.kt | 171 +------ .../org/opendc/compute/simulator/internal/Guest.kt | 144 +----- .../org/opendc/compute/simulator/SimHostTest.kt | 3 - .../opendc-compute-workload/build.gradle.kts | 2 - .../compute/workload/ComputeServiceHelper.kt | 8 +- .../export/parquet/ParquetComputeMonitor.kt | 8 +- .../export/parquet/ParquetHostDataWriter.kt | 2 +- .../export/parquet/ParquetServerDataWriter.kt | 2 +- .../export/parquet/ParquetServiceDataWriter.kt | 3 +- .../workload/telemetry/ComputeMetricReader.kt | 424 +++++++++++++++++ .../compute/workload/telemetry/ComputeMonitor.kt | 47 ++ .../workload/telemetry/NoopTelemetryManager.kt | 36 -- .../workload/telemetry/SdkTelemetryManager.kt | 135 ------ .../compute/workload/telemetry/TelemetryManager.kt | 42 -- .../compute/workload/telemetry/table/HostInfo.kt | 28 ++ .../workload/telemetry/table/HostTableReader.kt | 125 +++++ .../compute/workload/telemetry/table/ServerInfo.kt | 37 ++ .../workload/telemetry/table/ServerTableReader.kt | 90 ++++ .../workload/telemetry/table/ServiceData.kt | 46 ++ .../workload/telemetry/table/ServiceTableReader.kt | 70 +++ .../workload/export/parquet/HostDataWriterTest.kt | 4 +- .../export/parquet/ServerDataWriterTest.kt | 6 +- .../export/parquet/ServiceDataWriterTest.kt | 2 +- .../opendc-experiments-capelin/build.gradle.kts | 3 - .../experiments/capelin/CapelinBenchmarks.kt | 6 +- .../src/jmh/resources/topology.txt | 5 + .../org/opendc/experiments/capelin/Portfolio.kt | 5 +- .../experiments/capelin/CapelinIntegrationTest.kt | 
11 +- .../opendc-telemetry-compute/build.gradle.kts | 36 -- .../telemetry/compute/ComputeMetricAggregator.kt | 517 --------------------- .../telemetry/compute/ComputeMetricExporter.kt | 59 --- .../telemetry/compute/ComputeMetricReader.kt | 424 ----------------- .../org/opendc/telemetry/compute/ComputeMonitor.kt | 47 -- .../kotlin/org/opendc/telemetry/compute/Helpers.kt | 45 -- .../org/opendc/telemetry/compute/HostAttributes.kt | 51 -- .../org/opendc/telemetry/compute/table/HostInfo.kt | 28 -- .../telemetry/compute/table/HostTableReader.kt | 125 ----- .../opendc/telemetry/compute/table/ServerInfo.kt | 37 -- .../telemetry/compute/table/ServerTableReader.kt | 90 ---- .../opendc/telemetry/compute/table/ServiceData.kt | 46 -- .../telemetry/compute/table/ServiceTableReader.kt | 70 --- opendc-web/opendc-web-runner/build.gradle.kts | 2 - .../kotlin/org/opendc/web/runner/OpenDCRunner.kt | 4 +- .../web/runner/internal/WebComputeMonitor.kt | 6 +- .../opendc/workflow/service/WorkflowServiceTest.kt | 8 +- settings.gradle.kts | 1 - 54 files changed, 917 insertions(+), 2268 deletions(-) create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMetricReader.kt create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMonitor.kt delete mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/NoopTelemetryManager.kt delete mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/SdkTelemetryManager.kt delete mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/TelemetryManager.kt create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/HostInfo.kt create mode 100644 
opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/HostTableReader.kt create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServerInfo.kt create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServerTableReader.kt create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServiceData.kt create mode 100644 opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServiceTableReader.kt create mode 100644 opendc-experiments/opendc-experiments-capelin/src/jmh/resources/topology.txt delete mode 100644 opendc-telemetry/opendc-telemetry-compute/build.gradle.kts delete mode 100644 opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt delete mode 100644 opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt delete mode 100644 opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricReader.kt delete mode 100644 opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt delete mode 100644 opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt delete mode 100644 opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/HostAttributes.kt delete mode 100644 opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostInfo.kt delete mode 100644 opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostTableReader.kt delete mode 100644 opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerInfo.kt delete mode 100644 
opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerTableReader.kt delete mode 100644 opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt delete mode 100644 opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceTableReader.kt diff --git a/opendc-compute/opendc-compute-service/build.gradle.kts b/opendc-compute/opendc-compute-service/build.gradle.kts index b42c2919..fd15b6e7 100644 --- a/opendc-compute/opendc-compute-service/build.gradle.kts +++ b/opendc-compute/opendc-compute-service/build.gradle.kts @@ -29,10 +29,8 @@ plugins { dependencies { api(projects.opendcCompute.opendcComputeApi) - api(projects.opendcTelemetry.opendcTelemetryApi) implementation(projects.opendcCommon) implementation(libs.kotlin.logging) - implementation(libs.opentelemetry.semconv) testImplementation(projects.opendcSimulator.opendcSimulatorCore) testRuntimeOnly(libs.log4j.slf4j) diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt index 3a6baaa1..c0b70268 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/ComputeService.kt @@ -22,8 +22,6 @@ package org.opendc.compute.service -import io.opentelemetry.api.metrics.Meter -import io.opentelemetry.api.metrics.MeterProvider import org.opendc.compute.api.ComputeClient import org.opendc.compute.api.Server import org.opendc.compute.service.driver.Host @@ -79,18 +77,16 @@ public interface ComputeService : AutoCloseable { * * @param context The [CoroutineContext] to use in the service. * @param clock The clock instance to use. 
- * @param meterProvider The [MeterProvider] for creating a [Meter] for the service. * @param scheduler The scheduler implementation to use. * @param schedulingQuantum The interval between scheduling cycles. */ public operator fun invoke( context: CoroutineContext, clock: Clock, - meterProvider: MeterProvider, scheduler: ComputeScheduler, schedulingQuantum: Duration = Duration.ofMinutes(5), ): ComputeService { - return ComputeServiceImpl(context, clock, meterProvider, scheduler, schedulingQuantum) + return ComputeServiceImpl(context, clock, scheduler, schedulingQuantum) } } } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/GuestSystemStats.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/GuestSystemStats.kt index b3958473..6fec5175 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/GuestSystemStats.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/GuestSystemStats.kt @@ -35,5 +35,5 @@ import java.time.Instant public data class GuestSystemStats( val uptime: Duration, val downtime: Duration, - val bootTime: Instant + val bootTime: Instant? 
) diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/HostSystemStats.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/HostSystemStats.kt index 1c07023f..9d34a5ce 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/HostSystemStats.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/driver/telemetry/HostSystemStats.kt @@ -41,7 +41,7 @@ import java.time.Instant public data class HostSystemStats( val uptime: Duration, val downtime: Duration, - val bootTime: Instant, + val bootTime: Instant?, val powerUsage: Double, val energyUsage: Double, val guestsTerminated: Int, diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index e8664e5c..21aaa19e 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -22,11 +22,6 @@ package org.opendc.compute.service.internal -import io.opentelemetry.api.common.AttributeKey -import io.opentelemetry.api.common.Attributes -import io.opentelemetry.api.metrics.Meter -import io.opentelemetry.api.metrics.MeterProvider -import io.opentelemetry.api.metrics.ObservableLongMeasurement import kotlinx.coroutines.* import mu.KotlinLogging import org.opendc.common.util.Pacer @@ -49,14 +44,12 @@ import kotlin.math.max * * @param context The [CoroutineContext] to use in the service. * @param clock The clock instance to use. - * @param meterProvider The [MeterProvider] for creating a [Meter] for the service. * @param scheduler The scheduler implementation to use. 
* @param schedulingQuantum The interval between scheduling cycles. */ internal class ComputeServiceImpl( private val context: CoroutineContext, private val clock: Clock, - meterProvider: MeterProvider, private val scheduler: ComputeScheduler, schedulingQuantum: Duration ) : ComputeService, HostListener { @@ -70,11 +63,6 @@ internal class ComputeServiceImpl( */ private val logger = KotlinLogging.logger {} - /** - * The [Meter] to track metrics of the [ComputeService]. - */ - private val meter = meterProvider.get("org.opendc.compute.service") - /** * The [Random] instance used to generate unique identifiers for the objects. */ @@ -117,72 +105,20 @@ internal class ComputeServiceImpl( private var maxCores = 0 private var maxMemory = 0L - - /** - * The number of scheduling attempts. - */ - private val _schedulingAttempts = meter.counterBuilder("scheduler.attempts") - .setDescription("Number of scheduling attempts") - .setUnit("1") - .build() - private val _schedulingAttemptsSuccessAttr = Attributes.of(AttributeKey.stringKey("result"), "success") - private val _schedulingAttemptsFailureAttr = Attributes.of(AttributeKey.stringKey("result"), "failure") - private val _schedulingAttemptsErrorAttr = Attributes.of(AttributeKey.stringKey("result"), "error") private var _attemptsSuccess = 0L private var _attemptsFailure = 0L private var _attemptsError = 0L - - /** - * The response time of the service. - */ - private val _schedulingLatency = meter.histogramBuilder("scheduler.latency") - .setDescription("End to end latency for a server to be scheduled (in multiple attempts)") - .ofLongs() - .setUnit("ms") - .build() - - /** - * The number of servers that are pending. 
- */ - private val _servers = meter.upDownCounterBuilder("scheduler.servers") - .setDescription("Number of servers managed by the scheduler") - .setUnit("1") - .build() - private val _serversPendingAttr = Attributes.of(AttributeKey.stringKey("state"), "pending") - private val _serversActiveAttr = Attributes.of(AttributeKey.stringKey("state"), "active") private var _serversPending = 0 private var _serversActive = 0 /** * The [Pacer] to use for scheduling the scheduler cycles. */ - private val pacer = Pacer(scope.coroutineContext, clock, schedulingQuantum.toMillis(), ::doSchedule) + private val pacer = Pacer(scope.coroutineContext, clock, schedulingQuantum.toMillis()) { doSchedule() } override val hosts: Set get() = hostToView.keys - init { - val upState = Attributes.of(AttributeKey.stringKey("state"), "up") - val downState = Attributes.of(AttributeKey.stringKey("state"), "down") - - meter.upDownCounterBuilder("scheduler.hosts") - .setDescription("Number of hosts registered with the scheduler") - .setUnit("1") - .buildWithCallback { result -> - val total = hosts.size - val available = availableHosts.size.toLong() - - result.record(available, upState) - result.record(total - available, downState) - } - - meter.gaugeBuilder("system.time.provision") - .setDescription("The most recent timestamp where the server entered a provisioned state") - .setUnit("1") - .ofLongs() - .buildWithCallback(::collectProvisionTime) - } - override fun newClient(): ComputeClient { check(scope.isActive) { "Service is already closed" } return object : ComputeClient { @@ -355,7 +291,6 @@ internal class ComputeServiceImpl( server.launchedAt = Instant.ofEpochMilli(now) queue.add(request) _serversPending++ - _servers.add(1, _serversPendingAttr) requestSchedulingCycle() return request } @@ -387,14 +322,13 @@ internal class ComputeServiceImpl( /** * Run a single scheduling iteration. 
*/ - private fun doSchedule(now: Long) { + private fun doSchedule() { while (queue.isNotEmpty()) { val request = queue.peek() if (request.isCancelled) { queue.poll() _serversPending-- - _servers.add(-1, _serversPendingAttr) continue } @@ -407,9 +341,7 @@ internal class ComputeServiceImpl( // Remove the incoming image queue.poll() _serversPending-- - _servers.add(-1, _serversPendingAttr) _attemptsFailure++ - _schedulingAttempts.add(1, _schedulingAttemptsFailureAttr) logger.warn { "Failed to spawn $server: does not fit [${clock.instant()}]" } @@ -425,8 +357,6 @@ internal class ComputeServiceImpl( // Remove request from queue queue.poll() _serversPending-- - _servers.add(-1, _serversPendingAttr) - _schedulingLatency.record(now - request.submitTime, server.attributes) logger.info { "Assigned server $server to host $host." } @@ -442,10 +372,8 @@ internal class ComputeServiceImpl( host.spawn(server) activeServers[server] = host - _servers.add(1, _serversActiveAttr) _serversActive++ _attemptsSuccess++ - _schedulingAttempts.add(1, _schedulingAttemptsSuccessAttr) } catch (e: Throwable) { logger.error(e) { "Failed to deploy VM" } @@ -454,7 +382,6 @@ internal class ComputeServiceImpl( hv.availableMemory += server.flavor.memorySize _attemptsError++ - _schedulingAttempts.add(1, _schedulingAttemptsErrorAttr) } } } @@ -511,7 +438,6 @@ internal class ComputeServiceImpl( if (activeServers.remove(server) != null) { _serversActive-- - _servers.add(-1, _serversActiveAttr) } val hv = hostToView[host] @@ -527,14 +453,4 @@ internal class ComputeServiceImpl( requestSchedulingCycle() } } - - /** - * Collect the timestamp when each server entered its provisioning state most recently. 
- */ - private fun collectProvisionTime(result: ObservableLongMeasurement) { - for ((_, server) in servers) { - val launchedAt = server.launchedAt ?: continue - result.record(launchedAt.toEpochMilli(), server.attributes) - } - } } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt index d2a2d896..f9da24d8 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/InternalServer.kt @@ -22,9 +22,6 @@ package org.opendc.compute.service.internal -import io.opentelemetry.api.common.AttributeKey -import io.opentelemetry.api.common.Attributes -import io.opentelemetry.semconv.resource.attributes.ResourceAttributes import mu.KotlinLogging import org.opendc.compute.api.* import org.opendc.compute.service.driver.Host @@ -53,21 +50,6 @@ internal class InternalServer( */ private val watchers = mutableListOf() - /** - * The attributes of a server. - */ - @JvmField internal val attributes: Attributes = Attributes.builder() - .put(ResourceAttributes.HOST_NAME, name) - .put(ResourceAttributes.HOST_ID, uid.toString()) - .put(ResourceAttributes.HOST_TYPE, flavor.name) - .put(AttributeKey.longKey("host.num_cpus"), flavor.cpuCount.toLong()) - .put(AttributeKey.longKey("host.mem_capacity"), flavor.memorySize) - .put(AttributeKey.stringArrayKey("host.labels"), labels.map { (k, v) -> "$k:$v" }) - .put(ResourceAttributes.HOST_ARCH, ResourceAttributes.HostArchValues.AMD64) - .put(ResourceAttributes.HOST_IMAGE_NAME, image.name) - .put(ResourceAttributes.HOST_IMAGE_ID, image.uid.toString()) - .build() - /** * The [Host] that has been assigned to host the server. 
*/ diff --git a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt index eb106817..cc7be4a8 100644 --- a/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt +++ b/opendc-compute/opendc-compute-service/src/test/kotlin/org/opendc/compute/service/ComputeServiceTest.kt @@ -23,7 +23,6 @@ package org.opendc.compute.service import io.mockk.* -import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.delay import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertNull @@ -59,7 +58,7 @@ internal class ComputeServiceTest { filters = listOf(ComputeFilter(), VCpuFilter(allocationRatio = 1.0), RamFilter(allocationRatio = 1.0)), weighers = listOf(RamWeigher()) ) - service = ComputeService(scope.coroutineContext, clock, MeterProvider.noop(), computeScheduler) + service = ComputeService(scope.coroutineContext, clock, computeScheduler) } @Test diff --git a/opendc-compute/opendc-compute-simulator/build.gradle.kts b/opendc-compute/opendc-compute-simulator/build.gradle.kts index e81d87ec..72962147 100644 --- a/opendc-compute/opendc-compute-simulator/build.gradle.kts +++ b/opendc-compute/opendc-compute-simulator/build.gradle.kts @@ -32,11 +32,8 @@ dependencies { api(projects.opendcSimulator.opendcSimulatorCompute) api(libs.commons.math3) implementation(projects.opendcCommon) - implementation(libs.opentelemetry.semconv) implementation(libs.kotlin.logging) testImplementation(projects.opendcSimulator.opendcSimulatorCore) - testImplementation(projects.opendcTelemetry.opendcTelemetrySdk) - testImplementation(projects.opendcTelemetry.opendcTelemetryCompute) testRuntimeOnly(libs.slf4j.simple) } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt 
b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index 323ae4fe..c28239b4 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -22,12 +22,6 @@ package org.opendc.compute.simulator -import io.opentelemetry.api.common.AttributeKey -import io.opentelemetry.api.common.Attributes -import io.opentelemetry.api.metrics.Meter -import io.opentelemetry.api.metrics.MeterProvider -import io.opentelemetry.api.metrics.ObservableDoubleMeasurement -import io.opentelemetry.api.metrics.ObservableLongMeasurement import kotlinx.coroutines.* import org.opendc.compute.api.Flavor import org.opendc.compute.api.Server @@ -67,7 +61,6 @@ public class SimHost( override val meta: Map, context: CoroutineContext, engine: FlowEngine, - meterProvider: MeterProvider, hypervisorProvider: SimHypervisorProvider, scalingGovernor: ScalingGovernor = PerformanceScalingGovernor(), powerDriver: PowerDriver = SimplePowerDriver(ConstantPowerModel(0.0)), @@ -85,11 +78,6 @@ public class SimHost( */ private val clock = engine.clock - /** - * The [Meter] to track metrics of the simulated host. - */ - private val meter = meterProvider.get("org.opendc.compute.simulator") - /** * The event listeners registered with this host. 
*/ @@ -142,48 +130,6 @@ public class SimHost( init { launch() - - meter.upDownCounterBuilder("system.guests") - .setDescription("Number of guests on this host") - .setUnit("1") - .buildWithCallback(::collectGuests) - meter.gaugeBuilder("system.cpu.limit") - .setDescription("Amount of CPU resources available to the host") - .buildWithCallback(::collectCpuLimit) - meter.gaugeBuilder("system.cpu.demand") - .setDescription("Amount of CPU resources the guests would use if there were no CPU contention or CPU limits") - .setUnit("MHz") - .buildWithCallback { result -> result.record(hypervisor.cpuDemand) } - meter.gaugeBuilder("system.cpu.usage") - .setDescription("Amount of CPU resources used by the host") - .setUnit("MHz") - .buildWithCallback { result -> result.record(hypervisor.cpuUsage) } - meter.gaugeBuilder("system.cpu.utilization") - .setDescription("Utilization of the CPU resources of the host") - .setUnit("%") - .buildWithCallback { result -> result.record(hypervisor.cpuUsage / _cpuLimit) } - meter.counterBuilder("system.cpu.time") - .setDescription("Amount of CPU time spent by the host") - .setUnit("s") - .buildWithCallback(::collectCpuTime) - meter.gaugeBuilder("system.power.usage") - .setDescription("Power usage of the host ") - .setUnit("W") - .buildWithCallback { result -> result.record(machine.powerUsage) } - meter.counterBuilder("system.power.total") - .setDescription("Amount of energy used by the CPU") - .setUnit("J") - .ofDoubles() - .buildWithCallback { result -> result.record(machine.energyUsage) } - meter.counterBuilder("system.time") - .setDescription("The uptime of the host") - .setUnit("s") - .buildWithCallback(::collectUptime) - meter.gaugeBuilder("system.time.boot") - .setDescription("The boot time of the host") - .setUnit("1") - .ofLongs() - .buildWithCallback(::collectBootTime) } override fun canFit(server: Server): Boolean { @@ -278,7 +224,7 @@ public class SimHost( return HostSystemStats( Duration.ofMillis(_uptime), 
Duration.ofMillis(_downtime), - Instant.ofEpochMilli(_bootTime), + _bootTime, machine.powerUsage, machine.energyUsage, terminated, @@ -358,7 +304,7 @@ public class SimHost( _ctx = machine.startWorkload(object : SimWorkload { override fun onStart(ctx: SimMachineContext) { try { - _bootTime = clock.millis() + _bootTime = clock.instant() _state = HostState.UP hypervisor.onStart(ctx) } catch (cause: Throwable) { @@ -422,80 +368,11 @@ public class SimHost( return MachineModel(processingUnits, memoryUnits) } - private val STATE_KEY = AttributeKey.stringKey("state") - - private val terminatedState = Attributes.of(STATE_KEY, "terminated") - private val runningState = Attributes.of(STATE_KEY, "running") - private val errorState = Attributes.of(STATE_KEY, "error") - private val invalidState = Attributes.of(STATE_KEY, "invalid") - - /** - * Helper function to collect the guest counts on this host. - */ - private fun collectGuests(result: ObservableLongMeasurement) { - var terminated = 0L - var running = 0L - var error = 0L - var invalid = 0L - - val guests = _guests.listIterator() - for (guest in guests) { - when (guest.state) { - ServerState.TERMINATED -> terminated++ - ServerState.RUNNING -> running++ - ServerState.ERROR -> error++ - ServerState.DELETED -> { - // Remove guests that have been deleted - this.guests.remove(guest.server) - guests.remove() - } - else -> invalid++ - } - } - - result.record(terminated, terminatedState) - result.record(running, runningState) - result.record(error, errorState) - result.record(invalid, invalidState) - } - - private val _cpuLimit = machine.model.cpus.sumOf { it.frequency } - - /** - * Helper function to collect the CPU limits of a machine. 
- */ - private fun collectCpuLimit(result: ObservableDoubleMeasurement) { - result.record(_cpuLimit) - - val guests = _guests - for (i in guests.indices) { - guests[i].collectCpuLimit(result) - } - } - - private val _activeState = Attributes.of(STATE_KEY, "active") - private val _stealState = Attributes.of(STATE_KEY, "steal") - private val _lostState = Attributes.of(STATE_KEY, "lost") - private val _idleState = Attributes.of(STATE_KEY, "idle") - - /** - * Helper function to track the CPU time of a machine. - */ - private fun collectCpuTime(result: ObservableLongMeasurement) { - val stats = getCpuStats() - - result.record(stats.activeTime, _activeState) - result.record(stats.idleTime, _idleState) - result.record(stats.stealTime, _stealState) - result.record(stats.lostTime, _lostState) - - val guests = _guests - for (i in guests.indices) { - guests[i].collectCpuTime(result) - } - } - private var _lastReport = clock.millis() + private var _uptime = 0L + private var _downtime = 0L + private var _bootTime: Instant? = null + private val _cpuLimit = machine.model.cpus.sumOf { it.frequency } /** * Helper function to track the uptime of a machine. @@ -517,40 +394,4 @@ public class SimHost( guests[i].updateUptime() } } - - private var _uptime = 0L - private var _downtime = 0L - private val _upState = Attributes.of(STATE_KEY, "up") - private val _downState = Attributes.of(STATE_KEY, "down") - - /** - * Helper function to track the uptime of a machine. - */ - private fun collectUptime(result: ObservableLongMeasurement) { - updateUptime() - - result.record(_uptime, _upState) - result.record(_downtime, _downState) - - val guests = _guests - for (i in guests.indices) { - guests[i].collectUptime(result) - } - } - - private var _bootTime = Long.MIN_VALUE - - /** - * Helper function to track the boot time of a machine. 
- */ - private fun collectBootTime(result: ObservableLongMeasurement) { - if (_bootTime != Long.MIN_VALUE) { - result.record(_bootTime) - } - - val guests = _guests - for (i in guests.indices) { - guests[i].collectBootTime(result) - } - } } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt index 0d4c550d..ea3c6549 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt @@ -22,12 +22,6 @@ package org.opendc.compute.simulator.internal -import io.opentelemetry.api.common.AttributeKey -import io.opentelemetry.api.common.Attributes -import io.opentelemetry.api.common.AttributesBuilder -import io.opentelemetry.api.metrics.ObservableDoubleMeasurement -import io.opentelemetry.api.metrics.ObservableLongMeasurement -import io.opentelemetry.semconv.resource.attributes.ResourceAttributes import kotlinx.coroutines.* import mu.KotlinLogging import org.opendc.compute.api.Server @@ -76,11 +70,6 @@ internal class Guest( */ var state: ServerState = ServerState.TERMINATED - /** - * The attributes of the guest. - */ - val attributes: Attributes = GuestAttributes(this) - /** * Start the guest. */ @@ -158,7 +147,7 @@ internal class Guest( return GuestSystemStats( Duration.ofMillis(_uptime), Duration.ofMillis(_downtime), - Instant.ofEpochMilli(_bootTime) + _bootTime ) } @@ -235,7 +224,7 @@ internal class Guest( * This method is invoked when the guest was started on the host and has booted into a running state. 
*/ private fun onStart() { - _bootTime = clock.millis() + _bootTime = clock.instant() state = ServerState.RUNNING listener.onStart(this) } @@ -250,18 +239,11 @@ internal class Guest( listener.onStop(this) } - private val STATE_KEY = AttributeKey.stringKey("state") - private var _uptime = 0L private var _downtime = 0L - private val _upState = attributes.toBuilder() - .put(STATE_KEY, "up") - .build() - private val _downState = attributes.toBuilder() - .put(STATE_KEY, "down") - .build() - private var _lastReport = clock.millis() + private var _bootTime: Instant? = null + private val _cpuLimit = machine.model.cpus.sumOf { it.frequency } /** * Helper function to track the uptime and downtime of the guest. @@ -277,122 +259,4 @@ internal class Guest( _downtime += duration } } - - /** - * Helper function to track the uptime of the guest. - */ - fun collectUptime(result: ObservableLongMeasurement) { - updateUptime() - - result.record(_uptime, _upState) - result.record(_downtime, _downState) - } - - private var _bootTime = Long.MIN_VALUE - - /** - * Helper function to track the boot time of the guest. - */ - fun collectBootTime(result: ObservableLongMeasurement) { - if (_bootTime != Long.MIN_VALUE) { - result.record(_bootTime, attributes) - } - } - - private val _activeState = attributes.toBuilder() - .put(STATE_KEY, "active") - .build() - private val _stealState = attributes.toBuilder() - .put(STATE_KEY, "steal") - .build() - private val _lostState = attributes.toBuilder() - .put(STATE_KEY, "lost") - .build() - private val _idleState = attributes.toBuilder() - .put(STATE_KEY, "idle") - .build() - - /** - * Helper function to track the CPU time of a machine. 
- */ - fun collectCpuTime(result: ObservableLongMeasurement) { - val counters = machine.counters - counters.flush() - - result.record(counters.cpuActiveTime / 1000, _activeState) - result.record(counters.cpuIdleTime / 1000, _idleState) - result.record(counters.cpuStealTime / 1000, _stealState) - result.record(counters.cpuLostTime / 1000, _lostState) - } - - private val _cpuLimit = machine.model.cpus.sumOf { it.frequency } - - /** - * Helper function to collect the CPU limits of a machine. - */ - fun collectCpuLimit(result: ObservableDoubleMeasurement) { - result.record(_cpuLimit, attributes) - } - - /** - * An optimized [Attributes] implementation. - */ - private class GuestAttributes(private val uid: String, private val attributes: Attributes) : Attributes by attributes { - /** - * Construct a [GuestAttributes] instance from a [Guest]. - */ - constructor(guest: Guest) : this( - guest.server.uid.toString(), - Attributes.builder() - .put(ResourceAttributes.HOST_NAME, guest.server.name) - .put(ResourceAttributes.HOST_ID, guest.server.uid.toString()) - .put(ResourceAttributes.HOST_TYPE, guest.server.flavor.name) - .put(AttributeKey.longKey("host.num_cpus"), guest.server.flavor.cpuCount.toLong()) - .put(AttributeKey.longKey("host.mem_capacity"), guest.server.flavor.memorySize) - .put(AttributeKey.stringArrayKey("host.labels"), guest.server.labels.map { (k, v) -> "$k:$v" }) - .put(ResourceAttributes.HOST_ARCH, ResourceAttributes.HostArchValues.AMD64) - .put(ResourceAttributes.HOST_IMAGE_NAME, guest.server.image.name) - .put(ResourceAttributes.HOST_IMAGE_ID, guest.server.image.uid.toString()) - .build() - ) - - override fun get(key: AttributeKey): T? { - // Optimize access to the HOST_ID key which is accessed quite often - if (key == ResourceAttributes.HOST_ID) { - @Suppress("UNCHECKED_CAST") - return uid as T? 
- } - return attributes.get(key) - } - - override fun toBuilder(): AttributesBuilder { - val delegate = attributes.toBuilder() - return object : AttributesBuilder { - - override fun putAll(attributes: Attributes): AttributesBuilder { - delegate.putAll(attributes) - return this - } - - override fun put(key: AttributeKey, value: Int): AttributesBuilder { - delegate.put(key, value) - return this - } - - override fun put(key: AttributeKey, value: T): AttributesBuilder { - delegate.put(key, value) - return this - } - - override fun build(): Attributes = GuestAttributes(uid, delegate.build()) - } - } - - override fun equals(other: Any?): Boolean = attributes == other - - // Cache hash code - private val _hash = attributes.hashCode() - - override fun hashCode(): Int = _hash - } } diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index fd54ad1d..5ba4a667 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -22,7 +22,6 @@ package org.opendc.compute.simulator -import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.* import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach @@ -75,7 +74,6 @@ internal class SimHostTest { meta = emptyMap(), coroutineContext, engine, - MeterProvider.noop(), SimFairShareHypervisorProvider() ) val vmImageA = MockImage( @@ -158,7 +156,6 @@ internal class SimHostTest { meta = emptyMap(), coroutineContext, engine, - MeterProvider.noop(), SimFairShareHypervisorProvider() ) val image = MockImage( diff --git a/opendc-compute/opendc-compute-workload/build.gradle.kts b/opendc-compute/opendc-compute-workload/build.gradle.kts index 319b2ae3..e8a7c9fd 100644 --- 
a/opendc-compute/opendc-compute-workload/build.gradle.kts +++ b/opendc-compute/opendc-compute-workload/build.gradle.kts @@ -34,8 +34,6 @@ dependencies { implementation(projects.opendcTrace.opendcTraceParquet) implementation(projects.opendcSimulator.opendcSimulatorCore) implementation(projects.opendcSimulator.opendcSimulatorCompute) - implementation(projects.opendcTelemetry.opendcTelemetrySdk) - implementation(projects.opendcTelemetry.opendcTelemetryCompute) implementation(libs.opentelemetry.semconv) implementation(libs.kotlin.logging) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt index 21cfdad2..fddb4890 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeServiceHelper.kt @@ -30,7 +30,6 @@ import org.opendc.compute.api.Server import org.opendc.compute.service.ComputeService import org.opendc.compute.service.scheduler.ComputeScheduler import org.opendc.compute.simulator.SimHost -import org.opendc.compute.workload.telemetry.TelemetryManager import org.opendc.compute.workload.topology.HostSpec import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.workload.SimTraceWorkload @@ -46,7 +45,6 @@ import kotlin.math.max * * @param context [CoroutineContext] to run the simulation in. * @param clock [Clock] instance tracking simulation time. - * @param telemetry Helper class for managing telemetry. * @param scheduler [ComputeScheduler] implementation to use for the service. * @param failureModel A failure model to use for injecting failures. * @param interferenceModel The model to use for performance interference. 
@@ -55,7 +53,6 @@ import kotlin.math.max public class ComputeServiceHelper( private val context: CoroutineContext, private val clock: Clock, - private val telemetry: TelemetryManager, scheduler: ComputeScheduler, private val failureModel: FailureModel? = null, private val interferenceModel: VmInterferenceModel? = null, @@ -167,7 +164,6 @@ public class ComputeServiceHelper( * @return The [SimHost] that has been constructed by the runner. */ public fun registerHost(spec: HostSpec, optimize: Boolean = false): SimHost { - val meterProvider = telemetry.createMeterProvider(spec) val host = SimHost( spec.uid, spec.name, @@ -175,7 +171,6 @@ public class ComputeServiceHelper( spec.meta, context, _engine, - meterProvider, spec.hypervisor, powerDriver = spec.powerDriver, interferenceDomain = interferenceModel?.newDomain(), @@ -202,7 +197,6 @@ public class ComputeServiceHelper( * Construct a [ComputeService] instance. */ private fun createService(scheduler: ComputeScheduler, schedulingQuantum: Duration): ComputeService { - val meterProvider = telemetry.createMeterProvider(scheduler) - return ComputeService(context, clock, meterProvider, scheduler, schedulingQuantum) + return ComputeService(context, clock, scheduler, schedulingQuantum) } } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMonitor.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMonitor.kt index 6c515118..af4dad44 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMonitor.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetComputeMonitor.kt @@ -22,10 +22,10 @@ package org.opendc.compute.workload.export.parquet -import org.opendc.telemetry.compute.ComputeMonitor -import org.opendc.telemetry.compute.table.HostTableReader -import 
org.opendc.telemetry.compute.table.ServerTableReader -import org.opendc.telemetry.compute.table.ServiceTableReader +import org.opendc.compute.workload.telemetry.ComputeMonitor +import org.opendc.compute.workload.telemetry.table.HostTableReader +import org.opendc.compute.workload.telemetry.table.ServerTableReader +import org.opendc.compute.workload.telemetry.table.ServiceTableReader import java.io.File /** diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt index 0d5b6b34..e6e7e42d 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt @@ -27,7 +27,7 @@ import org.apache.parquet.hadoop.ParquetWriter import org.apache.parquet.hadoop.api.WriteSupport import org.apache.parquet.io.api.RecordConsumer import org.apache.parquet.schema.* -import org.opendc.telemetry.compute.table.HostTableReader +import org.opendc.compute.workload.telemetry.table.HostTableReader import org.opendc.trace.util.parquet.LocalParquetWriter import java.io.File import java.util.* diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt index 5d11629b..082c7c88 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt @@ -27,7 +27,7 @@ import 
org.apache.parquet.hadoop.ParquetWriter import org.apache.parquet.hadoop.api.WriteSupport import org.apache.parquet.io.api.RecordConsumer import org.apache.parquet.schema.* -import org.opendc.telemetry.compute.table.ServerTableReader +import org.opendc.compute.workload.telemetry.table.ServerTableReader import org.opendc.trace.util.parquet.LocalParquetWriter import java.io.File import java.util.* diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt index 5ad3b95e..2a0fdca1 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt @@ -22,12 +22,11 @@ package org.opendc.compute.workload.export.parquet -import io.opentelemetry.context.ContextKey.named import org.apache.hadoop.conf.Configuration import org.apache.parquet.hadoop.api.WriteSupport import org.apache.parquet.io.api.RecordConsumer import org.apache.parquet.schema.* -import org.opendc.telemetry.compute.table.ServiceTableReader +import org.opendc.compute.workload.telemetry.table.ServiceTableReader import java.io.File /** diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMetricReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMetricReader.kt new file mode 100644 index 00000000..45bd9ab1 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMetricReader.kt @@ -0,0 +1,424 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of 
this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.telemetry + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import mu.KotlinLogging +import org.opendc.compute.api.Server +import org.opendc.compute.service.ComputeService +import org.opendc.compute.service.driver.Host +import org.opendc.compute.workload.telemetry.table.* +import java.time.Clock +import java.time.Duration +import java.time.Instant + +/** + * A helper class to collect metrics from a [ComputeService] instance and automatically export the metrics every + * export interval. + * + * @param scope The [CoroutineScope] to run the reader in. + * @param clock The virtual clock. + * @param service The [ComputeService] to monitor. + * @param servers The [Server]s to monitor. + * @param monitor The monitor to export the metrics to. + * @param exportInterval The export interval. 
+ */ +public class ComputeMetricReader( + scope: CoroutineScope, + clock: Clock, + private val service: ComputeService, + private val servers: List, + private val monitor: ComputeMonitor, + private val exportInterval: Duration = Duration.ofMinutes(5) +) : AutoCloseable { + private val logger = KotlinLogging.logger {} + + /** + * Aggregator for service metrics. + */ + private val serviceTableReader = ServiceTableReaderImpl(service) + + /** + * Mapping from [Host] instances to [HostTableReaderImpl] + */ + private val hostTableReaders = mutableMapOf() + + /** + * Mapping from [Server] instances to [ServerTableReaderImpl] + */ + private val serverTableReaders = mutableMapOf() + + /** + * The background job that is responsible for collecting the metrics every cycle. + */ + private val job = scope.launch { + val intervalMs = exportInterval.toMillis() + + try { + while (isActive) { + delay(intervalMs) + + try { + val now = clock.instant() + + for (host in service.hosts) { + val reader = hostTableReaders.computeIfAbsent(host) { HostTableReaderImpl(it) } + reader.record(now) + monitor.record(reader) + reader.reset() + } + + for (server in servers) { + val reader = serverTableReaders.computeIfAbsent(server) { ServerTableReaderImpl(service, it) } + reader.record(now) + monitor.record(reader) + reader.reset() + } + + serviceTableReader.record(now) + monitor.record(serviceTableReader) + } catch (cause: Throwable) { + logger.warn(cause) { "Exporter threw an Exception" } + } + } + } finally { + if (monitor is AutoCloseable) { + monitor.close() + } + } + } + + override fun close() { + job.cancel() + } + + /** + * An aggregator for service metrics before they are reported. 
+ */ + private class ServiceTableReaderImpl(private val service: ComputeService) : ServiceTableReader { + private var _timestamp: Instant = Instant.MIN + override val timestamp: Instant + get() = _timestamp + + override val hostsUp: Int + get() = _hostsUp + private var _hostsUp = 0 + + override val hostsDown: Int + get() = _hostsDown + private var _hostsDown = 0 + + override val serversPending: Int + get() = _serversPending + private var _serversPending = 0 + + override val serversActive: Int + get() = _serversActive + private var _serversActive = 0 + + override val attemptsSuccess: Int + get() = _attemptsSuccess + private var _attemptsSuccess = 0 + + override val attemptsFailure: Int + get() = _attemptsFailure + private var _attemptsFailure = 0 + + override val attemptsError: Int + get() = _attemptsError + private var _attemptsError = 0 + + /** + * Record the next cycle. + */ + fun record(now: Instant) { + _timestamp = now + + val stats = service.getSchedulerStats() + _hostsUp = stats.hostsAvailable + _hostsDown = stats.hostsUnavailable + _serversPending = stats.serversPending + _serversActive = stats.serversActive + _attemptsSuccess = stats.attemptsSuccess.toInt() + _attemptsFailure = stats.attemptsFailure.toInt() + _attemptsError = stats.attemptsError.toInt() + } + } + + /** + * An aggregator for host metrics before they are reported. 
+ */ + private class HostTableReaderImpl(host: Host) : HostTableReader { + private val _host = host + + override val host: HostInfo = HostInfo(host.uid.toString(), host.name, "x86", host.model.cpuCount, host.model.memoryCapacity) + + override val timestamp: Instant + get() = _timestamp + private var _timestamp = Instant.MIN + + override val guestsTerminated: Int + get() = _guestsTerminated + private var _guestsTerminated = 0 + + override val guestsRunning: Int + get() = _guestsRunning + private var _guestsRunning = 0 + + override val guestsError: Int + get() = _guestsError + private var _guestsError = 0 + + override val guestsInvalid: Int + get() = _guestsInvalid + private var _guestsInvalid = 0 + + override val cpuLimit: Double + get() = _cpuLimit + private var _cpuLimit = 0.0 + + override val cpuUsage: Double + get() = _cpuUsage + private var _cpuUsage = 0.0 + + override val cpuDemand: Double + get() = _cpuDemand + private var _cpuDemand = 0.0 + + override val cpuUtilization: Double + get() = _cpuUtilization + private var _cpuUtilization = 0.0 + + override val cpuActiveTime: Long + get() = _cpuActiveTime - previousCpuActiveTime + private var _cpuActiveTime = 0L + private var previousCpuActiveTime = 0L + + override val cpuIdleTime: Long + get() = _cpuIdleTime - previousCpuIdleTime + private var _cpuIdleTime = 0L + private var previousCpuIdleTime = 0L + + override val cpuStealTime: Long + get() = _cpuStealTime - previousCpuStealTime + private var _cpuStealTime = 0L + private var previousCpuStealTime = 0L + + override val cpuLostTime: Long + get() = _cpuLostTime - previousCpuLostTime + private var _cpuLostTime = 0L + private var previousCpuLostTime = 0L + + override val powerUsage: Double + get() = _powerUsage + private var _powerUsage = 0.0 + + override val powerTotal: Double + get() = _powerTotal - previousPowerTotal + private var _powerTotal = 0.0 + private var previousPowerTotal = 0.0 + + override val uptime: Long + get() = _uptime - previousUptime + private var 
_uptime = 0L + private var previousUptime = 0L + + override val downtime: Long + get() = _downtime - previousDowntime + private var _downtime = 0L + private var previousDowntime = 0L + + override val bootTime: Instant? + get() = _bootTime + private var _bootTime: Instant? = null + + /** + * Record the next cycle. + */ + fun record(now: Instant) { + val hostCpuStats = _host.getCpuStats() + val hostSysStats = _host.getSystemStats() + + _timestamp = now + _guestsTerminated = hostSysStats.guestsTerminated + _guestsRunning = hostSysStats.guestsRunning + _guestsError = hostSysStats.guestsError + _guestsInvalid = hostSysStats.guestsInvalid + _cpuLimit = hostCpuStats.capacity + _cpuDemand = hostCpuStats.demand + _cpuUsage = hostCpuStats.usage + _cpuUtilization = hostCpuStats.utilization + _cpuActiveTime = hostCpuStats.activeTime + _cpuIdleTime = hostCpuStats.idleTime + _cpuStealTime = hostCpuStats.stealTime + _cpuLostTime = hostCpuStats.lostTime + _powerUsage = hostSysStats.powerUsage + _powerTotal = hostSysStats.energyUsage + _uptime = hostSysStats.uptime.toMillis() + _downtime = hostSysStats.downtime.toMillis() + _bootTime = hostSysStats.bootTime + } + + /** + * Finish the aggregation for this cycle. + */ + fun reset() { + // Reset intermediate state for next aggregation + previousCpuActiveTime = _cpuActiveTime + previousCpuIdleTime = _cpuIdleTime + previousCpuStealTime = _cpuStealTime + previousCpuLostTime = _cpuLostTime + previousPowerTotal = _powerTotal + previousUptime = _uptime + previousDowntime = _downtime + + _guestsTerminated = 0 + _guestsRunning = 0 + _guestsError = 0 + _guestsInvalid = 0 + + _cpuLimit = 0.0 + _cpuUsage = 0.0 + _cpuDemand = 0.0 + _cpuUtilization = 0.0 + + _powerUsage = 0.0 + } + } + + /** + * An aggregator for server metrics before they are reported. 
+ */ + private class ServerTableReaderImpl(private val service: ComputeService, server: Server) : ServerTableReader { + private val _server = server + + /** + * The static information about this server. + */ + override val server = ServerInfo( + server.uid.toString(), + server.name, + "vm", + "x86", + server.image.uid.toString(), + server.image.name, + server.flavor.cpuCount, + server.flavor.memorySize + ) + + /** + * The [HostInfo] of the host on which the server is hosted. + */ + override var host: HostInfo? = null + private var _host: Host? = null + + private var _timestamp = Instant.MIN + override val timestamp: Instant + get() = _timestamp + + override val uptime: Long + get() = _uptime - previousUptime + private var _uptime: Long = 0 + private var previousUptime = 0L + + override val downtime: Long + get() = _downtime - previousDowntime + private var _downtime: Long = 0 + private var previousDowntime = 0L + + override val provisionTime: Instant? + get() = _provisionTime + private var _provisionTime: Instant? = null + + override val bootTime: Instant? + get() = _bootTime + private var _bootTime: Instant? = null + + override val cpuLimit: Double + get() = _cpuLimit + private var _cpuLimit = 0.0 + + override val cpuActiveTime: Long + get() = _cpuActiveTime - previousCpuActiveTime + private var _cpuActiveTime = 0L + private var previousCpuActiveTime = 0L + + override val cpuIdleTime: Long + get() = _cpuIdleTime - previousCpuIdleTime + private var _cpuIdleTime = 0L + private var previousCpuIdleTime = 0L + + override val cpuStealTime: Long + get() = _cpuStealTime - previousCpuStealTime + private var _cpuStealTime = 0L + private var previousCpuStealTime = 0L + + override val cpuLostTime: Long + get() = _cpuLostTime - previousCpuLostTime + private var _cpuLostTime = 0L + private var previousCpuLostTime = 0L + + /** + * Record the next cycle. 
+ */ + fun record(now: Instant) { + val newHost = service.lookupHost(_server) + if (newHost != null && newHost.uid != _host?.uid) { + _host = newHost + host = HostInfo(newHost.uid.toString(), newHost.name, "x86", newHost.model.cpuCount, newHost.model.memoryCapacity) + } + + val cpuStats = _host?.getCpuStats(_server) + val sysStats = _host?.getSystemStats(_server) + + _timestamp = now + _cpuLimit = cpuStats?.capacity ?: 0.0 + _cpuActiveTime = cpuStats?.activeTime ?: 0 + _cpuIdleTime = cpuStats?.idleTime ?: 0 + _cpuStealTime = cpuStats?.stealTime ?: 0 + _cpuLostTime = cpuStats?.lostTime ?: 0 + _uptime = sysStats?.uptime?.toMillis() ?: 0 + _downtime = sysStats?.downtime?.toMillis() ?: 0 + _provisionTime = _server.launchedAt + _bootTime = sysStats?.bootTime + } + + /** + * Finish the aggregation for this cycle. + */ + fun reset() { + previousUptime = _uptime + previousDowntime = _downtime + previousCpuActiveTime = _cpuActiveTime + previousCpuIdleTime = _cpuIdleTime + previousCpuStealTime = _cpuStealTime + previousCpuLostTime = _cpuLostTime + + _host = null + _cpuLimit = 0.0 + } + } +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMonitor.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMonitor.kt new file mode 100644 index 00000000..36a2079a --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/ComputeMonitor.kt @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, 
subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.telemetry + +import org.opendc.compute.workload.telemetry.table.HostTableReader +import org.opendc.compute.workload.telemetry.table.ServerTableReader +import org.opendc.compute.workload.telemetry.table.ServiceTableReader + +/** + * A monitor that tracks the metrics and events of the OpenDC Compute service. + */ +public interface ComputeMonitor { + /** + * Record an entry with the specified [reader]. + */ + public fun record(reader: ServerTableReader) {} + + /** + * Record an entry with the specified [reader]. + */ + public fun record(reader: HostTableReader) {} + + /** + * Record an entry with the specified [reader]. 
+ */ + public fun record(reader: ServiceTableReader) {} +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/NoopTelemetryManager.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/NoopTelemetryManager.kt deleted file mode 100644 index 4e7d0b75..00000000 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/NoopTelemetryManager.kt +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.workload.telemetry - -import io.opentelemetry.api.metrics.MeterProvider -import org.opendc.compute.service.scheduler.ComputeScheduler -import org.opendc.compute.workload.topology.HostSpec - -/** - * A [TelemetryManager] that does nothing. 
- */ -public class NoopTelemetryManager : TelemetryManager { - override fun createMeterProvider(host: HostSpec): MeterProvider = MeterProvider.noop() - - override fun createMeterProvider(scheduler: ComputeScheduler): MeterProvider = MeterProvider.noop() -} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/SdkTelemetryManager.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/SdkTelemetryManager.kt deleted file mode 100644 index 478c0609..00000000 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/SdkTelemetryManager.kt +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
- */ - -package org.opendc.compute.workload.telemetry - -import io.opentelemetry.api.metrics.MeterProvider -import io.opentelemetry.sdk.common.CompletableResultCode -import io.opentelemetry.sdk.metrics.SdkMeterProvider -import io.opentelemetry.sdk.metrics.data.AggregationTemporality -import io.opentelemetry.sdk.metrics.data.MetricData -import io.opentelemetry.sdk.metrics.export.MetricProducer -import io.opentelemetry.sdk.metrics.export.MetricReader -import io.opentelemetry.sdk.metrics.export.MetricReaderFactory -import io.opentelemetry.sdk.resources.Resource -import io.opentelemetry.semconv.resource.attributes.ResourceAttributes -import org.opendc.compute.service.scheduler.ComputeScheduler -import org.opendc.compute.workload.topology.HostSpec -import org.opendc.telemetry.compute.* -import org.opendc.telemetry.sdk.toOtelClock -import java.time.Clock - -/** - * A [TelemetryManager] using the OpenTelemetry Java SDK. - */ -public class SdkTelemetryManager(private val clock: Clock) : TelemetryManager, AutoCloseable { - /** - * The [SdkMeterProvider]s that belong to the workload runner. - */ - private val _meterProviders = mutableListOf() - - /** - * The internal [MetricProducer] registered with the runner. - */ - private val _metricProducers = mutableListOf() - - /** - * The list of [MetricReader]s that have been registered with the runner. - */ - private val _metricReaders = mutableListOf() - - /** - * A [MetricProducer] that combines all the other metric producers. - */ - public val metricProducer: MetricProducer = object : MetricProducer { - private val producers = _metricProducers - - override fun collectAllMetrics(): Collection = producers.flatMap(MetricProducer::collectAllMetrics) - - override fun toString(): String = "SdkTelemetryManager.AggregateMetricProducer" - } - - /** - * Register a [MetricReader] for this manager. - * - * @param factory The factory for the reader to register. 
- */ - public fun registerMetricReader(factory: MetricReaderFactory) { - val reader = factory.apply(metricProducer) - _metricReaders.add(reader) - } - - override fun createMeterProvider(scheduler: ComputeScheduler): MeterProvider { - val resource = Resource.builder() - .put(ResourceAttributes.SERVICE_NAME, "opendc-compute") - .build() - - return createMeterProvider(resource) - } - - override fun createMeterProvider(host: HostSpec): MeterProvider { - val resource = Resource.builder() - .put(HOST_ID, host.uid.toString()) - .put(HOST_NAME, host.name) - .put(HOST_ARCH, ResourceAttributes.HostArchValues.AMD64) - .put(HOST_NCPUS, host.model.cpus.size) - .put(HOST_MEM_CAPACITY, host.model.memory.sumOf { it.size }) - .build() - - return createMeterProvider(resource) - } - - /** - * Construct a [SdkMeterProvider] for the specified [resource]. - */ - private fun createMeterProvider(resource: Resource): SdkMeterProvider { - val meterProvider = SdkMeterProvider.builder() - .setClock(clock.toOtelClock()) - .setResource(resource) - .registerMetricReader { producer -> - _metricProducers.add(producer) - object : MetricReader { - override fun getPreferredTemporality(): AggregationTemporality = AggregationTemporality.CUMULATIVE - override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess() - override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess() - } - } - .build() - _meterProviders.add(meterProvider) - return meterProvider - } - - override fun close() { - for (meterProvider in _meterProviders) { - meterProvider.close() - } - - _meterProviders.clear() - - for (metricReader in _metricReaders) { - metricReader.shutdown() - } - - _metricReaders.clear() - _metricProducers.clear() - } -} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/TelemetryManager.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/TelemetryManager.kt deleted file mode 
100644 index b67050ce..00000000 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/TelemetryManager.kt +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.compute.workload.telemetry - -import io.opentelemetry.api.metrics.MeterProvider -import org.opendc.compute.service.scheduler.ComputeScheduler -import org.opendc.compute.workload.topology.HostSpec - -/** - * Helper class to manage the telemetry for a [ComputeServiceHelper] instance. - */ -public interface TelemetryManager { - /** - * Construct a [MeterProvider] for the specified [ComputeScheduler]. - */ - public fun createMeterProvider(scheduler: ComputeScheduler): MeterProvider - - /** - * Construct a [MeterProvider] for the specified [HostSpec]. 
- */ - public fun createMeterProvider(host: HostSpec): MeterProvider -} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/HostInfo.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/HostInfo.kt new file mode 100644 index 00000000..5d383e40 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/HostInfo.kt @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.telemetry.table + +/** + * Information about a host exposed to the telemetry service. 
+ */ +public data class HostInfo(val id: String, val name: String, val arch: String, val cpuCount: Int, val memCapacity: Long) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/HostTableReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/HostTableReader.kt new file mode 100644 index 00000000..8f6f0d01 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/HostTableReader.kt @@ -0,0 +1,125 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.telemetry.table + +import java.time.Instant + +/** + * An interface that is used to read a row of a host trace entry. + */ +public interface HostTableReader { + /** + * The timestamp of the current entry of the reader. 
+ */ + public val timestamp: Instant + + /** + * The [HostInfo] of the host to which the row belongs. + */ + public val host: HostInfo + + /** + * The number of guests that are in a terminated state. + */ + public val guestsTerminated: Int + + /** + * The number of guests that are in a running state. + */ + public val guestsRunning: Int + + /** + * The number of guests that are in an error state. + */ + public val guestsError: Int + + /** + * The number of guests that are in an unknown state. + */ + public val guestsInvalid: Int + + /** + * The capacity of the CPUs in the host (in MHz). + */ + public val cpuLimit: Double + + /** + * The usage of all CPUs in the host (in MHz). + */ + public val cpuUsage: Double + + /** + * The demand of all vCPUs of the guests (in MHz). + */ + public val cpuDemand: Double + + /** + * The CPU utilization of the host. + */ + public val cpuUtilization: Double + + /** + * The duration (in seconds) that a CPU was active in the host. + */ + public val cpuActiveTime: Long + + /** + * The duration (in seconds) that a CPU was idle in the host. + */ + public val cpuIdleTime: Long + + /** + * The duration (in seconds) that a vCPU wanted to run, but no capacity was available. + */ + public val cpuStealTime: Long + + /** + * The duration (in seconds) of CPU time that was lost due to interference. + */ + public val cpuLostTime: Long + + /** + * The current power usage of the host in W. + */ + public val powerUsage: Double + + /** + * The total power consumption of the host since last time in J. + */ + public val powerTotal: Double + + /** + * The uptime of the host since last time in ms. + */ + public val uptime: Long + + /** + * The downtime of the host since last time in ms. + */ + public val downtime: Long + + /** + * The [Instant] at which the host booted. + */ + public val bootTime: Instant?
+} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServerInfo.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServerInfo.kt new file mode 100644 index 00000000..111135b7 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServerInfo.kt @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.telemetry.table + +/** + * Static information about a server exposed to the telemetry service. 
+ */ +public data class ServerInfo( + val id: String, + val name: String, + val type: String, + val arch: String, + val imageId: String, + val imageName: String, + val cpuCount: Int, + val memCapacity: Long +) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServerTableReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServerTableReader.kt new file mode 100644 index 00000000..bccccd01 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServerTableReader.kt @@ -0,0 +1,90 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.telemetry.table + +import java.time.Instant + +/** + * An interface that is used to read a row of a server trace entry. 
+ */ +public interface ServerTableReader { + /** + * The timestamp of the current entry of the reader. + */ + public val timestamp: Instant + + /** + * The [ServerInfo] of the server to which the row belongs. + */ + public val server: ServerInfo + + /** + * The [HostInfo] of the host on which the server is hosted or `null` if it has no host. + */ + public val host: HostInfo? + + /** + * The uptime of the server since last time in ms. + */ + public val uptime: Long + + /** + * The downtime of the server since last time in ms. + */ + public val downtime: Long + + /** + * The [Instant] at which the server was enqueued for the scheduler. + */ + public val provisionTime: Instant? + + /** + * The [Instant] at which the server booted. + */ + public val bootTime: Instant? + + /** + * The capacity of the CPUs of the server (in MHz). + */ + public val cpuLimit: Double + + /** + * The duration (in seconds) that a CPU was active in the server. + */ + public val cpuActiveTime: Long + + /** + * The duration (in seconds) that a CPU was idle in the server. + */ + public val cpuIdleTime: Long + + /** + * The duration (in seconds) that a vCPU wanted to run, but no capacity was available. + */ + public val cpuStealTime: Long + + /** + * The duration (in seconds) of CPU time that was lost due to interference.
+ */ + public val cpuLostTime: Long +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServiceData.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServiceData.kt new file mode 100644 index 00000000..a1df6ea7 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServiceData.kt @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.telemetry.table + +import java.time.Instant + +/** + * A trace entry for the compute service. 
+ */ +public data class ServiceData( + val timestamp: Instant, + val hostsUp: Int, + val hostsDown: Int, + val serversPending: Int, + val serversActive: Int, + val attemptsSuccess: Int, + val attemptsFailure: Int, + val attemptsError: Int +) + +/** + * Convert a [ServiceTableReader] into a persistent object. + */ +public fun ServiceTableReader.toServiceData(): ServiceData { + return ServiceData(timestamp, hostsUp, hostsDown, serversPending, serversActive, attemptsSuccess, attemptsFailure, attemptsError) +} diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServiceTableReader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServiceTableReader.kt new file mode 100644 index 00000000..4211ab15 --- /dev/null +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/telemetry/table/ServiceTableReader.kt @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.workload.telemetry.table + +import java.time.Instant + +/** + * An interface that is used to read a row of a service trace entry. + */ +public interface ServiceTableReader { + /** + * The timestamp of the current entry of the reader. + */ + public val timestamp: Instant + + /** + * The number of hosts that are up at this instant. + */ + public val hostsUp: Int + + /** + * The number of hosts that are down at this instant. + */ + public val hostsDown: Int + + /** + * The number of servers that are pending to be scheduled. + */ + public val serversPending: Int + + /** + * The number of servers that are currently active. + */ + public val serversActive: Int + + /** + * The scheduling attempts that were successful. + */ + public val attemptsSuccess: Int + + /** + * The scheduling attempts that were unsuccessful due to client error. + */ + public val attemptsFailure: Int + + /** + * The scheduling attempts that were unsuccessful due to scheduler error. 
+ */ + public val attemptsError: Int +} diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/HostDataWriterTest.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/HostDataWriterTest.kt index dae03513..4344bb08 100644 --- a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/HostDataWriterTest.kt +++ b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/HostDataWriterTest.kt @@ -25,8 +25,8 @@ package org.opendc.compute.workload.export.parquet import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertDoesNotThrow -import org.opendc.telemetry.compute.table.HostInfo -import org.opendc.telemetry.compute.table.HostTableReader +import org.opendc.compute.workload.telemetry.table.HostInfo +import org.opendc.compute.workload.telemetry.table.HostTableReader import java.nio.file.Files import java.time.Instant diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServerDataWriterTest.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServerDataWriterTest.kt index 280f5ef8..8465871d 100644 --- a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServerDataWriterTest.kt +++ b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServerDataWriterTest.kt @@ -25,9 +25,9 @@ package org.opendc.compute.workload.export.parquet import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertDoesNotThrow -import org.opendc.telemetry.compute.table.HostInfo -import org.opendc.telemetry.compute.table.ServerInfo -import org.opendc.telemetry.compute.table.ServerTableReader +import 
org.opendc.compute.workload.telemetry.table.HostInfo +import org.opendc.compute.workload.telemetry.table.ServerInfo +import org.opendc.compute.workload.telemetry.table.ServerTableReader import java.nio.file.Files import java.time.Instant diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServiceDataWriterTest.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServiceDataWriterTest.kt index 7ffa7186..d91982bc 100644 --- a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServiceDataWriterTest.kt +++ b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServiceDataWriterTest.kt @@ -25,7 +25,7 @@ package org.opendc.compute.workload.export.parquet import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertDoesNotThrow -import org.opendc.telemetry.compute.table.ServiceTableReader +import org.opendc.compute.workload.telemetry.table.ServiceTableReader import java.nio.file.Files import java.time.Instant diff --git a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts index 9495f4ca..39cf101d 100644 --- a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts @@ -37,8 +37,6 @@ dependencies { implementation(projects.opendcSimulator.opendcSimulatorCore) implementation(projects.opendcSimulator.opendcSimulatorCompute) implementation(projects.opendcCompute.opendcComputeSimulator) - implementation(projects.opendcTelemetry.opendcTelemetrySdk) - implementation(projects.opendcTelemetry.opendcTelemetryCompute) implementation(libs.config) implementation(libs.kotlin.logging) @@ -46,7 +44,6 @@ dependencies { implementation(libs.jackson.module.kotlin) 
implementation(libs.jackson.dataformat.csv) implementation(kotlin("reflect")) - implementation(libs.opentelemetry.semconv) runtimeOnly(projects.opendcTrace.opendcTraceOpendc) diff --git a/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt b/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt index 83b8c0c6..fd2c26f0 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/jmh/kotlin/org/opendc/experiments/capelin/CapelinBenchmarks.kt @@ -22,14 +22,12 @@ package org.opendc.experiments.capelin -import kotlinx.coroutines.ExperimentalCoroutinesApi import org.opendc.compute.service.scheduler.FilterScheduler import org.opendc.compute.service.scheduler.filters.ComputeFilter import org.opendc.compute.service.scheduler.filters.RamFilter import org.opendc.compute.service.scheduler.filters.VCpuFilter import org.opendc.compute.service.scheduler.weights.CoreRamWeigher import org.opendc.compute.workload.* -import org.opendc.compute.workload.telemetry.NoopTelemetryManager import org.opendc.compute.workload.topology.Topology import org.opendc.compute.workload.topology.apply import org.opendc.experiments.capelin.topology.clusterTopology @@ -46,7 +44,6 @@ import java.util.concurrent.TimeUnit @Fork(1) @Warmup(iterations = 2, time = 5, timeUnit = TimeUnit.SECONDS) @Measurement(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS) -@OptIn(ExperimentalCoroutinesApi::class) class CapelinBenchmarks { private lateinit var vms: List private lateinit var topology: Topology @@ -59,7 +56,7 @@ class CapelinBenchmarks { val loader = ComputeWorkloadLoader(File("src/test/resources/trace")) val source = trace("bitbrains-small") vms = source.resolve(loader, Random(1L)).vms - topology = checkNotNull(object 
{}.javaClass.getResourceAsStream("/env/topology.txt")).use { clusterTopology(it) } + topology = checkNotNull(object {}.javaClass.getResourceAsStream("/topology.txt")).use { clusterTopology(it) } } @Benchmark @@ -71,7 +68,6 @@ class CapelinBenchmarks { val runner = ComputeServiceHelper( coroutineContext, clock, - NoopTelemetryManager(), computeScheduler ) diff --git a/opendc-experiments/opendc-experiments-capelin/src/jmh/resources/topology.txt b/opendc-experiments/opendc-experiments-capelin/src/jmh/resources/topology.txt new file mode 100644 index 00000000..6b347bff --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/jmh/resources/topology.txt @@ -0,0 +1,5 @@ +ClusterID;ClusterName;Cores;Speed;Memory;numberOfHosts;memoryCapacityPerHost;coreCountPerHost +A01;A01;32;3.2;2048;1;256;32 +B01;B01;48;2.93;1256;6;64;8 +C01;C01;32;3.2;2048;2;128;16 + diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index 6fd85e8c..0de8aa7b 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -30,7 +30,7 @@ import org.opendc.compute.workload.ComputeWorkloadLoader import org.opendc.compute.workload.createComputeScheduler import org.opendc.compute.workload.export.parquet.ParquetComputeMonitor import org.opendc.compute.workload.grid5000 -import org.opendc.compute.workload.telemetry.NoopTelemetryManager +import org.opendc.compute.workload.telemetry.ComputeMetricReader import org.opendc.compute.workload.topology.apply import org.opendc.experiments.capelin.model.OperationalPhenomena import org.opendc.experiments.capelin.model.Topology @@ -39,7 +39,6 @@ import org.opendc.experiments.capelin.topology.clusterTopology import 
org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.telemetry.compute.ComputeMetricReader import java.io.File import java.time.Duration import java.util.* @@ -99,11 +98,9 @@ abstract class Portfolio(name: String) : Experiment(name) { else null val (vms, interferenceModel) = workload.source.resolve(workloadLoader, seeder) - val telemetry = NoopTelemetryManager() val runner = ComputeServiceHelper( coroutineContext, clock, - telemetry, computeScheduler, failureModel, interferenceModel?.withSeed(repeat.toLong()) diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 62cdf123..fa2cd9c8 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -33,14 +33,13 @@ import org.opendc.compute.service.scheduler.filters.RamFilter import org.opendc.compute.service.scheduler.filters.VCpuFilter import org.opendc.compute.service.scheduler.weights.CoreRamWeigher import org.opendc.compute.workload.* -import org.opendc.compute.workload.telemetry.NoopTelemetryManager +import org.opendc.compute.workload.telemetry.ComputeMetricReader +import org.opendc.compute.workload.telemetry.ComputeMonitor +import org.opendc.compute.workload.telemetry.table.HostTableReader import org.opendc.compute.workload.topology.Topology import org.opendc.compute.workload.topology.apply import org.opendc.experiments.capelin.topology.clusterTopology import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.telemetry.compute.ComputeMetricReader -import org.opendc.telemetry.compute.ComputeMonitor -import 
org.opendc.telemetry.compute.table.HostTableReader import java.io.File import java.time.Duration import java.util.* @@ -86,7 +85,6 @@ class CapelinIntegrationTest { val runner = ComputeServiceHelper( coroutineContext, clock, - NoopTelemetryManager(), computeScheduler ) val topology = createTopology() @@ -136,7 +134,6 @@ class CapelinIntegrationTest { val runner = ComputeServiceHelper( coroutineContext, clock, - NoopTelemetryManager(), computeScheduler ) val topology = createTopology("single") @@ -182,7 +179,6 @@ class CapelinIntegrationTest { val simulator = ComputeServiceHelper( coroutineContext, clock, - NoopTelemetryManager(), computeScheduler, interferenceModel = interferenceModel?.withSeed(seed.toLong()) ) @@ -226,7 +222,6 @@ class CapelinIntegrationTest { val simulator = ComputeServiceHelper( coroutineContext, clock, - NoopTelemetryManager(), computeScheduler, grid5000(Duration.ofDays(7)) ) diff --git a/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts b/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts deleted file mode 100644 index b476a669..00000000 --- a/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. 
- * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -description = "Telemetry for OpenDC Compute" - -/* Build configuration */ -plugins { - `kotlin-library-conventions` -} - -dependencies { - api(projects.opendcTelemetry.opendcTelemetrySdk) - - implementation(projects.opendcCompute.opendcComputeService) - implementation(libs.opentelemetry.semconv) - implementation(libs.kotlin.logging) -} diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt deleted file mode 100644 index 9557f680..00000000 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt +++ /dev/null @@ -1,517 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. 
- * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -@file:Suppress("PropertyName") - -package org.opendc.telemetry.compute - -import io.opentelemetry.api.common.AttributeKey -import io.opentelemetry.api.common.Attributes -import io.opentelemetry.sdk.metrics.data.MetricData -import io.opentelemetry.sdk.metrics.data.PointData -import io.opentelemetry.sdk.resources.Resource -import io.opentelemetry.semconv.resource.attributes.ResourceAttributes -import org.opendc.telemetry.compute.table.* -import java.time.Instant - -/** - * Helper class responsible for aggregating [MetricData] into [ServiceTableReader], [HostTableReader] and [ServerTableReader]. - */ -public class ComputeMetricAggregator { - private val _service = ServiceAggregator() - private val _hosts = mutableMapOf() - private val _servers = mutableMapOf() - - /** - * Process the specified [metrics] for this cycle. 
- */ - public fun process(metrics: Collection) { - val service = _service - val hosts = _hosts - val servers = _servers - - for (metric in metrics) { - val resource = metric.resource - - when (metric.name) { - // ComputeService - "scheduler.hosts" -> { - for (point in metric.longSumData.points) { - // Record the timestamp for the service - service.recordTimestamp(point) - - when (point.attributes[STATE_KEY]) { - "up" -> service._hostsUp = point.value.toInt() - "down" -> service._hostsDown = point.value.toInt() - } - } - } - "scheduler.servers" -> { - for (point in metric.longSumData.points) { - when (point.attributes[STATE_KEY]) { - "pending" -> service._serversPending = point.value.toInt() - "active" -> service._serversActive = point.value.toInt() - } - } - } - "scheduler.attempts" -> { - for (point in metric.longSumData.points) { - when (point.attributes[RESULT_KEY]) { - "success" -> service._attemptsSuccess = point.value.toInt() - "failure" -> service._attemptsFailure = point.value.toInt() - "error" -> service._attemptsError = point.value.toInt() - } - } - } - - // SimHost - "system.guests" -> { - val agg = getHost(hosts, resource) ?: continue - - for (point in metric.longSumData.points) { - when (point.attributes[STATE_KEY]) { - "terminated" -> agg._guestsTerminated = point.value.toInt() - "running" -> agg._guestsRunning = point.value.toInt() - "error" -> agg._guestsRunning = point.value.toInt() - "invalid" -> agg._guestsInvalid = point.value.toInt() - } - } - } - "system.cpu.limit" -> { - val agg = getHost(hosts, resource) ?: continue - - for (point in metric.doubleGaugeData.points) { - val server = getServer(servers, point) - - if (server != null) { - server._cpuLimit = point.value - server._host = agg.host - } else { - agg._cpuLimit = point.value - } - } - } - "system.cpu.usage" -> { - val agg = getHost(hosts, resource) ?: continue - agg._cpuUsage = metric.doubleGaugeData.points.first().value - } - "system.cpu.demand" -> { - val agg = getHost(hosts, 
resource) ?: continue - agg._cpuDemand = metric.doubleGaugeData.points.first().value - } - "system.cpu.utilization" -> { - val agg = getHost(hosts, resource) ?: continue - agg._cpuUtilization = metric.doubleGaugeData.points.first().value - } - "system.cpu.time" -> { - val agg = getHost(hosts, resource) ?: continue - - for (point in metric.longSumData.points) { - val server = getServer(servers, point) - val state = point.attributes[STATE_KEY] - if (server != null) { - when (state) { - "active" -> server._cpuActiveTime = point.value - "idle" -> server._cpuIdleTime = point.value - "steal" -> server._cpuStealTime = point.value - "lost" -> server._cpuLostTime = point.value - } - server._host = agg.host - } else { - when (state) { - "active" -> agg._cpuActiveTime = point.value - "idle" -> agg._cpuIdleTime = point.value - "steal" -> agg._cpuStealTime = point.value - "lost" -> agg._cpuLostTime = point.value - } - } - } - } - "system.power.usage" -> { - val agg = getHost(hosts, resource) ?: continue - agg._powerUsage = metric.doubleGaugeData.points.first().value - } - "system.power.total" -> { - val agg = getHost(hosts, resource) ?: continue - agg._powerTotal = metric.doubleSumData.points.first().value - } - "system.time" -> { - val agg = getHost(hosts, resource) ?: continue - - for (point in metric.longSumData.points) { - val server = getServer(servers, point) - - if (server != null) { - server.recordTimestamp(point) - - when (point.attributes[STATE_KEY]) { - "up" -> server._uptime = point.value - "down" -> server._downtime = point.value - } - server._host = agg.host - } else { - agg.recordTimestamp(point) - - when (point.attributes[STATE_KEY]) { - "up" -> agg._uptime = point.value - "down" -> agg._downtime = point.value - } - } - } - } - "system.time.boot" -> { - val agg = getHost(hosts, resource) ?: continue - - for (point in metric.longGaugeData.points) { - val server = getServer(servers, point) - - if (server != null) { - server._bootTime = 
Instant.ofEpochMilli(point.value) - server._host = agg.host - } else { - agg._bootTime = Instant.ofEpochMilli(point.value) - } - } - } - "system.time.provision" -> { - for (point in metric.longGaugeData.points) { - val server = getServer(servers, point) ?: continue - server.recordTimestamp(point) - server._provisionTime = Instant.ofEpochMilli(point.value) - } - } - } - } - } - - /** - * Collect the data via the [monitor]. - */ - public fun collect(monitor: ComputeMonitor) { - monitor.record(_service) - - for (host in _hosts.values) { - monitor.record(host) - host.reset() - } - - for (server in _servers.values) { - monitor.record(server) - server.reset() - } - } - - /** - * Obtain the [HostAggregator] for the specified [resource]. - */ - private fun getHost(hosts: MutableMap, resource: Resource): HostAggregator? { - val id = resource.attributes[HOST_ID] - return if (id != null) { - hosts.getOrPut(id) { HostAggregator(resource) } - } else { - null - } - } - - /** - * Obtain the [ServerAggregator] for the specified [point]. - */ - private fun getServer(servers: MutableMap, point: PointData): ServerAggregator? { - val id = point.attributes[ResourceAttributes.HOST_ID] - return if (id != null) { - servers.getOrPut(id) { ServerAggregator(point.attributes) } - } else { - null - } - } - - /** - * An aggregator for service metrics before they are reported. 
- */ - internal class ServiceAggregator : ServiceTableReader { - private var _timestamp: Instant = Instant.MIN - override val timestamp: Instant - get() = _timestamp - - override val hostsUp: Int - get() = _hostsUp - @JvmField var _hostsUp = 0 - - override val hostsDown: Int - get() = _hostsDown - @JvmField var _hostsDown = 0 - - override val serversPending: Int - get() = _serversPending - @JvmField var _serversPending = 0 - - override val serversActive: Int - get() = _serversActive - @JvmField var _serversActive = 0 - - override val attemptsSuccess: Int - get() = _attemptsSuccess - @JvmField var _attemptsSuccess = 0 - - override val attemptsFailure: Int - get() = _attemptsFailure - @JvmField var _attemptsFailure = 0 - - override val attemptsError: Int - get() = _attemptsError - @JvmField var _attemptsError = 0 - - /** - * Record the timestamp of a [point] for this aggregator. - */ - fun recordTimestamp(point: PointData) { - _timestamp = Instant.ofEpochMilli(point.epochNanos / 1_000_000L) // ns to ms - } - } - - /** - * An aggregator for host metrics before they are reported. - */ - internal class HostAggregator(resource: Resource) : HostTableReader { - /** - * The static information about this host. 
- */ - override val host = HostInfo( - resource.attributes[HOST_ID]!!, - resource.attributes[HOST_NAME] ?: "", - resource.attributes[HOST_ARCH] ?: "", - resource.attributes[HOST_NCPUS]?.toInt() ?: 0, - resource.attributes[HOST_MEM_CAPACITY] ?: 0, - ) - - override val timestamp: Instant - get() = _timestamp - private var _timestamp = Instant.MIN - - override val guestsTerminated: Int - get() = _guestsTerminated - @JvmField var _guestsTerminated = 0 - - override val guestsRunning: Int - get() = _guestsRunning - @JvmField var _guestsRunning = 0 - - override val guestsError: Int - get() = _guestsError - @JvmField var _guestsError = 0 - - override val guestsInvalid: Int - get() = _guestsInvalid - @JvmField var _guestsInvalid = 0 - - override val cpuLimit: Double - get() = _cpuLimit - @JvmField var _cpuLimit = 0.0 - - override val cpuUsage: Double - get() = _cpuUsage - @JvmField var _cpuUsage = 0.0 - - override val cpuDemand: Double - get() = _cpuDemand - @JvmField var _cpuDemand = 0.0 - - override val cpuUtilization: Double - get() = _cpuUtilization - @JvmField var _cpuUtilization = 0.0 - - override val cpuActiveTime: Long - get() = _cpuActiveTime - previousCpuActiveTime - @JvmField var _cpuActiveTime = 0L - private var previousCpuActiveTime = 0L - - override val cpuIdleTime: Long - get() = _cpuIdleTime - previousCpuIdleTime - @JvmField var _cpuIdleTime = 0L - private var previousCpuIdleTime = 0L - - override val cpuStealTime: Long - get() = _cpuStealTime - previousCpuStealTime - @JvmField var _cpuStealTime = 0L - private var previousCpuStealTime = 0L - - override val cpuLostTime: Long - get() = _cpuLostTime - previousCpuLostTime - @JvmField var _cpuLostTime = 0L - private var previousCpuLostTime = 0L - - override val powerUsage: Double - get() = _powerUsage - @JvmField var _powerUsage = 0.0 - - override val powerTotal: Double - get() = _powerTotal - previousPowerTotal - @JvmField var _powerTotal = 0.0 - private var previousPowerTotal = 0.0 - - override val uptime: Long 
- get() = _uptime - previousUptime - @JvmField var _uptime = 0L - private var previousUptime = 0L - - override val downtime: Long - get() = _downtime - previousDowntime - @JvmField var _downtime = 0L - private var previousDowntime = 0L - - override val bootTime: Instant? - get() = _bootTime - @JvmField var _bootTime: Instant? = null - - /** - * Finish the aggregation for this cycle. - */ - fun reset() { - // Reset intermediate state for next aggregation - previousCpuActiveTime = _cpuActiveTime - previousCpuIdleTime = _cpuIdleTime - previousCpuStealTime = _cpuStealTime - previousCpuLostTime = _cpuLostTime - previousPowerTotal = _powerTotal - previousUptime = _uptime - previousDowntime = _downtime - - _guestsTerminated = 0 - _guestsRunning = 0 - _guestsError = 0 - _guestsInvalid = 0 - - _cpuLimit = 0.0 - _cpuUsage = 0.0 - _cpuDemand = 0.0 - _cpuUtilization = 0.0 - - _powerUsage = 0.0 - } - - /** - * Record the timestamp of a [point] for this aggregator. - */ - fun recordTimestamp(point: PointData) { - _timestamp = Instant.ofEpochMilli(point.epochNanos / 1_000_000L) // ns to ms - } - } - - /** - * An aggregator for server metrics before they are reported. - */ - internal class ServerAggregator(attributes: Attributes) : ServerTableReader { - /** - * The static information about this server. - */ - override val server = ServerInfo( - attributes[ResourceAttributes.HOST_ID]!!, - attributes[ResourceAttributes.HOST_NAME]!!, - attributes[ResourceAttributes.HOST_TYPE]!!, - attributes[ResourceAttributes.HOST_ARCH]!!, - attributes[ResourceAttributes.HOST_IMAGE_ID]!!, - attributes[ResourceAttributes.HOST_IMAGE_NAME]!!, - attributes[AttributeKey.longKey("host.num_cpus")]!!.toInt(), - attributes[AttributeKey.longKey("host.mem_capacity")]!!, - ) - - /** - * The [HostInfo] of the host on which the server is hosted. - */ - override val host: HostInfo? - get() = _host - @JvmField var _host: HostInfo? 
= null - - private var _timestamp = Instant.MIN - override val timestamp: Instant - get() = _timestamp - - override val uptime: Long - get() = _uptime - previousUptime - @JvmField var _uptime: Long = 0 - private var previousUptime = 0L - - override val downtime: Long - get() = _downtime - previousDowntime - @JvmField var _downtime: Long = 0 - private var previousDowntime = 0L - - override val provisionTime: Instant? - get() = _provisionTime - @JvmField var _provisionTime: Instant? = null - - override val bootTime: Instant? - get() = _bootTime - @JvmField var _bootTime: Instant? = null - - override val cpuLimit: Double - get() = _cpuLimit - @JvmField var _cpuLimit = 0.0 - - override val cpuActiveTime: Long - get() = _cpuActiveTime - previousCpuActiveTime - @JvmField var _cpuActiveTime = 0L - private var previousCpuActiveTime = 0L - - override val cpuIdleTime: Long - get() = _cpuIdleTime - previousCpuIdleTime - @JvmField var _cpuIdleTime = 0L - private var previousCpuIdleTime = 0L - - override val cpuStealTime: Long - get() = _cpuStealTime - previousCpuStealTime - @JvmField var _cpuStealTime = 0L - private var previousCpuStealTime = 0L - - override val cpuLostTime: Long - get() = _cpuLostTime - previousCpuLostTime - @JvmField var _cpuLostTime = 0L - private var previousCpuLostTime = 0L - - /** - * Finish the aggregation for this cycle. - */ - fun reset() { - previousUptime = _uptime - previousDowntime = _downtime - previousCpuActiveTime = _cpuActiveTime - previousCpuIdleTime = _cpuIdleTime - previousCpuStealTime = _cpuStealTime - previousCpuLostTime = _cpuLostTime - - _host = null - _cpuLimit = 0.0 - } - - /** - * Record the timestamp of a [point] for this aggregator. 
- */ - fun recordTimestamp(point: PointData) { - _timestamp = Instant.ofEpochMilli(point.epochNanos / 1_000_000L) // ns to ms - } - } - - private companion object { - private val STATE_KEY = AttributeKey.stringKey("state") - private val RESULT_KEY = AttributeKey.stringKey("result") - } -} diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt deleted file mode 100644 index 3ab6c7b2..00000000 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
- */ - -package org.opendc.telemetry.compute - -import io.opentelemetry.sdk.common.CompletableResultCode -import io.opentelemetry.sdk.metrics.data.* -import io.opentelemetry.sdk.metrics.export.MetricExporter -import mu.KotlinLogging - -/** - * A [MetricExporter] that redirects data to a [ComputeMonitor] implementation. - */ -public abstract class ComputeMetricExporter : MetricExporter, ComputeMonitor { - /** - * The logging instance for this exporter. - */ - private val logger = KotlinLogging.logger {} - - /** - * A [ComputeMetricAggregator] that actually performs the aggregation. - */ - private val agg = ComputeMetricAggregator() - - override fun export(metrics: Collection): CompletableResultCode { - return try { - agg.process(metrics) - agg.collect(this) - - CompletableResultCode.ofSuccess() - } catch (e: Throwable) { - logger.warn(e) { "Failed to export results" } - CompletableResultCode.ofFailure() - } - } - - override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess() - - override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess() -} diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricReader.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricReader.kt deleted file mode 100644 index 593203fc..00000000 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricReader.kt +++ /dev/null @@ -1,424 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, 
subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.telemetry.compute - -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.delay -import kotlinx.coroutines.isActive -import kotlinx.coroutines.launch -import mu.KotlinLogging -import org.opendc.compute.api.Server -import org.opendc.compute.service.ComputeService -import org.opendc.compute.service.driver.Host -import org.opendc.telemetry.compute.table.* -import java.time.Clock -import java.time.Duration -import java.time.Instant - -/** - * A helper class to collect metrics from a [ComputeService] instance and automatically export the metrics every - * export interval. - * - * @param scope The [CoroutineScope] to run the reader in. - * @param clock The virtual clock. - * @param service The [ComputeService] to monitor. - * @param servers The [Server]s to monitor. - * @param monitor The monitor to export the metrics to. - * @param exportInterval The export interval. - */ -public class ComputeMetricReader( - scope: CoroutineScope, - clock: Clock, - private val service: ComputeService, - private val servers: List, - private val monitor: ComputeMonitor, - private val exportInterval: Duration = Duration.ofMinutes(5) -) : AutoCloseable { - private val logger = KotlinLogging.logger {} - - /** - * Aggregator for service metrics. 
- */ - private val serviceTableReader = ServiceTableReaderImpl(service) - - /** - * Mapping from [Host] instances to [HostTableReaderImpl] - */ - private val hostTableReaders = mutableMapOf() - - /** - * Mapping from [Server] instances to [ServerTableReaderImpl] - */ - private val serverTableReaders = mutableMapOf() - - /** - * The background job that is responsible for collecting the metrics every cycle. - */ - private val job = scope.launch { - val intervalMs = exportInterval.toMillis() - - try { - while (isActive) { - delay(intervalMs) - - try { - val now = clock.instant() - - for (host in service.hosts) { - val reader = hostTableReaders.computeIfAbsent(host) { HostTableReaderImpl(it) } - reader.record(now) - monitor.record(reader) - reader.reset() - } - - for (server in servers) { - val reader = serverTableReaders.computeIfAbsent(server) { ServerTableReaderImpl(service, it) } - reader.record(now) - monitor.record(reader) - reader.reset() - } - - serviceTableReader.record(now) - monitor.record(serviceTableReader) - } catch (cause: Throwable) { - logger.warn(cause) { "Exporter threw an Exception" } - } - } - } finally { - if (monitor is AutoCloseable) { - monitor.close() - } - } - } - - override fun close() { - job.cancel() - } - - /** - * An aggregator for service metrics before they are reported. 
- */ - private class ServiceTableReaderImpl(private val service: ComputeService) : ServiceTableReader { - private var _timestamp: Instant = Instant.MIN - override val timestamp: Instant - get() = _timestamp - - override val hostsUp: Int - get() = _hostsUp - private var _hostsUp = 0 - - override val hostsDown: Int - get() = _hostsDown - private var _hostsDown = 0 - - override val serversPending: Int - get() = _serversPending - private var _serversPending = 0 - - override val serversActive: Int - get() = _serversActive - private var _serversActive = 0 - - override val attemptsSuccess: Int - get() = _attemptsSuccess - private var _attemptsSuccess = 0 - - override val attemptsFailure: Int - get() = _attemptsFailure - private var _attemptsFailure = 0 - - override val attemptsError: Int - get() = _attemptsError - private var _attemptsError = 0 - - /** - * Record the next cycle. - */ - fun record(now: Instant) { - _timestamp = now - - val stats = service.getSchedulerStats() - _hostsUp = stats.hostsAvailable - _hostsDown = stats.hostsUnavailable - _serversPending = stats.serversPending - _serversActive = stats.serversActive - _attemptsSuccess = stats.attemptsSuccess.toInt() - _attemptsFailure = stats.attemptsFailure.toInt() - _attemptsError = stats.attemptsError.toInt() - } - } - - /** - * An aggregator for host metrics before they are reported. 
- */ - private class HostTableReaderImpl(host: Host) : HostTableReader { - private val _host = host - - override val host: HostInfo = HostInfo(host.uid.toString(), host.name, "x86", host.model.cpuCount, host.model.memoryCapacity) - - override val timestamp: Instant - get() = _timestamp - private var _timestamp = Instant.MIN - - override val guestsTerminated: Int - get() = _guestsTerminated - private var _guestsTerminated = 0 - - override val guestsRunning: Int - get() = _guestsRunning - private var _guestsRunning = 0 - - override val guestsError: Int - get() = _guestsError - private var _guestsError = 0 - - override val guestsInvalid: Int - get() = _guestsInvalid - private var _guestsInvalid = 0 - - override val cpuLimit: Double - get() = _cpuLimit - private var _cpuLimit = 0.0 - - override val cpuUsage: Double - get() = _cpuUsage - private var _cpuUsage = 0.0 - - override val cpuDemand: Double - get() = _cpuDemand - private var _cpuDemand = 0.0 - - override val cpuUtilization: Double - get() = _cpuUtilization - private var _cpuUtilization = 0.0 - - override val cpuActiveTime: Long - get() = _cpuActiveTime - previousCpuActiveTime - private var _cpuActiveTime = 0L - private var previousCpuActiveTime = 0L - - override val cpuIdleTime: Long - get() = _cpuIdleTime - previousCpuIdleTime - private var _cpuIdleTime = 0L - private var previousCpuIdleTime = 0L - - override val cpuStealTime: Long - get() = _cpuStealTime - previousCpuStealTime - private var _cpuStealTime = 0L - private var previousCpuStealTime = 0L - - override val cpuLostTime: Long - get() = _cpuLostTime - previousCpuLostTime - private var _cpuLostTime = 0L - private var previousCpuLostTime = 0L - - override val powerUsage: Double - get() = _powerUsage - private var _powerUsage = 0.0 - - override val powerTotal: Double - get() = _powerTotal - previousPowerTotal - private var _powerTotal = 0.0 - private var previousPowerTotal = 0.0 - - override val uptime: Long - get() = _uptime - previousUptime - private var 
_uptime = 0L - private var previousUptime = 0L - - override val downtime: Long - get() = _downtime - previousDowntime - private var _downtime = 0L - private var previousDowntime = 0L - - override val bootTime: Instant? - get() = _bootTime - private var _bootTime: Instant? = null - - /** - * Record the next cycle. - */ - fun record(now: Instant) { - val hostCpuStats = _host.getCpuStats() - val hostSysStats = _host.getSystemStats() - - _timestamp = now - _guestsTerminated = hostSysStats.guestsTerminated - _guestsRunning = hostSysStats.guestsRunning - _guestsError = hostSysStats.guestsError - _guestsInvalid = hostSysStats.guestsInvalid - _cpuLimit = hostCpuStats.capacity - _cpuDemand = hostCpuStats.demand - _cpuUsage = hostCpuStats.usage - _cpuUtilization = hostCpuStats.utilization - _cpuActiveTime = hostCpuStats.activeTime - _cpuIdleTime = hostCpuStats.idleTime - _cpuStealTime = hostCpuStats.stealTime - _cpuLostTime = hostCpuStats.lostTime - _powerUsage = hostSysStats.powerUsage - _powerTotal = hostSysStats.energyUsage - _uptime = hostSysStats.uptime.toMillis() - _downtime = hostSysStats.downtime.toMillis() - _bootTime = hostSysStats.bootTime - } - - /** - * Finish the aggregation for this cycle. - */ - fun reset() { - // Reset intermediate state for next aggregation - previousCpuActiveTime = _cpuActiveTime - previousCpuIdleTime = _cpuIdleTime - previousCpuStealTime = _cpuStealTime - previousCpuLostTime = _cpuLostTime - previousPowerTotal = _powerTotal - previousUptime = _uptime - previousDowntime = _downtime - - _guestsTerminated = 0 - _guestsRunning = 0 - _guestsError = 0 - _guestsInvalid = 0 - - _cpuLimit = 0.0 - _cpuUsage = 0.0 - _cpuDemand = 0.0 - _cpuUtilization = 0.0 - - _powerUsage = 0.0 - } - } - - /** - * An aggregator for server metrics before they are reported. 
- */ - private class ServerTableReaderImpl(private val service: ComputeService, server: Server) : ServerTableReader { - private val _server = server - - /** - * The static information about this server. - */ - override val server = ServerInfo( - server.uid.toString(), - server.name, - "vm", - "x86", - server.image.uid.toString(), - server.image.name, - server.flavor.cpuCount, - server.flavor.memorySize - ) - - /** - * The [HostInfo] of the host on which the server is hosted. - */ - override var host: HostInfo? = null - private var _host: Host? = null - - private var _timestamp = Instant.MIN - override val timestamp: Instant - get() = _timestamp - - override val uptime: Long - get() = _uptime - previousUptime - private var _uptime: Long = 0 - private var previousUptime = 0L - - override val downtime: Long - get() = _downtime - previousDowntime - private var _downtime: Long = 0 - private var previousDowntime = 0L - - override val provisionTime: Instant? - get() = _provisionTime - private var _provisionTime: Instant? = null - - override val bootTime: Instant? - get() = _bootTime - private var _bootTime: Instant? = null - - override val cpuLimit: Double - get() = _cpuLimit - private var _cpuLimit = 0.0 - - override val cpuActiveTime: Long - get() = _cpuActiveTime - previousCpuActiveTime - private var _cpuActiveTime = 0L - private var previousCpuActiveTime = 0L - - override val cpuIdleTime: Long - get() = _cpuIdleTime - previousCpuIdleTime - private var _cpuIdleTime = 0L - private var previousCpuIdleTime = 0L - - override val cpuStealTime: Long - get() = _cpuStealTime - previousCpuStealTime - private var _cpuStealTime = 0L - private var previousCpuStealTime = 0L - - override val cpuLostTime: Long - get() = _cpuLostTime - previousCpuLostTime - private var _cpuLostTime = 0L - private var previousCpuLostTime = 0L - - /** - * Record the next cycle. 
- */ - fun record(now: Instant) { - val newHost = service.lookupHost(_server) - if (newHost != null && newHost.uid != _host?.uid) { - _host = newHost - host = HostInfo(newHost.uid.toString(), newHost.name, "x86", newHost.model.cpuCount, newHost.model.memoryCapacity) - } - - val cpuStats = _host?.getCpuStats(_server) - val sysStats = _host?.getSystemStats(_server) - - _timestamp = now - _cpuLimit = cpuStats?.capacity ?: 0.0 - _cpuActiveTime = cpuStats?.activeTime ?: 0 - _cpuIdleTime = cpuStats?.idleTime ?: 0 - _cpuStealTime = cpuStats?.stealTime ?: 0 - _cpuLostTime = cpuStats?.lostTime ?: 0 - _uptime = sysStats?.uptime?.toMillis() ?: 0 - _downtime = sysStats?.downtime?.toMillis() ?: 0 - _provisionTime = _server.launchedAt - _bootTime = sysStats?.bootTime - } - - /** - * Finish the aggregation for this cycle. - */ - fun reset() { - previousUptime = _uptime - previousDowntime = _downtime - previousCpuActiveTime = _cpuActiveTime - previousCpuIdleTime = _cpuIdleTime - previousCpuStealTime = _cpuStealTime - previousCpuLostTime = _cpuLostTime - - _host = null - _cpuLimit = 0.0 - } - } -} diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt deleted file mode 100644 index 64b5f337..00000000 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the 
following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.telemetry.compute - -import org.opendc.telemetry.compute.table.HostTableReader -import org.opendc.telemetry.compute.table.ServerTableReader -import org.opendc.telemetry.compute.table.ServiceTableReader - -/** - * A monitor that tracks the metrics and events of the OpenDC Compute service. - */ -public interface ComputeMonitor { - /** - * Record an entry with the specified [reader]. - */ - public fun record(reader: ServerTableReader) {} - - /** - * Record an entry with the specified [reader]. - */ - public fun record(reader: HostTableReader) {} - - /** - * Record an entry with the specified [reader]. 
- */ - public fun record(reader: ServiceTableReader) {} -} diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt deleted file mode 100644 index 41315b15..00000000 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.telemetry.compute - -import io.opentelemetry.sdk.metrics.export.MetricProducer -import org.opendc.telemetry.compute.table.ServiceData -import org.opendc.telemetry.compute.table.ServiceTableReader -import org.opendc.telemetry.compute.table.toServiceData - -/** - * Collect the metrics of the compute service. 
- */ -public fun collectServiceMetrics(metricProducer: MetricProducer): ServiceData { - lateinit var serviceData: ServiceData - val agg = ComputeMetricAggregator() - val monitor = object : ComputeMonitor { - override fun record(reader: ServiceTableReader) { - serviceData = reader.toServiceData() - } - } - - agg.process(metricProducer.collectAllMetrics()) - agg.collect(monitor) - return serviceData -} diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/HostAttributes.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/HostAttributes.kt deleted file mode 100644 index 7dca6186..00000000 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/HostAttributes.kt +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
- */ - -@file:JvmName("HostAttributes") -package org.opendc.telemetry.compute - -import io.opentelemetry.api.common.AttributeKey - -/** - * The identifier of the node hosting virtual machines. - */ -public val HOST_ID: AttributeKey = AttributeKey.stringKey("node.id") - -/** - * The name of the node hosting virtual machines. - */ -public val HOST_NAME: AttributeKey = AttributeKey.stringKey("node.name") - -/** - * The CPU architecture of the host node. - */ -public val HOST_ARCH: AttributeKey = AttributeKey.stringKey("node.arch") - -/** - * The number of CPUs in the host node. - */ -public val HOST_NCPUS: AttributeKey = AttributeKey.longKey("node.num_cpus") - -/** - * The amount of memory installed in the host node in MiB. - */ -public val HOST_MEM_CAPACITY: AttributeKey = AttributeKey.longKey("node.mem_capacity") diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostInfo.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostInfo.kt deleted file mode 100644 index d9a5906b..00000000 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostInfo.kt +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. 
- * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.telemetry.compute.table - -/** - * Information about a host exposed to the telemetry service. - */ -public data class HostInfo(val id: String, val name: String, val arch: String, val cpuCount: Int, val memCapacity: Long) diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostTableReader.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostTableReader.kt deleted file mode 100644 index 1e1ad94e..00000000 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostTableReader.kt +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. 
- * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.telemetry.compute.table - -import java.time.Instant - -/** - * An interface that is used to read a row of a host trace entry. - */ -public interface HostTableReader { - /** - * The timestamp of the current entry of the reader. - */ - public val timestamp: Instant - - /** - * The [HostInfo] of the host to which the row belongs to. - */ - public val host: HostInfo - - /** - * The number of guests that are in a terminated state. - */ - public val guestsTerminated: Int - - /** - * The number of guests that are in a running state. - */ - public val guestsRunning: Int - - /** - * The number of guests that are in an error state. - */ - public val guestsError: Int - - /** - * The number of guests that are in an unknown state. - */ - public val guestsInvalid: Int - - /** - * The capacity of the CPUs in the host (in MHz). - */ - public val cpuLimit: Double - - /** - * The usage of all CPUs in the host (in MHz). - */ - public val cpuUsage: Double - - /** - * The demand of all vCPUs of the guests (in MHz) - */ - public val cpuDemand: Double - - /** - * The CPU utilization of the host. - */ - public val cpuUtilization: Double - - /** - * The duration (in seconds) that a CPU was active in the host. - */ - public val cpuActiveTime: Long - - /** - * The duration (in seconds) that a CPU was idle in the host. - */ - public val cpuIdleTime: Long - - /** - * The duration (in seconds) that a vCPU wanted to run, but no capacity was available. 
- */ - public val cpuStealTime: Long - - /** - * The duration (in seconds) of CPU time that was lost due to interference. - */ - public val cpuLostTime: Long - - /** - * The current power usage of the host in W. - */ - public val powerUsage: Double - - /** - * The total power consumption of the host since last time in J. - */ - public val powerTotal: Double - - /** - * The uptime of the host since last time in ms. - */ - public val uptime: Long - - /** - * The downtime of the host since last time in ms. - */ - public val downtime: Long - - /** - * The [Instant] at which the host booted. - */ - public val bootTime: Instant? -} diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerInfo.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerInfo.kt deleted file mode 100644 index b16e5f3d..00000000 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerInfo.kt +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.telemetry.compute.table - -/** - * Static information about a server exposed to the telemetry service. - */ -public data class ServerInfo( - val id: String, - val name: String, - val type: String, - val arch: String, - val imageId: String, - val imageName: String, - val cpuCount: Int, - val memCapacity: Long -) diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerTableReader.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerTableReader.kt deleted file mode 100644 index c23d1467..00000000 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerTableReader.kt +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.telemetry.compute.table - -import java.time.Instant - -/** - * An interface that is used to read a row of a server trace entry. - */ -public interface ServerTableReader { - /** - * The timestamp of the current entry of the reader. - */ - public val timestamp: Instant - - /** - * The [ServerInfo] of the server to which the row belongs to. - */ - public val server: ServerInfo - - /** - * The [HostInfo] of the host on which the server is hosted or `null` if it has no host. - */ - public val host: HostInfo? - - /** - * The uptime of the host since last time in ms. - */ - public val uptime: Long - - /** - * The downtime of the host since last time in ms. - */ - public val downtime: Long - - /** - * The [Instant] at which the server was enqueued for the scheduler. - */ - public val provisionTime: Instant? - - /** - * The [Instant] at which the server booted. - */ - public val bootTime: Instant? - - /** - * The capacity of the CPUs of the servers (in MHz). - */ - public val cpuLimit: Double - - /** - * The duration (in seconds) that a CPU was active in the server. - */ - public val cpuActiveTime: Long - - /** - * The duration (in seconds) that a CPU was idle in the server. - */ - public val cpuIdleTime: Long - - /** - * The duration (in seconds) that a vCPU wanted to run, but no capacity was available. - */ - public val cpuStealTime: Long - - /** - * The duration (in seconds) of CPU time that was lost due to interference. 
- */ - public val cpuLostTime: Long -} diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt deleted file mode 100644 index 39bf96f4..00000000 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.telemetry.compute.table - -import java.time.Instant - -/** - * A trace entry for the compute service. 
- */ -public data class ServiceData( - val timestamp: Instant, - val hostsUp: Int, - val hostsDown: Int, - val serversPending: Int, - val serversActive: Int, - val attemptsSuccess: Int, - val attemptsFailure: Int, - val attemptsError: Int -) - -/** - * Convert a [ServiceTableReader] into a persistent object. - */ -public fun ServiceTableReader.toServiceData(): ServiceData { - return ServiceData(timestamp, hostsUp, hostsDown, serversPending, serversActive, attemptsSuccess, attemptsFailure, attemptsError) -} diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceTableReader.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceTableReader.kt deleted file mode 100644 index 908f6748..00000000 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceTableReader.kt +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.telemetry.compute.table - -import java.time.Instant - -/** - * An interface that is used to read a row of a service trace entry. - */ -public interface ServiceTableReader { - /** - * The timestamp of the current entry of the reader. - */ - public val timestamp: Instant - - /** - * The number of hosts that are up at this instant. - */ - public val hostsUp: Int - - /** - * The number of hosts that are down at this instant. - */ - public val hostsDown: Int - - /** - * The number of servers that are pending to be scheduled. - */ - public val serversPending: Int - - /** - * The number of servers that are currently active. - */ - public val serversActive: Int - - /** - * The scheduling attempts that were successful. - */ - public val attemptsSuccess: Int - - /** - * The scheduling attempts that were unsuccessful due to client error. - */ - public val attemptsFailure: Int - - /** - * The scheduling attempts that were unsuccessful due to scheduler error. 
- */ - public val attemptsError: Int -} diff --git a/opendc-web/opendc-web-runner/build.gradle.kts b/opendc-web/opendc-web-runner/build.gradle.kts index 3c80f605..c1e3b976 100644 --- a/opendc-web/opendc-web-runner/build.gradle.kts +++ b/opendc-web/opendc-web-runner/build.gradle.kts @@ -37,8 +37,6 @@ dependencies { implementation(projects.opendcCompute.opendcComputeSimulator) implementation(projects.opendcCompute.opendcComputeWorkload) implementation(projects.opendcSimulator.opendcSimulatorCore) - implementation(projects.opendcTelemetry.opendcTelemetrySdk) - implementation(projects.opendcTelemetry.opendcTelemetryCompute) implementation(projects.opendcTrace.opendcTraceApi) implementation(projects.opendcWeb.opendcWebClient) diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt index 7c0c43ed..9c9a866d 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/OpenDCRunner.kt @@ -25,7 +25,7 @@ package org.opendc.web.runner import mu.KotlinLogging import org.opendc.compute.api.Server import org.opendc.compute.workload.* -import org.opendc.compute.workload.telemetry.NoopTelemetryManager +import org.opendc.compute.workload.telemetry.ComputeMetricReader import org.opendc.compute.workload.topology.HostSpec import org.opendc.compute.workload.topology.Topology import org.opendc.compute.workload.topology.apply @@ -36,7 +36,6 @@ import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.power.LinearPowerModel import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.telemetry.compute.ComputeMetricReader import org.opendc.web.client.runner.OpenDCRunnerClient import org.opendc.web.proto.runner.Job import 
org.opendc.web.proto.runner.Scenario @@ -209,7 +208,6 @@ public class OpenDCRunner( val simulator = ComputeServiceHelper( coroutineContext, clock, - NoopTelemetryManager(), computeScheduler, failureModel, interferenceModel.takeIf { phenomena.interference } diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt index 69350d8c..01002c70 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/internal/WebComputeMonitor.kt @@ -22,9 +22,9 @@ package org.opendc.web.runner.internal -import org.opendc.telemetry.compute.ComputeMonitor -import org.opendc.telemetry.compute.table.HostTableReader -import org.opendc.telemetry.compute.table.ServiceTableReader +import org.opendc.compute.workload.telemetry.ComputeMonitor +import org.opendc.compute.workload.telemetry.table.HostTableReader +import org.opendc.compute.workload.telemetry.table.ServiceTableReader import kotlin.math.max import kotlin.math.roundToLong diff --git a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt index d5f06587..73d1b23b 100644 --- a/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt +++ b/opendc-workflow/opendc-workflow-service/src/test/kotlin/org/opendc/workflow/service/WorkflowServiceTest.kt @@ -32,7 +32,6 @@ import org.opendc.compute.service.scheduler.filters.RamFilter import org.opendc.compute.service.scheduler.filters.VCpuFilter import org.opendc.compute.service.scheduler.weights.VCpuWeigher import org.opendc.compute.workload.ComputeServiceHelper -import 
org.opendc.compute.workload.telemetry.NoopTelemetryManager import org.opendc.compute.workload.topology.HostSpec import org.opendc.simulator.compute.kernel.SimSpaceSharedHypervisorProvider import org.opendc.simulator.compute.model.MachineModel @@ -70,7 +69,12 @@ internal class WorkflowServiceTest { weighers = listOf(VCpuWeigher(1.0, multiplier = 1.0)) ) - val computeHelper = ComputeServiceHelper(coroutineContext, clock, NoopTelemetryManager(), computeScheduler, schedulingQuantum = Duration.ofSeconds(1)) + val computeHelper = ComputeServiceHelper( + coroutineContext, + clock, + computeScheduler, + schedulingQuantum = Duration.ofSeconds(1) + ) val hostCount = 4 repeat(hostCount) { computeHelper.registerHost(createHostSpec(it)) } diff --git a/settings.gradle.kts b/settings.gradle.kts index a779edcc..06289e72 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -49,7 +49,6 @@ include(":opendc-simulator:opendc-simulator-network") include(":opendc-simulator:opendc-simulator-compute") include(":opendc-telemetry:opendc-telemetry-api") include(":opendc-telemetry:opendc-telemetry-sdk") -include(":opendc-telemetry:opendc-telemetry-compute") include(":opendc-trace:opendc-trace-api") include(":opendc-trace:opendc-trace-gwf") include(":opendc-trace:opendc-trace-swf") -- cgit v1.2.3 From b82ae73d064590094f79e26de355060135ed13fd Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 5 May 2022 11:33:47 +0200 Subject: refactor(workflow/service): Remove OpenTelemetry from "workflow" modules This change removes the OpenTelemetry integration from the OpenDC Workflow modules. Previously, we chose to integrate OpenTelemetry to provide a unified way to report metrics to the users. See the previous commit removing it from the "Compute" modules for the reasoning behind this change. 
--- .../opendc-workflow-service/build.gradle.kts | 2 - .../org/opendc/workflow/service/WorkflowService.kt | 7 +- .../service/internal/WorkflowServiceImpl.kt | 67 +---------------- .../service/scheduler/telemetry/SchedulerStats.kt | 42 +++++++++++ .../workflow/service/telemetry/SchedulerStats.kt | 42 ----------- .../opendc-workflow-workload/build.gradle.kts | 2 - .../workflow/workload/WorkflowServiceHelper.kt | 86 +++------------------- 7 files changed, 56 insertions(+), 192 deletions(-) create mode 100644 opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/telemetry/SchedulerStats.kt delete mode 100644 opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/telemetry/SchedulerStats.kt diff --git a/opendc-workflow/opendc-workflow-service/build.gradle.kts b/opendc-workflow/opendc-workflow-service/build.gradle.kts index 60b5eb13..b6365885 100644 --- a/opendc-workflow/opendc-workflow-service/build.gradle.kts +++ b/opendc-workflow/opendc-workflow-service/build.gradle.kts @@ -30,7 +30,6 @@ plugins { dependencies { api(projects.opendcWorkflow.opendcWorkflowApi) api(projects.opendcCompute.opendcComputeApi) - api(projects.opendcTelemetry.opendcTelemetryApi) implementation(projects.opendcCommon) implementation(libs.kotlin.logging) @@ -38,7 +37,6 @@ dependencies { testImplementation(projects.opendcCompute.opendcComputeWorkload) testImplementation(projects.opendcSimulator.opendcSimulatorCore) testImplementation(projects.opendcTrace.opendcTraceApi) - testImplementation(projects.opendcTelemetry.opendcTelemetrySdk) testRuntimeOnly(projects.opendcTrace.opendcTraceGwf) testRuntimeOnly(libs.log4j.slf4j) } diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt index b8bc0e33..2436c387 100644 --- 
a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/WorkflowService.kt @@ -22,7 +22,6 @@ package org.opendc.workflow.service -import io.opentelemetry.api.metrics.MeterProvider import org.opendc.compute.api.ComputeClient import org.opendc.workflow.api.Job import org.opendc.workflow.service.internal.WorkflowServiceImpl @@ -30,7 +29,7 @@ import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy import org.opendc.workflow.service.scheduler.job.JobOrderPolicy import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy -import org.opendc.workflow.service.telemetry.SchedulerStats +import org.opendc.workflow.service.scheduler.telemetry.SchedulerStats import java.time.Clock import java.time.Duration import kotlin.coroutines.CoroutineContext @@ -63,7 +62,7 @@ public interface WorkflowService : AutoCloseable { * @param context The [CoroutineContext] to use in the service. * @param clock The clock instance to use. * @param meterProvider The meter provider to use. - * @param compute The compute client to use. + * @param compute The "Compute" client to use. * @param schedulingQuantum The scheduling quantum to use (minimum duration between scheduling cycles). * @param jobAdmissionPolicy The job admission policy to use. * @param jobOrderPolicy The job order policy to use. 
@@ -73,7 +72,6 @@ public interface WorkflowService : AutoCloseable { public operator fun invoke( context: CoroutineContext, clock: Clock, - meterProvider: MeterProvider, compute: ComputeClient, schedulingQuantum: Duration, jobAdmissionPolicy: JobAdmissionPolicy, @@ -84,7 +82,6 @@ public interface WorkflowService : AutoCloseable { return WorkflowServiceImpl( context, clock, - meterProvider, compute, schedulingQuantum, jobAdmissionPolicy, diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt index 9c7f18a2..899810a2 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt @@ -22,8 +22,6 @@ package org.opendc.workflow.service.internal -import io.opentelemetry.api.metrics.Meter -import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.* import org.opendc.common.util.Pacer import org.opendc.compute.api.* @@ -34,7 +32,7 @@ import org.opendc.workflow.service.scheduler.job.JobAdmissionPolicy import org.opendc.workflow.service.scheduler.job.JobOrderPolicy import org.opendc.workflow.service.scheduler.task.TaskEligibilityPolicy import org.opendc.workflow.service.scheduler.task.TaskOrderPolicy -import org.opendc.workflow.service.telemetry.SchedulerStats +import org.opendc.workflow.service.scheduler.telemetry.SchedulerStats import java.time.Clock import java.time.Duration import java.util.* @@ -48,9 +46,8 @@ import kotlin.coroutines.resume public class WorkflowServiceImpl( context: CoroutineContext, private val clock: Clock, - meterProvider: MeterProvider, private val computeClient: ComputeClient, - private val schedulingQuantum: Duration, + schedulingQuantum: Duration, 
jobAdmissionPolicy: JobAdmissionPolicy, jobOrderPolicy: JobOrderPolicy, taskEligibilityPolicy: TaskEligibilityPolicy, @@ -61,11 +58,6 @@ public class WorkflowServiceImpl( */ private val scope = CoroutineScope(context + Job()) - /** - * The [Meter] to collect metrics of this service. - */ - private val meter = meterProvider.get("org.opendc.workflow.service") - /** * The incoming jobs ready to be processed by the scheduler. */ @@ -139,58 +131,11 @@ public class WorkflowServiceImpl( } } - /** - * The number of jobs that have been submitted to the service. - */ - private val submittedJobs = meter.counterBuilder("jobs.submitted") - .setDescription("Number of submitted jobs") - .setUnit("1") - .build() private var _workflowsSubmitted: Int = 0 - - /** - * The number of jobs that are running. - */ - private val runningJobs = meter.upDownCounterBuilder("jobs.active") - .setDescription("Number of jobs running") - .setUnit("1") - .build() private var _workflowsRunning: Int = 0 - - /** - * The number of jobs that have finished running. - */ - private val finishedJobs = meter.counterBuilder("jobs.finished") - .setDescription("Number of jobs that finished running") - .setUnit("1") - .build() private var _workflowsFinished: Int = 0 - - /** - * The number of tasks that have been submitted to the service. - */ - private val submittedTasks = meter.counterBuilder("tasks.submitted") - .setDescription("Number of submitted tasks") - .setUnit("1") - .build() private var _tasksSubmitted: Int = 0 - - /** - * The number of jobs that are running. - */ - private val runningTasks = meter.upDownCounterBuilder("tasks.active") - .setDescription("Number of tasks running") - .setUnit("1") - .build() private var _tasksRunning: Int = 0 - - /** - * The number of jobs that have finished running. 
- */ - private val finishedTasks = meter.counterBuilder("tasks.finished") - .setDescription("Number of tasks that finished running") - .setUnit("1") - .build() private var _tasksFinished: Int = 0 /** @@ -229,14 +174,12 @@ public class WorkflowServiceImpl( instance.state = TaskStatus.READY } - submittedTasks.add(1) _tasksSubmitted++ } instances.values.toCollection(jobInstance.tasks) incomingJobs += jobInstance rootListener.jobSubmitted(jobInstance) - submittedJobs.add(1) _workflowsSubmitted++ pacer.enqueue() @@ -283,7 +226,6 @@ public class WorkflowServiceImpl( jobQueue.add(jobInstance) activeJobs += jobInstance - runningJobs.add(1) _workflowsRunning++ rootListener.jobStarted(jobInstance) } @@ -363,7 +305,6 @@ public class WorkflowServiceImpl( ServerState.RUNNING -> { val task = taskByServer.getValue(server) task.startedAt = clock.millis() - runningTasks.add(1) _tasksRunning++ rootListener.taskStarted(task) } @@ -381,8 +322,6 @@ public class WorkflowServiceImpl( job.tasks.remove(task) activeTasks -= task - runningTasks.add(-1) - finishedTasks.add(1) _tasksRunning-- _tasksFinished++ rootListener.taskFinished(task) @@ -410,8 +349,6 @@ public class WorkflowServiceImpl( private fun finishJob(job: JobState) { activeJobs -= job - runningJobs.add(-1) - finishedJobs.add(1) _workflowsRunning-- _workflowsFinished++ rootListener.jobFinished(job) diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/telemetry/SchedulerStats.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/telemetry/SchedulerStats.kt new file mode 100644 index 00000000..608e82df --- /dev/null +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/scheduler/telemetry/SchedulerStats.kt @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation 
files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.workflow.service.scheduler.telemetry + +/** + * Statistics about the workflow scheduler. + * + * @property workflowsSubmitted The number of workflows submitted to the scheduler. + * @property workflowsRunning The number of workflows that are currently running. + * @property workflowsFinished The number of workflows that have completed since the scheduler started. + * @property tasksSubmitted The number of tasks submitted to the scheduler. + * @property tasksRunning The number of tasks that are currently running. + * @property tasksFinished The number of tasks that have completed. 
+ */ +public data class SchedulerStats( + val workflowsSubmitted: Int, + val workflowsRunning: Int, + val workflowsFinished: Int, + val tasksSubmitted: Int, + val tasksRunning: Int, + val tasksFinished: Int +) diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/telemetry/SchedulerStats.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/telemetry/SchedulerStats.kt deleted file mode 100644 index 7c7d7c4d..00000000 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/telemetry/SchedulerStats.kt +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.workflow.service.telemetry - -/** - * Statistics about the workflow scheduler. - * - * @property workflowsSubmitted The number of workflows submitted to the scheduler. 
- * @property workflowsRunning The number of workflows that are currently running. - * @property workflowsFinished The number of workflows that have completed since the scheduler started. - * @property tasksSubmitted The number of tasks submitted to the scheduler. - * @property tasksRunning The number of tasks that are currently running. - * @property tasksFinished The number of tasks that have completed. - */ -public data class SchedulerStats( - val workflowsSubmitted: Int, - val workflowsRunning: Int, - val workflowsFinished: Int, - val tasksSubmitted: Int, - val tasksRunning: Int, - val tasksFinished: Int -) diff --git a/opendc-workflow/opendc-workflow-workload/build.gradle.kts b/opendc-workflow/opendc-workflow-workload/build.gradle.kts index b725a69c..17eadf29 100644 --- a/opendc-workflow/opendc-workflow-workload/build.gradle.kts +++ b/opendc-workflow/opendc-workflow-workload/build.gradle.kts @@ -32,6 +32,4 @@ dependencies { implementation(projects.opendcSimulator.opendcSimulatorCompute) implementation(projects.opendcTrace.opendcTraceApi) - implementation(projects.opendcTelemetry.opendcTelemetrySdk) - implementation(libs.opentelemetry.semconv) } diff --git a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt index a7d0ed6c..435d0190 100644 --- a/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt +++ b/opendc-workflow/opendc-workflow-workload/src/main/kotlin/org/opendc/workflow/workload/WorkflowServiceHelper.kt @@ -22,24 +22,13 @@ package org.opendc.workflow.workload -import io.opentelemetry.api.metrics.MeterProvider -import io.opentelemetry.sdk.common.CompletableResultCode -import io.opentelemetry.sdk.metrics.SdkMeterProvider -import io.opentelemetry.sdk.metrics.data.AggregationTemporality -import 
io.opentelemetry.sdk.metrics.export.MetricProducer -import io.opentelemetry.sdk.metrics.export.MetricReader -import io.opentelemetry.sdk.metrics.export.MetricReaderFactory -import io.opentelemetry.sdk.resources.Resource -import io.opentelemetry.semconv.resource.attributes.ResourceAttributes import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.launch import org.opendc.compute.api.ComputeClient -import org.opendc.telemetry.sdk.toOtelClock import org.opendc.workflow.api.Job import org.opendc.workflow.service.WorkflowService import java.time.Clock -import java.util.* import kotlin.coroutines.CoroutineContext /** @@ -59,60 +48,16 @@ public class WorkflowServiceHelper( /** * The [WorkflowService] that is constructed by this runner. */ - public val service: WorkflowService - - /** - * The [MetricProducer] exposed by the [WorkflowService]. - */ - public lateinit var metricProducer: MetricProducer - private set - - /** - * The [MeterProvider] used for the service. - */ - private val _meterProvider: SdkMeterProvider - - /** - * The list of [MetricReader]s that have been registered with the runner. 
- */ - private val _metricReaders = mutableListOf() - - init { - val resource = Resource.builder() - .put(ResourceAttributes.SERVICE_NAME, "opendc-workflow") - .build() - - _meterProvider = SdkMeterProvider.builder() - .setClock(clock.toOtelClock()) - .setResource(resource) - .registerMetricReader { producer -> - metricProducer = producer - - val metricReaders = _metricReaders - object : MetricReader { - override fun getPreferredTemporality(): AggregationTemporality = AggregationTemporality.CUMULATIVE - override fun flush(): CompletableResultCode { - return CompletableResultCode.ofAll(metricReaders.map { it.flush() }) - } - override fun shutdown(): CompletableResultCode { - return CompletableResultCode.ofAll(metricReaders.map { it.shutdown() }) - } - } - } - .build() - - service = WorkflowService( - context, - clock, - _meterProvider, - computeClient, - schedulerSpec.schedulingQuantum, - jobAdmissionPolicy = schedulerSpec.jobAdmissionPolicy, - jobOrderPolicy = schedulerSpec.jobOrderPolicy, - taskEligibilityPolicy = schedulerSpec.taskEligibilityPolicy, - taskOrderPolicy = schedulerSpec.taskOrderPolicy, - ) - } + public val service: WorkflowService = WorkflowService( + context, + clock, + computeClient, + schedulerSpec.schedulingQuantum, + jobAdmissionPolicy = schedulerSpec.jobAdmissionPolicy, + jobOrderPolicy = schedulerSpec.jobOrderPolicy, + taskEligibilityPolicy = schedulerSpec.taskEligibilityPolicy, + taskOrderPolicy = schedulerSpec.taskOrderPolicy, + ) /** * Run the specified list of [jobs] using the workflow service and suspend execution until all jobs have @@ -146,19 +91,8 @@ public class WorkflowServiceHelper( } } - /** - * Register a [MetricReader] for this helper. - * - * @param factory The factory for the reader to register. 
- */ - public fun registerMetricReader(factory: MetricReaderFactory) { - val reader = factory.apply(metricProducer) - _metricReaders.add(reader) - } - override fun close() { computeClient.close() service.close() - _meterProvider.close() } } -- cgit v1.2.3 From 0b584e261fdf34d662129b1b47f00711c0ce0779 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 6 May 2022 09:27:45 +0200 Subject: refactor(workflow/service): Remove OpenTelemetry from "FaaS" modules This change removes the OpenTelemetry integration from the OpenDC FaaS modules. Previously, we chose to integrate OpenTelemetry to provide a unified way to report metrics to the users. See the previous commit removing it from the "Compute" modules for the reasoning behind this change. --- .../build.gradle.kts | 1 - .../experiments/serverless/ServerlessExperiment.kt | 16 ++-- opendc-faas/opendc-faas-service/build.gradle.kts | 2 - .../kotlin/org/opendc/faas/service/FaaSService.kt | 9 +-- .../org/opendc/faas/service/FunctionObject.kt | 94 +--------------------- .../faas/service/internal/FaaSServiceImpl.kt | 40 ++------- .../org/opendc/faas/service/FaaSServiceTest.kt | 23 +++--- .../opendc/faas/simulator/SimFaaSServiceTest.kt | 6 +- 8 files changed, 31 insertions(+), 160 deletions(-) diff --git a/opendc-experiments/opendc-experiments-serverless20/build.gradle.kts b/opendc-experiments/opendc-experiments-serverless20/build.gradle.kts index b96647a6..a6391986 100644 --- a/opendc-experiments/opendc-experiments-serverless20/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-serverless20/build.gradle.kts @@ -33,7 +33,6 @@ dependencies { implementation(projects.opendcSimulator.opendcSimulatorCore) implementation(projects.opendcFaas.opendcFaasService) implementation(projects.opendcFaas.opendcFaasSimulator) - implementation(projects.opendcTelemetry.opendcTelemetrySdk) implementation(libs.kotlin.logging) implementation(libs.config) } diff --git 
a/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt b/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt index 3312d6c0..1c357f67 100644 --- a/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt +++ b/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt @@ -23,8 +23,6 @@ package org.opendc.experiments.serverless import com.typesafe.config.ConfigFactory -import io.opentelemetry.api.metrics.MeterProvider -import io.opentelemetry.sdk.metrics.SdkMeterProvider import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.launch @@ -44,7 +42,6 @@ import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.telemetry.sdk.toOtelClock import java.io.File import java.time.Duration import java.util.* @@ -76,17 +73,18 @@ public class ServerlessExperiment : Experiment("Serverless") { private val coldStartModel by anyOf(ColdStartModel.LAMBDA, ColdStartModel.AZURE, ColdStartModel.GOOGLE) override fun doRun(repeat: Int): Unit = runBlockingSimulation { - val meterProvider: MeterProvider = SdkMeterProvider - .builder() - .setClock(clock.toOtelClock()) - .build() - val trace = ServerlessTraceReader().parse(File(config.getString("trace-path"))) val traceById = trace.associateBy { it.id } val delayInjector = StochasticDelayInjector(coldStartModel, Random()) val deployer = SimFunctionDeployer(clock, this, createMachineModel(), delayInjector) { FunctionTraceWorkload(traceById.getValue(it.name)) } val service = - FaaSService(coroutineContext, clock, meterProvider, deployer, 
routingPolicy, FunctionTerminationPolicyFixed(coroutineContext, clock, timeout = Duration.ofMinutes(10))) + FaaSService( + coroutineContext, + clock, + deployer, + routingPolicy, + FunctionTerminationPolicyFixed(coroutineContext, clock, timeout = Duration.ofMinutes(10)) + ) val client = service.newClient() coroutineScope { diff --git a/opendc-faas/opendc-faas-service/build.gradle.kts b/opendc-faas/opendc-faas-service/build.gradle.kts index 1803ae69..34f5b7ea 100644 --- a/opendc-faas/opendc-faas-service/build.gradle.kts +++ b/opendc-faas/opendc-faas-service/build.gradle.kts @@ -29,11 +29,9 @@ plugins { dependencies { api(projects.opendcFaas.opendcFaasApi) - api(projects.opendcTelemetry.opendcTelemetryApi) api(libs.commons.math3) implementation(projects.opendcCommon) implementation(libs.kotlin.logging) - implementation(libs.opentelemetry.semconv) testImplementation(projects.opendcSimulator.opendcSimulatorCore) testRuntimeOnly(libs.log4j.slf4j) diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt index f7dc3c1f..7b40d867 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt @@ -22,8 +22,6 @@ package org.opendc.faas.service -import io.opentelemetry.api.metrics.Meter -import io.opentelemetry.api.metrics.MeterProvider import org.opendc.faas.api.FaaSClient import org.opendc.faas.api.FaaSFunction import org.opendc.faas.service.autoscaler.FunctionTerminationPolicy @@ -33,6 +31,7 @@ import org.opendc.faas.service.router.RoutingPolicy import org.opendc.faas.service.telemetry.FunctionStats import org.opendc.faas.service.telemetry.SchedulerStats import java.time.Clock +import java.time.Duration import kotlin.coroutines.CoroutineContext /** @@ -65,20 +64,20 @@ public interface FaaSService : AutoCloseable { * 
* @param context The [CoroutineContext] to use in the service. * @param clock The clock instance to use. - * @param meterProvider The [MeterProvider] to create a [Meter] with. * @param deployer the [FunctionDeployer] to use for deploying function instances. * @param routingPolicy The policy to route function invocations. * @param terminationPolicy The policy for terminating function instances. + * @param quantum The scheduling quantum of the service (100 ms default) */ public operator fun invoke( context: CoroutineContext, clock: Clock, - meterProvider: MeterProvider, deployer: FunctionDeployer, routingPolicy: RoutingPolicy, terminationPolicy: FunctionTerminationPolicy, + quantum: Duration = Duration.ofMillis(100) ): FaaSService { - return FaaSServiceImpl(context, clock, meterProvider, deployer, routingPolicy, terminationPolicy) + return FaaSServiceImpl(context, clock, deployer, routingPolicy, terminationPolicy, quantum) } } } diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt index 52fcffa1..1cc33f6f 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt @@ -22,13 +22,6 @@ package org.opendc.faas.service -import io.opentelemetry.api.common.AttributeKey -import io.opentelemetry.api.common.Attributes -import io.opentelemetry.api.metrics.LongCounter -import io.opentelemetry.api.metrics.LongHistogram -import io.opentelemetry.api.metrics.LongUpDownCounter -import io.opentelemetry.api.metrics.Meter -import io.opentelemetry.semconv.resource.attributes.ResourceAttributes import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics import org.opendc.faas.service.deployer.FunctionInstance import org.opendc.faas.service.telemetry.FunctionStats @@ -38,7 +31,6 @@ import java.util.* * 
An [FunctionObject] represents the service's view of a serverless function. */ public class FunctionObject( - meter: Meter, public val uid: UUID, name: String, allocatedMemory: Long, @@ -46,88 +38,16 @@ public class FunctionObject( meta: Map ) : AutoCloseable { /** - * The attributes of this function. + * Metrics tracked per function. */ - private val attributes: Attributes = Attributes.builder() - .put(ResourceAttributes.FAAS_ID, uid.toString()) - .put(ResourceAttributes.FAAS_NAME, name) - .put(ResourceAttributes.FAAS_MAX_MEMORY, allocatedMemory) - .put(AttributeKey.stringArrayKey("faas.labels"), labels.map { (k, v) -> "$k:$v" }) - .build() - - /** - * The total amount of function invocations received by the function. - */ - private val invocations: LongCounter = meter.counterBuilder("function.invocations.total") - .setDescription("Number of function invocations") - .setUnit("1") - .build() private var _invocations = 0L - - /** - * The amount of function invocations that could be handled directly. - */ - private val timelyInvocations: LongCounter = meter.counterBuilder("function.invocations.warm") - .setDescription("Number of function invocations handled directly") - .setUnit("1") - .build() private var _timelyInvocations = 0L - - /** - * The amount of function invocations that were delayed due to function deployment. - */ - private val delayedInvocations: LongCounter = meter.counterBuilder("function.invocations.cold") - .setDescription("Number of function invocations that are delayed") - .setUnit("1") - .build() private var _delayedInvocations = 0L - - /** - * The amount of function invocations that failed. - */ - private val failedInvocations: LongCounter = meter.counterBuilder("function.invocations.failed") - .setDescription("Number of function invocations that failed") - .setUnit("1") - .build() private var _failedInvocations = 0L - - /** - * The amount of instances for this function. 
- */ - private val activeInstances: LongUpDownCounter = meter.upDownCounterBuilder("function.instances.active") - .setDescription("Number of active function instances") - .setUnit("1") - .build() private var _activeInstances = 0 - - /** - * The amount of idle instances for this function. - */ - private val idleInstances: LongUpDownCounter = meter.upDownCounterBuilder("function.instances.idle") - .setDescription("Number of idle function instances") - .setUnit("1") - .build() private var _idleInstances = 0 - - /** - * The time that the function waited. - */ - private val waitTime: LongHistogram = meter.histogramBuilder("function.time.wait") - .ofLongs() - .setDescription("Time the function has to wait before being started") - .setUnit("ms") - .build() private val _waitTime = DescriptiveStatistics() .apply { windowSize = 100 } - - /** - * The time that the function was running. - */ - private val activeTime: LongHistogram = meter.histogramBuilder("function.time.active") - .ofLongs() - .setDescription("Time the function was running") - .setUnit("ms") - .build() private val _activeTime = DescriptiveStatistics() .apply { windowSize = 100 } @@ -150,7 +70,6 @@ public class FunctionObject( * Report a scheduled invocation. 
*/ internal fun reportSubmission() { - invocations.add(1, attributes) _invocations++ } @@ -159,13 +78,9 @@ public class FunctionObject( */ internal fun reportDeployment(isDelayed: Boolean) { if (isDelayed) { - delayedInvocations.add(1, attributes) _delayedInvocations++ - - idleInstances.add(1, attributes) _idleInstances++ } else { - timelyInvocations.add(1, attributes) _timelyInvocations++ } } @@ -175,12 +90,9 @@ public class FunctionObject( */ internal fun reportStart(start: Long, submitTime: Long) { val wait = start - submitTime - waitTime.record(wait, attributes) _waitTime.addValue(wait.toDouble()) - idleInstances.add(-1, attributes) _idleInstances-- - activeInstances.add(1, attributes) _activeInstances++ } @@ -188,7 +100,6 @@ public class FunctionObject( * Report the failure of a function invocation. */ internal fun reportFailure() { - failedInvocations.add(1, attributes) _failedInvocations++ } @@ -196,11 +107,8 @@ public class FunctionObject( * Report the end of a function invocation. 
*/ internal fun reportEnd(duration: Long) { - activeTime.record(duration, attributes) _activeTime.addValue(duration.toDouble()) - idleInstances.add(1, attributes) _idleInstances++ - activeInstances.add(-1, attributes) _activeInstances-- } diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt index ce3b2b98..4ee55dea 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt @@ -22,8 +22,6 @@ package org.opendc.faas.service.internal -import io.opentelemetry.api.metrics.Meter -import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.* import kotlinx.coroutines.intrinsics.startCoroutineCancellable import mu.KotlinLogging @@ -42,6 +40,7 @@ import org.opendc.faas.service.telemetry.FunctionStats import org.opendc.faas.service.telemetry.SchedulerStats import java.lang.IllegalStateException import java.time.Clock +import java.time.Duration import java.util.* import kotlin.coroutines.Continuation import kotlin.coroutines.CoroutineContext @@ -57,10 +56,10 @@ import kotlin.coroutines.resumeWithException internal class FaaSServiceImpl( context: CoroutineContext, private val clock: Clock, - meterProvider: MeterProvider, private val deployer: FunctionDeployer, private val routingPolicy: RoutingPolicy, - private val terminationPolicy: FunctionTerminationPolicy + private val terminationPolicy: FunctionTerminationPolicy, + quantum: Duration ) : FaaSService, FunctionInstanceListener { /** * The [CoroutineScope] of the service bounded by the lifecycle of the service. @@ -72,15 +71,10 @@ internal class FaaSServiceImpl( */ private val logger = KotlinLogging.logger {} - /** - * The [Meter] that collects the metrics of this service. 
- */ - private val meter = meterProvider.get("org.opendc.faas.service") - /** * The [Pacer] to use for scheduling the scheduler cycles. */ - private val pacer = Pacer(scope.coroutineContext, clock, quantum = 100) { doSchedule() } + private val pacer = Pacer(scope.coroutineContext, clock, quantum = quantum.toMillis()) { doSchedule() } /** * The [Random] instance used to generate unique identifiers for the objects. @@ -99,30 +93,10 @@ internal class FaaSServiceImpl( private val queue = ArrayDeque() /** - * The total amount of function invocations received by the service. + * Metrics tracked by the service. */ - private val _invocations = meter.counterBuilder("service.invocations.total") - .setDescription("Number of function invocations") - .setUnit("1") - .build() private var totalInvocations = 0L - - /** - * The amount of function invocations that could be handled directly. - */ - private val _timelyInvocations = meter.counterBuilder("service.invocations.warm") - .setDescription("Number of function invocations handled directly") - .setUnit("1") - .build() private var timelyInvocations = 0L - - /** - * The amount of function invocations that were delayed due to function deployment. 
- */ - private val _delayedInvocations = meter.counterBuilder("service.invocations.cold") - .setDescription("Number of function invocations that are delayed") - .setUnit("1") - .build() private var delayedInvocations = 0L override fun newClient(): FaaSClient { @@ -165,7 +139,6 @@ internal class FaaSServiceImpl( val uid = UUID(clock.millis(), random.nextLong()) val function = FunctionObject( - meter, uid, name, memorySize, @@ -232,7 +205,6 @@ internal class FaaSServiceImpl( } val instance = if (activeInstance != null) { - _timelyInvocations.add(1) timelyInvocations++ function.reportDeployment(isDelayed = false) @@ -242,7 +214,6 @@ internal class FaaSServiceImpl( instances.add(instance) terminationPolicy.enqueue(instance) - _delayedInvocations.add(1) delayedInvocations++ function.reportDeployment(isDelayed = true) @@ -271,7 +242,6 @@ internal class FaaSServiceImpl( suspend fun invoke(function: FunctionObject) { check(function.uid in functions) { "Function does not exist (anymore)" } - _invocations.add(1) totalInvocations++ function.reportSubmission() diff --git a/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt b/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt index 1612e10b..560039c1 100644 --- a/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt +++ b/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt @@ -23,8 +23,6 @@ package org.opendc.faas.service import io.mockk.* -import io.opentelemetry.api.metrics.MeterProvider -import kotlinx.coroutines.ExperimentalCoroutinesApi import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertDoesNotThrow @@ -39,12 +37,11 @@ import java.util.* /** * Test suite for the [FaaSService] implementation. 
*/ -@OptIn(ExperimentalCoroutinesApi::class) internal class FaaSServiceTest { @Test fun testClientState() = runBlockingSimulation { - val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) val client = assertDoesNotThrow { service.newClient() } assertDoesNotThrow { client.close() } @@ -58,7 +55,7 @@ internal class FaaSServiceTest { @Test fun testClientInvokeUnknown() = runBlockingSimulation { - val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) val client = service.newClient() @@ -67,7 +64,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionCreation() = runBlockingSimulation { - val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) val client = service.newClient() @@ -78,7 +75,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionQuery() = runBlockingSimulation { - val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) val client = service.newClient() @@ -91,7 +88,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionFindById() = runBlockingSimulation { - val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) val client = service.newClient() @@ -104,7 +101,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionFindByName() = runBlockingSimulation { - val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, 
clock, mockk(), mockk(), mockk()) val client = service.newClient() @@ -117,7 +114,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionDuplicateName() = runBlockingSimulation { - val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) val client = service.newClient() @@ -128,7 +125,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionDelete() = runBlockingSimulation { - val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) val client = service.newClient() val function = client.newFunction("test", 128) @@ -142,7 +139,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionCannotInvokeDeleted() = runBlockingSimulation { - val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) val client = service.newClient() val function = client.newFunction("test", 128) @@ -155,7 +152,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionInvoke() = runBlockingSimulation { val deployer = mockk() - val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), deployer, mockk(), mockk(relaxUnitFun = true)) + val service = FaaSService(coroutineContext, clock, deployer, mockk(), mockk(relaxUnitFun = true)) every { deployer.deploy(any(), any()) } answers { object : FunctionInstance { diff --git a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt index 792a8584..d528558c 100644 --- a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt +++ 
b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt @@ -24,7 +24,6 @@ package org.opendc.faas.simulator import io.mockk.coVerify import io.mockk.spyk -import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.delay import kotlinx.coroutines.yield @@ -78,7 +77,10 @@ internal class SimFaaSServiceTest { val delayInjector = StochasticDelayInjector(ColdStartModel.GOOGLE, random) val deployer = SimFunctionDeployer(clock, this, machineModel, delayInjector) { workload } val service = FaaSService( - coroutineContext, clock, MeterProvider.noop(), deployer, RandomRoutingPolicy(), + coroutineContext, + clock, + deployer, + RandomRoutingPolicy(), FunctionTerminationPolicyFixed(coroutineContext, clock, timeout = Duration.ofMillis(10000)) ) -- cgit v1.2.3 From 0e8ad565a78dd194e687003e5ccc8ccf9b28667f Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 6 May 2022 10:22:35 +0200 Subject: refactor(exp/tf20): Remove OpenTelemetry from TF20 experiment This change removes the OpenTelemetry integration from the OpenDC Tensorflow 2020 experiments. Previously, we chose to integrate OpenTelemetry to provide a unified way to report metrics to the users. See the previous commit removing it from the "Compute" modules for the reasoning behind this change. 
--- .../opendc-experiments-tf20/build.gradle.kts | 1 - .../opendc/experiments/tf20/TensorFlowExperiment.kt | 13 ++----------- .../org/opendc/experiments/tf20/core/SimTFDevice.kt | 18 +----------------- .../opendc/experiments/tf20/core/SimTFDeviceTest.kt | 14 +++++++++----- 4 files changed, 12 insertions(+), 34 deletions(-) diff --git a/opendc-experiments/opendc-experiments-tf20/build.gradle.kts b/opendc-experiments/opendc-experiments-tf20/build.gradle.kts index 5762ce64..f61c8fef 100644 --- a/opendc-experiments/opendc-experiments-tf20/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-tf20/build.gradle.kts @@ -32,7 +32,6 @@ dependencies { api(projects.opendcHarness.opendcHarnessApi) implementation(projects.opendcSimulator.opendcSimulatorCore) implementation(projects.opendcSimulator.opendcSimulatorCompute) - implementation(projects.opendcTelemetry.opendcTelemetrySdk) implementation(projects.opendcCommon) implementation(libs.kotlin.logging) diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/TensorFlowExperiment.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/TensorFlowExperiment.kt index 2153a862..19236029 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/TensorFlowExperiment.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/TensorFlowExperiment.kt @@ -22,8 +22,6 @@ package org.opendc.experiments.tf20 -import io.opentelemetry.api.metrics.MeterProvider -import io.opentelemetry.sdk.metrics.SdkMeterProvider import org.opendc.experiments.tf20.core.SimTFDevice import org.opendc.experiments.tf20.distribute.* import org.opendc.experiments.tf20.keras.AlexNet @@ -32,7 +30,6 @@ import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf import org.opendc.simulator.compute.power.LinearPowerModel import org.opendc.simulator.core.runBlockingSimulation -import 
org.opendc.telemetry.sdk.toOtelClock /** * Experiments with the TensorFlow simulation model. @@ -49,17 +46,11 @@ public class TensorFlowExperiment : Experiment(name = "tf20") { private val batchSize by anyOf(16, 32, 64, 128) override fun doRun(repeat: Int): Unit = runBlockingSimulation { - val meterProvider: MeterProvider = SdkMeterProvider - .builder() - .setClock(clock.toOtelClock()) - .build() - val meter = meterProvider.get("opendc-tf20") - val envInput = checkNotNull(TensorFlowExperiment::class.java.getResourceAsStream(environmentFile)) val def = MLEnvironmentReader().readEnvironment(envInput).first() val device = SimTFDevice( - def.uid, def.meta["gpu"] as Boolean, coroutineContext, clock, meter, def.model.cpus[0], - def.model.memory[0], LinearPowerModel(250.0, 60.0) + def.uid, def.meta["gpu"] as Boolean, coroutineContext, clock, def.model.cpus[0], def.model.memory[0], + LinearPowerModel(250.0, 60.0) ) val strategy = OneDeviceStrategy(device) diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt index 99948c8e..d2105196 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt @@ -22,7 +22,6 @@ package org.opendc.experiments.tf20.core -import io.opentelemetry.api.metrics.Meter import kotlinx.coroutines.* import org.opendc.simulator.compute.SimBareMetalMachine import org.opendc.simulator.compute.SimMachine @@ -50,7 +49,6 @@ public class SimTFDevice( override val isGpu: Boolean, context: CoroutineContext, clock: Clock, - meter: Meter, pu: ProcessingUnit, private val memory: MemoryUnit, powerModel: PowerModel @@ -69,21 +67,9 @@ public class SimTFDevice( ) /** - * The usage of the device. 
+ * Metrics collected by the device. */ - private val _usage = meter.histogramBuilder("device.usage") - .setDescription("The amount of device resources used") - .setUnit("MHz") - .build() private var _resourceUsage = 0.0 - - /** - * The power draw of the device. - */ - private val _power = meter.histogramBuilder("device.power") - .setDescription("The power draw of the device") - .setUnit("W") - .build() private var _powerUsage = 0.0 private var _energyUsage = 0.0 @@ -171,9 +157,7 @@ public class SimTFDevice( } override fun onConverge(conn: FlowConnection, now: Long) { - _usage.record(conn.rate) _resourceUsage = conn.rate - _power.record(machine.psu.powerDraw) _powerUsage = machine.powerUsage _energyUsage = machine.energyUsage } diff --git a/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt b/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt index 0d5fbebb..fd18a3a7 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/test/kotlin/org/opendc/experiments/tf20/core/SimTFDeviceTest.kt @@ -22,7 +22,6 @@ package org.opendc.experiments.tf20.core -import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch import org.junit.jupiter.api.Assertions.assertAll @@ -41,14 +40,19 @@ import java.util.* internal class SimTFDeviceTest { @Test fun testSmoke() = runBlockingSimulation { - val meterProvider: MeterProvider = MeterProvider.noop() - val meter = meterProvider.get("opendc-tf20") - val puNode = ProcessingNode("NVIDIA", "Tesla V100", "unknown", 1) val pu = ProcessingUnit(puNode, 0, 960 * 1230.0) val memory = MemoryUnit("NVIDIA", "Tesla V100", 877.0, 32_000) - val device = SimTFDevice(UUID.randomUUID(), isGpu = true, coroutineContext, clock, meter, pu, memory, 
LinearPowerModel(250.0, 100.0)) + val device = SimTFDevice( + UUID.randomUUID(), + isGpu = true, + coroutineContext, + clock, + pu, + memory, + LinearPowerModel(250.0, 100.0) + ) // Load 1 GiB into GPU memory device.load(1000) -- cgit v1.2.3 From 260e2228afea08868e8f7f07233b1861b2d7f0c7 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 6 May 2022 14:01:36 +0200 Subject: refactor(telemetry): Remove dependency on OpenTelemetry SDK This change removes the dependency on the OpenTelemetry SDK. Instead, we'll only expose metrics via the OpenTelemetry API in the future via adapter classes. --- gradle/libs.versions.toml | 9 -- .../opendc-compute-workload/build.gradle.kts | 1 - opendc-telemetry/build.gradle.kts | 27 ------ .../opendc-telemetry-api/build.gradle.kts | 32 ------- .../opendc-telemetry-sdk/build.gradle.kts | 37 ------- .../org/opendc/telemetry/sdk/OtelClockAdapter.kt | 39 -------- .../sdk/metrics/export/CoroutineMetricReader.kt | 106 --------------------- settings.gradle.kts | 2 - 8 files changed, 253 deletions(-) delete mode 100644 opendc-telemetry/build.gradle.kts delete mode 100644 opendc-telemetry/opendc-telemetry-api/build.gradle.kts delete mode 100644 opendc-telemetry/opendc-telemetry-sdk/build.gradle.kts delete mode 100644 opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/OtelClockAdapter.kt delete mode 100644 opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index b05af368..55975919 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -21,9 +21,6 @@ ktlint-gradle = "10.2.1" log4j = "2.17.2" microprofile-openapi = "3.0" mockk = "1.12.3" -opentelemetry-main = "1.12.0" -opentelemetry-metrics = "1.12.0-alpha" -opentelemetry-semconv = "1.12.0-alpha" parquet = "1.12.2" progressbar = "0.9.2" quarkus = "2.8.1.Final" @@ -45,12 +42,6 @@ slf4j-simple = { module = 
"org.slf4j:slf4j-simple", version.ref = "slf4j" } log4j-slf4j = { module = "org.apache.logging.log4j:log4j-slf4j-impl", version.ref = "log4j" } sentry-log4j2 = { module = "io.sentry:sentry-log4j2", version.ref = "sentry" } -# Telemetry -opentelemetry-api = { module = "io.opentelemetry:opentelemetry-api", version.ref = "opentelemetry-main" } -opentelemetry-sdk-main = { module = "io.opentelemetry:opentelemetry-sdk", version.ref = "opentelemetry-main" } -opentelemetry-sdk-metrics = { module = "io.opentelemetry:opentelemetry-sdk-metrics", version.ref = "opentelemetry-metrics" } -opentelemetry-semconv = { module = "io.opentelemetry:opentelemetry-semconv", version.ref = "opentelemetry-semconv" } - # Testing junit-jupiter-api = { module = "org.junit.jupiter:junit-jupiter-api", version.ref = "junit-jupiter" } junit-jupiter-engine = { module = "org.junit.jupiter:junit-jupiter-engine", version.ref = "junit-jupiter" } diff --git a/opendc-compute/opendc-compute-workload/build.gradle.kts b/opendc-compute/opendc-compute-workload/build.gradle.kts index e8a7c9fd..7b5fe6c1 100644 --- a/opendc-compute/opendc-compute-workload/build.gradle.kts +++ b/opendc-compute/opendc-compute-workload/build.gradle.kts @@ -34,7 +34,6 @@ dependencies { implementation(projects.opendcTrace.opendcTraceParquet) implementation(projects.opendcSimulator.opendcSimulatorCore) implementation(projects.opendcSimulator.opendcSimulatorCompute) - implementation(libs.opentelemetry.semconv) implementation(libs.kotlin.logging) diff --git a/opendc-telemetry/build.gradle.kts b/opendc-telemetry/build.gradle.kts deleted file mode 100644 index 6473a29e..00000000 --- a/opendc-telemetry/build.gradle.kts +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * 
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -description = "Telemetry processing for OpenDC" - -subprojects { - group = "org.opendc.telemetry" -} diff --git a/opendc-telemetry/opendc-telemetry-api/build.gradle.kts b/opendc-telemetry/opendc-telemetry-api/build.gradle.kts deleted file mode 100644 index 32a36d68..00000000 --- a/opendc-telemetry/opendc-telemetry-api/build.gradle.kts +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. 
- * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -description = "Telemetry API for OpenDC" - -/* Build configuration */ -plugins { - `kotlin-library-conventions` -} - -dependencies { - api(libs.opentelemetry.api) -} diff --git a/opendc-telemetry/opendc-telemetry-sdk/build.gradle.kts b/opendc-telemetry/opendc-telemetry-sdk/build.gradle.kts deleted file mode 100644 index 4b3241bc..00000000 --- a/opendc-telemetry/opendc-telemetry-sdk/build.gradle.kts +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -description = "Telemetry SDK for OpenDC" - -/* Build configuration */ -plugins { - `kotlin-library-conventions` -} - -dependencies { - api(projects.opendcTelemetry.opendcTelemetryApi) - api(libs.kotlinx.coroutines) - api(libs.opentelemetry.sdk.main) - api(libs.opentelemetry.sdk.metrics) - - implementation(libs.kotlin.logging) -} diff --git a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/OtelClockAdapter.kt b/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/OtelClockAdapter.kt deleted file mode 100644 index cd191652..00000000 --- a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/OtelClockAdapter.kt +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.telemetry.sdk - -import io.opentelemetry.sdk.common.Clock - -/** - * An adapter class that bridges a [java.time.Clock] to a [Clock] - */ -public class OtelClockAdapter(private val clock: java.time.Clock) : Clock { - override fun now(): Long = nanoTime() - - override fun nanoTime(): Long = clock.millis() * 1_000_000L -} - -/** - * Convert the specified [java.time.Clock] to a [Clock]. - */ -public fun java.time.Clock.toOtelClock(): Clock = OtelClockAdapter(this) diff --git a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt b/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt deleted file mode 100644 index ca5da079..00000000 --- a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. 
- * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.telemetry.sdk.metrics.export - -import io.opentelemetry.sdk.common.CompletableResultCode -import io.opentelemetry.sdk.metrics.data.AggregationTemporality -import io.opentelemetry.sdk.metrics.export.MetricExporter -import io.opentelemetry.sdk.metrics.export.MetricProducer -import io.opentelemetry.sdk.metrics.export.MetricReader -import io.opentelemetry.sdk.metrics.export.MetricReaderFactory -import kotlinx.coroutines.* -import mu.KotlinLogging -import java.time.Duration - -/** - * A helper class to read the metrics from a list of [MetricProducer]s and automatically export the metrics every - * export interval. - * - * The reader runs in a [CoroutineScope] which enables collection of metrics in environments with a custom clock. - * - * @param scope The [CoroutineScope] to run the reader in. - * @param producer The metric producer to gather metrics from. - * @param exporter The export to export the metrics to. - * @param exportInterval The export interval. - */ -public class CoroutineMetricReader private constructor( - scope: CoroutineScope, - private val producer: MetricProducer, - private val exporter: MetricExporter, - private val exportInterval: Duration -) : MetricReader { - private val logger = KotlinLogging.logger {} - - /** - * The background job that is responsible for collecting the metrics every cycle. 
- */ - private val job = scope.launch { - val intervalMs = exportInterval.toMillis() - - try { - while (isActive) { - delay(intervalMs) - - try { - val metrics = producer.collectAllMetrics() - val result = exporter.export(metrics) - result.whenComplete { - if (!result.isSuccess) { - logger.warn { "Exporter failed" } - } - } - } catch (cause: Throwable) { - logger.warn(cause) { "Exporter threw an Exception" } - } - } - } finally { - exporter.shutdown() - } - } - - override fun getPreferredTemporality(): AggregationTemporality = AggregationTemporality.CUMULATIVE - - override fun flush(): CompletableResultCode { - return exporter.flush() - } - - override fun shutdown(): CompletableResultCode { - job.cancel() - return CompletableResultCode.ofSuccess() - } - - public companion object { - /** - * Construct a [MetricReaderFactory] for this metric reader. - */ - public operator fun invoke( - scope: CoroutineScope, - exporter: MetricExporter, - exportInterval: Duration = Duration.ofMinutes(5) - ): MetricReaderFactory { - return MetricReaderFactory { producer -> - CoroutineMetricReader(scope, producer, exporter, exportInterval) - } - } - } -} diff --git a/settings.gradle.kts b/settings.gradle.kts index 06289e72..f651f4c1 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -47,8 +47,6 @@ include(":opendc-simulator:opendc-simulator-flow") include(":opendc-simulator:opendc-simulator-power") include(":opendc-simulator:opendc-simulator-network") include(":opendc-simulator:opendc-simulator-compute") -include(":opendc-telemetry:opendc-telemetry-api") -include(":opendc-telemetry:opendc-telemetry-sdk") include(":opendc-trace:opendc-trace-api") include(":opendc-trace:opendc-trace-gwf") include(":opendc-trace:opendc-trace-swf") -- cgit v1.2.3