From 474044649a67cfcc857615b6a0f8387a2954abbd Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 16 Sep 2021 12:34:53 +0200 Subject: feat(trace): Update OpenDC VM trace format This change optimizes the OpenDC VM trace format by removing unnecessary columns as well as optimizing the writer settings. The new implementation still supports reading the old trace format in case users run OpenDC with older workload traces. --- .../compute/workload/ComputeWorkloadLoader.kt | 6 +- .../compute/workload/trace/TraceConverter.kt | 86 +++++++++++++--------- 2 files changed, 56 insertions(+), 36 deletions(-) (limited to 'opendc-compute/opendc-compute-workload') diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt index afc0fce9..c92b212f 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt @@ -65,7 +65,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) { val id = reader.get(RESOURCE_STATE_ID) val time = reader.get(RESOURCE_STATE_TIMESTAMP) val duration = reader.get(RESOURCE_STATE_DURATION) - val cores = reader.getInt(RESOURCE_STATE_NCPUS) + val cores = reader.getInt(RESOURCE_STATE_CPU_COUNT) val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE) val fragment = SimTraceWorkload.Fragment( @@ -75,7 +75,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) { cores ) - fragments.getOrPut(id) { mutableListOf() }.add(fragment) + fragments.computeIfAbsent(id) { mutableListOf() }.add(fragment) } fragments @@ -103,7 +103,7 @@ public class ComputeWorkloadLoader(private val baseDir: File) { val submissionTime = reader.get(RESOURCE_START_TIME) val endTime = reader.get(RESOURCE_STOP_TIME) - val maxCores = reader.getInt(RESOURCE_NCPUS) + val maxCores = reader.getInt(RESOURCE_CPU_COUNT) val requiredMemory = reader.getDouble(RESOURCE_MEM_CAPACITY) / 1000.0 // Convert from KB to MB val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray()) diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceConverter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceConverter.kt index 50f3a669..2d570787 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceConverter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/trace/TraceConverter.kt @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.workload.vm.trace +package org.opendc.compute.workload.trace import com.github.ajalt.clikt.core.CliktCommand import com.github.ajalt.clikt.parameters.arguments.argument @@ -42,6 +42,7 @@ import org.opendc.trace.opendc.OdcVmTraceFormat import org.opendc.trace.util.parquet.LocalOutputFile import java.io.File import java.util.* +import kotlin.math.abs import kotlin.math.max import kotlin.math.min import kotlin.math.roundToLong @@ -112,16 +113,21 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { val selectedVms = metaWriter.use { convertResources(trace, it) } + if (selectedVms.isEmpty()) { + logger.warn { "No VMs selected" } + return + } + logger.info { "Wrote ${selectedVms.size} rows" } logger.info { "Building resource states table" } val writer = AvroParquetWriter.builder(LocalOutputFile(traceParquet)) .withSchema(OdcVmTraceFormat.RESOURCE_STATES_SCHEMA) .withCompressionCodec(CompressionCodecName.ZSTD) - .enableDictionaryEncoding() - .enablePageWriteChecksum() + .withDictionaryEncoding("id", true) .withBloomFilterEnabled("id", true) .withBloomFilterNDV("id", selectedVms.size.toLong()) + .enableValidation() .build() val statesCount = writer.use { convertResourceStates(trace, it, selectedVms) } @@ -154,7 +160,7 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { startTime = min(startTime, timestamp) stopTime = max(stopTime, timestamp) - numCpus = max(numCpus, reader.getInt(RESOURCE_STATE_NCPUS)) + numCpus = max(numCpus, reader.getInt(RESOURCE_STATE_CPU_COUNT)) memCapacity = max(memCapacity, reader.getDouble(RESOURCE_STATE_MEM_CAPACITY)) if (reader.hasColumn(RESOURCE_STATE_MEM_USAGE)) { @@ -172,10 +178,10 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { val builder = GenericRecordBuilder(OdcVmTraceFormat.RESOURCES_SCHEMA) builder["id"] = id - builder["submissionTime"] = startTime - builder["endTime"] = stopTime - builder["maxCores"] = numCpus - builder["requiredMemory"] = max(memCapacity, memUsage).roundToLong() + builder["start_time"] = startTime + builder["stop_time"] = stopTime + builder["cpu_count"] = numCpus + builder["mem_capacity"] = max(memCapacity, memUsage).roundToLong() logger.info { "Selecting VM $id" } @@ -194,44 +200,58 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { var hasNextRow = reader.nextRow() var count = 0 + var lastId: String? = null + var lastTimestamp = 0L while (hasNextRow) { - var lastTimestamp = Long.MIN_VALUE + val id = reader.get(RESOURCE_STATE_ID) - do { - val id = reader.get(RESOURCE_STATE_ID) + if (id !in selectedVms) { + hasNextRow = reader.nextRow() + continue + } - if (id !in selectedVms) { - hasNextRow = reader.nextRow() - continue - } + val cpuCount = reader.getInt(RESOURCE_STATE_CPU_COUNT) + val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE) - val builder = GenericRecordBuilder(OdcVmTraceFormat.RESOURCE_STATES_SCHEMA) - builder["id"] = id + val startTimestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli() + var timestamp = startTimestamp + var duration: Long - val timestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli() - if (lastTimestamp < 0) { - lastTimestamp = timestamp - 5 * 60 * 1000L + // Check whether the previous entry is from a different VM + if (id != lastId) { + lastTimestamp = timestamp - 5 * 60 * 1000L + } + + do { + timestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli() + + duration = timestamp - lastTimestamp + hasNextRow = reader.nextRow() + + if (!hasNextRow) { + break } - val duration = timestamp - lastTimestamp - val cores = reader.getInt(RESOURCE_STATE_NCPUS) - val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE) - val flops = (cpuUsage * duration / 1000.0).roundToLong() + val shouldContinue = id == reader.get(RESOURCE_STATE_ID) && + abs(cpuUsage - reader.getDouble(RESOURCE_STATE_CPU_USAGE)) < 0.01 && + cpuCount == reader.getInt(RESOURCE_STATE_CPU_COUNT) + } while (shouldContinue) - builder["time"] = timestamp - builder["duration"] = duration - builder["cores"] = cores - builder["cpuUsage"] = cpuUsage - builder["flops"] = flops + val builder = GenericRecordBuilder(OdcVmTraceFormat.RESOURCE_STATES_SCHEMA) - writer.write(builder.build()) + builder["id"] = id + builder["timestamp"] = startTimestamp + builder["duration"] = duration + builder["cpu_count"] = cpuCount + builder["cpu_usage"] = cpuUsage - lastTimestamp = timestamp - hasNextRow = reader.nextRow() - } while (hasNextRow && id == reader.get(RESOURCE_STATE_ID)) + writer.write(builder.build()) count++ + + lastId = id + lastTimestamp = timestamp } return count -- cgit v1.2.3