summaryrefslogtreecommitdiff
path: root/opendc-experiments
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-experiments')
-rw-r--r--opendc-experiments/opendc-experiments-capelin/build.gradle.kts1
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt84
2 files changed, 63 insertions, 22 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
index e597c5ad..97ca97ec 100644
--- a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
+++ b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts
@@ -33,6 +33,7 @@ dependencies {
api(projects.opendcHarness.opendcHarnessApi)
implementation(projects.opendcFormat)
implementation(projects.opendcTrace.opendcTraceParquet)
+ implementation(projects.opendcTrace.opendcTraceBitbrains)
implementation(projects.opendcSimulator.opendcSimulatorCore)
implementation(projects.opendcSimulator.opendcSimulatorCompute)
implementation(projects.opendcSimulator.opendcSimulatorFailures)
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt
index e64f997f..0ded32f3 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt
@@ -36,8 +36,8 @@ import org.apache.avro.generic.GenericData
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
-import org.opendc.format.trace.bitbrains.BitbrainsTraceReader
-import org.opendc.simulator.compute.workload.SimTraceWorkload
+import org.opendc.trace.*
+import org.opendc.trace.bitbrains.BitbrainsTraceFormat
import org.opendc.trace.util.parquet.LocalOutputFile
import java.io.BufferedReader
import java.io.File
@@ -338,33 +338,73 @@ class BitbrainsConversion : TraceConversion("Bitbrains") {
metaWriter: ParquetWriter<GenericData.Record>
): MutableList<Fragment> {
val fragments = mutableListOf<Fragment>()
- BitbrainsTraceReader(traceDirectory).use { reader ->
- reader.forEach { entry ->
- val trace = (entry.workload as SimTraceWorkload).trace
- var maxTime = Long.MIN_VALUE
- trace.forEach { fragment ->
- val flops: Long = (fragment.usage * fragment.duration / 1000).toLong()
+ val trace = BitbrainsTraceFormat().open(traceDirectory.toURI().toURL())
+ val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
+
+ var lastId: String? = null
+ var maxCores = Int.MIN_VALUE
+ var requiredMemory = Long.MIN_VALUE
+ var minTime = Long.MAX_VALUE
+ var maxTime = Long.MIN_VALUE
+ var lastTimestamp = Long.MIN_VALUE
+
+ while (reader.nextRow()) {
+ val id = reader.get(RESOURCE_STATE_ID)
+
+ if (lastId != null && lastId != id) {
+ val metaRecord = GenericData.Record(metaSchema)
+ metaRecord.put("id", lastId)
+ metaRecord.put("submissionTime", minTime)
+ metaRecord.put("endTime", maxTime)
+ metaRecord.put("maxCores", maxCores)
+ metaRecord.put("requiredMemory", requiredMemory)
+ metaWriter.write(metaRecord)
+ }
+ lastId = id
+
+ val timestamp = reader.get(RESOURCE_STATE_TIMESTAMP)
+ val timestampMs = timestamp.toEpochMilli()
+ val cpuUsage = reader.getDouble(RESOURCE_STATE_MEM_USAGE)
+ val cores = reader.getInt(RESOURCE_STATE_NCPUS)
+ val memCapacity = reader.getDouble(RESOURCE_STATE_MEM_CAPACITY)
+
+ maxCores = max(maxCores, cores)
+ requiredMemory = max(requiredMemory, (memCapacity / 1000).toLong())
+
+ if (lastTimestamp < 0) {
+ lastTimestamp = timestampMs - 5 * 60 * 1000L
+ minTime = min(minTime, lastTimestamp)
+ }
+
+ if (fragments.isEmpty()) {
+ val duration = 5 * 60 * 1000L
+ val flops: Long = (cpuUsage * duration / 1000).toLong()
+ fragments.add(Fragment(id, lastTimestamp, flops, duration, cpuUsage, cores))
+ } else {
+ val last = fragments.last()
+ val duration = timestampMs - lastTimestamp
+ val flops: Long = (cpuUsage * duration / 1000).toLong()
+
+ // Perform run-length encoding
+ if (last.id == id && (duration == 0L || last.usage == cpuUsage)) {
+ fragments[fragments.size - 1] = last.copy(duration = last.duration + duration)
+ } else {
fragments.add(
Fragment(
- entry.name,
- fragment.timestamp,
+ id,
+ lastTimestamp,
flops,
- fragment.duration,
- fragment.usage,
- fragment.cores
+ duration,
+ cpuUsage,
+ cores
)
)
- maxTime = max(maxTime, fragment.timestamp + fragment.duration)
}
-
- val metaRecord = GenericData.Record(metaSchema)
- metaRecord.put("id", entry.name)
- metaRecord.put("submissionTime", entry.start)
- metaRecord.put("endTime", maxTime)
- metaRecord.put("maxCores", entry.meta["cores"])
- metaRecord.put("requiredMemory", entry.meta["required-memory"])
- metaWriter.write(metaRecord)
}
+
+ val last = fragments.last()
+ maxTime = max(maxTime, last.tick + last.duration)
+ lastTimestamp = timestampMs
}
return fragments
}