diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-03-31 15:26:45 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-04-08 20:19:49 +0200 |
| commit | 8df422ca5164bd712caf594951669ebeb656f5fb (patch) | |
| tree | 93a1232d3b9bc9e5f58b699d0cc0351c75f88346 /simulator | |
| parent | c2250720d694c6e7e19b3c0ba2fc27a124d3cadb (diff) | |
exp: Add experiment testing the serverless module
This change adds an experiments testing the OpenDC Serverless module.
Diffstat (limited to 'simulator')
8 files changed, 220 insertions, 48 deletions
diff --git a/simulator/opendc-experiments/opendc-experiments-serverless20/build.gradle.kts b/simulator/opendc-experiments/opendc-experiments-serverless20/build.gradle.kts index 4c7d97b1..40b15af4 100644 --- a/simulator/opendc-experiments/opendc-experiments-serverless20/build.gradle.kts +++ b/simulator/opendc-experiments/opendc-experiments-serverless20/build.gradle.kts @@ -37,4 +37,10 @@ dependencies { implementation(project(":opendc-telemetry:opendc-telemetry-sdk")) implementation("io.github.microutils:kotlin-logging") + + implementation("org.apache.parquet:parquet-avro:${versions["parquet-avro"]}") + implementation("org.apache.hadoop:hadoop-client:${versions["hadoop-client"]}") { + exclude(group = "org.slf4j", module = "slf4j-log4j12") + exclude(group = "log4j") + } } diff --git a/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt b/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt new file mode 100644 index 00000000..e4dc6753 --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt @@ -0,0 +1,130 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.serverless + +import io.opentelemetry.api.metrics.MeterProvider +import io.opentelemetry.sdk.metrics.SdkMeterProvider +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.test.runBlockingTest +import mu.KotlinLogging +import org.opendc.experiments.serverless.trace.FunctionTraceWorkload +import org.opendc.experiments.serverless.trace.ServerlessTraceReader +import org.opendc.harness.dsl.Experiment +import org.opendc.harness.dsl.anyOf +import org.opendc.serverless.service.ServerlessService +import org.opendc.serverless.service.router.RandomRoutingPolicy +import org.opendc.serverless.simulator.SimFunctionDeployer +import org.opendc.simulator.compute.SimMachineModel +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingNode +import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.utils.DelayControllerClockAdapter +import org.opendc.telemetry.sdk.toOtelClock +import java.io.File +import kotlin.math.max + +/** + * A reproduction of the experiments of Soufiane Jounaid's BSc Computer Science thesis: + * OpenDC Serverless: Design, Implementation and Evaluation of a FaaS Platform Simulator. + */ +public class ServerlessExperiment : Experiment("Serverless") { + /** + * The logger for this portfolio instance. + */ + private val logger = KotlinLogging.logger {} + + /** + * The path to where the traces are located. + */ + private val tracePath by anyOf(File("../../input/traces/serverless")) + + /** + * The path to where the output results should be written. + */ + private val outputPath by anyOf(File("output/")) + + /** + * The routing policy to test. + */ + private val routingPolicy by anyOf(RandomRoutingPolicy()) + + @OptIn(ExperimentalCoroutinesApi::class) + override fun doRun(repeat: Int): Unit = runBlockingTest { + val clock = DelayControllerClockAdapter(this) + val meterProvider: MeterProvider = SdkMeterProvider + .builder() + .setClock(clock.toOtelClock()) + .build() + + val trace = ServerlessTraceReader().parse(tracePath) + val traceById = trace.associateBy { it.id } + val deployer = SimFunctionDeployer(clock, this, createMachineModel()) { FunctionTraceWorkload(traceById.getValue(it.name)) } + val service = ServerlessService(coroutineContext, clock, meterProvider.get("opendc-serverless"), deployer, routingPolicy) + val client = service.newClient() + + coroutineScope { + for (entry in trace) { + launch { + val function = client.newFunction(entry.id, entry.maxMemory.toLong()) + var offset = Long.MIN_VALUE + + for (sample in entry.samples) { + if (sample.invocations == 0) { + continue + } + + if (offset < 0) { + offset = sample.timestamp - clock.millis() + } + + delay(max(0, (sample.timestamp - offset) - clock.millis())) + + logger.info { "Invoking function ${entry.id} ${sample.invocations} times [${sample.timestamp}]" } + + repeat(sample.invocations) { + function.invoke() + } + } + } + } + } + + client.close() + service.close() + } + + /** + * Construct the machine model to test with. + */ + private fun createMachineModel(): SimMachineModel { + val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) + + return SimMachineModel( + cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 1000.0) }, + memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } + ) + } +} diff --git a/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTrace.kt b/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTrace.kt index 97c98e2a..4fea6b96 100644 --- a/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTrace.kt +++ b/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTrace.kt @@ -25,4 +25,4 @@ package org.opendc.experiments.serverless.trace /** * A trace for a single function */ -public data class FunctionTrace(val id: String, val samples: List<FunctionSample>) +public data class FunctionTrace(val id: String, val maxMemory: Int, val samples: List<FunctionSample>) diff --git a/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTraceWorkload.kt b/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTraceWorkload.kt new file mode 100644 index 00000000..7d824857 --- /dev/null +++ b/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTraceWorkload.kt @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.serverless.trace + +import org.opendc.serverless.simulator.workload.SimServerlessWorkload +import org.opendc.simulator.compute.workload.SimTraceWorkload +import org.opendc.simulator.compute.workload.SimWorkload + +/** + * A [SimServerlessWorkload] for a [FunctionTrace]. + */ +public class FunctionTraceWorkload(trace: FunctionTrace) : SimServerlessWorkload, SimWorkload by SimTraceWorkload(trace.samples.asSequence().map { SimTraceWorkload.Fragment(it.duration, it.cpuUsage, 1) }) { + override suspend fun invoke() {} +} diff --git a/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/ServerlessTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/ServerlessTraceReader.kt index 87dd33fa..6dc35c59 100644 --- a/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/ServerlessTraceReader.kt +++ b/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/ServerlessTraceReader.kt @@ -22,13 +22,20 @@ package org.opendc.experiments.serverless.trace +import mu.KotlinLogging import java.io.File +import kotlin.math.max /** * A trace reader for the serverless workload trace used in the OpenDC Serverless thesis. */ public class ServerlessTraceReader { /** + * The logger for this portfolio instance. + */ + private val logger = KotlinLogging.logger {} + + /** * Parse the traces at the specified [path]. */ public fun parse(path: File): List<FunctionTrace> { @@ -37,7 +44,10 @@ public class ServerlessTraceReader { } else { path.walk() .filterNot { it.isDirectory } - .map { parseSingle(it) } + .map { file -> + logger.info { "Parsing $file" } + parseSingle(file) + } .toList() } } @@ -57,16 +67,17 @@ public class ServerlessTraceReader { var provMemCol = 0 var cpuUsageCol = 0 var memoryUsageCol = 0 + var maxMemory = 0 path.forEachLine { line -> - if (line.startsWith("#") && line.isNotBlank()) { - return@forEachLine - } + if (line.startsWith("#") && line.isNotBlank()) { + return@forEachLine + } val values = line.split(",") /* Header parsing */ - if (++idx == 0){ + if (idx++ == 0) { val header = values.mapIndexed { col, name -> Pair(name.trim(), col) }.toMap() timestampCol = header["Timestamp [ms]"]!! invocationsCol = header["Invocations"]!! @@ -78,7 +89,7 @@ public class ServerlessTraceReader { return@forEachLine } - val timestamp = values[timestampCol].trim().toLong() + val timestamp = values[timestampCol].trim().toLong() val invocations = values[invocationsCol].trim().toInt() val execTime = values[execTimeCol].trim().toLong() val provisionedCpu = values[provCpuCol].trim().toInt() @@ -86,9 +97,11 @@ public class ServerlessTraceReader { val cpuUsage = values[cpuUsageCol].trim().toDouble() val memoryUsage = values[memoryUsageCol].trim().toDouble() + maxMemory = max(maxMemory, provisionedMemory) + samples.add(FunctionSample(timestamp, execTime, invocations, provisionedCpu, provisionedMemory, cpuUsage, memoryUsage)) } - return FunctionTrace(id, samples) + return FunctionTrace(id, maxMemory, samples) } } diff --git a/simulator/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/SimFunctionDeployer.kt b/simulator/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/SimFunctionDeployer.kt index f28a926b..160f8ebb 100644 --- a/simulator/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/SimFunctionDeployer.kt +++ b/simulator/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/SimFunctionDeployer.kt @@ -111,33 +111,34 @@ public class SimFunctionDeployer( internal fun start() { check(state == FunctionInstanceState.Provisioning) { "Invalid state of function instance" } job = scope.launch { - workload.onStart() + launch { + try { + machine.run(workload) + } finally { + state = FunctionInstanceState.Terminated + } + } - try { - while (isActive) { - chan.receive() + while (isActive) { + chan.receive() - if (queue.isNotEmpty()) { - state = FunctionInstanceState.Active - } + if (queue.isNotEmpty()) { + state = FunctionInstanceState.Active + } - while (queue.isNotEmpty()) { - val request = queue.poll() - try { - machine.run(workload.onInvoke()) - request.cont.resume(Unit) - } catch (cause: CancellationException) { - request.cont.resumeWithException(cause) - throw cause - } catch (cause: Throwable) { - request.cont.resumeWithException(cause) - } + while (queue.isNotEmpty()) { + val request = queue.poll() + try { + workload.invoke() + request.cont.resume(Unit) + } catch (cause: CancellationException) { + request.cont.resumeWithException(cause) + throw cause + } catch (cause: Throwable) { + request.cont.resumeWithException(cause) } - state = FunctionInstanceState.Idle } - } finally { - state = FunctionInstanceState.Terminated - workload.onStop() + state = FunctionInstanceState.Idle } } } diff --git a/simulator/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/workload/SimServerlessWorkload.kt b/simulator/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/workload/SimServerlessWorkload.kt index afdc05af..121bf915 100644 --- a/simulator/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/workload/SimServerlessWorkload.kt +++ b/simulator/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/workload/SimServerlessWorkload.kt @@ -27,19 +27,9 @@ import org.opendc.simulator.compute.workload.SimWorkload /** * A model for a serverless workload, which may be invoked multiple times. */ -public interface SimServerlessWorkload { - /** - * This method is invoked when a function instance is launched. - */ - public fun onStart() {} - +public interface SimServerlessWorkload : SimWorkload { /** * This method is invoked when an active function instance is invoked. */ - public fun onInvoke(): SimWorkload - - /** - * This method is invoked when the function instance is stopped. - */ - public fun onStop() {} + public suspend fun invoke() } diff --git a/simulator/opendc-serverless/opendc-serverless-simulator/src/test/kotlin/org/opendc/serverless/simulator/SimServerlessServiceTest.kt b/simulator/opendc-serverless/opendc-serverless-simulator/src/test/kotlin/org/opendc/serverless/simulator/SimServerlessServiceTest.kt index 597b5ce0..89998585 100644 --- a/simulator/opendc-serverless/opendc-serverless-simulator/src/test/kotlin/org/opendc/serverless/simulator/SimServerlessServiceTest.kt +++ b/simulator/opendc-serverless/opendc-serverless-simulator/src/test/kotlin/org/opendc/serverless/simulator/SimServerlessServiceTest.kt @@ -22,8 +22,8 @@ package org.opendc.serverless.simulator +import io.mockk.coVerify import io.mockk.spyk -import io.mockk.verify import io.opentelemetry.api.metrics.MeterProvider import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.delay @@ -65,8 +65,8 @@ internal class SimServerlessServiceTest { fun testSmoke() = runBlockingTest { val meter = MeterProvider.noop().get("opendc-serverless") val clock = DelayControllerClockAdapter(this) - val workload = spyk(object : SimServerlessWorkload { - override fun onInvoke(): SimWorkload = SimFlopsWorkload(1000) + val workload = spyk(object : SimServerlessWorkload, SimWorkload by SimFlopsWorkload(1000) { + override suspend fun invoke() {} }) val deployer = SimFunctionDeployer(clock, this, machineModel) { workload } val service = ServerlessService(coroutineContext, clock, meter, deployer, RandomRoutingPolicy()) @@ -82,9 +82,7 @@ internal class SimServerlessServiceTest { yield() assertAll( - { verify { workload.onStart() } }, - { verify { workload.onInvoke() } }, - { verify { workload.onStop() } } + { coVerify { workload.invoke() } }, ) } } |
