summaryrefslogtreecommitdiff
path: root/opendc-faas/opendc-faas-service
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-14 14:41:05 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-17 16:48:53 +0200
commit3ca64e0110adab65526a0ccfd5b252e9f047ab10 (patch)
tree9cad172501154d01b00a1e5c45d94d7e2f1e5698 /opendc-faas/opendc-faas-service
parent5f0b6b372487d79594cf59010822e160f351e0be (diff)
refactor(telemetry): Create separate MeterProvider per service/host
This change refactors the telemetry implementation by creating a separate MeterProvider per service or host. This means we have to keep track of multiple metric producers, but that we can attach resource information to each of the MeterProviders like we would in a real world scenario.
Diffstat (limited to 'opendc-faas/opendc-faas-service')
-rw-r--r--opendc-faas/opendc-faas-service/build.gradle.kts1
-rw-r--r--opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt7
-rw-r--r--opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt26
-rw-r--r--opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt5
-rw-r--r--opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt8
-rw-r--r--opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt30
6 files changed, 41 insertions, 36 deletions
diff --git a/opendc-faas/opendc-faas-service/build.gradle.kts b/opendc-faas/opendc-faas-service/build.gradle.kts
index 63bed8bc..6f4fcc9b 100644
--- a/opendc-faas/opendc-faas-service/build.gradle.kts
+++ b/opendc-faas/opendc-faas-service/build.gradle.kts
@@ -35,6 +35,7 @@ dependencies {
api(projects.opendcTelemetry.opendcTelemetryApi)
implementation(projects.opendcUtils)
implementation(libs.kotlin.logging)
+ implementation(libs.opentelemetry.semconv)
testImplementation(projects.opendcSimulator.opendcSimulatorCore)
testRuntimeOnly(libs.log4j.slf4j)
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt
index 7e716a34..1d5331cb 100644
--- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt
+++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt
@@ -23,6 +23,7 @@
package org.opendc.faas.service
import io.opentelemetry.api.metrics.Meter
+import io.opentelemetry.api.metrics.MeterProvider
import org.opendc.faas.api.FaaSClient
import org.opendc.faas.service.autoscaler.FunctionTerminationPolicy
import org.opendc.faas.service.deployer.FunctionDeployer
@@ -51,7 +52,7 @@ public interface FaaSService : AutoCloseable {
*
* @param context The [CoroutineContext] to use in the service.
* @param clock The clock instance to use.
- * @param meter The meter to report metrics to.
+ * @param meterProvider The [MeterProvider] to create a [Meter] with.
* @param deployer the [FunctionDeployer] to use for deploying function instances.
* @param routingPolicy The policy to route function invocations.
* @param terminationPolicy The policy for terminating function instances.
@@ -59,12 +60,12 @@ public interface FaaSService : AutoCloseable {
public operator fun invoke(
context: CoroutineContext,
clock: Clock,
- meter: Meter,
+ meterProvider: MeterProvider,
deployer: FunctionDeployer,
routingPolicy: RoutingPolicy,
terminationPolicy: FunctionTerminationPolicy,
): FaaSService {
- return FaaSServiceImpl(context, clock, meter, deployer, routingPolicy, terminationPolicy)
+ return FaaSServiceImpl(context, clock, meterProvider, deployer, routingPolicy, terminationPolicy)
}
}
}
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt
index a1cb1dbf..54df2b59 100644
--- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt
+++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt
@@ -28,6 +28,7 @@ import io.opentelemetry.api.metrics.BoundLongCounter
import io.opentelemetry.api.metrics.BoundLongHistogram
import io.opentelemetry.api.metrics.BoundLongUpDownCounter
import io.opentelemetry.api.metrics.Meter
+import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
import org.opendc.faas.service.deployer.FunctionInstance
import java.util.*
@@ -43,9 +44,14 @@ public class FunctionObject(
meta: Map<String, Any>
) : AutoCloseable {
/**
- * The function identifier attached to the metrics.
+ * The attributes of this function.
*/
- private val functionId = AttributeKey.stringKey("function")
+ public val attributes: Attributes = Attributes.builder()
+ .put(ResourceAttributes.FAAS_ID, uid.toString())
+ .put(ResourceAttributes.FAAS_NAME, name)
+ .put(ResourceAttributes.FAAS_MAX_MEMORY, allocatedMemory)
+ .put(AttributeKey.stringArrayKey("faas.labels"), labels.map { (k, v) -> "$k:$v" })
+ .build()
/**
* The total amount of function invocations received by the function.
@@ -54,7 +60,7 @@ public class FunctionObject(
.setDescription("Number of function invocations")
.setUnit("1")
.build()
- .bind(Attributes.of(functionId, uid.toString()))
+ .bind(attributes)
/**
* The amount of function invocations that could be handled directly.
@@ -63,7 +69,7 @@ public class FunctionObject(
.setDescription("Number of function invocations handled directly")
.setUnit("1")
.build()
- .bind(Attributes.of(functionId, uid.toString()))
+ .bind(attributes)
/**
* The amount of function invocations that were delayed due to function deployment.
@@ -72,7 +78,7 @@ public class FunctionObject(
.setDescription("Number of function invocations that are delayed")
.setUnit("1")
.build()
- .bind(Attributes.of(functionId, uid.toString()))
+ .bind(attributes)
/**
* The amount of function invocations that failed.
@@ -81,7 +87,7 @@ public class FunctionObject(
.setDescription("Number of function invocations that failed")
.setUnit("1")
.build()
- .bind(Attributes.of(functionId, uid.toString()))
+ .bind(attributes)
/**
* The amount of instances for this function.
@@ -90,7 +96,7 @@ public class FunctionObject(
.setDescription("Number of active function instances")
.setUnit("1")
.build()
- .bind(Attributes.of(functionId, uid.toString()))
+ .bind(attributes)
/**
* The amount of idle instances for this function.
@@ -99,7 +105,7 @@ public class FunctionObject(
.setDescription("Number of idle function instances")
.setUnit("1")
.build()
- .bind(Attributes.of(functionId, uid.toString()))
+ .bind(attributes)
/**
* The time that the function waited.
@@ -109,7 +115,7 @@ public class FunctionObject(
.setDescription("Time the function has to wait before being started")
.setUnit("ms")
.build()
- .bind(Attributes.of(functionId, uid.toString()))
+ .bind(attributes)
/**
* The time that the function was running.
@@ -119,7 +125,7 @@ public class FunctionObject(
.setDescription("Time the function was running")
.setUnit("ms")
.build()
- .bind(Attributes.of(functionId, uid.toString()))
+ .bind(attributes)
/**
* The instances associated with this function.
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt
index 1e224ed1..63dbadc7 100644
--- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt
+++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt
@@ -26,6 +26,7 @@ import org.opendc.faas.service.deployer.FunctionInstance
import org.opendc.faas.service.deployer.FunctionInstanceState
import org.opendc.utils.TimerScheduler
import java.time.Clock
+import java.time.Duration
import kotlin.coroutines.CoroutineContext
/**
@@ -36,7 +37,7 @@ import kotlin.coroutines.CoroutineContext
public class FunctionTerminationPolicyFixed(
context: CoroutineContext,
clock: Clock,
- public val timeout: Long
+ public val timeout: Duration
) : FunctionTerminationPolicy {
/**
* The [TimerScheduler] used to schedule the function terminations.
@@ -60,6 +61,6 @@ public class FunctionTerminationPolicyFixed(
* Schedule termination for the specified [instance].
*/
private fun schedule(instance: FunctionInstance) {
- scheduler.startSingleTimer(instance, delay = timeout) { instance.close() }
+ scheduler.startSingleTimer(instance, delay = timeout.toMillis()) { instance.close() }
}
}
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt
index ccf9a5d9..3b560cd3 100644
--- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt
+++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt
@@ -23,6 +23,7 @@
package org.opendc.faas.service.internal
import io.opentelemetry.api.metrics.Meter
+import io.opentelemetry.api.metrics.MeterProvider
import kotlinx.coroutines.*
import kotlinx.coroutines.intrinsics.startCoroutineCancellable
import mu.KotlinLogging
@@ -54,7 +55,7 @@ import kotlin.coroutines.resumeWithException
internal class FaaSServiceImpl(
context: CoroutineContext,
private val clock: Clock,
- private val meter: Meter,
+ private val meterProvider: MeterProvider,
private val deployer: FunctionDeployer,
private val routingPolicy: RoutingPolicy,
private val terminationPolicy: FunctionTerminationPolicy
@@ -70,6 +71,11 @@ internal class FaaSServiceImpl(
private val logger = KotlinLogging.logger {}
/**
+ * The [Meter] that collects the metrics of this service.
+ */
+ private val meter = meterProvider.get("org.opendc.faas.service")
+
+ /**
* The [TimerScheduler] to use for scheduling the scheduler cycles.
*/
private val scheduler: TimerScheduler<Unit> = TimerScheduler(scope.coroutineContext, clock)
diff --git a/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt b/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt
index 6b99684a..1612e10b 100644
--- a/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt
+++ b/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt
@@ -44,8 +44,7 @@ internal class FaaSServiceTest {
@Test
fun testClientState() = runBlockingSimulation {
- val meter = MeterProvider.noop().get("opendc-faas")
- val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk())
+ val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk())
val client = assertDoesNotThrow { service.newClient() }
assertDoesNotThrow { client.close() }
@@ -59,8 +58,7 @@ internal class FaaSServiceTest {
@Test
fun testClientInvokeUnknown() = runBlockingSimulation {
- val meter = MeterProvider.noop().get("opendc-faas")
- val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk())
+ val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk())
val client = service.newClient()
@@ -69,8 +67,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionCreation() = runBlockingSimulation {
- val meter = MeterProvider.noop().get("opendc-faas")
- val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk())
+ val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk())
val client = service.newClient()
@@ -81,8 +78,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionQuery() = runBlockingSimulation {
- val meter = MeterProvider.noop().get("opendc-faas")
- val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk())
+ val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk())
val client = service.newClient()
@@ -95,8 +91,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionFindById() = runBlockingSimulation {
- val meter = MeterProvider.noop().get("opendc-faas")
- val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk())
+ val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk())
val client = service.newClient()
@@ -109,8 +104,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionFindByName() = runBlockingSimulation {
- val meter = MeterProvider.noop().get("opendc-faas")
- val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk())
+ val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk())
val client = service.newClient()
@@ -123,8 +117,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionDuplicateName() = runBlockingSimulation {
- val meter = MeterProvider.noop().get("opendc-faas")
- val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk())
+ val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk())
val client = service.newClient()
@@ -135,8 +128,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionDelete() = runBlockingSimulation {
- val meter = MeterProvider.noop().get("opendc-faas")
- val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk())
+ val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk())
val client = service.newClient()
val function = client.newFunction("test", 128)
@@ -150,8 +142,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionCannotInvokeDeleted() = runBlockingSimulation {
- val meter = MeterProvider.noop().get("opendc-faas")
- val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk())
+ val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk())
val client = service.newClient()
val function = client.newFunction("test", 128)
@@ -163,9 +154,8 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionInvoke() = runBlockingSimulation {
- val meter = MeterProvider.noop().get("opendc-faas")
val deployer = mockk<FunctionDeployer>()
- val service = FaaSService(coroutineContext, clock, meter, deployer, mockk(), mockk(relaxUnitFun = true))
+ val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), deployer, mockk(), mockk(relaxUnitFun = true))
every { deployer.deploy(any(), any()) } answers {
object : FunctionInstance {