From a23ad09d5a1c4033781bd5403ad766cae83a2beb Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 18 Aug 2021 12:11:22 +0200 Subject: refactor(format): Clean up Bitbrains trace reader to enable re-use This change updates the code for the Bitbrains trace reader and upgrades the TraceConverter to re-use existing code of the Bitbrains trace reader. --- .../experiments/capelin/trace/TraceConverter.kt | 127 +++++---------------- 1 file changed, 27 insertions(+), 100 deletions(-) (limited to 'opendc-experiments/opendc-experiments-capelin/src/main') diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt index 7cd1f159..d7daa35b 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt @@ -26,12 +26,7 @@ import com.github.ajalt.clikt.core.CliktCommand import com.github.ajalt.clikt.parameters.arguments.argument import com.github.ajalt.clikt.parameters.groups.OptionGroup import com.github.ajalt.clikt.parameters.groups.groupChoice -import com.github.ajalt.clikt.parameters.options.convert -import com.github.ajalt.clikt.parameters.options.default -import com.github.ajalt.clikt.parameters.options.defaultLazy -import com.github.ajalt.clikt.parameters.options.option -import com.github.ajalt.clikt.parameters.options.required -import com.github.ajalt.clikt.parameters.options.split +import com.github.ajalt.clikt.parameters.options.* import com.github.ajalt.clikt.parameters.types.file import com.github.ajalt.clikt.parameters.types.long import me.tongfei.progressbar.ProgressBar @@ -41,11 +36,13 @@ import org.apache.avro.generic.GenericData import org.apache.parquet.avro.AvroParquetWriter import 
org.apache.parquet.hadoop.ParquetWriter import org.apache.parquet.hadoop.metadata.CompressionCodecName +import org.opendc.format.trace.bitbrains.BitbrainsTraceReader import org.opendc.format.util.LocalOutputFile +import org.opendc.simulator.compute.workload.SimTraceWorkload import java.io.BufferedReader import java.io.File import java.io.FileReader -import java.util.Random +import java.util.* import kotlin.math.max import kotlin.math.min @@ -340,106 +337,36 @@ class BitbrainsConversion : TraceConversion("Bitbrains") { metaSchema: Schema, metaWriter: ParquetWriter ): MutableList { - val timestampCol = 0 - val cpuUsageCol = 3 - val coreCol = 1 - val provisionedMemoryCol = 5 - val traceInterval = 5 * 60 * 1000L - - val allFragments = mutableListOf() - - traceDirectory.walk() - .filterNot { it.isDirectory } - .filter { it.extension == "csv" || it.extension == "txt" } - .toList() - .forEach { vmFile -> - println(vmFile) - - var vmId = "" - var maxCores = -1 - var requiredMemory = -1L - var cores: Int - var minTime = Long.MAX_VALUE - - val flopsFragments = sequence { - var last: Fragment? = null - - BufferedReader(FileReader(vmFile)).use { reader -> - reader.lineSequence() - .drop(1) - .chunked(128) - .forEach { lines -> - for (line in lines) { - // Ignore comments in the trace - if (line.startsWith("#") || line.isBlank()) { - continue - } - - val values = line.split(";\t") - - vmId = vmFile.name - - val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L - - cores = values[coreCol].trim().toInt() - val provisionedMemory = values[provisionedMemoryCol].trim().toDouble() // KB - requiredMemory = max(requiredMemory, (provisionedMemory / 1000).toLong()) - maxCores = max(maxCores, cores) - minTime = min(minTime, timestamp) - val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz - - val flops: Long = (cpuUsage * 5 * 60).toLong() - - last = if (last != null && last!!.flops == 0L && flops == 0L) { - val oldFragment = last!! 
- Fragment( - vmId, - oldFragment.tick, - oldFragment.flops + flops, - oldFragment.duration + traceInterval, - cpuUsage, - cores - ) - } else { - val fragment = - Fragment( - vmId, - timestamp, - flops, - traceInterval, - cpuUsage, - cores - ) - if (last != null) { - yield(last!!) - } - fragment - } - } - } - } - - if (last != null) { - yield(last!!) - } - } - + val fragments = mutableListOf() + BitbrainsTraceReader(traceDirectory).use { reader -> + reader.forEach { entry -> + val trace = (entry.workload as SimTraceWorkload).trace var maxTime = Long.MIN_VALUE - flopsFragments.forEach { fragment -> - allFragments.add(fragment) - maxTime = max(maxTime, fragment.tick) + trace.forEach { fragment -> + val flops: Long = (fragment.usage * fragment.duration / 1000).toLong() + fragments.add( + Fragment( + entry.name, + fragment.timestamp, + flops, + fragment.duration, + fragment.usage, + fragment.cores + ) + ) + maxTime = max(maxTime, fragment.timestamp + fragment.duration) } val metaRecord = GenericData.Record(metaSchema) - metaRecord.put("id", vmId) - metaRecord.put("submissionTime", minTime) + metaRecord.put("id", entry.name) + metaRecord.put("submissionTime", entry.start) metaRecord.put("endTime", maxTime) - metaRecord.put("maxCores", maxCores) - metaRecord.put("requiredMemory", requiredMemory) + metaRecord.put("maxCores", entry.meta["cores"]) + metaRecord.put("requiredMemory", entry.meta["required-memory"]) metaWriter.write(metaRecord) } - - return allFragments + } + return fragments } } -- cgit v1.2.3 From 1af7b83695d997381163f2b72c67ed26d5b4891f Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 18 Aug 2021 12:16:02 +0200 Subject: fix(capelin): Keep trace order after sampling This change fixes an issue with the workload sampler where the resulting workload entries would not be ordered properly according to their submission time. 
--- .../main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt | 2 ++ .../kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt | 3 +-- 2 files changed, 3 insertions(+), 2 deletions(-) (limited to 'opendc-experiments/opendc-experiments-capelin/src/main') diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index fa9fa2fc..b5090119 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -255,6 +255,8 @@ suspend fun processTrace( offset = entry.start - clock.millis() } + // Make sure the trace entries are ordered by submission time + assert(entry.start - offset >= 0) { "Invalid trace order" } delay(max(0, (entry.start - offset) - clock.millis())) launch { chan.send(Unit) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt index 5ad75565..0f49ecd2 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/ParquetTraceReader.kt @@ -53,8 +53,7 @@ public class ParquetTraceReader( this.zip(listOf(workload)) } } - .map { sampleWorkload(it.first, workload, it.second, seed) } - .flatten() + .flatMap { sampleWorkload(it.first, workload, it.second, seed).sortedBy(TraceEntry::start) } .iterator() override fun hasNext(): Boolean = iterator.hasNext() -- cgit v1.2.3 From 
5266ecd476a18f601cb4eb6166f4c8338c440210 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 18 Aug 2021 21:54:55 +0200 Subject: test(capelin): Add tests for interference and failures This change adds tests to the Capelin integration suite for performance interference as well as failures. These tests cover the experiment setup more accurately. --- .../src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt | 2 +- .../org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'opendc-experiments/opendc-experiments-capelin/src/main') diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index b5090119..d7df4454 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -277,7 +277,7 @@ suspend fun processTrace( override fun onStateChanged(server: Server, newState: ServerState) { monitor.reportVmStateChange(clock.millis(), server, newState) - if (newState == ServerState.TERMINATED || newState == ServerState.ERROR) { + if (newState == ServerState.TERMINATED) { cont.resume(Unit) } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt index f520a28c..7fb2f83c 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt +++ 
b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt @@ -74,7 +74,7 @@ public class ExperimentMetricExporter( m.overcommissionedBurst = v.toLong() } - mapDoubleSummary(metrics["cpu.work.interfered"], hostMetrics) { m, v -> + mapDoubleSummary(metrics["cpu.work.interference"], hostMetrics) { m, v -> m.interferedBurst = v.toLong() } -- cgit v1.2.3