diff options
Diffstat (limited to 'opendc-faas/opendc-faas-service/src')
14 files changed, 1155 insertions, 0 deletions
diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt new file mode 100644 index 00000000..7e716a34 --- /dev/null +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FaaSService.kt @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.faas.service + +import io.opentelemetry.api.metrics.Meter +import org.opendc.faas.api.FaaSClient +import org.opendc.faas.service.autoscaler.FunctionTerminationPolicy +import org.opendc.faas.service.deployer.FunctionDeployer +import org.opendc.faas.service.internal.FaaSServiceImpl +import org.opendc.faas.service.router.RoutingPolicy +import java.time.Clock +import kotlin.coroutines.CoroutineContext + +/** + * The [FaaSService] hosts the service implementation of the OpenDC FaaS platform. + */ +public interface FaaSService : AutoCloseable { + /** + * Create a new [FaaSClient] to control the compute service. + */ + public fun newClient(): FaaSClient + + /** + * Terminate the lifecycle of the FaaS service, stopping all running function instances. + */ + public override fun close() + + public companion object { + /** + * Construct a new [FaaSService] implementation. + * + * @param context The [CoroutineContext] to use in the service. + * @param clock The clock instance to use. + * @param meter The meter to report metrics to. + * @param deployer the [FunctionDeployer] to use for deploying function instances. + * @param routingPolicy The policy to route function invocations. + * @param terminationPolicy The policy for terminating function instances. + */ + public operator fun invoke( + context: CoroutineContext, + clock: Clock, + meter: Meter, + deployer: FunctionDeployer, + routingPolicy: RoutingPolicy, + terminationPolicy: FunctionTerminationPolicy, + ): FaaSService { + return FaaSServiceImpl(context, clock, meter, deployer, routingPolicy, terminationPolicy) + } + } +} diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt new file mode 100644 index 00000000..7c7621b8 --- /dev/null +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt @@ -0,0 +1,139 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.faas.service + +import io.opentelemetry.api.metrics.BoundLongCounter +import io.opentelemetry.api.metrics.BoundLongUpDownCounter +import io.opentelemetry.api.metrics.BoundLongValueRecorder +import io.opentelemetry.api.metrics.Meter +import io.opentelemetry.api.metrics.common.Labels +import org.opendc.faas.service.deployer.FunctionInstance +import java.util.* + +/** + * An [FunctionObject] represents the service's view of a serverless function. + */ +public class FunctionObject( + meter: Meter, + public val uid: UUID, + name: String, + allocatedMemory: Long, + labels: Map<String, String>, + meta: Map<String, Any> +) : AutoCloseable { + /** + * The total amount of function invocations received by the function. + */ + public val invocations: BoundLongCounter = meter.longCounterBuilder("function.invocations.total") + .setDescription("Number of function invocations") + .setUnit("1") + .build() + .bind(Labels.of("function", uid.toString())) + + /** + * The amount of function invocations that could be handled directly. + */ + public val timelyInvocations: BoundLongCounter = meter.longCounterBuilder("function.invocations.warm") + .setDescription("Number of function invocations handled directly") + .setUnit("1") + .build() + .bind(Labels.of("function", uid.toString())) + + /** + * The amount of function invocations that were delayed due to function deployment. + */ + public val delayedInvocations: BoundLongCounter = meter.longCounterBuilder("function.invocations.cold") + .setDescription("Number of function invocations that are delayed") + .setUnit("1") + .build() + .bind(Labels.of("function", uid.toString())) + + /** + * The amount of function invocations that failed. + */ + public val failedInvocations: BoundLongCounter = meter.longCounterBuilder("function.invocations.failed") + .setDescription("Number of function invocations that failed") + .setUnit("1") + .build() + .bind(Labels.of("function", uid.toString())) + + /** + * The amount of instances for this function. + */ + public val activeInstances: BoundLongUpDownCounter = meter.longUpDownCounterBuilder("function.instances.active") + .setDescription("Number of active function instances") + .setUnit("1") + .build() + .bind(Labels.of("function", uid.toString())) + + /** + * The amount of idle instances for this function. + */ + public val idleInstances: BoundLongUpDownCounter = meter.longUpDownCounterBuilder("function.instances.idle") + .setDescription("Number of idle function instances") + .setUnit("1") + .build() + .bind(Labels.of("function", uid.toString())) + + /** + * The time that the function waited. + */ + public val waitTime: BoundLongValueRecorder = meter.longValueRecorderBuilder("function.time.wait") + .setDescription("Time the function has to wait before being started") + .setUnit("ms") + .build() + .bind(Labels.of("function", uid.toString())) + + /** + * The time that the function was running. + */ + public val activeTime: BoundLongValueRecorder = meter.longValueRecorderBuilder("function.time.active") + .setDescription("Time the function was running") + .setUnit("ms") + .build() + .bind(Labels.of("function", uid.toString())) + + /** + * The instances associated with this function. + */ + public val instances: MutableList<FunctionInstance> = mutableListOf() + + public var name: String = name + private set + + public var memorySize: Long = allocatedMemory + private set + + public val labels: MutableMap<String, String> = labels.toMutableMap() + + public val meta: MutableMap<String, Any> = meta.toMutableMap() + + override fun close() { + instances.forEach(FunctionInstance::close) + instances.clear() + } + + override fun equals(other: Any?): Boolean = other is FunctionObject && uid == other.uid + + override fun hashCode(): Int = uid.hashCode() +} diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicy.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicy.kt new file mode 100644 index 00000000..2ab3638b --- /dev/null +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicy.kt @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.faas.service.autoscaler + +import org.opendc.faas.service.deployer.FunctionInstance +import org.opendc.faas.service.deployer.FunctionInstanceListener + +/** + * A management policy that is responsible for downscaling the active function instances for a function. + */ +public interface FunctionTerminationPolicy : FunctionInstanceListener { + /** + * Enqueue the specified [instance] to be scheduled for termination a + */ + public fun enqueue(instance: FunctionInstance) +} diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt new file mode 100644 index 00000000..1e224ed1 --- /dev/null +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyFixed.kt @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.faas.service.autoscaler + +import org.opendc.faas.service.deployer.FunctionInstance +import org.opendc.faas.service.deployer.FunctionInstanceState +import org.opendc.utils.TimerScheduler +import java.time.Clock +import kotlin.coroutines.CoroutineContext + +/** + * A [FunctionTerminationPolicy] that terminates idle function instances after a fixed keep-alive time. + * + * @param timeout The idle timeout after which the function instance is terminated. + */ +public class FunctionTerminationPolicyFixed( + context: CoroutineContext, + clock: Clock, + public val timeout: Long +) : FunctionTerminationPolicy { + /** + * The [TimerScheduler] used to schedule the function terminations. + */ + private val scheduler = TimerScheduler<FunctionInstance>(context, clock) + + override fun enqueue(instance: FunctionInstance) { + // Cancel the existing timeout timer + scheduler.cancel(instance) + } + + override fun onStateChanged(instance: FunctionInstance, newState: FunctionInstanceState) { + when (newState) { + FunctionInstanceState.Active -> scheduler.cancel(instance) + FunctionInstanceState.Idle -> schedule(instance) + else -> {} + } + } + + /** + * Schedule termination for the specified [instance]. + */ + private fun schedule(instance: FunctionInstance) { + scheduler.startSingleTimer(instance, delay = timeout) { instance.close() } + } +} diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyNull.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyNull.kt new file mode 100644 index 00000000..957e569b --- /dev/null +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/autoscaler/FunctionTerminationPolicyNull.kt @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.faas.service.autoscaler + +import org.opendc.faas.service.deployer.FunctionInstance + +/** + * A [FunctionTerminationPolicy] that never terminates function instances. + */ +public class FunctionTerminationPolicyNull : FunctionTerminationPolicy { + override fun enqueue(instance: FunctionInstance) { + // No-op + } +} diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionDeployer.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionDeployer.kt new file mode 100644 index 00000000..049f1cc7 --- /dev/null +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionDeployer.kt @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.faas.service.deployer + +import org.opendc.faas.service.FunctionObject + +/** + * A [FunctionDeployer] is responsible for ensuring that an instance of an arbitrary function, a [FunctionInstance], + * is deployed. + * + * The function deployer should combines the configuration stored in the function registry, the parameters supplied by + * the requester, and other factors into a decision of how the function should be deployed, including how many and + * what kind of resources it should receive. + * + * Though it decides how the function instance should be deployed, the deployment of the function instance itself is + * delegated to the Resource Orchestration Layer. + */ +public interface FunctionDeployer { + /** + * Deploy the specified [function]. + */ + public fun deploy(function: FunctionObject, listener: FunctionInstanceListener): FunctionInstance +} diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionInstance.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionInstance.kt new file mode 100644 index 00000000..a8b04df4 --- /dev/null +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionInstance.kt @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.faas.service.deployer + +import org.opendc.faas.service.FunctionObject + +/** + * A [FunctionInstance] is a a self-contained worker—typically a container—capable of handling function executions. + * + * Multiple, concurrent function instances can exists for a single function, for scalability purposes. + */ +public interface FunctionInstance : AutoCloseable { + /** + * The state of the instance. + */ + public val state: FunctionInstanceState + + /** + * The [FunctionObject] that is represented by this instance. + */ + public val function: FunctionObject + + /** + * Invoke the function instance. + * + * This method will suspend execution util the function instance has returned. + */ + public suspend fun invoke() + + /** + * Indicate to the resource manager that the instance is not needed anymore and may be cleaned up by the resource + * manager. + */ + public override fun close() +} diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionInstanceListener.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionInstanceListener.kt new file mode 100644 index 00000000..20e280a2 --- /dev/null +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionInstanceListener.kt @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.faas.service.deployer + +/** + * Listener interface for events originating from a [FunctionInstance]. + */ +public interface FunctionInstanceListener { + /** + * This method is invoked when the state of a [FunctionInstance] has changed. + */ + public fun onStateChanged(instance: FunctionInstance, newState: FunctionInstanceState) {} +} diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionInstanceState.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionInstanceState.kt new file mode 100644 index 00000000..2b6b6eba --- /dev/null +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/deployer/FunctionInstanceState.kt @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.faas.service.deployer + +/** + * This enumeration describes the states of a [FunctionInstance]. + */ +public enum class FunctionInstanceState { + /** + * The function instance is currently being provisioned. + */ + Provisioning, + + /** + * The function instance is idle and ready to execute. + */ + Idle, + + /** + * The function instance is executing. + */ + Active, + + /** + * The function instance is released and cannot be used anymore. + */ + Deleted +} diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSFunctionImpl.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSFunctionImpl.kt new file mode 100644 index 00000000..bd7f13f6 --- /dev/null +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSFunctionImpl.kt @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.faas.service.internal + +import org.opendc.faas.api.FaaSFunction +import org.opendc.faas.service.FunctionObject +import java.util.* + +/** + * A [FaaSFunction] implementation that is passed to clients. + */ +internal class FaaSFunctionImpl( + private val service: FaaSServiceImpl, + private val state: FunctionObject +) : FaaSFunction { + override val uid: UUID = state.uid + + override var name: String = state.name + private set + + override var memorySize: Long = state.memorySize + private set + + override var labels: Map<String, String> = state.labels.toMap() + private set + + override var meta: Map<String, Any> = state.meta.toMap() + private set + + override suspend fun delete() { + service.delete(state) + } + + override suspend fun invoke() { + service.invoke(state) + } + + override suspend fun refresh() { + name = state.name + memorySize = state.memorySize + labels = state.labels + meta = state.meta + } + + override fun equals(other: Any?): Boolean = other is FaaSFunctionImpl && uid == other.uid + + override fun hashCode(): Int = uid.hashCode() + + override fun toString(): String = "FaaSFunction[uid=$uid,name=$name]" +} diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt new file mode 100644 index 00000000..b169436f --- /dev/null +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt @@ -0,0 +1,304 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.faas.service.internal + +import io.opentelemetry.api.metrics.Meter +import kotlinx.coroutines.* +import kotlinx.coroutines.intrinsics.startCoroutineCancellable +import mu.KotlinLogging +import org.opendc.faas.api.FaaSClient +import org.opendc.faas.api.FaaSFunction +import org.opendc.faas.service.FaaSService +import org.opendc.faas.service.FunctionObject +import org.opendc.faas.service.autoscaler.FunctionTerminationPolicy +import org.opendc.faas.service.deployer.FunctionDeployer +import org.opendc.faas.service.deployer.FunctionInstance +import org.opendc.faas.service.deployer.FunctionInstanceListener +import org.opendc.faas.service.deployer.FunctionInstanceState +import org.opendc.faas.service.router.RoutingPolicy +import org.opendc.utils.TimerScheduler +import java.lang.IllegalStateException +import java.time.Clock +import java.util.* +import kotlin.coroutines.Continuation +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.resumeWithException + +/** + * Implementation of the [FaaSService] interface. + * + * This component acts as the function router from the SPEC RG Reference Architecture for FaaS and is responsible + * for routing incoming requests or events to the correct [FunctionInstance]. If no [FunctionInstance] is available, + * this component queues the events to await the deployment of new instances. + */ +internal class FaaSServiceImpl( + context: CoroutineContext, + private val clock: Clock, + private val meter: Meter, + private val deployer: FunctionDeployer, + private val routingPolicy: RoutingPolicy, + private val terminationPolicy: FunctionTerminationPolicy +) : FaaSService, FunctionInstanceListener { + /** + * The [CoroutineScope] of the service bounded by the lifecycle of the service. + */ + private val scope = CoroutineScope(context + Job()) + + /** + * The logger instance of this server. + */ + private val logger = KotlinLogging.logger {} + + /** + * The [TimerScheduler] to use for scheduling the scheduler cycles. + */ + private val scheduler: TimerScheduler<Unit> = TimerScheduler(scope.coroutineContext, clock) + + /** + * The [Random] instance used to generate unique identifiers for the objects. + */ + private val random = Random(0) + + /** + * The registered functions for this service. + */ + private val functions = mutableMapOf<UUID, FunctionObject>() + private val functionsByName = mutableMapOf<String, FunctionObject>() + + /** + * The queue of invocation requests. + */ + private val queue = ArrayDeque<InvocationRequest>() + + /** + * The total amount of function invocations received by the service. + */ + private val _invocations = meter.longCounterBuilder("service.invocations.total") + .setDescription("Number of function invocations") + .setUnit("1") + .build() + + /** + * The amount of function invocations that could be handled directly. + */ + private val _timelyInvocations = meter.longCounterBuilder("service.invocations.warm") + .setDescription("Number of function invocations handled directly") + .setUnit("1") + .build() + + /** + * The amount of function invocations that were delayed due to function deployment. + */ + private val _delayedInvocations = meter.longCounterBuilder("service.invocations.cold") + .setDescription("Number of function invocations that are delayed") + .setUnit("1") + .build() + + override fun newClient(): FaaSClient { + return object : FaaSClient { + private var isClosed: Boolean = false + + /** + * Exposes a [FunctionObject] to a client-exposed [FaaSFunction] instance. + */ + private fun FunctionObject.asClientFunction(): FaaSFunction { + return FaaSFunctionImpl(this@FaaSServiceImpl, this) + } + + override suspend fun queryFunctions(): List<FaaSFunction> { + check(!isClosed) { "Client is already closed" } + + return functions.values.map { it.asClientFunction() } + } + + override suspend fun findFunction(id: UUID): FaaSFunction? { + check(!isClosed) { "Client is already closed" } + + return functions[id]?.asClientFunction() + } + + override suspend fun findFunction(name: String): FaaSFunction? { + check(!isClosed) { "Client is already closed" } + + return functionsByName[name]?.asClientFunction() + } + + override suspend fun newFunction( + name: String, + memorySize: Long, + labels: Map<String, String>, + meta: Map<String, Any> + ): FaaSFunction { + check(!isClosed) { "Client is already closed" } + require(name !in functionsByName) { "Function with same name exists" } + + val uid = UUID(clock.millis(), random.nextLong()) + val function = FunctionObject( + meter, + uid, + name, + memorySize, + labels, + meta + ) + + functionsByName[name] = function + functions[uid] = function + + return function.asClientFunction() + } + + override suspend fun invoke(name: String) { + check(!isClosed) { "Client is already closed" } + + val func = requireNotNull(functionsByName[name]) { "Unknown function" } + this@FaaSServiceImpl.invoke(func) + } + + override fun close() { + isClosed = true + } + } + } + + /** + * Indicate that a new scheduling cycle is needed due to a change to the service's state. + */ + private fun schedule() { + // Bail out in case we have already requested a new cycle or the queue is empty. + if (scheduler.isTimerActive(Unit) || queue.isEmpty()) { + return + } + + val quantum = 100 + + // We assume that the provisioner runs at a fixed slot every time quantum (e.g t=0, t=60, t=120). + // This is important because the slices of the VMs need to be aligned. + // We calculate here the delay until the next scheduling slot. + val delay = quantum - (clock.millis() % quantum) + + scheduler.startSingleTimer(Unit, delay, ::doSchedule) + } + + /** + * Run a single scheduling iteration. + */ + @OptIn(InternalCoroutinesApi::class) + private fun doSchedule() { + try { + while (queue.isNotEmpty()) { + val (submitTime, function, cont) = queue.poll() + + val instances = function.instances + + // Check if there exists an instance of the function + val activeInstance = if (instances.isNotEmpty()) { + routingPolicy.select(instances, function) + } else { + null + } + + val instance = if (activeInstance != null) { + _timelyInvocations.add(1) + function.timelyInvocations.add(1) + + activeInstance + } else { + val instance = deployer.deploy(function, this) + instances.add(instance) + terminationPolicy.enqueue(instance) + + function.idleInstances.add(1) + + _delayedInvocations.add(1) + function.delayedInvocations.add(1) + + instance + } + + suspend { + val start = clock.millis() + function.waitTime.record(start - submitTime) + function.idleInstances.add(-1) + function.activeInstances.add(1) + try { + instance.invoke() + } catch (e: Throwable) { + logger.debug(e) { "Function invocation failed" } + function.failedInvocations.add(1) + } finally { + val end = clock.millis() + function.activeTime.record(end - start) + function.idleInstances.add(1) + function.activeInstances.add(-1) + } + }.startCoroutineCancellable(cont) + } + } catch (cause: Throwable) { + logger.error(cause) { "Exception occurred during scheduling cycle" } + } + } + + suspend fun invoke(function: FunctionObject) { + check(function.uid in functions) { "Function does not exist (anymore)" } + + _invocations.add(1) + function.invocations.add(1) + + return suspendCancellableCoroutine { cont -> + if (!queue.add(InvocationRequest(clock.millis(), function, cont))) { + cont.resumeWithException(IllegalStateException("Failed to enqueue request")) + } else { + schedule() + } + } + } + + fun delete(function: FunctionObject) { + functions.remove(function.uid) + functionsByName.remove(function.name) + } + + override fun close() { + scope.cancel() + + // Stop all function instances + for ((_, function) in functions) { + function.close() + } + } + + override fun onStateChanged(instance: FunctionInstance, newState: FunctionInstanceState) { + terminationPolicy.onStateChanged(instance, newState) + + if (newState == FunctionInstanceState.Deleted) { + val function = instance.function + function.instances.remove(instance) + } + } + + /** + * A request to invoke a function. + */ + private data class InvocationRequest(val timestamp: Long, val function: FunctionObject, val cont: Continuation<Unit>) +} diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/router/RandomRoutingPolicy.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/router/RandomRoutingPolicy.kt new file mode 100644 index 00000000..5bd9d4d3 --- /dev/null +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/router/RandomRoutingPolicy.kt @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.faas.service.router + +import org.opendc.faas.service.FunctionObject +import org.opendc.faas.service.deployer.FunctionInstance +import kotlin.random.Random + +/** + * A [RoutingPolicy] that selects a random function instance. + */ +public class RandomRoutingPolicy(private val random: Random = Random(0)) : RoutingPolicy { + override fun select(instances: List<FunctionInstance>, function: FunctionObject): FunctionInstance { + return instances.random(random) + } +} diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/router/RoutingPolicy.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/router/RoutingPolicy.kt new file mode 100644 index 00000000..e99e329a --- /dev/null +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/router/RoutingPolicy.kt @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.faas.service.router + +import org.opendc.faas.service.FunctionObject +import org.opendc.faas.service.deployer.FunctionInstance + +/** + * A [RoutingPolicy] decides to which [FunctionInstance] a function invocation should be routed. + */ +public interface RoutingPolicy { + /** + * Select the instance to which the request should be routed to. + */ + public fun select(instances: List<FunctionInstance>, function: FunctionObject): FunctionInstance? +} diff --git a/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt b/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt new file mode 100644 index 00000000..6b99684a --- /dev/null +++ b/opendc-faas/opendc-faas-service/src/test/kotlin/org/opendc/faas/service/FaaSServiceTest.kt @@ -0,0 +1,186 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.faas.service + +import io.mockk.* +import io.opentelemetry.api.metrics.MeterProvider +import kotlinx.coroutines.ExperimentalCoroutinesApi +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertDoesNotThrow +import org.junit.jupiter.api.assertThrows +import org.opendc.faas.api.FaaSFunction +import org.opendc.faas.service.deployer.FunctionDeployer +import org.opendc.faas.service.deployer.FunctionInstance +import org.opendc.faas.service.deployer.FunctionInstanceState +import org.opendc.simulator.core.runBlockingSimulation +import java.util.* + +/** + * Test suite for the [FaaSService] implementation. + */ +@OptIn(ExperimentalCoroutinesApi::class) +internal class FaaSServiceTest { + + @Test + fun testClientState() = runBlockingSimulation { + val meter = MeterProvider.noop().get("opendc-faas") + val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk()) + + val client = assertDoesNotThrow { service.newClient() } + assertDoesNotThrow { client.close() } + + assertThrows<IllegalStateException> { client.queryFunctions() } + assertThrows<IllegalStateException> { client.newFunction("test", 128) } + assertThrows<IllegalStateException> { client.invoke("test") } + assertThrows<IllegalStateException> { client.findFunction(UUID.randomUUID()) } + assertThrows<IllegalStateException> { client.findFunction("name") } + } + + @Test + fun testClientInvokeUnknown() = runBlockingSimulation { + val meter = MeterProvider.noop().get("opendc-faas") + val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk()) + + val client = service.newClient() + + assertThrows<IllegalArgumentException> { client.invoke("test") } + } + + @Test + fun testClientFunctionCreation() = runBlockingSimulation { + val meter = MeterProvider.noop().get("opendc-faas") + val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk()) + + val client = service.newClient() + + val function = client.newFunction("test", 128) + + assertEquals("test", function.name) + } + + @Test + fun testClientFunctionQuery() = runBlockingSimulation { + val meter = MeterProvider.noop().get("opendc-faas") + val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk()) + + val client = service.newClient() + + assertEquals(emptyList<FaaSFunction>(), client.queryFunctions()) + + val function = client.newFunction("test", 128) + + assertEquals(listOf(function), client.queryFunctions()) + } + + @Test + fun testClientFunctionFindById() = runBlockingSimulation { + val meter = MeterProvider.noop().get("opendc-faas") + val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk()) + + val client = service.newClient() + + assertEquals(emptyList<FaaSFunction>(), client.queryFunctions()) + + val function = client.newFunction("test", 128) + + assertNotNull(client.findFunction(function.uid)) + } + + @Test + fun testClientFunctionFindByName() = runBlockingSimulation { + val meter = MeterProvider.noop().get("opendc-faas") + val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk()) + + val client = service.newClient() + + assertEquals(emptyList<FaaSFunction>(), client.queryFunctions()) + + val function = client.newFunction("test", 128) + + assertNotNull(client.findFunction(function.name)) + } + + @Test + fun testClientFunctionDuplicateName() = runBlockingSimulation { + val meter = MeterProvider.noop().get("opendc-faas") + val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk()) + + val client = service.newClient() + + client.newFunction("test", 128) + + assertThrows<IllegalArgumentException> { client.newFunction("test", 128) } + } + + @Test + fun testClientFunctionDelete() = runBlockingSimulation { + val meter = MeterProvider.noop().get("opendc-faas") + val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk()) + + val client = service.newClient() + val function = client.newFunction("test", 128) + assertNotNull(client.findFunction(function.uid)) + function.delete() + assertNull(client.findFunction(function.uid)) + + // Delete should be idempotent + function.delete() + } + + @Test + fun testClientFunctionCannotInvokeDeleted() = runBlockingSimulation { + val meter = MeterProvider.noop().get("opendc-faas") + val service = FaaSService(coroutineContext, clock, meter, mockk(), mockk(), mockk()) + + val client = service.newClient() + val function = client.newFunction("test", 128) + assertNotNull(client.findFunction(function.uid)) + function.delete() + + assertThrows<IllegalStateException> { function.invoke() } + } + + @Test + fun testClientFunctionInvoke() = runBlockingSimulation { + val meter = MeterProvider.noop().get("opendc-faas") + val deployer = mockk<FunctionDeployer>() + val service = FaaSService(coroutineContext, clock, meter, deployer, mockk(), mockk(relaxUnitFun = true)) + + every { deployer.deploy(any(), any()) } answers { + object : FunctionInstance { + override val state: FunctionInstanceState = FunctionInstanceState.Idle + override val function: FunctionObject = it.invocation.args[0] as FunctionObject + + override suspend fun invoke() {} + + override fun close() {} + } + } + + val client = service.newClient() + val function = client.newFunction("test", 128) + + function.invoke() + } +} |
