-rw-r--r--  simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt | 120
1 file changed, 117 insertions(+), 3 deletions(-)
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt
index d6726910..6c599517 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/trace/Sc20TraceConverter.kt
@@ -104,10 +104,15 @@ fun main(args: Array<String>) {
         val vmPlacements = Sc20VmPlacementReader(vmPlacementFile.inputStream().buffered()).construct()
         readSolvinityTrace(traceDirectory, metaSchema, metaWriter, clusters, vmPlacements)
-    } else {
+    } else if (traceType == "bitbrains") {
+        readBitbrainsTrace(traceDirectory, metaSchema, metaWriter)
+    } else if (traceType == "azure") {
         val seed = args[3].toLong()
         readAzureTrace(traceDirectory, metaSchema, metaWriter, seed)
+    } else {
+        throw IllegalArgumentException("Unsupported trace type")
     }
+
     allFragments.sortWith(compareBy<Fragment> { it.tick }.thenBy { it.id })
     for (fragment in allFragments) {
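
The dispatch in main() is now a three-way chain of else-if branches with a guard for unknown types. In Kotlin this reads more naturally as a when expression; a minimal sketch of the equivalent logic (the "solvinity" literal is an assumption, since the first branch's condition sits outside this hunk):

    // Sketch only: mirrors the branches above; the "solvinity" literal is assumed.
    when (traceType) {
        "solvinity" -> {
            val vmPlacements = Sc20VmPlacementReader(vmPlacementFile.inputStream().buffered()).construct()
            readSolvinityTrace(traceDirectory, metaSchema, metaWriter, clusters, vmPlacements)
        }
        "bitbrains" -> readBitbrainsTrace(traceDirectory, metaSchema, metaWriter)
        "azure" -> readAzureTrace(traceDirectory, metaSchema, metaWriter, args[3].toLong())
        else -> throw IllegalArgumentException("Unsupported trace type: $traceType")
    }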
@@ -176,7 +181,7 @@ fun readSolvinityTrace(
                                     continue
                                 }
 
-                                val values = line.split(" ")
+                                val values = line.split("\t")
 
                                 val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
                                 if (timestamp < minTimestamp) {
@@ -221,7 +226,7 @@ fun readSolvinityTrace(
                                     continue
                                 }
 
-                                val values = line.split(" ")
+                                val values = line.split("\t")
 
                                 vmId = vmFile.name
 
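
Both parsing loops in readSolvinityTrace now assume strictly tab-separated fields. If a trace file ever mixes tabs with runs of spaces, splitting on a whitespace regex is a defensive alternative, at the cost of collapsing empty fields (a sketch, not part of this change):

    val values = line.trim().split(Regex("\\s+"))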
@@ -301,6 +306,115 @@ fun readSolvinityTrace(
 }
 
 /**
+ * Reads the Bitbrains trace.
+ *
+ * See http://gwa.ewi.tudelft.nl/datasets/gwa-t-12-bitbrains for a definition of the trace.
+ */
+fun readBitbrainsTrace(
+    traceDirectory: File,
+    metaSchema: Schema,
+    metaWriter: ParquetWriter<GenericData.Record>
+): MutableList<Fragment> {
+    val timestampCol = 0
+    val cpuUsageCol = 3
+    val coreCol = 1
+    val provisionedMemoryCol = 5
+    val traceInterval = 5 * 60 * 1000L
+
+    val allFragments = mutableListOf<Fragment>()
+
+    traceDirectory.walk()
+        .filterNot { it.isDirectory }
+        .filter { it.extension == "csv" || it.extension == "txt" }
+        .toList()
+        .forEachIndexed { idx, vmFile ->
+            println(vmFile)
+
+            var vmId = ""
+            var maxCores = -1
+            var requiredMemory = -1L
+            var cores = -1
+            var minTime = Long.MAX_VALUE
+
+            val flopsFragments = sequence {
+                var last: Fragment? = null
+
+                BufferedReader(FileReader(vmFile)).use { reader ->
+                    reader.lineSequence()
+                        .drop(1)
+                        .chunked(128)
+                        .forEach { lines ->
+                            for (line in lines) {
+                                // Ignore comments in the trace
+                                if (line.startsWith("#") || line.isBlank()) {
+                                    continue
+                                }
+
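+                                // Bitbrains fields are separated by ";\t" (semicolon followed by a tab)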
+                                val values = line.split(";\t")
+
+                                vmId = vmFile.name
+
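+                                // Shift the timestamp back one 5-minute interval and convert seconds to milliseconds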
+                                val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
+
+                                cores = values[coreCol].trim().toInt()
+                                requiredMemory = max(requiredMemory, values[provisionedMemoryCol].trim().toDouble().toLong())
+                                maxCores = max(maxCores, cores)
+                                minTime = min(minTime, timestamp)
+                                val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz
+
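+                                // Approximate the work done in this interval: average MHz times the 300-second sample length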
+                                val flops: Long = (cpuUsage * 5 * 60).toLong()
+
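+                                // Coalesce runs of idle (zero-flops) samples into a single longer fragment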
+                                last = if (last != null && last!!.flops == 0L && flops == 0L) {
+                                    val oldFragment = last!!
+                                    Fragment(
+                                        vmId,
+                                        oldFragment.tick,
+                                        oldFragment.flops + flops,
+                                        oldFragment.duration + traceInterval,
+                                        cpuUsage,
+                                        cores
+                                    )
+                                } else {
+                                    val fragment =
+                                        Fragment(
+                                            vmId,
+                                            timestamp,
+                                            flops,
+                                            traceInterval,
+                                            cpuUsage,
+                                            cores
+                                        )
+                                    if (last != null) {
+                                        yield(last!!)
+                                    }
+                                    fragment
+                                }
+                            }
+                        }
+                }
+
+                if (last != null) {
+                    yield(last!!)
+                }
+            }
+
+            var maxTime = Long.MIN_VALUE
+            flopsFragments.forEach { fragment ->
+                allFragments.add(fragment)
+                maxTime = max(maxTime, fragment.tick)
+            }
+
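+            // One metadata record per VM: id, lifetime, peak core count and provisioned memory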
+            val metaRecord = GenericData.Record(metaSchema)
+            metaRecord.put("id", vmId)
+            metaRecord.put("submissionTime", minTime)
+            metaRecord.put("endTime", maxTime)
+            metaRecord.put("maxCores", maxCores)
+            metaRecord.put("requiredMemory", requiredMemory)
+            metaWriter.write(metaRecord)
+        }
+
+    return allFragments
+}
+
+/**
  * Reads the Azure cloud trace.
  *
  * See https://github.com/Azure/AzurePublicDataset/ for a definition of the trace.
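
The heart of readBitbrainsTrace is the merge step that collapses consecutive idle samples into one long fragment. Below is a self-contained sketch of just that logic; the simplified Fragment type, the coalesce helper, and the sample values are illustrative stand-ins, not the repository's actual definitions:

    // Simplified stand-in for the converter's Fragment type.
    data class Fragment(val id: String, val tick: Long, val flops: Long, val duration: Long)

    const val INTERVAL_MS = 5 * 60 * 1000L

    // Mirrors the merge decision above: extend a pending idle fragment
    // while samples stay at zero flops, otherwise emit it and start anew.
    fun coalesce(samples: List<Pair<Long, Long>>): List<Fragment> = sequence {
        var last: Fragment? = null
        for ((tick, flops) in samples) {
            val prev = last
            last = if (prev != null && prev.flops == 0L && flops == 0L) {
                prev.copy(duration = prev.duration + INTERVAL_MS)
            } else {
                if (prev != null) yield(prev)
                Fragment("vm-0", tick, flops, INTERVAL_MS)
            }
        }
        last?.let { yield(it) }
    }.toList()

    fun main() {
        // Three idle samples collapse into one 15-minute fragment; the busy sample stays separate.
        val samples = listOf(0L to 0L, 300_000L to 0L, 600_000L to 0L, 900_000L to 42L)
        coalesce(samples).forEach(::println)
    }

The sketch keeps only the fields the merge decision touches; the real converter also carries cpuUsage and the core count through each fragment.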