diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-09-21 12:04:15 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-09-21 12:04:15 +0200 |
| commit | 322d91db03a7d74a00ec623ce624f979c0b77c03 (patch) | |
| tree | 73201888564accde4cfa107f4ffdb15e9f93d45c /opendc-trace/opendc-trace-tools | |
| parent | 453c25c4b453fa0af26bebbd8863abfb79218119 (diff) | |
| parent | 68ef3700ed2f69bcf0118bb69eda71e6b1f4d54f (diff) | |
merge: Add support for trace writing
This pull request extends the trace API to support writing new traces.
- Unify columns of different tables
- Support column lookup via index
- Use index lookup in trace loader
- Add property for describing partition keys
- Simplify TraceFormat SPI interface
- Add support for writing traces
**Breaking API Changes**
- `TraceFormat` SPI interface has been redesigned.
Diffstat (limited to 'opendc-trace/opendc-trace-tools')
| -rw-r--r-- | opendc-trace/opendc-trace-tools/build.gradle.kts | 11 | ||||
| -rw-r--r-- | opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt | 105 |
2 files changed, 47 insertions, 69 deletions
diff --git a/opendc-trace/opendc-trace-tools/build.gradle.kts b/opendc-trace/opendc-trace-tools/build.gradle.kts index 35190dba..14a0fc7c 100644 --- a/opendc-trace/opendc-trace-tools/build.gradle.kts +++ b/opendc-trace/opendc-trace-tools/build.gradle.kts @@ -29,19 +29,18 @@ plugins { } application { - mainClass.set("org.opendc.trace.tools.TraceConverterKt") + mainClass.set("org.opendc.trace.tools.TraceConverter") } dependencies { api(platform(projects.opendcPlatform)) - implementation(projects.opendcTrace.opendcTraceParquet) - implementation(projects.opendcTrace.opendcTraceOpendc) - implementation(projects.opendcTrace.opendcTraceAzure) - implementation(projects.opendcTrace.opendcTraceBitbrains) - + implementation(projects.opendcTrace.opendcTraceApi) implementation(libs.kotlin.logging) implementation(libs.clikt) + runtimeOnly(projects.opendcTrace.opendcTraceOpendc) + runtimeOnly(projects.opendcTrace.opendcTraceBitbrains) + runtimeOnly(projects.opendcTrace.opendcTraceAzure) runtimeOnly(libs.log4j.slf4j) } diff --git a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt index 322464cd..6fad43be 100644 --- a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt +++ b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt @@ -20,6 +20,7 @@ * SOFTWARE. */ +@file:JvmName("TraceConverter") package org.opendc.trace.tools import com.github.ajalt.clikt.core.CliktCommand @@ -29,28 +30,19 @@ import com.github.ajalt.clikt.parameters.groups.cooccurring import com.github.ajalt.clikt.parameters.options.* import com.github.ajalt.clikt.parameters.types.* import mu.KotlinLogging -import org.apache.avro.generic.GenericData -import org.apache.avro.generic.GenericRecordBuilder -import org.apache.parquet.avro.AvroParquetWriter -import org.apache.parquet.hadoop.ParquetWriter -import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.opendc.trace.* -import org.opendc.trace.azure.AzureTraceFormat -import org.opendc.trace.bitbrains.BitbrainsExTraceFormat -import org.opendc.trace.bitbrains.BitbrainsTraceFormat -import org.opendc.trace.opendc.OdcVmTraceFormat -import org.opendc.trace.util.parquet.LocalOutputFile import java.io.File +import java.time.Duration +import java.time.Instant import java.util.* import kotlin.math.abs import kotlin.math.max import kotlin.math.min -import kotlin.math.roundToLong /** * A script to convert a trace in text format into a Parquet trace. */ -public fun main(args: Array<String>): Unit = TraceConverterCli().main(args) +fun main(args: Array<String>): Unit = TraceConverterCli().main(args) /** * Represents the command for converting traces @@ -77,15 +69,16 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { /** * The input format of the trace. */ - private val format by option("-f", "--format", help = "input format of trace") - .choice( - "solvinity" to BitbrainsExTraceFormat(), - "bitbrains" to BitbrainsTraceFormat(), - "azure" to AzureTraceFormat() - ) + private val inputFormat by option("-f", "--input-format", help = "format of output trace") .required() /** + * The format of the output trace. + */ + private val outputFormat by option("--output-format", help = "format of output trace") + .default("opendc-vm") + + /** * The sampling options. */ private val samplingOptions by SamplingOptions().cooccurring() @@ -101,17 +94,14 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { traceParquet.delete() } - val trace = format.open(input.toURI().toURL()) + val inputTrace = Trace.open(input, format = inputFormat) + val outputTrace = Trace.create(output, format = outputFormat) logger.info { "Building resources table" } - val metaWriter = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(metaParquet)) - .withSchema(OdcVmTraceFormat.RESOURCES_SCHEMA) - .withCompressionCodec(CompressionCodecName.ZSTD) - .enablePageWriteChecksum() - .build() + val metaWriter = outputTrace.getTable(TABLE_RESOURCES)!!.newWriter() - val selectedVms = metaWriter.use { convertResources(trace, it) } + val selectedVms = metaWriter.use { convertResources(inputTrace, it) } if (selectedVms.isEmpty()) { logger.warn { "No VMs selected" } @@ -121,23 +111,16 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { logger.info { "Wrote ${selectedVms.size} rows" } logger.info { "Building resource states table" } - val writer = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(traceParquet)) - .withSchema(OdcVmTraceFormat.RESOURCE_STATES_SCHEMA) - .withCompressionCodec(CompressionCodecName.ZSTD) - .withDictionaryEncoding("id", true) - .withBloomFilterEnabled("id", true) - .withBloomFilterNDV("id", selectedVms.size.toLong()) - .enableValidation() - .build() + val writer = outputTrace.getTable(TABLE_RESOURCE_STATES)!!.newWriter() - val statesCount = writer.use { convertResourceStates(trace, it, selectedVms) } + val statesCount = writer.use { convertResourceStates(inputTrace, it, selectedVms) } logger.info { "Wrote $statesCount rows" } } /** * Convert the resources table for the trace. */ - private fun convertResources(trace: Trace, writer: ParquetWriter<GenericData.Record>): Set<String> { + private fun convertResources(trace: Trace, writer: TableWriter): Set<String> { val random = samplingOptions?.let { Random(it.seed) } val samplingFraction = samplingOptions?.fraction ?: 1.0 val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() @@ -154,39 +137,37 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { var stopTime = Long.MIN_VALUE do { - id = reader.get(RESOURCE_STATE_ID) + id = reader.get(RESOURCE_ID) val timestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli() startTime = min(startTime, timestamp) stopTime = max(stopTime, timestamp) - numCpus = max(numCpus, reader.getInt(RESOURCE_STATE_CPU_COUNT)) + numCpus = max(numCpus, reader.getInt(RESOURCE_CPU_COUNT)) - memCapacity = max(memCapacity, reader.getDouble(RESOURCE_STATE_MEM_CAPACITY)) + memCapacity = max(memCapacity, reader.getDouble(RESOURCE_MEM_CAPACITY)) if (reader.hasColumn(RESOURCE_STATE_MEM_USAGE)) { memUsage = max(memUsage, reader.getDouble(RESOURCE_STATE_MEM_USAGE)) } hasNextRow = reader.nextRow() - } while (hasNextRow && id == reader.get(RESOURCE_STATE_ID)) + } while (hasNextRow && id == reader.get(RESOURCE_ID)) // Sample only a fraction of the VMs if (random != null && random.nextDouble() > samplingFraction) { continue } - val builder = GenericRecordBuilder(OdcVmTraceFormat.RESOURCES_SCHEMA) - - builder["id"] = id - builder["start_time"] = startTime - builder["stop_time"] = stopTime - builder["cpu_count"] = numCpus - builder["mem_capacity"] = max(memCapacity, memUsage).roundToLong() - logger.info { "Selecting VM $id" } - - writer.write(builder.build()) selectedVms.add(id) + + writer.startRow() + writer.set(RESOURCE_ID, id) + writer.set(RESOURCE_START_TIME, Instant.ofEpochMilli(startTime)) + writer.set(RESOURCE_STOP_TIME, Instant.ofEpochMilli(stopTime)) + writer.setInt(RESOURCE_CPU_COUNT, numCpus) + writer.setDouble(RESOURCE_MEM_CAPACITY, max(memCapacity, memUsage)) + writer.endRow() } return selectedVms @@ -195,7 +176,7 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { /** * Convert the resource states table for the trace. */ - private fun convertResourceStates(trace: Trace, writer: ParquetWriter<GenericData.Record>, selectedVms: Set<String>): Int { + private fun convertResourceStates(trace: Trace, writer: TableWriter, selectedVms: Set<String>): Int { val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() var hasNextRow = reader.nextRow() @@ -204,14 +185,14 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { var lastTimestamp = 0L while (hasNextRow) { - val id = reader.get(RESOURCE_STATE_ID) + val id = reader.get(RESOURCE_ID) if (id !in selectedVms) { hasNextRow = reader.nextRow() continue } - val cpuCount = reader.getInt(RESOURCE_STATE_CPU_COUNT) + val cpuCount = reader.getInt(RESOURCE_CPU_COUNT) val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE) val startTimestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli() @@ -233,20 +214,18 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { break } - val shouldContinue = id == reader.get(RESOURCE_STATE_ID) && + val shouldContinue = id == reader.get(RESOURCE_ID) && abs(cpuUsage - reader.getDouble(RESOURCE_STATE_CPU_USAGE)) < 0.01 && - cpuCount == reader.getInt(RESOURCE_STATE_CPU_COUNT) + cpuCount == reader.getInt(RESOURCE_CPU_COUNT) } while (shouldContinue) - val builder = GenericRecordBuilder(OdcVmTraceFormat.RESOURCE_STATES_SCHEMA) - - builder["id"] = id - builder["timestamp"] = startTimestamp - builder["duration"] = duration - builder["cpu_count"] = cpuCount - builder["cpu_usage"] = cpuUsage - - writer.write(builder.build()) + writer.startRow() + writer.set(RESOURCE_ID, id) + writer.set(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(startTimestamp)) + writer.set(RESOURCE_STATE_DURATION, Duration.ofMillis(duration)) + writer.setInt(RESOURCE_CPU_COUNT, cpuCount) + writer.setDouble(RESOURCE_STATE_CPU_USAGE, cpuUsage) + writer.endRow() count++ |
