summaryrefslogtreecommitdiff
path: root/simulator
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-03-31 15:26:45 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-04-08 20:19:49 +0200
commit8df422ca5164bd712caf594951669ebeb656f5fb (patch)
tree93a1232d3b9bc9e5f58b699d0cc0351c75f88346 /simulator
parentc2250720d694c6e7e19b3c0ba2fc27a124d3cadb (diff)
exp: Add experiment testing the serverless module
This change adds an experiments testing the OpenDC Serverless module.
Diffstat (limited to 'simulator')
-rw-r--r--simulator/opendc-experiments/opendc-experiments-serverless20/build.gradle.kts6
-rw-r--r--simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt130
-rw-r--r--simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTrace.kt2
-rw-r--r--simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTraceWorkload.kt34
-rw-r--r--simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/ServerlessTraceReader.kt27
-rw-r--r--simulator/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/SimFunctionDeployer.kt45
-rw-r--r--simulator/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/workload/SimServerlessWorkload.kt14
-rw-r--r--simulator/opendc-serverless/opendc-serverless-simulator/src/test/kotlin/org/opendc/serverless/simulator/SimServerlessServiceTest.kt10
8 files changed, 220 insertions, 48 deletions
diff --git a/simulator/opendc-experiments/opendc-experiments-serverless20/build.gradle.kts b/simulator/opendc-experiments/opendc-experiments-serverless20/build.gradle.kts
index 4c7d97b1..40b15af4 100644
--- a/simulator/opendc-experiments/opendc-experiments-serverless20/build.gradle.kts
+++ b/simulator/opendc-experiments/opendc-experiments-serverless20/build.gradle.kts
@@ -37,4 +37,10 @@ dependencies {
implementation(project(":opendc-telemetry:opendc-telemetry-sdk"))
implementation("io.github.microutils:kotlin-logging")
+
+ implementation("org.apache.parquet:parquet-avro:${versions["parquet-avro"]}")
+ implementation("org.apache.hadoop:hadoop-client:${versions["hadoop-client"]}") {
+ exclude(group = "org.slf4j", module = "slf4j-log4j12")
+ exclude(group = "log4j")
+ }
}
diff --git a/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt b/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt
new file mode 100644
index 00000000..e4dc6753
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt
@@ -0,0 +1,130 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.serverless
+
+import io.opentelemetry.api.metrics.MeterProvider
+import io.opentelemetry.sdk.metrics.SdkMeterProvider
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.test.runBlockingTest
+import mu.KotlinLogging
+import org.opendc.experiments.serverless.trace.FunctionTraceWorkload
+import org.opendc.experiments.serverless.trace.ServerlessTraceReader
+import org.opendc.harness.dsl.Experiment
+import org.opendc.harness.dsl.anyOf
+import org.opendc.serverless.service.ServerlessService
+import org.opendc.serverless.service.router.RandomRoutingPolicy
+import org.opendc.serverless.simulator.SimFunctionDeployer
+import org.opendc.simulator.compute.SimMachineModel
+import org.opendc.simulator.compute.model.MemoryUnit
+import org.opendc.simulator.compute.model.ProcessingNode
+import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.utils.DelayControllerClockAdapter
+import org.opendc.telemetry.sdk.toOtelClock
+import java.io.File
+import kotlin.math.max
+
+/**
+ * A reproduction of the experiments of Soufiane Jounaid's BSc Computer Science thesis:
+ * OpenDC Serverless: Design, Implementation and Evaluation of a FaaS Platform Simulator.
+ */
+public class ServerlessExperiment : Experiment("Serverless") {
+ /**
+ * The logger for this portfolio instance.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
+ * The path to where the traces are located.
+ */
+ private val tracePath by anyOf(File("../../input/traces/serverless"))
+
+ /**
+ * The path to where the output results should be written.
+ */
+ private val outputPath by anyOf(File("output/"))
+
+ /**
+ * The routing policy to test.
+ */
+ private val routingPolicy by anyOf(RandomRoutingPolicy())
+
+ @OptIn(ExperimentalCoroutinesApi::class)
+ override fun doRun(repeat: Int): Unit = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val meterProvider: MeterProvider = SdkMeterProvider
+ .builder()
+ .setClock(clock.toOtelClock())
+ .build()
+
+ val trace = ServerlessTraceReader().parse(tracePath)
+ val traceById = trace.associateBy { it.id }
+ val deployer = SimFunctionDeployer(clock, this, createMachineModel()) { FunctionTraceWorkload(traceById.getValue(it.name)) }
+ val service = ServerlessService(coroutineContext, clock, meterProvider.get("opendc-serverless"), deployer, routingPolicy)
+ val client = service.newClient()
+
+ coroutineScope {
+ for (entry in trace) {
+ launch {
+ val function = client.newFunction(entry.id, entry.maxMemory.toLong())
+ var offset = Long.MIN_VALUE
+
+ for (sample in entry.samples) {
+ if (sample.invocations == 0) {
+ continue
+ }
+
+ if (offset < 0) {
+ offset = sample.timestamp - clock.millis()
+ }
+
+ delay(max(0, (sample.timestamp - offset) - clock.millis()))
+
+ logger.info { "Invoking function ${entry.id} ${sample.invocations} times [${sample.timestamp}]" }
+
+ repeat(sample.invocations) {
+ function.invoke()
+ }
+ }
+ }
+ }
+ }
+
+ client.close()
+ service.close()
+ }
+
+ /**
+ * Construct the machine model to test with.
+ */
+ private fun createMachineModel(): SimMachineModel {
+ val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2)
+
+ return SimMachineModel(
+ cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 1000.0) },
+ memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
+ )
+ }
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTrace.kt b/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTrace.kt
index 97c98e2a..4fea6b96 100644
--- a/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTrace.kt
+++ b/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTrace.kt
@@ -25,4 +25,4 @@ package org.opendc.experiments.serverless.trace
/**
* A trace for a single function
*/
-public data class FunctionTrace(val id: String, val samples: List<FunctionSample>)
+public data class FunctionTrace(val id: String, val maxMemory: Int, val samples: List<FunctionSample>)
diff --git a/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTraceWorkload.kt b/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTraceWorkload.kt
new file mode 100644
index 00000000..7d824857
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTraceWorkload.kt
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.serverless.trace
+
+import org.opendc.serverless.simulator.workload.SimServerlessWorkload
+import org.opendc.simulator.compute.workload.SimTraceWorkload
+import org.opendc.simulator.compute.workload.SimWorkload
+
+/**
+ * A [SimServerlessWorkload] for a [FunctionTrace].
+ */
+public class FunctionTraceWorkload(trace: FunctionTrace) : SimServerlessWorkload, SimWorkload by SimTraceWorkload(trace.samples.asSequence().map { SimTraceWorkload.Fragment(it.duration, it.cpuUsage, 1) }) {
+ override suspend fun invoke() {}
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/ServerlessTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/ServerlessTraceReader.kt
index 87dd33fa..6dc35c59 100644
--- a/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/ServerlessTraceReader.kt
+++ b/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/ServerlessTraceReader.kt
@@ -22,13 +22,20 @@
package org.opendc.experiments.serverless.trace
+import mu.KotlinLogging
import java.io.File
+import kotlin.math.max
/**
* A trace reader for the serverless workload trace used in the OpenDC Serverless thesis.
*/
public class ServerlessTraceReader {
/**
+ * The logger for this portfolio instance.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
* Parse the traces at the specified [path].
*/
public fun parse(path: File): List<FunctionTrace> {
@@ -37,7 +44,10 @@ public class ServerlessTraceReader {
} else {
path.walk()
.filterNot { it.isDirectory }
- .map { parseSingle(it) }
+ .map { file ->
+ logger.info { "Parsing $file" }
+ parseSingle(file)
+ }
.toList()
}
}
@@ -57,16 +67,17 @@ public class ServerlessTraceReader {
var provMemCol = 0
var cpuUsageCol = 0
var memoryUsageCol = 0
+ var maxMemory = 0
path.forEachLine { line ->
- if (line.startsWith("#") && line.isNotBlank()) {
- return@forEachLine
- }
+ if (line.startsWith("#") && line.isNotBlank()) {
+ return@forEachLine
+ }
val values = line.split(",")
/* Header parsing */
- if (++idx == 0){
+ if (idx++ == 0) {
val header = values.mapIndexed { col, name -> Pair(name.trim(), col) }.toMap()
timestampCol = header["Timestamp [ms]"]!!
invocationsCol = header["Invocations"]!!
@@ -78,7 +89,7 @@ public class ServerlessTraceReader {
return@forEachLine
}
- val timestamp = values[timestampCol].trim().toLong()
+ val timestamp = values[timestampCol].trim().toLong()
val invocations = values[invocationsCol].trim().toInt()
val execTime = values[execTimeCol].trim().toLong()
val provisionedCpu = values[provCpuCol].trim().toInt()
@@ -86,9 +97,11 @@ public class ServerlessTraceReader {
val cpuUsage = values[cpuUsageCol].trim().toDouble()
val memoryUsage = values[memoryUsageCol].trim().toDouble()
+ maxMemory = max(maxMemory, provisionedMemory)
+
samples.add(FunctionSample(timestamp, execTime, invocations, provisionedCpu, provisionedMemory, cpuUsage, memoryUsage))
}
- return FunctionTrace(id, samples)
+ return FunctionTrace(id, maxMemory, samples)
}
}
diff --git a/simulator/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/SimFunctionDeployer.kt b/simulator/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/SimFunctionDeployer.kt
index f28a926b..160f8ebb 100644
--- a/simulator/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/SimFunctionDeployer.kt
+++ b/simulator/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/SimFunctionDeployer.kt
@@ -111,33 +111,34 @@ public class SimFunctionDeployer(
internal fun start() {
check(state == FunctionInstanceState.Provisioning) { "Invalid state of function instance" }
job = scope.launch {
- workload.onStart()
+ launch {
+ try {
+ machine.run(workload)
+ } finally {
+ state = FunctionInstanceState.Terminated
+ }
+ }
- try {
- while (isActive) {
- chan.receive()
+ while (isActive) {
+ chan.receive()
- if (queue.isNotEmpty()) {
- state = FunctionInstanceState.Active
- }
+ if (queue.isNotEmpty()) {
+ state = FunctionInstanceState.Active
+ }
- while (queue.isNotEmpty()) {
- val request = queue.poll()
- try {
- machine.run(workload.onInvoke())
- request.cont.resume(Unit)
- } catch (cause: CancellationException) {
- request.cont.resumeWithException(cause)
- throw cause
- } catch (cause: Throwable) {
- request.cont.resumeWithException(cause)
- }
+ while (queue.isNotEmpty()) {
+ val request = queue.poll()
+ try {
+ workload.invoke()
+ request.cont.resume(Unit)
+ } catch (cause: CancellationException) {
+ request.cont.resumeWithException(cause)
+ throw cause
+ } catch (cause: Throwable) {
+ request.cont.resumeWithException(cause)
}
- state = FunctionInstanceState.Idle
}
- } finally {
- state = FunctionInstanceState.Terminated
- workload.onStop()
+ state = FunctionInstanceState.Idle
}
}
}
diff --git a/simulator/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/workload/SimServerlessWorkload.kt b/simulator/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/workload/SimServerlessWorkload.kt
index afdc05af..121bf915 100644
--- a/simulator/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/workload/SimServerlessWorkload.kt
+++ b/simulator/opendc-serverless/opendc-serverless-simulator/src/main/kotlin/org/opendc/serverless/simulator/workload/SimServerlessWorkload.kt
@@ -27,19 +27,9 @@ import org.opendc.simulator.compute.workload.SimWorkload
/**
* A model for a serverless workload, which may be invoked multiple times.
*/
-public interface SimServerlessWorkload {
- /**
- * This method is invoked when a function instance is launched.
- */
- public fun onStart() {}
-
+public interface SimServerlessWorkload : SimWorkload {
/**
* This method is invoked when an active function instance is invoked.
*/
- public fun onInvoke(): SimWorkload
-
- /**
- * This method is invoked when the function instance is stopped.
- */
- public fun onStop() {}
+ public suspend fun invoke()
}
diff --git a/simulator/opendc-serverless/opendc-serverless-simulator/src/test/kotlin/org/opendc/serverless/simulator/SimServerlessServiceTest.kt b/simulator/opendc-serverless/opendc-serverless-simulator/src/test/kotlin/org/opendc/serverless/simulator/SimServerlessServiceTest.kt
index 597b5ce0..89998585 100644
--- a/simulator/opendc-serverless/opendc-serverless-simulator/src/test/kotlin/org/opendc/serverless/simulator/SimServerlessServiceTest.kt
+++ b/simulator/opendc-serverless/opendc-serverless-simulator/src/test/kotlin/org/opendc/serverless/simulator/SimServerlessServiceTest.kt
@@ -22,8 +22,8 @@
package org.opendc.serverless.simulator
+import io.mockk.coVerify
import io.mockk.spyk
-import io.mockk.verify
import io.opentelemetry.api.metrics.MeterProvider
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
@@ -65,8 +65,8 @@ internal class SimServerlessServiceTest {
fun testSmoke() = runBlockingTest {
val meter = MeterProvider.noop().get("opendc-serverless")
val clock = DelayControllerClockAdapter(this)
- val workload = spyk(object : SimServerlessWorkload {
- override fun onInvoke(): SimWorkload = SimFlopsWorkload(1000)
+ val workload = spyk(object : SimServerlessWorkload, SimWorkload by SimFlopsWorkload(1000) {
+ override suspend fun invoke() {}
})
val deployer = SimFunctionDeployer(clock, this, machineModel) { workload }
val service = ServerlessService(coroutineContext, clock, meter, deployer, RandomRoutingPolicy())
@@ -82,9 +82,7 @@ internal class SimServerlessServiceTest {
yield()
assertAll(
- { verify { workload.onStart() } },
- { verify { workload.onInvoke() } },
- { verify { workload.onStop() } }
+ { coVerify { workload.invoke() } },
)
}
}