diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-05-06 09:27:45 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-05-06 18:37:36 +0200 |
| commit | 0b584e261fdf34d662129b1b47f00711c0ce0779 (patch) | |
| tree | 5395603ad1b8bbd4e28c71a331d916b071ff3bd6 /opendc-faas/opendc-faas-service/src | |
| parent | b82ae73d064590094f79e26de355060135ed13fd (diff) | |
refactor(workflow/service): Remove OpenTelemetry from "FaaS" modules
This change removes the OpenTelemetry integration from the OpenDC
FaaS modules. Previously, we chose to integrate OpenTelemetry to
provide a unified way to report metrics to the users.
See the previous commit removing it from the "Compute" modules for the
reasoning behind this change.
Diffstat (limited to 'opendc-faas/opendc-faas-service/src')
4 files changed, 20 insertions, 146 deletions
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt index f7dc3c1f..7b40d867 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt @@ -22,8 +22,6 @@ package org.opendc.faas.service -import io.opentelemetry.api.metrics.Meter -import io.opentelemetry.api.metrics.MeterProvider import org.opendc.faas.api.FaaSClient import org.opendc.faas.api.FaaSFunction import org.opendc.faas.service.autoscaler.FunctionTerminationPolicy @@ -33,6 +31,7 @@ import org.opendc.faas.service.router.RoutingPolicy import org.opendc.faas.service.telemetry.FunctionStats import org.opendc.faas.service.telemetry.SchedulerStats import java.time.Clock +import java.time.Duration import kotlin.coroutines.CoroutineContext /** @@ -65,20 +64,20 @@ public interface FaaSService : AutoCloseable { * * @param context The [CoroutineContext] to use in the service. * @param clock The clock instance to use. - * @param meterProvider The [MeterProvider] to create a [Meter] with. * @param deployer the [FunctionDeployer] to use for deploying function instances. * @param routingPolicy The policy to route function invocations. * @param terminationPolicy The policy for terminating function instances. + * @param quantum The scheduling quantum of the service (100 ms default) */ public operator fun invoke( context: CoroutineContext, clock: Clock, - meterProvider: MeterProvider, deployer: FunctionDeployer, routingPolicy: RoutingPolicy, terminationPolicy: FunctionTerminationPolicy, + quantum: Duration = Duration.ofMillis(100) ): FaaSService { - return FaaSServiceImpl(context, clock, meterProvider, deployer, routingPolicy, terminationPolicy) + return FaaSServiceImpl(context, clock, deployer, routingPolicy, terminationPolicy, quantum) } } } diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt index 52fcffa1..1cc33f6f 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt @@ -22,13 +22,6 @@ package org.opendc.faas.service -import io.opentelemetry.api.common.AttributeKey -import io.opentelemetry.api.common.Attributes -import io.opentelemetry.api.metrics.LongCounter -import io.opentelemetry.api.metrics.LongHistogram -import io.opentelemetry.api.metrics.LongUpDownCounter -import io.opentelemetry.api.metrics.Meter -import io.opentelemetry.semconv.resource.attributes.ResourceAttributes import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics import org.opendc.faas.service.deployer.FunctionInstance import org.opendc.faas.service.telemetry.FunctionStats @@ -38,7 +31,6 @@ import java.util.* * An [FunctionObject] represents the service's view of a serverless function. */ public class FunctionObject( - meter: Meter, public val uid: UUID, name: String, allocatedMemory: Long, @@ -46,88 +38,16 @@ public class FunctionObject( meta: Map<String, Any> ) : AutoCloseable { /** - * The attributes of this function. + * Metrics tracked per function. */ - private val attributes: Attributes = Attributes.builder() - .put(ResourceAttributes.FAAS_ID, uid.toString()) - .put(ResourceAttributes.FAAS_NAME, name) - .put(ResourceAttributes.FAAS_MAX_MEMORY, allocatedMemory) - .put(AttributeKey.stringArrayKey("faas.labels"), labels.map { (k, v) -> "$k:$v" }) - .build() - - /** - * The total amount of function invocations received by the function. - */ - private val invocations: LongCounter = meter.counterBuilder("function.invocations.total") - .setDescription("Number of function invocations") - .setUnit("1") - .build() private var _invocations = 0L - - /** - * The amount of function invocations that could be handled directly. - */ - private val timelyInvocations: LongCounter = meter.counterBuilder("function.invocations.warm") - .setDescription("Number of function invocations handled directly") - .setUnit("1") - .build() private var _timelyInvocations = 0L - - /** - * The amount of function invocations that were delayed due to function deployment. - */ - private val delayedInvocations: LongCounter = meter.counterBuilder("function.invocations.cold") - .setDescription("Number of function invocations that are delayed") - .setUnit("1") - .build() private var _delayedInvocations = 0L - - /** - * The amount of function invocations that failed. - */ - private val failedInvocations: LongCounter = meter.counterBuilder("function.invocations.failed") - .setDescription("Number of function invocations that failed") - .setUnit("1") - .build() private var _failedInvocations = 0L - - /** - * The amount of instances for this function. - */ - private val activeInstances: LongUpDownCounter = meter.upDownCounterBuilder("function.instances.active") - .setDescription("Number of active function instances") - .setUnit("1") - .build() private var _activeInstances = 0 - - /** - * The amount of idle instances for this function. - */ - private val idleInstances: LongUpDownCounter = meter.upDownCounterBuilder("function.instances.idle") - .setDescription("Number of idle function instances") - .setUnit("1") - .build() private var _idleInstances = 0 - - /** - * The time that the function waited. - */ - private val waitTime: LongHistogram = meter.histogramBuilder("function.time.wait") - .ofLongs() - .setDescription("Time the function has to wait before being started") - .setUnit("ms") - .build() private val _waitTime = DescriptiveStatistics() .apply { windowSize = 100 } - - /** - * The time that the function was running. - */ - private val activeTime: LongHistogram = meter.histogramBuilder("function.time.active") - .ofLongs() - .setDescription("Time the function was running") - .setUnit("ms") - .build() private val _activeTime = DescriptiveStatistics() .apply { windowSize = 100 } @@ -150,7 +70,6 @@ public class FunctionObject( * Report a scheduled invocation. */ internal fun reportSubmission() { - invocations.add(1, attributes) _invocations++ } @@ -159,13 +78,9 @@ public class FunctionObject( */ internal fun reportDeployment(isDelayed: Boolean) { if (isDelayed) { - delayedInvocations.add(1, attributes) _delayedInvocations++ - - idleInstances.add(1, attributes) _idleInstances++ } else { - timelyInvocations.add(1, attributes) _timelyInvocations++ } } @@ -175,12 +90,9 @@ public class FunctionObject( */ internal fun reportStart(start: Long, submitTime: Long) { val wait = start - submitTime - waitTime.record(wait, attributes) _waitTime.addValue(wait.toDouble()) - idleInstances.add(-1, attributes) _idleInstances-- - activeInstances.add(1, attributes) _activeInstances++ } @@ -188,7 +100,6 @@ public class FunctionObject( * Report the failure of a function invocation. */ internal fun reportFailure() { - failedInvocations.add(1, attributes) _failedInvocations++ } @@ -196,11 +107,8 @@ public class FunctionObject( * Report the end of a function invocation. */ internal fun reportEnd(duration: Long) { - activeTime.record(duration, attributes) _activeTime.addValue(duration.toDouble()) - idleInstances.add(1, attributes) _idleInstances++ - activeInstances.add(-1, attributes) _activeInstances-- } diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt index ce3b2b98..4ee55dea 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt @@ -22,8 +22,6 @@ package org.opendc.faas.service.internal -import io.opentelemetry.api.metrics.Meter -import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.* import kotlinx.coroutines.intrinsics.startCoroutineCancellable import mu.KotlinLogging @@ -42,6 +40,7 @@ import org.opendc.faas.service.telemetry.FunctionStats import org.opendc.faas.service.telemetry.SchedulerStats import java.lang.IllegalStateException import java.time.Clock +import java.time.Duration import java.util.* import kotlin.coroutines.Continuation import kotlin.coroutines.CoroutineContext @@ -57,10 +56,10 @@ import kotlin.coroutines.resumeWithException internal class FaaSServiceImpl( context: CoroutineContext, private val clock: Clock, - meterProvider: MeterProvider, private val deployer: FunctionDeployer, private val routingPolicy: RoutingPolicy, - private val terminationPolicy: FunctionTerminationPolicy + private val terminationPolicy: FunctionTerminationPolicy, + quantum: Duration ) : FaaSService, FunctionInstanceListener { /** * The [CoroutineScope] of the service bounded by the lifecycle of the service. @@ -73,14 +72,9 @@ internal class FaaSServiceImpl( private val logger = KotlinLogging.logger {} /** - * The [Meter] that collects the metrics of this service. - */ - private val meter = meterProvider.get("org.opendc.faas.service") - - /** * The [Pacer] to use for scheduling the scheduler cycles. */ - private val pacer = Pacer(scope.coroutineContext, clock, quantum = 100) { doSchedule() } + private val pacer = Pacer(scope.coroutineContext, clock, quantum = quantum.toMillis()) { doSchedule() } /** * The [Random] instance used to generate unique identifiers for the objects. @@ -99,30 +93,10 @@ internal class FaaSServiceImpl( private val queue = ArrayDeque<InvocationRequest>() /** - * The total amount of function invocations received by the service. + * Metrics tracked by the service. */ - private val _invocations = meter.counterBuilder("service.invocations.total") - .setDescription("Number of function invocations") - .setUnit("1") - .build() private var totalInvocations = 0L - - /** - * The amount of function invocations that could be handled directly. - */ - private val _timelyInvocations = meter.counterBuilder("service.invocations.warm") - .setDescription("Number of function invocations handled directly") - .setUnit("1") - .build() private var timelyInvocations = 0L - - /** - * The amount of function invocations that were delayed due to function deployment. - */ - private val _delayedInvocations = meter.counterBuilder("service.invocations.cold") - .setDescription("Number of function invocations that are delayed") - .setUnit("1") - .build() private var delayedInvocations = 0L override fun newClient(): FaaSClient { @@ -165,7 +139,6 @@ internal class FaaSServiceImpl( val uid = UUID(clock.millis(), random.nextLong()) val function = FunctionObject( - meter, uid, name, memorySize, @@ -232,7 +205,6 @@ internal class FaaSServiceImpl( } val instance = if (activeInstance != null) { - _timelyInvocations.add(1) timelyInvocations++ function.reportDeployment(isDelayed = false) @@ -242,7 +214,6 @@ internal class FaaSServiceImpl( instances.add(instance) terminationPolicy.enqueue(instance) - _delayedInvocations.add(1) delayedInvocations++ function.reportDeployment(isDelayed = true) @@ -271,7 +242,6 @@ internal class FaaSServiceImpl( suspend fun invoke(function: FunctionObject) { check(function.uid in functions) { "Function does not exist (anymore)" } - _invocations.add(1) totalInvocations++ function.reportSubmission() diff --git a/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt b/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt index 1612e10b..560039c1 100644 --- a/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt +++ b/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt @@ -23,8 +23,6 @@ package org.opendc.faas.service import io.mockk.* -import io.opentelemetry.api.metrics.MeterProvider -import kotlinx.coroutines.ExperimentalCoroutinesApi import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertDoesNotThrow @@ -39,12 +37,11 @@ import java.util.* /** * Test suite for the [FaaSService] implementation. */ -@OptIn(ExperimentalCoroutinesApi::class) internal class FaaSServiceTest { @Test fun testClientState() = runBlockingSimulation { - val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) val client = assertDoesNotThrow { service.newClient() } assertDoesNotThrow { client.close() } @@ -58,7 +55,7 @@ internal class FaaSServiceTest { @Test fun testClientInvokeUnknown() = runBlockingSimulation { - val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) val client = service.newClient() @@ -67,7 +64,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionCreation() = runBlockingSimulation { - val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) val client = service.newClient() @@ -78,7 +75,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionQuery() = runBlockingSimulation { - val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) val client = service.newClient() @@ -91,7 +88,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionFindById() = runBlockingSimulation { - val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) val client = service.newClient() @@ -104,7 +101,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionFindByName() = runBlockingSimulation { - val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) val client = service.newClient() @@ -117,7 +114,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionDuplicateName() = runBlockingSimulation { - val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) val client = service.newClient() @@ -128,7 +125,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionDelete() = runBlockingSimulation { - val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) val client = service.newClient() val function = client.newFunction("test", 128) @@ -142,7 +139,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionCannotInvokeDeleted() = runBlockingSimulation { - val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) val client = service.newClient() val function = client.newFunction("test", 128) @@ -155,7 +152,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionInvoke() = runBlockingSimulation { val deployer = mockk<FunctionDeployer>() - val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), deployer, mockk(), mockk(relaxUnitFun = true)) + val service = FaaSService(coroutineContext, clock, deployer, mockk(), mockk(relaxUnitFun = true)) every { deployer.deploy(any(), any()) } answers { object : FunctionInstance { |
