summaryrefslogtreecommitdiff
path: root/opendc-faas
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-05-06 19:04:03 +0200
committerGitHub <noreply@github.com>2022-05-06 19:04:03 +0200
commitc3d8d967f82f39f1ef461d5687eb68fb867336c5 (patch)
tree2e9938f63c42e5d02fe203e049377d1d17b5d782 /opendc-faas
parenta9657e4fa3b15e2c1c11884b5a250b0861bcc21d (diff)
parent260e2228afea08868e8f7f07233b1861b2d7f0c7 (diff)
merge: Move OpenTelemetry integration outside core modules (#81)
This change removes the OpenTelemetry integration from the OpenDC modules. Previously, we chose to integrate OpenTelemetry to provide a unified way to report metrics to the users. Although this worked as expected, the overhead of OpenTelemetry when collecting metrics during simulation was considerable, and it offered few optimization opportunities (other than providing a separate API implementation). Furthermore, since we were tied to OpenTelemetry's SDK implementation, we experienced issues with throttling and registering multiple instruments. We will instead use another approach, where we expose the core metrics in OpenDC via specialized interfaces (see #80) such that access is fast and can be done without having to interface with OpenTelemetry. In addition, we will provide an adapter that is able to forward these metrics to OpenTelemetry implementations, so we can still integrate with the wider ecosystem. ## Implementation Notes :hammer_and_pick: * Remove OpenTelemetry from "compute" modules * Remove OpenTelemetry from "workflow" modules * Remove OpenTelemetry from "FaaS" modules * Remove OpenTelemetry from TF20 experiment * Remove dependency on OpenTelemetry SDK ## External Dependencies :four_leaf_clover: * N/A ## Breaking API Changes :warning: * Metrics are no longer directly exposed via OpenTelemetry. Instead, an adapter needs to be used to access the data via OpenTelemetry.
Diffstat (limited to 'opendc-faas')
-rw-r--r--opendc-faas/opendc-faas-service/build.gradle.kts2
-rw-r--r--opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt9
-rw-r--r--opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt94
-rw-r--r--opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt40
-rw-r--r--opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt23
-rw-r--r--opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt6
6 files changed, 24 insertions, 150 deletions
diff --git a/opendc-faas/opendc-faas-service/build.gradle.kts b/opendc-faas/opendc-faas-service/build.gradle.kts
index 1803ae69..34f5b7ea 100644
--- a/opendc-faas/opendc-faas-service/build.gradle.kts
+++ b/opendc-faas/opendc-faas-service/build.gradle.kts
@@ -29,11 +29,9 @@ plugins {
dependencies {
api(projects.opendcFaas.opendcFaasApi)
- api(projects.opendcTelemetry.opendcTelemetryApi)
api(libs.commons.math3)
implementation(projects.opendcCommon)
implementation(libs.kotlin.logging)
- implementation(libs.opentelemetry.semconv)
testImplementation(projects.opendcSimulator.opendcSimulatorCore)
testRuntimeOnly(libs.log4j.slf4j)
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt
index f7dc3c1f..7b40d867 100644
--- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt
+++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt
@@ -22,8 +22,6 @@
package org.opendc.faas.service
-import io.opentelemetry.api.metrics.Meter
-import io.opentelemetry.api.metrics.MeterProvider
import org.opendc.faas.api.FaaSClient
import org.opendc.faas.api.FaaSFunction
import org.opendc.faas.service.autoscaler.FunctionTerminationPolicy
@@ -33,6 +31,7 @@ import org.opendc.faas.service.router.RoutingPolicy
import org.opendc.faas.service.telemetry.FunctionStats
import org.opendc.faas.service.telemetry.SchedulerStats
import java.time.Clock
+import java.time.Duration
import kotlin.coroutines.CoroutineContext
/**
@@ -65,20 +64,20 @@ public interface FaaSService : AutoCloseable {
*
* @param context The [CoroutineContext] to use in the service.
* @param clock The clock instance to use.
- * @param meterProvider The [MeterProvider] to create a [Meter] with.
* @param deployer the [FunctionDeployer] to use for deploying function instances.
* @param routingPolicy The policy to route function invocations.
* @param terminationPolicy The policy for terminating function instances.
+ * @param quantum The scheduling quantum of the service (100 ms default)
*/
public operator fun invoke(
context: CoroutineContext,
clock: Clock,
- meterProvider: MeterProvider,
deployer: FunctionDeployer,
routingPolicy: RoutingPolicy,
terminationPolicy: FunctionTerminationPolicy,
+ quantum: Duration = Duration.ofMillis(100)
): FaaSService {
- return FaaSServiceImpl(context, clock, meterProvider, deployer, routingPolicy, terminationPolicy)
+ return FaaSServiceImpl(context, clock, deployer, routingPolicy, terminationPolicy, quantum)
}
}
}
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt
index 52fcffa1..1cc33f6f 100644
--- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt
+++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt
@@ -22,13 +22,6 @@
package org.opendc.faas.service
-import io.opentelemetry.api.common.AttributeKey
-import io.opentelemetry.api.common.Attributes
-import io.opentelemetry.api.metrics.LongCounter
-import io.opentelemetry.api.metrics.LongHistogram
-import io.opentelemetry.api.metrics.LongUpDownCounter
-import io.opentelemetry.api.metrics.Meter
-import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics
import org.opendc.faas.service.deployer.FunctionInstance
import org.opendc.faas.service.telemetry.FunctionStats
@@ -38,7 +31,6 @@ import java.util.*
* An [FunctionObject] represents the service's view of a serverless function.
*/
public class FunctionObject(
- meter: Meter,
public val uid: UUID,
name: String,
allocatedMemory: Long,
@@ -46,88 +38,16 @@ public class FunctionObject(
meta: Map<String, Any>
) : AutoCloseable {
/**
- * The attributes of this function.
+ * Metrics tracked per function.
*/
- private val attributes: Attributes = Attributes.builder()
- .put(ResourceAttributes.FAAS_ID, uid.toString())
- .put(ResourceAttributes.FAAS_NAME, name)
- .put(ResourceAttributes.FAAS_MAX_MEMORY, allocatedMemory)
- .put(AttributeKey.stringArrayKey("faas.labels"), labels.map { (k, v) -> "$k:$v" })
- .build()
-
- /**
- * The total amount of function invocations received by the function.
- */
- private val invocations: LongCounter = meter.counterBuilder("function.invocations.total")
- .setDescription("Number of function invocations")
- .setUnit("1")
- .build()
private var _invocations = 0L
-
- /**
- * The amount of function invocations that could be handled directly.
- */
- private val timelyInvocations: LongCounter = meter.counterBuilder("function.invocations.warm")
- .setDescription("Number of function invocations handled directly")
- .setUnit("1")
- .build()
private var _timelyInvocations = 0L
-
- /**
- * The amount of function invocations that were delayed due to function deployment.
- */
- private val delayedInvocations: LongCounter = meter.counterBuilder("function.invocations.cold")
- .setDescription("Number of function invocations that are delayed")
- .setUnit("1")
- .build()
private var _delayedInvocations = 0L
-
- /**
- * The amount of function invocations that failed.
- */
- private val failedInvocations: LongCounter = meter.counterBuilder("function.invocations.failed")
- .setDescription("Number of function invocations that failed")
- .setUnit("1")
- .build()
private var _failedInvocations = 0L
-
- /**
- * The amount of instances for this function.
- */
- private val activeInstances: LongUpDownCounter = meter.upDownCounterBuilder("function.instances.active")
- .setDescription("Number of active function instances")
- .setUnit("1")
- .build()
private var _activeInstances = 0
-
- /**
- * The amount of idle instances for this function.
- */
- private val idleInstances: LongUpDownCounter = meter.upDownCounterBuilder("function.instances.idle")
- .setDescription("Number of idle function instances")
- .setUnit("1")
- .build()
private var _idleInstances = 0
-
- /**
- * The time that the function waited.
- */
- private val waitTime: LongHistogram = meter.histogramBuilder("function.time.wait")
- .ofLongs()
- .setDescription("Time the function has to wait before being started")
- .setUnit("ms")
- .build()
private val _waitTime = DescriptiveStatistics()
.apply { windowSize = 100 }
-
- /**
- * The time that the function was running.
- */
- private val activeTime: LongHistogram = meter.histogramBuilder("function.time.active")
- .ofLongs()
- .setDescription("Time the function was running")
- .setUnit("ms")
- .build()
private val _activeTime = DescriptiveStatistics()
.apply { windowSize = 100 }
@@ -150,7 +70,6 @@ public class FunctionObject(
* Report a scheduled invocation.
*/
internal fun reportSubmission() {
- invocations.add(1, attributes)
_invocations++
}
@@ -159,13 +78,9 @@ public class FunctionObject(
*/
internal fun reportDeployment(isDelayed: Boolean) {
if (isDelayed) {
- delayedInvocations.add(1, attributes)
_delayedInvocations++
-
- idleInstances.add(1, attributes)
_idleInstances++
} else {
- timelyInvocations.add(1, attributes)
_timelyInvocations++
}
}
@@ -175,12 +90,9 @@ public class FunctionObject(
*/
internal fun reportStart(start: Long, submitTime: Long) {
val wait = start - submitTime
- waitTime.record(wait, attributes)
_waitTime.addValue(wait.toDouble())
- idleInstances.add(-1, attributes)
_idleInstances--
- activeInstances.add(1, attributes)
_activeInstances++
}
@@ -188,7 +100,6 @@ public class FunctionObject(
* Report the failure of a function invocation.
*/
internal fun reportFailure() {
- failedInvocations.add(1, attributes)
_failedInvocations++
}
@@ -196,11 +107,8 @@ public class FunctionObject(
* Report the end of a function invocation.
*/
internal fun reportEnd(duration: Long) {
- activeTime.record(duration, attributes)
_activeTime.addValue(duration.toDouble())
- idleInstances.add(1, attributes)
_idleInstances++
- activeInstances.add(-1, attributes)
_activeInstances--
}
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt
index ce3b2b98..4ee55dea 100644
--- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt
+++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt
@@ -22,8 +22,6 @@
package org.opendc.faas.service.internal
-import io.opentelemetry.api.metrics.Meter
-import io.opentelemetry.api.metrics.MeterProvider
import kotlinx.coroutines.*
import kotlinx.coroutines.intrinsics.startCoroutineCancellable
import mu.KotlinLogging
@@ -42,6 +40,7 @@ import org.opendc.faas.service.telemetry.FunctionStats
import org.opendc.faas.service.telemetry.SchedulerStats
import java.lang.IllegalStateException
import java.time.Clock
+import java.time.Duration
import java.util.*
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
@@ -57,10 +56,10 @@ import kotlin.coroutines.resumeWithException
internal class FaaSServiceImpl(
context: CoroutineContext,
private val clock: Clock,
- meterProvider: MeterProvider,
private val deployer: FunctionDeployer,
private val routingPolicy: RoutingPolicy,
- private val terminationPolicy: FunctionTerminationPolicy
+ private val terminationPolicy: FunctionTerminationPolicy,
+ quantum: Duration
) : FaaSService, FunctionInstanceListener {
/**
* The [CoroutineScope] of the service bounded by the lifecycle of the service.
@@ -73,14 +72,9 @@ internal class FaaSServiceImpl(
private val logger = KotlinLogging.logger {}
/**
- * The [Meter] that collects the metrics of this service.
- */
- private val meter = meterProvider.get("org.opendc.faas.service")
-
- /**
* The [Pacer] to use for scheduling the scheduler cycles.
*/
- private val pacer = Pacer(scope.coroutineContext, clock, quantum = 100) { doSchedule() }
+ private val pacer = Pacer(scope.coroutineContext, clock, quantum = quantum.toMillis()) { doSchedule() }
/**
* The [Random] instance used to generate unique identifiers for the objects.
@@ -99,30 +93,10 @@ internal class FaaSServiceImpl(
private val queue = ArrayDeque<InvocationRequest>()
/**
- * The total amount of function invocations received by the service.
+ * Metrics tracked by the service.
*/
- private val _invocations = meter.counterBuilder("service.invocations.total")
- .setDescription("Number of function invocations")
- .setUnit("1")
- .build()
private var totalInvocations = 0L
-
- /**
- * The amount of function invocations that could be handled directly.
- */
- private val _timelyInvocations = meter.counterBuilder("service.invocations.warm")
- .setDescription("Number of function invocations handled directly")
- .setUnit("1")
- .build()
private var timelyInvocations = 0L
-
- /**
- * The amount of function invocations that were delayed due to function deployment.
- */
- private val _delayedInvocations = meter.counterBuilder("service.invocations.cold")
- .setDescription("Number of function invocations that are delayed")
- .setUnit("1")
- .build()
private var delayedInvocations = 0L
override fun newClient(): FaaSClient {
@@ -165,7 +139,6 @@ internal class FaaSServiceImpl(
val uid = UUID(clock.millis(), random.nextLong())
val function = FunctionObject(
- meter,
uid,
name,
memorySize,
@@ -232,7 +205,6 @@ internal class FaaSServiceImpl(
}
val instance = if (activeInstance != null) {
- _timelyInvocations.add(1)
timelyInvocations++
function.reportDeployment(isDelayed = false)
@@ -242,7 +214,6 @@ internal class FaaSServiceImpl(
instances.add(instance)
terminationPolicy.enqueue(instance)
- _delayedInvocations.add(1)
delayedInvocations++
function.reportDeployment(isDelayed = true)
@@ -271,7 +242,6 @@ internal class FaaSServiceImpl(
suspend fun invoke(function: FunctionObject) {
check(function.uid in functions) { "Function does not exist (anymore)" }
- _invocations.add(1)
totalInvocations++
function.reportSubmission()
diff --git a/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt b/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt
index 1612e10b..560039c1 100644
--- a/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt
+++ b/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt
@@ -23,8 +23,6 @@
package org.opendc.faas.service
import io.mockk.*
-import io.opentelemetry.api.metrics.MeterProvider
-import kotlinx.coroutines.ExperimentalCoroutinesApi
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertDoesNotThrow
@@ -39,12 +37,11 @@ import java.util.*
/**
* Test suite for the [FaaSService] implementation.
*/
-@OptIn(ExperimentalCoroutinesApi::class)
internal class FaaSServiceTest {
@Test
fun testClientState() = runBlockingSimulation {
- val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk())
+ val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk())
val client = assertDoesNotThrow { service.newClient() }
assertDoesNotThrow { client.close() }
@@ -58,7 +55,7 @@ internal class FaaSServiceTest {
@Test
fun testClientInvokeUnknown() = runBlockingSimulation {
- val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk())
+ val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk())
val client = service.newClient()
@@ -67,7 +64,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionCreation() = runBlockingSimulation {
- val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk())
+ val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk())
val client = service.newClient()
@@ -78,7 +75,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionQuery() = runBlockingSimulation {
- val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk())
+ val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk())
val client = service.newClient()
@@ -91,7 +88,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionFindById() = runBlockingSimulation {
- val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk())
+ val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk())
val client = service.newClient()
@@ -104,7 +101,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionFindByName() = runBlockingSimulation {
- val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk())
+ val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk())
val client = service.newClient()
@@ -117,7 +114,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionDuplicateName() = runBlockingSimulation {
- val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk())
+ val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk())
val client = service.newClient()
@@ -128,7 +125,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionDelete() = runBlockingSimulation {
- val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk())
+ val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk())
val client = service.newClient()
val function = client.newFunction("test", 128)
@@ -142,7 +139,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionCannotInvokeDeleted() = runBlockingSimulation {
- val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), mockk(), mockk(), mockk())
+ val service = FaaSService(coroutineContext, clock, mockk(), mockk(), mockk())
val client = service.newClient()
val function = client.newFunction("test", 128)
@@ -155,7 +152,7 @@ internal class FaaSServiceTest {
@Test
fun testClientFunctionInvoke() = runBlockingSimulation {
val deployer = mockk<FunctionDeployer>()
- val service = FaaSService(coroutineContext, clock, MeterProvider.noop(), deployer, mockk(), mockk(relaxUnitFun = true))
+ val service = FaaSService(coroutineContext, clock, deployer, mockk(), mockk(relaxUnitFun = true))
every { deployer.deploy(any(), any()) } answers {
object : FunctionInstance {
diff --git a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt
index 792a8584..d528558c 100644
--- a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt
+++ b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt
@@ -24,7 +24,6 @@ package org.opendc.faas.simulator
import io.mockk.coVerify
import io.mockk.spyk
-import io.opentelemetry.api.metrics.MeterProvider
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.yield
@@ -78,7 +77,10 @@ internal class SimFaaSServiceTest {
val delayInjector = StochasticDelayInjector(ColdStartModel.GOOGLE, random)
val deployer = SimFunctionDeployer(clock, this, machineModel, delayInjector) { workload }
val service = FaaSService(
- coroutineContext, clock, MeterProvider.noop(), deployer, RandomRoutingPolicy(),
+ coroutineContext,
+ clock,
+ deployer,
+ RandomRoutingPolicy(),
FunctionTerminationPolicyFixed(coroutineContext, clock, timeout = Duration.ofMillis(10000))
)