From 5e2e040cecddd3c9c048b0992d4635ed0515f490 Mon Sep 17 00:00:00 2001 From: Georgios Andreadis Date: Thu, 14 May 2020 18:05:18 +0200 Subject: Add Azure trace reader --- .../experiments/sc20/trace/Sc20TraceConverter.kt | 242 ++++++++++++++++++--- 1 file changed, 210 insertions(+), 32 deletions(-) (limited to 'opendc') diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt index 04cdd302..e28a497d 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt @@ -24,14 +24,17 @@ package com.atlarge.opendc.experiments.sc20.trace +import org.apache.avro.Schema import org.apache.avro.SchemaBuilder import org.apache.avro.generic.GenericData import org.apache.hadoop.fs.Path import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.hadoop.ParquetWriter import org.apache.parquet.hadoop.metadata.CompressionCodecName import java.io.BufferedReader import java.io.File import java.io.FileReader +import java.util.Random import kotlin.math.max import kotlin.math.min @@ -66,20 +69,8 @@ fun main(args: Array) { .name("flops").type().longType().noDefault() .endRecord() - val timestampCol = 0 - val cpuUsageCol = 1 - val coreCol = 12 - val vmIdCol = 19 - val provisionedMemoryCol = 20 - val traceInterval = 5 * 60 * 1000L - val dest = File(args[0]) val traceDirectory = File(args[1]) - val vms = - traceDirectory.walk() - .filterNot { it.isDirectory } - .filter { it.extension == "csv" || it.extension == "txt" } - .toList() val metaWriter = AvroParquetWriter.builder(Path(dest.absolutePath, "meta.parquet")) .withSchema(metaSchema) @@ -88,9 +79,68 @@ fun main(args: Array) { .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) .build() + val writer = AvroParquetWriter.builder(Path(dest.absolutePath, "trace.parquet")) + .withSchema(schema) + .withCompressionCodec(CompressionCodecName.SNAPPY) + .withPageSize(4 * 1024 * 1024) // For compression + .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) + .build() + + val traceType = args[2] + val allFragments = if (traceType == "solvinity") { + readSolvinityTrace(traceDirectory, metaSchema, metaWriter) + } else { + readAzureTrace(traceDirectory, metaSchema, metaWriter) + } + allFragments.sortWith(compareBy { it.tick }.thenBy { it.id }) + + for (fragment in allFragments) { + val record = GenericData.Record(schema) + record.put("id", fragment.id) + record.put("time", fragment.tick) + record.put("duration", fragment.duration) + record.put("cores", fragment.cores) + record.put("cpuUsage", fragment.usage) + record.put("flops", fragment.flops) + + writer.write(record) + } + + writer.close() + metaWriter.close() +} + +data class Fragment( + val id: String, + val tick: Long, + val flops: Long, + val duration: Long, + val usage: Double, + val cores: Int +) + +/** + * Reads the confidential Solvinity trace. + */ +fun readSolvinityTrace( + traceDirectory: File, + metaSchema: Schema, + metaWriter: ParquetWriter +): MutableList { + val timestampCol = 0 + val cpuUsageCol = 1 + val coreCol = 12 + val vmIdCol = 19 + val provisionedMemoryCol = 20 + val traceInterval = 5 * 60 * 1000L + val allFragments = mutableListOf() - vms + + traceDirectory.walk() + .filterNot { it.isDirectory } + .filter { it.extension == "csv" || it.extension == "txt" } + .toList() .forEachIndexed { idx, vmFile -> println(vmFile) @@ -176,29 +226,157 @@ fun main(args: Array) { metaWriter.write(metaRecord) } - val writer = AvroParquetWriter.builder(Path(dest.absolutePath, "trace.parquet")) - .withSchema(schema) - .withCompressionCodec(CompressionCodecName.SNAPPY) - .withPageSize(4 * 1024 * 1024) // For compression - .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) - .build() + return allFragments +} - allFragments.sortWith(compareBy { it.tick }.thenBy { it.id }) +/** + * Reads the Azure cloud trace. + * + * See https://github.com/Azure/AzurePublicDataset/ for a definition of the trace. + */ +fun readAzureTrace( + traceDirectory: File, + metaSchema: Schema, + metaWriter: ParquetWriter +): MutableList { + val random = Random(0) + val fraction = 0.01 - for (fragment in allFragments) { - val record = GenericData.Record(schema) - record.put("id", fragment.id) - record.put("time", fragment.tick) - record.put("duration", fragment.duration) - record.put("cores", fragment.cores) - record.put("cpuUsage", fragment.usage) - record.put("flops", fragment.flops) + // Read VM table + val vmIdTableCol = 0 + val coreTableCol = 9 + val provisionedMemoryTableCol = 10 - writer.write(record) + var vmId: String + var cores: Int + var requiredMemory: Long + + val vmIds = mutableSetOf() + val vmIdToMetadata = mutableMapOf() + + BufferedReader(FileReader(File(traceDirectory, "vmtable.csv"))).use { reader -> + reader.lineSequence() + .chunked(1024) + .forEach { lines -> + for (line in lines) { + // Ignore comments in the trace + if (line.startsWith("#") || line.isBlank()) { + continue + } + // Sample only a fraction of the VMs + if (random.nextDouble() > fraction) { + continue + } + + val values = line.split(",") + + vmId = values[vmIdTableCol].trim() + cores = values[coreTableCol].trim().toInt() + requiredMemory = values[provisionedMemoryTableCol].trim().toInt() * 1_000L // GB -> MB + + vmIds.add(vmId) + vmIdToMetadata[vmId] = VmInfo(cores, requiredMemory, -1L, Long.MAX_VALUE) + } + } } - writer.close() - metaWriter.close() + // Read VM metric reading files + val timestampCol = 0 + val vmIdCol = 1 + val cpuUsageCol = 4 + val traceInterval = 5 * 60 * 1000L + + val vmIdToFragments = mutableMapOf>() + val vmIdToLastFragment = mutableMapOf() + val allFragments = mutableListOf() + + File(traceDirectory, "readings").walk() + .filterNot { it.isDirectory } + .filter { it.extension == "csv" } + .toList() + .forEach { readingsFile -> + var timestamp: Long + var vmId: String + var cpuUsage: Double + var minTime = Long.MAX_VALUE + + BufferedReader(FileReader(readingsFile)).use { reader -> + reader.lineSequence() + .chunked(128) + .forEach { lines -> + for (line in lines) { + // Ignore comments in the trace + if (line.startsWith("#") || line.isBlank()) { + continue + } + + val values = line.split(",") + vmId = values[vmIdCol].trim() + + // Ignore readings for VMs not in the sample + if (!vmIds.contains(vmId)) { + continue + } + + timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L + vmIdToMetadata[vmId]!!.minTime = min(vmIdToMetadata[vmId]!!.minTime, timestamp) + cpuUsage = values[cpuUsageCol].trim().toDouble() * 4_000 // MHz + vmIdToMetadata[vmId]!!.maxTime = max(vmIdToMetadata[vmId]!!.maxTime, timestamp) + + val flops: Long = (cpuUsage * 5 * 60).toLong() + val lastFragment = vmIdToLastFragment[vmId] + + vmIdToLastFragment[vmId] = + if (lastFragment != null && lastFragment.flops == 0L && flops == 0L) { + Fragment( + vmId, + lastFragment.tick, + lastFragment.flops + flops, + lastFragment.duration + traceInterval, + cpuUsage, + vmIdToMetadata[vmId]!!.cores + ) + } else { + val fragment = + Fragment( + vmId, + timestamp, + flops, + traceInterval, + cpuUsage, + vmIdToMetadata[vmId]!!.cores + ) + if (lastFragment != null) { + if (vmIdToFragments[vmId] == null) { + vmIdToFragments[vmId] = mutableListOf() + } + vmIdToFragments[vmId]!!.add(lastFragment) + allFragments.add(lastFragment) + } + fragment + } + } + } + } + } + + for (entry in vmIdToLastFragment) { + if (entry.value != null) { + vmIdToFragments[entry.key]!!.add(entry.value!!) + } + } + + for (entry in vmIdToMetadata) { + val metaRecord = GenericData.Record(metaSchema) + metaRecord.put("id", entry.key) + metaRecord.put("submissionTime", entry.value.minTime) + metaRecord.put("endTime", entry.value.maxTime) + metaRecord.put("maxCores", entry.value.cores) + metaRecord.put("requiredMemory", entry.value.requiredMemory) + metaWriter.write(metaRecord) + } + + return allFragments } -data class Fragment(val id: String, val tick: Long, val flops: Long, val duration: Long, val usage: Double, val cores: Int) +class VmInfo(val cores: Int, val requiredMemory: Long, var minTime: Long, var maxTime: Long) -- cgit v1.2.3 From b85ef3f0a27b88515c7d4d23e3c569ecb55f529c Mon Sep 17 00:00:00 2001 From: Georgios Andreadis Date: Tue, 19 May 2020 11:00:40 +0200 Subject: Fix issues in Azure trace reader --- .../experiments/sc20/trace/Sc20TraceConverter.kt | 33 ++++++++++++++++++---- 1 file changed, 27 insertions(+), 6 deletions(-) (limited to 'opendc') diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt index e28a497d..dc0a1ee4 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt @@ -43,7 +43,7 @@ import kotlin.math.min */ fun main(args: Array) { if (args.size < 2) { - println("error: expected ") + println("error: expected ") return } @@ -71,15 +71,24 @@ fun main(args: Array) { val dest = File(args[0]) val traceDirectory = File(args[1]) + val metaParquet = File(dest.absolutePath, "meta.parquet") + val traceParquet = File(dest.absolutePath, "trace.parquet") - val metaWriter = AvroParquetWriter.builder(Path(dest.absolutePath, "meta.parquet")) + if (metaParquet.exists()) { + metaParquet.delete() + } + if (traceParquet.exists()) { + traceParquet.delete() + } + + val metaWriter = AvroParquetWriter.builder(Path(metaParquet.toURI())) .withSchema(metaSchema) .withCompressionCodec(CompressionCodecName.SNAPPY) .withPageSize(4 * 1024 * 1024) // For compression .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) .build() - val writer = AvroParquetWriter.builder(Path(dest.absolutePath, "trace.parquet")) + val writer = AvroParquetWriter.builder(Path(traceParquet.toURI())) .withSchema(schema) .withCompressionCodec(CompressionCodecName.SNAPPY) .withPageSize(4 * 1024 * 1024) // For compression @@ -87,12 +96,14 @@ fun main(args: Array) { .build() val traceType = args[2] + val startTime = System.currentTimeMillis() val allFragments = if (traceType == "solvinity") { readSolvinityTrace(traceDirectory, metaSchema, metaWriter) } else { readAzureTrace(traceDirectory, metaSchema, metaWriter) } allFragments.sortWith(compareBy { it.tick }.thenBy { it.id }) + println("Reading trace took ${(System.currentTimeMillis() - startTime) / 1000} seconds") for (fragment in allFragments) { val record = GenericData.Record(schema) @@ -240,7 +251,7 @@ fun readAzureTrace( metaWriter: ParquetWriter ): MutableList { val random = Random(0) - val fraction = 0.01 + val fraction = 0.001 // Read VM table val vmIdTableCol = 0 @@ -270,12 +281,17 @@ fun readAzureTrace( val values = line.split(",") + // Exclude VMs with a large number of cores (not specified exactly) + if (values[coreTableCol].contains(">")) { + continue + } + vmId = values[vmIdTableCol].trim() cores = values[coreTableCol].trim().toInt() requiredMemory = values[provisionedMemoryTableCol].trim().toInt() * 1_000L // GB -> MB vmIds.add(vmId) - vmIdToMetadata[vmId] = VmInfo(cores, requiredMemory, -1L, Long.MAX_VALUE) + vmIdToMetadata[vmId] = VmInfo(cores, requiredMemory, Long.MAX_VALUE, -1L) } } } @@ -298,7 +314,6 @@ fun readAzureTrace( var timestamp: Long var vmId: String var cpuUsage: Double - var minTime = Long.MAX_VALUE BufferedReader(FileReader(readingsFile)).use { reader -> reader.lineSequence() @@ -362,15 +377,21 @@ fun readAzureTrace( for (entry in vmIdToLastFragment) { if (entry.value != null) { + if (vmIdToFragments[entry.key] == null) { + vmIdToFragments[entry.key] = mutableListOf() + } vmIdToFragments[entry.key]!!.add(entry.value!!) } } + println("Read ${vmIdToLastFragment.size} VMs") + for (entry in vmIdToMetadata) { val metaRecord = GenericData.Record(metaSchema) metaRecord.put("id", entry.key) metaRecord.put("submissionTime", entry.value.minTime) metaRecord.put("endTime", entry.value.maxTime) + println("${entry.value.minTime} - ${entry.value.maxTime}") metaRecord.put("maxCores", entry.value.cores) metaRecord.put("requiredMemory", entry.value.requiredMemory) metaWriter.write(metaRecord) -- cgit v1.2.3 From 50bf4b2c97dc1788035a05a8307d2c900796ecb3 Mon Sep 17 00:00:00 2001 From: Georgios Andreadis Date: Tue, 19 May 2020 12:25:40 +0200 Subject: Fix double line --- .../com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'opendc') diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt index dc0a1ee4..593ad8ba 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt @@ -147,7 +147,6 @@ fun readSolvinityTrace( val allFragments = mutableListOf() - traceDirectory.walk() .filterNot { it.isDirectory } .filter { it.extension == "csv" || it.extension == "txt" } @@ -177,7 +176,7 @@ fun readSolvinityTrace( val values = line.split(" ") vmId = vmFile.name - val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L + val timestamp = values[timestampCol].trim().toLong() * 1000L cores = values[coreCol].trim().toInt() requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) maxCores = max(maxCores, cores) -- cgit v1.2.3 From 0df72a9032882abe132ab7683ff917f49a3c6645 Mon Sep 17 00:00:00 2001 From: Georgios Andreadis Date: Tue, 19 May 2020 12:26:28 +0200 Subject: Fix timestamps --- .../com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'opendc') diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt index 593ad8ba..07b200a6 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt @@ -176,7 +176,7 @@ fun readSolvinityTrace( val values = line.split(" ") vmId = vmFile.name - val timestamp = values[timestampCol].trim().toLong() * 1000L + val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L cores = values[coreCol].trim().toInt() requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) maxCores = max(maxCores, cores) @@ -332,7 +332,7 @@ fun readAzureTrace( continue } - timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L + timestamp = values[timestampCol].trim().toLong() * 1000L vmIdToMetadata[vmId]!!.minTime = min(vmIdToMetadata[vmId]!!.minTime, timestamp) cpuUsage = values[cpuUsageCol].trim().toDouble() * 4_000 // MHz vmIdToMetadata[vmId]!!.maxTime = max(vmIdToMetadata[vmId]!!.maxTime, timestamp) -- cgit v1.2.3 From 82ce4b1dc83309b5f72a58617f4ab944c4479478 Mon Sep 17 00:00:00 2001 From: Georgios Andreadis Date: Tue, 19 May 2020 12:28:22 +0200 Subject: Increase fraction --- .../com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'opendc') diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt index 07b200a6..39aa2a1f 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt @@ -250,7 +250,7 @@ fun readAzureTrace( metaWriter: ParquetWriter ): MutableList { val random = Random(0) - val fraction = 0.001 + val fraction = 0.005 // Read VM table val vmIdTableCol = 0 -- cgit v1.2.3 From 86cc481d6c28c32ee9b775edda4d3a956daa7938 Mon Sep 17 00:00:00 2001 From: Georgios Andreadis Date: Tue, 19 May 2020 12:37:31 +0200 Subject: Make sure files are read in order --- .../experiments/sc20/trace/Sc20TraceConverter.kt | 109 ++++++++++----------- 1 file changed, 53 insertions(+), 56 deletions(-) (limited to 'opendc') diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt index 39aa2a1f..c381036b 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt @@ -305,74 +305,71 @@ fun readAzureTrace( val vmIdToLastFragment = mutableMapOf() val allFragments = mutableListOf() - File(traceDirectory, "readings").walk() - .filterNot { it.isDirectory } - .filter { it.extension == "csv" } - .toList() - .forEach { readingsFile -> - var timestamp: Long - var vmId: String - var cpuUsage: Double - - BufferedReader(FileReader(readingsFile)).use { reader -> - reader.lineSequence() - .chunked(128) - .forEach { lines -> - for (line in lines) { - // Ignore comments in the trace - if (line.startsWith("#") || line.isBlank()) { - continue - } - - val values = line.split(",") - vmId = values[vmIdCol].trim() - - // Ignore readings for VMs not in the sample - if (!vmIds.contains(vmId)) { - continue - } + for (i in 1..195) { + val readingsFile = File(File(traceDirectory, "readings"), "readings-$i.csv") + var timestamp: Long + var vmId: String + var cpuUsage: Double + + BufferedReader(FileReader(readingsFile)).use { reader -> + reader.lineSequence() + .chunked(128) + .forEach { lines -> + for (line in lines) { + // Ignore comments in the trace + if (line.startsWith("#") || line.isBlank()) { + continue + } - timestamp = values[timestampCol].trim().toLong() * 1000L - vmIdToMetadata[vmId]!!.minTime = min(vmIdToMetadata[vmId]!!.minTime, timestamp) - cpuUsage = values[cpuUsageCol].trim().toDouble() * 4_000 // MHz - vmIdToMetadata[vmId]!!.maxTime = max(vmIdToMetadata[vmId]!!.maxTime, timestamp) + val values = line.split(",") + vmId = values[vmIdCol].trim() - val flops: Long = (cpuUsage * 5 * 60).toLong() - val lastFragment = vmIdToLastFragment[vmId] + // Ignore readings for VMs not in the sample + if (!vmIds.contains(vmId)) { + continue + } - vmIdToLastFragment[vmId] = - if (lastFragment != null && lastFragment.flops == 0L && flops == 0L) { + timestamp = values[timestampCol].trim().toLong() * 1000L + vmIdToMetadata[vmId]!!.minTime = min(vmIdToMetadata[vmId]!!.minTime, timestamp) + cpuUsage = values[cpuUsageCol].trim().toDouble() * 4_000 // MHz + vmIdToMetadata[vmId]!!.maxTime = max(vmIdToMetadata[vmId]!!.maxTime, timestamp) + + val flops: Long = (cpuUsage * 5 * 60).toLong() + val lastFragment = vmIdToLastFragment[vmId] + + vmIdToLastFragment[vmId] = + if (lastFragment != null && lastFragment.flops == 0L && flops == 0L) { + Fragment( + vmId, + lastFragment.tick, + lastFragment.flops + flops, + lastFragment.duration + traceInterval, + cpuUsage, + vmIdToMetadata[vmId]!!.cores + ) + } else { + val fragment = Fragment( vmId, - lastFragment.tick, - lastFragment.flops + flops, - lastFragment.duration + traceInterval, + timestamp, + flops, + traceInterval, cpuUsage, vmIdToMetadata[vmId]!!.cores ) - } else { - val fragment = - Fragment( - vmId, - timestamp, - flops, - traceInterval, - cpuUsage, - vmIdToMetadata[vmId]!!.cores - ) - if (lastFragment != null) { - if (vmIdToFragments[vmId] == null) { - vmIdToFragments[vmId] = mutableListOf() - } - vmIdToFragments[vmId]!!.add(lastFragment) - allFragments.add(lastFragment) + if (lastFragment != null) { + if (vmIdToFragments[vmId] == null) { + vmIdToFragments[vmId] = mutableListOf() } - fragment + vmIdToFragments[vmId]!!.add(lastFragment) + allFragments.add(lastFragment) } - } + fragment + } } - } + } } + } for (entry in vmIdToLastFragment) { if (entry.value != null) { -- cgit v1.2.3 From 1c92520d86bc9d10143909844b46f610368d3333 Mon Sep 17 00:00:00 2001 From: Georgios Andreadis Date: Tue, 19 May 2020 12:41:45 +0200 Subject: Add seeding --- .../opendc/experiments/sc20/trace/Sc20TraceConverter.kt | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) (limited to 'opendc') diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt index c381036b..e10baa61 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt @@ -42,8 +42,8 @@ import kotlin.math.min * A script to convert a trace in text format into a Parquet trace. */ fun main(args: Array) { - if (args.size < 2) { - println("error: expected ") + if (args.size < 3) { + println("error: expected []") return } @@ -100,7 +100,8 @@ fun main(args: Array) { val allFragments = if (traceType == "solvinity") { readSolvinityTrace(traceDirectory, metaSchema, metaWriter) } else { - readAzureTrace(traceDirectory, metaSchema, metaWriter) + val seed = args[3].toLong() + readAzureTrace(traceDirectory, metaSchema, metaWriter, seed) } allFragments.sortWith(compareBy { it.tick }.thenBy { it.id }) println("Reading trace took ${(System.currentTimeMillis() - startTime) / 1000} seconds") @@ -247,9 +248,10 @@ fun readSolvinityTrace( fun readAzureTrace( traceDirectory: File, metaSchema: Schema, - metaWriter: ParquetWriter + metaWriter: ParquetWriter, + seed: Long ): MutableList { - val random = Random(0) + val random = Random(seed) val fraction = 0.005 // Read VM table @@ -308,7 +310,6 @@ fun readAzureTrace( for (i in 1..195) { val readingsFile = File(File(traceDirectory, "readings"), "readings-$i.csv") var timestamp: Long - var vmId: String var cpuUsage: Double BufferedReader(FileReader(readingsFile)).use { reader -> -- cgit v1.2.3 From b772682d93c851e14b2a724ecbb5508dc2846bdf Mon Sep 17 00:00:00 2001 From: Georgios Andreadis Date: Tue, 19 May 2020 12:56:53 +0200 Subject: Add progress bar --- .../com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'opendc') diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt index e10baa61..9664bc5f 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt @@ -24,6 +24,7 @@ package com.atlarge.opendc.experiments.sc20.trace +import me.tongfei.progressbar.ProgressBar import org.apache.avro.Schema import org.apache.avro.SchemaBuilder import org.apache.avro.generic.GenericData @@ -307,7 +308,7 @@ fun readAzureTrace( val vmIdToLastFragment = mutableMapOf() val allFragments = mutableListOf() - for (i in 1..195) { + for (i in ProgressBar.wrap((1..195).toList(), "Reading Trace")) { val readingsFile = File(File(traceDirectory, "readings"), "readings-$i.csv") var timestamp: Long var cpuUsage: Double -- cgit v1.2.3 From cb5c8ba35954e1fc012b82724433e283aaf4acbe Mon Sep 17 00:00:00 2001 From: Georgios Andreadis Date: Tue, 19 May 2020 12:57:20 +0200 Subject: REmove time measurement --- .../com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt | 2 -- 1 file changed, 2 deletions(-) (limited to 'opendc') diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt index 9664bc5f..bfa217e9 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt @@ -97,7 +97,6 @@ fun main(args: Array) { .build() val traceType = args[2] - val startTime = System.currentTimeMillis() val allFragments = if (traceType == "solvinity") { readSolvinityTrace(traceDirectory, metaSchema, metaWriter) } else { @@ -105,7 +104,6 @@ fun main(args: Array) { readAzureTrace(traceDirectory, metaSchema, metaWriter, seed) } allFragments.sortWith(compareBy { it.tick }.thenBy { it.id }) - println("Reading trace took ${(System.currentTimeMillis() - startTime) / 1000} seconds") for (fragment in allFragments) { val record = GenericData.Record(schema) -- cgit v1.2.3 From 458c431223adc22693eec7b5f0b679ff464bcb1b Mon Sep 17 00:00:00 2001 From: Georgios Andreadis Date: Tue, 19 May 2020 16:44:12 +0200 Subject: Add normalization for bitbrains trace --- .../experiments/sc20/trace/Sc20TraceConverter.kt | 34 ++++++++++++++++++++-- 1 file changed, 32 insertions(+), 2 deletions(-) (limited to 'opendc') diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt index bfa217e9..82aeda43 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt @@ -141,10 +141,40 @@ fun readSolvinityTrace( val timestampCol = 0 val cpuUsageCol = 1 val coreCol = 12 - val vmIdCol = 19 val provisionedMemoryCol = 20 val traceInterval = 5 * 60 * 1000L + // Identify start time of the entire trace + var minTimestamp = Long.MAX_VALUE + traceDirectory.walk() + .filterNot { it.isDirectory } + .filter { it.extension == "csv" || it.extension == "txt" } + .toList() + .forEach { vmFile -> + BufferedReader(FileReader(vmFile)).use { reader -> + reader.lineSequence() + .chunked(128) + .forEachIndexed { idx, lines -> + for (line in lines) { + // Ignore comments in the trace + if (line.startsWith("#") || line.isBlank()) { + continue + } + + val values = line.split(" ") + val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L + + if (timestamp < minTimestamp) { + minTimestamp = timestamp + } + return@forEach + } + } + } + } + + println("Start of trace at $minTimestamp") + val allFragments = mutableListOf() traceDirectory.walk() @@ -176,7 +206,7 @@ fun readSolvinityTrace( val values = line.split(" ") vmId = vmFile.name - val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L + val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L - minTimestamp cores = values[coreCol].trim().toInt() requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toLong()) maxCores = max(maxCores, cores) -- cgit v1.2.3 From a59e642c9d98c6c5189c17c74df257f3ea075e7c Mon Sep 17 00:00:00 2001 From: Georgios Andreadis Date: Tue, 19 May 2020 18:46:37 +0200 Subject: Add composite workload notion --- .../com/atlarge/opendc/experiments/sc20/Main.kt | 11 +++++-- .../experiments/sc20/experiment/Portfolios.kt | 36 ++++++++++++++++++---- .../opendc/experiments/sc20/experiment/Run.kt | 23 +++++++++----- .../experiments/sc20/experiment/model/Workload.kt | 8 ++++- .../sc20/trace/Sc20ParquetTraceReader.kt | 18 ++++++++--- .../sc20/trace/Sc20RawParquetTraceReader.kt | 2 -- .../experiments/sc20/trace/WorkloadSampler.kt | 20 ++++++++++-- 7 files changed, 92 insertions(+), 26 deletions(-) (limited to 'opendc') diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt index 14de52b8..21cd2eca 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt @@ -24,9 +24,9 @@ package com.atlarge.opendc.experiments.sc20 +import com.atlarge.opendc.experiments.sc20.experiment.CompositeWorkloadPortfolio import com.atlarge.opendc.experiments.sc20.experiment.Experiment import com.atlarge.opendc.experiments.sc20.experiment.HorVerPortfolio -import com.atlarge.opendc.experiments.sc20.experiment.MoreHpcPortfolio import com.atlarge.opendc.experiments.sc20.experiment.MoreVelocityPortfolio import com.atlarge.opendc.experiments.sc20.experiment.OperationalPhenomenaPortfolio import com.atlarge.opendc.experiments.sc20.experiment.Portfolio @@ -96,9 +96,14 @@ class ExperimentCli : CliktCommand(name = "sc20-experiment") { */ private val portfolios by option("--portfolio") .choice( - "hor-ver" to { experiment: Experiment, i: Int -> HorVerPortfolio(experiment, i) } as (Experiment, Int) -> Portfolio, + "hor-ver" to { experiment: Experiment, i: Int -> + HorVerPortfolio( + experiment, + i + ) + } as (Experiment, Int) -> Portfolio, "more-velocity" to { experiment, i -> MoreVelocityPortfolio(experiment, i) }, - "more-hpc" to { experiment, i -> MoreHpcPortfolio(experiment, i) }, + "composite-workload" to { experiment, i -> CompositeWorkloadPortfolio(experiment, i) }, "operational-phenomena" to { experiment, i -> OperationalPhenomenaPortfolio(experiment, i) }, "test" to { experiment, i -> TestPortfolio(experiment, i) }, ignoreCase = true diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt index 362144ae..77c0194b 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt @@ -24,6 +24,7 @@ package com.atlarge.opendc.experiments.sc20.experiment +import com.atlarge.opendc.experiments.sc20.experiment.model.CompositeWorkload import com.atlarge.opendc.experiments.sc20.experiment.model.OperationalPhenomena import com.atlarge.opendc.experiments.sc20.experiment.model.Topology import com.atlarge.opendc.experiments.sc20.experiment.model.Workload @@ -82,7 +83,9 @@ public class MoreVelocityPortfolio(parent: Experiment, id: Int) : Portfolio(pare ) } -public class MoreHpcPortfolio(parent: Experiment, id: Int) : Portfolio(parent, id, "more_hpc") { +public class CompositeWorkloadPortfolio(parent: Experiment, id: Int) : Portfolio(parent, id, "composite-workload") { + private val totalSampleLoad = 3425709788935.9976 + override val topologies = listOf( Topology("base"), Topology("exp-vol-hor-hom"), @@ -91,14 +94,35 @@ public class MoreHpcPortfolio(parent: Experiment, id: Int) : Portfolio(parent, i ) override val workloads = listOf( - Workload("solvinity", 0.1), - Workload("solvinity", 0.25), - Workload("solvinity", 0.5), - Workload("solvinity", 1.0) + CompositeWorkload( + "all-solvinity", + listOf(Workload("solvinity", 0.0), Workload("azure", 1.0)), + totalSampleLoad + ), + CompositeWorkload( + "solvinity-25-azure-75", + listOf(Workload("solvinity", 0.25), Workload("azure", 0.75)), + totalSampleLoad + ), + CompositeWorkload( + "solvinity-50-azure-50", + listOf(Workload("solvinity", 0.5), Workload("azure", 0.5)), + totalSampleLoad + ), + CompositeWorkload( + "solvinity-75-azure-25", + listOf(Workload("solvinity", 0.75), Workload("azure", 0.25)), + totalSampleLoad + ), + CompositeWorkload( + "all-azure", + listOf(Workload("solvinity", 1.0), Workload("azure", 0.0)), + totalSampleLoad + ) ) override val operationalPhenomenas = listOf( - OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = true) + OperationalPhenomena(failureFrequency = 24.0 * 7, hasInterference = false) ) override val allocationPolicies = listOf( diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt index fd3e29c8..49941cf4 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt @@ -31,6 +31,7 @@ import com.atlarge.opendc.compute.virt.service.allocation.NumberOfActiveServersA import com.atlarge.opendc.compute.virt.service.allocation.ProvisionedCoresAllocationPolicy import com.atlarge.opendc.compute.virt.service.allocation.RandomAllocationPolicy import com.atlarge.opendc.compute.virt.service.allocation.ReplayAllocationPolicy +import com.atlarge.opendc.experiments.sc20.experiment.model.CompositeWorkload import com.atlarge.opendc.experiments.sc20.experiment.monitor.ParquetExperimentMonitor import com.atlarge.opendc.experiments.sc20.runner.TrialExperimentDescriptor import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionContext @@ -83,18 +84,26 @@ public data class Run(override val parent: Scenario, val id: Int, val seed: Int) } @Suppress("UNCHECKED_CAST") - val rawTraceReaders = context.cache.computeIfAbsent("raw-trace-readers") { mutableMapOf() } as MutableMap - val raw = synchronized(rawTraceReaders) { - val name = parent.workload.name - rawTraceReaders.computeIfAbsent(name) { - logger.info { "Loading trace $name" } - Sc20RawParquetTraceReader(File(experiment.traces, name)) + val rawTraceReaders = + context.cache.computeIfAbsent("raw-trace-readers") { mutableMapOf() } as MutableMap + val rawReaders = synchronized(rawTraceReaders) { + val workloadNames = if (parent.workload is CompositeWorkload) { + parent.workload.workloads.map { it.name } + } else { + listOf(parent.workload.name) + } + + workloadNames.map { workloadName -> + rawTraceReaders.computeIfAbsent(workloadName) { + logger.info { "Loading trace $workloadName" } + Sc20RawParquetTraceReader(File(experiment.traces, workloadName)) + } } } val performanceInterferenceModel = experiment.performanceInterferenceModel ?.takeIf { parent.operationalPhenomena.hasInterference } ?.construct(seeder) ?: emptyMap() - val trace = Sc20ParquetTraceReader(raw, performanceInterferenceModel, parent.workload, seed) + val trace = Sc20ParquetTraceReader(rawReaders, performanceInterferenceModel, parent.workload, seed) val monitor = ParquetExperimentMonitor(this) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Workload.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Workload.kt index 2dbdf570..cc3c448a 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Workload.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/model/Workload.kt @@ -27,4 +27,10 @@ package com.atlarge.opendc.experiments.sc20.experiment.model /** * A workload that is considered for a scenario. */ -public class Workload(val name: String, val fraction: Double) +public open class Workload(open val name: String, val fraction: Double) + +/** + * A workload that is composed of multiple workloads. + */ +public class CompositeWorkload(override val name: String, val workloads: List, val totalLoad: Double) : + Workload(name, -1.0) diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt index ad50bf18..06bececf 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20ParquetTraceReader.kt @@ -28,6 +28,7 @@ import com.atlarge.opendc.compute.core.image.VmImage import com.atlarge.opendc.compute.core.workload.IMAGE_PERF_INTERFERENCE_MODEL import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel import com.atlarge.opendc.compute.core.workload.VmWorkload +import com.atlarge.opendc.experiments.sc20.experiment.model.CompositeWorkload import com.atlarge.opendc.experiments.sc20.experiment.model.Workload import com.atlarge.opendc.format.trace.TraceEntry import com.atlarge.opendc.format.trace.TraceReader @@ -42,7 +43,7 @@ import java.util.TreeSet */ @OptIn(ExperimentalStdlibApi::class) class Sc20ParquetTraceReader( - raw: Sc20RawParquetTraceReader, + rawReaders: List, performanceInterferenceModel: Map, workload: Workload, seed: Int @@ -51,8 +52,17 @@ class Sc20ParquetTraceReader( * The iterator over the actual trace. */ private val iterator: Iterator> = - raw.read() - .run { sampleWorkload(this, workload, seed) } + rawReaders + .map { it.read() } + .run { + if (workload is CompositeWorkload) { + this.zip(workload.workloads) + } else { + this.zip(listOf(workload)) + } + } + .map { sampleWorkload(it.first, workload, it.second, seed) } + .flatten() .run { // Apply performance interference model if (performanceInterferenceModel.isEmpty()) @@ -62,7 +72,7 @@ class Sc20ParquetTraceReader( val image = entry.workload.image val id = image.name val relevantPerformanceInterferenceModelItems = - performanceInterferenceModel[id] ?: PerformanceInterferenceModel(TreeSet()) + performanceInterferenceModel[id] ?: PerformanceInterferenceModel(TreeSet()) val newImage = VmImage( diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt index 3b480d33..b390a753 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt @@ -105,8 +105,6 @@ class Sc20RawParquetTraceReader(private val path: File) { val requiredMemory = record["requiredMemory"] as Long val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray()) - logger.info { "VM $id" } - val vmFragments = fragments.getValue(id).asSequence() val totalLoad = vmFragments.sumByDouble { it.usage } * 5 * 60 // avg MHz * duration = MFLOPs val vmWorkload = VmWorkload( diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt index a8580686..dcb7190d 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt @@ -25,6 +25,7 @@ package com.atlarge.opendc.experiments.sc20.trace import com.atlarge.opendc.compute.core.workload.VmWorkload +import com.atlarge.opendc.experiments.sc20.experiment.model.CompositeWorkload import com.atlarge.opendc.experiments.sc20.experiment.model.Workload import com.atlarge.opendc.format.trace.TraceEntry import mu.KotlinLogging @@ -35,8 +36,17 @@ private val logger = KotlinLogging.logger {} /** * Sample the workload for the specified [run]. */ -fun sampleWorkload(trace: List>, workload: Workload, seed: Int): List> { - return sampleRegularWorkload(trace, workload, seed) +fun sampleWorkload( + trace: List>, + workload: Workload, + subWorkload: Workload, + seed: Int +): List> { + return if (workload is CompositeWorkload) { + sampleRegularWorkload(trace, subWorkload, seed) + } else { + sampleRegularWorkload(trace, workload, seed) + } } /** @@ -50,7 +60,11 @@ fun sampleRegularWorkload(trace: List>, workload: Workloa val shuffled = trace.shuffled(Random(seed)) val res = mutableListOf>() - val totalLoad = shuffled.sumByDouble { it.workload.image.tags.getValue("total-load") as Double } + val totalLoad = if (workload is CompositeWorkload) { + workload.totalLoad + } else { + shuffled.sumByDouble { it.workload.image.tags.getValue("total-load") as Double } + } var currentLoad = 0.0 for (entry in shuffled) { -- cgit v1.2.3 From 125ffab62a5d2c9dd775257ae5c418825da9a7f5 Mon Sep 17 00:00:00 2001 From: Georgios Andreadis Date: Tue, 19 May 2020 20:07:45 +0200 Subject: Fix integration test --- .../kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'opendc') diff --git a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt index 68c2cbc5..7f8eb585 100644 --- a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt +++ b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt @@ -204,7 +204,7 @@ class Sc20IntegrationTest { */ private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader { return Sc20ParquetTraceReader( - Sc20RawParquetTraceReader(File("src/test/resources/trace")), + listOf(Sc20RawParquetTraceReader(File("src/test/resources/trace"))), emptyMap(), Workload("test", fraction), seed -- cgit v1.2.3 From 48cedf7107a1992575b9d7db3f8130807db196da Mon Sep 17 00:00:00 2001 From: Georgios Andreadis Date: Tue, 19 May 2020 21:16:16 +0200 Subject: Fix issue --- .../src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt | 8 ++------ .../kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt | 1 + .../opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt | 8 ++++++++ 3 files changed, 11 insertions(+), 6 deletions(-) (limited to 'opendc') diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt index 21cd2eca..8d2e9ba8 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt @@ -96,12 +96,8 @@ class ExperimentCli : CliktCommand(name = "sc20-experiment") { */ private val portfolios by option("--portfolio") .choice( - "hor-ver" to { experiment: Experiment, i: Int -> - HorVerPortfolio( - experiment, - i - ) - } as (Experiment, Int) -> Portfolio, + "hor-ver" to { experiment: Experiment, i: Int -> HorVerPortfolio(experiment, i) } + as (Experiment, Int) -> Portfolio, "more-velocity" to { experiment, i -> MoreVelocityPortfolio(experiment, i) }, "composite-workload" to { experiment, i -> CompositeWorkloadPortfolio(experiment, i) }, "operational-phenomena" to { experiment, i -> OperationalPhenomenaPortfolio(experiment, i) }, diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt index 49941cf4..9928f223 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt @@ -100,6 +100,7 @@ public data class Run(override val parent: Scenario, val id: Int, val seed: Int) } } } + val performanceInterferenceModel = experiment.performanceInterferenceModel ?.takeIf { parent.operationalPhenomena.hasInterference } ?.construct(seeder) ?: emptyMap() diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt index b390a753..652f7746 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20RawParquetTraceReader.kt @@ -98,7 +98,12 @@ class Sc20RawParquetTraceReader(private val path: File) { return try { while (true) { val record = metaReader.read() ?: break + val id = record["id"].toString() + if (!fragments.containsKey(id)) { + continue + } + val submissionTime = record["submissionTime"] as Long val endTime = record["endTime"] as Long val maxCores = record["maxCores"] as Int @@ -127,6 +132,9 @@ class Sc20RawParquetTraceReader(private val path: File) { } entries + } catch (e: Exception) { + e.printStackTrace() + throw e } finally { metaReader.close() } -- cgit v1.2.3 From 294d6d5e5ce9fdc68f98434b3c7f1dc970c06647 Mon Sep 17 00:00:00 2001 From: Georgios Andreadis Date: Tue, 19 May 2020 21:20:22 +0200 Subject: Remove full load sampling --- .../com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt | 3 --- 1 file changed, 3 deletions(-) (limited to 'opendc') diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt index dcb7190d..589854b6 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt @@ -54,9 +54,6 @@ fun sampleWorkload( */ fun sampleRegularWorkload(trace: List>, workload: Workload, seed: Int): List> { val fraction = workload.fraction - if (fraction >= 1) { - return trace - } val shuffled = trace.shuffled(Random(seed)) val res = mutableListOf>() -- cgit v1.2.3 From 57d02e62a66534146b90a8e66e18609da495ba67 Mon Sep 17 00:00:00 2001 From: Georgios Andreadis Date: Tue, 19 May 2020 21:28:15 +0200 Subject: Fix sampling issue --- .../opendc/experiments/sc20/trace/WorkloadSampler.kt | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) (limited to 'opendc') diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt index 589854b6..dd70d4f1 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/WorkloadSampler.kt @@ -43,17 +43,22 @@ fun sampleWorkload( seed: Int ): List> { return if (workload is CompositeWorkload) { - sampleRegularWorkload(trace, subWorkload, seed) + sampleRegularWorkload(trace, workload, subWorkload, seed) } else { - sampleRegularWorkload(trace, workload, seed) + sampleRegularWorkload(trace, workload, workload, seed) } } /** * Sample a regular (non-HPC) workload. */ -fun sampleRegularWorkload(trace: List>, workload: Workload, seed: Int): List> { - val fraction = workload.fraction +fun sampleRegularWorkload( + trace: List>, + workload: Workload, + subWorkload: Workload, + seed: Int +): List> { + val fraction = subWorkload.fraction val shuffled = trace.shuffled(Random(seed)) val res = mutableListOf>() -- cgit v1.2.3 From 7b9c023cfba1533be1bea534cfe20647856ffa1a Mon Sep 17 00:00:00 2001 From: Georgios Andreadis Date: Tue, 19 May 2020 21:42:52 +0200 Subject: Adjust integration test --- .../com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt | 2 +- .../kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) (limited to 'opendc') diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt index 82aeda43..317a57b3 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt @@ -281,7 +281,7 @@ fun readAzureTrace( seed: Long ): MutableList { val random = Random(seed) - val fraction = 0.005 + val fraction = 0.02 // Read VM table val vmIdTableCol = 0 diff --git a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt index 7f8eb585..5ecf7605 100644 --- a/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt +++ b/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt @@ -145,8 +145,8 @@ class Sc20IntegrationTest { assertEquals(50, scheduler.submittedVms, "The trace contains 50 VMs") assertEquals(50, scheduler.finishedVms, "All VMs should finish after a run") assertEquals(207379117949, monitor.totalRequestedBurst) - assertEquals(207102919834, monitor.totalGrantedBurst) - assertEquals(276198896, monitor.totalOvercommissionedBurst) + assertEquals(203388071813, monitor.totalGrantedBurst) + assertEquals(3991046136, monitor.totalOvercommissionedBurst) assertEquals(0, monitor.totalInterferedBurst) } -- cgit v1.2.3 From d4542ff6b3be302c0b89704d55c8673499d2e498 Mon Sep 17 00:00:00 2001 From: Georgios Andreadis Date: Tue, 19 May 2020 22:57:16 +0200 Subject: Reduce fraction --- .../com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'opendc') diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt index 317a57b3..e2d0c1d6 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt @@ -281,7 +281,7 @@ fun readAzureTrace( seed: Long ): MutableList { val random = Random(seed) - val fraction = 0.02 + val fraction = 0.01 // Read VM table val vmIdTableCol = 0 -- cgit v1.2.3 From b4e5aa96ba565eee156173e806b607c9ec179dad Mon Sep 17 00:00:00 2001 From: Georgios Andreadis Date: Wed, 20 May 2020 11:08:01 +0200 Subject: Fix workload names --- .../com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'opendc') diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt index 77c0194b..0302402b 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt @@ -95,7 +95,7 @@ public class CompositeWorkloadPortfolio(parent: Experiment, id: Int) : Portfolio override val workloads = listOf( CompositeWorkload( - "all-solvinity", + "all-azure", listOf(Workload("solvinity", 0.0), Workload("azure", 1.0)), totalSampleLoad ), @@ -115,7 +115,7 @@ public class CompositeWorkloadPortfolio(parent: Experiment, id: Int) : Portfolio totalSampleLoad ), CompositeWorkload( - "all-azure", + "all-solvinity", listOf(Workload("solvinity", 1.0), Workload("azure", 0.0)), totalSampleLoad ) -- cgit v1.2.3 From cf100886cf9cae5503d7e5df88845a93912d4677 Mon Sep 17 00:00:00 2001 From: Georgios Andreadis Date: Wed, 20 May 2020 14:41:00 +0200 Subject: Add replay portfolio --- .../opendc/experiments/sc20/experiment/Portfolios.kt | 19 +++++++++++++++++++ .../atlarge/opendc/experiments/sc20/experiment/Run.kt | 2 +- 2 files changed, 20 insertions(+), 1 deletion(-) (limited to 'opendc') diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt index 0302402b..89012036 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Portfolios.kt @@ -160,6 +160,25 @@ public class OperationalPhenomenaPortfolio(parent: Experiment, id: Int) : Portfo ) } +public class ReplayPortfolio(parent: Experiment, id: Int) : Portfolio(parent, id, "replay") { + override val topologies = listOf( + Topology("base") + ) + + override val workloads = listOf( + Workload("solvinity", 1.0) + ) + + override val operationalPhenomenas = listOf( + OperationalPhenomena(failureFrequency = 0.0, hasInterference = false) + ) + + override val allocationPolicies = listOf( + "replay", + "active-servers" + ) +} + public class TestPortfolio(parent: Experiment, id: Int) : Portfolio(parent, id, "test") { override val repetitions: Int = 1 diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt index 9928f223..5d1c29e2 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt @@ -79,7 +79,7 @@ public data class Run(override val parent: Scenario, val id: Int, val seed: Int) "provisioned-cores" -> ProvisionedCoresAllocationPolicy() "provisioned-cores-inv" -> ProvisionedCoresAllocationPolicy(true) "random" -> RandomAllocationPolicy(Random(seeder.nextInt())) - "replay" -> ReplayAllocationPolicy(emptyMap()) + "replay" -> ReplayAllocationPolicy(experiment.vmPlacements) else -> throw IllegalArgumentException("Unknown policy ${parent.allocationPolicy}") } -- cgit v1.2.3 From 077d9da22a7877b56e967ed41a8e2f8dcd210b0e Mon Sep 17 00:00:00 2001 From: Georgios Andreadis Date: Wed, 20 May 2020 14:51:21 +0200 Subject: Add replay portfolio to the list --- .../src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt | 2 ++ 1 file changed, 2 insertions(+) (limited to 'opendc') diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt index 8d2e9ba8..677af381 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Main.kt @@ -30,6 +30,7 @@ import com.atlarge.opendc.experiments.sc20.experiment.HorVerPortfolio import com.atlarge.opendc.experiments.sc20.experiment.MoreVelocityPortfolio import com.atlarge.opendc.experiments.sc20.experiment.OperationalPhenomenaPortfolio import com.atlarge.opendc.experiments.sc20.experiment.Portfolio +import com.atlarge.opendc.experiments.sc20.experiment.ReplayPortfolio import com.atlarge.opendc.experiments.sc20.experiment.TestPortfolio import com.atlarge.opendc.experiments.sc20.reporter.ConsoleExperimentReporter import com.atlarge.opendc.experiments.sc20.runner.ExperimentDescriptor @@ -101,6 +102,7 @@ class ExperimentCli : CliktCommand(name = "sc20-experiment") { "more-velocity" to { experiment, i -> MoreVelocityPortfolio(experiment, i) }, "composite-workload" to { experiment, i -> CompositeWorkloadPortfolio(experiment, i) }, "operational-phenomena" to { experiment, i -> OperationalPhenomenaPortfolio(experiment, i) }, + "replay" to { experiment, i -> ReplayPortfolio(experiment, i) }, "test" to { experiment, i -> TestPortfolio(experiment, i) }, ignoreCase = true ) -- cgit v1.2.3 From 1e24e8934396bf2947c0d3bd244c29c0ecff98a1 Mon Sep 17 00:00:00 2001 From: Georgios Andreadis Date: Wed, 20 May 2020 15:05:25 +0200 Subject: Add fallback for placement info --- .../virt/service/allocation/ReplayAllocationPolicy.kt | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) (limited to 'opendc') diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/ReplayAllocationPolicy.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/ReplayAllocationPolicy.kt index f88eaed8..9e675e80 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/ReplayAllocationPolicy.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/ReplayAllocationPolicy.kt @@ -2,14 +2,18 @@ package com.atlarge.opendc.compute.virt.service.allocation import com.atlarge.opendc.compute.virt.service.HypervisorView import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService +import mu.KotlinLogging +import kotlin.random.Random + +private val logger = KotlinLogging.logger {} /** - * Policy replaying VM-cluster assignnment. + * Policy replaying VM-cluster assignment. * * Within each cluster, the active servers on each node determine which node gets * assigned the VM image. */ -class ReplayAllocationPolicy(val vmPlacements: Map) : AllocationPolicy { +class ReplayAllocationPolicy(val vmPlacements: Map, val random: Random = Random(0)) : AllocationPolicy { override fun invoke(): AllocationPolicy.Logic = object : AllocationPolicy.Logic { override fun select( hypervisors: Set, @@ -18,8 +22,14 @@ class ReplayAllocationPolicy(val vmPlacements: Map) : Allocation val clusterName = vmPlacements[image.name] ?: throw IllegalStateException("Could not find placement data in VM placement file for VM ${image.name}") val machinesInCluster = hypervisors.filter { it.server.name.contains(clusterName) } + + if (machinesInCluster.isEmpty()) { + logger.info { "Could not find any machines belonging to cluster $clusterName for image ${image.name}, assigning randomly." } + return hypervisors.random(random) + } + return machinesInCluster.maxBy { it.availableMemory } - ?: throw IllegalStateException("Cloud not find any machines belonging to cluster $clusterName for image ${image.name}") + ?: throw IllegalStateException("Cloud not find any machine and could not randomly assign") } } } -- cgit v1.2.3 From 78bf43a33ade79ccd412ee849c15069fdb728608 Mon Sep 17 00:00:00 2001 From: Georgios Andreadis Date: Wed, 20 May 2020 15:10:15 +0200 Subject: Make replay smarter --- .../opendc/compute/virt/service/allocation/ReplayAllocationPolicy.kt | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) (limited to 'opendc') diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/ReplayAllocationPolicy.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/ReplayAllocationPolicy.kt index 9e675e80..59acfce2 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/ReplayAllocationPolicy.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/allocation/ReplayAllocationPolicy.kt @@ -3,7 +3,6 @@ package com.atlarge.opendc.compute.virt.service.allocation import com.atlarge.opendc.compute.virt.service.HypervisorView import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService import mu.KotlinLogging -import kotlin.random.Random private val logger = KotlinLogging.logger {} @@ -13,7 +12,7 @@ private val logger = KotlinLogging.logger {} * Within each cluster, the active servers on each node determine which node gets * assigned the VM image. */ -class ReplayAllocationPolicy(val vmPlacements: Map, val random: Random = Random(0)) : AllocationPolicy { +class ReplayAllocationPolicy(val vmPlacements: Map) : AllocationPolicy { override fun invoke(): AllocationPolicy.Logic = object : AllocationPolicy.Logic { override fun select( hypervisors: Set, @@ -25,7 +24,7 @@ class ReplayAllocationPolicy(val vmPlacements: Map, val random: if (machinesInCluster.isEmpty()) { logger.info { "Could not find any machines belonging to cluster $clusterName for image ${image.name}, assigning randomly." } - return hypervisors.random(random) + return hypervisors.maxBy { it.availableMemory } } return machinesInCluster.maxBy { it.availableMemory } -- cgit v1.2.3 From 93d97d05a4956463f855bd567b8e47491b4e18cc Mon Sep 17 00:00:00 2001 From: Georgios Andreadis Date: Wed, 20 May 2020 20:06:27 +0200 Subject: Fix Azure VM filtering issue --- .../com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'opendc') diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt index 9d2b0247..be40e3ca 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt @@ -216,7 +216,7 @@ suspend fun processTrace(reader: TraceReader, scheduler: SimpleVirtP while (reader.hasNext()) { val (time, workload) = reader.next() - if (vmPlacements.isNotEmpty()) { + if (vmPlacements.isNotEmpty() && workload.name.contains(".txt")) { val vmId = workload.name.replace("VM Workload ", "") // Check if VM in topology val clusterName = vmPlacements[vmId] -- cgit v1.2.3 From d6fba71ad1052ddbb1619e520a17f5f1d3e0c3ed Mon Sep 17 00:00:00 2001 From: Georgios Andreadis Date: Wed, 20 May 2020 21:13:37 +0200 Subject: Decrease Azure CPU max --- .../com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'opendc') diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt index e2d0c1d6..32a188b5 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt @@ -361,7 +361,7 @@ fun readAzureTrace( timestamp = values[timestampCol].trim().toLong() * 1000L vmIdToMetadata[vmId]!!.minTime = min(vmIdToMetadata[vmId]!!.minTime, timestamp) - cpuUsage = values[cpuUsageCol].trim().toDouble() * 4_000 // MHz + cpuUsage = values[cpuUsageCol].trim().toDouble() * 3_000 // MHz vmIdToMetadata[vmId]!!.maxTime = max(vmIdToMetadata[vmId]!!.maxTime, timestamp) val flops: Long = (cpuUsage * 5 * 60).toLong() -- cgit v1.2.3 From f6f685196d6579d9866d2a04c2c01a63e8c169d7 Mon Sep 17 00:00:00 2001 From: Georgios Andreadis Date: Wed, 20 May 2020 21:37:34 +0200 Subject: Report host cores --- .../sc20/experiment/monitor/ParquetExperimentMonitor.kt | 9 ++++++--- .../com/atlarge/opendc/experiments/sc20/telemetry/HostEvent.kt | 3 ++- .../experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt | 2 ++ 3 files changed, 10 insertions(+), 4 deletions(-) (limited to 'opendc') diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt index 7f71eb3e..be60e5b7 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/monitor/ParquetExperimentMonitor.kt @@ -131,7 +131,8 @@ class ParquetExperimentMonitor(val run: Run) : ExperimentMonitor { interferedBurst, cpuUsage, cpuDemand, - lastPowerConsumption[hostServer] ?: 200.0 + lastPowerConsumption[hostServer] ?: 200.0, + hostServer.flavor.cpuCount ) currentHostEvent[hostServer] = event @@ -148,7 +149,8 @@ class ParquetExperimentMonitor(val run: Run) : ExperimentMonitor { interferedBurst, cpuUsage, cpuDemand, - lastPowerConsumption[hostServer] ?: 200.0 + lastPowerConsumption[hostServer] ?: 200.0, + hostServer.flavor.cpuCount ) currentHostEvent[hostServer] = event @@ -167,7 +169,8 @@ class ParquetExperimentMonitor(val run: Run) : ExperimentMonitor { interferedBurst, cpuUsage, cpuDemand, - lastPowerConsumption[hostServer] ?: 200.0 + lastPowerConsumption[hostServer] ?: 200.0, + hostServer.flavor.cpuCount ) currentHostEvent[hostServer] = event diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/HostEvent.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/HostEvent.kt index 8e91bca2..b9030172 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/HostEvent.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/HostEvent.kt @@ -40,5 +40,6 @@ data class HostEvent( val interferedBurst: Long, val cpuUsage: Double, val cpuDemand: Double, - val powerDraw: Double + val powerDraw: Double, + val cores: Int ) : Event("host-metrics") diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt index 523897b0..3bc09435 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/telemetry/parquet/ParquetHostEventWriter.kt @@ -55,6 +55,7 @@ public class ParquetHostEventWriter(path: File, bufferSize: Int) : record.put("cpu_usage", event.cpuUsage) record.put("cpu_demand", event.cpuDemand) record.put("power_draw", event.powerDraw * (1.0 / 12)) + record.put("cores", event.cores) } val schema: Schema = SchemaBuilder @@ -76,6 +77,7 @@ public class ParquetHostEventWriter(path: File, bufferSize: Int) : .name("cpu_usage").type().doubleType().noDefault() .name("cpu_demand").type().doubleType().noDefault() .name("power_draw").type().doubleType().noDefault() + .name("cores").type().intType().noDefault() .endRecord() } } -- cgit v1.2.3