summaryrefslogtreecommitdiff
path: root/simulator/opendc-experiments/opendc-experiments-serverless20
diff options
context:
space:
mode:
Diffstat (limited to 'simulator/opendc-experiments/opendc-experiments-serverless20')
-rw-r--r--simulator/opendc-experiments/opendc-experiments-serverless20/build.gradle.kts6
-rw-r--r--simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt130
-rw-r--r--simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTrace.kt2
-rw-r--r--simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTraceWorkload.kt34
-rw-r--r--simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/ServerlessTraceReader.kt27
5 files changed, 191 insertions, 8 deletions
diff --git a/simulator/opendc-experiments/opendc-experiments-serverless20/build.gradle.kts b/simulator/opendc-experiments/opendc-experiments-serverless20/build.gradle.kts
index 4c7d97b1..40b15af4 100644
--- a/simulator/opendc-experiments/opendc-experiments-serverless20/build.gradle.kts
+++ b/simulator/opendc-experiments/opendc-experiments-serverless20/build.gradle.kts
@@ -37,4 +37,10 @@ dependencies {
implementation(project(":opendc-telemetry:opendc-telemetry-sdk"))
implementation("io.github.microutils:kotlin-logging")
+
+ implementation("org.apache.parquet:parquet-avro:${versions["parquet-avro"]}")
+ implementation("org.apache.hadoop:hadoop-client:${versions["hadoop-client"]}") {
+ exclude(group = "org.slf4j", module = "slf4j-log4j12")
+ exclude(group = "log4j")
+ }
}
diff --git a/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt b/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt
new file mode 100644
index 00000000..e4dc6753
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/ServerlessExperiment.kt
@@ -0,0 +1,130 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.serverless
+
+import io.opentelemetry.api.metrics.MeterProvider
+import io.opentelemetry.sdk.metrics.SdkMeterProvider
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.test.runBlockingTest
+import mu.KotlinLogging
+import org.opendc.experiments.serverless.trace.FunctionTraceWorkload
+import org.opendc.experiments.serverless.trace.ServerlessTraceReader
+import org.opendc.harness.dsl.Experiment
+import org.opendc.harness.dsl.anyOf
+import org.opendc.serverless.service.ServerlessService
+import org.opendc.serverless.service.router.RandomRoutingPolicy
+import org.opendc.serverless.simulator.SimFunctionDeployer
+import org.opendc.simulator.compute.SimMachineModel
+import org.opendc.simulator.compute.model.MemoryUnit
+import org.opendc.simulator.compute.model.ProcessingNode
+import org.opendc.simulator.compute.model.ProcessingUnit
+import org.opendc.simulator.utils.DelayControllerClockAdapter
+import org.opendc.telemetry.sdk.toOtelClock
+import java.io.File
+import kotlin.math.max
+
+/**
+ * A reproduction of the experiments of Soufiane Jounaid's BSc Computer Science thesis:
+ * OpenDC Serverless: Design, Implementation and Evaluation of a FaaS Platform Simulator.
+ */
+public class ServerlessExperiment : Experiment("Serverless") {
+ /**
+ * The logger for this portfolio instance.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
+ * The path to where the traces are located.
+ */
+ private val tracePath by anyOf(File("../../input/traces/serverless"))
+
+ /**
+ * The path to where the output results should be written.
+ */
+ private val outputPath by anyOf(File("output/"))
+
+ /**
+ * The routing policy to test.
+ */
+ private val routingPolicy by anyOf(RandomRoutingPolicy())
+
+ @OptIn(ExperimentalCoroutinesApi::class)
+ override fun doRun(repeat: Int): Unit = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
+ val meterProvider: MeterProvider = SdkMeterProvider
+ .builder()
+ .setClock(clock.toOtelClock())
+ .build()
+
+ val trace = ServerlessTraceReader().parse(tracePath)
+ val traceById = trace.associateBy { it.id }
+ val deployer = SimFunctionDeployer(clock, this, createMachineModel()) { FunctionTraceWorkload(traceById.getValue(it.name)) }
+ val service = ServerlessService(coroutineContext, clock, meterProvider.get("opendc-serverless"), deployer, routingPolicy)
+ val client = service.newClient()
+
+ coroutineScope {
+ for (entry in trace) {
+ launch {
+ val function = client.newFunction(entry.id, entry.maxMemory.toLong())
+ var offset = Long.MIN_VALUE
+
+ for (sample in entry.samples) {
+ if (sample.invocations == 0) {
+ continue
+ }
+
+ if (offset < 0) {
+ offset = sample.timestamp - clock.millis()
+ }
+
+ delay(max(0, (sample.timestamp - offset) - clock.millis()))
+
+ logger.info { "Invoking function ${entry.id} ${sample.invocations} times [${sample.timestamp}]" }
+
+ repeat(sample.invocations) {
+ function.invoke()
+ }
+ }
+ }
+ }
+ }
+
+ client.close()
+ service.close()
+ }
+
+ /**
+ * Construct the machine model to test with.
+ */
+ private fun createMachineModel(): SimMachineModel {
+ val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2)
+
+ return SimMachineModel(
+ cpus = List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 1000.0) },
+ memory = List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) }
+ )
+ }
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTrace.kt b/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTrace.kt
index 97c98e2a..4fea6b96 100644
--- a/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTrace.kt
+++ b/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTrace.kt
@@ -25,4 +25,4 @@ package org.opendc.experiments.serverless.trace
/**
* A trace for a single function
*/
-public data class FunctionTrace(val id: String, val samples: List<FunctionSample>)
+public data class FunctionTrace(val id: String, val maxMemory: Int, val samples: List<FunctionSample>)
diff --git a/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTraceWorkload.kt b/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTraceWorkload.kt
new file mode 100644
index 00000000..7d824857
--- /dev/null
+++ b/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/FunctionTraceWorkload.kt
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.experiments.serverless.trace
+
+import org.opendc.serverless.simulator.workload.SimServerlessWorkload
+import org.opendc.simulator.compute.workload.SimTraceWorkload
+import org.opendc.simulator.compute.workload.SimWorkload
+
+/**
+ * A [SimServerlessWorkload] for a [FunctionTrace].
+ */
+public class FunctionTraceWorkload(trace: FunctionTrace) : SimServerlessWorkload, SimWorkload by SimTraceWorkload(trace.samples.asSequence().map { SimTraceWorkload.Fragment(it.duration, it.cpuUsage, 1) }) {
+ override suspend fun invoke() {}
+}
diff --git a/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/ServerlessTraceReader.kt b/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/ServerlessTraceReader.kt
index 87dd33fa..6dc35c59 100644
--- a/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/ServerlessTraceReader.kt
+++ b/simulator/opendc-experiments/opendc-experiments-serverless20/src/main/kotlin/org/opendc/experiments/serverless/trace/ServerlessTraceReader.kt
@@ -22,13 +22,20 @@
package org.opendc.experiments.serverless.trace
+import mu.KotlinLogging
import java.io.File
+import kotlin.math.max
/**
* A trace reader for the serverless workload trace used in the OpenDC Serverless thesis.
*/
public class ServerlessTraceReader {
/**
+ * The logger for this portfolio instance.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
* Parse the traces at the specified [path].
*/
public fun parse(path: File): List<FunctionTrace> {
@@ -37,7 +44,10 @@ public class ServerlessTraceReader {
} else {
path.walk()
.filterNot { it.isDirectory }
- .map { parseSingle(it) }
+ .map { file ->
+ logger.info { "Parsing $file" }
+ parseSingle(file)
+ }
.toList()
}
}
@@ -57,16 +67,17 @@ public class ServerlessTraceReader {
var provMemCol = 0
var cpuUsageCol = 0
var memoryUsageCol = 0
+ var maxMemory = 0
path.forEachLine { line ->
- if (line.startsWith("#") && line.isNotBlank()) {
- return@forEachLine
- }
+ if (line.startsWith("#") && line.isNotBlank()) {
+ return@forEachLine
+ }
val values = line.split(",")
/* Header parsing */
- if (++idx == 0){
+ if (idx++ == 0) {
val header = values.mapIndexed { col, name -> Pair(name.trim(), col) }.toMap()
timestampCol = header["Timestamp [ms]"]!!
invocationsCol = header["Invocations"]!!
@@ -78,7 +89,7 @@ public class ServerlessTraceReader {
return@forEachLine
}
- val timestamp = values[timestampCol].trim().toLong()
+ val timestamp = values[timestampCol].trim().toLong()
val invocations = values[invocationsCol].trim().toInt()
val execTime = values[execTimeCol].trim().toLong()
val provisionedCpu = values[provCpuCol].trim().toInt()
@@ -86,9 +97,11 @@ public class ServerlessTraceReader {
val cpuUsage = values[cpuUsageCol].trim().toDouble()
val memoryUsage = values[memoryUsageCol].trim().toDouble()
+ maxMemory = max(maxMemory, provisionedMemory)
+
samples.add(FunctionSample(timestamp, execTime, invocations, provisionedCpu, provisionedMemory, cpuUsage, memoryUsage))
}
- return FunctionTrace(id, samples)
+ return FunctionTrace(id, maxMemory, samples)
}
}