diff options
Diffstat (limited to 'opendc-faas')
7 files changed, 44 insertions, 39 deletions
diff --git a/opendc-faas/opendc-faas-service/build.gradle.kts b/opendc-faas/opendc-faas-service/build.gradle.kts index 63bed8bc..6f4fcc9b 100644 --- a/opendc-faas/opendc-faas-service/build.gradle.kts +++ b/opendc-faas/opendc-faas-service/build.gradle.kts @@ -35,6 +35,7 @@ dependencies { api(projects.opendcTelemetry.opendcTelemetryApi) implementation(projects.opendcUtils) implementation(libs.kotlin.logging) + implementation(libs.opentelemetry.semconv) testImplementation(projects.opendcSimulator.opendcSimulatorCore) testRuntimeOnly(libs.log4j.slf4j) diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt index 7e716a34..1d5331cb 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt @@ -23,6 +23,7 @@ package org.opendc.faas.service import io.opentelemetry.api.metrics.Meter +import io.opentelemetry.api.metrics.MeterProvider import org.opendc.faas.api.FaaSClient import org.opendc.faas.service.autoscaler.FunctionTerminationPolicy import org.opendc.faas.service.deployer.FunctionDeployer @@ -51,7 +52,7 @@ public interface FaaSService : AutoCloseable { * * @param context The [CoroutineContext] to use in the service. * @param clock The clock instance to use. - * @param meter The meter to report metrics to. + * @param meterProvider The [MeterProvider] to create a [Meter] with. * @param deployer the [FunctionDeployer] to use for deploying function instances. * @param routingPolicy The policy to route function invocations. * @param terminationPolicy The policy for terminating function instances. @@ -59,12 +60,12 @@ public interface FaaSService : AutoCloseable { public operator fun invoke( context: CoroutineContext, clock: Clock, - meter: Meter, + meterProvider: MeterProvider, deployer: FunctionDeployer, routingPolicy: RoutingPolicy, terminationPolicy: FunctionTerminationPolicy, ): FaaSService { - return FaaSServiceImpl(context, clock, meter, deployer, routingPolicy, terminationPolicy) + return FaaSServiceImpl(context, clock, meterProvider, deployer, routingPolicy, terminationPolicy) } } } diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt index a1cb1dbf..54df2b59 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt @@ -28,6 +28,7 @@ import io.opentelemetry.api.metrics.BoundLongCounter import io.opentelemetry.api.metrics.BoundLongHistogram import io.opentelemetry.api.metrics.BoundLongUpDownCounter import io.opentelemetry.api.metrics.Meter +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes import org.opendc.faas.service.deployer.FunctionInstance import java.util.* @@ -43,9 +44,14 @@ public class FunctionObject( meta: Map<String, Any> ) : AutoCloseable { /** - * The function identifier attached to the metrics. + * The attributes of this function. */ - private val functionId = AttributeKey.stringKey("function") + public val attributes: Attributes = Attributes.builder() + .put(ResourceAttributes.FAAS_ID, uid.toString()) + .put(ResourceAttributes.FAAS_NAME, name) + .put(ResourceAttributes.FAAS_MAX_MEMORY, allocatedMemory) + .put(AttributeKey.stringArrayKey("faas.labels"), labels.map { (k, v) -> "$k:$v" }) + .build() /** * The total amount of function invocations received by the function. @@ -54,7 +60,7 @@ public class FunctionObject( .setDescription("Number of function invocations") .setUnit("1") .build() - .bind(Attributes.of(functionId, uid.toString())) + .bind(attributes) /** * The amount of function invocations that could be handled directly. @@ -63,7 +69,7 @@ public class FunctionObject( .setDescription("Number of function invocations handled directly") .setUnit("1") .build() - .bind(Attributes.of(functionId, uid.toString())) + .bind(attributes) /** * The amount of function invocations that were delayed due to function deployment. @@ -72,7 +78,7 @@ public class FunctionObject( .setDescription("Number of function invocations that are delayed") .setUnit("1") .build() - .bind(Attributes.of(functionId, uid.toString())) + .bind(attributes) /** * The amount of function invocations that failed. @@ -81,7 +87,7 @@ public class FunctionObject( .setDescription("Number of function invocations that failed") .setUnit("1") .build() - .bind(Attributes.of(functionId, uid.toString())) + .bind(attributes) /** * The amount of instances for this function. @@ -90,7 +96,7 @@ public class FunctionObject( .setDescription("Number of active function instances") .setUnit("1") .build() - .bind(Attributes.of(functionId, uid.toString())) + .bind(attributes) /** * The amount of idle instances for this function. @@ -99,7 +105,7 @@ public class FunctionObject( .setDescription("Number of idle function instances") .setUnit("1") .build() - .bind(Attributes.of(functionId, uid.toString())) + .bind(attributes) /** * The time that the function waited. @@ -109,7 +115,7 @@ public class FunctionObject( .setDescription("Time the function has to wait before being started") .setUnit("ms") .build() - .bind(Attributes.of(functionId, uid.toString())) + .bind(attributes) /** * The time that the function was running. @@ -119,7 +125,7 @@ public class FunctionObject( .setDescription("Time the function was running") .setUnit("ms") .build() - .bind(Attributes.of(functionId, uid.toString())) + .bind(attributes) /** * The instances associated with this function. diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt index 1e224ed1..63dbadc7 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt @@ -26,6 +26,7 @@ import org.opendc.faas.service.deployer.FunctionInstance import org.opendc.faas.service.deployer.FunctionInstanceState import org.opendc.utils.TimerScheduler import java.time.Clock +import java.time.Duration import kotlin.coroutines.CoroutineContext /** @@ -36,7 +37,7 @@ import kotlin.coroutines.CoroutineContext public class FunctionTerminationPolicyFixed( context: CoroutineContext, clock: Clock, - public val timeout: Long + public val timeout: Duration ) : FunctionTerminationPolicy { /** * The [TimerScheduler] used to schedule the function terminations. @@ -60,6 +61,6 @@ public class FunctionTerminationPolicyFixed( * Schedule termination for the specified [instance]. */ private fun schedule(instance: FunctionInstance) { - scheduler.startSingleTimer(instance, delay = timeout) { instance.close() } + scheduler.startSingleTimer(instance, delay = timeout.toMillis()) { instance.close() } } } diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt index ccf9a5d9..3b560cd3 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt @@ -23,6 +23,7 @@ package org.opendc.faas.service.internal import io.opentelemetry.api.metrics.Meter +import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.* import kotlinx.coroutines.intrinsics.startCoroutineCancellable import mu.KotlinLogging @@ -54,7 +55,7 @@ import kotlin.coroutines.resumeWithException internal class FaaSServiceImpl( context: CoroutineContext, private val clock: Clock, - private val meter: Meter, + private val meterProvider: MeterProvider, private val deployer: FunctionDeployer, private val routingPolicy: RoutingPolicy, private val terminationPolicy: FunctionTerminationPolicy @@ -70,6 +71,11 @@ internal class FaaSServiceImpl( private val logger = KotlinLogging.logger {} /** + * The [Meter] that collects the metrics of this service. + */ + private val meter = meterProvider.get("org.opendc.faas.service") + + /** * The [TimerScheduler] to use for scheduling the scheduler cycles. */ private val scheduler: TimerScheduler<Unit> = TimerScheduler(scope.coroutineContext, clock) diff --git a/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt b/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt index 6b99684a..1612e10b 100644 --- a/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt +++ b/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt @@ -44,8 +44,7 @@ internal class FaaSServiceTest { @Test fun testClientState() = runBlockingSimulation { - val meter = MeterProvider.noop().get("opendc-faas") - val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) val client = assertDoesNotThrow { service.newClient() } assertDoesNotThrow { client.close() } @@ -59,8 +58,7 @@ internal class FaaSServiceTest { @Test fun testClientInvokeUnknown() = runBlockingSimulation { - val meter = MeterProvider.noop().get("opendc-faas") - val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) val client = service.newClient() @@ -69,8 +67,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionCreation() = runBlockingSimulation { - val meter = MeterProvider.noop().get("opendc-faas") - val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) val client = service.newClient() @@ -81,8 +78,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionQuery() = runBlockingSimulation { - val meter = MeterProvider.noop().get("opendc-faas") - val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) val client = service.newClient() @@ -95,8 +91,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionFindById() = runBlockingSimulation { - val meter = MeterProvider.noop().get("opendc-faas") - val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) val client = service.newClient() @@ -109,8 +104,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionFindByName() = runBlockingSimulation { - val meter = MeterProvider.noop().get("opendc-faas") - val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) val client = service.newClient() @@ -123,8 +117,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionDuplicateName() = runBlockingSimulation { - val meter = MeterProvider.noop().get("opendc-faas") - val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) val client = service.newClient() @@ -135,8 +128,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionDelete() = runBlockingSimulation { - val meter = MeterProvider.noop().get("opendc-faas") - val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) val client = service.newClient() val function = client.newFunction("test", 128) @@ -150,8 +142,7 @@ internal class FaaSServiceTest { @Test fun testClientFunctionCannotInvokeDeleted() = runBlockingSimulation { - val meter = MeterProvider.noop().get("opendc-faas") - val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk()) + val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk()) val client = service.newClient() val function = client.newFunction("test", 128) @@ -163,9 +154,8 @@ internal class FaaSServiceTest { @Test fun testClientFunctionInvoke() = runBlockingSimulation { - val meter = MeterProvider.noop().get("opendc-faas") val deployer = mockk<FunctionDeployer>() - val service = FaaSService(coroutineContext, clock, meter, deployer, mockk(), mockk(relaxUnitFun = true)) + val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), deployer, mockk(), mockk(relaxUnitFun = true)) every { deployer.deploy(any(), any()) } answers { object : FunctionInstance { diff --git a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt index 64f2551b..0dc9ba87 100644 --- a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt +++ b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt @@ -43,6 +43,7 @@ import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimFlopsWorkload import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.core.runBlockingSimulation +import java.time.Duration /** * A test suite for the [FaaSService] implementation under simulated conditions. @@ -64,14 +65,13 @@ internal class SimFaaSServiceTest { @Test fun testSmoke() = runBlockingSimulation { - val meter = MeterProvider.noop().get("opendc-faas") val workload = spyk(object : SimFaaSWorkload, SimWorkload by SimFlopsWorkload(1000) { override suspend fun invoke() {} }) val deployer = SimFunctionDeployer(clock, this, machineModel, ZeroDelayInjector) { workload } val service = FaaSService( - coroutineContext, clock, meter, deployer, RandomRoutingPolicy(), - FunctionTerminationPolicyFixed(coroutineContext, clock, timeout = 10000) + coroutineContext, clock, MeterProvider.noop(), deployer, RandomRoutingPolicy(), + FunctionTerminationPolicyFixed(coroutineContext, clock, timeout = Duration.ofMillis(10000)) ) val client = service.newClient() |
