diff options
Diffstat (limited to 'opendc-serverless/opendc-serverless-service')
11 files changed, 1016 insertions, 0 deletions
diff --git a/opendc-serverless/opendc-serverless-service/build.gradle.kts b/opendc-serverless/opendc-serverless-service/build.gradle.kts new file mode 100644 index 00000000..f7e43aba --- /dev/null +++ b/opendc-serverless/opendc-serverless-service/build.gradle.kts @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +description = "Serverless service for OpenDC" + +/* Build configuration */ +plugins { + `kotlin-library-conventions` + `testing-conventions` + `jacoco-conventions` +} + +dependencies { + api(platform(project(":opendc-platform"))) + api(project(":opendc-serverless:opendc-serverless-api")) + api(project(":opendc-telemetry:opendc-telemetry-api")) + implementation(project(":opendc-utils")) + implementation("io.github.microutils:kotlin-logging") + + testImplementation(project(":opendc-simulator:opendc-simulator-core")) + testRuntimeOnly("org.apache.logging.log4j:log4j-slf4j-impl") +} diff --git a/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/FunctionObject.kt b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/FunctionObject.kt new file mode 100644 index 00000000..c12bbfe2 --- /dev/null +++ b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/FunctionObject.kt @@ -0,0 +1,139 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.serverless.service + +import io.opentelemetry.api.metrics.BoundLongCounter +import io.opentelemetry.api.metrics.BoundLongUpDownCounter +import io.opentelemetry.api.metrics.BoundLongValueRecorder +import io.opentelemetry.api.metrics.Meter +import io.opentelemetry.api.metrics.common.Labels +import org.opendc.serverless.service.deployer.FunctionInstance +import java.util.* + +/** + * An [FunctionObject] represents the service's view of a serverless function. + */ +public class FunctionObject( + meter: Meter, + public val uid: UUID, + name: String, + allocatedMemory: Long, + labels: Map<String, String>, + meta: Map<String, Any> +) : AutoCloseable { + /** + * The total amount of function invocations received by the function. + */ + public val invocations: BoundLongCounter = meter.longCounterBuilder("function.invocations.total") + .setDescription("Number of function invocations") + .setUnit("1") + .build() + .bind(Labels.of("function", uid.toString())) + + /** + * The amount of function invocations that could be handled directly. + */ + public val timelyInvocations: BoundLongCounter = meter.longCounterBuilder("function.invocations.warm") + .setDescription("Number of function invocations handled directly") + .setUnit("1") + .build() + .bind(Labels.of("function", uid.toString())) + + /** + * The amount of function invocations that were delayed due to function deployment. + */ + public val delayedInvocations: BoundLongCounter = meter.longCounterBuilder("function.invocations.cold") + .setDescription("Number of function invocations that are delayed") + .setUnit("1") + .build() + .bind(Labels.of("function", uid.toString())) + + /** + * The amount of function invocations that failed. + */ + public val failedInvocations: BoundLongCounter = meter.longCounterBuilder("function.invocations.failed") + .setDescription("Number of function invocations that failed") + .setUnit("1") + .build() + .bind(Labels.of("function", uid.toString())) + + /** + * The amount of instances for this function. + */ + public val activeInstances: BoundLongUpDownCounter = meter.longUpDownCounterBuilder("function.instances.active") + .setDescription("Number of active function instances") + .setUnit("1") + .build() + .bind(Labels.of("function", uid.toString())) + + /** + * The amount of idle instances for this function. + */ + public val idleInstances: BoundLongUpDownCounter = meter.longUpDownCounterBuilder("function.instances.idle") + .setDescription("Number of idle function instances") + .setUnit("1") + .build() + .bind(Labels.of("function", uid.toString())) + + /** + * The time that the function waited. + */ + public val waitTime: BoundLongValueRecorder = meter.longValueRecorderBuilder("function.time.wait") + .setDescription("Time the function has to wait before being started") + .setUnit("ms") + .build() + .bind(Labels.of("function", uid.toString())) + + /** + * The time that the function was running. + */ + public val activeTime: BoundLongValueRecorder = meter.longValueRecorderBuilder("function.time.active") + .setDescription("Time the function was running") + .setUnit("ms") + .build() + .bind(Labels.of("function", uid.toString())) + + /** + * The instances associated with this function. + */ + public val instances: MutableList<FunctionInstance> = mutableListOf() + + public var name: String = name + private set + + public var memorySize: Long = allocatedMemory + private set + + public val labels: MutableMap<String, String> = labels.toMutableMap() + + public val meta: MutableMap<String, Any> = meta.toMutableMap() + + override fun close() { + instances.forEach(FunctionInstance::close) + instances.clear() + } + + override fun equals(other: Any?): Boolean = other is FunctionObject && uid == other.uid + + override fun hashCode(): Int = uid.hashCode() +} diff --git a/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/ServerlessService.kt b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/ServerlessService.kt new file mode 100644 index 00000000..a791c815 --- /dev/null +++ b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/ServerlessService.kt @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.serverless.service + +import io.opentelemetry.api.metrics.Meter +import org.opendc.serverless.api.ServerlessClient +import org.opendc.serverless.service.deployer.FunctionDeployer +import org.opendc.serverless.service.internal.ServerlessServiceImpl +import org.opendc.serverless.service.router.RoutingPolicy +import java.time.Clock +import kotlin.coroutines.CoroutineContext + +/** + * The [ServerlessService] hosts the API implementation of the OpenDC Serverless service. + */ +public interface ServerlessService : AutoCloseable { + /** + * Create a new [ServerlessClient] to control the compute service. + */ + public fun newClient(): ServerlessClient + + /** + * Terminate the lifecycle of the serverless service, stopping all running function instances. + */ + public override fun close() + + public companion object { + /** + * Construct a new [ServerlessService] implementation. + * + * @param context The [CoroutineContext] to use in the service. + * @param clock The clock instance to use. + * @param meter The meter to report metrics to. + * @param deployer the [FunctionDeployer] to use for deploying function instances. + * @param routingPolicy The policy to route function invocations. + */ + public operator fun invoke( + context: CoroutineContext, + clock: Clock, + meter: Meter, + deployer: FunctionDeployer, + routingPolicy: RoutingPolicy, + ): ServerlessService { + return ServerlessServiceImpl(context, clock, meter, deployer, routingPolicy) + } + } +} diff --git a/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionDeployer.kt b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionDeployer.kt new file mode 100644 index 00000000..83592a68 --- /dev/null +++ b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionDeployer.kt @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.serverless.service.deployer + +import org.opendc.serverless.service.FunctionObject + +/** + * A [FunctionDeployer] is responsible for ensuring that an instance of an arbitrary function, a [FunctionInstance], + * is deployed. + * + * The function deployer should combines the configuration stored in the function registry, the parameters supplied by + * the requester, and other factors into a decision of how the function should be deployed, including how many and + * what kind of resources it should receive. + * + * Though it decides how the function instance should be deployed, the deployment of the function instance itself is + * delegated to the Resource Orchestration Layer. + */ +public interface FunctionDeployer { + /** + * Deploy the specified [function]. + */ + public fun deploy(function: FunctionObject): FunctionInstance +} diff --git a/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionInstance.kt b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionInstance.kt new file mode 100644 index 00000000..d60648ea --- /dev/null +++ b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionInstance.kt @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.serverless.service.deployer + +import org.opendc.serverless.service.FunctionObject + +/** + * A [FunctionInstance] is a a self-contained worker—typically a container—capable of handling function executions. + * + * Multiple, concurrent function instances can exists for a single function, for scalability purposes. + */ +public interface FunctionInstance : AutoCloseable { + /** + * The state of the instance. + */ + public val state: FunctionInstanceState + + /** + * The [FunctionObject] that is represented by this instance. + */ + public val function: FunctionObject + + /** + * Invoke the function instance. + * + * This method will suspend execution util the function instance has returned. + */ + public suspend fun invoke() + + /** + * Indicate to the resource manager that the instance is not needed anymore and may be cleaned up by the resource + * manager. + */ + public override fun close() +} diff --git a/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionInstanceState.kt b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionInstanceState.kt new file mode 100644 index 00000000..44ad80ee --- /dev/null +++ b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionInstanceState.kt @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.serverless.service.deployer + +/** + * This enumeration describes the states of a [FunctionInstance]. + */ +public enum class FunctionInstanceState { + /** + * The function instance is currently being provisioned. + */ + Provisioning, + + /** + * The function instance is idle and ready to execute. + */ + Idle, + + /** + * The function instance is executing. + */ + Active, + + /** + * The function instance is stopped but can be started. + */ + Terminated, + + /** + * The function instance is released and cannot be used anymore. + */ + Deleted +} diff --git a/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessFunctionImpl.kt b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessFunctionImpl.kt new file mode 100644 index 00000000..80b50e77 --- /dev/null +++ b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessFunctionImpl.kt @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.serverless.service.internal + +import org.opendc.serverless.api.ServerlessFunction +import org.opendc.serverless.service.FunctionObject +import java.util.* + +/** + * A [ServerlessFunction] implementation that is passed to clients. + */ +internal class ServerlessFunctionImpl( + private val service: ServerlessServiceImpl, + private val state: FunctionObject +) : ServerlessFunction { + override val uid: UUID = state.uid + + override var name: String = state.name + private set + + override var memorySize: Long = state.memorySize + private set + + override var labels: Map<String, String> = state.labels.toMap() + private set + + override var meta: Map<String, Any> = state.meta.toMap() + private set + + override suspend fun delete() { + service.delete(state) + } + + override suspend fun invoke() { + service.invoke(state) + } + + override suspend fun refresh() { + name = state.name + memorySize = state.memorySize + labels = state.labels + meta = state.meta + } + + override fun equals(other: Any?): Boolean = other is ServerlessFunctionImpl && uid == other.uid + + override fun hashCode(): Int = uid.hashCode() + + override fun toString(): String = "ServerlessFunction[uid=$uid,name=$name]" +} diff --git a/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessServiceImpl.kt b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessServiceImpl.kt new file mode 100644 index 00000000..515cb5fa --- /dev/null +++ b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessServiceImpl.kt @@ -0,0 +1,290 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.serverless.service.internal + +import io.opentelemetry.api.metrics.Meter +import kotlinx.coroutines.* +import kotlinx.coroutines.intrinsics.startCoroutineCancellable +import mu.KotlinLogging +import org.opendc.serverless.api.ServerlessClient +import org.opendc.serverless.api.ServerlessFunction +import org.opendc.serverless.service.FunctionObject +import org.opendc.serverless.service.ServerlessService +import org.opendc.serverless.service.deployer.FunctionDeployer +import org.opendc.serverless.service.deployer.FunctionInstance +import org.opendc.serverless.service.router.RoutingPolicy +import org.opendc.utils.TimerScheduler +import java.lang.IllegalStateException +import java.time.Clock +import java.util.* +import kotlin.coroutines.Continuation +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.resumeWithException + +/** + * Implementation of the [ServerlessService] interface. + * + * This component acts as the function router from the SPEC RG Reference Architecture for FaaS and is responsible + * for routing incoming requests or events to the correct [FunctionInstance]. If no [FunctionInstance] is available, + * this component queues the events to await the deployment of new instances. + */ +internal class ServerlessServiceImpl( + context: CoroutineContext, + private val clock: Clock, + private val meter: Meter, + private val deployer: FunctionDeployer, + private val routingPolicy: RoutingPolicy +) : ServerlessService { + /** + * The [CoroutineScope] of the service bounded by the lifecycle of the service. + */ + private val scope = CoroutineScope(context + Job()) + + /** + * The logger instance of this server. + */ + private val logger = KotlinLogging.logger {} + + /** + * The [TimerScheduler] to use for scheduling the scheduler cycles. + */ + private val scheduler: TimerScheduler<Unit> = TimerScheduler(scope.coroutineContext, clock) + + /** + * The [Random] instance used to generate unique identifiers for the objects. + */ + private val random = Random(0) + + /** + * The registered functions for this service. + */ + private val functions = mutableMapOf<UUID, FunctionObject>() + private val functionsByName = mutableMapOf<String, FunctionObject>() + + /** + * The queue of invocation requests. + */ + private val queue = ArrayDeque<InvocationRequest>() + + /** + * The total amount of function invocations received by the service. + */ + private val _invocations = meter.longCounterBuilder("service.invocations.total") + .setDescription("Number of function invocations") + .setUnit("1") + .build() + + /** + * The amount of function invocations that could be handled directly. + */ + private val _timelyInvocations = meter.longCounterBuilder("service.invocations.warm") + .setDescription("Number of function invocations handled directly") + .setUnit("1") + .build() + + /** + * The amount of function invocations that were delayed due to function deployment. + */ + private val _delayedInvocations = meter.longCounterBuilder("service.invocations.cold") + .setDescription("Number of function invocations that are delayed") + .setUnit("1") + .build() + + override fun newClient(): ServerlessClient { + return object : ServerlessClient { + private var isClosed: Boolean = false + + /** + * Exposes a [FunctionObject] to a client-exposed [ServerlessFunction] instance. + */ + private fun FunctionObject.asClientFunction(): ServerlessFunction { + return ServerlessFunctionImpl(this@ServerlessServiceImpl, this) + } + + override suspend fun queryFunctions(): List<ServerlessFunction> { + check(!isClosed) { "Client is already closed" } + + return functions.values.map { it.asClientFunction() } + } + + override suspend fun findFunction(id: UUID): ServerlessFunction? { + check(!isClosed) { "Client is already closed" } + + return functions[id]?.asClientFunction() + } + + override suspend fun findFunction(name: String): ServerlessFunction? { + check(!isClosed) { "Client is already closed" } + + return functionsByName[name]?.asClientFunction() + } + + override suspend fun newFunction( + name: String, + memorySize: Long, + labels: Map<String, String>, + meta: Map<String, Any> + ): ServerlessFunction { + check(!isClosed) { "Client is already closed" } + require(name !in functionsByName) { "Function with same name exists" } + + val uid = UUID(clock.millis(), random.nextLong()) + val function = FunctionObject( + meter, + uid, + name, + memorySize, + labels, + meta + ) + + functionsByName[name] = function + functions[uid] = function + + return function.asClientFunction() + } + + override suspend fun invoke(name: String) { + check(!isClosed) { "Client is already closed" } + + val func = requireNotNull(functionsByName[name]) { "Unknown function" } + this@ServerlessServiceImpl.invoke(func) + } + + override fun close() { + isClosed = true + } + } + } + + /** + * Indicate that a new scheduling cycle is needed due to a change to the service's state. + */ + private fun schedule() { + // Bail out in case we have already requested a new cycle or the queue is empty. + if (scheduler.isTimerActive(Unit) || queue.isEmpty()) { + return + } + + val quantum = 100 + + // We assume that the provisioner runs at a fixed slot every time quantum (e.g t=0, t=60, t=120). + // This is important because the slices of the VMs need to be aligned. + // We calculate here the delay until the next scheduling slot. + val delay = quantum - (clock.millis() % quantum) + + scheduler.startSingleTimer(Unit, delay, ::doSchedule) + } + + /** + * Run a single scheduling iteration. + */ + @OptIn(InternalCoroutinesApi::class) + private fun doSchedule() { + try { + while (queue.isNotEmpty()) { + val (submitTime, function, cont) = queue.poll() + + val instances = function.instances + + // Check if there exists an instance of the function + val activeInstance = if (instances.isNotEmpty()) { + routingPolicy.select(instances, function) + } else { + null + } + + val instance = if (activeInstance != null) { + _timelyInvocations.add(1) + function.timelyInvocations.add(1) + + activeInstance + } else { + val instance = deployer.deploy(function) + instances.add(instance) + + function.idleInstances.add(1) + + _delayedInvocations.add(1) + function.delayedInvocations.add(1) + + instance + } + + suspend { + val start = clock.millis() + function.waitTime.record(start - submitTime) + function.idleInstances.add(-1) + function.activeInstances.add(1) + try { + instance.invoke() + } catch (e: Throwable) { + logger.debug(e) { "Function invocation failed" } + function.failedInvocations.add(1) + } finally { + val end = clock.millis() + function.activeTime.record(end - start) + function.idleInstances.add(1) + function.activeInstances.add(-1) + } + }.startCoroutineCancellable(cont) + } + } catch (cause: Throwable) { + logger.error(cause) { "Exception occurred during scheduling cycle" } + } + } + + suspend fun invoke(function: FunctionObject) { + check(function.uid in functions) { "Function does not exist (anymore)" } + + _invocations.add(1) + function.invocations.add(1) + + return suspendCancellableCoroutine { cont -> + if (!queue.add(InvocationRequest(clock.millis(), function, cont))) { + cont.resumeWithException(IllegalStateException("Failed to enqueue request")) + } else { + schedule() + } + } + } + + fun delete(function: FunctionObject) { + functions.remove(function.uid) + functionsByName.remove(function.name) + } + + override fun close() { + scope.cancel() + + // Stop all function instances + for ((_, function) in functions) { + function.close() + } + } + + /** + * A request to invoke a function. + */ + private data class InvocationRequest(val timestamp: Long, val function: FunctionObject, val cont: Continuation<Unit>) +} diff --git a/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/router/RandomRoutingPolicy.kt b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/router/RandomRoutingPolicy.kt new file mode 100644 index 00000000..063fb80a --- /dev/null +++ b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/router/RandomRoutingPolicy.kt @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.serverless.service.router + +import org.opendc.serverless.service.FunctionObject +import org.opendc.serverless.service.deployer.FunctionInstance +import kotlin.random.Random + +/** + * A [RoutingPolicy] that selects a random function instance. + */ +public class RandomRoutingPolicy(private val random: Random = Random(0)) : RoutingPolicy { + override fun select(instances: List<FunctionInstance>, function: FunctionObject): FunctionInstance { + return instances.random(random) + } +} diff --git a/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/router/RoutingPolicy.kt b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/router/RoutingPolicy.kt new file mode 100644 index 00000000..d5d1166f --- /dev/null +++ b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/router/RoutingPolicy.kt @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.serverless.service.router + +import org.opendc.serverless.service.FunctionObject +import org.opendc.serverless.service.deployer.FunctionInstance + +/** + * A [RoutingPolicy] decides to which [FunctionInstance] a function invocation should be routed. + */ +public interface RoutingPolicy { + /** + * Select the instance to which the request should be routed to. + */ + public fun select(instances: List<FunctionInstance>, function: FunctionObject): FunctionInstance? +} diff --git a/opendc-serverless/opendc-serverless-service/src/test/kotlin/org/opendc/serverless/service/ServerlessServiceTest.kt b/opendc-serverless/opendc-serverless-service/src/test/kotlin/org/opendc/serverless/service/ServerlessServiceTest.kt new file mode 100644 index 00000000..d9f5ee81 --- /dev/null +++ b/opendc-serverless/opendc-serverless-service/src/test/kotlin/org/opendc/serverless/service/ServerlessServiceTest.kt @@ -0,0 +1,186 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.serverless.service + +import io.mockk.* +import io.opentelemetry.api.metrics.MeterProvider +import kotlinx.coroutines.ExperimentalCoroutinesApi +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertDoesNotThrow +import org.junit.jupiter.api.assertThrows +import org.opendc.serverless.api.ServerlessFunction +import org.opendc.serverless.service.deployer.FunctionDeployer +import org.opendc.serverless.service.deployer.FunctionInstance +import org.opendc.serverless.service.deployer.FunctionInstanceState +import org.opendc.simulator.core.runBlockingSimulation +import java.util.* + +/** + * Test suite for the [ServerlessService] implementation. + */ +@OptIn(ExperimentalCoroutinesApi::class) +internal class ServerlessServiceTest { + + @Test + fun testClientState() = runBlockingSimulation { + val meter = MeterProvider.noop().get("opendc-serverless") + val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk()) + + val client = assertDoesNotThrow { service.newClient() } + assertDoesNotThrow { client.close() } + + assertThrows<IllegalStateException> { client.queryFunctions() } + assertThrows<IllegalStateException> { client.newFunction("test", 128) } + assertThrows<IllegalStateException> { client.invoke("test") } + assertThrows<IllegalStateException> { client.findFunction(UUID.randomUUID()) } + assertThrows<IllegalStateException> { client.findFunction("name") } + } + + @Test + fun testClientInvokeUnknown() = runBlockingSimulation { + val meter = MeterProvider.noop().get("opendc-serverless") + val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk()) + + val client = service.newClient() + + assertThrows<IllegalArgumentException> { client.invoke("test") } + } + + @Test + fun testClientFunctionCreation() = runBlockingSimulation { + val meter = MeterProvider.noop().get("opendc-serverless") + val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk()) + + val client = service.newClient() + + val function = client.newFunction("test", 128) + + assertEquals("test", function.name) + } + + @Test + fun testClientFunctionQuery() = runBlockingSimulation { + val meter = MeterProvider.noop().get("opendc-serverless") + val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk()) + + val client = service.newClient() + + assertEquals(emptyList<ServerlessFunction>(), client.queryFunctions()) + + val function = client.newFunction("test", 128) + + assertEquals(listOf(function), client.queryFunctions()) + } + + @Test + fun testClientFunctionFindById() = runBlockingSimulation { + val meter = MeterProvider.noop().get("opendc-serverless") + val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk()) + + val client = service.newClient() + + assertEquals(emptyList<ServerlessFunction>(), client.queryFunctions()) + + val function = client.newFunction("test", 128) + + assertNotNull(client.findFunction(function.uid)) + } + + @Test + fun testClientFunctionFindByName() = runBlockingSimulation { + val meter = MeterProvider.noop().get("opendc-serverless") + val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk()) + + val client = service.newClient() + + assertEquals(emptyList<ServerlessFunction>(), client.queryFunctions()) + + val function = client.newFunction("test", 128) + + assertNotNull(client.findFunction(function.name)) + } + + @Test + fun testClientFunctionDuplicateName() = runBlockingSimulation { + val meter = MeterProvider.noop().get("opendc-serverless") + val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk()) + + val client = service.newClient() + + client.newFunction("test", 128) + + assertThrows<IllegalArgumentException> { client.newFunction("test", 128) } + } + + @Test + fun testClientFunctionDelete() = runBlockingSimulation { + val meter = MeterProvider.noop().get("opendc-serverless") + val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk()) + + val client = service.newClient() + val function = client.newFunction("test", 128) + assertNotNull(client.findFunction(function.uid)) + function.delete() + assertNull(client.findFunction(function.uid)) + + // Delete should be idempotent + function.delete() + } + + @Test + fun testClientFunctionCannotInvokeDeleted() = runBlockingSimulation { + val meter = MeterProvider.noop().get("opendc-serverless") + val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk()) + + val client = service.newClient() + val function = client.newFunction("test", 128) + assertNotNull(client.findFunction(function.uid)) + function.delete() + + assertThrows<IllegalStateException> { function.invoke() } + } + + @Test + fun testClientFunctionInvoke() = runBlockingSimulation { + val meter = MeterProvider.noop().get("opendc-serverless") + val deployer = mockk<FunctionDeployer>() + val service = ServerlessService(coroutineContext, clock, meter, deployer, mockk()) + + every { deployer.deploy(any()) } answers { + object : FunctionInstance { + override val state: FunctionInstanceState = FunctionInstanceState.Idle + override val function: FunctionObject = it.invocation.args[0] as FunctionObject + + override suspend fun invoke() {} + + override fun close() {} + } + } + + val client = service.newClient() + val function = client.newFunction("test", 128) + + function.invoke() + } +} |
