diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-05-06 19:04:03 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2022-05-06 19:04:03 +0200 |
| commit | c3d8d967f82f39f1ef461d5687eb68fb867336c5 (patch) | |
| tree | 2e9938f63c42e5d02fe203e049377d1d17b5d782 /opendc-faas | |
| parent | a9657e4fa3b15e2c1c11884b5a250b0861bcc21d (diff) | |
| parent | 260e2228afea08868e8f7f07233b1861b2d7f0c7 (diff) | |
merge: Move OpenTelemetry integration outside core modules (#81)
This change removes the OpenTelemetry integration from the OpenDC modules.
Previously, we chose to integrate OpenTelemetry to provide a unified way to
report metrics to the users.
Although this worked as expected, the overhead of the OpenTelemetry when
collecting metrics during simulation was considerable and lacked more
optimization opportunities (other than providing a separate API
implementation). Furthermore, since we were tied to OpenTelemetry's SDK
implementation, we experienced issues with throttling and registering
multiple instruments.
We will instead use another approach, where we expose the core metrics
in OpenDC via specialized interfaces (see #80) such that
access is fast and can be done without having to interface with
OpenTelemetry. In addition, we will provide an adapter to that is able
to forward these metrics to OpenTelemetry implementations, so we can
still integrate with the wider ecosystem.
## Implementation Notes :hammer_and_pick:
* Remove OpenTelemetry from "compute" modules
* Remove OpenTelemetry from "workflow" modules
* Remove OpenTelemetry from "FaaS" modules
* Remove OpenTelemetry from TF20 experiment
* Remove dependency on OpenTelemetry SDK
## External Dependencies :four_leaf_clover:
* N/A
## Breaking API Changes :warning:
* Metrics are not anymore directly exposed via OpenTelemetry. Instead, an adapter needs to be used to access the data via OpenTelemetry.
Diffstat (limited to 'opendc-faas')
6 files changed, 24 insertions, 150 deletions
diff --git a/opendc-faas/opendc-faas-service/build.gradle.kts b/opendc-faas/opendc-faas-service/build.gradle.kts index 1803ae69..34f5b7ea 100644 --- a/opendc-faas/opendc-faas-service/build.gradle.kts +++ b/opendc-faas/opendc-faas-service/build.gradle.kts @@ -29,11 +29,9 @@ plugins { dependencies { api(projects.opendcFaas.opendcFaasApi) - api(projects.opendcTelemetry.opendcTelemetryApi) api(libs.commons.math3) implementation(projects.opendcCommon) implementation(libs.kotlin.logging) - implementation(libs.opentelemetry.semconv) testImplementation(projects.opendcSimulator.opendcSimulatorCore) testRuntimeOnly(libs.log4j.slf4j) diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt index f7dc3c1f..7b40d867 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt @@ -22,8 +22,6 @@ package org.opendc.faas.service -import io.opentelemetry.api.metrics.Meter -import io.opentelemetry.api.metrics.MeterProvider import org.opendc.faas.api.FaaSClient import org.opendc.faas.api.FaaSFunction import org.opendc.faas.service.autoscaler.FunctionTerminationPolicy @@ -33,6 +31,7 @@ import org.opendc.faas.service.router.RoutingPolicy import org.opendc.faas.service.telemetry.FunctionStats import org.opendc.faas.service.telemetry.SchedulerStats import java.time.Clock +import java.time.Duration import kotlin.coroutines.CoroutineContext /** @@ -65,20 +64,20 @@ public interface FaaSService : AutoCloseable { * * @param context The [CoroutineContext] to use in the service. * @param clock The clock instance to use. - * @param meterProvider The [MeterProvider] to create a [Meter] with. * @param deployer the [FunctionDeployer] to use for deploying function instances. * @param routingPolicy The policy to route function invocations. * @param terminationPolicy The policy for terminating function instances. + * @param quantum The scheduling quantum of the service (100 ms default) */ public operator fun invoke( context: CoroutineContext, clock: Clock, - meterProvider: MeterProvider, deployer: FunctionDeployer, routingPolicy: RoutingPolicy, terminationPolicy: FunctionTerminationPolicy, + quantum: Duration = Duration.ofMillis(100) ): FaaSService { - return FaaSServiceImpl(context, clock, meterProvider, deployer, routingPolicy, terminationPolicy) + return FaaSServiceImpl(context, clock, deployer, routingPolicy, terminationPolicy, quantum) } } } diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt index 52fcffa1..1cc33f6f 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt @@ -22,13 +22,6 @@ package org.opendc.faas.service -import io.opentelemetry.api.common.AttributeKey -import io.opentelemetry.api.common.Attributes -import io.opentelemetry.api.metrics.LongCounter -import io.opentelemetry.api.metrics.LongHistogram -import io.opentelemetry.api.metrics.LongUpDownCounter -import io.opentelemetry.api.metrics.Meter -import io.opentelemetry.semconv.resource.attributes.ResourceAttributes import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics import org.opendc.faas.service.deployer.FunctionInstance import org.opendc.faas.service.telemetry.FunctionStats @@ -38,7 +31,6 @@ import java.util.* * An [FunctionObject] represents the service's view of a serverless function. */ public class FunctionObject( - meter: Meter, public val uid: UUID, name: String, allocatedMemory: Long, @@ -46,88 +38,16 @@ public class FunctionObject( meta: Map<String, Any> ) : AutoCloseable { /** - * The attributes of this function. + * Metrics tracked per function. */ - private val attributes: Attributes = Attributes.builder() - .put(ResourceAttributes.FAAS_ID, uid.toString()) - .put(ResourceAttributes.FAAS_NAME, name) - .put(ResourceAttributes.FAAS_MAX_MEMORY, allocatedMemory) - .put(AttributeKey.stringArrayKey("faas.labels"), labels.map { (k, v) -> "$k:$v" }) - .build() - - /** - * The total amount of function invocations received by the function. - */ - private val invocations: LongCounter = meter.counterBuilder("function.invocations.total") - .setDescription("Number of function invocations") - .setUnit("1") - .build() private var _invocations = 0L - - /** - * The amount of function invocations that could be handled directly. - */ - private val timelyInvocations: LongCounter = meter.counterBuilder("function.invocations.warm") - .setDescription("Number of function invocations handled directly") - .setUnit("1") - .build() private var _timelyInvocations = 0L - - /** - * The amount of function invocations that were delayed due to function deployment. - */ - private val delayedInvocations: LongCounter = meter.counterBuilder("function.invocations.cold") - .setDescription("Number of function invocations that are delayed") - .setUnit("1") - .build() private var _delayedInvocations = 0L - - /** - * The amount of function invocations that failed. - */ - private val failedInvocations: LongCounter = meter.counterBuilder("function.invocations.failed") - .setDescription("Number of function invocations that failed") - .setUnit("1") - .build() private var _failedInvocations = 0L - - /** - * The amount of instances for this function. - */ - private val activeInstances: LongUpDownCounter = meter.upDownCounterBuilder("function.instances.active") - .setDescription("Number of active function instances") - .setUnit("1") - .build() private var _activeInstances = 0 - - /** - * The amount of idle instances for this function. - */ - private val idleInstances: LongUpDownCounter = meter.upDownCounterBuilder("function.instances.idle") - .setDescription("Number of idle function instances") - .setUnit("1") - .build() private var _idleInstances = 0 - - /** - * The time that the function waited. - */ - private val waitTime: LongHistogram = meter.histogramBuilder("function.time.wait") - .ofLongs() - .setDescription("Time the function has to wait before being started") - .setUnit("ms") - .build() private val _waitTime = DescriptiveStatistics() .apply { windowSize = 100 } - - /** - * The time that the function was running. - */ - private val activeTime: LongHistogram = meter.histogramBuilder("function.time.active") - .ofLongs() - .setDescription("Time the function was running") - .setUnit("ms") - .build() private val _activeTime = DescriptiveStatistics() .apply { windowSize = 100 } @@ -150,7 +70,6 @@ public class FunctionObject( * Report a scheduled invocation. */ internal fun reportSubmission() { - invocations.add(1, attributes) _invocations++ } @@ -159,13 +78,9 @@ public class FunctionObject( */ internal fun reportDeployment(isDelayed: Boolean) { if (isDelayed) { - delayedInvocations.add(1, attributes) _delayedInvocations++ - - idleInstances.add(1, attributes) _idleInstances++ } else { - timelyInvocations.add(1, attributes) _timelyInvocations++ } } @@ -175,12 +90,9 @@ public class FunctionObject( */ internal fun reportStart(start: Long, submitTime: Long) { val wait = start - submitTime - waitTime.record(wait, attributes) _waitTime.addValue(wait.toDouble()) - idleInstances.add(-1, attributes) _idleInstances-- - activeInstances.add(1, attributes) _activeInstances++ } @@ -188,7 +100,6 @@ public class FunctionObject( * Report the failure of a function invocation. */ internal fun reportFailure() { - failedInvocations.add(1, attributes) _failedInvocations++ } @@ -196,11 +107,8 @@ public class FunctionObject( * Report the end of a function invocation. */ internal fun reportEnd(duration: Long) { - activeTime.record(duration, attributes) _activeTime.addValue(duration.toDouble()) - idleInstances.add(1, attributes) _idleInstances++ - activeInstances.add(-1, attributes) _activeInstances-- } diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt index ce3b2b98..4ee55dea 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt @@ -22,8 +22,6 @@ package org.opendc.faas.service.internal -import io.opentelemetry.api.metrics.Meter -import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.* import kotlinx.coroutines.intrinsics.startCoroutineCancellable import mu.KotlinLogging @@ -42,6 +40,7 @@ import org.opendc.faas.service.telemetry.FunctionStats import org.opendc.faas.service.telemetry.SchedulerStats import java.lang.IllegalStateException import java.time.Clock +import java.time.Duration import java.util.* import kotlin.coroutines.Continuation import kotlin.coroutines.CoroutineContext @@ -57,10 +56,10 @@ import kotlin.coroutines.resumeWithException internal class FaaSServiceImpl( context: CoroutineContext, private val clock: Clock, - meterProvider: MeterProvider, private val deployer: FunctionDeployer, private val routingPolicy: RoutingPolicy, - private val terminationPolicy: FunctionTerminationPolicy + private val terminationPolicy: FunctionTerminationPolicy, + quantum: Duration ) : FaaSService, FunctionInstanceListener { /** * The [CoroutineScope] of the service bounded by the lifecycle of the service. @@ -73,14 +72,9 @@ internal class FaaSServiceImpl( private val logger = KotlinLogging.logger {} /** - * The [Meter] that collects the metrics of this service. - */ - private val meter = meterProvider.get("org.opendc.faas.service") - - /** * The [Pacer] to use for scheduling the scheduler cycles. */ - private val pacer = Pacer(scope.coroutineContext, clock, quantum = 100) { doSchedule() } + private val pacer = Pacer(scope.coroutineContext, clock, quantum = quantum.toMillis()) { doSchedule() } /** * The [Random] instance used to generate unique identifiers for the objects. @@ -99,30 +93,10 @@ internal class FaaSServiceImpl( private val queue = ArrayDeque<InvocationRequest>() /** - * The total amount of function invocations received by the service. + * Metrics tracked by the service. */ - private val _invocations = meter.counterBuilder("service.invocations.total") - .setDescription("Number of function invocations") - .setUnit("1") - .build() private var totalInvocations = 0L - - /** - * The amount of function invocations that could be handled directly. - */ - private val _timelyInvocations = meter.counterBuilder("service.invocations.warm") - .setDescription("Number of function invocations handled directly") - .setUnit("1") - .build() private var timelyInvocations = 0L - - /** - * The amount of function invocations that were delayed due to function deployment. - */ - private val _delayedInvocations = meter.counterBuilder("service.invocations.cold") - .setDescription("Number of function invocations that are delayed") - .setUnit("1") - .build() private var delayedInvocations = 0L override fun newClient(): FaaSClient { @@ -165,7 +139,6 @@ internal class FaaSServiceImpl( val uid = UUID(clock.millis(), random.nextLong()) val function = FunctionObject( - meter, uid, name, memorySize, @@ -232,7 +205,6 @@ internal class FaaSServiceImpl( } val instance = if (activeInstance != null) { - _timelyInvocations.add(1) timelyInvocations++ function.reportDeployment(isDelayed = false) @@ -242,7 +214,6 @@ internal class FaaSServiceImpl( instances.add(instance) terminationPolicy.enqueue(instance) - _delayedInvocations.add(1) delayedInvocations++ function.reportDeployment(isDelayed = true) @@ -271,7 +242,6 @@ internal class FaaSServiceImpl( suspend fun invoke(function: FunctionObject) { check(function.uid in functions) { "Function does not exist (anymore)" } - _invocations.add(1) totalInvocations++ function.reportSubmission() diff --git a/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt b/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt index 1612e10b..560039c1 100644 --- a/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt +++ b/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt @@ -23,8 +23,6 @@ package org.opendc.faas.service import io.mockk.* -import io.opentelemetry.api.metrics.MeterProvider -import kotlinx.coroutines.ExperimentalCoroutinesApi import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertDoesNotThrow @@ -39,12 +37,11 @@ import java.util.* /** * Test suite for the [FaaSService] implementation. */ -@OptIn(ExperimentalCoroutinesApi::class) internal class FaaSServiceTest { @Test fun testClientState() = runBlockingSimulation { - val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) val client = assertDoesNotThrow { service.newClient() } assertDoesNotThrow { client.close() } @@ -58,7 +55,7 @@ internal class FaaSServiceTest { @Test fun testClientInvokeUnknown() = runBlockingSimulation { - val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) val client = service.newClient() @@ -67,7 +64,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionCreation() = runBlockingSimulation { - val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) val client = service.newClient() @@ -78,7 +75,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionQuery() = runBlockingSimulation { - val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) val client = service.newClient() @@ -91,7 +88,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionFindById() = runBlockingSimulation { - val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) val client = service.newClient() @@ -104,7 +101,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionFindByName() = runBlockingSimulation { - val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) val client = service.newClient() @@ -117,7 +114,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionDuplicateName() = runBlockingSimulation { - val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) val client = service.newClient() @@ -128,7 +125,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionDelete() = runBlockingSimulation { - val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) val client = service.newClient() val function = client.newFunction("test", 128) @@ -142,7 +139,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionCannotInvokeDeleted() = runBlockingSimulation { - val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) val client = service.newClient() val function = client.newFunction("test", 128) @@ -155,7 +152,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionInvoke() = runBlockingSimulation { val deployer = mockk<FunctionDeployer>() - val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), deployer, mockk(), mockk(relaxUnitFun = true)) + val service = FaaSService(coroutineContext, clock, deployer, mockk(), mockk(relaxUnitFun = true)) every { deployer.deploy(any(), any()) } answers { object : FunctionInstance { diff --git a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt index 792a8584..d528558c 100644 --- a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt +++ b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt @@ -24,7 +24,6 @@ package org.opendc.faas.simulator import io.mockk.coVerify import io.mockk.spyk -import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.delay import kotlinx.coroutines.yield @@ -78,7 +77,10 @@ internal class SimFaaSServiceTest { val delayInjector = StochasticDelayInjector(ColdStartModel.GOOGLE, random) val deployer = SimFunctionDeployer(clock, this, machineModel, delayInjector) { workload } val service = FaaSService( - coroutineContext, clock, MeterProvider.noop(), deployer, RandomRoutingPolicy(), + coroutineContext, + clock, + deployer, + RandomRoutingPolicy(), FunctionTerminationPolicyFixed(coroutineContext, clock, timeout = Duration.ofMillis(10000)) ) |
