diff options
Diffstat (limited to 'opendc-faas/opendc-faas-service/src/main')
12 files changed, 95 insertions, 69 deletions
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt index 96619cdb..e9634ccc 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt @@ -72,7 +72,7 @@ public interface FaaSService : AutoCloseable { deployer: FunctionDeployer, routingPolicy: RoutingPolicy, terminationPolicy: FunctionTerminationPolicy, - quantum: Duration = Duration.ofMillis(100) + quantum: Duration = Duration.ofMillis(100), ): FaaSService { return FaaSServiceImpl(dispatcher, deployer, routingPolicy, terminationPolicy, quantum) } diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt index 091e82a8..0ed96b96 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt @@ -35,21 +35,23 @@ public class FunctionObject( name: String, allocatedMemory: Long, labels: Map<String, String>, - meta: Map<String, Any> + meta: Map<String, Any>, ) : AutoCloseable { /** * Metrics tracked per function. */ - private var _invocations = 0L - private var _timelyInvocations = 0L - private var _delayedInvocations = 0L - private var _failedInvocations = 0L - private var _activeInstances = 0 - private var _idleInstances = 0 - private val _waitTime = DescriptiveStatistics() - .apply { windowSize = 100 } - private val _activeTime = DescriptiveStatistics() - .apply { windowSize = 100 } + private var localInvocations = 0L + private var localTimelyInvocations = 0L + private var localDelayedInvocations = 0L + private var localFailedInvocations = 0L + private var localActiveInstances = 0 + private var localIdleInstances = 0 + private val localWaitTime = + DescriptiveStatistics() + .apply { windowSize = 100 } + private val localActiveTime = + DescriptiveStatistics() + .apply { windowSize = 100 } /** * The instances associated with this function. @@ -70,7 +72,7 @@ public class FunctionObject( * Report a scheduled invocation. */ internal fun reportSubmission() { - _invocations++ + localInvocations++ } /** @@ -78,38 +80,41 @@ public class FunctionObject( */ internal fun reportDeployment(isDelayed: Boolean) { if (isDelayed) { - _delayedInvocations++ - _idleInstances++ + localDelayedInvocations++ + localIdleInstances++ } else { - _timelyInvocations++ + localTimelyInvocations++ } } /** * Report the start of a function invocation. */ - internal fun reportStart(start: Long, submitTime: Long) { + internal fun reportStart( + start: Long, + submitTime: Long, + ) { val wait = start - submitTime - _waitTime.addValue(wait.toDouble()) + localWaitTime.addValue(wait.toDouble()) - _idleInstances-- - _activeInstances++ + localIdleInstances-- + localActiveInstances++ } /** * Report the failure of a function invocation. */ internal fun reportFailure() { - _failedInvocations++ + localFailedInvocations++ } /** * Report the end of a function invocation. */ internal fun reportEnd(duration: Long) { - _activeTime.addValue(duration.toDouble()) - _idleInstances++ - _activeInstances-- + localActiveTime.addValue(duration.toDouble()) + localIdleInstances++ + localActiveInstances-- } /** @@ -117,14 +122,14 @@ public class FunctionObject( */ internal fun getStats(): FunctionStats { return FunctionStats( - _invocations, - _timelyInvocations, - _delayedInvocations, - _failedInvocations, - _activeInstances, - _idleInstances, - _waitTime.copy(), - _activeTime.copy() + localInvocations, + localTimelyInvocations, + localDelayedInvocations, + localFailedInvocations, + localActiveInstances, + localIdleInstances, + localWaitTime.copy(), + localActiveTime.copy(), ) } diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt index a2c371e1..9edb8c1d 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt @@ -35,7 +35,7 @@ import java.time.Duration */ public class FunctionTerminationPolicyFixed( dispatcher: Dispatcher, - public val timeout: Duration + public val timeout: Duration, ) : FunctionTerminationPolicy { /** * The [TimerScheduler] used to schedule the function terminations. @@ -47,7 +47,10 @@ public class FunctionTerminationPolicyFixed( scheduler.cancel(instance) } - override fun onStateChanged(instance: FunctionInstance, newState: FunctionInstanceState) { + override fun onStateChanged( + instance: FunctionInstance, + newState: FunctionInstanceState, + ) { when (newState) { FunctionInstanceState.Active -> scheduler.cancel(instance) FunctionInstanceState.Idle -> schedule(instance) diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionDeployer.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionDeployer.kt index 18d16d06..13d48fbf 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionDeployer.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionDeployer.kt @@ -39,5 +39,8 @@ public interface FunctionDeployer { /** * Deploy the specified [function]. */ - public fun deploy(function: FunctionObject, listener: FunctionInstanceListener): FunctionInstance + public fun deploy( + function: FunctionObject, + listener: FunctionInstanceListener, + ): FunctionInstance } diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionInstanceListener.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionInstanceListener.kt index 20e280a2..e88b7104 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionInstanceListener.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionInstanceListener.kt @@ -29,5 +29,8 @@ public interface FunctionInstanceListener { /** * This method is invoked when the state of a [FunctionInstance] has changed. */ - public fun onStateChanged(instance: FunctionInstance, newState: FunctionInstanceState) {} + public fun onStateChanged( + instance: FunctionInstance, + newState: FunctionInstanceState, + ) {} } diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionInstanceState.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionInstanceState.kt index 2b6b6eba..0c310e6b 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionInstanceState.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionInstanceState.kt @@ -44,5 +44,5 @@ public enum class FunctionInstanceState { /** * The function instance is released and cannot be used anymore. */ - Deleted + Deleted, } diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSFunctionImpl.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSFunctionImpl.kt index 36532aa8..7cc85e40 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSFunctionImpl.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSFunctionImpl.kt @@ -31,7 +31,7 @@ import java.util.UUID */ internal class FaaSFunctionImpl( private val service: FaaSServiceImpl, - private val state: FunctionObject + private val state: FunctionObject, ) : FaaSFunction { override val uid: UUID = state.uid diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt index b1e6b3f5..397b0e7d 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt @@ -61,7 +61,7 @@ internal class FaaSServiceImpl( private val deployer: FunctionDeployer, private val routingPolicy: RoutingPolicy, private val terminationPolicy: FunctionTerminationPolicy, - quantum: Duration + quantum: Duration, ) : FaaSService, FunctionInstanceListener { /** * The logger instance of this server. @@ -134,19 +134,20 @@ internal class FaaSServiceImpl( name: String, memorySize: Long, labels: Map<String, String>, - meta: Map<String, Any> + meta: Map<String, Any>, ): FaaSFunction { check(!isClosed) { "Client is already closed" } require(name !in functionsByName) { "Function with same name exists" } val uid = UUID(clock.millis(), random.nextLong()) - val function = FunctionObject( - uid, - name, - memorySize, - labels, - meta - ) + val function = + FunctionObject( + uid, + name, + memorySize, + labels, + meta, + ) functionsByName[name] = function functions[uid] = function @@ -200,27 +201,29 @@ internal class FaaSServiceImpl( val instances = function.instances // Check if there exists an instance of the function - val activeInstance = if (instances.isNotEmpty()) { - routingPolicy.select(instances, function) - } else { - null - } + val activeInstance = + if (instances.isNotEmpty()) { + routingPolicy.select(instances, function) + } else { + null + } - val instance = if (activeInstance != null) { - timelyInvocations++ - function.reportDeployment(isDelayed = false) + val instance = + if (activeInstance != null) { + timelyInvocations++ + function.reportDeployment(isDelayed = false) - activeInstance - } else { - val instance = deployer.deploy(function, this) - instances.add(instance) - terminationPolicy.enqueue(instance) + activeInstance + } else { + val instance = deployer.deploy(function, this) + instances.add(instance) + terminationPolicy.enqueue(instance) - delayedInvocations++ - function.reportDeployment(isDelayed = true) + delayedInvocations++ + function.reportDeployment(isDelayed = true) - instance - } + instance + } suspend { val start = clock.millis() @@ -268,7 +271,10 @@ internal class FaaSServiceImpl( } } - override fun onStateChanged(instance: FunctionInstance, newState: FunctionInstanceState) { + override fun onStateChanged( + instance: FunctionInstance, + newState: FunctionInstanceState, + ) { terminationPolicy.onStateChanged(instance, newState) if (newState == FunctionInstanceState.Deleted) { diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/router/RandomRoutingPolicy.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/router/RandomRoutingPolicy.kt index 22bf7266..1eb03e5a 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/router/RandomRoutingPolicy.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/router/RandomRoutingPolicy.kt @@ -31,7 +31,10 @@ import java.util.random.RandomGenerator * A [RoutingPolicy] that selects a random function instance. */ public class RandomRoutingPolicy(private val random: RandomGenerator = SplittableRandom(0)) : RoutingPolicy { - override fun select(instances: List<FunctionInstance>, function: FunctionObject): FunctionInstance { + override fun select( + instances: List<FunctionInstance>, + function: FunctionObject, + ): FunctionInstance { val idx = random.nextInt(instances.size) return instances.elementAt(idx) } diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/router/RoutingPolicy.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/router/RoutingPolicy.kt index e99e329a..c8ea37fc 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/router/RoutingPolicy.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/router/RoutingPolicy.kt @@ -32,5 +32,8 @@ public interface RoutingPolicy { /** * Select the instance to which the request should be routed to. */ - public fun select(instances: List<FunctionInstance>, function: FunctionObject): FunctionInstance? + public fun select( + instances: List<FunctionInstance>, + function: FunctionObject, + ): FunctionInstance? } diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/telemetry/FunctionStats.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/telemetry/FunctionStats.kt index 497ee423..db6db6c1 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/telemetry/FunctionStats.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/telemetry/FunctionStats.kt @@ -44,5 +44,5 @@ public data class FunctionStats( val activeInstances: Int, val idleInstances: Int, val waitTime: DescriptiveStatistics, - val activeTime: DescriptiveStatistics + val activeTime: DescriptiveStatistics, ) diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/telemetry/SchedulerStats.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/telemetry/SchedulerStats.kt index cabb1d56..b65dfb03 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/telemetry/SchedulerStats.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/telemetry/SchedulerStats.kt @@ -32,5 +32,5 @@ package org.opendc.faas.service.telemetry public data class SchedulerStats( val totalInvocations: Long, val timelyInvocations: Long, - val delayedInvocations: Long + val delayedInvocations: Long, ) |
