author     Fabian Mastenbroek <mail.fabianm@gmail.com>    2021-08-24 13:15:26 +0200
committer  GitHub <noreply@github.com>                    2021-08-24 13:15:26 +0200
commit     ac48fa12f36180de31154a7c828b4dc281dac94b (patch)
tree       3a1117b62e627094ea2859a8b1cb910ef7046851 /opendc-experiments/opendc-experiments-capelin/src/main
parent     51515bb255b3b32ca3020419a0c84130a4d8d370 (diff)
parent     5266ecd476a18f601cb4eb6166f4c8338c440210 (diff)
merge: Add tests for interference and failures in Capelin
This pull request updates the Capelin experiments to test for interference and failure scenarios. This allows us to track regressions in these subsystems more easily.

* Clean up Bitbrains trace reader to enable re-use
* Keep trace order after sampling
* Update Bitbrains trace tests
* Add support for reporting interfered work
* Add support for SimHost failure
* Add tests for interference and failures
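As a rough illustration of what such a regression test can assert (every name below is a hypothetical stand-in for the Capelin runner, not code from this change):

import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test

// Hypothetical stand-ins for the Capelin experiment runner; illustrative only.
data class ScenarioResult(val interferedWork: Double)

fun runScenario(interference: Boolean): ScenarioResult =
    TODO("wire up the Capelin runner here")

class InterferenceRegressionTest {
    @Test
    fun `interference model reports interfered work`() {
        val result = runScenario(interference = true)
        // With interference enabled, the interfered-work counter should be positive.
        assertTrue(result.interferedWork > 0.0)
    }
}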
Diffstat (limited to 'opendc-experiments/opendc-experiments-capelin/src/main')
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt                  |   4
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt  |   2
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt           |   3
-rw-r--r--  opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt               | 127
4 files changed, 32 insertions, 104 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
index fa9fa2fc..d7df4454 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
@@ -255,6 +255,8 @@ suspend fun processTrace(
offset = entry.start - clock.millis()
}
+ // Make sure the trace entries are ordered by submission time
+ assert(entry.start - offset >= 0) { "Invalid trace order" }
delay(max(0, (entry.start - offset) - clock.millis()))
launch {
chan.send(Unit)
@@ -275,7 +277,7 @@ suspend fun processTrace(
override fun onStateChanged(server: Server, newState: ServerState) {
monitor.reportVmStateChange(clock.millis(), server, newState)
- if (newState == ServerState.TERMINATED || newState == ServerState.ERROR) {
+ if (newState == ServerState.TERMINATED) {
cont.resume(Unit)
}
}
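In the first hunk, `offset` captures the gap between the first entry's start time and the simulation clock, so `entry.start - offset` is the simulated submission time of every later entry; the new assertion makes the ordering requirement explicit. The second hunk stops treating `ServerState.ERROR` as terminal, consistent with the new SimHost failure support: a server on a failed host may recover, so only `TERMINATED` should complete the wait. A minimal sketch of the replay pattern, with `Entry` as a simplified stand-in for OpenDC's `TraceEntry<SimWorkload>`:

import java.time.Clock
import kotlinx.coroutines.delay
import kotlin.math.max

// Simplified stand-in for a trace entry; the real type also carries a workload.
data class Entry(val start: Long)

suspend fun replay(entries: List<Entry>, clock: Clock) {
    var offset = Long.MIN_VALUE
    for (entry in entries) {
        if (offset == Long.MIN_VALUE) {
            offset = entry.start - clock.millis()
        }
        // Entries must be ordered by submission time, or this delay would go negative.
        assert(entry.start - offset >= 0) { "Invalid trace order" }
        delay(max(0, (entry.start - offset) - clock.millis()))
        // submit the entry's workload to the scheduler here
    }
}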
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt
index f520a28c..7fb2f83c 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt
@@ -74,7 +74,7 @@ public class ExperimentMetricExporter(
m.overcommissionedBurst = v.toLong()
}
- mapDoubleSummary(metrics["cpu.work.interfered"], hostMetrics) { m, v ->
+ mapDoubleSummary(metrics["cpu.work.interference"], hostMetrics) { m, v ->
m.interferedBurst = v.toLong()
}
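The exporter here follows a metric that was renamed on the producing side, from `cpu.work.interfered` to `cpu.work.interference`. As a rough illustration of the failure mode being avoided (the map literal is made up; the real exporter consumes OpenTelemetry summaries):

fun main() {
    // Illustrative metric snapshot; the real code receives OpenTelemetry data.
    val metrics = mapOf(
        "cpu.work.overcommissioned" to 1.2e9,
        "cpu.work.interference" to 3.4e8 // key renamed from "cpu.work.interfered"
    )
    // Looking up the stale key would silently yield null and drop the value.
    val interferedBurst = (metrics["cpu.work.interference"] ?: 0.0).toLong()
    println(interferedBurst) // 340000000
}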
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt
index 5ad75565..0f49ecd2 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt
@@ -53,8 +53,7 @@ public class ParquetTraceReader(
this.zip(listOf(workload))
}
}
- .map { sampleWorkload(it.first, workload, it.second, seed) }
- .flatten()
+ .flatMap { sampleWorkload(it.first, workload, it.second, seed).sortedBy(TraceEntry<SimWorkload>::start) }
.iterator()
override fun hasNext(): Boolean = iterator.hasNext()
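Previously the sampled partitions were flattened in whatever order sampling produced; the new `flatMap` sorts each partition by start time first, which is exactly what the ordering assertion in ExperimentHelpers.kt now checks. A self-contained sketch of the pattern (the `TraceEntry` class here is a simplified stand-in for the generic OpenDC type):

// Simplified stand-in for TraceEntry<SimWorkload>.
data class TraceEntry(val name: String, val start: Long)

fun main() {
    // Two sampled partitions whose internal order was scrambled by sampling.
    val partitions = listOf(
        listOf(TraceEntry("vm-3", 30), TraceEntry("vm-1", 10)),
        listOf(TraceEntry("vm-4", 40), TraceEntry("vm-2", 20))
    )
    // Sort each partition by start time before flattening, mirroring the change.
    val ordered = partitions.flatMap { it.sortedBy(TraceEntry::start) }
    println(ordered.map(TraceEntry::name)) // [vm-1, vm-3, vm-2, vm-4]
}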
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt
index 7cd1f159..d7daa35b 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt
@@ -26,12 +26,7 @@ import com.github.ajalt.clikt.core.CliktCommand
import com.github.ajalt.clikt.parameters.arguments.argument
import com.github.ajalt.clikt.parameters.groups.OptionGroup
import com.github.ajalt.clikt.parameters.groups.groupChoice
-import com.github.ajalt.clikt.parameters.options.convert
-import com.github.ajalt.clikt.parameters.options.default
-import com.github.ajalt.clikt.parameters.options.defaultLazy
-import com.github.ajalt.clikt.parameters.options.option
-import com.github.ajalt.clikt.parameters.options.required
-import com.github.ajalt.clikt.parameters.options.split
+import com.github.ajalt.clikt.parameters.options.*
import com.github.ajalt.clikt.parameters.types.file
import com.github.ajalt.clikt.parameters.types.long
import me.tongfei.progressbar.ProgressBar
@@ -41,11 +36,13 @@ import org.apache.avro.generic.GenericData
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import org.opendc.format.trace.bitbrains.BitbrainsTraceReader
import org.opendc.format.util.LocalOutputFile
+import org.opendc.simulator.compute.workload.SimTraceWorkload
import java.io.BufferedReader
import java.io.File
import java.io.FileReader
-import java.util.Random
+import java.util.*
import kotlin.math.max
import kotlin.math.min
@@ -340,106 +337,36 @@ class BitbrainsConversion : TraceConversion("Bitbrains") {
metaSchema: Schema,
metaWriter: ParquetWriter<GenericData.Record>
): MutableList<Fragment> {
- val timestampCol = 0
- val cpuUsageCol = 3
- val coreCol = 1
- val provisionedMemoryCol = 5
- val traceInterval = 5 * 60 * 1000L
-
- val allFragments = mutableListOf<Fragment>()
-
- traceDirectory.walk()
- .filterNot { it.isDirectory }
- .filter { it.extension == "csv" || it.extension == "txt" }
- .toList()
- .forEach { vmFile ->
- println(vmFile)
-
- var vmId = ""
- var maxCores = -1
- var requiredMemory = -1L
- var cores: Int
- var minTime = Long.MAX_VALUE
-
- val flopsFragments = sequence {
- var last: Fragment? = null
-
- BufferedReader(FileReader(vmFile)).use { reader ->
- reader.lineSequence()
- .drop(1)
- .chunked(128)
- .forEach { lines ->
- for (line in lines) {
- // Ignore comments in the trace
- if (line.startsWith("#") || line.isBlank()) {
- continue
- }
-
- val values = line.split(";\t")
-
- vmId = vmFile.name
-
- val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
-
- cores = values[coreCol].trim().toInt()
- val provisionedMemory = values[provisionedMemoryCol].trim().toDouble() // KB
- requiredMemory = max(requiredMemory, (provisionedMemory / 1000).toLong())
- maxCores = max(maxCores, cores)
- minTime = min(minTime, timestamp)
- val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz
-
- val flops: Long = (cpuUsage * 5 * 60).toLong()
-
- last = if (last != null && last!!.flops == 0L && flops == 0L) {
- val oldFragment = last!!
- Fragment(
- vmId,
- oldFragment.tick,
- oldFragment.flops + flops,
- oldFragment.duration + traceInterval,
- cpuUsage,
- cores
- )
- } else {
- val fragment =
- Fragment(
- vmId,
- timestamp,
- flops,
- traceInterval,
- cpuUsage,
- cores
- )
- if (last != null) {
- yield(last!!)
- }
- fragment
- }
- }
- }
- }
-
- if (last != null) {
- yield(last!!)
- }
- }
-
+ val fragments = mutableListOf<Fragment>()
+ BitbrainsTraceReader(traceDirectory).use { reader ->
+ reader.forEach { entry ->
+ val trace = (entry.workload as SimTraceWorkload).trace
var maxTime = Long.MIN_VALUE
- flopsFragments.forEach { fragment ->
- allFragments.add(fragment)
- maxTime = max(maxTime, fragment.tick)
+ trace.forEach { fragment ->
+ val flops: Long = (fragment.usage * fragment.duration / 1000).toLong()
+ fragments.add(
+ Fragment(
+ entry.name,
+ fragment.timestamp,
+ flops,
+ fragment.duration,
+ fragment.usage,
+ fragment.cores
+ )
+ )
+ maxTime = max(maxTime, fragment.timestamp + fragment.duration)
}
val metaRecord = GenericData.Record(metaSchema)
- metaRecord.put("id", vmId)
- metaRecord.put("submissionTime", minTime)
+ metaRecord.put("id", entry.name)
+ metaRecord.put("submissionTime", entry.start)
metaRecord.put("endTime", maxTime)
- metaRecord.put("maxCores", maxCores)
- metaRecord.put("requiredMemory", requiredMemory)
+ metaRecord.put("maxCores", entry.meta["cores"])
+ metaRecord.put("requiredMemory", entry.meta["required-memory"])
metaWriter.write(metaRecord)
}
-
- return allFragments
+ }
+ return fragments
}
}
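With the hand-rolled CSV parsing gone, the converter derives per-fragment work from the reader's fragments directly. Given `usage` in MHz and `duration` in milliseconds, the new `usage * duration / 1000` matches the removed `cpuUsage * 5 * 60` for the standard 5-minute Bitbrains interval; a quick sanity check of that equivalence:

fun main() {
    val usageMhz = 2400.0            // CPU usage from the trace, in MHz
    val durationMs = 5 * 60 * 1000L  // the 5-minute Bitbrains reporting interval

    val oldFlops = (usageMhz * 5 * 60).toLong()            // removed formula
    val newFlops = (usageMhz * durationMs / 1000).toLong() // formula in this diff
    check(oldFlops == newFlops) // both give 720000 for this fragment
    println(newFlops)
}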