From b8ae32eb9a2420fe596ac5f89d3eabee83c2291a Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 29 Mar 2021 13:45:41 +0200 Subject: serverless: Expose metrics from Serverless service This change exposes several metrics from the Serverless service, which are needed for the experiments. --- .../opendc-serverless-service/build.gradle.kts | 1 + .../opendc/serverless/service/ServerlessService.kt | 7 ++++- .../service/internal/ServerlessServiceImpl.kt | 31 ++++++++++++++++++++++ .../serverless/service/ServerlessServiceTest.kt | 31 +++++++++++++++------- 4 files changed, 59 insertions(+), 11 deletions(-) (limited to 'simulator/opendc-serverless/opendc-serverless-service') diff --git a/simulator/opendc-serverless/opendc-serverless-service/build.gradle.kts b/simulator/opendc-serverless/opendc-serverless-service/build.gradle.kts index 0221829a..f7e43aba 100644 --- a/simulator/opendc-serverless/opendc-serverless-service/build.gradle.kts +++ b/simulator/opendc-serverless/opendc-serverless-service/build.gradle.kts @@ -32,6 +32,7 @@ plugins { dependencies { api(platform(project(":opendc-platform"))) api(project(":opendc-serverless:opendc-serverless-api")) + api(project(":opendc-telemetry:opendc-telemetry-api")) implementation(project(":opendc-utils")) implementation("io.github.microutils:kotlin-logging") diff --git a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/ServerlessService.kt b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/ServerlessService.kt index 18717ef5..a791c815 100644 --- a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/ServerlessService.kt +++ b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/ServerlessService.kt @@ -22,6 +22,7 @@ package org.opendc.serverless.service +import io.opentelemetry.api.metrics.Meter import org.opendc.serverless.api.ServerlessClient import org.opendc.serverless.service.deployer.FunctionDeployer import org.opendc.serverless.service.internal.ServerlessServiceImpl @@ -49,14 +50,18 @@ public interface ServerlessService : AutoCloseable { * * @param context The [CoroutineContext] to use in the service. * @param clock The clock instance to use. + * @param meter The meter to report metrics to. + * @param deployer the [FunctionDeployer] to use for deploying function instances. + * @param routingPolicy The policy to route function invocations. */ public operator fun invoke( context: CoroutineContext, clock: Clock, + meter: Meter, deployer: FunctionDeployer, routingPolicy: RoutingPolicy, ): ServerlessService { - return ServerlessServiceImpl(context, clock, deployer, routingPolicy) + return ServerlessServiceImpl(context, clock, meter, deployer, routingPolicy) } } } diff --git a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessServiceImpl.kt b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessServiceImpl.kt index b3f395c3..c49871df 100644 --- a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessServiceImpl.kt +++ b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessServiceImpl.kt @@ -22,6 +22,7 @@ package org.opendc.serverless.service.internal +import io.opentelemetry.api.metrics.Meter import kotlinx.coroutines.* import kotlinx.coroutines.intrinsics.startCoroutineCancellable import mu.KotlinLogging @@ -49,6 +50,7 @@ import kotlin.coroutines.resumeWithException internal class ServerlessServiceImpl( context: CoroutineContext, private val clock: Clock, + internal val meter: Meter, private val deployer: FunctionDeployer, private val routingPolicy: RoutingPolicy ) : ServerlessService { @@ -88,6 +90,30 @@ internal class ServerlessServiceImpl( */ private val instancesByFunction = mutableMapOf>() + /** + * The total amount of function invocations received by the service. + */ + private val _invocations = meter.longCounterBuilder("invocations.total") + .setDescription("Number of function invocations") + .setUnit("1") + .build() + + /** + * The amount of function invocations that could be handled directly. + */ + private val _timelyInvocations = meter.longCounterBuilder("invocations.warm") + .setDescription("Number of function invocations handled directly") + .setUnit("1") + .build() + + /** + * The amount of function invocations that were delayed due to function deployment. + */ + private val _delayedInvocations = meter.longCounterBuilder("invocations.cold") + .setDescription("Number of function invocations that are delayed") + .setUnit("1") + .build() + override fun newClient(): ServerlessClient { return object : ServerlessClient { private var isClosed: Boolean = false @@ -183,6 +209,8 @@ internal class ServerlessServiceImpl( } val instance = if (activeInstance != null) { + _timelyInvocations.add(1) + activeInstance } else { val instance = deployer.deploy(function) @@ -195,6 +223,8 @@ internal class ServerlessServiceImpl( } } + _delayedInvocations.add(1) + instance } @@ -209,6 +239,7 @@ internal class ServerlessServiceImpl( internal suspend fun invoke(function: InternalFunction) { check(function.uid in functions) { "Function does not exist (anymore)" } + _invocations.add(1) return suspendCancellableCoroutine { cont -> if (!queue.add(InvocationRequest(function, cont))) { cont.resumeWithException(IllegalStateException("Failed to enqueue request")) diff --git a/simulator/opendc-serverless/opendc-serverless-service/src/test/kotlin/org/opendc/serverless/service/ServerlessServiceTest.kt b/simulator/opendc-serverless/opendc-serverless-service/src/test/kotlin/org/opendc/serverless/service/ServerlessServiceTest.kt index d9c2bcd2..c4910758 100644 --- a/simulator/opendc-serverless/opendc-serverless-service/src/test/kotlin/org/opendc/serverless/service/ServerlessServiceTest.kt +++ b/simulator/opendc-serverless/opendc-serverless-service/src/test/kotlin/org/opendc/serverless/service/ServerlessServiceTest.kt @@ -23,6 +23,7 @@ package org.opendc.serverless.service import io.mockk.* +import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.test.runBlockingTest import org.junit.jupiter.api.Assertions.* @@ -44,8 +45,9 @@ internal class ServerlessServiceTest { @Test fun testClientState() = runBlockingTest { + val meter = MeterProvider.noop().get("opendc-serverless") val clock = DelayControllerClockAdapter(this) - val service = ServerlessService(coroutineContext, clock, mockk(), mockk()) + val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk()) val client = assertDoesNotThrow { service.newClient() } assertDoesNotThrow { client.close() } @@ -59,8 +61,9 @@ internal class ServerlessServiceTest { @Test fun testClientInvokeUnknown() = runBlockingTest { + val meter = MeterProvider.noop().get("opendc-serverless") val clock = DelayControllerClockAdapter(this) - val service = ServerlessService(coroutineContext, clock, mockk(), mockk()) + val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk()) val client = service.newClient() @@ -69,8 +72,9 @@ internal class ServerlessServiceTest { @Test fun testClientFunctionCreation() = runBlockingTest { + val meter = MeterProvider.noop().get("opendc-serverless") val clock = DelayControllerClockAdapter(this) - val service = ServerlessService(coroutineContext, clock, mockk(), mockk()) + val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk()) val client = service.newClient() @@ -81,8 +85,9 @@ internal class ServerlessServiceTest { @Test fun testClientFunctionQuery() = runBlockingTest { + val meter = MeterProvider.noop().get("opendc-serverless") val clock = DelayControllerClockAdapter(this) - val service = ServerlessService(coroutineContext, clock, mockk(), mockk()) + val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk()) val client = service.newClient() @@ -95,8 +100,9 @@ internal class ServerlessServiceTest { @Test fun testClientFunctionFindById() = runBlockingTest { + val meter = MeterProvider.noop().get("opendc-serverless") val clock = DelayControllerClockAdapter(this) - val service = ServerlessService(coroutineContext, clock, mockk(), mockk()) + val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk()) val client = service.newClient() @@ -109,8 +115,9 @@ internal class ServerlessServiceTest { @Test fun testClientFunctionFindByName() = runBlockingTest { + val meter = MeterProvider.noop().get("opendc-serverless") val clock = DelayControllerClockAdapter(this) - val service = ServerlessService(coroutineContext, clock, mockk(), mockk()) + val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk()) val client = service.newClient() @@ -123,8 +130,9 @@ internal class ServerlessServiceTest { @Test fun testClientFunctionDuplicateName() = runBlockingTest { + val meter = MeterProvider.noop().get("opendc-serverless") val clock = DelayControllerClockAdapter(this) - val service = ServerlessService(coroutineContext, clock, mockk(), mockk()) + val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk()) val client = service.newClient() @@ -135,8 +143,9 @@ internal class ServerlessServiceTest { @Test fun testClientFunctionDelete() = runBlockingTest { + val meter = MeterProvider.noop().get("opendc-serverless") val clock = DelayControllerClockAdapter(this) - val service = ServerlessService(coroutineContext, clock, mockk(), mockk()) + val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk()) val client = service.newClient() val function = client.newFunction("test") @@ -150,8 +159,9 @@ internal class ServerlessServiceTest { @Test fun testClientFunctionCannotInvokeDeleted() = runBlockingTest { + val meter = MeterProvider.noop().get("opendc-serverless") val clock = DelayControllerClockAdapter(this) - val service = ServerlessService(coroutineContext, clock, mockk(), mockk()) + val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk()) val client = service.newClient() val function = client.newFunction("test") @@ -163,9 +173,10 @@ internal class ServerlessServiceTest { @Test fun testClientFunctionInvoke() = runBlockingTest { + val meter = MeterProvider.noop().get("opendc-serverless") val clock = DelayControllerClockAdapter(this) val deployer = mockk() - val service = ServerlessService(coroutineContext, clock, deployer, mockk()) + val service = ServerlessService(coroutineContext, clock, meter, deployer, mockk()) every { deployer.deploy(any()) } answers { object : FunctionInstance { -- cgit v1.2.3 From 1fb04ae372f96b32f9996c43fd066c98405ba634 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 29 Mar 2021 14:10:20 +0200 Subject: serverless: Add possibility to specify function memory requirements --- .../serverless/service/internal/ClientFunction.kt | 4 ++++ .../serverless/service/internal/InternalFunction.kt | 4 ++++ .../service/internal/ServerlessServiceImpl.kt | 2 ++ .../serverless/service/ServerlessServiceTest.kt | 20 ++++++++++---------- 4 files changed, 20 insertions(+), 10 deletions(-) (limited to 'simulator/opendc-serverless/opendc-serverless-service') diff --git a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ClientFunction.kt b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ClientFunction.kt index 1258a037..a26e7d87 100644 --- a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ClientFunction.kt +++ b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ClientFunction.kt @@ -34,6 +34,9 @@ internal class ClientFunction(private val delegate: ServerlessFunction) : Server override var name: String = delegate.name private set + override var memorySize: Long = delegate.memorySize + private set + override var labels: Map = delegate.labels.toMap() private set @@ -52,6 +55,7 @@ internal class ClientFunction(private val delegate: ServerlessFunction) : Server delegate.refresh() name = delegate.name + memorySize = delegate.memorySize labels = delegate.labels meta = delegate.meta } diff --git a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/InternalFunction.kt b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/InternalFunction.kt index a6e22912..cea2018d 100644 --- a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/InternalFunction.kt +++ b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/InternalFunction.kt @@ -32,12 +32,16 @@ internal class InternalFunction( private val service: ServerlessServiceImpl, override val uid: UUID, name: String, + allocatedMemory: Long, labels: Map, meta: Map ) : ServerlessFunction { override var name: String = name private set + override var memorySize: Long = allocatedMemory + private set + override val labels: MutableMap = labels.toMutableMap() override val meta: MutableMap = meta.toMutableMap() diff --git a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessServiceImpl.kt b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessServiceImpl.kt index c49871df..9d8dadb1 100644 --- a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessServiceImpl.kt +++ b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessServiceImpl.kt @@ -138,6 +138,7 @@ internal class ServerlessServiceImpl( override suspend fun newFunction( name: String, + memorySize: Long, labels: Map, meta: Map ): ServerlessFunction { @@ -149,6 +150,7 @@ internal class ServerlessServiceImpl( this@ServerlessServiceImpl, uid, name, + memorySize, labels, meta ) diff --git a/simulator/opendc-serverless/opendc-serverless-service/src/test/kotlin/org/opendc/serverless/service/ServerlessServiceTest.kt b/simulator/opendc-serverless/opendc-serverless-service/src/test/kotlin/org/opendc/serverless/service/ServerlessServiceTest.kt index c4910758..569e9246 100644 --- a/simulator/opendc-serverless/opendc-serverless-service/src/test/kotlin/org/opendc/serverless/service/ServerlessServiceTest.kt +++ b/simulator/opendc-serverless/opendc-serverless-service/src/test/kotlin/org/opendc/serverless/service/ServerlessServiceTest.kt @@ -53,7 +53,7 @@ internal class ServerlessServiceTest { assertDoesNotThrow { client.close() } assertThrows { client.queryFunctions() } - assertThrows { client.newFunction("test") } + assertThrows { client.newFunction("test", 128) } assertThrows { client.invoke("test") } assertThrows { client.findFunction(UUID.randomUUID()) } assertThrows { client.findFunction("name") } @@ -78,7 +78,7 @@ internal class ServerlessServiceTest { val client = service.newClient() - val function = client.newFunction("test") + val function = client.newFunction("test", 128) assertEquals("test", function.name) } @@ -93,7 +93,7 @@ internal class ServerlessServiceTest { assertEquals(emptyList(), client.queryFunctions()) - val function = client.newFunction("test") + val function = client.newFunction("test", 128) assertEquals(listOf(function), client.queryFunctions()) } @@ -108,7 +108,7 @@ internal class ServerlessServiceTest { assertEquals(emptyList(), client.queryFunctions()) - val function = client.newFunction("test") + val function = client.newFunction("test", 128) assertNotNull(client.findFunction(function.uid)) } @@ -123,7 +123,7 @@ internal class ServerlessServiceTest { assertEquals(emptyList(), client.queryFunctions()) - val function = client.newFunction("test") + val function = client.newFunction("test", 128) assertNotNull(client.findFunction(function.name)) } @@ -136,9 +136,9 @@ internal class ServerlessServiceTest { val client = service.newClient() - client.newFunction("test") + client.newFunction("test", 128) - assertThrows { client.newFunction("test") } + assertThrows { client.newFunction("test", 128) } } @Test @@ -148,7 +148,7 @@ internal class ServerlessServiceTest { val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk()) val client = service.newClient() - val function = client.newFunction("test") + val function = client.newFunction("test", 128) assertNotNull(client.findFunction(function.uid)) function.delete() assertNull(client.findFunction(function.uid)) @@ -164,7 +164,7 @@ internal class ServerlessServiceTest { val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk()) val client = service.newClient() - val function = client.newFunction("test") + val function = client.newFunction("test", 128) assertNotNull(client.findFunction(function.uid)) function.delete() @@ -190,7 +190,7 @@ internal class ServerlessServiceTest { } val client = service.newClient() - val function = client.newFunction("test") + val function = client.newFunction("test", 128) function.invoke() } -- cgit v1.2.3 From 69a881dca5ace9ba4ed294f72fd9a192fab83b3f Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 1 Apr 2021 20:42:43 +0200 Subject: serverless: Track metrics per function This change adds metrics that are tracked per function instance, which includes the runtime of the invocations, the number of invocations (total, warm, cold, failed). --- .../opendc/serverless/service/FunctionObject.kt | 139 +++++++++++++++++++++ .../service/deployer/FunctionDeployer.kt | 4 +- .../service/deployer/FunctionInstance.kt | 6 +- .../serverless/service/internal/ClientFunction.kt | 68 ---------- .../service/internal/InternalFunction.kt | 64 ---------- .../service/internal/ServerlessFunctionImpl.kt | 70 +++++++++++ .../service/internal/ServerlessServiceImpl.kt | 95 ++++++++------ .../service/router/RandomRoutingPolicy.kt | 4 +- .../serverless/service/router/RoutingPolicy.kt | 4 +- .../serverless/service/ServerlessServiceTest.kt | 2 +- 10 files changed, 275 insertions(+), 181 deletions(-) create mode 100644 simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/FunctionObject.kt delete mode 100644 simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ClientFunction.kt delete mode 100644 simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/InternalFunction.kt create mode 100644 simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessFunctionImpl.kt (limited to 'simulator/opendc-serverless/opendc-serverless-service') diff --git a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/FunctionObject.kt b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/FunctionObject.kt new file mode 100644 index 00000000..c12bbfe2 --- /dev/null +++ b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/FunctionObject.kt @@ -0,0 +1,139 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.serverless.service + +import io.opentelemetry.api.metrics.BoundLongCounter +import io.opentelemetry.api.metrics.BoundLongUpDownCounter +import io.opentelemetry.api.metrics.BoundLongValueRecorder +import io.opentelemetry.api.metrics.Meter +import io.opentelemetry.api.metrics.common.Labels +import org.opendc.serverless.service.deployer.FunctionInstance +import java.util.* + +/** + * An [FunctionObject] represents the service's view of a serverless function. + */ +public class FunctionObject( + meter: Meter, + public val uid: UUID, + name: String, + allocatedMemory: Long, + labels: Map, + meta: Map +) : AutoCloseable { + /** + * The total amount of function invocations received by the function. + */ + public val invocations: BoundLongCounter = meter.longCounterBuilder("function.invocations.total") + .setDescription("Number of function invocations") + .setUnit("1") + .build() + .bind(Labels.of("function", uid.toString())) + + /** + * The amount of function invocations that could be handled directly. + */ + public val timelyInvocations: BoundLongCounter = meter.longCounterBuilder("function.invocations.warm") + .setDescription("Number of function invocations handled directly") + .setUnit("1") + .build() + .bind(Labels.of("function", uid.toString())) + + /** + * The amount of function invocations that were delayed due to function deployment. + */ + public val delayedInvocations: BoundLongCounter = meter.longCounterBuilder("function.invocations.cold") + .setDescription("Number of function invocations that are delayed") + .setUnit("1") + .build() + .bind(Labels.of("function", uid.toString())) + + /** + * The amount of function invocations that failed. + */ + public val failedInvocations: BoundLongCounter = meter.longCounterBuilder("function.invocations.failed") + .setDescription("Number of function invocations that failed") + .setUnit("1") + .build() + .bind(Labels.of("function", uid.toString())) + + /** + * The amount of instances for this function. + */ + public val activeInstances: BoundLongUpDownCounter = meter.longUpDownCounterBuilder("function.instances.active") + .setDescription("Number of active function instances") + .setUnit("1") + .build() + .bind(Labels.of("function", uid.toString())) + + /** + * The amount of idle instances for this function. + */ + public val idleInstances: BoundLongUpDownCounter = meter.longUpDownCounterBuilder("function.instances.idle") + .setDescription("Number of idle function instances") + .setUnit("1") + .build() + .bind(Labels.of("function", uid.toString())) + + /** + * The time that the function waited. + */ + public val waitTime: BoundLongValueRecorder = meter.longValueRecorderBuilder("function.time.wait") + .setDescription("Time the function has to wait before being started") + .setUnit("ms") + .build() + .bind(Labels.of("function", uid.toString())) + + /** + * The time that the function was running. + */ + public val activeTime: BoundLongValueRecorder = meter.longValueRecorderBuilder("function.time.active") + .setDescription("Time the function was running") + .setUnit("ms") + .build() + .bind(Labels.of("function", uid.toString())) + + /** + * The instances associated with this function. + */ + public val instances: MutableList = mutableListOf() + + public var name: String = name + private set + + public var memorySize: Long = allocatedMemory + private set + + public val labels: MutableMap = labels.toMutableMap() + + public val meta: MutableMap = meta.toMutableMap() + + override fun close() { + instances.forEach(FunctionInstance::close) + instances.clear() + } + + override fun equals(other: Any?): Boolean = other is FunctionObject && uid == other.uid + + override fun hashCode(): Int = uid.hashCode() +} diff --git a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionDeployer.kt b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionDeployer.kt index e0a37009..83592a68 100644 --- a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionDeployer.kt +++ b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionDeployer.kt @@ -22,7 +22,7 @@ package org.opendc.serverless.service.deployer -import org.opendc.serverless.api.ServerlessFunction +import org.opendc.serverless.service.FunctionObject /** * A [FunctionDeployer] is responsible for ensuring that an instance of an arbitrary function, a [FunctionInstance], @@ -39,5 +39,5 @@ public interface FunctionDeployer { /** * Deploy the specified [function]. */ - public fun deploy(function: ServerlessFunction): FunctionInstance + public fun deploy(function: FunctionObject): FunctionInstance } diff --git a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionInstance.kt b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionInstance.kt index 410df5d4..d60648ea 100644 --- a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionInstance.kt +++ b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionInstance.kt @@ -22,7 +22,7 @@ package org.opendc.serverless.service.deployer -import org.opendc.serverless.api.ServerlessFunction +import org.opendc.serverless.service.FunctionObject /** * A [FunctionInstance] is a a self-contained worker—typically a container—capable of handling function executions. @@ -36,9 +36,9 @@ public interface FunctionInstance : AutoCloseable { public val state: FunctionInstanceState /** - * The [ServerlessFunction] that is represented by this instance. + * The [FunctionObject] that is represented by this instance. */ - public val function: ServerlessFunction + public val function: FunctionObject /** * Invoke the function instance. diff --git a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ClientFunction.kt b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ClientFunction.kt deleted file mode 100644 index a26e7d87..00000000 --- a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ClientFunction.kt +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.serverless.service.internal - -import org.opendc.serverless.api.ServerlessFunction -import java.util.* - -/** - * A [ServerlessFunction] implementation that is passed to clients but delegates its implementation to another class. - */ -internal class ClientFunction(private val delegate: ServerlessFunction) : ServerlessFunction { - override val uid: UUID = delegate.uid - - override var name: String = delegate.name - private set - - override var memorySize: Long = delegate.memorySize - private set - - override var labels: Map = delegate.labels.toMap() - private set - - override var meta: Map = delegate.meta.toMap() - private set - - override suspend fun delete() { - delegate.delete() - } - - override suspend fun invoke() { - delegate.invoke() - } - - override suspend fun refresh() { - delegate.refresh() - - name = delegate.name - memorySize = delegate.memorySize - labels = delegate.labels - meta = delegate.meta - } - - override fun equals(other: Any?): Boolean = other is ClientFunction && uid == other.uid - - override fun hashCode(): Int = uid.hashCode() - - override fun toString(): String = "ServerlessFunction[uid=$uid,name=$name]" -} diff --git a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/InternalFunction.kt b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/InternalFunction.kt deleted file mode 100644 index cea2018d..00000000 --- a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/InternalFunction.kt +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.serverless.service.internal - -import org.opendc.serverless.api.ServerlessFunction -import java.util.* - -/** - * Internal stateful representation of a [ServerlessFunction]. - */ -internal class InternalFunction( - private val service: ServerlessServiceImpl, - override val uid: UUID, - name: String, - allocatedMemory: Long, - labels: Map, - meta: Map -) : ServerlessFunction { - override var name: String = name - private set - - override var memorySize: Long = allocatedMemory - private set - - override val labels: MutableMap = labels.toMutableMap() - - override val meta: MutableMap = meta.toMutableMap() - - override suspend fun refresh() { - // No-op: this object is the source-of-truth - } - - override suspend fun invoke() { - service.invoke(this) - } - - override suspend fun delete() { - service.delete(this) - } - - override fun equals(other: Any?): Boolean = other is ServerlessFunction && uid == other.uid - - override fun hashCode(): Int = uid.hashCode() -} diff --git a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessFunctionImpl.kt b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessFunctionImpl.kt new file mode 100644 index 00000000..80b50e77 --- /dev/null +++ b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessFunctionImpl.kt @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.serverless.service.internal + +import org.opendc.serverless.api.ServerlessFunction +import org.opendc.serverless.service.FunctionObject +import java.util.* + +/** + * A [ServerlessFunction] implementation that is passed to clients. + */ +internal class ServerlessFunctionImpl( + private val service: ServerlessServiceImpl, + private val state: FunctionObject +) : ServerlessFunction { + override val uid: UUID = state.uid + + override var name: String = state.name + private set + + override var memorySize: Long = state.memorySize + private set + + override var labels: Map = state.labels.toMap() + private set + + override var meta: Map = state.meta.toMap() + private set + + override suspend fun delete() { + service.delete(state) + } + + override suspend fun invoke() { + service.invoke(state) + } + + override suspend fun refresh() { + name = state.name + memorySize = state.memorySize + labels = state.labels + meta = state.meta + } + + override fun equals(other: Any?): Boolean = other is ServerlessFunctionImpl && uid == other.uid + + override fun hashCode(): Int = uid.hashCode() + + override fun toString(): String = "ServerlessFunction[uid=$uid,name=$name]" +} diff --git a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessServiceImpl.kt b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessServiceImpl.kt index 9d8dadb1..515cb5fa 100644 --- a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessServiceImpl.kt +++ b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessServiceImpl.kt @@ -28,6 +28,7 @@ import kotlinx.coroutines.intrinsics.startCoroutineCancellable import mu.KotlinLogging import org.opendc.serverless.api.ServerlessClient import org.opendc.serverless.api.ServerlessFunction +import org.opendc.serverless.service.FunctionObject import org.opendc.serverless.service.ServerlessService import org.opendc.serverless.service.deployer.FunctionDeployer import org.opendc.serverless.service.deployer.FunctionInstance @@ -50,7 +51,7 @@ import kotlin.coroutines.resumeWithException internal class ServerlessServiceImpl( context: CoroutineContext, private val clock: Clock, - internal val meter: Meter, + private val meter: Meter, private val deployer: FunctionDeployer, private val routingPolicy: RoutingPolicy ) : ServerlessService { @@ -77,23 +78,18 @@ internal class ServerlessServiceImpl( /** * The registered functions for this service. */ - private val functions = mutableMapOf() - private val functionsByName = mutableMapOf() + private val functions = mutableMapOf() + private val functionsByName = mutableMapOf() /** * The queue of invocation requests. */ private val queue = ArrayDeque() - /** - * The active function instances. - */ - private val instancesByFunction = mutableMapOf>() - /** * The total amount of function invocations received by the service. */ - private val _invocations = meter.longCounterBuilder("invocations.total") + private val _invocations = meter.longCounterBuilder("service.invocations.total") .setDescription("Number of function invocations") .setUnit("1") .build() @@ -101,7 +97,7 @@ internal class ServerlessServiceImpl( /** * The amount of function invocations that could be handled directly. */ - private val _timelyInvocations = meter.longCounterBuilder("invocations.warm") + private val _timelyInvocations = meter.longCounterBuilder("service.invocations.warm") .setDescription("Number of function invocations handled directly") .setUnit("1") .build() @@ -109,7 +105,7 @@ internal class ServerlessServiceImpl( /** * The amount of function invocations that were delayed due to function deployment. */ - private val _delayedInvocations = meter.longCounterBuilder("invocations.cold") + private val _delayedInvocations = meter.longCounterBuilder("service.invocations.cold") .setDescription("Number of function invocations that are delayed") .setUnit("1") .build() @@ -118,22 +114,29 @@ internal class ServerlessServiceImpl( return object : ServerlessClient { private var isClosed: Boolean = false + /** + * Exposes a [FunctionObject] to a client-exposed [ServerlessFunction] instance. + */ + private fun FunctionObject.asClientFunction(): ServerlessFunction { + return ServerlessFunctionImpl(this@ServerlessServiceImpl, this) + } + override suspend fun queryFunctions(): List { check(!isClosed) { "Client is already closed" } - return functions.values.map { ClientFunction(it) } + return functions.values.map { it.asClientFunction() } } override suspend fun findFunction(id: UUID): ServerlessFunction? { check(!isClosed) { "Client is already closed" } - return functions[id]?.let { ClientFunction(it) } + return functions[id]?.asClientFunction() } override suspend fun findFunction(name: String): ServerlessFunction? { check(!isClosed) { "Client is already closed" } - return functionsByName[name]?.let { ClientFunction(it) } + return functionsByName[name]?.asClientFunction() } override suspend fun newFunction( @@ -146,8 +149,8 @@ internal class ServerlessServiceImpl( require(name !in functionsByName) { "Function with same name exists" } val uid = UUID(clock.millis(), random.nextLong()) - val function = InternalFunction( - this@ServerlessServiceImpl, + val function = FunctionObject( + meter, uid, name, memorySize, @@ -158,13 +161,14 @@ internal class ServerlessServiceImpl( functionsByName[name] = function functions[uid] = function - return ClientFunction(function) + return function.asClientFunction() } override suspend fun invoke(name: String) { check(!isClosed) { "Client is already closed" } - requireNotNull(functionsByName[name]) { "Unknown function" }.invoke() + val func = requireNotNull(functionsByName[name]) { "Unknown function" } + this@ServerlessServiceImpl.invoke(func) } override fun close() { @@ -182,7 +186,7 @@ internal class ServerlessServiceImpl( return } - val quantum = 1000 + val quantum = 100 // We assume that the provisioner runs at a fixed slot every time quantum (e.g t=0, t=60, t=120). // This is important because the slices of the VMs need to be aligned. @@ -199,12 +203,12 @@ internal class ServerlessServiceImpl( private fun doSchedule() { try { while (queue.isNotEmpty()) { - val (function, cont) = queue.poll() + val (submitTime, function, cont) = queue.poll() - val instances = instancesByFunction[function] + val instances = function.instances // Check if there exists an instance of the function - val activeInstance = if (instances != null && instances.isNotEmpty()) { + val activeInstance = if (instances.isNotEmpty()) { routingPolicy.select(instances, function) } else { null @@ -212,38 +216,52 @@ internal class ServerlessServiceImpl( val instance = if (activeInstance != null) { _timelyInvocations.add(1) + function.timelyInvocations.add(1) activeInstance } else { val instance = deployer.deploy(function) - instancesByFunction.compute(function) { _, v -> - if (v != null) { - v.add(instance) - v - } else { - mutableListOf(instance) - } - } + instances.add(instance) + + function.idleInstances.add(1) _delayedInvocations.add(1) + function.delayedInvocations.add(1) instance } - // Invoke the function instance - suspend { instance.invoke() }.startCoroutineCancellable(cont) + suspend { + val start = clock.millis() + function.waitTime.record(start - submitTime) + function.idleInstances.add(-1) + function.activeInstances.add(1) + try { + instance.invoke() + } catch (e: Throwable) { + logger.debug(e) { "Function invocation failed" } + function.failedInvocations.add(1) + } finally { + val end = clock.millis() + function.activeTime.record(end - start) + function.idleInstances.add(1) + function.activeInstances.add(-1) + } + }.startCoroutineCancellable(cont) } } catch (cause: Throwable) { logger.error(cause) { "Exception occurred during scheduling cycle" } } } - internal suspend fun invoke(function: InternalFunction) { + suspend fun invoke(function: FunctionObject) { check(function.uid in functions) { "Function does not exist (anymore)" } _invocations.add(1) + function.invocations.add(1) + return suspendCancellableCoroutine { cont -> - if (!queue.add(InvocationRequest(function, cont))) { + if (!queue.add(InvocationRequest(clock.millis(), function, cont))) { cont.resumeWithException(IllegalStateException("Failed to enqueue request")) } else { schedule() @@ -251,7 +269,7 @@ internal class ServerlessServiceImpl( } } - internal fun delete(function: InternalFunction) { + fun delete(function: FunctionObject) { functions.remove(function.uid) functionsByName.remove(function.name) } @@ -260,14 +278,13 @@ internal class ServerlessServiceImpl( scope.cancel() // Stop all function instances - for ((_, instances) in instancesByFunction) { - instances.forEach(FunctionInstance::close) + for ((_, function) in functions) { + function.close() } - instancesByFunction.clear() } /** * A request to invoke a function. */ - private data class InvocationRequest(val function: InternalFunction, val cont: Continuation) + private data class InvocationRequest(val timestamp: Long, val function: FunctionObject, val cont: Continuation) } diff --git a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/router/RandomRoutingPolicy.kt b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/router/RandomRoutingPolicy.kt index 015704ca..063fb80a 100644 --- a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/router/RandomRoutingPolicy.kt +++ b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/router/RandomRoutingPolicy.kt @@ -22,7 +22,7 @@ package org.opendc.serverless.service.router -import org.opendc.serverless.api.ServerlessFunction +import org.opendc.serverless.service.FunctionObject import org.opendc.serverless.service.deployer.FunctionInstance import kotlin.random.Random @@ -30,7 +30,7 @@ import kotlin.random.Random * A [RoutingPolicy] that selects a random function instance. */ public class RandomRoutingPolicy(private val random: Random = Random(0)) : RoutingPolicy { - override fun select(instances: List, function: ServerlessFunction): FunctionInstance { + override fun select(instances: List, function: FunctionObject): FunctionInstance { return instances.random(random) } } diff --git a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/router/RoutingPolicy.kt b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/router/RoutingPolicy.kt index 77f43059..d5d1166f 100644 --- a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/router/RoutingPolicy.kt +++ b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/router/RoutingPolicy.kt @@ -22,7 +22,7 @@ package org.opendc.serverless.service.router -import org.opendc.serverless.api.ServerlessFunction +import org.opendc.serverless.service.FunctionObject import org.opendc.serverless.service.deployer.FunctionInstance /** @@ -32,5 +32,5 @@ public interface RoutingPolicy { /** * Select the instance to which the request should be routed to. */ - public fun select(instances: List, function: ServerlessFunction): FunctionInstance? + public fun select(instances: List, function: FunctionObject): FunctionInstance? } diff --git a/simulator/opendc-serverless/opendc-serverless-service/src/test/kotlin/org/opendc/serverless/service/ServerlessServiceTest.kt b/simulator/opendc-serverless/opendc-serverless-service/src/test/kotlin/org/opendc/serverless/service/ServerlessServiceTest.kt index 569e9246..bf99d0e7 100644 --- a/simulator/opendc-serverless/opendc-serverless-service/src/test/kotlin/org/opendc/serverless/service/ServerlessServiceTest.kt +++ b/simulator/opendc-serverless/opendc-serverless-service/src/test/kotlin/org/opendc/serverless/service/ServerlessServiceTest.kt @@ -181,7 +181,7 @@ internal class ServerlessServiceTest { every { deployer.deploy(any()) } answers { object : FunctionInstance { override val state: FunctionInstanceState = FunctionInstanceState.Idle - override val function: ServerlessFunction = it.invocation.args[0] as ServerlessFunction + override val function: FunctionObject = it.invocation.args[0] as FunctionObject override suspend fun invoke() {} -- cgit v1.2.3