author    Fabian Mastenbroek <mail.fabianm@gmail.com>  2021-09-21 12:04:15 +0200
committer GitHub <noreply@github.com>  2021-09-21 12:04:15 +0200
commit    322d91db03a7d74a00ec623ce624f979c0b77c03 (patch)
tree      73201888564accde4cfa107f4ffdb15e9f93d45c /opendc-trace/opendc-trace-tools
parent    453c25c4b453fa0af26bebbd8863abfb79218119 (diff)
parent    68ef3700ed2f69bcf0118bb69eda71e6b1f4d54f (diff)
merge: Add support for trace writing
This pull request extends the trace API to support writing new traces.

- Unify columns of different tables
- Support column lookup via index
- Use index lookup in trace loader
- Add property for describing partition keys
- Simplify TraceFormat SPI interface
- Add support for writing traces

**Breaking API Changes**

- The `TraceFormat` SPI interface has been redesigned.
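A minimal sketch of the reworked read/write round trip, using only calls that appear in the diff below (file paths and format names are illustrative):

```kotlin
import java.io.File
import org.opendc.trace.*

fun main() {
    // Open an existing trace and create a new one in the target format.
    val inputTrace = Trace.open(File("trace-in"), format = "bitbrains")
    val outputTrace = Trace.create(File("trace-out"), format = "opendc-vm")

    val reader = checkNotNull(inputTrace.getTable(TABLE_RESOURCE_STATES)).newReader()
    val writer = checkNotNull(outputTrace.getTable(TABLE_RESOURCE_STATES)).newWriter()

    writer.use { w ->
        while (reader.nextRow()) {
            w.startRow()
            // Columns are now unified across tables, so the same RESOURCE_ID
            // key works for both the resources and resource-states tables.
            w.set(RESOURCE_ID, reader.get(RESOURCE_ID))
            w.set(RESOURCE_STATE_TIMESTAMP, reader.get(RESOURCE_STATE_TIMESTAMP))
            w.setInt(RESOURCE_CPU_COUNT, reader.getInt(RESOURCE_CPU_COUNT))
            w.setDouble(RESOURCE_STATE_CPU_USAGE, reader.getDouble(RESOURCE_STATE_CPU_USAGE))
            w.endRow()
        }
    }
}
```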
Diffstat (limited to 'opendc-trace/opendc-trace-tools')
-rw-r--r--  opendc-trace/opendc-trace-tools/build.gradle.kts                                           |  11
-rw-r--r--  opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt   | 105
2 files changed, 47 insertions(+), 69 deletions(-)
diff --git a/opendc-trace/opendc-trace-tools/build.gradle.kts b/opendc-trace/opendc-trace-tools/build.gradle.kts
index 35190dba..14a0fc7c 100644
--- a/opendc-trace/opendc-trace-tools/build.gradle.kts
+++ b/opendc-trace/opendc-trace-tools/build.gradle.kts
@@ -29,19 +29,18 @@ plugins {
}
application {
- mainClass.set("org.opendc.trace.tools.TraceConverterKt")
+ mainClass.set("org.opendc.trace.tools.TraceConverter")
}
dependencies {
api(platform(projects.opendcPlatform))
- implementation(projects.opendcTrace.opendcTraceParquet)
- implementation(projects.opendcTrace.opendcTraceOpendc)
- implementation(projects.opendcTrace.opendcTraceAzure)
- implementation(projects.opendcTrace.opendcTraceBitbrains)
-
+ implementation(projects.opendcTrace.opendcTraceApi)
implementation(libs.kotlin.logging)
implementation(libs.clikt)
+ runtimeOnly(projects.opendcTrace.opendcTraceOpendc)
+ runtimeOnly(projects.opendcTrace.opendcTraceBitbrains)
+ runtimeOnly(projects.opendcTrace.opendcTraceAzure)
runtimeOnly(libs.log4j.slf4j)
}
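With the format modules demoted to `runtimeOnly`, the converter no longer references format classes such as `AzureTraceFormat` directly; `Trace.open` resolves the named format through the `TraceFormat` SPI at runtime. A hypothetical sketch of such a lookup, assuming the standard `java.util.ServiceLoader` mechanism and a `name` property on the SPI interface (neither is shown in this diff):

```kotlin
import java.util.ServiceLoader

// Hypothetical stand-in for the real SPI interface in opendc-trace-api;
// the actual shape of TraceFormat is not visible in this diff.
interface TraceFormat {
    val name: String
}

// Resolve a format by name from whatever implementations are on the
// runtime classpath, registered via META-INF/services entries.
fun findFormat(name: String): TraceFormat? =
    ServiceLoader.load(TraceFormat::class.java).firstOrNull { it.name == name }
```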
diff --git a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt
index 322464cd..6fad43be 100644
--- a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt
+++ b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt
@@ -20,6 +20,7 @@
* SOFTWARE.
*/
+@file:JvmName("TraceConverter")
package org.opendc.trace.tools
import com.github.ajalt.clikt.core.CliktCommand
@@ -29,28 +30,19 @@ import com.github.ajalt.clikt.parameters.groups.cooccurring
import com.github.ajalt.clikt.parameters.options.*
import com.github.ajalt.clikt.parameters.types.*
import mu.KotlinLogging
-import org.apache.avro.generic.GenericData
-import org.apache.avro.generic.GenericRecordBuilder
-import org.apache.parquet.avro.AvroParquetWriter
-import org.apache.parquet.hadoop.ParquetWriter
-import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.opendc.trace.*
-import org.opendc.trace.azure.AzureTraceFormat
-import org.opendc.trace.bitbrains.BitbrainsExTraceFormat
-import org.opendc.trace.bitbrains.BitbrainsTraceFormat
-import org.opendc.trace.opendc.OdcVmTraceFormat
-import org.opendc.trace.util.parquet.LocalOutputFile
import java.io.File
+import java.time.Duration
+import java.time.Instant
import java.util.*
import kotlin.math.abs
import kotlin.math.max
import kotlin.math.min
-import kotlin.math.roundToLong
/**
* A script to convert a trace in text format into a Parquet trace.
*/
-public fun main(args: Array<String>): Unit = TraceConverterCli().main(args)
+fun main(args: Array<String>): Unit = TraceConverterCli().main(args)
/**
* Represents the command for converting traces
@@ -77,15 +69,16 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
/**
* The input format of the trace.
*/
- private val format by option("-f", "--format", help = "input format of trace")
- .choice(
- "solvinity" to BitbrainsExTraceFormat(),
- "bitbrains" to BitbrainsTraceFormat(),
- "azure" to AzureTraceFormat()
- )
+ private val inputFormat by option("-f", "--input-format", help = "format of input trace")
.required()
/**
+ * The format of the output trace.
+ */
+ private val outputFormat by option("--output-format", help = "format of output trace")
+ .default("opendc-vm")
+
+ /**
* The sampling options.
*/
private val samplingOptions by SamplingOptions().cooccurring()
@@ -101,17 +94,14 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
traceParquet.delete()
}
- val trace = format.open(input.toURI().toURL())
+ val inputTrace = Trace.open(input, format = inputFormat)
+ val outputTrace = Trace.create(output, format = outputFormat)
logger.info { "Building resources table" }
- val metaWriter = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(metaParquet))
- .withSchema(OdcVmTraceFormat.RESOURCES_SCHEMA)
- .withCompressionCodec(CompressionCodecName.ZSTD)
- .enablePageWriteChecksum()
- .build()
+ val metaWriter = outputTrace.getTable(TABLE_RESOURCES)!!.newWriter()
- val selectedVms = metaWriter.use { convertResources(trace, it) }
+ val selectedVms = metaWriter.use { convertResources(inputTrace, it) }
if (selectedVms.isEmpty()) {
logger.warn { "No VMs selected" }
@@ -121,23 +111,16 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
logger.info { "Wrote ${selectedVms.size} rows" }
logger.info { "Building resource states table" }
- val writer = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(traceParquet))
- .withSchema(OdcVmTraceFormat.RESOURCE_STATES_SCHEMA)
- .withCompressionCodec(CompressionCodecName.ZSTD)
- .withDictionaryEncoding("id", true)
- .withBloomFilterEnabled("id", true)
- .withBloomFilterNDV("id", selectedVms.size.toLong())
- .enableValidation()
- .build()
+ val writer = outputTrace.getTable(TABLE_RESOURCE_STATES)!!.newWriter()
- val statesCount = writer.use { convertResourceStates(trace, it, selectedVms) }
+ val statesCount = writer.use { convertResourceStates(inputTrace, it, selectedVms) }
logger.info { "Wrote $statesCount rows" }
}
/**
* Convert the resources table for the trace.
*/
- private fun convertResources(trace: Trace, writer: ParquetWriter<GenericData.Record>): Set<String> {
+ private fun convertResources(trace: Trace, writer: TableWriter): Set<String> {
val random = samplingOptions?.let { Random(it.seed) }
val samplingFraction = samplingOptions?.fraction ?: 1.0
val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
@@ -154,39 +137,37 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
var stopTime = Long.MIN_VALUE
do {
- id = reader.get(RESOURCE_STATE_ID)
+ id = reader.get(RESOURCE_ID)
val timestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli()
startTime = min(startTime, timestamp)
stopTime = max(stopTime, timestamp)
- numCpus = max(numCpus, reader.getInt(RESOURCE_STATE_CPU_COUNT))
+ numCpus = max(numCpus, reader.getInt(RESOURCE_CPU_COUNT))
- memCapacity = max(memCapacity, reader.getDouble(RESOURCE_STATE_MEM_CAPACITY))
+ memCapacity = max(memCapacity, reader.getDouble(RESOURCE_MEM_CAPACITY))
if (reader.hasColumn(RESOURCE_STATE_MEM_USAGE)) {
memUsage = max(memUsage, reader.getDouble(RESOURCE_STATE_MEM_USAGE))
}
hasNextRow = reader.nextRow()
- } while (hasNextRow && id == reader.get(RESOURCE_STATE_ID))
+ } while (hasNextRow && id == reader.get(RESOURCE_ID))
// Sample only a fraction of the VMs
if (random != null && random.nextDouble() > samplingFraction) {
continue
}
- val builder = GenericRecordBuilder(OdcVmTraceFormat.RESOURCES_SCHEMA)
-
- builder["id"] = id
- builder["start_time"] = startTime
- builder["stop_time"] = stopTime
- builder["cpu_count"] = numCpus
- builder["mem_capacity"] = max(memCapacity, memUsage).roundToLong()
-
logger.info { "Selecting VM $id" }
-
- writer.write(builder.build())
selectedVms.add(id)
+
+ writer.startRow()
+ writer.set(RESOURCE_ID, id)
+ writer.set(RESOURCE_START_TIME, Instant.ofEpochMilli(startTime))
+ writer.set(RESOURCE_STOP_TIME, Instant.ofEpochMilli(stopTime))
+ writer.setInt(RESOURCE_CPU_COUNT, numCpus)
+ writer.setDouble(RESOURCE_MEM_CAPACITY, max(memCapacity, memUsage))
+ writer.endRow()
}
return selectedVms
@@ -195,7 +176,7 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
/**
* Convert the resource states table for the trace.
*/
- private fun convertResourceStates(trace: Trace, writer: ParquetWriter<GenericData.Record>, selectedVms: Set<String>): Int {
+ private fun convertResourceStates(trace: Trace, writer: TableWriter, selectedVms: Set<String>): Int {
val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
var hasNextRow = reader.nextRow()
@@ -204,14 +185,14 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
var lastTimestamp = 0L
while (hasNextRow) {
- val id = reader.get(RESOURCE_STATE_ID)
+ val id = reader.get(RESOURCE_ID)
if (id !in selectedVms) {
hasNextRow = reader.nextRow()
continue
}
- val cpuCount = reader.getInt(RESOURCE_STATE_CPU_COUNT)
+ val cpuCount = reader.getInt(RESOURCE_CPU_COUNT)
val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE)
val startTimestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli()
@@ -233,20 +214,18 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
break
}
- val shouldContinue = id == reader.get(RESOURCE_STATE_ID) &&
+ val shouldContinue = id == reader.get(RESOURCE_ID) &&
abs(cpuUsage - reader.getDouble(RESOURCE_STATE_CPU_USAGE)) < 0.01 &&
- cpuCount == reader.getInt(RESOURCE_STATE_CPU_COUNT)
+ cpuCount == reader.getInt(RESOURCE_CPU_COUNT)
} while (shouldContinue)
- val builder = GenericRecordBuilder(OdcVmTraceFormat.RESOURCE_STATES_SCHEMA)
-
- builder["id"] = id
- builder["timestamp"] = startTimestamp
- builder["duration"] = duration
- builder["cpu_count"] = cpuCount
- builder["cpu_usage"] = cpuUsage
-
- writer.write(builder.build())
+ writer.startRow()
+ writer.set(RESOURCE_ID, id)
+ writer.set(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(startTimestamp))
+ writer.set(RESOURCE_STATE_DURATION, Duration.ofMillis(duration))
+ writer.setInt(RESOURCE_CPU_COUNT, cpuCount)
+ writer.setDouble(RESOURCE_STATE_CPU_USAGE, cpuUsage)
+ writer.endRow()
count++
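For reference, the state-conversion loop above coalesces consecutive samples of the same VM into a single row whenever the CPU usage stays within 0.01 and the CPU count is unchanged, accumulating the covered interval into the row's duration. A self-contained sketch of that coalescing over an in-memory list, assuming a fixed sampling interval (the converter itself derives durations from the timestamps):

```kotlin
import kotlin.math.abs

// Illustrative in-memory model of the rows; not part of the trace API.
data class Sample(val id: String, val timestamp: Long, val cpuCount: Int, val cpuUsage: Double)
data class State(val id: String, val timestamp: Long, val duration: Long, val cpuCount: Int, val cpuUsage: Double)

// Merge runs of samples from the same VM whose CPU usage differs by less
// than 0.01 and whose CPU count is unchanged, mirroring the loop above.
fun coalesce(samples: List<Sample>, step: Long): List<State> {
    val out = mutableListOf<State>()
    var i = 0
    while (i < samples.size) {
        val first = samples[i]
        var j = i + 1
        while (j < samples.size &&
            samples[j].id == first.id &&
            abs(samples[j].cpuUsage - first.cpuUsage) < 0.01 &&
            samples[j].cpuCount == first.cpuCount
        ) {
            j++
        }
        out += State(first.id, first.timestamp, (j - i) * step, first.cpuCount, first.cpuUsage)
        i = j
    }
    return out
}
```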