summaryrefslogtreecommitdiff
path: root/opendc-experiments/opendc-experiments-capelin/src/main
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-08-31 16:18:56 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-02 11:30:15 +0200
commit214480d154771f0b783829b6e5ec82b837304ad2 (patch)
tree84d823132bdd0e351ec5a41c210be6551a98273d /opendc-experiments/opendc-experiments-capelin/src/main
parent9fcce6ade8714f7f0a9073fe5b7ddd3f0b35c375 (diff)
refactor(trace): Move Bitbrains format into separate module
This change moves Bitbrains trace support into a separate module and adds support for the new trace api.
Diffstat (limited to 'opendc-experiments/opendc-experiments-capelin/src/main')
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt84
1 files changed, 62 insertions, 22 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt
index e64f997f..0ded32f3 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt
@@ -36,8 +36,8 @@ import org.apache.avro.generic.GenericData
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
-import org.opendc.format.trace.bitbrains.BitbrainsTraceReader
-import org.opendc.simulator.compute.workload.SimTraceWorkload
+import org.opendc.trace.*
+import org.opendc.trace.bitbrains.BitbrainsTraceFormat
import org.opendc.trace.util.parquet.LocalOutputFile
import java.io.BufferedReader
import java.io.File
@@ -338,33 +338,73 @@ class BitbrainsConversion : TraceConversion("Bitbrains") {
metaWriter: ParquetWriter<GenericData.Record>
): MutableList<Fragment> {
val fragments = mutableListOf<Fragment>()
- BitbrainsTraceReader(traceDirectory).use { reader ->
- reader.forEach { entry ->
- val trace = (entry.workload as SimTraceWorkload).trace
- var maxTime = Long.MIN_VALUE
- trace.forEach { fragment ->
- val flops: Long = (fragment.usage * fragment.duration / 1000).toLong()
+ val trace = BitbrainsTraceFormat().open(traceDirectory.toURI().toURL())
+ val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
+
+ var lastId: String? = null
+ var maxCores = Int.MIN_VALUE
+ var requiredMemory = Long.MIN_VALUE
+ var minTime = Long.MAX_VALUE
+ var maxTime = Long.MIN_VALUE
+ var lastTimestamp = Long.MIN_VALUE
+
+ while (reader.nextRow()) {
+ val id = reader.get(RESOURCE_STATE_ID)
+
+ if (lastId != null && lastId != id) {
+ val metaRecord = GenericData.Record(metaSchema)
+ metaRecord.put("id", lastId)
+ metaRecord.put("submissionTime", minTime)
+ metaRecord.put("endTime", maxTime)
+ metaRecord.put("maxCores", maxCores)
+ metaRecord.put("requiredMemory", requiredMemory)
+ metaWriter.write(metaRecord)
+ }
+ lastId = id
+
+ val timestamp = reader.get(RESOURCE_STATE_TIMESTAMP)
+ val timestampMs = timestamp.toEpochMilli()
+ val cpuUsage = reader.getDouble(RESOURCE_STATE_MEM_USAGE)
+ val cores = reader.getInt(RESOURCE_STATE_NCPUS)
+ val memCapacity = reader.getDouble(RESOURCE_STATE_MEM_CAPACITY)
+
+ maxCores = max(maxCores, cores)
+ requiredMemory = max(requiredMemory, (memCapacity / 1000).toLong())
+
+ if (lastTimestamp < 0) {
+ lastTimestamp = timestampMs - 5 * 60 * 1000L
+ minTime = min(minTime, lastTimestamp)
+ }
+
+ if (fragments.isEmpty()) {
+ val duration = 5 * 60 * 1000L
+ val flops: Long = (cpuUsage * duration / 1000).toLong()
+ fragments.add(Fragment(id, lastTimestamp, flops, duration, cpuUsage, cores))
+ } else {
+ val last = fragments.last()
+ val duration = timestampMs - lastTimestamp
+ val flops: Long = (cpuUsage * duration / 1000).toLong()
+
+ // Perform run-length encoding
+ if (last.id == id && (duration == 0L || last.usage == cpuUsage)) {
+ fragments[fragments.size - 1] = last.copy(duration = last.duration + duration)
+ } else {
fragments.add(
Fragment(
- entry.name,
- fragment.timestamp,
+ id,
+ lastTimestamp,
flops,
- fragment.duration,
- fragment.usage,
- fragment.cores
+ duration,
+ cpuUsage,
+ cores
)
)
- maxTime = max(maxTime, fragment.timestamp + fragment.duration)
}
-
- val metaRecord = GenericData.Record(metaSchema)
- metaRecord.put("id", entry.name)
- metaRecord.put("submissionTime", entry.start)
- metaRecord.put("endTime", maxTime)
- metaRecord.put("maxCores", entry.meta["cores"])
- metaRecord.put("requiredMemory", entry.meta["required-memory"])
- metaWriter.write(metaRecord)
}
+
+ val last = fragments.last()
+ maxTime = max(maxTime, last.tick + last.duration)
+ lastTimestamp = timestampMs
}
return fragments
}