diff options
28 files changed, 889 insertions, 156 deletions
diff --git a/simulator/opendc-experiments/opendc-experiments-serverless20/README.md b/simulator/opendc-experiments/opendc-experiments-serverless20/README.md new file mode 100644 index 00000000..40855ad0 --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-serverless20/README.md @@ -0,0 +1,7 @@ +OpenDC Serverless +================= + +This module contains a reproduction of the experiments of Soufiane Jounaid's BSc Computer Science thesis: +OpenDC Serverless: Design, Implementation and Evaluation of a FaaS Platform Simulator [1] + +[1] https://drive.google.com/file/d/12hox3PwagpD0jNFA57tO4r2HqvOonkY3/view?usp=sharing diff --git a/simulator/opendc-experiments/opendc-experiments-serverless20/build.gradle.kts b/simulator/opendc-experiments/opendc-experiments-serverless20/build.gradle.kts new file mode 100644 index 00000000..40b15af4 --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-serverless20/build.gradle.kts @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +description = "Experiments for OpenDC Serverless" + +/* Build configuration */ +plugins { + `kotlin-library-conventions` + `experiment-conventions` + `testing-conventions` +} + +dependencies { + api(platform(project(":opendc-platform"))) + api(project(":opendc-harness")) + implementation(project(":opendc-serverless:opendc-serverless-service")) + implementation(project(":opendc-serverless:opendc-serverless-simulator")) + implementation(project(":opendc-telemetry:opendc-telemetry-sdk")) + + implementation("io.github.microutils:kotlin-logging") + + implementation("org.apache.parquet:parquet-avro:${versions["parquet-avro"]}") + implementation("org.apache.hadoop:hadoop-client:${versions["hadoop-client"]}") { + exclude(group = "org.slf4j", module = "slf4j-log4j12") + exclude(group = "log4j") + } +} diff --git a/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt b/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt new file mode 100644 index 00000000..757617f8 --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt @@ -0,0 +1,139 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.serverless + +import io.opentelemetry.api.metrics.MeterProvider +import io.opentelemetry.sdk.metrics.SdkMeterProvider +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.test.runBlockingTest +import mu.KotlinLogging +import org.opendc.experiments.serverless.trace.FunctionTraceWorkload +import org.opendc.experiments.serverless.trace.ServerlessTraceReader +import org.opendc.harness.dsl.Experiment +import org.opendc.harness.dsl.anyOf +import org.opendc.serverless.service.ServerlessService +import org.opendc.serverless.service.router.RandomRoutingPolicy +import org.opendc.serverless.simulator.SimFunctionDeployer +import org.opendc.serverless.simulator.delay.ColdStartModel +import org.opendc.serverless.simulator.delay.StochasticDelayInjector +import org.opendc.simulator.compute.SimMachineModel +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingNode +import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.utils.DelayControllerClockAdapter +import org.opendc.telemetry.sdk.toOtelClock +import java.io.File +import java.util.* +import kotlin.math.max + +/** + * A reproduction of the experiments of Soufiane Jounaid's BSc Computer Science thesis: + * OpenDC Serverless: Design, Implementation and Evaluation of a FaaS Platform Simulator. + */ +public class ServerlessExperiment : Experiment("Serverless") { + /** + * The logger for this portfolio instance. + */ + private val logger = KotlinLogging.logger {} + + /** + * The path to where the traces are located. + */ + private val tracePath by anyOf(File("../../input/traces/serverless")) + + /** + * The path to where the output results should be written. + */ + private val outputPath by anyOf(File("output/")) + + /** + * The routing policy to test. + */ + private val routingPolicy by anyOf(RandomRoutingPolicy()) + + /** + * The cold start models to test. + */ + private val coldStartModel by anyOf(ColdStartModel.LAMBDA, ColdStartModel.AZURE, ColdStartModel.GOOGLE) + + @OptIn(ExperimentalCoroutinesApi::class) + override fun doRun(repeat: Int): Unit = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val meterProvider: MeterProvider = SdkMeterProvider + .builder() + .setClock(clock.toOtelClock()) + .build() + + val trace = ServerlessTraceReader().parse(tracePath) + val traceById = trace.associateBy { it.id } + val delayInjector = StochasticDelayInjector(coldStartModel, Random()) + val deployer = SimFunctionDeployer(clock, this, createMachineModel(), delayInjector) { FunctionTraceWorkload(traceById.getValue(it.name)) } + val service = ServerlessService(coroutineContext, clock, meterProvider.get("opendc-serverless"), deployer, routingPolicy) + val client = service.newClient() + + coroutineScope { + for (entry in trace) { + launch { + val function = client.newFunction(entry.id, entry.maxMemory.toLong()) + var offset = Long.MIN_VALUE + + for (sample in entry.samples) { + if (sample.invocations == 0) { + continue + } + + if (offset < 0) { + offset = sample.timestamp - clock.millis() + } + + delay(max(0, (sample.timestamp - offset) - clock.millis())) + + logger.info { "Invoking function ${entry.id} ${sample.invocations} times [${sample.timestamp}]" } + + repeat(sample.invocations) { + function.invoke() + } + } + } + } + } + + client.close() + service.close() + } + + /** + * Construct the machine model to test with. + */ + private fun createMachineModel(): SimMachineModel { + val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) + + return SimMachineModel( + cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 1000.0) }, + memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } + ) + } +} diff --git a/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionSample.kt b/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionSample.kt new file mode 100644 index 00000000..492f44b9 --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionSample.kt @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.serverless.trace + +/** + * A sample of a single function. + * + * @param timestamp The timestamp of the function. + * @param duration The average execution time of the function. + * @param invocations The number of invocations. + * @param provisionedCpu The provisioned CPU for this function in MHz. + * @param provisionedMem The amount of memory provisioned for this function in MB. + * @param cpuUsage The actual CPU usage in MHz. + * @param memUsage The actual memory usage in MB. + */ +public data class FunctionSample( + val timestamp: Long, + val duration: Long, + val invocations: Int, + val provisionedCpu: Int, + val provisionedMem: Int, + val cpuUsage: Double, + val memUsage: Double +) diff --git a/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTrace.kt b/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTrace.kt new file mode 100644 index 00000000..4fea6b96 --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTrace.kt @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.serverless.trace + +/** + * A trace for a single function + */ +public data class FunctionTrace(val id: String, val maxMemory: Int, val samples: List<FunctionSample>) diff --git a/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTraceWorkload.kt b/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTraceWorkload.kt new file mode 100644 index 00000000..7d824857 --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTraceWorkload.kt @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.serverless.trace + +import org.opendc.serverless.simulator.workload.SimServerlessWorkload +import org.opendc.simulator.compute.workload.SimTraceWorkload +import org.opendc.simulator.compute.workload.SimWorkload + +/** + * A [SimServerlessWorkload] for a [FunctionTrace]. + */ +public class FunctionTraceWorkload(trace: FunctionTrace) : SimServerlessWorkload, SimWorkload by SimTraceWorkload(trace.samples.asSequence().map { SimTraceWorkload.Fragment(it.duration, it.cpuUsage, 1) }) { + override suspend fun invoke() {} +} diff --git a/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/ServerlessTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/ServerlessTraceReader.kt new file mode 100644 index 00000000..6dc35c59 --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/ServerlessTraceReader.kt @@ -0,0 +1,107 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.serverless.trace + +import mu.KotlinLogging +import java.io.File +import kotlin.math.max + +/** + * A trace reader for the serverless workload trace used in the OpenDC Serverless thesis. + */ +public class ServerlessTraceReader { + /** + * The logger for this portfolio instance. + */ + private val logger = KotlinLogging.logger {} + + /** + * Parse the traces at the specified [path]. + */ + public fun parse(path: File): List<FunctionTrace> { + return if (path.isFile) { + listOf(parseSingle(path)) + } else { + path.walk() + .filterNot { it.isDirectory } + .map { file -> + logger.info { "Parsing $file" } + parseSingle(file) + } + .toList() + } + } + + /** + * Parse a single trace. + */ + private fun parseSingle(path: File): FunctionTrace { + val samples = mutableListOf<FunctionSample>() + val id = path.nameWithoutExtension + var idx = 0 + + var timestampCol = 0 + var invocationsCol = 0 + var execTimeCol = 0 + var provCpuCol = 0 + var provMemCol = 0 + var cpuUsageCol = 0 + var memoryUsageCol = 0 + var maxMemory = 0 + + path.forEachLine { line -> + if (line.startsWith("#") && line.isNotBlank()) { + return@forEachLine + } + + val values = line.split(",") + + /* Header parsing */ + if (idx++ == 0) { + val header = values.mapIndexed { col, name -> Pair(name.trim(), col) }.toMap() + timestampCol = header["Timestamp [ms]"]!! + invocationsCol = header["Invocations"]!! + execTimeCol = header["Avg Exec time per Invocation"]!! + provCpuCol = header["Provisioned CPU [Mhz]"]!! + provMemCol = header["Provisioned Memory [mb]"]!! + cpuUsageCol = header["Avg cpu usage per Invocation [Mhz]"]!! + memoryUsageCol = header["Avg mem usage per Invocation [mb]"]!! + return@forEachLine + } + + val timestamp = values[timestampCol].trim().toLong() + val invocations = values[invocationsCol].trim().toInt() + val execTime = values[execTimeCol].trim().toLong() + val provisionedCpu = values[provCpuCol].trim().toInt() + val provisionedMemory = values[provMemCol].trim().toInt() + val cpuUsage = values[cpuUsageCol].trim().toDouble() + val memoryUsage = values[memoryUsageCol].trim().toDouble() + + maxMemory = max(maxMemory, provisionedMemory) + + samples.add(FunctionSample(timestamp, execTime, invocations, provisionedCpu, provisionedMemory, cpuUsage, memoryUsage)) + } + + return FunctionTrace(id, maxMemory, samples) + } +} diff --git a/simulator/opendc-serverless/opendc-serverless-api/src/main/kotlin/org/opendc/serverless/api/ServerlessClient.kt b/simulator/opendc-serverless/opendc-serverless-api/src/main/kotlin/org/opendc/serverless/api/ServerlessClient.kt index f14f4977..b66369ec 100644 --- a/simulator/opendc-serverless/opendc-serverless-api/src/main/kotlin/org/opendc/serverless/api/ServerlessClient.kt +++ b/simulator/opendc-serverless/opendc-serverless-api/src/main/kotlin/org/opendc/serverless/api/ServerlessClient.kt @@ -51,11 +51,13 @@ public interface ServerlessClient : AutoCloseable { * Create a new serverless function. * * @param name The name of the function. + * @param memorySize The memory allocated for the function in MB. * @param labels The labels associated with the function. * @param meta The metadata associated with the function. */ public suspend fun newFunction( name: String, + memorySize: Long, labels: Map<String, String> = emptyMap(), meta: Map<String, Any> = emptyMap() ): ServerlessFunction diff --git a/simulator/opendc-serverless/opendc-serverless-api/src/main/kotlin/org/opendc/serverless/api/ServerlessFunction.kt b/simulator/opendc-serverless/opendc-serverless-api/src/main/kotlin/org/opendc/serverless/api/ServerlessFunction.kt index ec0fad71..f1360966 100644 --- a/simulator/opendc-serverless/opendc-serverless-api/src/main/kotlin/org/opendc/serverless/api/ServerlessFunction.kt +++ b/simulator/opendc-serverless/opendc-serverless-api/src/main/kotlin/org/opendc/serverless/api/ServerlessFunction.kt @@ -39,6 +39,11 @@ public interface ServerlessFunction { public val name: String /** + * The amount of memory allocated for this function in MB. + */ + public val memorySize: Long + + /** * The identifying labels attached to the resource. */ public val labels: Map<String, String> diff --git a/simulator/opendc-serverless/opendc-serverless-service/build.gradle.kts b/simulator/opendc-serverless/opendc-serverless-service/build.gradle.kts index 0221829a..f7e43aba 100644 --- a/simulator/opendc-serverless/opendc-serverless-service/build.gradle.kts +++ b/simulator/opendc-serverless/opendc-serverless-service/build.gradle.kts @@ -32,6 +32,7 @@ plugins { dependencies { api(platform(project(":opendc-platform"))) api(project(":opendc-serverless:opendc-serverless-api")) + api(project(":opendc-telemetry:opendc-telemetry-api")) implementation(project(":opendc-utils")) implementation("io.github.microutils:kotlin-logging") diff --git a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/FunctionObject.kt b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/FunctionObject.kt new file mode 100644 index 00000000..c12bbfe2 --- /dev/null +++ b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/FunctionObject.kt @@ -0,0 +1,139 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.serverless.service + +import io.opentelemetry.api.metrics.BoundLongCounter +import io.opentelemetry.api.metrics.BoundLongUpDownCounter +import io.opentelemetry.api.metrics.BoundLongValueRecorder +import io.opentelemetry.api.metrics.Meter +import io.opentelemetry.api.metrics.common.Labels +import org.opendc.serverless.service.deployer.FunctionInstance +import java.util.* + +/** + * An [FunctionObject] represents the service's view of a serverless function. + */ +public class FunctionObject( + meter: Meter, + public val uid: UUID, + name: String, + allocatedMemory: Long, + labels: Map<String, String>, + meta: Map<String, Any> +) : AutoCloseable { + /** + * The total amount of function invocations received by the function. + */ + public val invocations: BoundLongCounter = meter.longCounterBuilder("function.invocations.total") + .setDescription("Number of function invocations") + .setUnit("1") + .build() + .bind(Labels.of("function", uid.toString())) + + /** + * The amount of function invocations that could be handled directly. + */ + public val timelyInvocations: BoundLongCounter = meter.longCounterBuilder("function.invocations.warm") + .setDescription("Number of function invocations handled directly") + .setUnit("1") + .build() + .bind(Labels.of("function", uid.toString())) + + /** + * The amount of function invocations that were delayed due to function deployment. + */ + public val delayedInvocations: BoundLongCounter = meter.longCounterBuilder("function.invocations.cold") + .setDescription("Number of function invocations that are delayed") + .setUnit("1") + .build() + .bind(Labels.of("function", uid.toString())) + + /** + * The amount of function invocations that failed. + */ + public val failedInvocations: BoundLongCounter = meter.longCounterBuilder("function.invocations.failed") + .setDescription("Number of function invocations that failed") + .setUnit("1") + .build() + .bind(Labels.of("function", uid.toString())) + + /** + * The amount of instances for this function. + */ + public val activeInstances: BoundLongUpDownCounter = meter.longUpDownCounterBuilder("function.instances.active") + .setDescription("Number of active function instances") + .setUnit("1") + .build() + .bind(Labels.of("function", uid.toString())) + + /** + * The amount of idle instances for this function. + */ + public val idleInstances: BoundLongUpDownCounter = meter.longUpDownCounterBuilder("function.instances.idle") + .setDescription("Number of idle function instances") + .setUnit("1") + .build() + .bind(Labels.of("function", uid.toString())) + + /** + * The time that the function waited. + */ + public val waitTime: BoundLongValueRecorder = meter.longValueRecorderBuilder("function.time.wait") + .setDescription("Time the function has to wait before being started") + .setUnit("ms") + .build() + .bind(Labels.of("function", uid.toString())) + + /** + * The time that the function was running. + */ + public val activeTime: BoundLongValueRecorder = meter.longValueRecorderBuilder("function.time.active") + .setDescription("Time the function was running") + .setUnit("ms") + .build() + .bind(Labels.of("function", uid.toString())) + + /** + * The instances associated with this function. + */ + public val instances: MutableList<FunctionInstance> = mutableListOf() + + public var name: String = name + private set + + public var memorySize: Long = allocatedMemory + private set + + public val labels: MutableMap<String, String> = labels.toMutableMap() + + public val meta: MutableMap<String, Any> = meta.toMutableMap() + + override fun close() { + instances.forEach(FunctionInstance::close) + instances.clear() + } + + override fun equals(other: Any?): Boolean = other is FunctionObject && uid == other.uid + + override fun hashCode(): Int = uid.hashCode() +} diff --git a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/ServerlessService.kt b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/ServerlessService.kt index 18717ef5..a791c815 100644 --- a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/ServerlessService.kt +++ b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/ServerlessService.kt @@ -22,6 +22,7 @@ package org.opendc.serverless.service +import io.opentelemetry.api.metrics.Meter import org.opendc.serverless.api.ServerlessClient import org.opendc.serverless.service.deployer.FunctionDeployer import org.opendc.serverless.service.internal.ServerlessServiceImpl @@ -49,14 +50,18 @@ public interface ServerlessService : AutoCloseable { * * @param context The [CoroutineContext] to use in the service. * @param clock The clock instance to use. + * @param meter The meter to report metrics to. + * @param deployer the [FunctionDeployer] to use for deploying function instances. + * @param routingPolicy The policy to route function invocations. */ public operator fun invoke( context: CoroutineContext, clock: Clock, + meter: Meter, deployer: FunctionDeployer, routingPolicy: RoutingPolicy, ): ServerlessService { - return ServerlessServiceImpl(context, clock, deployer, routingPolicy) + return ServerlessServiceImpl(context, clock, meter, deployer, routingPolicy) } } } diff --git a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionDeployer.kt b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionDeployer.kt index e0a37009..83592a68 100644 --- a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionDeployer.kt +++ b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionDeployer.kt @@ -22,7 +22,7 @@ package org.opendc.serverless.service.deployer -import org.opendc.serverless.api.ServerlessFunction +import org.opendc.serverless.service.FunctionObject /** * A [FunctionDeployer] is responsible for ensuring that an instance of an arbitrary function, a [FunctionInstance], @@ -39,5 +39,5 @@ public interface FunctionDeployer { /** * Deploy the specified [function]. */ - public fun deploy(function: ServerlessFunction): FunctionInstance + public fun deploy(function: FunctionObject): FunctionInstance } diff --git a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionInstance.kt b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionInstance.kt index 410df5d4..d60648ea 100644 --- a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionInstance.kt +++ b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/deployer/FunctionInstance.kt @@ -22,7 +22,7 @@ package org.opendc.serverless.service.deployer -import org.opendc.serverless.api.ServerlessFunction +import org.opendc.serverless.service.FunctionObject /** * A [FunctionInstance] is a a self-contained worker—typically a container—capable of handling function executions. @@ -36,9 +36,9 @@ public interface FunctionInstance : AutoCloseable { public val state: FunctionInstanceState /** - * The [ServerlessFunction] that is represented by this instance. + * The [FunctionObject] that is represented by this instance. */ - public val function: ServerlessFunction + public val function: FunctionObject /** * Invoke the function instance. diff --git a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ClientFunction.kt b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessFunctionImpl.kt index 1258a037..80b50e77 100644 --- a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ClientFunction.kt +++ b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessFunctionImpl.kt @@ -23,40 +23,46 @@ package org.opendc.serverless.service.internal import org.opendc.serverless.api.ServerlessFunction +import org.opendc.serverless.service.FunctionObject import java.util.* /** - * A [ServerlessFunction] implementation that is passed to clients but delegates its implementation to another class. + * A [ServerlessFunction] implementation that is passed to clients. */ -internal class ClientFunction(private val delegate: ServerlessFunction) : ServerlessFunction { - override val uid: UUID = delegate.uid +internal class ServerlessFunctionImpl( + private val service: ServerlessServiceImpl, + private val state: FunctionObject +) : ServerlessFunction { + override val uid: UUID = state.uid - override var name: String = delegate.name + override var name: String = state.name private set - override var labels: Map<String, String> = delegate.labels.toMap() + override var memorySize: Long = state.memorySize private set - override var meta: Map<String, Any> = delegate.meta.toMap() + override var labels: Map<String, String> = state.labels.toMap() + private set + + override var meta: Map<String, Any> = state.meta.toMap() private set override suspend fun delete() { - delegate.delete() + service.delete(state) } override suspend fun invoke() { - delegate.invoke() + service.invoke(state) } override suspend fun refresh() { - delegate.refresh() - - name = delegate.name - labels = delegate.labels - meta = delegate.meta + name = state.name + memorySize = state.memorySize + labels = state.labels + meta = state.meta } - override fun equals(other: Any?): Boolean = other is ClientFunction && uid == other.uid + override fun equals(other: Any?): Boolean = other is ServerlessFunctionImpl && uid == other.uid override fun hashCode(): Int = uid.hashCode() diff --git a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessServiceImpl.kt b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessServiceImpl.kt index b3f395c3..515cb5fa 100644 --- a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessServiceImpl.kt +++ b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/ServerlessServiceImpl.kt @@ -22,11 +22,13 @@ package org.opendc.serverless.service.internal +import io.opentelemetry.api.metrics.Meter import kotlinx.coroutines.* import kotlinx.coroutines.intrinsics.startCoroutineCancellable import mu.KotlinLogging import org.opendc.serverless.api.ServerlessClient import org.opendc.serverless.api.ServerlessFunction +import org.opendc.serverless.service.FunctionObject import org.opendc.serverless.service.ServerlessService import org.opendc.serverless.service.deployer.FunctionDeployer import org.opendc.serverless.service.deployer.FunctionInstance @@ -49,6 +51,7 @@ import kotlin.coroutines.resumeWithException internal class ServerlessServiceImpl( context: CoroutineContext, private val clock: Clock, + private val meter: Meter, private val deployer: FunctionDeployer, private val routingPolicy: RoutingPolicy ) : ServerlessService { @@ -75,8 +78,8 @@ internal class ServerlessServiceImpl( /** * The registered functions for this service. */ - private val functions = mutableMapOf<UUID, InternalFunction>() - private val functionsByName = mutableMapOf<String, InternalFunction>() + private val functions = mutableMapOf<UUID, FunctionObject>() + private val functionsByName = mutableMapOf<String, FunctionObject>() /** * The queue of invocation requests. @@ -84,34 +87,61 @@ internal class ServerlessServiceImpl( private val queue = ArrayDeque<InvocationRequest>() /** - * The active function instances. + * The total amount of function invocations received by the service. */ - private val instancesByFunction = mutableMapOf<InternalFunction, MutableList<FunctionInstance>>() + private val _invocations = meter.longCounterBuilder("service.invocations.total") + .setDescription("Number of function invocations") + .setUnit("1") + .build() + + /** + * The amount of function invocations that could be handled directly. + */ + private val _timelyInvocations = meter.longCounterBuilder("service.invocations.warm") + .setDescription("Number of function invocations handled directly") + .setUnit("1") + .build() + + /** + * The amount of function invocations that were delayed due to function deployment. + */ + private val _delayedInvocations = meter.longCounterBuilder("service.invocations.cold") + .setDescription("Number of function invocations that are delayed") + .setUnit("1") + .build() override fun newClient(): ServerlessClient { return object : ServerlessClient { private var isClosed: Boolean = false + /** + * Exposes a [FunctionObject] to a client-exposed [ServerlessFunction] instance. + */ + private fun FunctionObject.asClientFunction(): ServerlessFunction { + return ServerlessFunctionImpl(this@ServerlessServiceImpl, this) + } + override suspend fun queryFunctions(): List<ServerlessFunction> { check(!isClosed) { "Client is already closed" } - return functions.values.map { ClientFunction(it) } + return functions.values.map { it.asClientFunction() } } override suspend fun findFunction(id: UUID): ServerlessFunction? { check(!isClosed) { "Client is already closed" } - return functions[id]?.let { ClientFunction(it) } + return functions[id]?.asClientFunction() } override suspend fun findFunction(name: String): ServerlessFunction? { check(!isClosed) { "Client is already closed" } - return functionsByName[name]?.let { ClientFunction(it) } + return functionsByName[name]?.asClientFunction() } override suspend fun newFunction( name: String, + memorySize: Long, labels: Map<String, String>, meta: Map<String, Any> ): ServerlessFunction { @@ -119,10 +149,11 @@ internal class ServerlessServiceImpl( require(name !in functionsByName) { "Function with same name exists" } val uid = UUID(clock.millis(), random.nextLong()) - val function = InternalFunction( - this@ServerlessServiceImpl, + val function = FunctionObject( + meter, uid, name, + memorySize, labels, meta ) @@ -130,13 +161,14 @@ internal class ServerlessServiceImpl( functionsByName[name] = function functions[uid] = function - return ClientFunction(function) + return function.asClientFunction() } override suspend fun invoke(name: String) { check(!isClosed) { "Client is already closed" } - requireNotNull(functionsByName[name]) { "Unknown function" }.invoke() + val func = requireNotNull(functionsByName[name]) { "Unknown function" } + this@ServerlessServiceImpl.invoke(func) } override fun close() { @@ -154,7 +186,7 @@ internal class ServerlessServiceImpl( return } - val quantum = 1000 + val quantum = 100 // We assume that the provisioner runs at a fixed slot every time quantum (e.g t=0, t=60, t=120). // This is important because the slices of the VMs need to be aligned. @@ -171,46 +203,65 @@ internal class ServerlessServiceImpl( private fun doSchedule() { try { while (queue.isNotEmpty()) { - val (function, cont) = queue.poll() + val (submitTime, function, cont) = queue.poll() - val instances = instancesByFunction[function] + val instances = function.instances // Check if there exists an instance of the function - val activeInstance = if (instances != null && instances.isNotEmpty()) { + val activeInstance = if (instances.isNotEmpty()) { routingPolicy.select(instances, function) } else { null } val instance = if (activeInstance != null) { + _timelyInvocations.add(1) + function.timelyInvocations.add(1) + activeInstance } else { val instance = deployer.deploy(function) - instancesByFunction.compute(function) { _, v -> - if (v != null) { - v.add(instance) - v - } else { - mutableListOf(instance) - } - } + instances.add(instance) + + function.idleInstances.add(1) + + _delayedInvocations.add(1) + function.delayedInvocations.add(1) instance } - // Invoke the function instance - suspend { instance.invoke() }.startCoroutineCancellable(cont) + suspend { + val start = clock.millis() + function.waitTime.record(start - submitTime) + function.idleInstances.add(-1) + function.activeInstances.add(1) + try { + instance.invoke() + } catch (e: Throwable) { + logger.debug(e) { "Function invocation failed" } + function.failedInvocations.add(1) + } finally { + val end = clock.millis() + function.activeTime.record(end - start) + function.idleInstances.add(1) + function.activeInstances.add(-1) + } + }.startCoroutineCancellable(cont) } } catch (cause: Throwable) { logger.error(cause) { "Exception occurred during scheduling cycle" } } } - internal suspend fun invoke(function: InternalFunction) { + suspend fun invoke(function: FunctionObject) { check(function.uid in functions) { "Function does not exist (anymore)" } + _invocations.add(1) + function.invocations.add(1) + return suspendCancellableCoroutine { cont -> - if (!queue.add(InvocationRequest(function, cont))) { + if (!queue.add(InvocationRequest(clock.millis(), function, cont))) { cont.resumeWithException(IllegalStateException("Failed to enqueue request")) } else { schedule() @@ -218,7 +269,7 @@ internal class ServerlessServiceImpl( } } - internal fun delete(function: InternalFunction) { + fun delete(function: FunctionObject) { functions.remove(function.uid) functionsByName.remove(function.name) } @@ -227,14 +278,13 @@ internal class ServerlessServiceImpl( scope.cancel() // Stop all function instances - for ((_, instances) in instancesByFunction) { - instances.forEach(FunctionInstance::close) + for ((_, function) in functions) { + function.close() } - instancesByFunction.clear() } /** * A request to invoke a function. */ - private data class InvocationRequest(val function: InternalFunction, val cont: Continuation<Unit>) + private data class InvocationRequest(val timestamp: Long, val function: FunctionObject, val cont: Continuation<Unit>) } diff --git a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/router/RandomRoutingPolicy.kt b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/router/RandomRoutingPolicy.kt index 015704ca..063fb80a 100644 --- a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/router/RandomRoutingPolicy.kt +++ b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/router/RandomRoutingPolicy.kt @@ -22,7 +22,7 @@ package org.opendc.serverless.service.router -import org.opendc.serverless.api.ServerlessFunction +import org.opendc.serverless.service.FunctionObject import org.opendc.serverless.service.deployer.FunctionInstance import kotlin.random.Random @@ -30,7 +30,7 @@ import kotlin.random.Random * A [RoutingPolicy] that selects a random function instance. */ public class RandomRoutingPolicy(private val random: Random = Random(0)) : RoutingPolicy { - override fun select(instances: List<FunctionInstance>, function: ServerlessFunction): FunctionInstance { + override fun select(instances: List<FunctionInstance>, function: FunctionObject): FunctionInstance { return instances.random(random) } } diff --git a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/router/RoutingPolicy.kt b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/router/RoutingPolicy.kt index 77f43059..d5d1166f 100644 --- a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/router/RoutingPolicy.kt +++ b/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/router/RoutingPolicy.kt @@ -22,7 +22,7 @@ package org.opendc.serverless.service.router -import org.opendc.serverless.api.ServerlessFunction +import org.opendc.serverless.service.FunctionObject import org.opendc.serverless.service.deployer.FunctionInstance /** @@ -32,5 +32,5 @@ public interface RoutingPolicy { /** * Select the instance to which the request should be routed to. */ - public fun select(instances: List<FunctionInstance>, function: ServerlessFunction): FunctionInstance? + public fun select(instances: List<FunctionInstance>, function: FunctionObject): FunctionInstance? } diff --git a/simulator/opendc-serverless/opendc-serverless-service/src/test/kotlin/org/opendc/serverless/service/ServerlessServiceTest.kt b/simulator/opendc-serverless/opendc-serverless-service/src/test/kotlin/org/opendc/serverless/service/ServerlessServiceTest.kt index d9c2bcd2..bf99d0e7 100644 --- a/simulator/opendc-serverless/opendc-serverless-service/src/test/kotlin/org/opendc/serverless/service/ServerlessServiceTest.kt +++ b/simulator/opendc-serverless/opendc-serverless-service/src/test/kotlin/org/opendc/serverless/service/ServerlessServiceTest.kt @@ -23,6 +23,7 @@ package org.opendc.serverless.service import io.mockk.* +import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.test.runBlockingTest import org.junit.jupiter.api.Assertions.* @@ -44,14 +45,15 @@ internal class ServerlessServiceTest { @Test fun testClientState() = runBlockingTest { + val meter = MeterProvider.noop().get("opendc-serverless") val clock = DelayControllerClockAdapter(this) - val service = ServerlessService(coroutineContext, clock, mockk(), mockk()) + val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk()) val client = assertDoesNotThrow { service.newClient() } assertDoesNotThrow { client.close() } assertThrows<IllegalStateException> { client.queryFunctions() } - assertThrows<IllegalStateException> { client.newFunction("test") } + assertThrows<IllegalStateException> { client.newFunction("test", 128) } assertThrows<IllegalStateException> { client.invoke("test") } assertThrows<IllegalStateException> { client.findFunction(UUID.randomUUID()) } assertThrows<IllegalStateException> { client.findFunction("name") } @@ -59,8 +61,9 @@ internal class ServerlessServiceTest { @Test fun testClientInvokeUnknown() = runBlockingTest { + val meter = MeterProvider.noop().get("opendc-serverless") val clock = DelayControllerClockAdapter(this) - val service = ServerlessService(coroutineContext, clock, mockk(), mockk()) + val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk()) val client = service.newClient() @@ -69,77 +72,83 @@ internal class ServerlessServiceTest { @Test fun testClientFunctionCreation() = runBlockingTest { + val meter = MeterProvider.noop().get("opendc-serverless") val clock = DelayControllerClockAdapter(this) - val service = ServerlessService(coroutineContext, clock, mockk(), mockk()) + val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk()) val client = service.newClient() - val function = client.newFunction("test") + val function = client.newFunction("test", 128) assertEquals("test", function.name) } @Test fun testClientFunctionQuery() = runBlockingTest { + val meter = MeterProvider.noop().get("opendc-serverless") val clock = DelayControllerClockAdapter(this) - val service = ServerlessService(coroutineContext, clock, mockk(), mockk()) + val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk()) val client = service.newClient() assertEquals(emptyList<ServerlessFunction>(), client.queryFunctions()) - val function = client.newFunction("test") + val function = client.newFunction("test", 128) assertEquals(listOf(function), client.queryFunctions()) } @Test fun testClientFunctionFindById() = runBlockingTest { + val meter = MeterProvider.noop().get("opendc-serverless") val clock = DelayControllerClockAdapter(this) - val service = ServerlessService(coroutineContext, clock, mockk(), mockk()) + val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk()) val client = service.newClient() assertEquals(emptyList<ServerlessFunction>(), client.queryFunctions()) - val function = client.newFunction("test") + val function = client.newFunction("test", 128) assertNotNull(client.findFunction(function.uid)) } @Test fun testClientFunctionFindByName() = runBlockingTest { + val meter = MeterProvider.noop().get("opendc-serverless") val clock = DelayControllerClockAdapter(this) - val service = ServerlessService(coroutineContext, clock, mockk(), mockk()) + val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk()) val client = service.newClient() assertEquals(emptyList<ServerlessFunction>(), client.queryFunctions()) - val function = client.newFunction("test") + val function = client.newFunction("test", 128) assertNotNull(client.findFunction(function.name)) } @Test fun testClientFunctionDuplicateName() = runBlockingTest { + val meter = MeterProvider.noop().get("opendc-serverless") val clock = DelayControllerClockAdapter(this) - val service = ServerlessService(coroutineContext, clock, mockk(), mockk()) + val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk()) val client = service.newClient() - client.newFunction("test") + client.newFunction("test", 128) - assertThrows<IllegalArgumentException> { client.newFunction("test") } + assertThrows<IllegalArgumentException> { client.newFunction("test", 128) } } @Test fun testClientFunctionDelete() = runBlockingTest { + val meter = MeterProvider.noop().get("opendc-serverless") val clock = DelayControllerClockAdapter(this) - val service = ServerlessService(coroutineContext, clock, mockk(), mockk()) + val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk()) val client = service.newClient() - val function = client.newFunction("test") + val function = client.newFunction("test", 128) assertNotNull(client.findFunction(function.uid)) function.delete() assertNull(client.findFunction(function.uid)) @@ -150,11 +159,12 @@ internal class ServerlessServiceTest { @Test fun testClientFunctionCannotInvokeDeleted() = runBlockingTest { + val meter = MeterProvider.noop().get("opendc-serverless") val clock = DelayControllerClockAdapter(this) - val service = ServerlessService(coroutineContext, clock, mockk(), mockk()) + val service = ServerlessService(coroutineContext, clock, meter, mockk(), mockk()) val client = service.newClient() - val function = client.newFunction("test") + val function = client.newFunction("test", 128) assertNotNull(client.findFunction(function.uid)) function.delete() @@ -163,14 +173,15 @@ internal class ServerlessServiceTest { @Test fun testClientFunctionInvoke() = runBlockingTest { + val meter = MeterProvider.noop().get("opendc-serverless") val clock = DelayControllerClockAdapter(this) val deployer = mockk<FunctionDeployer>() - val service = ServerlessService(coroutineContext, clock, deployer, mockk()) + val service = ServerlessService(coroutineContext, clock, meter, deployer, mockk()) every { deployer.deploy(any()) } answers { object : FunctionInstance { override val state: FunctionInstanceState = FunctionInstanceState.Idle - override val function: ServerlessFunction = it.invocation.args[0] as ServerlessFunction + override val function: FunctionObject = it.invocation.args[0] as FunctionObject override suspend fun invoke() {} @@ -179,7 +190,7 @@ internal class ServerlessServiceTest { } val client = service.newClient() - val function = client.newFunction("test") + val function = client.newFunction("test", 128) function.invoke() } diff --git a/simulator/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/SimFunctionDeployer.kt b/simulator/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/SimFunctionDeployer.kt index f28a926b..2945a279 100644 --- a/simulator/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/SimFunctionDeployer.kt +++ b/simulator/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/SimFunctionDeployer.kt @@ -24,10 +24,11 @@ package org.opendc.serverless.simulator import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel -import org.opendc.serverless.api.ServerlessFunction +import org.opendc.serverless.service.FunctionObject import org.opendc.serverless.service.deployer.FunctionDeployer import org.opendc.serverless.service.deployer.FunctionInstance import org.opendc.serverless.service.deployer.FunctionInstanceState +import org.opendc.serverless.simulator.delay.DelayInjector import org.opendc.serverless.simulator.workload.SimServerlessWorkloadMapper import org.opendc.simulator.compute.SimBareMetalMachine import org.opendc.simulator.compute.SimMachine @@ -48,10 +49,11 @@ public class SimFunctionDeployer( private val clock: Clock, private val scope: CoroutineScope, private val model: SimMachineModel, + private val delayInjector: DelayInjector, private val mapper: SimServerlessWorkloadMapper ) : FunctionDeployer { - override fun deploy(function: ServerlessFunction): Instance { + override fun deploy(function: FunctionObject): Instance { val instance = Instance(function) instance.start() return instance @@ -60,7 +62,7 @@ public class SimFunctionDeployer( /** * A simulated [FunctionInstance]. */ - public inner class Instance(override val function: ServerlessFunction) : FunctionInstance { + public inner class Instance(override val function: FunctionObject) : FunctionInstance { /** * The workload associated with this instance. */ @@ -111,33 +113,35 @@ public class SimFunctionDeployer( internal fun start() { check(state == FunctionInstanceState.Provisioning) { "Invalid state of function instance" } job = scope.launch { - workload.onStart() + delay(delayInjector.getColdStartDelay(this@Instance)) - try { - while (isActive) { - chan.receive() + launch { + try { + machine.run(workload) + } finally { + state = FunctionInstanceState.Terminated + } + } - if (queue.isNotEmpty()) { - state = FunctionInstanceState.Active - } + while (isActive) { + if (queue.isEmpty()) { + chan.receive() + } - while (queue.isNotEmpty()) { - val request = queue.poll() - try { - machine.run(workload.onInvoke()) - request.cont.resume(Unit) - } catch (cause: CancellationException) { - request.cont.resumeWithException(cause) - throw cause - } catch (cause: Throwable) { - request.cont.resumeWithException(cause) - } + state = FunctionInstanceState.Active + while (queue.isNotEmpty()) { + val request = queue.poll() + try { + workload.invoke() + request.cont.resume(Unit) + } catch (cause: CancellationException) { + request.cont.resumeWithException(cause) + throw cause + } catch (cause: Throwable) { + request.cont.resumeWithException(cause) } - state = FunctionInstanceState.Idle } - } finally { - state = FunctionInstanceState.Terminated - workload.onStop() + state = FunctionInstanceState.Idle } } } diff --git a/simulator/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/delay/ColdStartModel.kt b/simulator/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/delay/ColdStartModel.kt new file mode 100644 index 00000000..f9f3718e --- /dev/null +++ b/simulator/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/delay/ColdStartModel.kt @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.serverless.simulator.delay + +/** + * Model parameters for the cold start times of serverless services. + */ +public enum class ColdStartModel { + // Min and max memory values from [Peeking Behind The Curtains of Serverless Platforms][2018], + // other values deduced from linear curve. + LAMBDA { + override fun coldStartParam(provisionedMemory: Int): Pair<Double, Double> { + return when (provisionedMemory) { + 128 -> Pair(265.21, 354.43) + 256 -> Pair(261.46, 334.23) + 512 -> Pair(257.71, 314.03) + 1024 -> Pair(253.96, 293.83) + 1536 -> Pair(250.07, 273.63) + 2048 -> Pair(246.11, 253.43) + else -> Pair(0.0, 1.0) + } + } + }, + AZURE { + // Azure by default uses 1.5gb memory to instantiate functions + override fun coldStartParam(provisionedMemory: Int): Pair<Double, Double> { + return Pair(242.66, 340.67) + } + }, + + GOOGLE { + override fun coldStartParam(provisionedMemory: Int): Pair<Double, Double> { + return when (provisionedMemory) { + 128 -> Pair(493.04, 345.8) + 256 -> Pair(416.59, 301.5) + 512 -> Pair(340.14, 257.2) + 1024 -> Pair(263.69, 212.9) + 1536 -> Pair(187.24, 168.6) + 2048 -> Pair(110.77, 124.3) + else -> Pair(0.0, 1.0) + } + } + }; + + /** + * Obtain the stochastic parameters for the cold start models. + */ + public abstract fun coldStartParam(provisionedMemory: Int): Pair<Double, Double> +} diff --git a/simulator/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/delay/DelayInjector.kt b/simulator/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/delay/DelayInjector.kt new file mode 100644 index 00000000..f882031b --- /dev/null +++ b/simulator/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/delay/DelayInjector.kt @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.serverless.simulator.delay + +import org.opendc.serverless.service.deployer.FunctionInstance + +/** + * An interface for modeling the delay caused by function cold starts. + */ +public interface DelayInjector { + /** + * Returns the cold start delay duration sampled from a normal distribution, the distribution is + * initialized using custom mean and standard deviation based on provisioned memory, language and + * failure model + */ + public fun getColdStartDelay(instance: FunctionInstance): Long +} diff --git a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/InternalFunction.kt b/simulator/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/delay/StochasticDelayInjector.kt index a6e22912..154378e1 100644 --- a/simulator/opendc-serverless/opendc-serverless-service/src/main/kotlin/org/opendc/serverless/service/internal/InternalFunction.kt +++ b/simulator/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/delay/StochasticDelayInjector.kt @@ -20,41 +20,18 @@ * SOFTWARE. */ -package org.opendc.serverless.service.internal +package org.opendc.serverless.simulator.delay -import org.opendc.serverless.api.ServerlessFunction +import org.opendc.serverless.service.deployer.FunctionInstance import java.util.* +import kotlin.math.abs -/** - * Internal stateful representation of a [ServerlessFunction]. +/* + * Interface for instance deployment delay estimation. */ -internal class InternalFunction( - private val service: ServerlessServiceImpl, - override val uid: UUID, - name: String, - labels: Map<String, String>, - meta: Map<String, Any> -) : ServerlessFunction { - override var name: String = name - private set - - override val labels: MutableMap<String, String> = labels.toMutableMap() - - override val meta: MutableMap<String, Any> = meta.toMutableMap() - - override suspend fun refresh() { - // No-op: this object is the source-of-truth - } - - override suspend fun invoke() { - service.invoke(this) - } - - override suspend fun delete() { - service.delete(this) +public class StochasticDelayInjector(private val model: ColdStartModel, private val random: Random) : DelayInjector { + override fun getColdStartDelay(instance: FunctionInstance): Long { + val (mean, sd) = model.coldStartParam(instance.function.memorySize.toInt()) + return abs(random.nextGaussian() * sd + mean).toLong() } - - override fun equals(other: Any?): Boolean = other is ServerlessFunction && uid == other.uid - - override fun hashCode(): Int = uid.hashCode() } diff --git a/simulator/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/delay/ZeroDelayInjector.kt b/simulator/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/delay/ZeroDelayInjector.kt new file mode 100644 index 00000000..0895ee18 --- /dev/null +++ b/simulator/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/delay/ZeroDelayInjector.kt @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.serverless.simulator.delay + +import org.opendc.serverless.service.deployer.FunctionInstance + +public object ZeroDelayInjector : DelayInjector { + override fun getColdStartDelay(instance: FunctionInstance): Long = 0 +} diff --git a/simulator/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/workload/SimServerlessWorkload.kt b/simulator/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/workload/SimServerlessWorkload.kt index afdc05af..121bf915 100644 --- a/simulator/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/workload/SimServerlessWorkload.kt +++ b/simulator/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/workload/SimServerlessWorkload.kt @@ -27,19 +27,9 @@ import org.opendc.simulator.compute.workload.SimWorkload /** * A model for a serverless workload, which may be invoked multiple times. */ -public interface SimServerlessWorkload { - /** - * This method is invoked when a function instance is launched. - */ - public fun onStart() {} - +public interface SimServerlessWorkload : SimWorkload { /** * This method is invoked when an active function instance is invoked. */ - public fun onInvoke(): SimWorkload - - /** - * This method is invoked when the function instance is stopped. - */ - public fun onStop() {} + public suspend fun invoke() } diff --git a/simulator/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/workload/SimServerlessWorkloadMapper.kt b/simulator/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/workload/SimServerlessWorkloadMapper.kt index 670f978d..3a47eb53 100644 --- a/simulator/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/workload/SimServerlessWorkloadMapper.kt +++ b/simulator/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/workload/SimServerlessWorkloadMapper.kt @@ -23,6 +23,7 @@ package org.opendc.serverless.simulator.workload import org.opendc.serverless.api.ServerlessFunction +import org.opendc.serverless.service.FunctionObject /** * A [SimServerlessWorkloadMapper] is responsible for mapping a [ServerlessFunction] to a [SimServerlessWorkload] that @@ -32,5 +33,5 @@ public fun interface SimServerlessWorkloadMapper { /** * Map the specified [function] to a [SimServerlessWorkload] that can be simulated. */ - public fun createWorkload(function: ServerlessFunction): SimServerlessWorkload + public fun createWorkload(function: FunctionObject): SimServerlessWorkload } diff --git a/simulator/opendc-serverless/opendc-serverless-simulator/src/test/kotlin/org/opendc/serverless/simulator/SimServerlessServiceTest.kt b/simulator/opendc-serverless/opendc-serverless-simulator/src/test/kotlin/org/opendc/serverless/simulator/SimServerlessServiceTest.kt index f68e206a..3a070475 100644 --- a/simulator/opendc-serverless/opendc-serverless-simulator/src/test/kotlin/org/opendc/serverless/simulator/SimServerlessServiceTest.kt +++ b/simulator/opendc-serverless/opendc-serverless-simulator/src/test/kotlin/org/opendc/serverless/simulator/SimServerlessServiceTest.kt @@ -22,8 +22,9 @@ package org.opendc.serverless.simulator +import io.mockk.coVerify import io.mockk.spyk -import io.mockk.verify +import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.delay import kotlinx.coroutines.test.runBlockingTest @@ -33,6 +34,7 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll import org.opendc.serverless.service.ServerlessService import org.opendc.serverless.service.router.RandomRoutingPolicy +import org.opendc.serverless.simulator.delay.ZeroDelayInjector import org.opendc.serverless.simulator.workload.SimServerlessWorkload import org.opendc.simulator.compute.SimMachineModel import org.opendc.simulator.compute.model.MemoryUnit @@ -62,16 +64,17 @@ internal class SimServerlessServiceTest { @Test fun testSmoke() = runBlockingTest { + val meter = MeterProvider.noop().get("opendc-serverless") val clock = DelayControllerClockAdapter(this) - val workload = spyk(object : SimServerlessWorkload { - override fun onInvoke(): SimWorkload = SimFlopsWorkload(1000) + val workload = spyk(object : SimServerlessWorkload, SimWorkload by SimFlopsWorkload(1000) { + override suspend fun invoke() {} }) - val deployer = SimFunctionDeployer(clock, this, machineModel) { workload } - val service = ServerlessService(coroutineContext, clock, deployer, RandomRoutingPolicy()) + val deployer = SimFunctionDeployer(clock, this, machineModel, ZeroDelayInjector) { workload } + val service = ServerlessService(coroutineContext, clock, meter, deployer, RandomRoutingPolicy()) val client = service.newClient() - val function = client.newFunction("test") + val function = client.newFunction("test", 128) function.invoke() delay(2000) @@ -80,9 +83,7 @@ internal class SimServerlessServiceTest { yield() assertAll( - { verify { workload.onStart() } }, - { verify { workload.onInvoke() } }, - { verify { workload.onStop() } } + { coVerify { workload.invoke() } }, ) } } diff --git a/simulator/settings.gradle.kts b/simulator/settings.gradle.kts index 3de0da5a..b80b394e 100644 --- a/simulator/settings.gradle.kts +++ b/simulator/settings.gradle.kts @@ -33,6 +33,7 @@ include(":opendc-serverless:opendc-serverless-simulator") include(":opendc-format") include(":opendc-experiments:opendc-experiments-capelin") include(":opendc-experiments:opendc-experiments-energy21") +include(":opendc-experiments:opendc-experiments-serverless20") include(":opendc-runner-web") include(":opendc-simulator:opendc-simulator-core") include(":opendc-simulator:opendc-simulator-resources") |
