diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-05-06 09:18:43 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-05-06 17:45:40 +0200 |
| commit | ddb57f774415579f97d43a5097381a816c7015ca (patch) | |
| tree | ca89d43e89a0104d00c0eef36bf04a9db7fcde76 /opendc-faas | |
| parent | 470c96072fa4f112d0511383ea99cdf7d5cc0864 (diff) | |
refactor(faas/service): Directly expose scheduler/function stats to user
This change updates the `FaaSService` interface to directly expose
statistics about the scheduler and individual functions to the user, such
that they do not necessarily have to interact with OpenTelemetry to obtain
these values.
Diffstat (limited to 'opendc-faas')
8 files changed, 236 insertions, 27 deletions
diff --git a/opendc-faas/opendc-faas-service/build.gradle.kts b/opendc-faas/opendc-faas-service/build.gradle.kts index c54595d3..1803ae69 100644 --- a/opendc-faas/opendc-faas-service/build.gradle.kts +++ b/opendc-faas/opendc-faas-service/build.gradle.kts @@ -30,6 +30,7 @@ plugins { dependencies { api(projects.opendcFaas.opendcFaasApi) api(projects.opendcTelemetry.opendcTelemetryApi) + api(libs.commons.math3) implementation(projects.opendcCommon) implementation(libs.kotlin.logging) implementation(libs.opentelemetry.semconv) diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt index 1d5331cb..f7dc3c1f 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt @@ -25,10 +25,13 @@ package org.opendc.faas.service import io.opentelemetry.api.metrics.Meter import io.opentelemetry.api.metrics.MeterProvider import org.opendc.faas.api.FaaSClient +import org.opendc.faas.api.FaaSFunction import org.opendc.faas.service.autoscaler.FunctionTerminationPolicy import org.opendc.faas.service.deployer.FunctionDeployer import org.opendc.faas.service.internal.FaaSServiceImpl import org.opendc.faas.service.router.RoutingPolicy +import org.opendc.faas.service.telemetry.FunctionStats +import org.opendc.faas.service.telemetry.SchedulerStats import java.time.Clock import kotlin.coroutines.CoroutineContext @@ -42,6 +45,16 @@ public interface FaaSService : AutoCloseable { public fun newClient(): FaaSClient /** + * Collect statistics about the scheduler of the service. + */ + public fun getSchedulerStats(): SchedulerStats + + /** + * Collect statistics about the specified [function]. + */ + public fun getFunctionStats(function: FaaSFunction): FunctionStats + + /** * Terminate the lifecycle of the FaaS service, stopping all running function instances. */ public override fun close() diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt index 836231c8..52fcffa1 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt @@ -29,7 +29,9 @@ import io.opentelemetry.api.metrics.LongHistogram import io.opentelemetry.api.metrics.LongUpDownCounter import io.opentelemetry.api.metrics.Meter import io.opentelemetry.semconv.resource.attributes.ResourceAttributes +import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics import org.opendc.faas.service.deployer.FunctionInstance +import org.opendc.faas.service.telemetry.FunctionStats import java.util.* /** @@ -46,7 +48,7 @@ public class FunctionObject( /** * The attributes of this function. */ - public val attributes: Attributes = Attributes.builder() + private val attributes: Attributes = Attributes.builder() .put(ResourceAttributes.FAAS_ID, uid.toString()) .put(ResourceAttributes.FAAS_NAME, name) .put(ResourceAttributes.FAAS_MAX_MEMORY, allocatedMemory) @@ -56,68 +58,78 @@ public class FunctionObject( /** * The total amount of function invocations received by the function. */ - public val invocations: LongCounter = meter.counterBuilder("function.invocations.total") + private val invocations: LongCounter = meter.counterBuilder("function.invocations.total") .setDescription("Number of function invocations") .setUnit("1") .build() + private var _invocations = 0L /** * The amount of function invocations that could be handled directly. */ - public val timelyInvocations: LongCounter = meter.counterBuilder("function.invocations.warm") + private val timelyInvocations: LongCounter = meter.counterBuilder("function.invocations.warm") .setDescription("Number of function invocations handled directly") .setUnit("1") .build() + private var _timelyInvocations = 0L /** * The amount of function invocations that were delayed due to function deployment. */ - public val delayedInvocations: LongCounter = meter.counterBuilder("function.invocations.cold") + private val delayedInvocations: LongCounter = meter.counterBuilder("function.invocations.cold") .setDescription("Number of function invocations that are delayed") .setUnit("1") .build() + private var _delayedInvocations = 0L /** * The amount of function invocations that failed. */ - public val failedInvocations: LongCounter = meter.counterBuilder("function.invocations.failed") + private val failedInvocations: LongCounter = meter.counterBuilder("function.invocations.failed") .setDescription("Number of function invocations that failed") .setUnit("1") .build() + private var _failedInvocations = 0L /** * The amount of instances for this function. */ - public val activeInstances: LongUpDownCounter = meter.upDownCounterBuilder("function.instances.active") + private val activeInstances: LongUpDownCounter = meter.upDownCounterBuilder("function.instances.active") .setDescription("Number of active function instances") .setUnit("1") .build() + private var _activeInstances = 0 /** * The amount of idle instances for this function. */ - public val idleInstances: LongUpDownCounter = meter.upDownCounterBuilder("function.instances.idle") + private val idleInstances: LongUpDownCounter = meter.upDownCounterBuilder("function.instances.idle") .setDescription("Number of idle function instances") .setUnit("1") .build() + private var _idleInstances = 0 /** * The time that the function waited. */ - public val waitTime: LongHistogram = meter.histogramBuilder("function.time.wait") + private val waitTime: LongHistogram = meter.histogramBuilder("function.time.wait") .ofLongs() .setDescription("Time the function has to wait before being started") .setUnit("ms") .build() + private val _waitTime = DescriptiveStatistics() + .apply { windowSize = 100 } /** * The time that the function was running. */ - public val activeTime: LongHistogram = meter.histogramBuilder("function.time.active") + private val activeTime: LongHistogram = meter.histogramBuilder("function.time.active") .ofLongs() .setDescription("Time the function was running") .setUnit("ms") .build() + private val _activeTime = DescriptiveStatistics() + .apply { windowSize = 100 } /** * The instances associated with this function. @@ -134,6 +146,80 @@ public class FunctionObject( public val meta: MutableMap<String, Any> = meta.toMutableMap() + /** + * Report a scheduled invocation. + */ + internal fun reportSubmission() { + invocations.add(1, attributes) + _invocations++ + } + + /** + * Report the deployment of an invocation. + */ + internal fun reportDeployment(isDelayed: Boolean) { + if (isDelayed) { + delayedInvocations.add(1, attributes) + _delayedInvocations++ + + idleInstances.add(1, attributes) + _idleInstances++ + } else { + timelyInvocations.add(1, attributes) + _timelyInvocations++ + } + } + + /** + * Report the start of a function invocation. + */ + internal fun reportStart(start: Long, submitTime: Long) { + val wait = start - submitTime + waitTime.record(wait, attributes) + _waitTime.addValue(wait.toDouble()) + + idleInstances.add(-1, attributes) + _idleInstances-- + activeInstances.add(1, attributes) + _activeInstances++ + } + + /** + * Report the failure of a function invocation. + */ + internal fun reportFailure() { + failedInvocations.add(1, attributes) + _failedInvocations++ + } + + /** + * Report the end of a function invocation. + */ + internal fun reportEnd(duration: Long) { + activeTime.record(duration, attributes) + _activeTime.addValue(duration.toDouble()) + idleInstances.add(1, attributes) + _idleInstances++ + activeInstances.add(-1, attributes) + _activeInstances-- + } + + /** + * Collect the statistics of this function. + */ + internal fun getStats(): FunctionStats { + return FunctionStats( + _invocations, + _timelyInvocations, + _delayedInvocations, + _failedInvocations, + _activeInstances, + _idleInstances, + _waitTime.copy(), + _activeTime.copy() + ) + } + override fun close() { instances.forEach(FunctionInstance::close) instances.clear() diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt index 1526be9d..ce3b2b98 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt @@ -38,6 +38,8 @@ import org.opendc.faas.service.deployer.FunctionInstance import org.opendc.faas.service.deployer.FunctionInstanceListener import org.opendc.faas.service.deployer.FunctionInstanceState import org.opendc.faas.service.router.RoutingPolicy +import org.opendc.faas.service.telemetry.FunctionStats +import org.opendc.faas.service.telemetry.SchedulerStats import java.lang.IllegalStateException import java.time.Clock import java.util.* @@ -103,6 +105,7 @@ internal class FaaSServiceImpl( .setDescription("Number of function invocations") .setUnit("1") .build() + private var totalInvocations = 0L /** * The amount of function invocations that could be handled directly. @@ -111,6 +114,7 @@ internal class FaaSServiceImpl( .setDescription("Number of function invocations handled directly") .setUnit("1") .build() + private var timelyInvocations = 0L /** * The amount of function invocations that were delayed due to function deployment. @@ -119,6 +123,7 @@ internal class FaaSServiceImpl( .setDescription("Number of function invocations that are delayed") .setUnit("1") .build() + private var delayedInvocations = 0L override fun newClient(): FaaSClient { return object : FaaSClient { @@ -187,6 +192,15 @@ internal class FaaSServiceImpl( } } + override fun getSchedulerStats(): SchedulerStats { + return SchedulerStats(totalInvocations, timelyInvocations, delayedInvocations) + } + + override fun getFunctionStats(function: FaaSFunction): FunctionStats { + val func = requireNotNull(functions[function.uid]) { "Unknown function" } + return func.getStats() + } + /** * Indicate that a new scheduling cycle is needed due to a change to the service's state. */ @@ -219,7 +233,8 @@ internal class FaaSServiceImpl( val instance = if (activeInstance != null) { _timelyInvocations.add(1) - function.timelyInvocations.add(1, function.attributes) + timelyInvocations++ + function.reportDeployment(isDelayed = false) activeInstance } else { @@ -227,29 +242,24 @@ internal class FaaSServiceImpl( instances.add(instance) terminationPolicy.enqueue(instance) - function.idleInstances.add(1, function.attributes) - _delayedInvocations.add(1) - function.delayedInvocations.add(1, function.attributes) + delayedInvocations++ + function.reportDeployment(isDelayed = true) instance } suspend { val start = clock.millis() - function.waitTime.record(start - submitTime, function.attributes) - function.idleInstances.add(-1, function.attributes) - function.activeInstances.add(1, function.attributes) + function.reportStart(start, submitTime) try { instance.invoke() } catch (e: Throwable) { logger.debug(e) { "Function invocation failed" } - function.failedInvocations.add(1, function.attributes) + function.reportFailure() } finally { val end = clock.millis() - function.activeTime.record(end - start, function.attributes) - function.idleInstances.add(1, function.attributes) - function.activeInstances.add(-1, function.attributes) + function.reportEnd(end - start) } }.startCoroutineCancellable(cont) } @@ -262,7 +272,8 @@ internal class FaaSServiceImpl( check(function.uid in functions) { "Function does not exist (anymore)" } _invocations.add(1) - function.invocations.add(1, function.attributes) + totalInvocations++ + function.reportSubmission() return suspendCancellableCoroutine { cont -> if (!queue.add(InvocationRequest(clock.millis(), function, cont))) { diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/telemetry/FunctionStats.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/telemetry/FunctionStats.kt new file mode 100644 index 00000000..497ee423 --- /dev/null +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/telemetry/FunctionStats.kt @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.faas.service.telemetry + +import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics + +/** + * Statistics about function invocations. + * + * @property totalInvocations The number of function invocations. + * @property timelyInvocations The number of function invocations that could be handled directly. + * @property delayedInvocations The number of function invocations that are delayed (cold starts). + * @property failedInvocations The number of function invocations that failed. + * @property activeInstances The number of active function instances. + * @property idleInstances The number of idle function instances. + * @property waitTime Statistics about the wait time of a function invocation. + * @property activeTime Statistics about the runtime of a function invocation. + */ +public data class FunctionStats( + val totalInvocations: Long, + val timelyInvocations: Long, + val delayedInvocations: Long, + val failedInvocations: Long, + val activeInstances: Int, + val idleInstances: Int, + val waitTime: DescriptiveStatistics, + val activeTime: DescriptiveStatistics +) diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/telemetry/SchedulerStats.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/telemetry/SchedulerStats.kt new file mode 100644 index 00000000..cabb1d56 --- /dev/null +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/telemetry/SchedulerStats.kt @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.faas.service.telemetry + +/** + * Statistics reported by the FaaS scheduler. + * + * @property totalInvocations The total amount of function invocations received by the scheduler. + * @property timelyInvocations The amount of function invocations that could be handled directly. + * @property delayedInvocations The amount of function invocations that were delayed due to function deployment. + */ +public data class SchedulerStats( + val totalInvocations: Long, + val timelyInvocations: Long, + val delayedInvocations: Long +) diff --git a/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt b/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt index 68233c1a..a3d0d34e 100644 --- a/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt +++ b/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt @@ -123,7 +123,6 @@ public class SimFunctionDeployer( /** * Start the function instance. */ - @OptIn(InternalCoroutinesApi::class) internal fun start() { check(state == FunctionInstanceState.Provisioning) { "Invalid state of function instance" } job = scope.launch { diff --git a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt index 0dc9ba87..792a8584 100644 --- a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt +++ b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt @@ -28,22 +28,25 @@ import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.delay import kotlinx.coroutines.yield +import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll import org.opendc.faas.service.FaaSService import org.opendc.faas.service.autoscaler.FunctionTerminationPolicyFixed import org.opendc.faas.service.router.RandomRoutingPolicy -import org.opendc.faas.simulator.delay.ZeroDelayInjector +import org.opendc.faas.simulator.delay.ColdStartModel +import org.opendc.faas.simulator.delay.StochasticDelayInjector import org.opendc.faas.simulator.workload.SimFaaSWorkload import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit -import org.opendc.simulator.compute.workload.SimFlopsWorkload +import org.opendc.simulator.compute.workload.SimRuntimeWorkload import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.core.runBlockingSimulation import java.time.Duration +import java.util.* /** * A test suite for the [FaaSService] implementation under simulated conditions. @@ -65,10 +68,15 @@ internal class SimFaaSServiceTest { @Test fun testSmoke() = runBlockingSimulation { - val workload = spyk(object : SimFaaSWorkload, SimWorkload by SimFlopsWorkload(1000) { - override suspend fun invoke() {} + val random = Random(0) + val workload = spyk(object : SimFaaSWorkload, SimWorkload by SimRuntimeWorkload(1000) { + override suspend fun invoke() { + delay(random.nextInt(1000).toLong()) + } }) - val deployer = SimFunctionDeployer(clock, this, machineModel, ZeroDelayInjector) { workload } + + val delayInjector = StochasticDelayInjector(ColdStartModel.GOOGLE, random) + val deployer = SimFunctionDeployer(clock, this, machineModel, delayInjector) { workload } val service = FaaSService( coroutineContext, clock, MeterProvider.noop(), deployer, RandomRoutingPolicy(), FunctionTerminationPolicyFixed(coroutineContext, clock, timeout = Duration.ofMillis(10000)) @@ -84,8 +92,15 @@ internal class SimFaaSServiceTest { yield() + val funcStats = service.getFunctionStats(function) + assertAll( { coVerify { workload.invoke() } }, + { assertEquals(1, funcStats.totalInvocations) }, + { assertEquals(1, funcStats.delayedInvocations) }, + { assertEquals(0, funcStats.failedInvocations) }, + { assertEquals(100.0, funcStats.waitTime.mean) }, + { assertEquals(1285.0, funcStats.activeTime.mean) }, ) } } |
