summaryrefslogtreecommitdiff
path: root/opendc-trace/opendc-trace-tools/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-21 11:34:34 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-09-21 11:34:34 +0200
commit68ef3700ed2f69bcf0118bb69eda71e6b1f4d54f (patch)
tree73201888564accde4cfa107f4ffdb15e9f93d45c /opendc-trace/opendc-trace-tools/src
parentc7fff03408ee3109d0a39a96c043584a2d8f67ca (diff)
feat(trace): Add support for writing traces
This change adds a new API for writing traces in a trace format. Currently, writing is only supported by the OpenDC VM format, but over time the other formats will also have support for writing added.
Diffstat (limited to 'opendc-trace/opendc-trace-tools/src')
-rw-r--r--opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt82
1 files changed, 34 insertions, 48 deletions
diff --git a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt
index cd5d287f..6fad43be 100644
--- a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt
+++ b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt
@@ -20,6 +20,7 @@
* SOFTWARE.
*/
+@file:JvmName("TraceConverter")
package org.opendc.trace.tools
import com.github.ajalt.clikt.core.CliktCommand
@@ -29,25 +30,19 @@ import com.github.ajalt.clikt.parameters.groups.cooccurring
import com.github.ajalt.clikt.parameters.options.*
import com.github.ajalt.clikt.parameters.types.*
import mu.KotlinLogging
-import org.apache.avro.generic.GenericData
-import org.apache.avro.generic.GenericRecordBuilder
-import org.apache.parquet.avro.AvroParquetWriter
-import org.apache.parquet.hadoop.ParquetWriter
-import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.opendc.trace.*
-import org.opendc.trace.opendc.OdcVmTraceFormat
-import org.opendc.trace.util.parquet.LocalOutputFile
import java.io.File
+import java.time.Duration
+import java.time.Instant
import java.util.*
import kotlin.math.abs
import kotlin.math.max
import kotlin.math.min
-import kotlin.math.roundToLong
/**
* A script to convert a trace in text format into a Parquet trace.
*/
-public fun main(args: Array<String>): Unit = TraceConverterCli().main(args)
+fun main(args: Array<String>): Unit = TraceConverterCli().main(args)
/**
* Represents the command for converting traces
@@ -74,11 +69,16 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
/**
* The input format of the trace.
*/
- private val format by option("-f", "--format", help = "input format of trace")
- .choice("bitbrains-ex", "bitbrains", "azure")
+ private val inputFormat by option("-f", "--input-format", help = "format of output trace")
.required()
/**
+ * The format of the output trace.
+ */
+ private val outputFormat by option("--output-format", help = "format of output trace")
+ .default("opendc-vm")
+
+ /**
* The sampling options.
*/
private val samplingOptions by SamplingOptions().cooccurring()
@@ -94,17 +94,14 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
traceParquet.delete()
}
- val trace = Trace.open(input, format = format)
+ val inputTrace = Trace.open(input, format = inputFormat)
+ val outputTrace = Trace.create(output, format = outputFormat)
logger.info { "Building resources table" }
- val metaWriter = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(metaParquet))
- .withSchema(OdcVmTraceFormat.RESOURCES_SCHEMA)
- .withCompressionCodec(CompressionCodecName.ZSTD)
- .enablePageWriteChecksum()
- .build()
+ val metaWriter = outputTrace.getTable(TABLE_RESOURCES)!!.newWriter()
- val selectedVms = metaWriter.use { convertResources(trace, it) }
+ val selectedVms = metaWriter.use { convertResources(inputTrace, it) }
if (selectedVms.isEmpty()) {
logger.warn { "No VMs selected" }
@@ -114,23 +111,16 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
logger.info { "Wrote ${selectedVms.size} rows" }
logger.info { "Building resource states table" }
- val writer = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(traceParquet))
- .withSchema(OdcVmTraceFormat.RESOURCE_STATES_SCHEMA)
- .withCompressionCodec(CompressionCodecName.ZSTD)
- .withDictionaryEncoding("id", true)
- .withBloomFilterEnabled("id", true)
- .withBloomFilterNDV("id", selectedVms.size.toLong())
- .enableValidation()
- .build()
+ val writer = outputTrace.getTable(TABLE_RESOURCE_STATES)!!.newWriter()
- val statesCount = writer.use { convertResourceStates(trace, it, selectedVms) }
+ val statesCount = writer.use { convertResourceStates(inputTrace, it, selectedVms) }
logger.info { "Wrote $statesCount rows" }
}
/**
* Convert the resources table for the trace.
*/
- private fun convertResources(trace: Trace, writer: ParquetWriter<GenericData.Record>): Set<String> {
+ private fun convertResources(trace: Trace, writer: TableWriter): Set<String> {
val random = samplingOptions?.let { Random(it.seed) }
val samplingFraction = samplingOptions?.fraction ?: 1.0
val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
@@ -168,18 +158,16 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
continue
}
- val builder = GenericRecordBuilder(OdcVmTraceFormat.RESOURCES_SCHEMA)
-
- builder["id"] = id
- builder["start_time"] = startTime
- builder["stop_time"] = stopTime
- builder["cpu_count"] = numCpus
- builder["mem_capacity"] = max(memCapacity, memUsage).roundToLong()
-
logger.info { "Selecting VM $id" }
-
- writer.write(builder.build())
selectedVms.add(id)
+
+ writer.startRow()
+ writer.set(RESOURCE_ID, id)
+ writer.set(RESOURCE_START_TIME, Instant.ofEpochMilli(startTime))
+ writer.set(RESOURCE_STOP_TIME, Instant.ofEpochMilli(stopTime))
+ writer.setInt(RESOURCE_CPU_COUNT, numCpus)
+ writer.setDouble(RESOURCE_MEM_CAPACITY, max(memCapacity, memUsage))
+ writer.endRow()
}
return selectedVms
@@ -188,7 +176,7 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
/**
* Convert the resource states table for the trace.
*/
- private fun convertResourceStates(trace: Trace, writer: ParquetWriter<GenericData.Record>, selectedVms: Set<String>): Int {
+ private fun convertResourceStates(trace: Trace, writer: TableWriter, selectedVms: Set<String>): Int {
val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
var hasNextRow = reader.nextRow()
@@ -231,15 +219,13 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
cpuCount == reader.getInt(RESOURCE_CPU_COUNT)
} while (shouldContinue)
- val builder = GenericRecordBuilder(OdcVmTraceFormat.RESOURCE_STATES_SCHEMA)
-
- builder["id"] = id
- builder["timestamp"] = startTimestamp
- builder["duration"] = duration
- builder["cpu_count"] = cpuCount
- builder["cpu_usage"] = cpuUsage
-
- writer.write(builder.build())
+ writer.startRow()
+ writer.set(RESOURCE_ID, id)
+ writer.set(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(startTimestamp))
+ writer.set(RESOURCE_STATE_DURATION, Duration.ofMillis(duration))
+ writer.setInt(RESOURCE_CPU_COUNT, cpuCount)
+ writer.setDouble(RESOURCE_STATE_CPU_USAGE, cpuUsage)
+ writer.endRow()
count++