From b8ae32eb9a2420fe596ac5f89d3eabee83c2291a Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 29 Mar 2021 13:45:41 +0200 Subject: serverless: Expose metrics from Serverless service This change exposes several metrics from the Serverless service, which are needed for the experiments. --- .../opendc-serverless-service/build.gradle.kts | 1 + .../opendc/serverless/service/ServerlessService.kt | 7 ++++- .../service/internal/ServerlessServiceImpl.kt | 31 ++++++++++++++++++++++ .../serverless/service/ServerlessServiceTest.kt | 31 +++++++++++++++------- .../simulator/SimServerlessServiceTest.kt | 4 ++- 5 files changed, 62 insertions(+), 12 deletions(-) (limited to 'simulator') diff --git a/simulator/opendc-serverless/opendc-serverless-service/build.gradle.kts b/simulator/opendc-serverless/opendc-serverless-service/build.gradle.kts index 0221829a..f7e43aba 100644 --- a/simulator/opendc-serverless/opendc-serverless-service/build.gradle.kts +++ b/simulator/opendc-serverless/opendc-serverless-service/build.gradle.kts @@ -32,6 +32,7 @@ plugins { dependencies { api(platform(project(":opendc-platform"))) api(project(":opendc-serverless:opendc-serverless-api")) + api(project(":opendc-telemetry:opendc-telemetry-api")) implementation(project(":opendc-utils")) implementation("io.github.microutils:kotlin-logging") diff --git a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/ServerlessService.kt b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/ServerlessService.kt index 18717ef5..a791c815 100644 --- a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/ServerlessService.kt +++ b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/ServerlessService.kt @@ -22,6 +22,7 @@ package org.opendc.serverless.service +import io.opentelemetry.api.metrics.Meter import org.opendc.serverless.api.ServerlessClient import org.opendc.serverless.service.deployer.FunctionDeployer import org.opendc.serverless.service.internal.ServerlessServiceImpl @@ -49,14 +50,18 @@ public interface ServerlessService : AutoCloseable { * * @param context The [CoroutineContext] to use in the service. * @param clock The clock instance to use. + * @param meter The meter to report metrics to. + * @param deployer the [FunctionDeployer] to use for deploying function instances. + * @param routingPolicy The policy to route function invocations. */ public operator fun invoke( context: CoroutineContext, clock: Clock, + meter: Meter, deployer: FunctionDeployer, routingPolicy: RoutingPolicy, ): ServerlessService { - return ServerlessServiceImpl(context, clock, deployer, routingPolicy) + return ServerlessServiceImpl(context, clock, meter, deployer, routingPolicy) } } } diff --git a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessServiceImpl.kt b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessServiceImpl.kt index b3f395c3..c49871df 100644 --- a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessServiceImpl.kt +++ b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessServiceImpl.kt @@ -22,6 +22,7 @@ package org.opendc.serverless.service.internal +import io.opentelemetry.api.metrics.Meter import kotlinx.coroutines.* import kotlinx.coroutines.intrinsics.startCoroutineCancellable import mu.KotlinLogging @@ -49,6 +50,7 @@ import kotlin.coroutines.resumeWithException internal class ServerlessServiceImpl( context: CoroutineContext, private val clock: Clock, + internal val meter: Meter, private val deployer: FunctionDeployer, private val routingPolicy: RoutingPolicy ) : ServerlessService { @@ -88,6 +90,30 @@ internal class ServerlessServiceImpl( */ private val instancesByFunction = mutableMapOf>() + /** + * The total amount of function invocations received by the service. + */ + private val _invocations = meter.longCounterBuilder("invocations.total") + .setDescription("Number of function invocations") + .setUnit("1") + .build() + + /** + * The amount of function invocations that could be handled directly. + */ + private val _timelyInvocations = meter.longCounterBuilder("invocations.warm") + .setDescription("Number of function invocations handled directly") + .setUnit("1") + .build() + + /** + * The amount of function invocations that were delayed due to function deployment. + */ + private val _delayedInvocations = meter.longCounterBuilder("invocations.cold") + .setDescription("Number of function invocations that are delayed") + .setUnit("1") + .build() + override fun newClient(): ServerlessClient { return object : ServerlessClient { private var isClosed: Boolean = false @@ -183,6 +209,8 @@ internal class ServerlessServiceImpl( } val instance = if (activeInstance != null) { + _timelyInvocations.add(1) + activeInstance } else { val instance = deployer.deploy(function) @@ -195,6 +223,8 @@ internal class ServerlessServiceImpl( } } + _delayedInvocations.add(1) + instance } @@ -209,6 +239,7 @@ internal class ServerlessServiceImpl( internal suspend fun invoke(function: InternalFunction) { check(function.uid in functions) { "Function does not exist (anymore)" } + _invocations.add(1) return suspendCancellableCoroutine { cont -> if (!queue.add(InvocationRequest(function, cont))) { cont.resumeWithException(IllegalStateException("Failed to enqueue request")) diff --git a/simulator/opendc-serverless/opendc-serverless-service/src/test/kotlin/org/opendc/serverless/service/ServerlessServiceTest.kt b/simulator/opendc-serverless/opendc-serverless-service/src/test/kotlin/org/opendc/serverless/service/ServerlessServiceTest.kt index d9c2bcd2..c4910758 100644 --- a/simulator/opendc-serverless/opendc-serverless-service/src/test/kotlin/org/opendc/serverless/service/ServerlessServiceTest.kt +++ b/simulator/opendc-serverless/opendc-serverless-service/src/test/kotlin/org/opendc/serverless/service/ServerlessServiceTest.kt @@ -23,6 +23,7 @@ package org.opendc.serverless.service import io.mockk.* +import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.test.runBlockingTest import org.junit.jupiter.api.Assertions.* @@ -44,8 +45,9 @@ internal class ServerlessServiceTest { @Test fun testClientState() = runBlockingTest { + val meter = MeterProvider.noop().get("opendc-serverless") val clock = DelayControllerClockAdapter(this) - val service = ServerlessService(coroutineContext, clock, mockk(), mockk()) + val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk()) val client = assertDoesNotThrow { service.newClient() } assertDoesNotThrow { client.close() } @@ -59,8 +61,9 @@ internal class ServerlessServiceTest { @Test fun testClientInvokeUnknown() = runBlockingTest { + val meter = MeterProvider.noop().get("opendc-serverless") val clock = DelayControllerClockAdapter(this) - val service = ServerlessService(coroutineContext, clock, mockk(), mockk()) + val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk()) val client = service.newClient() @@ -69,8 +72,9 @@ internal class ServerlessServiceTest { @Test fun testClientFunctionCreation() = runBlockingTest { + val meter = MeterProvider.noop().get("opendc-serverless") val clock = DelayControllerClockAdapter(this) - val service = ServerlessService(coroutineContext, clock, mockk(), mockk()) + val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk()) val client = service.newClient() @@ -81,8 +85,9 @@ internal class ServerlessServiceTest { @Test fun testClientFunctionQuery() = runBlockingTest { + val meter = MeterProvider.noop().get("opendc-serverless") val clock = DelayControllerClockAdapter(this) - val service = ServerlessService(coroutineContext, clock, mockk(), mockk()) + val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk()) val client = service.newClient() @@ -95,8 +100,9 @@ internal class ServerlessServiceTest { @Test fun testClientFunctionFindById() = runBlockingTest { + val meter = MeterProvider.noop().get("opendc-serverless") val clock = DelayControllerClockAdapter(this) - val service = ServerlessService(coroutineContext, clock, mockk(), mockk()) + val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk()) val client = service.newClient() @@ -109,8 +115,9 @@ internal class ServerlessServiceTest { @Test fun testClientFunctionFindByName() = runBlockingTest { + val meter = MeterProvider.noop().get("opendc-serverless") val clock = DelayControllerClockAdapter(this) - val service = ServerlessService(coroutineContext, clock, mockk(), mockk()) + val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk()) val client = service.newClient() @@ -123,8 +130,9 @@ internal class ServerlessServiceTest { @Test fun testClientFunctionDuplicateName() = runBlockingTest { + val meter = MeterProvider.noop().get("opendc-serverless") val clock = DelayControllerClockAdapter(this) - val service = ServerlessService(coroutineContext, clock, mockk(), mockk()) + val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk()) val client = service.newClient() @@ -135,8 +143,9 @@ internal class ServerlessServiceTest { @Test fun testClientFunctionDelete() = runBlockingTest { + val meter = MeterProvider.noop().get("opendc-serverless") val clock = DelayControllerClockAdapter(this) - val service = ServerlessService(coroutineContext, clock, mockk(), mockk()) + val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk()) val client = service.newClient() val function = client.newFunction("test") @@ -150,8 +159,9 @@ internal class ServerlessServiceTest { @Test fun testClientFunctionCannotInvokeDeleted() = runBlockingTest { + val meter = MeterProvider.noop().get("opendc-serverless") val clock = DelayControllerClockAdapter(this) - val service = ServerlessService(coroutineContext, clock, mockk(), mockk()) + val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk()) val client = service.newClient() val function = client.newFunction("test") @@ -163,9 +173,10 @@ internal class ServerlessServiceTest { @Test fun testClientFunctionInvoke() = runBlockingTest { + val meter = MeterProvider.noop().get("opendc-serverless") val clock = DelayControllerClockAdapter(this) val deployer = mockk() - val service = ServerlessService(coroutineContext, clock, deployer, mockk()) + val service = ServerlessService(coroutineContext, clock, meter, deployer, mockk()) every { deployer.deploy(any()) } answers { object : FunctionInstance { diff --git a/simulator/opendc-serverless/opendc-serverless-simulator/src/test/kotlin/org/opendc/serverless/simulator/SimServerlessServiceTest.kt b/simulator/opendc-serverless/opendc-serverless-simulator/src/test/kotlin/org/opendc/serverless/simulator/SimServerlessServiceTest.kt index f68e206a..16ae60e6 100644 --- a/simulator/opendc-serverless/opendc-serverless-simulator/src/test/kotlin/org/opendc/serverless/simulator/SimServerlessServiceTest.kt +++ b/simulator/opendc-serverless/opendc-serverless-simulator/src/test/kotlin/org/opendc/serverless/simulator/SimServerlessServiceTest.kt @@ -24,6 +24,7 @@ package org.opendc.serverless.simulator import io.mockk.spyk import io.mockk.verify +import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.delay import kotlinx.coroutines.test.runBlockingTest @@ -62,12 +63,13 @@ internal class SimServerlessServiceTest { @Test fun testSmoke() = runBlockingTest { + val meter = MeterProvider.noop().get("opendc-serverless") val clock = DelayControllerClockAdapter(this) val workload = spyk(object : SimServerlessWorkload { override fun onInvoke(): SimWorkload = SimFlopsWorkload(1000) }) val deployer = SimFunctionDeployer(clock, this, machineModel) { workload } - val service = ServerlessService(coroutineContext, clock, deployer, RandomRoutingPolicy()) + val service = ServerlessService(coroutineContext, clock, meter, deployer, RandomRoutingPolicy()) val client = service.newClient() -- cgit v1.2.3