summaryrefslogtreecommitdiff
path: root/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-04-08 20:44:30 +0200
committerGitHub <noreply@github.com>2021-04-08 20:44:30 +0200
commit5fdbfbe7d340bc10f8b9eebd5aa23bdfd7dc4e18 (patch)
tree21020cd0451664006a5bf291a5c27dd74f6129d0 /simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org
parent3fd45fc5befb1fc9a67d4494e8a3786a5dceae3a (diff)
parent831ba3d882a46dad2abe6ac281b736b729dc7080 (diff)
exp: Add serverless experiments (v1)
This pull request is the first in a series of pull request to add the serverless experiments from Soufiane Jounaid's BSc thesis to the main OpenDC repository. In this pull request, we add the serverless experiment and trace reader. * Add `opendc-experiments-serverless20` which will contain the serverless experiments. * Add `ServerlessTraceReader` which reads the traces from Soufiane's work. * Add support for cold start delays * Expose metrics per function.
Diffstat (limited to 'simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org')
-rw-r--r--simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/FunctionObject.kt139
-rw-r--r--simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/ServerlessService.kt7
-rw-r--r--simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionDeployer.kt4
-rw-r--r--simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionInstance.kt6
-rw-r--r--simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/InternalFunction.kt60
-rw-r--r--simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessFunctionImpl.kt (renamed from simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ClientFunction.kt)34
-rw-r--r--simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessServiceImpl.kt114
-rw-r--r--simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/router/RandomRoutingPolicy.kt4
-rw-r--r--simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/router/RoutingPolicy.kt4
9 files changed, 256 insertions, 116 deletions
diff --git a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/FunctionObject.kt b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/FunctionObject.kt
new file mode 100644
index 00000000..c12bbfe2
--- /dev/null
+++ b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/FunctionObject.kt
@@ -0,0 +1,139 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.serverless.service
+
+import io.opentelemetry.api.metrics.BoundLongCounter
+import io.opentelemetry.api.metrics.BoundLongUpDownCounter
+import io.opentelemetry.api.metrics.BoundLongValueRecorder
+import io.opentelemetry.api.metrics.Meter
+import io.opentelemetry.api.metrics.common.Labels
+import org.opendc.serverless.service.deployer.FunctionInstance
+import java.util.*
+
+/**
+ * An [FunctionObject] represents the service's view of a serverless function.
+ */
+public class FunctionObject(
+ meter: Meter,
+ public val uid: UUID,
+ name: String,
+ allocatedMemory: Long,
+ labels: Map<String, String>,
+ meta: Map<String, Any>
+) : AutoCloseable {
+ /**
+ * The total amount of function invocations received by the function.
+ */
+ public val invocations: BoundLongCounter = meter.longCounterBuilder("function.invocations.total")
+ .setDescription("Number of function invocations")
+ .setUnit("1")
+ .build()
+ .bind(Labels.of("function", uid.toString()))
+
+ /**
+ * The amount of function invocations that could be handled directly.
+ */
+ public val timelyInvocations: BoundLongCounter = meter.longCounterBuilder("function.invocations.warm")
+ .setDescription("Number of function invocations handled directly")
+ .setUnit("1")
+ .build()
+ .bind(Labels.of("function", uid.toString()))
+
+ /**
+ * The amount of function invocations that were delayed due to function deployment.
+ */
+ public val delayedInvocations: BoundLongCounter = meter.longCounterBuilder("function.invocations.cold")
+ .setDescription("Number of function invocations that are delayed")
+ .setUnit("1")
+ .build()
+ .bind(Labels.of("function", uid.toString()))
+
+ /**
+ * The amount of function invocations that failed.
+ */
+ public val failedInvocations: BoundLongCounter = meter.longCounterBuilder("function.invocations.failed")
+ .setDescription("Number of function invocations that failed")
+ .setUnit("1")
+ .build()
+ .bind(Labels.of("function", uid.toString()))
+
+ /**
+ * The amount of instances for this function.
+ */
+ public val activeInstances: BoundLongUpDownCounter = meter.longUpDownCounterBuilder("function.instances.active")
+ .setDescription("Number of active function instances")
+ .setUnit("1")
+ .build()
+ .bind(Labels.of("function", uid.toString()))
+
+ /**
+ * The amount of idle instances for this function.
+ */
+ public val idleInstances: BoundLongUpDownCounter = meter.longUpDownCounterBuilder("function.instances.idle")
+ .setDescription("Number of idle function instances")
+ .setUnit("1")
+ .build()
+ .bind(Labels.of("function", uid.toString()))
+
+ /**
+ * The time that the function waited.
+ */
+ public val waitTime: BoundLongValueRecorder = meter.longValueRecorderBuilder("function.time.wait")
+ .setDescription("Time the function has to wait before being started")
+ .setUnit("ms")
+ .build()
+ .bind(Labels.of("function", uid.toString()))
+
+ /**
+ * The time that the function was running.
+ */
+ public val activeTime: BoundLongValueRecorder = meter.longValueRecorderBuilder("function.time.active")
+ .setDescription("Time the function was running")
+ .setUnit("ms")
+ .build()
+ .bind(Labels.of("function", uid.toString()))
+
+ /**
+ * The instances associated with this function.
+ */
+ public val instances: MutableList<FunctionInstance> = mutableListOf()
+
+ public var name: String = name
+ private set
+
+ public var memorySize: Long = allocatedMemory
+ private set
+
+ public val labels: MutableMap<String, String> = labels.toMutableMap()
+
+ public val meta: MutableMap<String, Any> = meta.toMutableMap()
+
+ override fun close() {
+ instances.forEach(FunctionInstance::close)
+ instances.clear()
+ }
+
+ override fun equals(other: Any?): Boolean = other is FunctionObject && uid == other.uid
+
+ override fun hashCode(): Int = uid.hashCode()
+}
diff --git a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/ServerlessService.kt b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/ServerlessService.kt
index 18717ef5..a791c815 100644
--- a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/ServerlessService.kt
+++ b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/ServerlessService.kt
@@ -22,6 +22,7 @@
package org.opendc.serverless.service
+import io.opentelemetry.api.metrics.Meter
import org.opendc.serverless.api.ServerlessClient
import org.opendc.serverless.service.deployer.FunctionDeployer
import org.opendc.serverless.service.internal.ServerlessServiceImpl
@@ -49,14 +50,18 @@ public interface ServerlessService : AutoCloseable {
*
* @param context The [CoroutineContext] to use in the service.
* @param clock The clock instance to use.
+ * @param meter The meter to report metrics to.
+ * @param deployer the [FunctionDeployer] to use for deploying function instances.
+ * @param routingPolicy The policy to route function invocations.
*/
public operator fun invoke(
context: CoroutineContext,
clock: Clock,
+ meter: Meter,
deployer: FunctionDeployer,
routingPolicy: RoutingPolicy,
): ServerlessService {
- return ServerlessServiceImpl(context, clock, deployer, routingPolicy)
+ return ServerlessServiceImpl(context, clock, meter, deployer, routingPolicy)
}
}
}
diff --git a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionDeployer.kt b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionDeployer.kt
index e0a37009..83592a68 100644
--- a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionDeployer.kt
+++ b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionDeployer.kt
@@ -22,7 +22,7 @@
package org.opendc.serverless.service.deployer
-import org.opendc.serverless.api.ServerlessFunction
+import org.opendc.serverless.service.FunctionObject
/**
* A [FunctionDeployer] is responsible for ensuring that an instance of an arbitrary function, a [FunctionInstance],
@@ -39,5 +39,5 @@ public interface FunctionDeployer {
/**
* Deploy the specified [function].
*/
- public fun deploy(function: ServerlessFunction): FunctionInstance
+ public fun deploy(function: FunctionObject): FunctionInstance
}
diff --git a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionInstance.kt b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionInstance.kt
index 410df5d4..d60648ea 100644
--- a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionInstance.kt
+++ b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionInstance.kt
@@ -22,7 +22,7 @@
package org.opendc.serverless.service.deployer
-import org.opendc.serverless.api.ServerlessFunction
+import org.opendc.serverless.service.FunctionObject
/**
* A [FunctionInstance] is a a self-contained worker—typically a container—capable of handling function executions.
@@ -36,9 +36,9 @@ public interface FunctionInstance : AutoCloseable {
public val state: FunctionInstanceState
/**
- * The [ServerlessFunction] that is represented by this instance.
+ * The [FunctionObject] that is represented by this instance.
*/
- public val function: ServerlessFunction
+ public val function: FunctionObject
/**
* Invoke the function instance.
diff --git a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/InternalFunction.kt b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/InternalFunction.kt
deleted file mode 100644
index a6e22912..00000000
--- a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/InternalFunction.kt
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.serverless.service.internal
-
-import org.opendc.serverless.api.ServerlessFunction
-import java.util.*
-
-/**
- * Internal stateful representation of a [ServerlessFunction].
- */
-internal class InternalFunction(
- private val service: ServerlessServiceImpl,
- override val uid: UUID,
- name: String,
- labels: Map<String, String>,
- meta: Map<String, Any>
-) : ServerlessFunction {
- override var name: String = name
- private set
-
- override val labels: MutableMap<String, String> = labels.toMutableMap()
-
- override val meta: MutableMap<String, Any> = meta.toMutableMap()
-
- override suspend fun refresh() {
- // No-op: this object is the source-of-truth
- }
-
- override suspend fun invoke() {
- service.invoke(this)
- }
-
- override suspend fun delete() {
- service.delete(this)
- }
-
- override fun equals(other: Any?): Boolean = other is ServerlessFunction && uid == other.uid
-
- override fun hashCode(): Int = uid.hashCode()
-}
diff --git a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ClientFunction.kt b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessFunctionImpl.kt
index 1258a037..80b50e77 100644
--- a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ClientFunction.kt
+++ b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessFunctionImpl.kt
@@ -23,40 +23,46 @@
package org.opendc.serverless.service.internal
import org.opendc.serverless.api.ServerlessFunction
+import org.opendc.serverless.service.FunctionObject
import java.util.*
/**
- * A [ServerlessFunction] implementation that is passed to clients but delegates its implementation to another class.
+ * A [ServerlessFunction] implementation that is passed to clients.
*/
-internal class ClientFunction(private val delegate: ServerlessFunction) : ServerlessFunction {
- override val uid: UUID = delegate.uid
+internal class ServerlessFunctionImpl(
+ private val service: ServerlessServiceImpl,
+ private val state: FunctionObject
+) : ServerlessFunction {
+ override val uid: UUID = state.uid
- override var name: String = delegate.name
+ override var name: String = state.name
private set
- override var labels: Map<String, String> = delegate.labels.toMap()
+ override var memorySize: Long = state.memorySize
private set
- override var meta: Map<String, Any> = delegate.meta.toMap()
+ override var labels: Map<String, String> = state.labels.toMap()
+ private set
+
+ override var meta: Map<String, Any> = state.meta.toMap()
private set
override suspend fun delete() {
- delegate.delete()
+ service.delete(state)
}
override suspend fun invoke() {
- delegate.invoke()
+ service.invoke(state)
}
override suspend fun refresh() {
- delegate.refresh()
-
- name = delegate.name
- labels = delegate.labels
- meta = delegate.meta
+ name = state.name
+ memorySize = state.memorySize
+ labels = state.labels
+ meta = state.meta
}
- override fun equals(other: Any?): Boolean = other is ClientFunction && uid == other.uid
+ override fun equals(other: Any?): Boolean = other is ServerlessFunctionImpl && uid == other.uid
override fun hashCode(): Int = uid.hashCode()
diff --git a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessServiceImpl.kt b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessServiceImpl.kt
index b3f395c3..515cb5fa 100644
--- a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessServiceImpl.kt
+++ b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessServiceImpl.kt
@@ -22,11 +22,13 @@
package org.opendc.serverless.service.internal
+import io.opentelemetry.api.metrics.Meter
import kotlinx.coroutines.*
import kotlinx.coroutines.intrinsics.startCoroutineCancellable
import mu.KotlinLogging
import org.opendc.serverless.api.ServerlessClient
import org.opendc.serverless.api.ServerlessFunction
+import org.opendc.serverless.service.FunctionObject
import org.opendc.serverless.service.ServerlessService
import org.opendc.serverless.service.deployer.FunctionDeployer
import org.opendc.serverless.service.deployer.FunctionInstance
@@ -49,6 +51,7 @@ import kotlin.coroutines.resumeWithException
internal class ServerlessServiceImpl(
context: CoroutineContext,
private val clock: Clock,
+ private val meter: Meter,
private val deployer: FunctionDeployer,
private val routingPolicy: RoutingPolicy
) : ServerlessService {
@@ -75,8 +78,8 @@ internal class ServerlessServiceImpl(
/**
* The registered functions for this service.
*/
- private val functions = mutableMapOf<UUID, InternalFunction>()
- private val functionsByName = mutableMapOf<String, InternalFunction>()
+ private val functions = mutableMapOf<UUID, FunctionObject>()
+ private val functionsByName = mutableMapOf<String, FunctionObject>()
/**
* The queue of invocation requests.
@@ -84,34 +87,61 @@ internal class ServerlessServiceImpl(
private val queue = ArrayDeque<InvocationRequest>()
/**
- * The active function instances.
+ * The total amount of function invocations received by the service.
*/
- private val instancesByFunction = mutableMapOf<InternalFunction, MutableList<FunctionInstance>>()
+ private val _invocations = meter.longCounterBuilder("service.invocations.total")
+ .setDescription("Number of function invocations")
+ .setUnit("1")
+ .build()
+
+ /**
+ * The amount of function invocations that could be handled directly.
+ */
+ private val _timelyInvocations = meter.longCounterBuilder("service.invocations.warm")
+ .setDescription("Number of function invocations handled directly")
+ .setUnit("1")
+ .build()
+
+ /**
+ * The amount of function invocations that were delayed due to function deployment.
+ */
+ private val _delayedInvocations = meter.longCounterBuilder("service.invocations.cold")
+ .setDescription("Number of function invocations that are delayed")
+ .setUnit("1")
+ .build()
override fun newClient(): ServerlessClient {
return object : ServerlessClient {
private var isClosed: Boolean = false
+ /**
+ * Exposes a [FunctionObject] to a client-exposed [ServerlessFunction] instance.
+ */
+ private fun FunctionObject.asClientFunction(): ServerlessFunction {
+ return ServerlessFunctionImpl(this@ServerlessServiceImpl, this)
+ }
+
override suspend fun queryFunctions(): List<ServerlessFunction> {
check(!isClosed) { "Client is already closed" }
- return functions.values.map { ClientFunction(it) }
+ return functions.values.map { it.asClientFunction() }
}
override suspend fun findFunction(id: UUID): ServerlessFunction? {
check(!isClosed) { "Client is already closed" }
- return functions[id]?.let { ClientFunction(it) }
+ return functions[id]?.asClientFunction()
}
override suspend fun findFunction(name: String): ServerlessFunction? {
check(!isClosed) { "Client is already closed" }
- return functionsByName[name]?.let { ClientFunction(it) }
+ return functionsByName[name]?.asClientFunction()
}
override suspend fun newFunction(
name: String,
+ memorySize: Long,
labels: Map<String, String>,
meta: Map<String, Any>
): ServerlessFunction {
@@ -119,10 +149,11 @@ internal class ServerlessServiceImpl(
require(name !in functionsByName) { "Function with same name exists" }
val uid = UUID(clock.millis(), random.nextLong())
- val function = InternalFunction(
- this@ServerlessServiceImpl,
+ val function = FunctionObject(
+ meter,
uid,
name,
+ memorySize,
labels,
meta
)
@@ -130,13 +161,14 @@ internal class ServerlessServiceImpl(
functionsByName[name] = function
functions[uid] = function
- return ClientFunction(function)
+ return function.asClientFunction()
}
override suspend fun invoke(name: String) {
check(!isClosed) { "Client is already closed" }
- requireNotNull(functionsByName[name]) { "Unknown function" }.invoke()
+ val func = requireNotNull(functionsByName[name]) { "Unknown function" }
+ this@ServerlessServiceImpl.invoke(func)
}
override fun close() {
@@ -154,7 +186,7 @@ internal class ServerlessServiceImpl(
return
}
- val quantum = 1000
+ val quantum = 100
// We assume that the provisioner runs at a fixed slot every time quantum (e.g t=0, t=60, t=120).
// This is important because the slices of the VMs need to be aligned.
@@ -171,46 +203,65 @@ internal class ServerlessServiceImpl(
private fun doSchedule() {
try {
while (queue.isNotEmpty()) {
- val (function, cont) = queue.poll()
+ val (submitTime, function, cont) = queue.poll()
- val instances = instancesByFunction[function]
+ val instances = function.instances
// Check if there exists an instance of the function
- val activeInstance = if (instances != null && instances.isNotEmpty()) {
+ val activeInstance = if (instances.isNotEmpty()) {
routingPolicy.select(instances, function)
} else {
null
}
val instance = if (activeInstance != null) {
+ _timelyInvocations.add(1)
+ function.timelyInvocations.add(1)
+
activeInstance
} else {
val instance = deployer.deploy(function)
- instancesByFunction.compute(function) { _, v ->
- if (v != null) {
- v.add(instance)
- v
- } else {
- mutableListOf(instance)
- }
- }
+ instances.add(instance)
+
+ function.idleInstances.add(1)
+
+ _delayedInvocations.add(1)
+ function.delayedInvocations.add(1)
instance
}
- // Invoke the function instance
- suspend { instance.invoke() }.startCoroutineCancellable(cont)
+ suspend {
+ val start = clock.millis()
+ function.waitTime.record(start - submitTime)
+ function.idleInstances.add(-1)
+ function.activeInstances.add(1)
+ try {
+ instance.invoke()
+ } catch (e: Throwable) {
+ logger.debug(e) { "Function invocation failed" }
+ function.failedInvocations.add(1)
+ } finally {
+ val end = clock.millis()
+ function.activeTime.record(end - start)
+ function.idleInstances.add(1)
+ function.activeInstances.add(-1)
+ }
+ }.startCoroutineCancellable(cont)
}
} catch (cause: Throwable) {
logger.error(cause) { "Exception occurred during scheduling cycle" }
}
}
- internal suspend fun invoke(function: InternalFunction) {
+ suspend fun invoke(function: FunctionObject) {
check(function.uid in functions) { "Function does not exist (anymore)" }
+ _invocations.add(1)
+ function.invocations.add(1)
+
return suspendCancellableCoroutine { cont ->
- if (!queue.add(InvocationRequest(function, cont))) {
+ if (!queue.add(InvocationRequest(clock.millis(), function, cont))) {
cont.resumeWithException(IllegalStateException("Failed to enqueue request"))
} else {
schedule()
@@ -218,7 +269,7 @@ internal class ServerlessServiceImpl(
}
}
- internal fun delete(function: InternalFunction) {
+ fun delete(function: FunctionObject) {
functions.remove(function.uid)
functionsByName.remove(function.name)
}
@@ -227,14 +278,13 @@ internal class ServerlessServiceImpl(
scope.cancel()
// Stop all function instances
- for ((_, instances) in instancesByFunction) {
- instances.forEach(FunctionInstance::close)
+ for ((_, function) in functions) {
+ function.close()
}
- instancesByFunction.clear()
}
/**
* A request to invoke a function.
*/
- private data class InvocationRequest(val function: InternalFunction, val cont: Continuation<Unit>)
+ private data class InvocationRequest(val timestamp: Long, val function: FunctionObject, val cont: Continuation<Unit>)
}
diff --git a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/router/RandomRoutingPolicy.kt b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/router/RandomRoutingPolicy.kt
index 015704ca..063fb80a 100644
--- a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/router/RandomRoutingPolicy.kt
+++ b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/router/RandomRoutingPolicy.kt
@@ -22,7 +22,7 @@
package org.opendc.serverless.service.router
-import org.opendc.serverless.api.ServerlessFunction
+import org.opendc.serverless.service.FunctionObject
import org.opendc.serverless.service.deployer.FunctionInstance
import kotlin.random.Random
@@ -30,7 +30,7 @@ import kotlin.random.Random
* A [RoutingPolicy] that selects a random function instance.
*/
public class RandomRoutingPolicy(private val random: Random = Random(0)) : RoutingPolicy {
- override fun select(instances: List<FunctionInstance>, function: ServerlessFunction): FunctionInstance {
+ override fun select(instances: List<FunctionInstance>, function: FunctionObject): FunctionInstance {
return instances.random(random)
}
}
diff --git a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/router/RoutingPolicy.kt b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/router/RoutingPolicy.kt
index 77f43059..d5d1166f 100644
--- a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/router/RoutingPolicy.kt
+++ b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/router/RoutingPolicy.kt
@@ -22,7 +22,7 @@
package org.opendc.serverless.service.router
-import org.opendc.serverless.api.ServerlessFunction
+import org.opendc.serverless.service.FunctionObject
import org.opendc.serverless.service.deployer.FunctionInstance
/**
@@ -32,5 +32,5 @@ public interface RoutingPolicy {
/**
* Select the instance to which the request should be routed to.
*/
- public fun select(instances: List<FunctionInstance>, function: ServerlessFunction): FunctionInstance?
+ public fun select(instances: List<FunctionInstance>, function: FunctionObject): FunctionInstance?
}