summaryrefslogtreecommitdiff
path: root/opendc-experiments
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-experiments')
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt127
1 files changed, 27 insertions, 100 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt
index 7cd1f159..d7daa35b 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt
@@ -26,12 +26,7 @@ import com.github.ajalt.clikt.core.CliktCommand
import com.github.ajalt.clikt.parameters.arguments.argument
import com.github.ajalt.clikt.parameters.groups.OptionGroup
import com.github.ajalt.clikt.parameters.groups.groupChoice
-import com.github.ajalt.clikt.parameters.options.convert
-import com.github.ajalt.clikt.parameters.options.default
-import com.github.ajalt.clikt.parameters.options.defaultLazy
-import com.github.ajalt.clikt.parameters.options.option
-import com.github.ajalt.clikt.parameters.options.required
-import com.github.ajalt.clikt.parameters.options.split
+import com.github.ajalt.clikt.parameters.options.*
import com.github.ajalt.clikt.parameters.types.file
import com.github.ajalt.clikt.parameters.types.long
import me.tongfei.progressbar.ProgressBar
@@ -41,11 +36,13 @@ import org.apache.avro.generic.GenericData
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import org.opendc.format.trace.bitbrains.BitbrainsTraceReader
import org.opendc.format.util.LocalOutputFile
+import org.opendc.simulator.compute.workload.SimTraceWorkload
import java.io.BufferedReader
import java.io.File
import java.io.FileReader
-import java.util.Random
+import java.util.*
import kotlin.math.max
import kotlin.math.min
@@ -340,106 +337,36 @@ class BitbrainsConversion : TraceConversion("Bitbrains") {
metaSchema: Schema,
metaWriter: ParquetWriter<GenericData.Record>
): MutableList<Fragment> {
- val timestampCol = 0
- val cpuUsageCol = 3
- val coreCol = 1
- val provisionedMemoryCol = 5
- val traceInterval = 5 * 60 * 1000L
-
- val allFragments = mutableListOf<Fragment>()
-
- traceDirectory.walk()
- .filterNot { it.isDirectory }
- .filter { it.extension == "csv" || it.extension == "txt" }
- .toList()
- .forEach { vmFile ->
- println(vmFile)
-
- var vmId = ""
- var maxCores = -1
- var requiredMemory = -1L
- var cores: Int
- var minTime = Long.MAX_VALUE
-
- val flopsFragments = sequence {
- var last: Fragment? = null
-
- BufferedReader(FileReader(vmFile)).use { reader ->
- reader.lineSequence()
- .drop(1)
- .chunked(128)
- .forEach { lines ->
- for (line in lines) {
- // Ignore comments in the trace
- if (line.startsWith("#") || line.isBlank()) {
- continue
- }
-
- val values = line.split(";\t")
-
- vmId = vmFile.name
-
- val timestamp = (values[timestampCol].trim().toLong() - 5 * 60) * 1000L
-
- cores = values[coreCol].trim().toInt()
- val provisionedMemory = values[provisionedMemoryCol].trim().toDouble() // KB
- requiredMemory = max(requiredMemory, (provisionedMemory / 1000).toLong())
- maxCores = max(maxCores, cores)
- minTime = min(minTime, timestamp)
- val cpuUsage = values[cpuUsageCol].trim().toDouble() // MHz
-
- val flops: Long = (cpuUsage * 5 * 60).toLong()
-
- last = if (last != null && last!!.flops == 0L && flops == 0L) {
- val oldFragment = last!!
- Fragment(
- vmId,
- oldFragment.tick,
- oldFragment.flops + flops,
- oldFragment.duration + traceInterval,
- cpuUsage,
- cores
- )
- } else {
- val fragment =
- Fragment(
- vmId,
- timestamp,
- flops,
- traceInterval,
- cpuUsage,
- cores
- )
- if (last != null) {
- yield(last!!)
- }
- fragment
- }
- }
- }
- }
-
- if (last != null) {
- yield(last!!)
- }
- }
-
+ val fragments = mutableListOf<Fragment>()
+ BitbrainsTraceReader(traceDirectory).use { reader ->
+ reader.forEach { entry ->
+ val trace = (entry.workload as SimTraceWorkload).trace
var maxTime = Long.MIN_VALUE
- flopsFragments.forEach { fragment ->
- allFragments.add(fragment)
- maxTime = max(maxTime, fragment.tick)
+ trace.forEach { fragment ->
+ val flops: Long = (fragment.usage * fragment.duration / 1000).toLong()
+ fragments.add(
+ Fragment(
+ entry.name,
+ fragment.timestamp,
+ flops,
+ fragment.duration,
+ fragment.usage,
+ fragment.cores
+ )
+ )
+ maxTime = max(maxTime, fragment.timestamp + fragment.duration)
}
val metaRecord = GenericData.Record(metaSchema)
- metaRecord.put("id", vmId)
- metaRecord.put("submissionTime", minTime)
+ metaRecord.put("id", entry.name)
+ metaRecord.put("submissionTime", entry.start)
metaRecord.put("endTime", maxTime)
- metaRecord.put("maxCores", maxCores)
- metaRecord.put("requiredMemory", requiredMemory)
+ metaRecord.put("maxCores", entry.meta["cores"])
+ metaRecord.put("requiredMemory", entry.meta["required-memory"])
metaWriter.write(metaRecord)
}
-
- return allFragments
+ }
+ return fragments
}
}