summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--opendc-faas/opendc-faas-service/build.gradle.kts1
-rw-r--r--opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt13
-rw-r--r--opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt104
-rw-r--r--opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt35
-rw-r--r--opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/telemetry/FunctionStats.kt48
-rw-r--r--opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/telemetry/SchedulerStats.kt36
-rw-r--r--opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt1
-rw-r--r--opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt25
8 files changed, 236 insertions, 27 deletions
diff --git a/opendc-faas/opendc-faas-service/build.gradle.kts b/opendc-faas/opendc-faas-service/build.gradle.kts
index c54595d3..1803ae69 100644
--- a/opendc-faas/opendc-faas-service/build.gradle.kts
+++ b/opendc-faas/opendc-faas-service/build.gradle.kts
@@ -30,6 +30,7 @@ plugins {
dependencies {
api(projects.opendcFaas.opendcFaasApi)
api(projects.opendcTelemetry.opendcTelemetryApi)
+ api(libs.commons.math3)
implementation(projects.opendcCommon)
implementation(libs.kotlin.logging)
implementation(libs.opentelemetry.semconv)
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt
index 1d5331cb..f7dc3c1f 100644
--- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt
+++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt
@@ -25,10 +25,13 @@ package org.opendc.faas.service
import io.opentelemetry.api.metrics.Meter
import io.opentelemetry.api.metrics.MeterProvider
import org.opendc.faas.api.FaaSClient
+import org.opendc.faas.api.FaaSFunction
import org.opendc.faas.service.autoscaler.FunctionTerminationPolicy
import org.opendc.faas.service.deployer.FunctionDeployer
import org.opendc.faas.service.internal.FaaSServiceImpl
import org.opendc.faas.service.router.RoutingPolicy
+import org.opendc.faas.service.telemetry.FunctionStats
+import org.opendc.faas.service.telemetry.SchedulerStats
import java.time.Clock
import kotlin.coroutines.CoroutineContext
@@ -42,6 +45,16 @@ public interface FaaSService : AutoCloseable {
public fun newClient(): FaaSClient
/**
+ * Collect statistics about the scheduler of the service.
+ */
+ public fun getSchedulerStats(): SchedulerStats
+
+ /**
+ * Collect statistics about the specified [function].
+ */
+ public fun getFunctionStats(function: FaaSFunction): FunctionStats
+
+ /**
* Terminate the lifecycle of the FaaS service, stopping all running function instances.
*/
public override fun close()
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt
index 836231c8..52fcffa1 100644
--- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt
+++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt
@@ -29,7 +29,9 @@ import io.opentelemetry.api.metrics.LongHistogram
import io.opentelemetry.api.metrics.LongUpDownCounter
import io.opentelemetry.api.metrics.Meter
import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics
import org.opendc.faas.service.deployer.FunctionInstance
+import org.opendc.faas.service.telemetry.FunctionStats
import java.util.*
/**
@@ -46,7 +48,7 @@ public class FunctionObject(
/**
* The attributes of this function.
*/
- public val attributes: Attributes = Attributes.builder()
+ private val attributes: Attributes = Attributes.builder()
.put(ResourceAttributes.FAAS_ID, uid.toString())
.put(ResourceAttributes.FAAS_NAME, name)
.put(ResourceAttributes.FAAS_MAX_MEMORY, allocatedMemory)
@@ -56,68 +58,78 @@ public class FunctionObject(
/**
* The total amount of function invocations received by the function.
*/
- public val invocations: LongCounter = meter.counterBuilder("function.invocations.total")
+ private val invocations: LongCounter = meter.counterBuilder("function.invocations.total")
.setDescription("Number of function invocations")
.setUnit("1")
.build()
+ private var _invocations = 0L
/**
* The amount of function invocations that could be handled directly.
*/
- public val timelyInvocations: LongCounter = meter.counterBuilder("function.invocations.warm")
+ private val timelyInvocations: LongCounter = meter.counterBuilder("function.invocations.warm")
.setDescription("Number of function invocations handled directly")
.setUnit("1")
.build()
+ private var _timelyInvocations = 0L
/**
* The amount of function invocations that were delayed due to function deployment.
*/
- public val delayedInvocations: LongCounter = meter.counterBuilder("function.invocations.cold")
+ private val delayedInvocations: LongCounter = meter.counterBuilder("function.invocations.cold")
.setDescription("Number of function invocations that are delayed")
.setUnit("1")
.build()
+ private var _delayedInvocations = 0L
/**
* The amount of function invocations that failed.
*/
- public val failedInvocations: LongCounter = meter.counterBuilder("function.invocations.failed")
+ private val failedInvocations: LongCounter = meter.counterBuilder("function.invocations.failed")
.setDescription("Number of function invocations that failed")
.setUnit("1")
.build()
+ private var _failedInvocations = 0L
/**
* The amount of instances for this function.
*/
- public val activeInstances: LongUpDownCounter = meter.upDownCounterBuilder("function.instances.active")
+ private val activeInstances: LongUpDownCounter = meter.upDownCounterBuilder("function.instances.active")
.setDescription("Number of active function instances")
.setUnit("1")
.build()
+ private var _activeInstances = 0
/**
* The amount of idle instances for this function.
*/
- public val idleInstances: LongUpDownCounter = meter.upDownCounterBuilder("function.instances.idle")
+ private val idleInstances: LongUpDownCounter = meter.upDownCounterBuilder("function.instances.idle")
.setDescription("Number of idle function instances")
.setUnit("1")
.build()
+ private var _idleInstances = 0
/**
* The time that the function waited.
*/
- public val waitTime: LongHistogram = meter.histogramBuilder("function.time.wait")
+ private val waitTime: LongHistogram = meter.histogramBuilder("function.time.wait")
.ofLongs()
.setDescription("Time the function has to wait before being started")
.setUnit("ms")
.build()
+ private val _waitTime = DescriptiveStatistics()
+ .apply { windowSize = 100 }
/**
* The time that the function was running.
*/
- public val activeTime: LongHistogram = meter.histogramBuilder("function.time.active")
+ private val activeTime: LongHistogram = meter.histogramBuilder("function.time.active")
.ofLongs()
.setDescription("Time the function was running")
.setUnit("ms")
.build()
+ private val _activeTime = DescriptiveStatistics()
+ .apply { windowSize = 100 }
/**
* The instances associated with this function.
@@ -134,6 +146,80 @@ public class FunctionObject(
public val meta: MutableMap<String, Any> = meta.toMutableMap()
+ /**
+ * Report a scheduled invocation.
+ */
+ internal fun reportSubmission() {
+ invocations.add(1, attributes)
+ _invocations++
+ }
+
+ /**
+ * Report the deployment of an invocation.
+ */
+ internal fun reportDeployment(isDelayed: Boolean) {
+ if (isDelayed) {
+ delayedInvocations.add(1, attributes)
+ _delayedInvocations++
+
+ idleInstances.add(1, attributes)
+ _idleInstances++
+ } else {
+ timelyInvocations.add(1, attributes)
+ _timelyInvocations++
+ }
+ }
+
+ /**
+ * Report the start of a function invocation.
+ */
+ internal fun reportStart(start: Long, submitTime: Long) {
+ val wait = start - submitTime
+ waitTime.record(wait, attributes)
+ _waitTime.addValue(wait.toDouble())
+
+ idleInstances.add(-1, attributes)
+ _idleInstances--
+ activeInstances.add(1, attributes)
+ _activeInstances++
+ }
+
+ /**
+ * Report the failure of a function invocation.
+ */
+ internal fun reportFailure() {
+ failedInvocations.add(1, attributes)
+ _failedInvocations++
+ }
+
+ /**
+ * Report the end of a function invocation.
+ */
+ internal fun reportEnd(duration: Long) {
+ activeTime.record(duration, attributes)
+ _activeTime.addValue(duration.toDouble())
+ idleInstances.add(1, attributes)
+ _idleInstances++
+ activeInstances.add(-1, attributes)
+ _activeInstances--
+ }
+
+ /**
+ * Collect the statistics of this function.
+ */
+ internal fun getStats(): FunctionStats {
+ return FunctionStats(
+ _invocations,
+ _timelyInvocations,
+ _delayedInvocations,
+ _failedInvocations,
+ _activeInstances,
+ _idleInstances,
+ _waitTime.copy(),
+ _activeTime.copy()
+ )
+ }
+
override fun close() {
instances.forEach(FunctionInstance::close)
instances.clear()
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt
index 1526be9d..ce3b2b98 100644
--- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt
+++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt
@@ -38,6 +38,8 @@ import org.opendc.faas.service.deployer.FunctionInstance
import org.opendc.faas.service.deployer.FunctionInstanceListener
import org.opendc.faas.service.deployer.FunctionInstanceState
import org.opendc.faas.service.router.RoutingPolicy
+import org.opendc.faas.service.telemetry.FunctionStats
+import org.opendc.faas.service.telemetry.SchedulerStats
import java.lang.IllegalStateException
import java.time.Clock
import java.util.*
@@ -103,6 +105,7 @@ internal class FaaSServiceImpl(
.setDescription("Number of function invocations")
.setUnit("1")
.build()
+ private var totalInvocations = 0L
/**
* The amount of function invocations that could be handled directly.
@@ -111,6 +114,7 @@ internal class FaaSServiceImpl(
.setDescription("Number of function invocations handled directly")
.setUnit("1")
.build()
+ private var timelyInvocations = 0L
/**
* The amount of function invocations that were delayed due to function deployment.
@@ -119,6 +123,7 @@ internal class FaaSServiceImpl(
.setDescription("Number of function invocations that are delayed")
.setUnit("1")
.build()
+ private var delayedInvocations = 0L
override fun newClient(): FaaSClient {
return object : FaaSClient {
@@ -187,6 +192,15 @@ internal class FaaSServiceImpl(
}
}
+ override fun getSchedulerStats(): SchedulerStats {
+ return SchedulerStats(totalInvocations, timelyInvocations, delayedInvocations)
+ }
+
+ override fun getFunctionStats(function: FaaSFunction): FunctionStats {
+ val func = requireNotNull(functions[function.uid]) { "Unknown function" }
+ return func.getStats()
+ }
+
/**
* Indicate that a new scheduling cycle is needed due to a change to the service's state.
*/
@@ -219,7 +233,8 @@ internal class FaaSServiceImpl(
val instance = if (activeInstance != null) {
_timelyInvocations.add(1)
- function.timelyInvocations.add(1, function.attributes)
+ timelyInvocations++
+ function.reportDeployment(isDelayed = false)
activeInstance
} else {
@@ -227,29 +242,24 @@ internal class FaaSServiceImpl(
instances.add(instance)
terminationPolicy.enqueue(instance)
- function.idleInstances.add(1, function.attributes)
-
_delayedInvocations.add(1)
- function.delayedInvocations.add(1, function.attributes)
+ delayedInvocations++
+ function.reportDeployment(isDelayed = true)
instance
}
suspend {
val start = clock.millis()
- function.waitTime.record(start - submitTime, function.attributes)
- function.idleInstances.add(-1, function.attributes)
- function.activeInstances.add(1, function.attributes)
+ function.reportStart(start, submitTime)
try {
instance.invoke()
} catch (e: Throwable) {
logger.debug(e) { "Function invocation failed" }
- function.failedInvocations.add(1, function.attributes)
+ function.reportFailure()
} finally {
val end = clock.millis()
- function.activeTime.record(end - start, function.attributes)
- function.idleInstances.add(1, function.attributes)
- function.activeInstances.add(-1, function.attributes)
+ function.reportEnd(end - start)
}
}.startCoroutineCancellable(cont)
}
@@ -262,7 +272,8 @@ internal class FaaSServiceImpl(
check(function.uid in functions) { "Function does not exist (anymore)" }
_invocations.add(1)
- function.invocations.add(1, function.attributes)
+ totalInvocations++
+ function.reportSubmission()
return suspendCancellableCoroutine { cont ->
if (!queue.add(InvocationRequest(clock.millis(), function, cont))) {
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/telemetry/FunctionStats.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/telemetry/FunctionStats.kt
new file mode 100644
index 00000000..497ee423
--- /dev/null
+++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/telemetry/FunctionStats.kt
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.faas.service.telemetry
+
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics
+
+/**
+ * Statistics about function invocations.
+ *
+ * @property totalInvocations The number of function invocations.
+ * @property timelyInvocations The number of function invocations that could be handled directly.
+ * @property delayedInvocations The number of function invocations that are delayed (cold starts).
+ * @property failedInvocations The number of function invocations that failed.
+ * @property activeInstances The number of active function instances.
+ * @property idleInstances The number of idle function instances.
+ * @property waitTime Statistics about the wait time of a function invocation.
+ * @property activeTime Statistics about the runtime of a function invocation.
+ */
+public data class FunctionStats(
+ val totalInvocations: Long,
+ val timelyInvocations: Long,
+ val delayedInvocations: Long,
+ val failedInvocations: Long,
+ val activeInstances: Int,
+ val idleInstances: Int,
+ val waitTime: DescriptiveStatistics,
+ val activeTime: DescriptiveStatistics
+)
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/telemetry/SchedulerStats.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/telemetry/SchedulerStats.kt
new file mode 100644
index 00000000..cabb1d56
--- /dev/null
+++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/telemetry/SchedulerStats.kt
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.faas.service.telemetry
+
+/**
+ * Statistics reported by the FaaS scheduler.
+ *
+ * @property totalInvocations The total amount of function invocations received by the scheduler.
+ * @property timelyInvocations The amount of function invocations that could be handled directly.
+ * @property delayedInvocations The amount of function invocations that were delayed due to function deployment.
+ */
+public data class SchedulerStats(
+ val totalInvocations: Long,
+ val timelyInvocations: Long,
+ val delayedInvocations: Long
+)
diff --git a/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt b/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt
index 68233c1a..a3d0d34e 100644
--- a/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt
+++ b/opendc-faas/opendc-faas-simulator/src/main/kotlin/org/opendc/faas/simulator/SimFunctionDeployer.kt
@@ -123,7 +123,6 @@ public class SimFunctionDeployer(
/**
* Start the function instance.
*/
- @OptIn(InternalCoroutinesApi::class)
internal fun start() {
check(state == FunctionInstanceState.Provisioning) { "Invalid state of function instance" }
job = scope.launch {
diff --git a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt
index 0dc9ba87..792a8584 100644
--- a/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt
+++ b/opendc-faas/opendc-faas-simulator/src/test/kotlin/org/opendc/faas/simulator/SimFaaSServiceTest.kt
@@ -28,22 +28,25 @@ import io.opentelemetry.api.metrics.MeterProvider
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.yield
+import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
import org.opendc.faas.service.FaaSService
import org.opendc.faas.service.autoscaler.FunctionTerminationPolicyFixed
import org.opendc.faas.service.router.RandomRoutingPolicy
-import org.opendc.faas.simulator.delay.ZeroDelayInjector
+import org.opendc.faas.simulator.delay.ColdStartModel
+import org.opendc.faas.simulator.delay.StochasticDelayInjector
import org.opendc.faas.simulator.workload.SimFaaSWorkload
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
-import org.opendc.simulator.compute.workload.SimFlopsWorkload
+import org.opendc.simulator.compute.workload.SimRuntimeWorkload
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.core.runBlockingSimulation
import java.time.Duration
+import java.util.*
/**
* A test suite for the [FaaSService] implementation under simulated conditions.
@@ -65,10 +68,15 @@ internal class SimFaaSServiceTest {
@Test
fun testSmoke() = runBlockingSimulation {
- val workload = spyk(object : SimFaaSWorkload, SimWorkload by SimFlopsWorkload(1000) {
- override suspend fun invoke() {}
+ val random = Random(0)
+ val workload = spyk(object : SimFaaSWorkload, SimWorkload by SimRuntimeWorkload(1000) {
+ override suspend fun invoke() {
+ delay(random.nextInt(1000).toLong())
+ }
})
- val deployer = SimFunctionDeployer(clock, this, machineModel, ZeroDelayInjector) { workload }
+
+ val delayInjector = StochasticDelayInjector(ColdStartModel.GOOGLE, random)
+ val deployer = SimFunctionDeployer(clock, this, machineModel, delayInjector) { workload }
val service = FaaSService(
coroutineContext, clock, MeterProvider.noop(), deployer, RandomRoutingPolicy(),
FunctionTerminationPolicyFixed(coroutineContext, clock, timeout = Duration.ofMillis(10000))
@@ -84,8 +92,15 @@ internal class SimFaaSServiceTest {
yield()
+ val funcStats = service.getFunctionStats(function)
+
assertAll(
{ coVerify { workload.invoke() } },
+ { assertEquals(1, funcStats.totalInvocations) },
+ { assertEquals(1, funcStats.delayedInvocations) },
+ { assertEquals(0, funcStats.failedInvocations) },
+ { assertEquals(100.0, funcStats.waitTime.mean) },
+ { assertEquals(1285.0, funcStats.activeTime.mean) },
)
}
}