summaryrefslogtreecommitdiff
path: root/opendc-serverless/opendc-serverless-service/src
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-serverless/opendc-serverless-service/src')
-rw-r--r--opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/FunctionObject.kt139
-rw-r--r--opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/ServerlessService.kt67
-rw-r--r--opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionDeployer.kt43
-rw-r--r--opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionInstance.kt55
-rw-r--r--opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionInstanceState.kt53
-rw-r--r--opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessFunctionImpl.kt70
-rw-r--r--opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessServiceImpl.kt290
-rw-r--r--opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/router/RandomRoutingPolicy.kt36
-rw-r--r--opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/router/RoutingPolicy.kt36
-rw-r--r--opendc-serverless/opendc-serverless-service/src/test/kotlin/org/opendc/serverless/service/ServerlessServiceTest.kt186
10 files changed, 975 insertions, 0 deletions
diff --git a/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/FunctionObject.kt b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/FunctionObject.kt
new file mode 100644
index 00000000..c12bbfe2
--- /dev/null
+++ b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/FunctionObject.kt
@@ -0,0 +1,139 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.serverless.service
+
+import io.opentelemetry.api.metrics.BoundLongCounter
+import io.opentelemetry.api.metrics.BoundLongUpDownCounter
+import io.opentelemetry.api.metrics.BoundLongValueRecorder
+import io.opentelemetry.api.metrics.Meter
+import io.opentelemetry.api.metrics.common.Labels
+import org.opendc.serverless.service.deployer.FunctionInstance
+import java.util.*
+
+/**
+ * An [FunctionObject] represents the service's view of a serverless function.
+ */
+public class FunctionObject(
+ meter: Meter,
+ public val uid: UUID,
+ name: String,
+ allocatedMemory: Long,
+ labels: Map<String, String>,
+ meta: Map<String, Any>
+) : AutoCloseable {
+ /**
+ * The total amount of function invocations received by the function.
+ */
+ public val invocations: BoundLongCounter = meter.longCounterBuilder("function.invocations.total")
+ .setDescription("Number of function invocations")
+ .setUnit("1")
+ .build()
+ .bind(Labels.of("function", uid.toString()))
+
+ /**
+ * The amount of function invocations that could be handled directly.
+ */
+ public val timelyInvocations: BoundLongCounter = meter.longCounterBuilder("function.invocations.warm")
+ .setDescription("Number of function invocations handled directly")
+ .setUnit("1")
+ .build()
+ .bind(Labels.of("function", uid.toString()))
+
+ /**
+ * The amount of function invocations that were delayed due to function deployment.
+ */
+ public val delayedInvocations: BoundLongCounter = meter.longCounterBuilder("function.invocations.cold")
+ .setDescription("Number of function invocations that are delayed")
+ .setUnit("1")
+ .build()
+ .bind(Labels.of("function", uid.toString()))
+
+ /**
+ * The amount of function invocations that failed.
+ */
+ public val failedInvocations: BoundLongCounter = meter.longCounterBuilder("function.invocations.failed")
+ .setDescription("Number of function invocations that failed")
+ .setUnit("1")
+ .build()
+ .bind(Labels.of("function", uid.toString()))
+
+ /**
+ * The amount of instances for this function.
+ */
+ public val activeInstances: BoundLongUpDownCounter = meter.longUpDownCounterBuilder("function.instances.active")
+ .setDescription("Number of active function instances")
+ .setUnit("1")
+ .build()
+ .bind(Labels.of("function", uid.toString()))
+
+ /**
+ * The amount of idle instances for this function.
+ */
+ public val idleInstances: BoundLongUpDownCounter = meter.longUpDownCounterBuilder("function.instances.idle")
+ .setDescription("Number of idle function instances")
+ .setUnit("1")
+ .build()
+ .bind(Labels.of("function", uid.toString()))
+
+ /**
+ * The time that the function waited.
+ */
+ public val waitTime: BoundLongValueRecorder = meter.longValueRecorderBuilder("function.time.wait")
+ .setDescription("Time the function has to wait before being started")
+ .setUnit("ms")
+ .build()
+ .bind(Labels.of("function", uid.toString()))
+
+ /**
+ * The time that the function was running.
+ */
+ public val activeTime: BoundLongValueRecorder = meter.longValueRecorderBuilder("function.time.active")
+ .setDescription("Time the function was running")
+ .setUnit("ms")
+ .build()
+ .bind(Labels.of("function", uid.toString()))
+
+ /**
+ * The instances associated with this function.
+ */
+ public val instances: MutableList<FunctionInstance> = mutableListOf()
+
+ public var name: String = name
+ private set
+
+ public var memorySize: Long = allocatedMemory
+ private set
+
+ public val labels: MutableMap<String, String> = labels.toMutableMap()
+
+ public val meta: MutableMap<String, Any> = meta.toMutableMap()
+
+ override fun close() {
+ instances.forEach(FunctionInstance::close)
+ instances.clear()
+ }
+
+ override fun equals(other: Any?): Boolean = other is FunctionObject && uid == other.uid
+
+ override fun hashCode(): Int = uid.hashCode()
+}
diff --git a/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/ServerlessService.kt b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/ServerlessService.kt
new file mode 100644
index 00000000..a791c815
--- /dev/null
+++ b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/ServerlessService.kt
@@ -0,0 +1,67 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.serverless.service
+
+import io.opentelemetry.api.metrics.Meter
+import org.opendc.serverless.api.ServerlessClient
+import org.opendc.serverless.service.deployer.FunctionDeployer
+import org.opendc.serverless.service.internal.ServerlessServiceImpl
+import org.opendc.serverless.service.router.RoutingPolicy
+import java.time.Clock
+import kotlin.coroutines.CoroutineContext
+
+/**
+ * The [ServerlessService] hosts the API implementation of the OpenDC Serverless service.
+ */
+public interface ServerlessService : AutoCloseable {
+ /**
+ * Create a new [ServerlessClient] to control the compute service.
+ */
+ public fun newClient(): ServerlessClient
+
+ /**
+ * Terminate the lifecycle of the serverless service, stopping all running function instances.
+ */
+ public override fun close()
+
+ public companion object {
+ /**
+ * Construct a new [ServerlessService] implementation.
+ *
+ * @param context The [CoroutineContext] to use in the service.
+ * @param clock The clock instance to use.
+ * @param meter The meter to report metrics to.
+ * @param deployer the [FunctionDeployer] to use for deploying function instances.
+ * @param routingPolicy The policy to route function invocations.
+ */
+ public operator fun invoke(
+ context: CoroutineContext,
+ clock: Clock,
+ meter: Meter,
+ deployer: FunctionDeployer,
+ routingPolicy: RoutingPolicy,
+ ): ServerlessService {
+ return ServerlessServiceImpl(context, clock, meter, deployer, routingPolicy)
+ }
+ }
+}
diff --git a/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionDeployer.kt b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionDeployer.kt
new file mode 100644
index 00000000..83592a68
--- /dev/null
+++ b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionDeployer.kt
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.serverless.service.deployer
+
+import org.opendc.serverless.service.FunctionObject
+
+/**
+ * A [FunctionDeployer] is responsible for ensuring that an instance of an arbitrary function, a [FunctionInstance],
+ * is deployed.
+ *
+ * The function deployer should combines the configuration stored in the function registry, the parameters supplied by
+ * the requester, and other factors into a decision of how the function should be deployed, including how many and
+ * what kind of resources it should receive.
+ *
+ * Though it decides how the function instance should be deployed, the deployment of the function instance itself is
+ * delegated to the Resource Orchestration Layer.
+ */
+public interface FunctionDeployer {
+ /**
+ * Deploy the specified [function].
+ */
+ public fun deploy(function: FunctionObject): FunctionInstance
+}
diff --git a/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionInstance.kt b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionInstance.kt
new file mode 100644
index 00000000..d60648ea
--- /dev/null
+++ b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionInstance.kt
@@ -0,0 +1,55 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.serverless.service.deployer
+
+import org.opendc.serverless.service.FunctionObject
+
+/**
+ * A [FunctionInstance] is a a self-contained worker—typically a container—capable of handling function executions.
+ *
+ * Multiple, concurrent function instances can exists for a single function, for scalability purposes.
+ */
+public interface FunctionInstance : AutoCloseable {
+ /**
+ * The state of the instance.
+ */
+ public val state: FunctionInstanceState
+
+ /**
+ * The [FunctionObject] that is represented by this instance.
+ */
+ public val function: FunctionObject
+
+ /**
+ * Invoke the function instance.
+ *
+ * This method will suspend execution util the function instance has returned.
+ */
+ public suspend fun invoke()
+
+ /**
+ * Indicate to the resource manager that the instance is not needed anymore and may be cleaned up by the resource
+ * manager.
+ */
+ public override fun close()
+}
diff --git a/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionInstanceState.kt b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionInstanceState.kt
new file mode 100644
index 00000000..44ad80ee
--- /dev/null
+++ b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionInstanceState.kt
@@ -0,0 +1,53 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.serverless.service.deployer
+
+/**
+ * This enumeration describes the states of a [FunctionInstance].
+ */
+public enum class FunctionInstanceState {
+ /**
+ * The function instance is currently being provisioned.
+ */
+ Provisioning,
+
+ /**
+ * The function instance is idle and ready to execute.
+ */
+ Idle,
+
+ /**
+ * The function instance is executing.
+ */
+ Active,
+
+ /**
+ * The function instance is stopped but can be started.
+ */
+ Terminated,
+
+ /**
+ * The function instance is released and cannot be used anymore.
+ */
+ Deleted
+}
diff --git a/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessFunctionImpl.kt b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessFunctionImpl.kt
new file mode 100644
index 00000000..80b50e77
--- /dev/null
+++ b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessFunctionImpl.kt
@@ -0,0 +1,70 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.serverless.service.internal
+
+import org.opendc.serverless.api.ServerlessFunction
+import org.opendc.serverless.service.FunctionObject
+import java.util.*
+
+/**
+ * A [ServerlessFunction] implementation that is passed to clients.
+ */
+internal class ServerlessFunctionImpl(
+ private val service: ServerlessServiceImpl,
+ private val state: FunctionObject
+) : ServerlessFunction {
+ override val uid: UUID = state.uid
+
+ override var name: String = state.name
+ private set
+
+ override var memorySize: Long = state.memorySize
+ private set
+
+ override var labels: Map<String, String> = state.labels.toMap()
+ private set
+
+ override var meta: Map<String, Any> = state.meta.toMap()
+ private set
+
+ override suspend fun delete() {
+ service.delete(state)
+ }
+
+ override suspend fun invoke() {
+ service.invoke(state)
+ }
+
+ override suspend fun refresh() {
+ name = state.name
+ memorySize = state.memorySize
+ labels = state.labels
+ meta = state.meta
+ }
+
+ override fun equals(other: Any?): Boolean = other is ServerlessFunctionImpl && uid == other.uid
+
+ override fun hashCode(): Int = uid.hashCode()
+
+ override fun toString(): String = "ServerlessFunction[uid=$uid,name=$name]"
+}
diff --git a/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessServiceImpl.kt b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessServiceImpl.kt
new file mode 100644
index 00000000..515cb5fa
--- /dev/null
+++ b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessServiceImpl.kt
@@ -0,0 +1,290 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.serverless.service.internal
+
+import io.opentelemetry.api.metrics.Meter
+import kotlinx.coroutines.*
+import kotlinx.coroutines.intrinsics.startCoroutineCancellable
+import mu.KotlinLogging
+import org.opendc.serverless.api.ServerlessClient
+import org.opendc.serverless.api.ServerlessFunction
+import org.opendc.serverless.service.FunctionObject
+import org.opendc.serverless.service.ServerlessService
+import org.opendc.serverless.service.deployer.FunctionDeployer
+import org.opendc.serverless.service.deployer.FunctionInstance
+import org.opendc.serverless.service.router.RoutingPolicy
+import org.opendc.utils.TimerScheduler
+import java.lang.IllegalStateException
+import java.time.Clock
+import java.util.*
+import kotlin.coroutines.Continuation
+import kotlin.coroutines.CoroutineContext
+import kotlin.coroutines.resumeWithException
+
+/**
+ * Implementation of the [ServerlessService] interface.
+ *
+ * This component acts as the function router from the SPEC RG Reference Architecture for FaaS and is responsible
+ * for routing incoming requests or events to the correct [FunctionInstance]. If no [FunctionInstance] is available,
+ * this component queues the events to await the deployment of new instances.
+ */
+internal class ServerlessServiceImpl(
+ context: CoroutineContext,
+ private val clock: Clock,
+ private val meter: Meter,
+ private val deployer: FunctionDeployer,
+ private val routingPolicy: RoutingPolicy
+) : ServerlessService {
+ /**
+ * The [CoroutineScope] of the service bounded by the lifecycle of the service.
+ */
+ private val scope = CoroutineScope(context + Job())
+
+ /**
+ * The logger instance of this server.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
+ * The [TimerScheduler] to use for scheduling the scheduler cycles.
+ */
+ private val scheduler: TimerScheduler<Unit> = TimerScheduler(scope.coroutineContext, clock)
+
+ /**
+ * The [Random] instance used to generate unique identifiers for the objects.
+ */
+ private val random = Random(0)
+
+ /**
+ * The registered functions for this service.
+ */
+ private val functions = mutableMapOf<UUID, FunctionObject>()
+ private val functionsByName = mutableMapOf<String, FunctionObject>()
+
+ /**
+ * The queue of invocation requests.
+ */
+ private val queue = ArrayDeque<InvocationRequest>()
+
+ /**
+ * The total amount of function invocations received by the service.
+ */
+ private val _invocations = meter.longCounterBuilder("service.invocations.total")
+ .setDescription("Number of function invocations")
+ .setUnit("1")
+ .build()
+
+ /**
+ * The amount of function invocations that could be handled directly.
+ */
+ private val _timelyInvocations = meter.longCounterBuilder("service.invocations.warm")
+ .setDescription("Number of function invocations handled directly")
+ .setUnit("1")
+ .build()
+
+ /**
+ * The amount of function invocations that were delayed due to function deployment.
+ */
+ private val _delayedInvocations = meter.longCounterBuilder("service.invocations.cold")
+ .setDescription("Number of function invocations that are delayed")
+ .setUnit("1")
+ .build()
+
+ override fun newClient(): ServerlessClient {
+ return object : ServerlessClient {
+ private var isClosed: Boolean = false
+
+ /**
+ * Exposes a [FunctionObject] to a client-exposed [ServerlessFunction] instance.
+ */
+ private fun FunctionObject.asClientFunction(): ServerlessFunction {
+ return ServerlessFunctionImpl(this@ServerlessServiceImpl, this)
+ }
+
+ override suspend fun queryFunctions(): List<ServerlessFunction> {
+ check(!isClosed) { "Client is already closed" }
+
+ return functions.values.map { it.asClientFunction() }
+ }
+
+ override suspend fun findFunction(id: UUID): ServerlessFunction? {
+ check(!isClosed) { "Client is already closed" }
+
+ return functions[id]?.asClientFunction()
+ }
+
+ override suspend fun findFunction(name: String): ServerlessFunction? {
+ check(!isClosed) { "Client is already closed" }
+
+ return functionsByName[name]?.asClientFunction()
+ }
+
+ override suspend fun newFunction(
+ name: String,
+ memorySize: Long,
+ labels: Map<String, String>,
+ meta: Map<String, Any>
+ ): ServerlessFunction {
+ check(!isClosed) { "Client is already closed" }
+ require(name !in functionsByName) { "Function with same name exists" }
+
+ val uid = UUID(clock.millis(), random.nextLong())
+ val function = FunctionObject(
+ meter,
+ uid,
+ name,
+ memorySize,
+ labels,
+ meta
+ )
+
+ functionsByName[name] = function
+ functions[uid] = function
+
+ return function.asClientFunction()
+ }
+
+ override suspend fun invoke(name: String) {
+ check(!isClosed) { "Client is already closed" }
+
+ val func = requireNotNull(functionsByName[name]) { "Unknown function" }
+ this@ServerlessServiceImpl.invoke(func)
+ }
+
+ override fun close() {
+ isClosed = true
+ }
+ }
+ }
+
+ /**
+ * Indicate that a new scheduling cycle is needed due to a change to the service's state.
+ */
+ private fun schedule() {
+ // Bail out in case we have already requested a new cycle or the queue is empty.
+ if (scheduler.isTimerActive(Unit) || queue.isEmpty()) {
+ return
+ }
+
+ val quantum = 100
+
+ // We assume that the provisioner runs at a fixed slot every time quantum (e.g t=0, t=60, t=120).
+ // This is important because the slices of the VMs need to be aligned.
+ // We calculate here the delay until the next scheduling slot.
+ val delay = quantum - (clock.millis() % quantum)
+
+ scheduler.startSingleTimer(Unit, delay, ::doSchedule)
+ }
+
+ /**
+ * Run a single scheduling iteration.
+ */
+ @OptIn(InternalCoroutinesApi::class)
+ private fun doSchedule() {
+ try {
+ while (queue.isNotEmpty()) {
+ val (submitTime, function, cont) = queue.poll()
+
+ val instances = function.instances
+
+ // Check if there exists an instance of the function
+ val activeInstance = if (instances.isNotEmpty()) {
+ routingPolicy.select(instances, function)
+ } else {
+ null
+ }
+
+ val instance = if (activeInstance != null) {
+ _timelyInvocations.add(1)
+ function.timelyInvocations.add(1)
+
+ activeInstance
+ } else {
+ val instance = deployer.deploy(function)
+ instances.add(instance)
+
+ function.idleInstances.add(1)
+
+ _delayedInvocations.add(1)
+ function.delayedInvocations.add(1)
+
+ instance
+ }
+
+ suspend {
+ val start = clock.millis()
+ function.waitTime.record(start - submitTime)
+ function.idleInstances.add(-1)
+ function.activeInstances.add(1)
+ try {
+ instance.invoke()
+ } catch (e: Throwable) {
+ logger.debug(e) { "Function invocation failed" }
+ function.failedInvocations.add(1)
+ } finally {
+ val end = clock.millis()
+ function.activeTime.record(end - start)
+ function.idleInstances.add(1)
+ function.activeInstances.add(-1)
+ }
+ }.startCoroutineCancellable(cont)
+ }
+ } catch (cause: Throwable) {
+ logger.error(cause) { "Exception occurred during scheduling cycle" }
+ }
+ }
+
+ suspend fun invoke(function: FunctionObject) {
+ check(function.uid in functions) { "Function does not exist (anymore)" }
+
+ _invocations.add(1)
+ function.invocations.add(1)
+
+ return suspendCancellableCoroutine { cont ->
+ if (!queue.add(InvocationRequest(clock.millis(), function, cont))) {
+ cont.resumeWithException(IllegalStateException("Failed to enqueue request"))
+ } else {
+ schedule()
+ }
+ }
+ }
+
+ fun delete(function: FunctionObject) {
+ functions.remove(function.uid)
+ functionsByName.remove(function.name)
+ }
+
+ override fun close() {
+ scope.cancel()
+
+ // Stop all function instances
+ for ((_, function) in functions) {
+ function.close()
+ }
+ }
+
+ /**
+ * A request to invoke a function.
+ */
+ private data class InvocationRequest(val timestamp: Long, val function: FunctionObject, val cont: Continuation<Unit>)
+}
diff --git a/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/router/RandomRoutingPolicy.kt b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/router/RandomRoutingPolicy.kt
new file mode 100644
index 00000000..063fb80a
--- /dev/null
+++ b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/router/RandomRoutingPolicy.kt
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.serverless.service.router
+
+import org.opendc.serverless.service.FunctionObject
+import org.opendc.serverless.service.deployer.FunctionInstance
+import kotlin.random.Random
+
+/**
+ * A [RoutingPolicy] that selects a random function instance.
+ */
+public class RandomRoutingPolicy(private val random: Random = Random(0)) : RoutingPolicy {
+ override fun select(instances: List<FunctionInstance>, function: FunctionObject): FunctionInstance {
+ return instances.random(random)
+ }
+}
diff --git a/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/router/RoutingPolicy.kt b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/router/RoutingPolicy.kt
new file mode 100644
index 00000000..d5d1166f
--- /dev/null
+++ b/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/router/RoutingPolicy.kt
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.serverless.service.router
+
+import org.opendc.serverless.service.FunctionObject
+import org.opendc.serverless.service.deployer.FunctionInstance
+
+/**
+ * A [RoutingPolicy] decides to which [FunctionInstance] a function invocation should be routed.
+ */
+public interface RoutingPolicy {
+ /**
+ * Select the instance to which the request should be routed to.
+ */
+ public fun select(instances: List<FunctionInstance>, function: FunctionObject): FunctionInstance?
+}
diff --git a/opendc-serverless/opendc-serverless-service/src/test/kotlin/org/opendc/serverless/service/ServerlessServiceTest.kt b/opendc-serverless/opendc-serverless-service/src/test/kotlin/org/opendc/serverless/service/ServerlessServiceTest.kt
new file mode 100644
index 00000000..d9f5ee81
--- /dev/null
+++ b/opendc-serverless/opendc-serverless-service/src/test/kotlin/org/opendc/serverless/service/ServerlessServiceTest.kt
@@ -0,0 +1,186 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.serverless.service
+
+import io.mockk.*
+import io.opentelemetry.api.metrics.MeterProvider
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import org.junit.jupiter.api.Assertions.*
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertDoesNotThrow
+import org.junit.jupiter.api.assertThrows
+import org.opendc.serverless.api.ServerlessFunction
+import org.opendc.serverless.service.deployer.FunctionDeployer
+import org.opendc.serverless.service.deployer.FunctionInstance
+import org.opendc.serverless.service.deployer.FunctionInstanceState
+import org.opendc.simulator.core.runBlockingSimulation
+import java.util.*
+
+/**
+ * Test suite for the [ServerlessService] implementation.
+ */
+@OptIn(ExperimentalCoroutinesApi::class)
+internal class ServerlessServiceTest {
+
+ @Test
+ fun testClientState() = runBlockingSimulation {
+ val meter = MeterProvider.noop().get("opendc-serverless")
+ val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk())
+
+ val client = assertDoesNotThrow { service.newClient() }
+ assertDoesNotThrow { client.close() }
+
+ assertThrows<IllegalStateException> { client.queryFunctions() }
+ assertThrows<IllegalStateException> { client.newFunction("test", 128) }
+ assertThrows<IllegalStateException> { client.invoke("test") }
+ assertThrows<IllegalStateException> { client.findFunction(UUID.randomUUID()) }
+ assertThrows<IllegalStateException> { client.findFunction("name") }
+ }
+
+ @Test
+ fun testClientInvokeUnknown() = runBlockingSimulation {
+ val meter = MeterProvider.noop().get("opendc-serverless")
+ val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk())
+
+ val client = service.newClient()
+
+ assertThrows<IllegalArgumentException> { client.invoke("test") }
+ }
+
+ @Test
+ fun testClientFunctionCreation() = runBlockingSimulation {
+ val meter = MeterProvider.noop().get("opendc-serverless")
+ val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk())
+
+ val client = service.newClient()
+
+ val function = client.newFunction("test", 128)
+
+ assertEquals("test", function.name)
+ }
+
+ @Test
+ fun testClientFunctionQuery() = runBlockingSimulation {
+ val meter = MeterProvider.noop().get("opendc-serverless")
+ val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk())
+
+ val client = service.newClient()
+
+ assertEquals(emptyList<ServerlessFunction>(), client.queryFunctions())
+
+ val function = client.newFunction("test", 128)
+
+ assertEquals(listOf(function), client.queryFunctions())
+ }
+
+ @Test
+ fun testClientFunctionFindById() = runBlockingSimulation {
+ val meter = MeterProvider.noop().get("opendc-serverless")
+ val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk())
+
+ val client = service.newClient()
+
+ assertEquals(emptyList<ServerlessFunction>(), client.queryFunctions())
+
+ val function = client.newFunction("test", 128)
+
+ assertNotNull(client.findFunction(function.uid))
+ }
+
+ @Test
+ fun testClientFunctionFindByName() = runBlockingSimulation {
+ val meter = MeterProvider.noop().get("opendc-serverless")
+ val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk())
+
+ val client = service.newClient()
+
+ assertEquals(emptyList<ServerlessFunction>(), client.queryFunctions())
+
+ val function = client.newFunction("test", 128)
+
+ assertNotNull(client.findFunction(function.name))
+ }
+
+ @Test
+ fun testClientFunctionDuplicateName() = runBlockingSimulation {
+ val meter = MeterProvider.noop().get("opendc-serverless")
+ val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk())
+
+ val client = service.newClient()
+
+ client.newFunction("test", 128)
+
+ assertThrows<IllegalArgumentException> { client.newFunction("test", 128) }
+ }
+
+ @Test
+ fun testClientFunctionDelete() = runBlockingSimulation {
+ val meter = MeterProvider.noop().get("opendc-serverless")
+ val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk())
+
+ val client = service.newClient()
+ val function = client.newFunction("test", 128)
+ assertNotNull(client.findFunction(function.uid))
+ function.delete()
+ assertNull(client.findFunction(function.uid))
+
+ // Delete should be idempotent
+ function.delete()
+ }
+
+ @Test
+ fun testClientFunctionCannotInvokeDeleted() = runBlockingSimulation {
+ val meter = MeterProvider.noop().get("opendc-serverless")
+ val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk())
+
+ val client = service.newClient()
+ val function = client.newFunction("test", 128)
+ assertNotNull(client.findFunction(function.uid))
+ function.delete()
+
+ assertThrows<IllegalStateException> { function.invoke() }
+ }
+
+ @Test
+ fun testClientFunctionInvoke() = runBlockingSimulation {
+ val meter = MeterProvider.noop().get("opendc-serverless")
+ val deployer = mockk<FunctionDeployer>()
+ val service = ServerlessService(coroutineContext, clock, meter, deployer, mockk())
+
+ every { deployer.deploy(any()) } answers {
+ object : FunctionInstance {
+ override val state: FunctionInstanceState = FunctionInstanceState.Idle
+ override val function: FunctionObject = it.invocation.args[0] as FunctionObject
+
+ override suspend fun invoke() {}
+
+ override fun close() {}
+ }
+ }
+
+ val client = service.newClient()
+ val function = client.newFunction("test", 128)
+
+ function.invoke()
+ }
+}