From ee057033b4c534fdd3e8a9d2320d75035d30f27a Mon Sep 17 00:00:00 2001
From: Fabian Mastenbroek
Date: Sun, 1 May 2022 21:16:43 +0200
Subject: refactor(trace/parquet): Support custom ReadSupport implementations

This change updates the `LocalParquetReader` implementation to support
custom `ReadSupport` implementations, so that we are no longer tied to
the Avro-based implementation.
---
 .../kotlin/org/opendc/trace/util/avro/AvroUtils.kt | 44 ++++++++++++++++++
 .../org/opendc/trace/util/parquet/AvroUtils.kt     | 44 ------------------
 .../trace/util/parquet/LocalParquetReader.kt       | 53 ++++++++++++++------
 3 files changed, 83 insertions(+), 58 deletions(-)
 create mode 100644 opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/avro/AvroUtils.kt
 delete mode 100644 opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt

(limited to 'opendc-trace/opendc-trace-parquet/src')

diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/avro/AvroUtils.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/avro/AvroUtils.kt
new file mode 100644
index 00000000..a655d39f
--- /dev/null
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/avro/AvroUtils.kt
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+@file:JvmName("AvroUtils")
+package org.opendc.trace.util.avro
+
+import org.apache.avro.LogicalTypes
+import org.apache.avro.Schema
+
+/**
+ * Schema for UUID type.
+ */
+public val UUID_SCHEMA: Schema = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING))
+
+/**
+ * Schema for timestamp type.
+ */
+public val TIMESTAMP_SCHEMA: Schema = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG))
+
+/**
+ * Helper function to make a [Schema] field optional.
+ */
+public fun Schema.optional(): Schema {
+    return Schema.createUnion(Schema.create(Schema.Type.NULL), this)
+}
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt
deleted file mode 100644
index 086b900b..00000000
--- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-@file:JvmName("AvroUtils")
-package org.opendc.trace.util.parquet
-
-import org.apache.avro.LogicalTypes
-import org.apache.avro.Schema
-
-/**
- * Schema for UUID type.
- */
-public val UUID_SCHEMA: Schema = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING))
-
-/**
- * Schema for timestamp type.
- */
-public val TIMESTAMP_SCHEMA: Schema = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG))
-
-/**
- * Helper function to make a [Schema] field optional.
- */
-public fun Schema.optional(): Schema {
-    return Schema.createUnion(Schema.create(Schema.Type.NULL), this)
-}
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
index ef9eaeb3..bb2bb10d 100644
--- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
@@ -24,6 +24,7 @@ package org.opendc.trace.util.parquet
 
 import org.apache.parquet.avro.AvroParquetReader
 import org.apache.parquet.hadoop.ParquetReader
+import org.apache.parquet.hadoop.api.ReadSupport
 import org.apache.parquet.io.InputFile
 import java.io.File
 import java.io.IOException
@@ -32,11 +33,15 @@ import java.nio.file.Path
 import kotlin.io.path.isDirectory
 
 /**
- * A helper class to read Parquet files.
+ * A helper class to read Parquet files from the filesystem.
+ *
+ * This class wraps a [ParquetReader] in order to support reading partitioned Parquet datasets.
  *
  * @param path The path to the Parquet file or directory to read.
+ * @param factory Function to construct a [ParquetReader] for a local [InputFile].
  */
-public class LocalParquetReader<out T>(path: Path) : AutoCloseable {
+public class LocalParquetReader<out T>(path: Path,
+    private val factory: (InputFile) -> ParquetReader<T> = avro()) : AutoCloseable {
     /**
      * The input files to process.
     */
@@ -93,20 +98,40 @@
     private fun initReader() {
         reader?.close()
 
-        this.reader = if (filesIterator.hasNext()) {
-            createReader(filesIterator.next())
-        } else {
-            null
+        try {
+            this.reader = if (filesIterator.hasNext()) {
+                factory(filesIterator.next())
+            } else {
+                null
+            }
+        } catch (e: Throwable) {
+            this.reader = null
+            throw e
         }
     }
 
-    /**
-     * Create a Parquet reader for the specified file.
-     */
-    private fun createReader(input: InputFile): ParquetReader<T> {
-        return AvroParquetReader
-            .builder<T>(input)
-            .disableCompatibility()
-            .build()
+    public companion object {
+        /**
+         * A factory for reading Avro Parquet files.
+         */
+        public fun <T> avro(): (InputFile) -> ParquetReader<T> {
+            return { input ->
+                AvroParquetReader
+                    .builder<T>(input)
+                    .disableCompatibility()
+                    .build()
+            }
+        }
+
+        /**
+         * A factory for reading Parquet files with custom [ReadSupport].
+         */
+        public fun <T> custom(readSupport: ReadSupport<T>): (InputFile) -> ParquetReader<T> {
+            return { input ->
+                object : ParquetReader.Builder<T>(input) {
+                    override fun getReadSupport(): ReadSupport<T> = readSupport
+                }.build()
+            }
+        }
     }
 }
-- cgit v1.2.3
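To illustrate the extension point this patch introduces, here is a minimal usage sketch (illustrative only, not part of the patch): `myReadSupport` stands in for any parquet-mr `ReadSupport` implementation you supply yourself, and the path is hypothetical.

    import org.apache.parquet.hadoop.api.ReadSupport
    import org.opendc.trace.util.parquet.LocalParquetReader
    import java.nio.file.Path

    // Drain a (possibly partitioned) Parquet trace into a list using a custom
    // ReadSupport. `myReadSupport` is a placeholder implementation.
    fun <T> readAll(path: Path, myReadSupport: ReadSupport<T>): List<T> {
        val reader = LocalParquetReader(path, LocalParquetReader.custom(myReadSupport))
        val records = mutableListOf<T>()
        try {
            while (true) {
                val record = reader.read() ?: break
                records += record
            }
        } finally {
            reader.close()
        }
        return records
    }

Existing call sites that read Avro records keep working unchanged, since the `factory` parameter defaults to `avro()`.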
From ea5e79fc77072e6151ee7952581b97e35a2027fb Mon Sep 17 00:00:00 2001
From: Fabian Mastenbroek
Date: Sun, 1 May 2022 22:54:08 +0200
Subject: perf(trace/opendc): Read records using low-level API

This change updates the OpenDC VM format reader implementation to use
the low-level record reading APIs provided by the `parquet-mr` library
for improved performance. Previously, we used the `parquet-avro` library
to read/write Avro records in Parquet format, but that library carries
considerable overhead.
---
 .../trace/util/parquet/LocalParquetReader.kt       |  6 ++-
 .../trace/util/parquet/LocalParquetWriter.kt       | 55 ++++++++++++++++++++++
 2 files changed, 59 insertions(+), 2 deletions(-)
 create mode 100644 opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt

(limited to 'opendc-trace/opendc-trace-parquet/src')

diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
index bb2bb10d..3e6f19a2 100644
--- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
@@ -40,8 +40,10 @@
  * @param path The path to the Parquet file or directory to read.
  * @param factory Function to construct a [ParquetReader] for a local [InputFile].
  */
-public class LocalParquetReader<out T>(path: Path,
-    private val factory: (InputFile) -> ParquetReader<T> = avro()) : AutoCloseable {
+public class LocalParquetReader<out T>(
+    path: Path,
+    private val factory: (InputFile) -> ParquetReader<T> = avro()
+) : AutoCloseable {
     /**
      * The input files to process.
     */
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt
new file mode 100644
index 00000000..b5eb1deb
--- /dev/null
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt
@@ -0,0 +1,55 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.util.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.ParquetWriter
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.io.OutputFile
+import java.nio.file.Path
+
+/**
+ * Helper class for writing Parquet records to local disk.
+ */
+public class LocalParquetWriter {
+    /**
+     * A [ParquetWriter.Builder] implementation supporting custom [OutputFile]s and [WriteSupport] implementations.
+     */
+    public class Builder<T> internal constructor(
+        output: OutputFile,
+        private val writeSupport: WriteSupport<T>
+    ) : ParquetWriter.Builder<T, Builder<T>>(output) {
+        override fun self(): Builder<T> = this
+
+        override fun getWriteSupport(conf: Configuration): WriteSupport<T> = writeSupport
+    }
+
+    public companion object {
+        /**
+         * Create a [Builder] instance that writes a Parquet file at the specified [path].
+         */
+        @JvmStatic
+        public fun <T> builder(path: Path, writeSupport: WriteSupport<T>): Builder<T> =
+            Builder(LocalOutputFile(path), writeSupport)
+    }
+}
-- cgit v1.2.3
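As a usage sketch for the writer entry point added above (again illustrative, not part of the patch), the snippet below writes a range of integers through a custom `WriteSupport`; `intWriteSupport` is a placeholder, for instance the `WriteSupport<Int>` defined in the test suite of the next patch.

    import org.apache.parquet.hadoop.ParquetFileWriter
    import org.apache.parquet.hadoop.api.WriteSupport
    import org.opendc.trace.util.parquet.LocalParquetWriter
    import java.nio.file.Path

    // Write the integers [0, n) to a local Parquet file using a custom
    // WriteSupport. `intWriteSupport` is an assumed placeholder implementation.
    fun writeRange(path: Path, intWriteSupport: WriteSupport<Int>, n: Int) {
        val writer = LocalParquetWriter.builder(path, intWriteSupport)
            .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) // replace existing output
            .build()
        try {
            repeat(n) { i -> writer.write(i) }
        } finally {
            writer.close()
        }
    }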
From 9411845b3f26536a1e6ea40504e396f19d25a09a Mon Sep 17 00:00:00 2001
From: Fabian Mastenbroek
Date: Mon, 2 May 2022 11:44:48 +0200
Subject: refactor(trace/parquet): Drop dependency on Avro

This change updates the Parquet support library in OpenDC so that it no
longer relies on Avro, but instead interfaces directly with Parquet's
reading and writing functionality, reducing overhead.
---
 .../kotlin/org/opendc/trace/util/avro/AvroUtils.kt |  44 --------
 .../trace/util/parquet/LocalParquetReader.kt       |  39 ++-----
 .../org/opendc/trace/util/parquet/ParquetTest.kt   | 118 +++++++++++++++------
 3 files changed, 99 insertions(+), 102 deletions(-)
 delete mode 100644 opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/avro/AvroUtils.kt

(limited to 'opendc-trace/opendc-trace-parquet/src')

diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/avro/AvroUtils.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/avro/AvroUtils.kt
deleted file mode 100644
index a655d39f..00000000
--- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/avro/AvroUtils.kt
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-@file:JvmName("AvroUtils")
-package org.opendc.trace.util.avro
-
-import org.apache.avro.LogicalTypes
-import org.apache.avro.Schema
-
-/**
- * Schema for UUID type.
- */
-public val UUID_SCHEMA: Schema = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING))
-
-/**
- * Schema for timestamp type.
- */
-public val TIMESTAMP_SCHEMA: Schema = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG))
-
-/**
- * Helper function to make a [Schema] field optional.
- */
-public fun Schema.optional(): Schema {
-    return Schema.createUnion(Schema.create(Schema.Type.NULL), this)
-}
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
index 3e6f19a2..eef83956 100644
--- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
@@ -22,7 +22,6 @@
 
 package org.opendc.trace.util.parquet
 
-import org.apache.parquet.avro.AvroParquetReader
 import org.apache.parquet.hadoop.ParquetReader
 import org.apache.parquet.hadoop.api.ReadSupport
 import org.apache.parquet.io.InputFile
@@ -38,11 +37,11 @@
  * This class wraps a [ParquetReader] in order to support reading partitioned Parquet datasets.
  *
  * @param path The path to the Parquet file or directory to read.
- * @param factory Function to construct a [ParquetReader] for a local [InputFile].
+ * @param readSupport Helper class to perform conversion from Parquet to [T].
 */
 public class LocalParquetReader<out T>(
     path: Path,
-    private val factory: (InputFile) -> ParquetReader<T> = avro()
+    private val readSupport: ReadSupport<T>
 ) : AutoCloseable {
     /**
      * The input files to process.
@@ -64,7 +63,7 @@ public class LocalParquetReader<out T>(
     /**
     * Construct a [LocalParquetReader] for the specified [file].
     */
-    public constructor(file: File) : this(file.toPath())
+    public constructor(file: File, readSupport: ReadSupport<T>) : this(file.toPath(), readSupport)
 
     /**
     * Read a single entry in the Parquet file.
@@ -102,7 +101,7 @@ public class LocalParquetReader<out T>(
 
         try {
             this.reader = if (filesIterator.hasNext()) {
-                factory(filesIterator.next())
+                createReader(filesIterator.next())
             } else {
                 null
             }
@@ -112,28 +111,12 @@ public class LocalParquetReader<out T>(
         }
     }
 
-    public companion object {
-        /**
-         * A factory for reading Avro Parquet files.
-         */
-        public fun <T> avro(): (InputFile) -> ParquetReader<T> {
-            return { input ->
-                AvroParquetReader
-                    .builder<T>(input)
-                    .disableCompatibility()
-                    .build()
-            }
-        }
-
-        /**
-         * A factory for reading Parquet files with custom [ReadSupport].
-         */
-        public fun <T> custom(readSupport: ReadSupport<T>): (InputFile) -> ParquetReader<T> {
-            return { input ->
-                object : ParquetReader.Builder<T>(input) {
-                    override fun getReadSupport(): ReadSupport<T> = readSupport
-                }.build()
-            }
-        }
-    }
+    /**
+     * Construct a [ParquetReader] for the specified [input] with a custom [ReadSupport].
+     */
+    private fun createReader(input: InputFile): ParquetReader<T> {
+        return object : ParquetReader.Builder<T>(input) {
+            override fun getReadSupport(): ReadSupport<@UnsafeVariance T> = this@LocalParquetReader.readSupport
+        }.build()
+    }
 }
diff --git a/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt b/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt
index 8ef4d1fb..be354319 100644
--- a/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt
+++ b/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt
@@ -22,36 +22,81 @@
 
 package org.opendc.trace.util.parquet
 
-import org.apache.avro.SchemaBuilder
-import org.apache.avro.generic.GenericData
-import org.apache.parquet.avro.AvroParquetReader
-import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.hadoop.conf.Configuration
 import org.apache.parquet.hadoop.ParquetFileWriter
+import org.apache.parquet.hadoop.api.ReadSupport
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.io.api.Converter
+import org.apache.parquet.io.api.GroupConverter
+import org.apache.parquet.io.api.PrimitiveConverter
+import org.apache.parquet.io.api.RecordConsumer
+import org.apache.parquet.io.api.RecordMaterializer
+import org.apache.parquet.schema.MessageType
+import org.apache.parquet.schema.PrimitiveType
+import org.apache.parquet.schema.Type
+import org.apache.parquet.schema.Types
 import org.junit.jupiter.api.*
 import org.junit.jupiter.api.Assertions.assertEquals
-import java.io.File
 import java.nio.file.FileAlreadyExistsException
+import java.nio.file.Files
 import java.nio.file.NoSuchFileException
+import java.nio.file.Path
 
 /**
  * Test suite for the Parquet helper classes.
 */
 internal class ParquetTest {
-    private val schema = SchemaBuilder
-        .record("test")
-        .namespace("org.opendc.format.util")
-        .fields()
-        .name("field").type().intType().noDefault()
-        .endRecord()
+    private lateinit var path: Path
 
-    private lateinit var file: File
+    private val schema = Types.buildMessage()
+        .addField(
+            Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.REQUIRED)
+                .named("field")
+        )
+        .named("test")
+
+    private val writeSupport = object : WriteSupport<Int>() {
+        lateinit var recordConsumer: RecordConsumer
+
+        override fun init(configuration: Configuration): WriteContext {
+            return WriteContext(schema, emptyMap())
+        }
+
+        override fun prepareForWrite(recordConsumer: RecordConsumer) {
+            this.recordConsumer = recordConsumer
+        }
+
+        override fun write(record: Int) {
+            val consumer = recordConsumer
+
+            consumer.startMessage()
+            consumer.startField("field", 0)
+            consumer.addInteger(record)
+            consumer.endField("field", 0)
+            consumer.endMessage()
+        }
+    }
+
+    private val readSupport = object : ReadSupport<Int>() {
+        override fun init(
+            configuration: Configuration,
+            keyValueMetaData: Map<String, String>,
+            fileSchema: MessageType
+        ): ReadContext = ReadContext(fileSchema)
+
+        override fun prepareForRead(
+            configuration: Configuration,
+            keyValueMetaData: Map<String, String>,
+            fileSchema: MessageType,
+            readContext: ReadContext
+        ): RecordMaterializer<Int> = TestRecordMaterializer()
+    }
 
     /**
     * Set up the test
     */
     @BeforeEach
     fun setUp() {
-        file = File.createTempFile("opendc", "parquet")
+        path = Files.createTempFile("opendc", "parquet")
     }
 
     /**
@@ -59,7 +104,7 @@ internal class ParquetTest {
     */
     @AfterEach
     fun tearDown() {
-        file.delete()
+        Files.deleteIfExists(path)
     }
 
     /**
@@ -68,29 +113,24 @@ internal class ParquetTest {
     @Test
     fun testSmoke() {
        val n = 4
-        val writer = AvroParquetWriter.builder(LocalOutputFile(file))
-            .withSchema(schema)
+        val writer = LocalParquetWriter.builder(path, writeSupport)
            .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
            .build()
 
        try {
            repeat(n) { i ->
-                val record = GenericData.Record(schema)
-                record.put("field", i)
-                writer.write(record)
+                writer.write(i)
            }
        } finally {
            writer.close()
        }
 
-        val reader = AvroParquetReader.builder(LocalInputFile(file))
-            .build()
-
+        val reader = LocalParquetReader(path, readSupport)
        var counter = 0
        try {
            while (true) {
                val record = reader.read() ?: break
-                assertEquals(counter++, record.get("field"))
+                assertEquals(counter++, record)
            }
        } finally {
            reader.close()
@@ -105,9 +145,7 @@ internal class ParquetTest {
     @Test
     fun testOverwrite() {
        assertThrows<FileAlreadyExistsException> {
-            AvroParquetWriter.builder(LocalOutputFile(file))
-                .withSchema(schema)
-                .build()
+            LocalParquetWriter.builder(path, writeSupport).build()
        }
     }
 
@@ -116,10 +154,30 @@ internal class ParquetTest {
     @Test
     fun testNonExistent() {
-        file.delete()
+        Files.deleteIfExists(path)
 
        assertThrows<NoSuchFileException> {
-            AvroParquetReader.builder(LocalInputFile(file))
-                .build()
+            LocalParquetReader(path, readSupport)
+        }
+    }
+
+    private class TestRecordMaterializer : RecordMaterializer<Int>() {
+        private var current: Int = 0
+        private val fieldConverter = object : PrimitiveConverter() {
+            override fun addInt(value: Int) {
+                current = value
+            }
+        }
+        private val root = object : GroupConverter() {
+            override fun getConverter(fieldIndex: Int): Converter {
+                require(fieldIndex == 0)
+                return fieldConverter
+            }
+            override fun start() {}
+            override fun end() {}
        }
+
+        override fun getCurrentRecord(): Int = current
+
+        override fun getRootConverter(): GroupConverter = root
     }
 }
-- cgit v1.2.3
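After this last patch, the public API no longer mentions Avro at all: a `ReadSupport` and a `WriteSupport` are the only extension points. A closing usage sketch under the same assumptions as before (the `readSupport` argument mirrors the `ReadSupport<Int>` defined in `ParquetTest` above; the callback-based helper is illustrative, not part of the library):

    import org.apache.parquet.hadoop.api.ReadSupport
    import org.opendc.trace.util.parquet.LocalParquetReader
    import java.nio.file.Path

    // Stream every record of a Parquet file (or partitioned directory) through
    // the given callback, converting records via the supplied ReadSupport.
    fun <T> forEachRecord(path: Path, readSupport: ReadSupport<T>, action: (T) -> Unit) {
        val reader = LocalParquetReader(path, readSupport)
        try {
            while (true) {
                val record = reader.read() ?: break
                action(record)
            }
        } finally {
            reader.close()
        }
    }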