From fa08b63bd749e9fbe1a1d04ef2ebd7a86453fa4b Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sat, 11 Sep 2021 10:52:30 +0200 Subject: perf(trace): Keep reader state in own class This change removes the external class that holds the state of the reader and instead puts the state in the reader implementation. Maintaining a separate class for the state increases the complexity and has worse performance characteristics due to the bytecode produced by Kotlin for property accesses. --- .../capelin/trace/RawParquetTraceReader.kt | 4 +- .../capelin/trace/bp/BPResourceTable.kt | 2 +- .../capelin/trace/bp/BPResourceTableReader.kt | 10 +- .../capelin/trace/sv/SvResourceStateTable.kt | 5 +- .../capelin/trace/sv/SvResourceStateTableReader.kt | 142 +++++++++------------ 5 files changed, 73 insertions(+), 90 deletions(-) (limited to 'opendc-experiments') diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt index fa4e9ed8..ca937328 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/RawParquetTraceReader.kt @@ -90,9 +90,9 @@ class RawParquetTraceReader(private val path: File) { } val submissionTime = reader.get(RESOURCE_START_TIME) - val endTime = reader.get(RESOURCE_END_TIME) + val endTime = reader.get(RESOURCE_STOP_TIME) val maxCores = reader.getInt(RESOURCE_NCPUS) - val requiredMemory = reader.getDouble(RESOURCE_MEM_CAPACITY) + val requiredMemory = reader.getDouble(RESOURCE_MEM_CAPACITY) / 1000.0 // Convert from KB to MB val uid = UUID.nameUUIDFromBytes("$id-${counter++}".toByteArray()) val vmFragments = fragments.getValue(id).asSequence() diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt index 74d1e574..bff8c55e 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt @@ -38,7 +38,7 @@ internal class BPResourceTable(private val path: Path) : Table { return when (column) { RESOURCE_ID -> true RESOURCE_START_TIME -> true - RESOURCE_END_TIME -> true + RESOURCE_STOP_TIME -> true RESOURCE_NCPUS -> true RESOURCE_MEM_CAPACITY -> true else -> false diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt index 0a105783..4416aae8 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTableReader.kt @@ -45,7 +45,7 @@ internal class BPResourceTableReader(private val reader: LocalParquetReader true RESOURCE_START_TIME -> true - RESOURCE_END_TIME -> true + RESOURCE_STOP_TIME -> true RESOURCE_NCPUS -> 
true RESOURCE_MEM_CAPACITY -> true else -> false @@ -59,9 +59,9 @@ internal class BPResourceTableReader(private val reader: LocalParquetReader record["id"].toString() RESOURCE_START_TIME -> Instant.ofEpochMilli(record["submissionTime"] as Long) - RESOURCE_END_TIME -> Instant.ofEpochMilli(record["endTime"] as Long) - RESOURCE_NCPUS -> record["maxCores"] - RESOURCE_MEM_CAPACITY -> (record["requiredMemory"] as Number).toDouble() + RESOURCE_STOP_TIME -> Instant.ofEpochMilli(record["endTime"] as Long) + RESOURCE_NCPUS -> getInt(RESOURCE_NCPUS) + RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_MEM_CAPACITY) else -> throw IllegalArgumentException("Invalid column") } @@ -90,7 +90,7 @@ internal class BPResourceTableReader(private val reader: LocalParquetReader (record["requiredMemory"] as Number).toDouble() + RESOURCE_MEM_CAPACITY -> (record["requiredMemory"] as Number).toDouble() * 1000.0 // MB to KB else -> throw IllegalArgumentException("Invalid column") } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt index 24abb109..3a9bda69 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt @@ -31,7 +31,7 @@ import kotlin.io.path.extension import kotlin.io.path.nameWithoutExtension /** - * The resource state [Table] in the Bitbrains format. + * The resource state [Table] in the extended Bitbrains format. */ internal class SvResourceStateTable(path: Path) : Table { /** @@ -40,6 +40,7 @@ internal class SvResourceStateTable(path: Path) : Table { private val partitions = Files.walk(path, 1) .filter { !Files.isDirectory(it) && it.extension == "txt" } .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) + .toSortedMap() override val name: String = TABLE_RESOURCE_STATES @@ -126,7 +127,7 @@ internal class SvResourceStateTable(path: Path) : Table { } } - override fun toString(): String = "BitbrainsCompositeTableReader" + override fun toString(): String = "SvCompositeTableReader" } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt index 1a556f8d..a7d2d70a 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt @@ -30,13 +30,8 @@ import java.time.Instant * A [TableReader] for the Bitbrains resource state table. */ internal class SvResourceStateTableReader(private val reader: BufferedReader) : TableReader { - /** - * The current parser state. 
- */ - private val state = RowState() - override fun nextRow(): Boolean { - state.reset() + reset() var line: String var num = 0 @@ -75,18 +70,18 @@ internal class SvResourceStateTableReader(private val reader: BufferedReader) : val field = line.subSequence(start, end) as String when (col++) { - COL_TIMESTAMP -> state.timestamp = Instant.ofEpochSecond(field.toLong(10)) - COL_CPU_USAGE -> state.cpuUsage = field.toDouble() - COL_CPU_DEMAND -> state.cpuDemand = field.toDouble() - COL_DISK_READ -> state.diskRead = field.toDouble() - COL_DISK_WRITE -> state.diskWrite = field.toDouble() - COL_CLUSTER_ID -> state.cluster = field.trim() - COL_NCPUS -> state.cpuCores = field.toInt(10) - COL_CPU_READY_PCT -> state.cpuReadyPct = field.toDouble() - COL_POWERED_ON -> state.poweredOn = field.toInt(10) == 1 - COL_CPU_CAPACITY -> state.cpuCapacity = field.toDouble() - COL_ID -> state.id = field.trim() - COL_MEM_CAPACITY -> state.memCapacity = field.toDouble() + COL_TIMESTAMP -> timestamp = Instant.ofEpochSecond(field.toLong(10)) + COL_CPU_USAGE -> cpuUsage = field.toDouble() + COL_CPU_DEMAND -> cpuDemand = field.toDouble() + COL_DISK_READ -> diskRead = field.toDouble() + COL_DISK_WRITE -> diskWrite = field.toDouble() + COL_CLUSTER_ID -> cluster = field.trim() + COL_NCPUS -> cpuCores = field.toInt(10) + COL_CPU_READY_PCT -> cpuReadyPct = field.toDouble() + COL_POWERED_ON -> poweredOn = field.toInt(10) == 1 + COL_CPU_CAPACITY -> cpuCapacity = field.toDouble() + COL_ID -> id = field.trim() + COL_MEM_CAPACITY -> memCapacity = field.toDouble() } } @@ -113,16 +108,16 @@ internal class SvResourceStateTableReader(private val reader: BufferedReader) : override fun get(column: TableColumn): T { val res: Any? = when (column) { - RESOURCE_STATE_ID -> state.id - RESOURCE_STATE_CLUSTER_ID -> state.cluster - RESOURCE_STATE_TIMESTAMP -> state.timestamp - RESOURCE_STATE_NCPUS -> state.cpuCores - RESOURCE_STATE_CPU_CAPACITY -> state.cpuCapacity - RESOURCE_STATE_CPU_USAGE -> state.cpuUsage - RESOURCE_STATE_CPU_USAGE_PCT -> state.cpuUsage / state.cpuCapacity - RESOURCE_STATE_MEM_CAPACITY -> state.memCapacity - RESOURCE_STATE_DISK_READ -> state.diskRead - RESOURCE_STATE_DISK_WRITE -> state.diskWrite + RESOURCE_STATE_ID -> id + RESOURCE_STATE_CLUSTER_ID -> cluster + RESOURCE_STATE_TIMESTAMP -> timestamp + RESOURCE_STATE_NCPUS -> getInt(RESOURCE_STATE_NCPUS) + RESOURCE_STATE_CPU_CAPACITY -> getDouble(RESOURCE_STATE_CPU_CAPACITY) + RESOURCE_STATE_CPU_USAGE -> getDouble(RESOURCE_STATE_CPU_USAGE) + RESOURCE_STATE_CPU_USAGE_PCT -> getDouble(RESOURCE_STATE_CPU_USAGE_PCT) + RESOURCE_STATE_MEM_CAPACITY -> getDouble(RESOURCE_STATE_MEM_CAPACITY) + RESOURCE_STATE_DISK_READ -> getDouble(RESOURCE_STATE_DISK_READ) + RESOURCE_STATE_DISK_WRITE -> getDouble(RESOURCE_STATE_DISK_WRITE) else -> throw IllegalArgumentException("Invalid column") } @@ -132,14 +127,14 @@ internal class SvResourceStateTableReader(private val reader: BufferedReader) : override fun getBoolean(column: TableColumn): Boolean { return when (column) { - RESOURCE_STATE_POWERED_ON -> state.poweredOn + RESOURCE_STATE_POWERED_ON -> poweredOn else -> throw IllegalArgumentException("Invalid column") } } override fun getInt(column: TableColumn): Int { return when (column) { - RESOURCE_STATE_NCPUS -> state.cpuCores + RESOURCE_STATE_NCPUS -> cpuCores else -> throw IllegalArgumentException("Invalid column") } } @@ -150,12 +145,13 @@ internal class SvResourceStateTableReader(private val reader: BufferedReader) : override fun getDouble(column: TableColumn): Double { return when (column) 
{ - RESOURCE_STATE_CPU_CAPACITY -> state.cpuCapacity - RESOURCE_STATE_CPU_USAGE -> state.cpuUsage - RESOURCE_STATE_CPU_USAGE_PCT -> state.cpuUsage / state.cpuCapacity - RESOURCE_STATE_MEM_CAPACITY -> state.memCapacity - RESOURCE_STATE_DISK_READ -> state.diskRead - RESOURCE_STATE_DISK_WRITE -> state.diskWrite + RESOURCE_STATE_CPU_CAPACITY -> cpuCapacity + RESOURCE_STATE_CPU_USAGE -> cpuUsage + RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsage / cpuCapacity + RESOURCE_STATE_CPU_DEMAND -> cpuDemand + RESOURCE_STATE_MEM_CAPACITY -> memCapacity + RESOURCE_STATE_DISK_READ -> diskRead + RESOURCE_STATE_DISK_WRITE -> diskWrite else -> throw IllegalArgumentException("Invalid column") } } @@ -165,51 +161,37 @@ internal class SvResourceStateTableReader(private val reader: BufferedReader) : } /** - * The current row state. + * State fields of the reader. */ - private class RowState { - @JvmField - var id: String? = null - @JvmField - var cluster: String? = null - @JvmField - var timestamp: Instant? = null - @JvmField - var cpuCores = -1 - @JvmField - var cpuCapacity = Double.NaN - @JvmField - var cpuUsage = Double.NaN - @JvmField - var cpuDemand = Double.NaN - @JvmField - var cpuReadyPct = Double.NaN - @JvmField - var memCapacity = Double.NaN - @JvmField - var diskRead = Double.NaN - @JvmField - var diskWrite = Double.NaN - @JvmField - var poweredOn: Boolean = false - - /** - * Reset the state. - */ - fun reset() { - id = null - timestamp = null - cluster = null - cpuCores = -1 - cpuCapacity = Double.NaN - cpuUsage = Double.NaN - cpuDemand = Double.NaN - cpuReadyPct = Double.NaN - memCapacity = Double.NaN - diskRead = Double.NaN - diskWrite = Double.NaN - poweredOn = false - } + private var id: String? = null + private var cluster: String? = null + private var timestamp: Instant? = null + private var cpuCores = -1 + private var cpuCapacity = Double.NaN + private var cpuUsage = Double.NaN + private var cpuDemand = Double.NaN + private var cpuReadyPct = Double.NaN + private var memCapacity = Double.NaN + private var diskRead = Double.NaN + private var diskWrite = Double.NaN + private var poweredOn: Boolean = false + + /** + * Reset the state of the reader. + */ + private fun reset() { + id = null + timestamp = null + cluster = null + cpuCores = -1 + cpuCapacity = Double.NaN + cpuUsage = Double.NaN + cpuDemand = Double.NaN + cpuReadyPct = Double.NaN + memCapacity = Double.NaN + diskRead = Double.NaN + diskWrite = Double.NaN + poweredOn = false } /** -- cgit v1.2.3 From 6489ebe975e9ba842fd7cce1b1136944728bb21d Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sat, 11 Sep 2021 10:54:38 +0200 Subject: fix(capelin): Parse last column in Solvinity trace format This change fixes an issue where the last column in the Solvinity traces is not parsed correctly, due to the last column having no whitespace at the end to seek to. 
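As context for the perf(trace) change above, here is a minimal sketch (illustrative only; the class names are not from the patch) of the two shapes being compared. With a separate state class, every column access pays an extra field load to reach the holder object, even with @JvmField, whereas inlining the fields into the reader reduces each access to a single field load:

    // Before: the state lives behind a holder object, so reading a value
    // compiles to two field loads (this.state, then state.cpuUsage).
    class ReaderWithStateClass {
        private class RowState {
            @JvmField var cpuUsage = Double.NaN
        }

        private val state = RowState()

        fun currentCpuUsage(): Double = state.cpuUsage
    }

    // After: the field sits directly on the reader; a single field load.
    class ReaderWithInlineState {
        private var cpuUsage = Double.NaN

        fun currentCpuUsage(): Double = cpuUsage
    }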
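Similarly, a standalone sketch of the seeking bug addressed by the fix below (the helper name is hypothetical, not the patched code): each field is located by seeking to the next space, but the last column of a line has no trailing space, so indexOf returns -1 and breaking out of the loop silently drops that field. Substituting the line length for the missing delimiter, as the one-line fix below does, recovers it:

    fun splitBySpaces(line: String): List<String> {
        val fields = mutableListOf<String>()
        var start = 0

        while (start < line.length) {
            var end = line.indexOf(' ', start)
            if (end < 0) {
                end = line.length // previously `break`, which lost the final field
            }
            fields.add(line.substring(start, end))
            start = end + 1
        }

        return fields
    }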
--- .../opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'opendc-experiments') diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt index a7d2d70a..6ea403fe 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTableReader.kt @@ -65,7 +65,7 @@ internal class SvResourceStateTableReader(private val reader: BufferedReader) : end = line.indexOf(' ', start) if (end < 0) { - break + end = length } val field = line.subSequence(start, end) as String -- cgit v1.2.3 From 9e8ea96270701e643f95b18d2b91583d9fca08d2 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sat, 11 Sep 2021 11:10:38 +0200 Subject: feat(capelin): Implement trace API for Azure VM trace format This change adds a trace API implementation for the Azure VM traces. --- .../opendc-experiments-capelin/build.gradle.kts | 1 + .../experiments/capelin/trace/TraceConverter.kt | 488 +++++++-------------- .../capelin/trace/azure/AzureResourceStateTable.kt | 130 ++++++ .../trace/azure/AzureResourceStateTableReader.kt | 149 +++++++ .../capelin/trace/azure/AzureResourceTable.kt | 57 +++ .../trace/azure/AzureResourceTableReader.kt | 169 +++++++ .../experiments/capelin/trace/azure/AzureTrace.kt | 46 ++ .../capelin/trace/azure/AzureTraceFormat.kt | 56 +++ .../opendc/experiments/capelin/trace/bp/Schemas.kt | 55 +++ 9 files changed, 822 insertions(+), 329 deletions(-) create mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTable.kt create mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTableReader.kt create mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTable.kt create mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTableReader.kt create mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTrace.kt create mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTraceFormat.kt create mode 100644 opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/Schemas.kt (limited to 'opendc-experiments') diff --git a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts index 036d0638..7dadd14d 100644 --- a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts @@ -45,6 +45,7 @@ dependencies { implementation(libs.progressbar) implementation(libs.clikt) implementation(libs.jackson.module.kotlin) + implementation(libs.jackson.dataformat.csv) implementation(kotlin("reflect")) implementation(libs.parquet) diff --git 
a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt index a021de8d..1f3878eb 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/TraceConverter.kt @@ -25,80 +25,74 @@ package org.opendc.experiments.capelin.trace import com.github.ajalt.clikt.core.CliktCommand import com.github.ajalt.clikt.parameters.arguments.argument import com.github.ajalt.clikt.parameters.groups.OptionGroup -import com.github.ajalt.clikt.parameters.groups.groupChoice +import com.github.ajalt.clikt.parameters.groups.cooccurring import com.github.ajalt.clikt.parameters.options.* -import com.github.ajalt.clikt.parameters.types.file -import com.github.ajalt.clikt.parameters.types.long -import me.tongfei.progressbar.ProgressBar -import org.apache.avro.Schema -import org.apache.avro.SchemaBuilder +import com.github.ajalt.clikt.parameters.types.* +import mu.KotlinLogging import org.apache.avro.generic.GenericData +import org.apache.avro.generic.GenericRecordBuilder import org.apache.parquet.avro.AvroParquetWriter import org.apache.parquet.hadoop.ParquetWriter import org.apache.parquet.hadoop.metadata.CompressionCodecName +import org.opendc.experiments.capelin.trace.azure.AzureTraceFormat +import org.opendc.experiments.capelin.trace.bp.BP_RESOURCES_SCHEMA +import org.opendc.experiments.capelin.trace.bp.BP_RESOURCE_STATES_SCHEMA import org.opendc.experiments.capelin.trace.sv.SvTraceFormat import org.opendc.trace.* import org.opendc.trace.bitbrains.BitbrainsTraceFormat -import org.opendc.trace.spi.TraceFormat import org.opendc.trace.util.parquet.LocalOutputFile -import java.io.BufferedReader import java.io.File -import java.io.FileReader import java.util.* import kotlin.math.max import kotlin.math.min +import kotlin.math.roundToLong + +/** + * A script to convert a trace in text format into a Parquet trace. + */ +fun main(args: Array): Unit = TraceConverterCli().main(args) /** * Represents the command for converting traces */ class TraceConverterCli : CliktCommand(name = "trace-converter") { + /** + * The logger instance for the converter. + */ + private val logger = KotlinLogging.logger {} + /** * The directory where the trace should be stored. */ - private val outputPath by option("-O", "--output", help = "path to store the trace") + private val output by option("-O", "--output", help = "path to store the trace") .file(canBeFile = false, mustExist = false) .defaultLazy { File("output") } /** * The directory where the input trace is located. */ - private val inputPath by argument("input", help = "path to the input trace") + private val input by argument("input", help = "path to the input trace") .file(canBeFile = false) /** - * The input type of the trace. + * The input format of the trace. */ - private val type by option("-t", "--type", help = "input type of trace").groupChoice( - "solvinity" to SolvinityConversion(), - "bitbrains" to BitbrainsConversion(), - "azure" to AzureConversion() - ) + private val format by option("-f", "--format", help = "input format of trace") + .choice( + "solvinity" to SvTraceFormat(), + "bitbrains" to BitbrainsTraceFormat(), + "azure" to AzureTraceFormat() + ) + .required() + + /** + * The sampling options. 
+ */ + private val samplingOptions by SamplingOptions().cooccurring() override fun run() { - val metaSchema = SchemaBuilder - .record("meta") - .namespace("org.opendc.format.sc20") - .fields() - .name("id").type().stringType().noDefault() - .name("submissionTime").type().longType().noDefault() - .name("endTime").type().longType().noDefault() - .name("maxCores").type().intType().noDefault() - .name("requiredMemory").type().longType().noDefault() - .endRecord() - val schema = SchemaBuilder - .record("trace") - .namespace("org.opendc.format.sc20") - .fields() - .name("id").type().stringType().noDefault() - .name("time").type().longType().noDefault() - .name("duration").type().longType().noDefault() - .name("cores").type().intType().noDefault() - .name("cpuUsage").type().doubleType().noDefault() - .name("flops").type().longType().noDefault() - .endRecord() - - val metaParquet = File(outputPath, "meta.parquet") - val traceParquet = File(outputPath, "trace.parquet") + val metaParquet = File(output, "meta.parquet") + val traceParquet = File(output, "trace.parquet") if (metaParquet.exists()) { metaParquet.delete() @@ -107,324 +101,160 @@ class TraceConverterCli : CliktCommand(name = "trace-converter") { traceParquet.delete() } + val trace = format.open(input.toURI().toURL()) + + logger.info { "Building resources table" } + val metaWriter = AvroParquetWriter.builder(LocalOutputFile(metaParquet)) - .withSchema(metaSchema) - .withCompressionCodec(CompressionCodecName.SNAPPY) - .withPageSize(4 * 1024 * 1024) // For compression - .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) + .withSchema(BP_RESOURCES_SCHEMA) + .withCompressionCodec(CompressionCodecName.ZSTD) + .enablePageWriteChecksum() .build() + val selectedVms = metaWriter.use { convertResources(trace, it) } + + logger.info { "Wrote ${selectedVms.size} rows" } + logger.info { "Building resource states table" } + val writer = AvroParquetWriter.builder(LocalOutputFile(traceParquet)) - .withSchema(schema) - .withCompressionCodec(CompressionCodecName.SNAPPY) - .withPageSize(4 * 1024 * 1024) // For compression - .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size) + .withSchema(BP_RESOURCE_STATES_SCHEMA) + .withCompressionCodec(CompressionCodecName.ZSTD) + .enableDictionaryEncoding() + .enablePageWriteChecksum() + .withBloomFilterEnabled("id", true) + .withBloomFilterNDV("id", selectedVms.size.toLong()) .build() - try { - val type = type ?: throw IllegalArgumentException("Invalid trace conversion") - val allFragments = type.read(inputPath, metaSchema, metaWriter) - allFragments.sortWith(compareBy { it.tick }.thenBy { it.id }) - - for (fragment in allFragments) { - val record = GenericData.Record(schema) - record.put("id", fragment.id) - record.put("time", fragment.tick) - record.put("duration", fragment.duration) - record.put("cores", fragment.cores) - record.put("cpuUsage", fragment.usage) - record.put("flops", fragment.flops) - - writer.write(record) - } - } finally { - writer.close() - metaWriter.close() - } + val statesCount = writer.use { convertResourceStates(trace, it, selectedVms) } + logger.info { "Wrote $statesCount rows" } } -} -/** - * The supported trace conversions. - */ -sealed class TraceConversion(name: String) : OptionGroup(name) { /** - * Read the fragments of the trace. + * Convert the resources table for the trace. 
*/ - abstract fun read( - traceDirectory: File, - metaSchema: Schema, - metaWriter: ParquetWriter - ): MutableList -} - -/** - * A [TraceConversion] that uses the Trace API to perform the conversion. - */ -abstract class AbstractConversion(name: String) : TraceConversion(name) { - abstract val format: TraceFormat - - override fun read( - traceDirectory: File, - metaSchema: Schema, - metaWriter: ParquetWriter - ): MutableList { - val fragments = mutableListOf() - val trace = format.open(traceDirectory.toURI().toURL()) + private fun convertResources(trace: Trace, writer: ParquetWriter): Set { + val random = samplingOptions?.let { Random(it.seed) } + val samplingFraction = samplingOptions?.fraction ?: 1.0 val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() - var lastId: String? = null - var maxCores = Int.MIN_VALUE - var requiredMemory = Long.MIN_VALUE - var minTime = Long.MAX_VALUE - var maxTime = Long.MIN_VALUE - var lastTimestamp = Long.MIN_VALUE - - while (reader.nextRow()) { - val id = reader.get(RESOURCE_STATE_ID) - - if (lastId != null && lastId != id) { - val metaRecord = GenericData.Record(metaSchema) - metaRecord.put("id", lastId) - metaRecord.put("submissionTime", minTime) - metaRecord.put("endTime", maxTime) - metaRecord.put("maxCores", maxCores) - metaRecord.put("requiredMemory", requiredMemory) - metaWriter.write(metaRecord) - } - lastId = id + var hasNextRow = reader.nextRow() + val selectedVms = mutableSetOf() - val timestamp = reader.get(RESOURCE_STATE_TIMESTAMP) - val timestampMs = timestamp.toEpochMilli() - val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE) - val cores = reader.getInt(RESOURCE_STATE_NCPUS) - val memCapacity = reader.getDouble(RESOURCE_STATE_MEM_CAPACITY) + while (hasNextRow) { + var id: String + var numCpus = Int.MIN_VALUE + var memCapacity = Double.MIN_VALUE + var memUsage = Double.MIN_VALUE + var startTime = Long.MAX_VALUE + var stopTime = Long.MIN_VALUE - maxCores = max(maxCores, cores) - requiredMemory = max(requiredMemory, (memCapacity / 1000).toLong()) + do { + id = reader.get(RESOURCE_STATE_ID) - if (lastTimestamp < 0) { - lastTimestamp = timestampMs - 5 * 60 * 1000L - minTime = min(minTime, lastTimestamp) - } + val timestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli() + startTime = min(startTime, timestamp) + stopTime = max(stopTime, timestamp) + + numCpus = max(numCpus, reader.getInt(RESOURCE_STATE_NCPUS)) - if (fragments.isEmpty()) { - val duration = 5 * 60 * 1000L - val flops: Long = (cpuUsage * duration / 1000).toLong() - fragments.add(Fragment(id, lastTimestamp, flops, duration, cpuUsage, cores)) - } else { - val last = fragments.last() - val duration = timestampMs - lastTimestamp - val flops: Long = (cpuUsage * duration / 1000).toLong() - - // Perform run-length encoding - if (last.id == id && (duration == 0L || last.usage == cpuUsage)) { - fragments[fragments.size - 1] = last.copy(duration = last.duration + duration) - } else { - fragments.add( - Fragment( - id, - lastTimestamp, - flops, - duration, - cpuUsage, - cores - ) - ) + memCapacity = max(memCapacity, reader.getDouble(RESOURCE_STATE_MEM_CAPACITY)) + if (reader.hasColumn(RESOURCE_STATE_MEM_USAGE)) { + memUsage = max(memUsage, reader.getDouble(RESOURCE_STATE_MEM_USAGE)) } + + hasNextRow = reader.nextRow() + } while (hasNextRow && id == reader.get(RESOURCE_STATE_ID)) + + // Sample only a fraction of the VMs + if (random != null && random.nextDouble() > samplingFraction) { + continue } - val last = fragments.last() - maxTime = max(maxTime, 
last.tick + last.duration) - lastTimestamp = timestampMs + val builder = GenericRecordBuilder(BP_RESOURCES_SCHEMA) + + builder["id"] = id + builder["submissionTime"] = startTime + builder["endTime"] = stopTime + builder["maxCores"] = numCpus + builder["requiredMemory"] = max(memCapacity, memUsage).roundToLong() + + logger.info { "Selecting VM $id" } + + writer.write(builder.build()) + selectedVms.add(id) } - return fragments + + return selectedVms } -} -class SolvinityConversion : AbstractConversion("Solvinity") { - override val format: TraceFormat = SvTraceFormat() -} + /** + * Convert the resource states table for the trace. + */ + private fun convertResourceStates(trace: Trace, writer: ParquetWriter, selectedVms: Set): Int { + val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader() -/** - * Conversion of the Bitbrains public trace. - */ -class BitbrainsConversion : AbstractConversion("Bitbrains") { - override val format: TraceFormat = BitbrainsTraceFormat() -} + var hasNextRow = reader.nextRow() + var count = 0 -/** - * Conversion of the Azure public VM trace. - */ -class AzureConversion : TraceConversion("Azure") { - private val seed by option(help = "seed for trace sampling") - .long() - .default(0) - - override fun read( - traceDirectory: File, - metaSchema: Schema, - metaWriter: ParquetWriter - ): MutableList { - val random = Random(seed) - val fraction = 0.01 - - // Read VM table - val vmIdTableCol = 0 - val coreTableCol = 9 - val provisionedMemoryTableCol = 10 - - var vmId: String - var cores: Int - var requiredMemory: Long - - val vmIds = mutableSetOf() - val vmIdToMetadata = mutableMapOf() - - BufferedReader(FileReader(File(traceDirectory, "vmtable.csv"))).use { reader -> - reader.lineSequence() - .chunked(1024) - .forEach { lines -> - for (line in lines) { - // Ignore comments in the trace - if (line.startsWith("#") || line.isBlank()) { - continue - } - // Sample only a fraction of the VMs - if (random.nextDouble() > fraction) { - continue - } - - val values = line.split(",") - - // Exclude VMs with a large number of cores (not specified exactly) - if (values[coreTableCol].contains(">")) { - continue - } - - vmId = values[vmIdTableCol].trim() - cores = values[coreTableCol].trim().toInt() - requiredMemory = values[provisionedMemoryTableCol].trim().toInt() * 1_000L // GB -> MB - - vmIds.add(vmId) - vmIdToMetadata[vmId] = VmInfo(cores, requiredMemory, Long.MAX_VALUE, -1L) - } + while (hasNextRow) { + var lastTimestamp = Long.MIN_VALUE + + do { + val id = reader.get(RESOURCE_STATE_ID) + + if (id !in selectedVms) { + hasNextRow = reader.nextRow() + continue } - } - // Read VM metric reading files - val timestampCol = 0 - val vmIdCol = 1 - val cpuUsageCol = 4 - val traceInterval = 5 * 60 * 1000L - - val vmIdToFragments = mutableMapOf>() - val vmIdToLastFragment = mutableMapOf() - val allFragments = mutableListOf() - - for (i in ProgressBar.wrap((1..195).toList(), "Reading Trace")) { - val readingsFile = File(File(traceDirectory, "readings"), "readings-$i.csv") - var timestamp: Long - var cpuUsage: Double - - BufferedReader(FileReader(readingsFile)).use { reader -> - reader.lineSequence() - .chunked(128) - .forEach { lines -> - for (line in lines) { - // Ignore comments in the trace - if (line.startsWith("#") || line.isBlank()) { - continue - } - - val values = line.split(",") - vmId = values[vmIdCol].trim() - - // Ignore readings for VMs not in the sample - if (!vmIds.contains(vmId)) { - continue - } - - timestamp = values[timestampCol].trim().toLong() * 1000L - 
vmIdToMetadata[vmId]!!.minTime = min(vmIdToMetadata[vmId]!!.minTime, timestamp) - cpuUsage = values[cpuUsageCol].trim().toDouble() * 3_000 // MHz - vmIdToMetadata[vmId]!!.maxTime = max(vmIdToMetadata[vmId]!!.maxTime, timestamp) - - val flops: Long = (cpuUsage * 5 * 60).toLong() - val lastFragment = vmIdToLastFragment[vmId] - - vmIdToLastFragment[vmId] = - if (lastFragment != null && lastFragment.flops == 0L && flops == 0L) { - Fragment( - vmId, - lastFragment.tick, - lastFragment.flops + flops, - lastFragment.duration + traceInterval, - cpuUsage, - vmIdToMetadata[vmId]!!.cores - ) - } else { - val fragment = - Fragment( - vmId, - timestamp, - flops, - traceInterval, - cpuUsage, - vmIdToMetadata[vmId]!!.cores - ) - if (lastFragment != null) { - if (vmIdToFragments[vmId] == null) { - vmIdToFragments[vmId] = mutableListOf() - } - vmIdToFragments[vmId]!!.add(lastFragment) - allFragments.add(lastFragment) - } - fragment - } - } - } - } - } + val builder = GenericRecordBuilder(BP_RESOURCE_STATES_SCHEMA) + builder["id"] = id - for (entry in vmIdToLastFragment) { - if (entry.value != null) { - if (vmIdToFragments[entry.key] == null) { - vmIdToFragments[entry.key] = mutableListOf() + val timestamp = reader.get(RESOURCE_STATE_TIMESTAMP).toEpochMilli() + if (lastTimestamp < 0) { + lastTimestamp = timestamp - 5 * 60 * 1000L } - vmIdToFragments[entry.key]!!.add(entry.value!!) - } - } - println("Read ${vmIdToLastFragment.size} VMs") - - for (entry in vmIdToMetadata) { - val metaRecord = GenericData.Record(metaSchema) - metaRecord.put("id", entry.key) - metaRecord.put("submissionTime", entry.value.minTime) - metaRecord.put("endTime", entry.value.maxTime) - println("${entry.value.minTime} - ${entry.value.maxTime}") - metaRecord.put("maxCores", entry.value.cores) - metaRecord.put("requiredMemory", entry.value.requiredMemory) - metaWriter.write(metaRecord) - } + val duration = timestamp - lastTimestamp + val cores = reader.getInt(RESOURCE_STATE_NCPUS) + val cpuUsage = reader.getDouble(RESOURCE_STATE_CPU_USAGE) + val flops = (cpuUsage * duration / 1000.0).roundToLong() - return allFragments - } -} + builder["time"] = timestamp + builder["duration"] = duration + builder["cores"] = cores + builder["cpuUsage"] = cpuUsage + builder["flops"] = flops -data class Fragment( - val id: String, - val tick: Long, - val flops: Long, - val duration: Long, - val usage: Double, - val cores: Int -) + writer.write(builder.build()) -class VmInfo(val cores: Int, val requiredMemory: Long, var minTime: Long, var maxTime: Long) + lastTimestamp = timestamp + hasNextRow = reader.nextRow() + } while (hasNextRow && id == reader.get(RESOURCE_STATE_ID)) -/** - * A script to convert a trace in text format into a Parquet trace. - */ -fun main(args: Array): Unit = TraceConverterCli().main(args) + count++ + } + + return count + } + + /** + * Options for sampling the workload trace. + */ + private class SamplingOptions : OptionGroup() { + /** + * The fraction of VMs to sample + */ + val fraction by option("--sampling-fraction", help = "fraction of the workload to sample") + .double() + .restrictTo(0.0001, 1.0) + .required() + + /** + * The seed for sampling the trace. 
+ */ + val seed by option("--sampling-seed", help = "seed for sampling the workload") + .long() + .default(0) + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTable.kt new file mode 100644 index 00000000..f8e57d1d --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTable.kt @@ -0,0 +1,130 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace.azure + +import com.fasterxml.jackson.dataformat.csv.CsvFactory +import org.opendc.trace.* +import java.nio.file.Files +import java.nio.file.Path +import java.util.stream.Collectors +import kotlin.io.path.extension +import kotlin.io.path.nameWithoutExtension + +/** + * The resource state [Table] for the Azure v1 VM traces. + */ +internal class AzureResourceStateTable(private val factory: CsvFactory, path: Path) : Table { + /** + * The partitions that belong to the table. + */ + private val partitions = Files.walk(path, 1) + .filter { !Files.isDirectory(it) && it.extension == "csv" } + .collect(Collectors.toMap({ it.nameWithoutExtension }, { it })) + .toSortedMap() + + override val name: String = TABLE_RESOURCE_STATES + + override val isSynthetic: Boolean = false + + override fun isSupported(column: TableColumn<*>): Boolean { + return when (column) { + RESOURCE_STATE_ID -> true + RESOURCE_STATE_TIMESTAMP -> true + RESOURCE_STATE_CPU_USAGE_PCT -> true + else -> false + } + } + + override fun newReader(): TableReader { + val it = partitions.iterator() + + return object : TableReader { + var delegate: TableReader? 
= nextDelegate() + + override fun nextRow(): Boolean { + var delegate = delegate + + while (delegate != null) { + if (delegate.nextRow()) { + break + } + + delegate.close() + delegate = nextDelegate() + } + + this.delegate = delegate + return delegate != null + } + + override fun hasColumn(column: TableColumn<*>): Boolean = delegate?.hasColumn(column) ?: false + + override fun get(column: TableColumn): T { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.get(column) + } + + override fun getBoolean(column: TableColumn): Boolean { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getBoolean(column) + } + + override fun getInt(column: TableColumn): Int { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getInt(column) + } + + override fun getLong(column: TableColumn): Long { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getLong(column) + } + + override fun getDouble(column: TableColumn): Double { + val delegate = checkNotNull(delegate) { "Invalid reader state" } + return delegate.getDouble(column) + } + + override fun close() { + delegate?.close() + } + + private fun nextDelegate(): TableReader? { + return if (it.hasNext()) { + val (_, path) = it.next() + return AzureResourceStateTableReader(factory.createParser(path.toFile())) + } else { + null + } + } + + override fun toString(): String = "AzureCompositeTableReader" + } + } + + override fun newReader(partition: String): TableReader { + val path = requireNotNull(partitions[partition]) { "Invalid partition $partition" } + return AzureResourceStateTableReader(factory.createParser(path.toFile())) + } + + override fun toString(): String = "AzureResourceStateTable" +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTableReader.kt new file mode 100644 index 00000000..f80c0e82 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTableReader.kt @@ -0,0 +1,149 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package org.opendc.experiments.capelin.trace.azure + +import com.fasterxml.jackson.core.JsonToken +import com.fasterxml.jackson.dataformat.csv.CsvParser +import com.fasterxml.jackson.dataformat.csv.CsvSchema +import org.opendc.trace.* +import java.time.Instant + +/** + * A [TableReader] for the Azure v1 VM resource state table. + */ +internal class AzureResourceStateTableReader(private val parser: CsvParser) : TableReader { + init { + parser.schema = schema + } + + override fun nextRow(): Boolean { + reset() + + if (!nextStart()) { + return false + } + + while (true) { + val token = parser.nextValue() + + if (token == null || token == JsonToken.END_OBJECT) { + break + } + + when (parser.currentName) { + "timestamp" -> timestamp = Instant.ofEpochSecond(parser.longValue) + "vm id" -> id = parser.text + "avg cpu" -> cpuUsagePct = parser.doubleValue + } + } + + return true + } + + override fun hasColumn(column: TableColumn<*>): Boolean { + return when (column) { + RESOURCE_STATE_ID -> true + RESOURCE_STATE_TIMESTAMP -> true + RESOURCE_STATE_CPU_USAGE_PCT -> true + else -> false + } + } + + override fun get(column: TableColumn): T { + val res: Any? = when (column) { + RESOURCE_STATE_ID -> id + RESOURCE_STATE_TIMESTAMP -> timestamp + RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct + else -> throw IllegalArgumentException("Invalid column") + } + + @Suppress("UNCHECKED_CAST") + return res as T + } + + override fun getBoolean(column: TableColumn): Boolean { + throw IllegalArgumentException("Invalid column") + } + + override fun getInt(column: TableColumn): Int { + throw IllegalArgumentException("Invalid column") + } + + override fun getLong(column: TableColumn): Long { + throw IllegalArgumentException("Invalid column") + } + + override fun getDouble(column: TableColumn): Double { + return when (column) { + RESOURCE_STATE_CPU_USAGE_PCT -> cpuUsagePct + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun close() { + parser.close() + } + + /** + * Advance the parser until the next object start. + */ + private fun nextStart(): Boolean { + var token = parser.nextValue() + + while (token != null && token != JsonToken.START_OBJECT) { + token = parser.nextValue() + } + + return token != null + } + + /** + * State fields of the reader. + */ + private var id: String? = null + private var timestamp: Instant? = null + private var cpuUsagePct = Double.NaN + + /** + * Reset the state. + */ + private fun reset() { + id = null + timestamp = null + cpuUsagePct = Double.NaN + } + + companion object { + /** + * The [CsvSchema] that is used to parse the trace. 
+ */ + private val schema = CsvSchema.builder() + .addColumn("timestamp", CsvSchema.ColumnType.NUMBER) + .addColumn("vm id", CsvSchema.ColumnType.STRING) + .addColumn("CPU min cpu", CsvSchema.ColumnType.NUMBER) + .addColumn("CPU max cpu", CsvSchema.ColumnType.NUMBER) + .addColumn("CPU avg cpu", CsvSchema.ColumnType.NUMBER) + .setAllowComments(true) + .build() + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTable.kt new file mode 100644 index 00000000..bbfd25ff --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTable.kt @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace.azure + +import com.fasterxml.jackson.dataformat.csv.CsvFactory +import org.opendc.trace.* +import java.nio.file.Path + +/** + * The resource [Table] for the Azure v1 VM traces. 
+ */ +internal class AzureResourceTable(private val factory: CsvFactory, private val path: Path) : Table { + override val name: String = TABLE_RESOURCES + + override val isSynthetic: Boolean = false + + override fun isSupported(column: TableColumn<*>): Boolean { + return when (column) { + RESOURCE_ID -> true + RESOURCE_START_TIME -> true + RESOURCE_STOP_TIME -> true + RESOURCE_NCPUS -> true + RESOURCE_MEM_CAPACITY -> true + else -> false + } + } + + override fun newReader(): TableReader { + return AzureResourceTableReader(factory.createParser(path.resolve("vmtable/vmtable.csv").toFile())) + } + + override fun newReader(partition: String): TableReader { + throw IllegalArgumentException("No partition $partition") + } + + override fun toString(): String = "AzureResourceTable" +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTableReader.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTableReader.kt new file mode 100644 index 00000000..b712b854 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTableReader.kt @@ -0,0 +1,169 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace.azure + +import com.fasterxml.jackson.core.JsonToken +import com.fasterxml.jackson.dataformat.csv.CsvParser +import com.fasterxml.jackson.dataformat.csv.CsvSchema +import org.apache.parquet.example.Paper.schema +import org.opendc.trace.* +import java.time.Instant + +/** + * A [TableReader] for the Azure v1 VM resources table. 
+ */ +internal class AzureResourceTableReader(private val parser: CsvParser) : TableReader { + init { + parser.schema = schema + } + + override fun nextRow(): Boolean { + reset() + + if (!nextStart()) { + return false + } + + while (true) { + val token = parser.nextValue() + + if (token == null || token == JsonToken.END_OBJECT) { + break + } + + when (parser.currentName) { + "vm id" -> id = parser.text + "vm created" -> startTime = Instant.ofEpochSecond(parser.longValue) + "vm deleted" -> stopTime = Instant.ofEpochSecond(parser.longValue) + "vm virtual core count" -> cpuCores = parser.intValue + "vm memory" -> memCapacity = parser.doubleValue * 1e6 // GB to KB + } + } + + return true + } + + override fun hasColumn(column: TableColumn<*>): Boolean { + return when (column) { + RESOURCE_ID -> true + RESOURCE_START_TIME -> true + RESOURCE_STOP_TIME -> true + RESOURCE_NCPUS -> true + RESOURCE_MEM_CAPACITY -> true + else -> false + } + } + + override fun get(column: TableColumn): T { + val res: Any? = when (column) { + RESOURCE_ID -> id + RESOURCE_START_TIME -> startTime + RESOURCE_STOP_TIME -> stopTime + RESOURCE_NCPUS -> getInt(RESOURCE_STATE_NCPUS) + RESOURCE_MEM_CAPACITY -> getDouble(RESOURCE_STATE_MEM_CAPACITY) + else -> throw IllegalArgumentException("Invalid column") + } + + @Suppress("UNCHECKED_CAST") + return res as T + } + + override fun getBoolean(column: TableColumn): Boolean { + throw IllegalArgumentException("Invalid column") + } + + override fun getInt(column: TableColumn): Int { + return when (column) { + RESOURCE_NCPUS -> cpuCores + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun getLong(column: TableColumn): Long { + throw IllegalArgumentException("Invalid column") + } + + override fun getDouble(column: TableColumn): Double { + return when (column) { + RESOURCE_MEM_CAPACITY -> memCapacity + else -> throw IllegalArgumentException("Invalid column") + } + } + + override fun close() { + parser.close() + } + + /** + * Advance the parser until the next object start. + */ + private fun nextStart(): Boolean { + var token = parser.nextValue() + + while (token != null && token != JsonToken.START_OBJECT) { + token = parser.nextValue() + } + + return token != null + } + + /** + * State fields of the reader. + */ + private var id: String? = null + private var startTime: Instant? = null + private var stopTime: Instant? = null + private var cpuCores = -1 + private var memCapacity = Double.NaN + + /** + * Reset the state. + */ + fun reset() { + id = null + startTime = null + stopTime = null + cpuCores = -1 + memCapacity = Double.NaN + } + + companion object { + /** + * The [CsvSchema] that is used to parse the trace. 
+ */ + private val schema = CsvSchema.builder() + .addColumn("vm id", CsvSchema.ColumnType.NUMBER) + .addColumn("subscription id", CsvSchema.ColumnType.STRING) + .addColumn("deployment id", CsvSchema.ColumnType.NUMBER) + .addColumn("timestamp vm created", CsvSchema.ColumnType.NUMBER) + .addColumn("timestamp vm deleted", CsvSchema.ColumnType.NUMBER) + .addColumn("max cpu", CsvSchema.ColumnType.NUMBER) + .addColumn("avg cpu", CsvSchema.ColumnType.NUMBER) + .addColumn("p95 cpu", CsvSchema.ColumnType.NUMBER) + .addColumn("vm category", CsvSchema.ColumnType.NUMBER) + .addColumn("vm virtual core count", CsvSchema.ColumnType.NUMBER) + .addColumn("vm memory", CsvSchema.ColumnType.NUMBER) + .setAllowComments(true) + .build() + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTrace.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTrace.kt new file mode 100644 index 00000000..24c60bab --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTrace.kt @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace.azure + +import com.fasterxml.jackson.dataformat.csv.CsvFactory +import org.opendc.trace.* +import java.nio.file.Path + +/** + * [Trace] implementation for the Azure v1 VM traces. + */ +class AzureTrace internal constructor(private val factory: CsvFactory, private val path: Path) : Trace { + override val tables: List = listOf(TABLE_RESOURCES, TABLE_RESOURCE_STATES) + + override fun containsTable(name: String): Boolean = name in tables + + override fun getTable(name: String): Table? 
{ + return when (name) { + TABLE_RESOURCES -> AzureResourceTable(factory, path) + TABLE_RESOURCE_STATES -> AzureResourceStateTable(factory, path) + else -> null + } + } + + override fun toString(): String = "AzureTrace[$path]" +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTraceFormat.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTraceFormat.kt new file mode 100644 index 00000000..744e43a0 --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureTraceFormat.kt @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace.azure + +import com.fasterxml.jackson.dataformat.csv.CsvFactory +import com.fasterxml.jackson.dataformat.csv.CsvParser +import org.opendc.trace.spi.TraceFormat +import java.net.URL +import java.nio.file.Paths +import kotlin.io.path.exists + +/** + * A format implementation for the Azure v1 format. + */ +class AzureTraceFormat : TraceFormat { + /** + * The name of this trace format. + */ + override val name: String = "azure-v1" + + /** + * The [CsvFactory] used to create the parser. + */ + private val factory = CsvFactory() + .enable(CsvParser.Feature.ALLOW_COMMENTS) + .enable(CsvParser.Feature.TRIM_SPACES) + + /** + * Open the trace file. 
+ */ + override fun open(url: URL): AzureTrace { + val path = Paths.get(url.toURI()) + require(path.exists()) { "URL $url does not exist" } + return AzureTrace(factory, path) + } +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/Schemas.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/Schemas.kt new file mode 100644 index 00000000..7dd8161d --- /dev/null +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/Schemas.kt @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.capelin.trace.bp + +import org.apache.avro.Schema +import org.apache.avro.SchemaBuilder + +/** + * Schema for the resources table in the trace. + */ +val BP_RESOURCES_SCHEMA: Schema = SchemaBuilder + .record("meta") + .namespace("org.opendc.trace.capelin") + .fields() + .requiredString("id") + .requiredLong("submissionTime") + .requiredLong("endTime") + .requiredInt("maxCores") + .requiredLong("requiredMemory") + .endRecord() + +/** + * Schema for the resource states table in the trace. + */ +val BP_RESOURCE_STATES_SCHEMA: Schema = SchemaBuilder + .record("meta") + .namespace("org.opendc.trace.capelin") + .fields() + .requiredString("id") + .requiredLong("time") + .requiredLong("duration") + .requiredInt("cores") + .requiredDouble("cpuUsage") + .requiredLong("flops") + .endRecord() -- cgit v1.2.3 From 49dd8377c8bfde1e64e411c6a6f921c768b9b53b Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sun, 12 Sep 2021 11:22:07 +0200 Subject: refactor(trace): Add API for accessing available table columns This change adds a new API to the Table interface for accessing the table columns that the table supports. This does not necessarily mean that the column will have a value for every row, but that the table format has defined this particular column. 
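As a usage sketch for the new API (hedged: the trace value is assumed to be an already-opened Trace), a consumer can now probe the optional columns of a table once, rather than calling isSupported column by column or hasColumn on every reader:

    val table = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES))
    val hasMemUsage = RESOURCE_STATE_MEM_USAGE in table.columns

    val reader = table.newReader()
    while (reader.nextRow()) {
        // Fall back to NaN for formats that do not define memory usage.
        val memUsage = if (hasMemUsage) reader.getDouble(RESOURCE_STATE_MEM_USAGE) else Double.NaN
        // ... process the row ...
    }
    reader.close()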
---
 .../capelin/trace/azure/AzureResourceStateTable.kt | 13 ++++-----
 .../capelin/trace/azure/AzureResourceTable.kt      | 17 +++++-------
 .../capelin/trace/bp/BPResourceStateTable.kt       | 17 +++++-------
 .../capelin/trace/bp/BPResourceTable.kt            | 17 +++++-------
 .../capelin/trace/sv/SvResourceStateTable.kt       | 31 ++++++++++------------
 5 files changed, 40 insertions(+), 55 deletions(-)

(limited to 'opendc-experiments')

diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTable.kt
index f8e57d1d..f98f4b2c 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTable.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceStateTable.kt
@@ -46,14 +46,11 @@ internal class AzureResourceStateTable(private val factory: CsvFactory, path: Pa
     override val isSynthetic: Boolean = false
 
-    override fun isSupported(column: TableColumn<*>): Boolean {
-        return when (column) {
-            RESOURCE_STATE_ID -> true
-            RESOURCE_STATE_TIMESTAMP -> true
-            RESOURCE_STATE_CPU_USAGE_PCT -> true
-            else -> false
-        }
-    }
+    override val columns: List<TableColumn<*>> = listOf(
+        RESOURCE_STATE_ID,
+        RESOURCE_STATE_TIMESTAMP,
+        RESOURCE_STATE_CPU_USAGE_PCT
+    )
 
     override fun newReader(): TableReader {
         val it = partitions.iterator()
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTable.kt
index bbfd25ff..c9d4f7eb 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTable.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/azure/AzureResourceTable.kt
@@ -34,16 +34,13 @@ internal class AzureResourceTable(private val factory: CsvFactory, private val p
     override val isSynthetic: Boolean = false
 
-    override fun isSupported(column: TableColumn<*>): Boolean {
-        return when (column) {
-            RESOURCE_ID -> true
-            RESOURCE_START_TIME -> true
-            RESOURCE_STOP_TIME -> true
-            RESOURCE_NCPUS -> true
-            RESOURCE_MEM_CAPACITY -> true
-            else -> false
-        }
-    }
+    override val columns: List<TableColumn<*>> = listOf(
+        RESOURCE_ID,
+        RESOURCE_START_TIME,
+        RESOURCE_STOP_TIME,
+        RESOURCE_NCPUS,
+        RESOURCE_MEM_CAPACITY
+    )
 
     override fun newReader(): TableReader {
         return AzureResourceTableReader(factory.createParser(path.resolve("vmtable/vmtable.csv").toFile()))
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt
index 35bfd5ef..f051bf88 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceStateTable.kt
@@ -34,16 +34,13 @@ internal class BPResourceStateTable(private val path: Path) : Table {
     override val name: String = TABLE_RESOURCE_STATES
 
     override val isSynthetic: Boolean = false
 
-    override fun isSupported(column: TableColumn<*>): Boolean {
-        return when (column) {
-            RESOURCE_STATE_ID -> true
-            RESOURCE_STATE_TIMESTAMP -> true
-            RESOURCE_STATE_DURATION -> true
-            RESOURCE_STATE_NCPUS -> true
-            RESOURCE_STATE_CPU_USAGE -> true
-            else -> false
-        }
-    }
+    override val columns: List<TableColumn<*>> = listOf(
+        RESOURCE_STATE_ID,
+        RESOURCE_STATE_TIMESTAMP,
+        RESOURCE_STATE_DURATION,
+        RESOURCE_STATE_NCPUS,
+        RESOURCE_STATE_CPU_USAGE,
+    )
 
     override fun newReader(): TableReader {
         val reader = LocalParquetReader(path.resolve("trace.parquet"))
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt
index bff8c55e..5b0f013f 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/bp/BPResourceTable.kt
@@ -34,16 +34,13 @@ internal class BPResourceTable(private val path: Path) : Table {
     override val name: String = TABLE_RESOURCES
 
     override val isSynthetic: Boolean = false
 
-    override fun isSupported(column: TableColumn<*>): Boolean {
-        return when (column) {
-            RESOURCE_ID -> true
-            RESOURCE_START_TIME -> true
-            RESOURCE_STOP_TIME -> true
-            RESOURCE_NCPUS -> true
-            RESOURCE_MEM_CAPACITY -> true
-            else -> false
-        }
-    }
+    override val columns: List<TableColumn<*>> = listOf(
+        RESOURCE_ID,
+        RESOURCE_START_TIME,
+        RESOURCE_STOP_TIME,
+        RESOURCE_NCPUS,
+        RESOURCE_MEM_CAPACITY
+    )
 
     override fun newReader(): TableReader {
         val reader = LocalParquetReader(path.resolve("meta.parquet"))
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt
index 3a9bda69..67140fe9 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/trace/sv/SvResourceStateTable.kt
@@ -46,23 +46,20 @@ internal class SvResourceStateTable(path: Path) : Table {
     override val isSynthetic: Boolean = false
 
-    override fun isSupported(column: TableColumn<*>): Boolean {
-        return when (column) {
-            RESOURCE_STATE_ID -> true
-            RESOURCE_STATE_CLUSTER_ID -> true
-            RESOURCE_STATE_TIMESTAMP -> true
-            RESOURCE_STATE_NCPUS -> true
-            RESOURCE_STATE_CPU_CAPACITY -> true
-            RESOURCE_STATE_CPU_USAGE -> true
-            RESOURCE_STATE_CPU_USAGE_PCT -> true
-            RESOURCE_STATE_CPU_DEMAND -> true
-            RESOURCE_STATE_CPU_READY_PCT -> true
-            RESOURCE_STATE_MEM_CAPACITY -> true
-            RESOURCE_STATE_DISK_READ -> true
-            RESOURCE_STATE_DISK_WRITE -> true
-            else -> false
-        }
-    }
+    override val columns: List<TableColumn<*>> = listOf(
+        RESOURCE_STATE_ID,
+        RESOURCE_STATE_CLUSTER_ID,
+        RESOURCE_STATE_TIMESTAMP,
+        RESOURCE_STATE_NCPUS,
+        RESOURCE_STATE_CPU_CAPACITY,
+        RESOURCE_STATE_CPU_USAGE,
+        RESOURCE_STATE_CPU_USAGE_PCT,
+        RESOURCE_STATE_CPU_DEMAND,
+        RESOURCE_STATE_CPU_READY_PCT,
+        RESOURCE_STATE_MEM_CAPACITY,
+        RESOURCE_STATE_DISK_READ,
+        RESOURCE_STATE_DISK_WRITE,
+    )
 
     override fun newReader(): TableReader {
         val it = partitions.iterator()
--
cgit v1.2.3
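For reference, a hypothetical consumer of the refactored API could look as
follows. BPResourceTable, newReader() and the column constants are taken from
the diffs above; the trace path and the nextRow() cursor method on TableReader
are assumed for illustration:

    // Hypothetical usage sketch, not part of the patch series.
    // Assumes TableReader exposes a nextRow() cursor; the path is illustrative.
    val table = BPResourceTable(Paths.get("/traces/bitbrains"))
    if (RESOURCE_MEM_CAPACITY in table.columns) { // replaces isSupported(RESOURCE_MEM_CAPACITY)
        val reader = table.newReader()
        while (reader.nextRow()) {
            val id = reader.get(RESOURCE_ID)
            val memory = reader.getDouble(RESOURCE_MEM_CAPACITY)
            // ... process the row
        }
    }

Because the column set is now a plain property, callers can enumerate it once
instead of probing each column individually through isSupported.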