From 0b584e261fdf34d662129b1b47f00711c0ce0779 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 6 May 2022 09:27:45 +0200 Subject: refactor(workflow/service): Remove OpenTelemetry from "FaaS" modules This change removes the OpenTelemetry integration from the OpenDC FaaS modules. Previously, we chose to integrate OpenTelemetry to provide a unified way to report metrics to the users. See the previous commit removing it from the "Compute" modules for the reasoning behind this change. --- .../build.gradle.kts | 1 - .../experiments/serverless/ServerlessExperiment.kt | 16 ++-- opendc-faas/opendc-faas-service/build.gradle.kts | 2 - .../kotlin/org/opendc/faas/service/FaaSService.kt | 9 +-- .../org/opendc/faas/service/FunctionObject.kt | 94 +--------------------- .../faas/service/internal/FaaSServiceImpl.kt | 40 ++------- .../org/opendc/faas/service/FaaSServiceTest.kt | 23 +++--- .../opendc/faas/simulator/SimFaaSServiceTest.kt | 6 +- 8 files changed, 31 insertions(+), 160 deletions(-) diff --git a/opendc-experiments/opendc-experiments-serverless20/build.gradle.kts b/opendc-experiments/opendc-experiments-serverless20/build.gradle.kts index b96647a6..a6391986 100644 --- a/opendc-experiments/opendc-experiments-serverless20/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-serverless20/build.gradle.kts @@ -33,7 +33,6 @@ dependencies { implementation(projects.opendcSimulator.opendcSimulatorCore) implementation(projects.opendcFaas.opendcFaasService) implementation(projects.opendcFaas.opendcFaasSimulator) - implementation(projects.opendcTelemetry.opendcTelemetrySdk) implementation(libs.kotlin.logging) implementation(libs.config) } diff --git a/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt b/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt index 3312d6c0..1c357f67 100644 --- a/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt +++ b/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt @@ -23,8 +23,6 @@ package org.opendc.experiments.serverless import com.typesafe.config.ConfigFactory -import io.opentelemetry.api.metrics.MeterProvider -import io.opentelemetry.sdk.metrics.SdkMeterProvider import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.launch @@ -44,7 +42,6 @@ import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.telemetry.sdk.toOtelClock import java.io.File import java.time.Duration import java.util.* @@ -76,17 +73,18 @@ public class ServerlessExperiment : Experiment("Serverless") { private val coldStartModel by anyOf(ColdStartModel.LAMBDA, ColdStartModel.AZURE, ColdStartModel.GOOGLE) override fun doRun(repeat: Int): Unit = runBlockingSimulation { - val meterProvider: MeterProvider = SdkMeterProvider - .builder() - .setClock(clock.toOtelClock()) - .build() - val trace = ServerlessTraceReader().parse(File(config.getString("trace-path"))) val traceById = trace.associateBy { it.id } val delayInjector = StochasticDelayInjector(coldStartModel, Random()) val deployer = SimFunctionDeployer(clock, this, createMachineModel(), delayInjector) { FunctionTraceWorkload(traceById.getValue(it.name)) } val service = - FaaSService(coroutineContext, clock, meterProvider, deployer, routingPolicy, FunctionTerminationPolicyFixed(coroutineContext, clock, timeout = Duration.ofMinutes(10))) + FaaSService( + coroutineContext, + clock, + deployer, + routingPolicy, + FunctionTerminationPolicyFixed(coroutineContext, clock, timeout = Duration.ofMinutes(10)) + ) val client = service.newClient() coroutineScope { diff --git a/opendc-faas/opendc-faas-service/build.gradle.kts b/opendc-faas/opendc-faas-service/build.gradle.kts index 1803ae69..34f5b7ea 100644 --- a/opendc-faas/opendc-faas-service/build.gradle.kts +++ b/opendc-faas/opendc-faas-service/build.gradle.kts @@ -29,11 +29,9 @@ plugins { dependencies { api(projects.opendcFaas.opendcFaasApi) - api(projects.opendcTelemetry.opendcTelemetryApi) api(libs.commons.math3) implementation(projects.opendcCommon) implementation(libs.kotlin.logging) - implementation(libs.opentelemetry.semconv) testImplementation(projects.opendcSimulator.opendcSimulatorCore) testRuntimeOnly(libs.log4j.slf4j) diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt index f7dc3c1f..7b40d867 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt @@ -22,8 +22,6 @@ package org.opendc.faas.service -import io.opentelemetry.api.metrics.Meter -import io.opentelemetry.api.metrics.MeterProvider import org.opendc.faas.api.FaaSClient import org.opendc.faas.api.FaaSFunction import org.opendc.faas.service.autoscaler.FunctionTerminationPolicy @@ -33,6 +31,7 @@ import org.opendc.faas.service.router.RoutingPolicy import org.opendc.faas.service.telemetry.FunctionStats import org.opendc.faas.service.telemetry.SchedulerStats import java.time.Clock +import java.time.Duration import kotlin.coroutines.CoroutineContext /** @@ -65,20 +64,20 @@ public interface FaaSService : AutoCloseable { * * @param context The [CoroutineContext] to use in the service. * @param clock The clock instance to use. - * @param meterProvider The [MeterProvider] to create a [Meter] with. * @param deployer the [FunctionDeployer] to use for deploying function instances. * @param routingPolicy The policy to route function invocations. * @param terminationPolicy The policy for terminating function instances. + * @param quantum The scheduling quantum of the service (100 ms default) */ public operator fun invoke( context: CoroutineContext, clock: Clock, - meterProvider: MeterProvider, deployer: FunctionDeployer, routingPolicy: RoutingPolicy, terminationPolicy: FunctionTerminationPolicy, + quantum: Duration = Duration.ofMillis(100) ): FaaSService { - return FaaSServiceImpl(context, clock, meterProvider, deployer, routingPolicy, terminationPolicy) + return FaaSServiceImpl(context, clock, deployer, routingPolicy, terminationPolicy, quantum) } } } diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt index 52fcffa1..1cc33f6f 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt @@ -22,13 +22,6 @@ package org.opendc.faas.service -import io.opentelemetry.api.common.AttributeKey -import io.opentelemetry.api.common.Attributes -import io.opentelemetry.api.metrics.LongCounter -import io.opentelemetry.api.metrics.LongHistogram -import io.opentelemetry.api.metrics.LongUpDownCounter -import io.opentelemetry.api.metrics.Meter -import io.opentelemetry.semconv.resource.attributes.ResourceAttributes import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics import org.opendc.faas.service.deployer.FunctionInstance import org.opendc.faas.service.telemetry.FunctionStats @@ -38,7 +31,6 @@ import java.util.* * An [FunctionObject] represents the service's view of a serverless function. */ public class FunctionObject( - meter: Meter, public val uid: UUID, name: String, allocatedMemory: Long, @@ -46,88 +38,16 @@ public class FunctionObject( meta: Map ) : AutoCloseable { /** - * The attributes of this function. + * Metrics tracked per function. */ - private val attributes: Attributes = Attributes.builder() - .put(ResourceAttributes.FAAS_ID, uid.toString()) - .put(ResourceAttributes.FAAS_NAME, name) - .put(ResourceAttributes.FAAS_MAX_MEMORY, allocatedMemory) - .put(AttributeKey.stringArrayKey("faas.labels"), labels.map { (k, v) -> "$k:$v" }) - .build() - - /** - * The total amount of function invocations received by the function. - */ - private val invocations: LongCounter = meter.counterBuilder("function.invocations.total") - .setDescription("Number of function invocations") - .setUnit("1") - .build() private var _invocations = 0L - - /** - * The amount of function invocations that could be handled directly. - */ - private val timelyInvocations: LongCounter = meter.counterBuilder("function.invocations.warm") - .setDescription("Number of function invocations handled directly") - .setUnit("1") - .build() private var _timelyInvocations = 0L - - /** - * The amount of function invocations that were delayed due to function deployment. - */ - private val delayedInvocations: LongCounter = meter.counterBuilder("function.invocations.cold") - .setDescription("Number of function invocations that are delayed") - .setUnit("1") - .build() private var _delayedInvocations = 0L - - /** - * The amount of function invocations that failed. - */ - private val failedInvocations: LongCounter = meter.counterBuilder("function.invocations.failed") - .setDescription("Number of function invocations that failed") - .setUnit("1") - .build() private var _failedInvocations = 0L - - /** - * The amount of instances for this function. - */ - private val activeInstances: LongUpDownCounter = meter.upDownCounterBuilder("function.instances.active") - .setDescription("Number of active function instances") - .setUnit("1") - .build() private var _activeInstances = 0 - - /** - * The amount of idle instances for this function. - */ - private val idleInstances: LongUpDownCounter = meter.upDownCounterBuilder("function.instances.idle") - .setDescription("Number of idle function instances") - .setUnit("1") - .build() private var _idleInstances = 0 - - /** - * The time that the function waited. - */ - private val waitTime: LongHistogram = meter.histogramBuilder("function.time.wait") - .ofLongs() - .setDescription("Time the function has to wait before being started") - .setUnit("ms") - .build() private val _waitTime = DescriptiveStatistics() .apply { windowSize = 100 } - - /** - * The time that the function was running. - */ - private val activeTime: LongHistogram = meter.histogramBuilder("function.time.active") - .ofLongs() - .setDescription("Time the function was running") - .setUnit("ms") - .build() private val _activeTime = DescriptiveStatistics() .apply { windowSize = 100 } @@ -150,7 +70,6 @@ public class FunctionObject( * Report a scheduled invocation. */ internal fun reportSubmission() { - invocations.add(1, attributes) _invocations++ } @@ -159,13 +78,9 @@ public class FunctionObject( */ internal fun reportDeployment(isDelayed: Boolean) { if (isDelayed) { - delayedInvocations.add(1, attributes) _delayedInvocations++ - - idleInstances.add(1, attributes) _idleInstances++ } else { - timelyInvocations.add(1, attributes) _timelyInvocations++ } } @@ -175,12 +90,9 @@ public class FunctionObject( */ internal fun reportStart(start: Long, submitTime: Long) { val wait = start - submitTime - waitTime.record(wait, attributes) _waitTime.addValue(wait.toDouble()) - idleInstances.add(-1, attributes) _idleInstances-- - activeInstances.add(1, attributes) _activeInstances++ } @@ -188,7 +100,6 @@ public class FunctionObject( * Report the failure of a function invocation. */ internal fun reportFailure() { - failedInvocations.add(1, attributes) _failedInvocations++ } @@ -196,11 +107,8 @@ public class FunctionObject( * Report the end of a function invocation. */ internal fun reportEnd(duration: Long) { - activeTime.record(duration, attributes) _activeTime.addValue(duration.toDouble()) - idleInstances.add(1, attributes) _idleInstances++ - activeInstances.add(-1, attributes) _activeInstances-- } diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt index ce3b2b98..4ee55dea 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt @@ -22,8 +22,6 @@ package org.opendc.faas.service.internal -import io.opentelemetry.api.metrics.Meter -import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.* import kotlinx.coroutines.intrinsics.startCoroutineCancellable import mu.KotlinLogging @@ -42,6 +40,7 @@ import org.opendc.faas.service.telemetry.FunctionStats import org.opendc.faas.service.telemetry.SchedulerStats import java.lang.IllegalStateException import java.time.Clock +import java.time.Duration import java.util.* import kotlin.coroutines.Continuation import kotlin.coroutines.CoroutineContext @@ -57,10 +56,10 @@ import kotlin.coroutines.resumeWithException internal class FaaSServiceImpl( context: CoroutineContext, private val clock: Clock, - meterProvider: MeterProvider, private val deployer: FunctionDeployer, private val routingPolicy: RoutingPolicy, - private val terminationPolicy: FunctionTerminationPolicy + private val terminationPolicy: FunctionTerminationPolicy, + quantum: Duration ) : FaaSService, FunctionInstanceListener { /** * The [CoroutineScope] of the service bounded by the lifecycle of the service. @@ -72,15 +71,10 @@ internal class FaaSServiceImpl( */ private val logger = KotlinLogging.logger {} - /** - * The [Meter] that collects the metrics of this service. - */ - private val meter = meterProvider.get("org.opendc.faas.service") - /** * The [Pacer] to use for scheduling the scheduler cycles. */ - private val pacer = Pacer(scope.coroutineContext, clock, quantum = 100) { doSchedule() } + private val pacer = Pacer(scope.coroutineContext, clock, quantum = quantum.toMillis()) { doSchedule() } /** * The [Random] instance used to generate unique identifiers for the objects. @@ -99,30 +93,10 @@ internal class FaaSServiceImpl( private val queue = ArrayDeque() /** - * The total amount of function invocations received by the service. + * Metrics tracked by the service. */ - private val _invocations = meter.counterBuilder("service.invocations.total") - .setDescription("Number of function invocations") - .setUnit("1") - .build() private var totalInvocations = 0L - - /** - * The amount of function invocations that could be handled directly. - */ - private val _timelyInvocations = meter.counterBuilder("service.invocations.warm") - .setDescription("Number of function invocations handled directly") - .setUnit("1") - .build() private var timelyInvocations = 0L - - /** - * The amount of function invocations that were delayed due to function deployment. - */ - private val _delayedInvocations = meter.counterBuilder("service.invocations.cold") - .setDescription("Number of function invocations that are delayed") - .setUnit("1") - .build() private var delayedInvocations = 0L override fun newClient(): FaaSClient { @@ -165,7 +139,6 @@ internal class FaaSServiceImpl( val uid = UUID(clock.millis(), random.nextLong()) val function = FunctionObject( - meter, uid, name, memorySize, @@ -232,7 +205,6 @@ internal class FaaSServiceImpl( } val instance = if (activeInstance != null) { - _timelyInvocations.add(1) timelyInvocations++ function.reportDeployment(isDelayed = false) @@ -242,7 +214,6 @@ internal class FaaSServiceImpl( instances.add(instance) terminationPolicy.enqueue(instance) - _delayedInvocations.add(1) delayedInvocations++ function.reportDeployment(isDelayed = true) @@ -271,7 +242,6 @@ internal class FaaSServiceImpl( suspend fun invoke(function: FunctionObject) { check(function.uid in functions) { "Function does not exist (anymore)" } - _invocations.add(1) totalInvocations++ function.reportSubmission() diff --git a/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt b/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt index 1612e10b..560039c1 100644 --- a/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt +++ b/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt @@ -23,8 +23,6 @@ package org.opendc.faas.service import io.mockk.* -import io.opentelemetry.api.metrics.MeterProvider -import kotlinx.coroutines.ExperimentalCoroutinesApi import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertDoesNotThrow @@ -39,12 +37,11 @@ import java.util.* /** * Test suite for the [FaaSService] implementation. */ -@OptIn(ExperimentalCoroutinesApi::class) internal class FaaSServiceTest { @Test fun testClientState() = runBlockingSimulation { - val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) val client = assertDoesNotThrow { service.newClient() } assertDoesNotThrow { client.close() } @@ -58,7 +55,7 @@ internal class FaaSServiceTest { @Test fun testClientInvokeUnknown() = runBlockingSimulation { - val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) val client = service.newClient() @@ -67,7 +64,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionCreation() = runBlockingSimulation { - val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) val client = service.newClient() @@ -78,7 +75,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionQuery() = runBlockingSimulation { - val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) val client = service.newClient() @@ -91,7 +88,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionFindById() = runBlockingSimulation { - val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) val client = service.newClient() @@ -104,7 +101,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionFindByName() = runBlockingSimulation { - val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) val client = service.newClient() @@ -117,7 +114,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionDuplicateName() = runBlockingSimulation { - val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) val client = service.newClient() @@ -128,7 +125,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionDelete() = runBlockingSimulation { - val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) val client = service.newClient() val function = client.newFunction("test", 128) @@ -142,7 +139,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionCannotInvokeDeleted() = runBlockingSimulation { - val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk()) val client = service.newClient() val function = client.newFunction("test", 128) @@ -155,7 +152,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionInvoke() = runBlockingSimulation { val deployer = mockk() - val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), deployer, mockk(), mockk(relaxUnitFun = true)) + val service = FaaSService(coroutineContext, clock, deployer, mockk(), mockk(relaxUnitFun = true)) every { deployer.deploy(any(), any()) } answers { object : FunctionInstance { diff --git a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt index 792a8584..d528558c 100644 --- a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt +++ b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt @@ -24,7 +24,6 @@ package org.opendc.faas.simulator import io.mockk.coVerify import io.mockk.spyk -import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.delay import kotlinx.coroutines.yield @@ -78,7 +77,10 @@ internal class SimFaaSServiceTest { val delayInjector = StochasticDelayInjector(ColdStartModel.GOOGLE, random) val deployer = SimFunctionDeployer(clock, this, machineModel, delayInjector) { workload } val service = FaaSService( - coroutineContext, clock, MeterProvider.noop(), deployer, RandomRoutingPolicy(), + coroutineContext, + clock, + deployer, + RandomRoutingPolicy(), FunctionTerminationPolicyFixed(coroutineContext, clock, timeout = Duration.ofMillis(10000)) ) -- cgit v1.2.3