diff options
Diffstat (limited to 'opendc-experiments')
2 files changed, 63 insertions, 22 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts index e597c5ad..97ca97ec 100644 --- a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts @@ -33,6 +33,7 @@ dependencies { api(projects.opendcHarness.opendcHarnessApi) implementation(projects.opendcFormat) implementation(projects.opendcTrace.opendcTraceParquet) + implementation(projects.opendcTrace.opendcTraceBitbrains) implementation(projects.opendcSimulator.opendcSimulatorCore) implementation(projects.opendcSimulator.opendcSimulatorCompute) implementation(projects.opendcSimulator.opendcSimulatorFailures) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt index e64f997f..0ded32f3 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt @@ -36,8 +36,8 @@ import org.apache.avro.generic.GenericData import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.ParquetWriter import org.apache.parquet.hadoop.metadata.CompressionCodecName -import org.opendc.format.trace.bitbrains.BitbrainsTraceReader -import org.opendc.simulator.compute.workload.SimTraceWorkload +import org.opendc.trace.* +import org.opendc.trace.bitbrains.BitbrainsTraceFormat import org.opendc.trace.util.parquet.LocalOutputFile import java.io.BufferedReader import java.io.File @@ -338,33 +338,73 @@ class BitbrainsConversion : TraceConversion("Bitbrains") { metaWriter: ParquetWriter<GenericData.Record> ): MutableList<Fragment> { val fragments = mutableListOf<Fragment>() - BitbrainsTraceReader(traceDirectory).use { reader -> - reader.forEach { entry -> - val trace = (entry.workload as SimTraceWorkload).trace - var maxTime = Long.MIN_VALUE - trace.forEach { fragment -> - val flops: Long = (fragment.usage * fragment.duration / 1000).toLong() + val trace = BitbrainsTraceFormat().open(traceDirectory.toURI().toURL()) + val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() + + var lastId: String? = null + var maxCores = Int.MIN_VALUE + var requiredMemory = Long.MIN_VALUE + var minTime = Long.MAX_VALUE + var maxTime = Long.MIN_VALUE + var lastTimestamp = Long.MIN_VALUE + + while (reader.nextRow()) { + val id = reader.get(RESOURCE_STATE_ID) + + if (lastId != null && lastId != id) { + val metaRecord = GenericData.Record(metaSchema) + metaRecord.put("id", lastId) + metaRecord.put("submissionTime", minTime) + metaRecord.put("endTime", maxTime) + metaRecord.put("maxCores", maxCores) + metaRecord.put("requiredMemory", requiredMemory) + metaWriter.write(metaRecord) + } + lastId = id + + val timestamp = reader.get(RESOURCE_STATE_TIMESTAMP) + val timestampMs = timestamp.toEpochMilli() + val cpuUsage = reader.getDouble(RESOURCE_STATE_MEM_USAGE) + val cores = reader.getInt(RESOURCE_STATE_NCPUS) + val memCapacity = reader.getDouble(RESOURCE_STATE_MEM_CAPACITY) + + maxCores = max(maxCores, cores) + requiredMemory = max(requiredMemory, (memCapacity / 1000).toLong()) + + if (lastTimestamp < 0) { + lastTimestamp = timestampMs - 5 * 60 * 1000L + minTime = min(minTime, lastTimestamp) + } + + if (fragments.isEmpty()) { + val duration = 5 * 60 * 1000L + val flops: Long = (cpuUsage * duration / 1000).toLong() + fragments.add(Fragment(id, lastTimestamp, flops, duration, cpuUsage, cores)) + } else { + val last = fragments.last() + val duration = timestampMs - lastTimestamp + val flops: Long = (cpuUsage * duration / 1000).toLong() + + // Perform run-length encoding + if (last.id == id && (duration == 0L || last.usage == cpuUsage)) { + fragments[fragments.size - 1] = last.copy(duration = last.duration + duration) + } else { fragments.add( Fragment( - entry.name, - fragment.timestamp, + id, + lastTimestamp, flops, - fragment.duration, - fragment.usage, - fragment.cores + duration, + cpuUsage, + cores ) ) - maxTime = max(maxTime, fragment.timestamp + fragment.duration) } - - val metaRecord = GenericData.Record(metaSchema) - metaRecord.put("id", entry.name) - metaRecord.put("submissionTime", entry.start) - metaRecord.put("endTime", maxTime) - metaRecord.put("maxCores", entry.meta["cores"]) - metaRecord.put("requiredMemory", entry.meta["required-memory"]) - metaWriter.write(metaRecord) } + + val last = fragments.last() + maxTime = max(maxTime, last.tick + last.duration) + lastTimestamp = timestampMs } return fragments } |
