diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-09-14 14:41:05 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-09-17 16:48:53 +0200 |
| commit | 3ca64e0110adab65526a0ccfd5b252e9f047ab10 (patch) | |
| tree | 9cad172501154d01b00a1e5c45d94d7e2f1e5698 /opendc-faas/opendc-faas-service/src | |
| parent | 5f0b6b372487d79594cf59010822e160f351e0be (diff) | |
refactor(telemetry): Create separate MeterProvider per service/host
This change refactors the telemetry implementation by creating a
separate MeterProvider per service or host. This means we have to keep
track of multiple metric producers, but that we can attach resource
information to each of the MeterProviders like we would in a real world
scenario.
Diffstat (limited to 'opendc-faas/opendc-faas-service/src')
5 files changed, 40 insertions, 36 deletions
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt index 7e716a34..1d5331cb 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt @@ -23,6 +23,7 @@ package org.opendc.faas.service import io.opentelemetry.api.metrics.Meter +import io.opentelemetry.api.metrics.MeterProvider import org.opendc.faas.api.FaaSClient import org.opendc.faas.service.autoscaler.FunctionTerminationPolicy import org.opendc.faas.service.deployer.FunctionDeployer @@ -51,7 +52,7 @@ public interface FaaSService : AutoCloseable { * * @param context The [CoroutineContext] to use in the service. * @param clock The clock instance to use. - * @param meter The meter to report metrics to. + * @param meterProvider The [MeterProvider] to create a [Meter] with. * @param deployer the [FunctionDeployer] to use for deploying function instances. * @param routingPolicy The policy to route function invocations. * @param terminationPolicy The policy for terminating function instances. @@ -59,12 +60,12 @@ public interface FaaSService : AutoCloseable { public operator fun invoke( context: CoroutineContext, clock: Clock, - meter: Meter, + meterProvider: MeterProvider, deployer: FunctionDeployer, routingPolicy: RoutingPolicy, terminationPolicy: FunctionTerminationPolicy, ): FaaSService { - return FaaSServiceImpl(context, clock, meter, deployer, routingPolicy, terminationPolicy) + return FaaSServiceImpl(context, clock, meterProvider, deployer, routingPolicy, terminationPolicy) } } } diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt index a1cb1dbf..54df2b59 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt @@ -28,6 +28,7 @@ import io.opentelemetry.api.metrics.BoundLongCounter import io.opentelemetry.api.metrics.BoundLongHistogram import io.opentelemetry.api.metrics.BoundLongUpDownCounter import io.opentelemetry.api.metrics.Meter +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes import org.opendc.faas.service.deployer.FunctionInstance import java.util.* @@ -43,9 +44,14 @@ public class FunctionObject( meta: Map<String, Any> ) : AutoCloseable { /** - * The function identifier attached to the metrics. + * The attributes of this function. */ - private val functionId = AttributeKey.stringKey("function") + public val attributes: Attributes = Attributes.builder() + .put(ResourceAttributes.FAAS_ID, uid.toString()) + .put(ResourceAttributes.FAAS_NAME, name) + .put(ResourceAttributes.FAAS_MAX_MEMORY, allocatedMemory) + .put(AttributeKey.stringArrayKey("faas.labels"), labels.map { (k, v) -> "$k:$v" }) + .build() /** * The total amount of function invocations received by the function. @@ -54,7 +60,7 @@ public class FunctionObject( .setDescription("Number of function invocations") .setUnit("1") .build() - .bind(Attributes.of(functionId, uid.toString())) + .bind(attributes) /** * The amount of function invocations that could be handled directly. @@ -63,7 +69,7 @@ public class FunctionObject( .setDescription("Number of function invocations handled directly") .setUnit("1") .build() - .bind(Attributes.of(functionId, uid.toString())) + .bind(attributes) /** * The amount of function invocations that were delayed due to function deployment. @@ -72,7 +78,7 @@ public class FunctionObject( .setDescription("Number of function invocations that are delayed") .setUnit("1") .build() - .bind(Attributes.of(functionId, uid.toString())) + .bind(attributes) /** * The amount of function invocations that failed. @@ -81,7 +87,7 @@ public class FunctionObject( .setDescription("Number of function invocations that failed") .setUnit("1") .build() - .bind(Attributes.of(functionId, uid.toString())) + .bind(attributes) /** * The amount of instances for this function. @@ -90,7 +96,7 @@ public class FunctionObject( .setDescription("Number of active function instances") .setUnit("1") .build() - .bind(Attributes.of(functionId, uid.toString())) + .bind(attributes) /** * The amount of idle instances for this function. @@ -99,7 +105,7 @@ public class FunctionObject( .setDescription("Number of idle function instances") .setUnit("1") .build() - .bind(Attributes.of(functionId, uid.toString())) + .bind(attributes) /** * The time that the function waited. @@ -109,7 +115,7 @@ public class FunctionObject( .setDescription("Time the function has to wait before being started") .setUnit("ms") .build() - .bind(Attributes.of(functionId, uid.toString())) + .bind(attributes) /** * The time that the function was running. @@ -119,7 +125,7 @@ public class FunctionObject( .setDescription("Time the function was running") .setUnit("ms") .build() - .bind(Attributes.of(functionId, uid.toString())) + .bind(attributes) /** * The instances associated with this function. diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt index 1e224ed1..63dbadc7 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt @@ -26,6 +26,7 @@ import org.opendc.faas.service.deployer.FunctionInstance import org.opendc.faas.service.deployer.FunctionInstanceState import org.opendc.utils.TimerScheduler import java.time.Clock +import java.time.Duration import kotlin.coroutines.CoroutineContext /** @@ -36,7 +37,7 @@ import kotlin.coroutines.CoroutineContext public class FunctionTerminationPolicyFixed( context: CoroutineContext, clock: Clock, - public val timeout: Long + public val timeout: Duration ) : FunctionTerminationPolicy { /** * The [TimerScheduler] used to schedule the function terminations. @@ -60,6 +61,6 @@ public class FunctionTerminationPolicyFixed( * Schedule termination for the specified [instance]. */ private fun schedule(instance: FunctionInstance) { - scheduler.startSingleTimer(instance, delay = timeout) { instance.close() } + scheduler.startSingleTimer(instance, delay = timeout.toMillis()) { instance.close() } } } diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt index ccf9a5d9..3b560cd3 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt @@ -23,6 +23,7 @@ package org.opendc.faas.service.internal import io.opentelemetry.api.metrics.Meter +import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.* import kotlinx.coroutines.intrinsics.startCoroutineCancellable import mu.KotlinLogging @@ -54,7 +55,7 @@ import kotlin.coroutines.resumeWithException internal class FaaSServiceImpl( context: CoroutineContext, private val clock: Clock, - private val meter: Meter, + private val meterProvider: MeterProvider, private val deployer: FunctionDeployer, private val routingPolicy: RoutingPolicy, private val terminationPolicy: FunctionTerminationPolicy @@ -70,6 +71,11 @@ internal class FaaSServiceImpl( private val logger = KotlinLogging.logger {} /** + * The [Meter] that collects the metrics of this service. + */ + private val meter = meterProvider.get("org.opendc.faas.service") + + /** * The [TimerScheduler] to use for scheduling the scheduler cycles. */ private val scheduler: TimerScheduler<Unit> = TimerScheduler(scope.coroutineContext, clock) diff --git a/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt b/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt index 6b99684a..1612e10b 100644 --- a/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt +++ b/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt @@ -44,8 +44,7 @@ internal class FaaSServiceTest { @Test fun testClientState() = runBlockingSimulation { - val meter = MeterProvider.noop().get("opendc-faas") - val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) val client = assertDoesNotThrow { service.newClient() } assertDoesNotThrow { client.close() } @@ -59,8 +58,7 @@ internal class FaaSServiceTest { @Test fun testClientInvokeUnknown() = runBlockingSimulation { - val meter = MeterProvider.noop().get("opendc-faas") - val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) val client = service.newClient() @@ -69,8 +67,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionCreation() = runBlockingSimulation { - val meter = MeterProvider.noop().get("opendc-faas") - val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) val client = service.newClient() @@ -81,8 +78,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionQuery() = runBlockingSimulation { - val meter = MeterProvider.noop().get("opendc-faas") - val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) val client = service.newClient() @@ -95,8 +91,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionFindById() = runBlockingSimulation { - val meter = MeterProvider.noop().get("opendc-faas") - val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) val client = service.newClient() @@ -109,8 +104,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionFindByName() = runBlockingSimulation { - val meter = MeterProvider.noop().get("opendc-faas") - val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) val client = service.newClient() @@ -123,8 +117,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionDuplicateName() = runBlockingSimulation { - val meter = MeterProvider.noop().get("opendc-faas") - val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) val client = service.newClient() @@ -135,8 +128,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionDelete() = runBlockingSimulation { - val meter = MeterProvider.noop().get("opendc-faas") - val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) val client = service.newClient() val function = client.newFunction("test", 128) @@ -150,8 +142,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionCannotInvokeDeleted() = runBlockingSimulation { - val meter = MeterProvider.noop().get("opendc-faas") - val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) val client = service.newClient() val function = client.newFunction("test", 128) @@ -163,9 +154,8 @@ internal class FaaSServiceTest { @Test fun testClientFunctionInvoke() = runBlockingSimulation { - val meter = MeterProvider.noop().get("opendc-faas") val deployer = mockk<FunctionDeployer>() - val service = FaaSService(coroutineContext, clock, meter, deployer, mockk(), mockk(relaxUnitFun = true)) + val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), deployer, mockk(), mockk(relaxUnitFun = true)) every { deployer.deploy(any(), any()) } answers { object : FunctionInstance { |
