From db3b8a63b66f5a34d17483bcb29a85bdd8b75598 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 23 Jul 2020 20:55:25 +0200 Subject: Make HPC sampling strategy consistent --- .../experiments/sc20/trace/WorkloadSampler.kt | 51 ++++++++++++++++------ 1 file changed, 38 insertions(+), 13 deletions(-) diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt index b24d6de1..d03d556b 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt @@ -109,6 +109,24 @@ fun sampleHpcWorkload( name.matches(pattern) } + val hpcSequence = generateSequence(0) { it + 1 } + .map { index -> + val res = mutableListOf>() + hpc.mapTo(res) { sample(it, index) } + res.shuffle(random) + res + } + .flatten() + + val nonHpcSequence = generateSequence(0) { it + 1 } + .map { index -> + val res = mutableListOf>() + nonHpc.mapTo(res) { sample(it, index) } + res.shuffle(random) + res + } + .flatten() + logger.debug { "Found ${hpc.size} HPC workloads and ${nonHpc.size} non-HPC workloads" } val totalLoad = if (workload is CompositeWorkload) { @@ -117,15 +135,14 @@ fun sampleHpcWorkload( trace.sumByDouble { it.workload.image.tags.getValue("total-load") as Double } } + logger.debug { "Total trace load: $totalLoad" } + val res = mutableListOf>() if (sampleOnLoad) { var currentLoad = 0.0 var i = 0 - while (true) { - // Sample random HPC entry with replacement - val entry = sample(hpc.random(random), i++) - + for (entry in hpcSequence) { val entryLoad = entry.workload.image.tags.getValue("total-load") as Double if ((currentLoad + entryLoad) / totalLoad > fraction || res.size > trace.size) { break @@ -135,8 +152,7 @@ fun sampleHpcWorkload( res += entry } - (nonHpc as MutableList>).shuffle(random) - for (entry in nonHpc) { + for (entry in nonHpcSequence) { val entryLoad = entry.workload.image.tags.getValue("total-load") as Double if ((currentLoad + entryLoad) / totalLoad > 1 || res.size > trace.size) { break @@ -146,14 +162,23 @@ fun sampleHpcWorkload( res += entry } } else { - repeat((fraction * trace.size).toInt()) { i -> - // Sample random HPC entry with replacement - val entry = sample(hpc.random(random), i) - res.add(entry) - } + var hpcLoad = 0.0 + hpcSequence + .take((fraction * trace.size).toInt()) + .forEach { entry -> + hpcLoad += entry.workload.image.tags.getValue("total-load") as Double + res.add(entry) + } + + var nonHpcLoad = 0.0 + nonHpcSequence + .take(((1 - fraction) * trace.size).toInt()) + .forEach { entry -> + nonHpcLoad += entry.workload.image.tags.getValue("total-load") as Double + res.add(entry) + } - (nonHpc as MutableList>).shuffle(random) - res.addAll(nonHpc.subList(0, ((1 - fraction) * trace.size).toInt())) + logger.debug { "HPC load $hpcLoad and non-HPC load $nonHpcLoad" } } logger.info { "Sampled ${trace.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } -- cgit v1.2.3 From 7b76fd389bf6e5f19487dcf3033efc3508d1f9b5 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 23 Jul 2020 21:00:18 +0200 Subject: Add missing HPC scenarios --- .../kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt | 1 + 1 file changed, 1 insertion(+) diff --git 
a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt index b8dfb1be..09a6ce40 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt @@ -203,6 +203,7 @@ public class MoreHpcPortfolio(parent: Experiment, id: Int) : Portfolio(parent, i ) override val workloads = listOf( + Workload("solvinity", 0.0, samplingStrategy = SamplingStrategy.HPC), Workload("solvinity", 0.25, samplingStrategy = SamplingStrategy.HPC), Workload("solvinity", 0.5, samplingStrategy = SamplingStrategy.HPC), Workload("solvinity", 1.0, samplingStrategy = SamplingStrategy.HPC), -- cgit v1.2.3 From 77c195945c89187addb14c2b9273813687abae95 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 23 Jul 2020 21:23:30 +0200 Subject: Report additional workload sampling information --- simulator/opendc/opendc-experiments-sc20/src/main/resources/log4j2.xml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/resources/log4j2.xml b/simulator/opendc/opendc-experiments-sc20/src/main/resources/log4j2.xml index f47a6da8..6906bfc3 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/resources/log4j2.xml +++ b/simulator/opendc/opendc-experiments-sc20/src/main/resources/log4j2.xml @@ -39,6 +39,9 @@ + + + -- cgit v1.2.3 From ab98dfe71ddda7b5cd2b57a24c118097ba5db0ac Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 24 Jul 2020 11:56:48 +0200 Subject: Allow load sampling result to exceed original trace size --- .../opendc/experiments/sc20/trace/WorkloadSampler.kt | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt index d03d556b..99830717 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt @@ -136,6 +136,10 @@ fun sampleHpcWorkload( } logger.debug { "Total trace load: $totalLoad" } + var hpcCount = 0 + var hpcLoad = 0.0 + var nonHpcCount = 0 + var nonHpcLoad = 0.0 val res = mutableListOf>() @@ -144,43 +148,47 @@ fun sampleHpcWorkload( var i = 0 for (entry in hpcSequence) { val entryLoad = entry.workload.image.tags.getValue("total-load") as Double - if ((currentLoad + entryLoad) / totalLoad > fraction || res.size > trace.size) { + if ((currentLoad + entryLoad) / totalLoad > fraction) { break } + hpcLoad += entryLoad + hpcCount += 1 currentLoad += entryLoad res += entry } for (entry in nonHpcSequence) { val entryLoad = entry.workload.image.tags.getValue("total-load") as Double - if ((currentLoad + entryLoad) / totalLoad > 1 || res.size > trace.size) { + if ((currentLoad + entryLoad) / totalLoad > 1) { break } + nonHpcLoad += entryLoad + nonHpcCount += 1 currentLoad += entryLoad res += entry } } else { - var hpcLoad = 0.0 hpcSequence .take((fraction * trace.size).toInt()) .forEach { entry -> hpcLoad += 
entry.workload.image.tags.getValue("total-load") as Double + hpcCount += 1 res.add(entry) } - var nonHpcLoad = 0.0 nonHpcSequence .take(((1 - fraction) * trace.size).toInt()) .forEach { entry -> nonHpcLoad += entry.workload.image.tags.getValue("total-load") as Double + nonHpcCount += 1 res.add(entry) } - - logger.debug { "HPC load $hpcLoad and non-HPC load $nonHpcLoad" } } + logger.debug { "HPC $hpcCount (load $hpcLoad) and non-HPC $nonHpcCount (load $nonHpcLoad)" } + logger.debug { "Total sampled load: ${hpcLoad + nonHpcLoad}" } logger.info { "Sampled ${trace.size} VMs (fraction $fraction) into subset of ${res.size} VMs" } return res -- cgit v1.2.3 From bbf77c59f5b2532ebca8daf8e67012205d764b97 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 24 Jul 2020 13:31:22 +0200 Subject: Fix performance interference in HPC experiments --- .../compute/core/workload/PerformanceInterferenceModel.kt | 11 +++++------ .../atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt | 4 ++-- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt index f458877b..e1f03d21 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt @@ -25,8 +25,7 @@ package com.atlarge.opendc.compute.core.workload import com.atlarge.opendc.compute.core.Server -import java.util.SortedSet -import java.util.TreeSet +import java.util.* import kotlin.random.Random /** @@ -44,21 +43,21 @@ class PerformanceInterferenceModel( val random: Random = Random(0) ) { private var intersectingItems: List = emptyList() - private val colocatedWorkloads = TreeSet() + private val colocatedWorkloads = TreeMap() fun vmStarted(server: Server) { - colocatedWorkloads.add(server.image.name) + colocatedWorkloads.merge(server.image.name, 1, Int::plus) intersectingItems = items.filter { item -> doesMatch(item) } } fun vmStopped(server: Server) { - colocatedWorkloads.remove(server.image.name) + colocatedWorkloads.computeIfPresent(server.image.name) { _, v -> (v - 1).takeUnless { it == 0 } } intersectingItems = items.filter { item -> doesMatch(item) } } private fun doesMatch(item: PerformanceInterferenceModelItem): Boolean { var count = 0 - for (name in item.workloadNames.subSet(colocatedWorkloads.first(), colocatedWorkloads.last() + "\u0000")) { + for (name in item.workloadNames.subSet(colocatedWorkloads.firstKey(), colocatedWorkloads.lastKey() + "\u0000")) { if (name in colocatedWorkloads) count++ if (count > 1) diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt index 99830717..a46bb3e6 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt @@ -201,13 +201,13 @@ private fun sample(entry: TraceEntry, i: Int): TraceEntry Date: Fri, 24 Jul 2020 13:36:55 +0200 Subject: Consider duplicate VMs for performance 
interference --- .../opendc/compute/core/workload/PerformanceInterferenceModel.kt | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt index e1f03d21..3f885f89 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/workload/PerformanceInterferenceModel.kt @@ -58,8 +58,7 @@ class PerformanceInterferenceModel( private fun doesMatch(item: PerformanceInterferenceModelItem): Boolean { var count = 0 for (name in item.workloadNames.subSet(colocatedWorkloads.firstKey(), colocatedWorkloads.lastKey() + "\u0000")) { - if (name in colocatedWorkloads) - count++ + count += colocatedWorkloads.getOrDefault(name, 0) if (count > 1) return true } -- cgit v1.2.3 From a00e13acbee5942165e344fa68d87356d6f92844 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 23 Jul 2020 11:09:58 +0200 Subject: Add Bitbrains trace converter --- .../experiments/sc20/trace/Sc20TraceConverter.kt | 120 ++++++++++++++++++++- 1 file changed, 117 insertions(+), 3 deletions(-) diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt index d6726910..6c599517 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt @@ -104,10 +104,15 @@ fun main(args: Array) { val vmPlacements = Sc20VmPlacementReader(vmPlacementFile.inputStream().buffered()).construct() readSolvinityTrace(traceDirectory, metaSchema, metaWriter, clusters, vmPlacements) - } else { + } else if (traceType == "bitbrains") { + readBitbrainsTrace(traceDirectory, metaSchema, metaWriter) + } else if (traceType == "azure") { val seed = args[3].toLong() readAzureTrace(traceDirectory, metaSchema, metaWriter, seed) + } else { + throw IllegalArgumentException("Unsupported trace type") } + allFragments.sortWith(compareBy { it.tick }.thenBy { it.id }) for (fragment in allFragments) { @@ -176,7 +181,7 @@ fun readSolvinityTrace( continue } - val values = line.split(" ") + val values = line.split("\t") val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L if (timestamp < minTimestamp) { @@ -221,7 +226,7 @@ fun readSolvinityTrace( continue } - val values = line.split(" ") + val values = line.split("\t") vmId = vmFile.name @@ -300,6 +305,115 @@ fun readSolvinityTrace( return allFragments } +/** + * Reads the confidential Solvinity trace. 
+ */ +fun readBitbrainsTrace( + traceDirectory: File, + metaSchema: Schema, + metaWriter: ParquetWriter +): MutableList { + val timestampCol = 0 + val cpuUsageCol = 3 + val coreCol = 1 + val provisionedMemoryCol = 5 + val traceInterval = 5 * 60 * 1000L + + val allFragments = mutableListOf() + + traceDirectory.walk() + .filterNot { it.isDirectory } + .filter { it.extension == "csv" || it.extension == "txt" } + .toList() + .forEachIndexed { idx, vmFile -> + println(vmFile) + + var vmId = "" + var maxCores = -1 + var requiredMemory = -1L + var cores = -1 + var minTime = Long.MAX_VALUE + + val flopsFragments = sequence { + var last: Fragment? = null + + BufferedReader(FileReader(vmFile)).use { reader -> + reader.lineSequence() + .drop(1) + .chunked(128) + .forEach { lines -> + for (line in lines) { + // Ignore comments in the trace + if (line.startsWith("#") || line.isBlank()) { + continue + } + + val values = line.split(";\t") + + vmId = vmFile.name + + val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L + + cores = values[coreCol].trim().toInt() + requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toDouble().toLong()) + maxCores = max(maxCores, cores) + minTime = min(minTime, timestamp) + val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz + + val flops: Long = (cpuUsage * 5 * 60).toLong() + + last = if (last != null && last!!.flops == 0L && flops == 0L) { + val oldFragment = last!! + Fragment( + vmId, + oldFragment.tick, + oldFragment.flops + flops, + oldFragment.duration + traceInterval, + cpuUsage, + cores + ) + } else { + val fragment = + Fragment( + vmId, + timestamp, + flops, + traceInterval, + cpuUsage, + cores + ) + if (last != null) { + yield(last!!) + } + fragment + } + } + } + } + + if (last != null) { + yield(last!!) + } + } + + var maxTime = Long.MIN_VALUE + flopsFragments.forEach { fragment -> + allFragments.add(fragment) + maxTime = max(maxTime, fragment.tick) + } + + val metaRecord = GenericData.Record(metaSchema) + metaRecord.put("id", vmId) + metaRecord.put("submissionTime", minTime) + metaRecord.put("endTime", maxTime) + metaRecord.put("maxCores", maxCores) + metaRecord.put("requiredMemory", requiredMemory) + metaWriter.write(metaRecord) + } + + return allFragments +} + /** * Reads the Azure cloud trace. 
* -- cgit v1.2.3 From 537a9405998af49cdcd437dd54e8fcfaa9fe9aaa Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 23 Jul 2020 11:39:18 +0200 Subject: Add proper command-line interface for trace converter --- .../experiments/sc20/trace/Sc20TraceConverter.kt | 909 +++++++++++---------- 1 file changed, 478 insertions(+), 431 deletions(-) diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt index 6c599517..56ddbb6d 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt @@ -25,6 +25,18 @@ package com.atlarge.opendc.experiments.sc20.trace import com.atlarge.opendc.format.trace.sc20.Sc20VmPlacementReader +import com.github.ajalt.clikt.core.CliktCommand +import com.github.ajalt.clikt.parameters.arguments.argument +import com.github.ajalt.clikt.parameters.groups.OptionGroup +import com.github.ajalt.clikt.parameters.groups.groupChoice +import com.github.ajalt.clikt.parameters.options.convert +import com.github.ajalt.clikt.parameters.options.default +import com.github.ajalt.clikt.parameters.options.defaultLazy +import com.github.ajalt.clikt.parameters.options.option +import com.github.ajalt.clikt.parameters.options.required +import com.github.ajalt.clikt.parameters.options.split +import com.github.ajalt.clikt.parameters.types.file +import com.github.ajalt.clikt.parameters.types.long import java.io.BufferedReader import java.io.File import java.io.FileReader @@ -41,194 +53,154 @@ import org.apache.parquet.hadoop.ParquetWriter import org.apache.parquet.hadoop.metadata.CompressionCodecName /** - * A script to convert a trace in text format into a Parquet trace. 
+ * Represents the command for converting traces */ -fun main(args: Array) { - if (args.size < 4) { - println("error: expected ") - return - } - - val metaSchema = SchemaBuilder - .record("meta") - .namespace("com.atlarge.opendc.format.sc20") - .fields() - .name("id").type().stringType().noDefault() - .name("submissionTime").type().longType().noDefault() - .name("endTime").type().longType().noDefault() - .name("maxCores").type().intType().noDefault() - .name("requiredMemory").type().longType().noDefault() - .endRecord() - val schema = SchemaBuilder - .record("trace") - .namespace("com.atlarge.opendc.format.sc20") - .fields() - .name("id").type().stringType().noDefault() - .name("time").type().longType().noDefault() - .name("duration").type().longType().noDefault() - .name("cores").type().intType().noDefault() - .name("cpuUsage").type().doubleType().noDefault() - .name("flops").type().longType().noDefault() - .endRecord() - - val dest = File(args[0]) - val traceDirectory = File(args[1]) - val metaParquet = File(dest.absolutePath, "meta.parquet") - val traceParquet = File(dest.absolutePath, "trace.parquet") - - if (metaParquet.exists()) { - metaParquet.delete() - } - if (traceParquet.exists()) { - traceParquet.delete() - } - - val metaWriter = AvroParquetWriter.builder(Path(metaParquet.toURI())) - .withSchema(metaSchema) - .withCompressionCodec(CompressionCodecName.SNAPPY) - .withPageSize(4 * 1024 * 1024) // For compression - .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) - .build() - - val writer = AvroParquetWriter.builder(Path(traceParquet.toURI())) - .withSchema(schema) - .withCompressionCodec(CompressionCodecName.SNAPPY) - .withPageSize(4 * 1024 * 1024) // For compression - .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) - .build() - - val traceType = args[2] - val allFragments = if (traceType == "solvinity") { - val clusters = args[3].split(",") - val vmPlacementFile = File(args[4]) - val vmPlacements = Sc20VmPlacementReader(vmPlacementFile.inputStream().buffered()).construct() - - readSolvinityTrace(traceDirectory, metaSchema, metaWriter, clusters, vmPlacements) - } else if (traceType == "bitbrains") { - readBitbrainsTrace(traceDirectory, metaSchema, metaWriter) - } else if (traceType == "azure") { - val seed = args[3].toLong() - readAzureTrace(traceDirectory, metaSchema, metaWriter, seed) - } else { - throw IllegalArgumentException("Unsupported trace type") - } - - allFragments.sortWith(compareBy { it.tick }.thenBy { it.id }) - - for (fragment in allFragments) { - val record = GenericData.Record(schema) - record.put("id", fragment.id) - record.put("time", fragment.tick) - record.put("duration", fragment.duration) - record.put("cores", fragment.cores) - record.put("cpuUsage", fragment.usage) - record.put("flops", fragment.flops) +class TraceConverterCli : CliktCommand(name = "trace-converter") { + /** + * The directory where the trace should be stored. + */ + private val outputPath by option("-O", "--output", help = "path to store the trace") + .file(canBeFile = false, mustExist = false) + .defaultLazy { File("output") } + + /** + * The directory where the input trace is located. + */ + private val inputPath by argument("input", help = "path to the input trace") + .file(canBeFile = false) + + /** + * The input type of the trace. 
+ */ + val type by option("-t", "--type", help = "input type of trace").groupChoice( + "solvinity" to SolvinityConversion(), + "bitbrains" to BitbrainsConversion(), + "azure" to AzureConversion() + ) + + override fun run() { + val metaSchema = SchemaBuilder + .record("meta") + .namespace("com.atlarge.opendc.format.sc20") + .fields() + .name("id").type().stringType().noDefault() + .name("submissionTime").type().longType().noDefault() + .name("endTime").type().longType().noDefault() + .name("maxCores").type().intType().noDefault() + .name("requiredMemory").type().longType().noDefault() + .endRecord() + val schema = SchemaBuilder + .record("trace") + .namespace("com.atlarge.opendc.format.sc20") + .fields() + .name("id").type().stringType().noDefault() + .name("time").type().longType().noDefault() + .name("duration").type().longType().noDefault() + .name("cores").type().intType().noDefault() + .name("cpuUsage").type().doubleType().noDefault() + .name("flops").type().longType().noDefault() + .endRecord() + + val metaParquet = File(outputPath, "meta.parquet") + val traceParquet = File(outputPath, "trace.parquet") + + if (metaParquet.exists()) { + metaParquet.delete() + } + if (traceParquet.exists()) { + traceParquet.delete() + } - writer.write(record) + val metaWriter = AvroParquetWriter.builder(Path(metaParquet.toURI())) + .withSchema(metaSchema) + .withCompressionCodec(CompressionCodecName.SNAPPY) + .withPageSize(4 * 1024 * 1024) // For compression + .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) + .build() + + val writer = AvroParquetWriter.builder(Path(traceParquet.toURI())) + .withSchema(schema) + .withCompressionCodec(CompressionCodecName.SNAPPY) + .withPageSize(4 * 1024 * 1024) // For compression + .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) + .build() + + try { + val type = type ?: throw IllegalArgumentException("Invalid trace conversion") + val allFragments = type.read(inputPath, metaSchema, metaWriter) + allFragments.sortWith(compareBy { it.tick }.thenBy { it.id }) + + for (fragment in allFragments) { + val record = GenericData.Record(schema) + record.put("id", fragment.id) + record.put("time", fragment.tick) + record.put("duration", fragment.duration) + record.put("cores", fragment.cores) + record.put("cpuUsage", fragment.usage) + record.put("flops", fragment.flops) + + writer.write(record) + } + } finally { + writer.close() + metaWriter.close() + } } - - writer.close() - metaWriter.close() } -data class Fragment( - val id: String, - val tick: Long, - val flops: Long, - val duration: Long, - val usage: Double, - val cores: Int -) - /** - * Reads the confidential Solvinity trace. + * The supported trace conversions. 
*/ -fun readSolvinityTrace( - traceDirectory: File, - metaSchema: Schema, - metaWriter: ParquetWriter, - clusters: List, - vmPlacements: Map -): MutableList { - val timestampCol = 0 - val cpuUsageCol = 1 - val coreCol = 12 - val provisionedMemoryCol = 20 - val traceInterval = 5 * 60 * 1000L - - // Identify start time of the entire trace - var minTimestamp = Long.MAX_VALUE - traceDirectory.walk() - .filterNot { it.isDirectory } - .filter { it.extension == "csv" || it.extension == "txt" } - .toList() - .forEach { vmFile -> - BufferedReader(FileReader(vmFile)).use { reader -> - reader.lineSequence() - .chunked(128) - .forEachIndexed { idx, lines -> - for (line in lines) { - // Ignore comments in the trace - if (line.startsWith("#") || line.isBlank()) { - continue - } - - val vmId = vmFile.name - - // Check if VM in topology - val clusterName = vmPlacements[vmId] - if (clusterName == null || !clusters.contains(clusterName)) { - continue - } - - val values = line.split("\t") - val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L - - if (timestamp < minTimestamp) { - minTimestamp = timestamp - } - return@forEach - } - } - } - } - - println("Start of trace at $minTimestamp") - - val allFragments = mutableListOf() - - val begin = 15 * 24 * 60 * 60 * 1000L - val end = 45 * 24 * 60 * 60 * 1000L - - traceDirectory.walk() - .filterNot { it.isDirectory } - .filter { it.extension == "csv" || it.extension == "txt" } - .toList() - .forEachIndexed { idx, vmFile -> - println(vmFile) - - var vmId = "" - var maxCores = -1 - var requiredMemory = -1L - var cores = -1 - var minTime = Long.MAX_VALUE - - val flopsFragments = sequence { - var last: Fragment? = null +sealed class TraceConversion(name: String) : OptionGroup(name) { + /** + * Read the fragments of the trace. 
+ */ + abstract fun read( + traceDirectory: File, + metaSchema: Schema, + metaWriter: ParquetWriter + ): MutableList +} +class SolvinityConversion : TraceConversion("Solvinity") { + val clusters by option() + .split(",") + + val vmPlacements by option("--vm-placements", help = "file containing the VM placements") + .file(canBeDir = false) + .convert { it.inputStream().buffered().use { Sc20VmPlacementReader(it).construct() } } + .required() + + override fun read( + traceDirectory: File, + metaSchema: Schema, + metaWriter: ParquetWriter + ): MutableList { + val clusters = clusters?.toSet() ?: emptySet() + val timestampCol = 0 + val cpuUsageCol = 1 + val coreCol = 12 + val provisionedMemoryCol = 20 + val traceInterval = 5 * 60 * 1000L + + // Identify start time of the entire trace + var minTimestamp = Long.MAX_VALUE + traceDirectory.walk() + .filterNot { it.isDirectory } + .filter { it.extension == "csv" || it.extension == "txt" } + .toList() + .forEach { vmFile -> BufferedReader(FileReader(vmFile)).use { reader -> reader.lineSequence() .chunked(128) - .forEach { lines -> + .forEachIndexed { idx, lines -> for (line in lines) { // Ignore comments in the trace if (line.startsWith("#") || line.isBlank()) { continue } - val values = line.split("\t") - - vmId = vmFile.name + val vmId = vmFile.name // Check if VM in topology val clusterName = vmPlacements[vmId] @@ -236,339 +208,414 @@ fun readSolvinityTrace( continue } - val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L - minTimestamp - if (begin > timestamp || timestamp > end) { - continue + val values = line.split("\t") + val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L + + if (timestamp < minTimestamp) { + minTimestamp = timestamp } + return@forEach + } + } + } + } - cores = values[coreCol].trim().toInt() - requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) - maxCores = max(maxCores, cores) - minTime = min(minTime, timestamp) - val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz - requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) - maxCores = max(maxCores, cores) + println("Start of trace at $minTimestamp") + + val allFragments = mutableListOf() + + val begin = 15 * 24 * 60 * 60 * 1000L + val end = 45 * 24 * 60 * 60 * 1000L + + traceDirectory.walk() + .filterNot { it.isDirectory } + .filter { it.extension == "csv" || it.extension == "txt" } + .toList() + .forEachIndexed { idx, vmFile -> + println(vmFile) + + var vmId = "" + var maxCores = -1 + var requiredMemory = -1L + var cores = -1 + var minTime = Long.MAX_VALUE + + val flopsFragments = sequence { + var last: Fragment? = null + + BufferedReader(FileReader(vmFile)).use { reader -> + reader.lineSequence() + .chunked(128) + .forEach { lines -> + for (line in lines) { + // Ignore comments in the trace + if (line.startsWith("#") || line.isBlank()) { + continue + } - val flops: Long = (cpuUsage * 5 * 60).toLong() + val values = line.split("\t") - last = if (last != null && last!!.flops == 0L && flops == 0L) { - val oldFragment = last!! 
- Fragment( - vmId, - oldFragment.tick, - oldFragment.flops + flops, - oldFragment.duration + traceInterval, - cpuUsage, - cores - ) - } else { - val fragment = + vmId = vmFile.name + + // Check if VM in topology + val clusterName = vmPlacements[vmId] + if (clusterName == null || !clusters.contains(clusterName)) { + continue + } + + val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L - minTimestamp + if (begin > timestamp || timestamp > end) { + continue + } + + cores = values[coreCol].trim().toInt() + requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) + maxCores = max(maxCores, cores) + minTime = min(minTime, timestamp) + val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz + requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) + maxCores = max(maxCores, cores) + + val flops: Long = (cpuUsage * 5 * 60).toLong() + + last = if (last != null && last!!.flops == 0L && flops == 0L) { + val oldFragment = last!! Fragment( vmId, - timestamp, - flops, - traceInterval, + oldFragment.tick, + oldFragment.flops + flops, + oldFragment.duration + traceInterval, cpuUsage, cores ) - if (last != null) { - yield(last!!) + } else { + val fragment = + Fragment( + vmId, + timestamp, + flops, + traceInterval, + cpuUsage, + cores + ) + if (last != null) { + yield(last!!) + } + fragment } - fragment } } - } - } + } - if (last != null) { - yield(last!!) + if (last != null) { + yield(last!!) + } } - } - var maxTime = Long.MIN_VALUE - flopsFragments.filter { it.tick in begin until end }.forEach { fragment -> - allFragments.add(fragment) - maxTime = max(maxTime, fragment.tick) - } + var maxTime = Long.MIN_VALUE + flopsFragments.filter { it.tick in begin until end }.forEach { fragment -> + allFragments.add(fragment) + maxTime = max(maxTime, fragment.tick) + } - if (minTime in begin until end) { - val metaRecord = GenericData.Record(metaSchema) - metaRecord.put("id", vmId) - metaRecord.put("submissionTime", minTime) - metaRecord.put("endTime", maxTime) - metaRecord.put("maxCores", maxCores) - metaRecord.put("requiredMemory", requiredMemory) - metaWriter.write(metaRecord) + if (minTime in begin until end) { + val metaRecord = GenericData.Record(metaSchema) + metaRecord.put("id", vmId) + metaRecord.put("submissionTime", minTime) + metaRecord.put("endTime", maxTime) + metaRecord.put("maxCores", maxCores) + metaRecord.put("requiredMemory", requiredMemory) + metaWriter.write(metaRecord) + } } - } - return allFragments + return allFragments + } } /** - * Reads the confidential Solvinity trace. + * Conversion of the Bitbrains public trace. */ -fun readBitbrainsTrace( - traceDirectory: File, - metaSchema: Schema, - metaWriter: ParquetWriter -): MutableList { - val timestampCol = 0 - val cpuUsageCol = 3 - val coreCol = 1 - val provisionedMemoryCol = 5 - val traceInterval = 5 * 60 * 1000L - - val allFragments = mutableListOf() - - traceDirectory.walk() - .filterNot { it.isDirectory } - .filter { it.extension == "csv" || it.extension == "txt" } - .toList() - .forEachIndexed { idx, vmFile -> - println(vmFile) - - var vmId = "" - var maxCores = -1 - var requiredMemory = -1L - var cores = -1 - var minTime = Long.MAX_VALUE - - val flopsFragments = sequence { - var last: Fragment? 
= null - - BufferedReader(FileReader(vmFile)).use { reader -> - reader.lineSequence() - .drop(1) - .chunked(128) - .forEach { lines -> - for (line in lines) { - // Ignore comments in the trace - if (line.startsWith("#") || line.isBlank()) { - continue - } +class BitbrainsConversion : TraceConversion("Bitbrains") { + override fun read( + traceDirectory: File, + metaSchema: Schema, + metaWriter: ParquetWriter + ): MutableList { + val timestampCol = 0 + val cpuUsageCol = 3 + val coreCol = 1 + val provisionedMemoryCol = 5 + val traceInterval = 5 * 60 * 1000L + + val allFragments = mutableListOf() + + traceDirectory.walk() + .filterNot { it.isDirectory } + .filter { it.extension == "csv" || it.extension == "txt" } + .toList() + .forEachIndexed { idx, vmFile -> + println(vmFile) + + var vmId = "" + var maxCores = -1 + var requiredMemory = -1L + var cores = -1 + var minTime = Long.MAX_VALUE + + val flopsFragments = sequence { + var last: Fragment? = null + + BufferedReader(FileReader(vmFile)).use { reader -> + reader.lineSequence() + .drop(1) + .chunked(128) + .forEach { lines -> + for (line in lines) { + // Ignore comments in the trace + if (line.startsWith("#") || line.isBlank()) { + continue + } - val values = line.split(";\t") + val values = line.split(";\t") - vmId = vmFile.name + vmId = vmFile.name - val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L + val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L - cores = values[coreCol].trim().toInt() - requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toDouble().toLong()) - maxCores = max(maxCores, cores) - minTime = min(minTime, timestamp) - val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz + cores = values[coreCol].trim().toInt() + requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toDouble().toLong()) + maxCores = max(maxCores, cores) + minTime = min(minTime, timestamp) + val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz - val flops: Long = (cpuUsage * 5 * 60).toLong() + val flops: Long = (cpuUsage * 5 * 60).toLong() - last = if (last != null && last!!.flops == 0L && flops == 0L) { - val oldFragment = last!! - Fragment( - vmId, - oldFragment.tick, - oldFragment.flops + flops, - oldFragment.duration + traceInterval, - cpuUsage, - cores - ) - } else { - val fragment = + last = if (last != null && last!!.flops == 0L && flops == 0L) { + val oldFragment = last!! Fragment( vmId, - timestamp, - flops, - traceInterval, + oldFragment.tick, + oldFragment.flops + flops, + oldFragment.duration + traceInterval, cpuUsage, cores ) - if (last != null) { - yield(last!!) + } else { + val fragment = + Fragment( + vmId, + timestamp, + flops, + traceInterval, + cpuUsage, + cores + ) + if (last != null) { + yield(last!!) + } + fragment } - fragment } } - } + } + + if (last != null) { + yield(last!!) + } } - if (last != null) { - yield(last!!) 
+ var maxTime = Long.MIN_VALUE + flopsFragments.forEach { fragment -> + allFragments.add(fragment) + maxTime = max(maxTime, fragment.tick) } - } - var maxTime = Long.MIN_VALUE - flopsFragments.forEach { fragment -> - allFragments.add(fragment) - maxTime = max(maxTime, fragment.tick) + val metaRecord = GenericData.Record(metaSchema) + metaRecord.put("id", vmId) + metaRecord.put("submissionTime", minTime) + metaRecord.put("endTime", maxTime) + metaRecord.put("maxCores", maxCores) + metaRecord.put("requiredMemory", requiredMemory) + metaWriter.write(metaRecord) } - val metaRecord = GenericData.Record(metaSchema) - metaRecord.put("id", vmId) - metaRecord.put("submissionTime", minTime) - metaRecord.put("endTime", maxTime) - metaRecord.put("maxCores", maxCores) - metaRecord.put("requiredMemory", requiredMemory) - metaWriter.write(metaRecord) - } - - return allFragments + return allFragments + } } /** - * Reads the Azure cloud trace. - * - * See https://github.com/Azure/AzurePublicDataset/ for a definition of the trace. + * Conversion of the Azure public VM trace. */ -fun readAzureTrace( - traceDirectory: File, - metaSchema: Schema, - metaWriter: ParquetWriter, - seed: Long -): MutableList { - val random = Random(seed) - val fraction = 0.01 - - // Read VM table - val vmIdTableCol = 0 - val coreTableCol = 9 - val provisionedMemoryTableCol = 10 - - var vmId: String - var cores: Int - var requiredMemory: Long - - val vmIds = mutableSetOf() - val vmIdToMetadata = mutableMapOf() - - BufferedReader(FileReader(File(traceDirectory, "vmtable.csv"))).use { reader -> - reader.lineSequence() - .chunked(1024) - .forEach { lines -> - for (line in lines) { - // Ignore comments in the trace - if (line.startsWith("#") || line.isBlank()) { - continue - } - // Sample only a fraction of the VMs - if (random.nextDouble() > fraction) { - continue - } - - val values = line.split(",") - - // Exclude VMs with a large number of cores (not specified exactly) - if (values[coreTableCol].contains(">")) { - continue - } - - vmId = values[vmIdTableCol].trim() - cores = values[coreTableCol].trim().toInt() - requiredMemory = values[provisionedMemoryTableCol].trim().toInt() * 1_000L // GB -> MB - - vmIds.add(vmId) - vmIdToMetadata[vmId] = VmInfo(cores, requiredMemory, Long.MAX_VALUE, -1L) - } - } - } - - // Read VM metric reading files - val timestampCol = 0 - val vmIdCol = 1 - val cpuUsageCol = 4 - val traceInterval = 5 * 60 * 1000L - - val vmIdToFragments = mutableMapOf>() - val vmIdToLastFragment = mutableMapOf() - val allFragments = mutableListOf() - - for (i in ProgressBar.wrap((1..195).toList(), "Reading Trace")) { - val readingsFile = File(File(traceDirectory, "readings"), "readings-$i.csv") - var timestamp: Long - var cpuUsage: Double - - BufferedReader(FileReader(readingsFile)).use { reader -> +class AzureConversion : TraceConversion("Azure") { + val seed by option(help = "seed for trace sampling") + .long() + .default(0) + + override fun read( + traceDirectory: File, + metaSchema: Schema, + metaWriter: ParquetWriter + ): MutableList { + val random = Random(seed) + val fraction = 0.01 + + // Read VM table + val vmIdTableCol = 0 + val coreTableCol = 9 + val provisionedMemoryTableCol = 10 + + var vmId: String + var cores: Int + var requiredMemory: Long + + val vmIds = mutableSetOf() + val vmIdToMetadata = mutableMapOf() + + BufferedReader(FileReader(File(traceDirectory, "vmtable.csv"))).use { reader -> reader.lineSequence() - .chunked(128) + .chunked(1024) .forEach { lines -> for (line in lines) { // Ignore comments in the 
trace if (line.startsWith("#") || line.isBlank()) { continue } + // Sample only a fraction of the VMs + if (random.nextDouble() > fraction) { + continue + } val values = line.split(",") - vmId = values[vmIdCol].trim() - // Ignore readings for VMs not in the sample - if (!vmIds.contains(vmId)) { + // Exclude VMs with a large number of cores (not specified exactly) + if (values[coreTableCol].contains(">")) { continue } - timestamp = values[timestampCol].trim().toLong() * 1000L - vmIdToMetadata[vmId]!!.minTime = min(vmIdToMetadata[vmId]!!.minTime, timestamp) - cpuUsage = values[cpuUsageCol].trim().toDouble() * 3_000 // MHz - vmIdToMetadata[vmId]!!.maxTime = max(vmIdToMetadata[vmId]!!.maxTime, timestamp) - - val flops: Long = (cpuUsage * 5 * 60).toLong() - val lastFragment = vmIdToLastFragment[vmId] - - vmIdToLastFragment[vmId] = - if (lastFragment != null && lastFragment.flops == 0L && flops == 0L) { - Fragment( - vmId, - lastFragment.tick, - lastFragment.flops + flops, - lastFragment.duration + traceInterval, - cpuUsage, - vmIdToMetadata[vmId]!!.cores - ) - } else { - val fragment = + vmId = values[vmIdTableCol].trim() + cores = values[coreTableCol].trim().toInt() + requiredMemory = values[provisionedMemoryTableCol].trim().toInt() * 1_000L // GB -> MB + + vmIds.add(vmId) + vmIdToMetadata[vmId] = VmInfo(cores, requiredMemory, Long.MAX_VALUE, -1L) + } + } + } + + // Read VM metric reading files + val timestampCol = 0 + val vmIdCol = 1 + val cpuUsageCol = 4 + val traceInterval = 5 * 60 * 1000L + + val vmIdToFragments = mutableMapOf>() + val vmIdToLastFragment = mutableMapOf() + val allFragments = mutableListOf() + + for (i in ProgressBar.wrap((1..195).toList(), "Reading Trace")) { + val readingsFile = File(File(traceDirectory, "readings"), "readings-$i.csv") + var timestamp: Long + var cpuUsage: Double + + BufferedReader(FileReader(readingsFile)).use { reader -> + reader.lineSequence() + .chunked(128) + .forEach { lines -> + for (line in lines) { + // Ignore comments in the trace + if (line.startsWith("#") || line.isBlank()) { + continue + } + + val values = line.split(",") + vmId = values[vmIdCol].trim() + + // Ignore readings for VMs not in the sample + if (!vmIds.contains(vmId)) { + continue + } + + timestamp = values[timestampCol].trim().toLong() * 1000L + vmIdToMetadata[vmId]!!.minTime = min(vmIdToMetadata[vmId]!!.minTime, timestamp) + cpuUsage = values[cpuUsageCol].trim().toDouble() * 3_000 // MHz + vmIdToMetadata[vmId]!!.maxTime = max(vmIdToMetadata[vmId]!!.maxTime, timestamp) + + val flops: Long = (cpuUsage * 5 * 60).toLong() + val lastFragment = vmIdToLastFragment[vmId] + + vmIdToLastFragment[vmId] = + if (lastFragment != null && lastFragment.flops == 0L && flops == 0L) { Fragment( vmId, - timestamp, - flops, - traceInterval, + lastFragment.tick, + lastFragment.flops + flops, + lastFragment.duration + traceInterval, cpuUsage, vmIdToMetadata[vmId]!!.cores ) - if (lastFragment != null) { - if (vmIdToFragments[vmId] == null) { - vmIdToFragments[vmId] = mutableListOf() + } else { + val fragment = + Fragment( + vmId, + timestamp, + flops, + traceInterval, + cpuUsage, + vmIdToMetadata[vmId]!!.cores + ) + if (lastFragment != null) { + if (vmIdToFragments[vmId] == null) { + vmIdToFragments[vmId] = mutableListOf() + } + vmIdToFragments[vmId]!!.add(lastFragment) + allFragments.add(lastFragment) } - vmIdToFragments[vmId]!!.add(lastFragment) - allFragments.add(lastFragment) + fragment } - fragment - } + } } - } + } } - } - for (entry in vmIdToLastFragment) { - if (entry.value != null) { - if 
(vmIdToFragments[entry.key] == null) { - vmIdToFragments[entry.key] = mutableListOf() + for (entry in vmIdToLastFragment) { + if (entry.value != null) { + if (vmIdToFragments[entry.key] == null) { + vmIdToFragments[entry.key] = mutableListOf() + } + vmIdToFragments[entry.key]!!.add(entry.value!!) } - vmIdToFragments[entry.key]!!.add(entry.value!!) } - } - println("Read ${vmIdToLastFragment.size} VMs") - - for (entry in vmIdToMetadata) { - val metaRecord = GenericData.Record(metaSchema) - metaRecord.put("id", entry.key) - metaRecord.put("submissionTime", entry.value.minTime) - metaRecord.put("endTime", entry.value.maxTime) - println("${entry.value.minTime} - ${entry.value.maxTime}") - metaRecord.put("maxCores", entry.value.cores) - metaRecord.put("requiredMemory", entry.value.requiredMemory) - metaWriter.write(metaRecord) - } + println("Read ${vmIdToLastFragment.size} VMs") - return allFragments + for (entry in vmIdToMetadata) { + val metaRecord = GenericData.Record(metaSchema) + metaRecord.put("id", entry.key) + metaRecord.put("submissionTime", entry.value.minTime) + metaRecord.put("endTime", entry.value.maxTime) + println("${entry.value.minTime} - ${entry.value.maxTime}") + metaRecord.put("maxCores", entry.value.cores) + metaRecord.put("requiredMemory", entry.value.requiredMemory) + metaWriter.write(metaRecord) + } + + return allFragments + } } +data class Fragment( + val id: String, + val tick: Long, + val flops: Long, + val duration: Long, + val usage: Double, + val cores: Int +) + class VmInfo(val cores: Int, val requiredMemory: Long, var minTime: Long, var maxTime: Long) + +/** + * A script to convert a trace in text format into a Parquet trace. + */ +fun main(args: Array) = TraceConverterCli().main(args) -- cgit v1.2.3
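
The two PerformanceInterferenceModel patches above replace a TreeSet of colocated image names with a TreeMap of per-image counts, so that duplicate VMs of the same image are taken into account when matching an interference group. A minimal standalone Kotlin sketch of that bookkeeping follows; the class and function names are illustrative only and are not part of the OpenDC code base.

import java.util.SortedSet
import java.util.TreeMap

// Illustrative stand-in for the patched co-location tracking: counts how many
// colocated VMs currently run each workload image, so duplicates are counted too.
class ColocationTracker {
    private val counts = TreeMap<String, Int>()

    fun vmStarted(imageName: String) {
        counts.merge(imageName, 1, Int::plus)
    }

    fun vmStopped(imageName: String) {
        // Drop the entry once the last instance of this image stops.
        counts.computeIfPresent(imageName) { _, v -> (v - 1).takeUnless { it == 0 } }
    }

    // True as soon as more than one colocated VM falls inside the interference group.
    fun matches(workloadNames: SortedSet<String>): Boolean {
        if (counts.isEmpty()) return false
        var count = 0
        // Only scan the part of the group that overlaps the names currently colocated.
        for (name in workloadNames.subSet(counts.firstKey(), counts.lastKey() + "\u0000")) {
            count += counts.getOrDefault(name, 0)
            if (count > 1) return true
        }
        return false
    }
}

fun main() {
    val tracker = ColocationTracker()
    tracker.vmStarted("vm-a")
    tracker.vmStarted("vm-a")
    // Two instances of the same image are colocated, so a group containing "vm-a" matches.
    println(tracker.matches(sortedSetOf("vm-a", "vm-b"))) // prints: true
}

The sampling change in the first patch can be sketched in the same way: every round re-samples the whole pool once and shuffles it, yielding an endless stream that the load-based loop consumes until the target fraction is reached. The helper below is a simplified, assumed form of that construction; in WorkloadSampler.kt the same idea is applied to trace entries, which are re-keyed per round via sample(it, index).

import kotlin.random.Random

// Endless, round-shuffled stream over a pool of entries (types simplified).
// e.g. roundShuffledSamples(hpcEntries, Random(seed)) { entry, round -> entry }.take(n)
fun <T> roundShuffledSamples(pool: List<T>, random: Random, resample: (T, Int) -> T): Sequence<T> =
    generateSequence(0) { it + 1 }
        .map { round -> pool.map { entry -> resample(entry, round) }.shuffled(random) }
        .flatten()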