diff options
Diffstat (limited to 'opendc-trace/opendc-trace-parquet/src/test')
| -rw-r--r-- | opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt | 118 |
1 files changed, 88 insertions, 30 deletions
diff --git a/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt b/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt index 8ef4d1fb..be354319 100644 --- a/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt +++ b/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt @@ -22,36 +22,81 @@ package org.opendc.trace.util.parquet -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericData -import org.apache.parquet.avro.AvroParquetReader -import org.apache.parquet.avro.AvroParquetWriter +import org.apache.hadoop.conf.Configuration import org.apache.parquet.hadoop.ParquetFileWriter +import org.apache.parquet.hadoop.api.ReadSupport +import org.apache.parquet.hadoop.api.WriteSupport +import org.apache.parquet.io.api.Converter +import org.apache.parquet.io.api.GroupConverter +import org.apache.parquet.io.api.PrimitiveConverter +import org.apache.parquet.io.api.RecordConsumer +import org.apache.parquet.io.api.RecordMaterializer +import org.apache.parquet.schema.MessageType +import org.apache.parquet.schema.PrimitiveType +import org.apache.parquet.schema.Type +import org.apache.parquet.schema.Types import org.junit.jupiter.api.* import org.junit.jupiter.api.Assertions.assertEquals -import java.io.File import java.nio.file.FileAlreadyExistsException +import java.nio.file.Files import java.nio.file.NoSuchFileException +import java.nio.file.Path /** * Test suite for the Parquet helper classes. */ internal class ParquetTest { - private val schema = SchemaBuilder - .record("test") - .namespace("org.opendc.format.util") - .fields() - .name("field").type().intType().noDefault() - .endRecord() + private lateinit var path: Path - private lateinit var file: File + private val schema = Types.buildMessage() + .addField( + Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.REQUIRED) + .named("field") + ) + .named("test") + private val writeSupport = object : WriteSupport<Int>() { + lateinit var recordConsumer: RecordConsumer + + override fun init(configuration: Configuration): WriteContext { + return WriteContext(schema, emptyMap()) + } + + override fun prepareForWrite(recordConsumer: RecordConsumer) { + this.recordConsumer = recordConsumer + } + + override fun write(record: Int) { + val consumer = recordConsumer + + consumer.startMessage() + consumer.startField("field", 0) + consumer.addInteger(record) + consumer.endField("field", 0) + consumer.endMessage() + } + } + + private val readSupport = object : ReadSupport<Int>() { + override fun init( + configuration: Configuration, + keyValueMetaData: Map<String, String>, + fileSchema: MessageType + ): ReadContext = ReadContext(fileSchema) + + override fun prepareForRead( + configuration: Configuration, + keyValueMetaData: Map<String, String>, + fileSchema: MessageType, + readContext: ReadContext + ): RecordMaterializer<Int> = TestRecordMaterializer() + } /** - * Setup the test + * Set up the test */ @BeforeEach fun setUp() { - file = File.createTempFile("opendc", "parquet") + path = Files.createTempFile("opendc", "parquet") } /** @@ -59,7 +104,7 @@ internal class ParquetTest { */ @AfterEach fun tearDown() { - file.delete() + Files.deleteIfExists(path) } /** @@ -68,29 +113,24 @@ internal class ParquetTest { @Test fun testSmoke() { val n = 4 - val writer = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(file)) - .withSchema(schema) + val writer = LocalParquetWriter.builder(path, writeSupport) .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) .build() try { repeat(n) { i -> - val record = GenericData.Record(schema) - record.put("field", i) - writer.write(record) + writer.write(i) } } finally { writer.close() } - val reader = AvroParquetReader.builder<GenericData.Record>(LocalInputFile(file)) - .build() - + val reader = LocalParquetReader(path, readSupport) var counter = 0 try { while (true) { val record = reader.read() ?: break - assertEquals(counter++, record.get("field")) + assertEquals(counter++, record) } } finally { reader.close() @@ -105,9 +145,7 @@ internal class ParquetTest { @Test fun testOverwrite() { assertThrows<FileAlreadyExistsException> { - AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(file)) - .withSchema(schema) - .build() + LocalParquetWriter.builder(path, writeSupport).build() } } @@ -116,10 +154,30 @@ internal class ParquetTest { */ @Test fun testNonExistent() { - file.delete() + Files.deleteIfExists(path) assertThrows<NoSuchFileException> { - AvroParquetReader.builder<GenericData.Record>(LocalInputFile(file)) - .build() + LocalParquetReader(path, readSupport) + } + } + + private class TestRecordMaterializer : RecordMaterializer<Int>() { + private var current: Int = 0 + private val fieldConverter = object : PrimitiveConverter() { + override fun addInt(value: Int) { + current = value + } + } + private val root = object : GroupConverter() { + override fun getConverter(fieldIndex: Int): Converter { + require(fieldIndex == 0) + return fieldConverter + } + override fun start() {} + override fun end() {} } + + override fun getCurrentRecord(): Int = current + + override fun getRootConverter(): GroupConverter = root } } |
