Diffstat (limited to 'opendc-trace/opendc-trace-tools/src')
-rw-r--r--  opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt | 82
1 file changed, 34 insertions(+), 48 deletions(-)
diff --git a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt
index cd5d287f..6fad43be 100644
--- a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt
+++ b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt
@@ -20,6 +20,7 @@
  * SOFTWARE.
  */
 
+@file:JvmName("TraceConverter")
 package org.opendc.trace.tools
 
 import com.github.ajalt.clikt.core.CliktCommand
@@ -29,25 +30,19 @@ import com.github.ajalt.clikt.parameters.groups.cooccurring
 import com.github.ajalt.clikt.parameters.options.*
 import com.github.ajalt.clikt.parameters.types.*
 import mu.KotlinLogging
-import org.apache.avro.generic.GenericData
-import org.apache.avro.generic.GenericRecordBuilder
-import org.apache.parquet.avro.AvroParquetWriter
-import org.apache.parquet.hadoop.ParquetWriter
-import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.opendc.trace.*
-import org.opendc.trace.opendc.OdcVmTraceFormat
-import org.opendc.trace.util.parquet.LocalOutputFile
 import java.io.File
+import java.time.Duration
+import java.time.Instant
 import java.util.*
 import kotlin.math.abs
 import kotlin.math.max
 import kotlin.math.min
-import kotlin.math.roundToLong
 
 /**
  * A script to convert a trace in text format into a Parquet trace.
  */
-public fun main(args: Array<String>): Unit = TraceConverterCli().main(args)
+fun main(args: Array<String>): Unit = TraceConverterCli().main(args)
 
 /**
  * Represents the command for converting traces
@@ -74,11 +69,16 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
     /**
      * The input format of the trace.
      */
-    private val format by option("-f", "--format", help = "input format of trace")
-        .choice("bitbrains-ex", "bitbrains", "azure")
+    private val inputFormat by option("-f", "--input-format", help = "format of input trace")
         .required()
 
     /**
+     * The format of the output trace.
+     */
+    private val outputFormat by option("--output-format", help = "format of output trace")
+        .default("opendc-vm")
+
+    /**
      * The sampling options.
      */
     private val samplingOptions by SamplingOptions().cooccurring()
@@ -94,17 +94,14 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
             traceParquet.delete()
         }
 
-        val trace = Trace.open(input, format = format)
+        val inputTrace = Trace.open(input, format = inputFormat)
+        val outputTrace = Trace.create(output, format = outputFormat)
 
         logger.info { "Building resources table" }
 
-        val metaWriter = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(metaParquet))
-            .withSchema(OdcVmTraceFormat.RESOURCES_SCHEMA)
-            .withCompressionCodec(CompressionCodecName.ZSTD)
-            .enablePageWriteChecksum()
-            .build()
+        val metaWriter = outputTrace.getTable(TABLE_RESOURCES)!!.newWriter()
 
-        val selectedVms = metaWriter.use { convertResources(trace, it) }
+        val selectedVms = metaWriter.use { convertResources(inputTrace, it) }
 
         if (selectedVms.isEmpty()) {
             logger.warn { "No VMs selected" }
@@ -114,23 +111,16 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
         logger.info { "Wrote ${selectedVms.size} rows" }
 
         logger.info { "Building resource states table" }
-        val writer = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(traceParquet))
-            .withSchema(OdcVmTraceFormat.RESOURCE_STATES_SCHEMA)
-            .withCompressionCodec(CompressionCodecName.ZSTD)
-            .withDictionaryEncoding("id", true)
-            .withBloomFilterEnabled("id", true)
-            .withBloomFilterNDV("id", selectedVms.size.toLong())
-            .enableValidation()
-            .build()
+        val writer = outputTrace.getTable(TABLE_RESOURCE_STATES)!!.newWriter()
 
-        val statesCount = writer.use { convertResourceStates(trace, it, selectedVms) }
+        val statesCount = writer.use { convertResourceStates(inputTrace, it, selectedVms) }
 
         logger.info { "Wrote $statesCount rows" }
     }
 
     /**
      * Convert the resources table for the trace.
      */
-    private fun convertResources(trace: Trace, writer: ParquetWriter<GenericData.Record>): Set<String> {
+    private fun convertResources(trace: Trace, writer: TableWriter): Set<String> {
         val random = samplingOptions?.let { Random(it.seed) }
         val samplingFraction = samplingOptions?.fraction ?: 1.0
         val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
@@ -168,18 +158,16 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
                 continue
             }
 
-            val builder = GenericRecordBuilder(OdcVmTraceFormat.RESOURCES_SCHEMA)
-
-            builder["id"] = id
-            builder["start_time"] = startTime
-            builder["stop_time"] = stopTime
-            builder["cpu_count"] = numCpus
-            builder["mem_capacity"] = max(memCapacity, memUsage).roundToLong()
-
             logger.info { "Selecting VM $id" }
-
-            writer.write(builder.build())
             selectedVms.add(id)
+
+            writer.startRow()
+            writer.set(RESOURCE_ID, id)
+            writer.set(RESOURCE_START_TIME, Instant.ofEpochMilli(startTime))
+            writer.set(RESOURCE_STOP_TIME, Instant.ofEpochMilli(stopTime))
+            writer.setInt(RESOURCE_CPU_COUNT, numCpus)
+            writer.setDouble(RESOURCE_MEM_CAPACITY, max(memCapacity, memUsage))
+            writer.endRow()
         }
 
         return selectedVms
@@ -188,7 +176,7 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
     /**
      * Convert the resource states table for the trace.
      */
-    private fun convertResourceStates(trace: Trace, writer: ParquetWriter<GenericData.Record>, selectedVms: Set<String>): Int {
+    private fun convertResourceStates(trace: Trace, writer: TableWriter, selectedVms: Set<String>): Int {
         val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
 
         var hasNextRow = reader.nextRow()
@@ -231,15 +219,13 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
                     cpuCount == reader.getInt(RESOURCE_CPU_COUNT)
             } while (shouldContinue)
 
-            val builder = GenericRecordBuilder(OdcVmTraceFormat.RESOURCE_STATES_SCHEMA)
-
-            builder["id"] = id
-            builder["timestamp"] = startTimestamp
-            builder["duration"] = duration
-            builder["cpu_count"] = cpuCount
-            builder["cpu_usage"] = cpuUsage
-
-            writer.write(builder.build())
+            writer.startRow()
+            writer.set(RESOURCE_ID, id)
+            writer.set(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(startTimestamp))
+            writer.set(RESOURCE_STATE_DURATION, Duration.ofMillis(duration))
+            writer.setInt(RESOURCE_CPU_COUNT, cpuCount)
+            writer.setDouble(RESOURCE_STATE_CPU_USAGE, cpuUsage)
+            writer.endRow()
 
             count++
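The core of this change is the switch from hand-built Avro records to the format-agnostic TableWriter API. Below is a minimal sketch of the new write path, using only the calls visible in this diff (Trace.create, getTable(...)!!.newWriter(), and the startRow/set/setInt/setDouble/endRow cycle); the function name, output file, row values, and the assumption that closing the writer flushes the table are illustrative, not taken from the commit.

import org.opendc.trace.*
import java.io.File
import java.time.Instant

fun writeResourcesExample(output: File) {
    // Create an output trace in the converter's default "opendc-vm" format.
    val outputTrace = Trace.create(output, format = "opendc-vm")
    val writer = outputTrace.getTable(TABLE_RESOURCES)!!.newWriter()

    // The converter wraps the writer in .use, so we assume TableWriter is
    // AutoCloseable and that closing it flushes the written rows.
    writer.use {
        it.startRow()
        it.set(RESOURCE_ID, "vm-1234")                          // hypothetical VM id
        it.set(RESOURCE_START_TIME, Instant.ofEpochMilli(0))
        it.set(RESOURCE_STOP_TIME, Instant.ofEpochMilli(3_600_000))
        it.setInt(RESOURCE_CPU_COUNT, 4)
        it.setDouble(RESOURCE_MEM_CAPACITY, 8.0e9)              // assumed unit: bytes
        it.endRow()
    }
}

Note that the writer now accepts typed values (Instant, Duration, Double) against named columns, so the format-specific schema handling (compression codec, bloom filters, dictionary encoding) moves out of the converter and behind the trace format implementation.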

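The read side used by both convert functions follows the same row-cursor shape. A sketch under the same caveats, built from the reader calls that appear above (newReader(), nextRow(), getInt(RESOURCE_CPU_COUNT)); the aggregation itself and the assumption that the cursor needs no explicit close are illustrative.

import org.opendc.trace.*
import java.io.File

fun sumCpuCounts(input: File, format: String): Int {
    // Open the input trace in the given format, e.g. "bitbrains" or "azure"
    // (format names taken from the option choices removed in this diff).
    val trace = Trace.open(input, format = format)
    val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()

    // Advance the cursor row by row, as convertResourceStates does above.
    var totalCpus = 0
    while (reader.nextRow()) {
        totalCpus += reader.getInt(RESOURCE_CPU_COUNT)
    }
    return totalCpus
}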