| field | value | date |
|---|---|---|
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-08-24 13:15:26 +0200 |
| committer | GitHub <noreply@github.com> | 2021-08-24 13:15:26 +0200 |
| commit | ac48fa12f36180de31154a7c828b4dc281dac94b (patch) | |
| tree | 3a1117b62e627094ea2859a8b1cb910ef7046851 /opendc-experiments/opendc-experiments-capelin/src/main | |
| parent | 51515bb255b3b32ca3020419a0c84130a4d8d370 (diff) | |
| parent | 5266ecd476a18f601cb4eb6166f4c8338c440210 (diff) | |
merge: Add tests for interference and failures in Capelin
This pull request updates the Capelin experiments to test for interference and failure scenarios.
This allows us to track regressions in these subsystems more easily.
* Clean up Bitbrains trace reader to enable re-use
* Keep trace order after sampling (see the sketch after this list)
* Update Bitbrains trace tests
* Add support for reporting interfered work
* Add support for SimHost failure
* Add tests for interference and failures
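
The trace-order bullet corresponds to the `sortedBy` change in `ParquetTraceReader` and the new assert in `processTrace`, both visible in the diff below. The following is a minimal, self-contained sketch of that invariant; `TraceEntry` here is a simplified stand-in for OpenDC's `TraceEntry<SimWorkload>`, and `sampleAndSort` is a hypothetical helper, not project code:

```kotlin
import java.util.Random

// Simplified stand-in for OpenDC's TraceEntry<SimWorkload>; the fields are illustrative.
data class TraceEntry(val name: String, val start: Long)

// Hypothetical helper mirroring what ParquetTraceReader now does: sample the
// workload, then restore submission order with sortedBy, as in the diff below.
fun sampleAndSort(entries: List<TraceEntry>, fraction: Double, seed: Long): List<TraceEntry> {
    val random = Random(seed)
    return entries
        .filter { random.nextDouble() < fraction }
        .sortedBy(TraceEntry::start)
}

fun main() {
    val trace = listOf(
        TraceEntry("vm-a", 0L),
        TraceEntry("vm-b", 600_000L),
        TraceEntry("vm-c", 300_000L)
    )
    val sampled = sampleAndSort(trace, fraction = 0.8, seed = 1L)

    // The invariant enforced by the new assert in processTrace: every entry
    // starts no earlier than its predecessor, so replay delays stay non-negative.
    check(sampled.zipWithNext().all { (a, b) -> a.start <= b.start }) { "Invalid trace order" }
    println(sampled)
}
```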
Diffstat (limited to 'opendc-experiments/opendc-experiments-capelin/src/main')
4 files changed, 32 insertions, 104 deletions
```diff
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
index fa9fa2fc..d7df4454 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
@@ -255,6 +255,8 @@ suspend fun processTrace(
                 offset = entry.start - clock.millis()
             }
 
+            // Make sure the trace entries are ordered by submission time
+            assert(entry.start - offset >= 0) { "Invalid trace order" }
             delay(max(0, (entry.start - offset) - clock.millis()))
             launch {
                 chan.send(Unit)
@@ -275,7 +277,7 @@ suspend fun processTrace(
                     override fun onStateChanged(server: Server, newState: ServerState) {
                         monitor.reportVmStateChange(clock.millis(), server, newState)
 
-                        if (newState == ServerState.TERMINATED || newState == ServerState.ERROR) {
+                        if (newState == ServerState.TERMINATED) {
                             cont.resume(Unit)
                         }
                     }
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt
index f520a28c..7fb2f83c 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt
@@ -74,7 +74,7 @@ public class ExperimentMetricExporter(
             m.overcommissionedBurst = v.toLong()
         }
 
-        mapDoubleSummary(metrics["cpu.work.interfered"], hostMetrics) { m, v ->
+        mapDoubleSummary(metrics["cpu.work.interference"], hostMetrics) { m, v ->
             m.interferedBurst = v.toLong()
         }
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt
index 5ad75565..0f49ecd2 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt
@@ -53,8 +53,7 @@ public class ParquetTraceReader(
                 this.zip(listOf(workload))
             }
         }
-            .map { sampleWorkload(it.first, workload, it.second, seed) }
-            .flatten()
+            .flatMap { sampleWorkload(it.first, workload, it.second, seed).sortedBy(TraceEntry<SimWorkload>::start) }
             .iterator()
 
     override fun hasNext(): Boolean = iterator.hasNext()
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt
index 7cd1f159..d7daa35b 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt
@@ -26,12 +26,7 @@ import com.github.ajalt.clikt.core.CliktCommand
 import com.github.ajalt.clikt.parameters.arguments.argument
 import com.github.ajalt.clikt.parameters.groups.OptionGroup
 import com.github.ajalt.clikt.parameters.groups.groupChoice
-import com.github.ajalt.clikt.parameters.options.convert
-import com.github.ajalt.clikt.parameters.options.default
-import com.github.ajalt.clikt.parameters.options.defaultLazy
-import com.github.ajalt.clikt.parameters.options.option
-import com.github.ajalt.clikt.parameters.options.required
-import com.github.ajalt.clikt.parameters.options.split
+import com.github.ajalt.clikt.parameters.options.*
 import com.github.ajalt.clikt.parameters.types.file
 import com.github.ajalt.clikt.parameters.types.long
 import me.tongfei.progressbar.ProgressBar
@@ -41,11 +36,13 @@
 import org.apache.avro.generic.GenericData
 import org.apache.parquet.avro.AvroParquetWriter
 import org.apache.parquet.hadoop.ParquetWriter
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import org.opendc.format.trace.bitbrains.BitbrainsTraceReader
 import org.opendc.format.util.LocalOutputFile
+import org.opendc.simulator.compute.workload.SimTraceWorkload
 import java.io.BufferedReader
 import java.io.File
 import java.io.FileReader
-import java.util.Random
+import java.util.*
 import kotlin.math.max
 import kotlin.math.min
@@ -340,106 +337,36 @@ class BitbrainsConversion : TraceConversion("Bitbrains") {
         metaSchema: Schema,
         metaWriter: ParquetWriter<GenericData.Record>
     ): MutableList<Fragment> {
-        val timestampCol = 0
-        val cpuUsageCol = 3
-        val coreCol = 1
-        val provisionedMemoryCol = 5
-        val traceInterval = 5 * 60 * 1000L
-
-        val allFragments = mutableListOf<Fragment>()
-
-        traceDirectory.walk()
-            .filterNot { it.isDirectory }
-            .filter { it.extension == "csv" || it.extension == "txt" }
-            .toList()
-            .forEach { vmFile ->
-                println(vmFile)
-
-                var vmId = ""
-                var maxCores = -1
-                var requiredMemory = -1L
-                var cores: Int
-                var minTime = Long.MAX_VALUE
-
-                val flopsFragments = sequence {
-                    var last: Fragment? = null
-
-                    BufferedReader(FileReader(vmFile)).use { reader ->
-                        reader.lineSequence()
-                            .drop(1)
-                            .chunked(128)
-                            .forEach { lines ->
-                                for (line in lines) {
-                                    // Ignore comments in the trace
-                                    if (line.startsWith("#") || line.isBlank()) {
-                                        continue
-                                    }
-
-                                    val values = line.split(";\t")
-
-                                    vmId = vmFile.name
-
-                                    val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
-
-                                    cores = values[coreCol].trim().toInt()
-                                    val provisionedMemory = values[provisionedMemoryCol].trim().toDouble() // KB
-                                    requiredMemory = max(requiredMemory, (provisionedMemory / 1000).toLong())
-                                    maxCores = max(maxCores, cores)
-                                    minTime = min(minTime, timestamp)
-                                    val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz
-
-                                    val flops: Long = (cpuUsage * 5 * 60).toLong()
-
-                                    last = if (last != null && last!!.flops == 0L && flops == 0L) {
-                                        val oldFragment = last!!
-                                        Fragment(
-                                            vmId,
-                                            oldFragment.tick,
-                                            oldFragment.flops + flops,
-                                            oldFragment.duration + traceInterval,
-                                            cpuUsage,
-                                            cores
-                                        )
-                                    } else {
-                                        val fragment =
-                                            Fragment(
-                                                vmId,
-                                                timestamp,
-                                                flops,
-                                                traceInterval,
-                                                cpuUsage,
-                                                cores
-                                            )
-                                        if (last != null) {
-                                            yield(last!!)
-                                        }
-                                        fragment
-                                    }
-                                }
-                            }
-                    }
-
-                    if (last != null) {
-                        yield(last!!)
-                    }
-                }
-
+        val fragments = mutableListOf<Fragment>()
+        BitbrainsTraceReader(traceDirectory).use { reader ->
+            reader.forEach { entry ->
+                val trace = (entry.workload as SimTraceWorkload).trace
                 var maxTime = Long.MIN_VALUE
-                flopsFragments.forEach { fragment ->
-                    allFragments.add(fragment)
-                    maxTime = max(maxTime, fragment.tick)
+                trace.forEach { fragment ->
+                    val flops: Long = (fragment.usage * fragment.duration / 1000).toLong()
+                    fragments.add(
+                        Fragment(
+                            entry.name,
+                            fragment.timestamp,
+                            flops,
+                            fragment.duration,
+                            fragment.usage,
+                            fragment.cores
+                        )
+                    )
+                    maxTime = max(maxTime, fragment.timestamp + fragment.duration)
                 }
 
                 val metaRecord = GenericData.Record(metaSchema)
-                metaRecord.put("id", vmId)
-                metaRecord.put("submissionTime", minTime)
+                metaRecord.put("id", entry.name)
+                metaRecord.put("submissionTime", entry.start)
                 metaRecord.put("endTime", maxTime)
-                metaRecord.put("maxCores", maxCores)
-                metaRecord.put("requiredMemory", requiredMemory)
+                metaRecord.put("maxCores", entry.meta["cores"])
+                metaRecord.put("requiredMemory", entry.meta["required-memory"])
                 metaWriter.write(metaRecord)
             }
-
-        return allFragments
+        }
+        return fragments
     }
 }
```
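
One detail worth noting in the `TraceConverter` rewrite: the flops computation changes form but not value for Bitbrains input, since the reader reports fragment durations in milliseconds and the trace samples every five minutes. A quick sanity check of that arithmetic (plain Kotlin, illustrative values only, not project code):

```kotlin
// Sanity check: the rewritten TraceConverter computes flops as
// usage * duration / 1000, which matches the old cpuUsage * 5 * 60
// for the fixed 5-minute Bitbrains sampling interval.
fun main() {
    val usageMhz = 2400.0            // CPU usage from the trace, in MHz
    val durationMs = 5 * 60 * 1000L  // one Bitbrains sample: 5 minutes in ms

    val oldFlops = (usageMhz * 5 * 60).toLong()            // MHz * seconds
    val newFlops = (usageMhz * durationMs / 1000).toLong() // MHz * ms / 1000

    check(oldFlops == newFlops)
    println("flops per fragment: $newFlops") // 720000
}
```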
