Diffstat (limited to 'opendc-trace')
5 files changed, 102 insertions, 106 deletions
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
index 155f8cf3..b455a2cf 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
@@ -105,11 +105,11 @@ public class OdcVmTraceFormat : TraceFormat {
     override fun newReader(path: Path, table: String): TableReader {
         return when (table) {
             TABLE_RESOURCES -> {
-                val reader = LocalParquetReader(path.resolve("meta.parquet"), LocalParquetReader.custom(ResourceReadSupport()))
+                val reader = LocalParquetReader(path.resolve("meta.parquet"), ResourceReadSupport())
                 OdcVmResourceTableReader(reader)
             }
             TABLE_RESOURCE_STATES -> {
-                val reader = LocalParquetReader(path.resolve("trace.parquet"), LocalParquetReader.custom(ResourceStateReadSupport()))
+                val reader = LocalParquetReader(path.resolve("trace.parquet"), ResourceStateReadSupport())
                 OdcVmResourceStateTableReader(reader)
             }
             TABLE_INTERFERENCE_GROUPS -> {
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/avro/AvroUtils.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/avro/AvroUtils.kt
deleted file mode 100644
index a655d39f..00000000
--- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/avro/AvroUtils.kt
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-@file:JvmName("AvroUtils")
-package org.opendc.trace.util.avro
-
-import org.apache.avro.LogicalTypes
-import org.apache.avro.Schema
-
-/**
- * Schema for UUID type.
- */
-public val UUID_SCHEMA: Schema = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING))
-
-/**
- * Schema for timestamp type.
- */
-public val TIMESTAMP_SCHEMA: Schema = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG))
-
-/**
- * Helper function to make a [Schema] field optional.
- */
-public fun Schema.optional(): Schema {
-    return Schema.createUnion(Schema.create(Schema.Type.NULL), this)
-}
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
index 3e6f19a2..eef83956 100644
--- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
@@ -22,7 +22,6 @@
 
 package org.opendc.trace.util.parquet
 
-import org.apache.parquet.avro.AvroParquetReader
 import org.apache.parquet.hadoop.ParquetReader
 import org.apache.parquet.hadoop.api.ReadSupport
 import org.apache.parquet.io.InputFile
@@ -38,11 +37,11 @@ import kotlin.io.path.isDirectory
  * This class wraps a [ParquetReader] in order to support reading partitioned Parquet datasets.
  *
  * @param path The path to the Parquet file or directory to read.
- * @param factory Function to construct a [ParquetReader] for a local [InputFile].
+ * @param readSupport Helper class to perform conversion from Parquet to [T].
  */
 public class LocalParquetReader<out T>(
     path: Path,
-    private val factory: (InputFile) -> ParquetReader<T> = avro()
+    private val readSupport: ReadSupport<T>
 ) : AutoCloseable {
     /**
      * The input files to process.
@@ -64,7 +63,7 @@ public class LocalParquetReader<out T>(
     /**
      * Construct a [LocalParquetReader] for the specified [file].
      */
-    public constructor(file: File) : this(file.toPath())
+    public constructor(file: File, readSupport: ReadSupport<T>) : this(file.toPath(), readSupport)
 
     /**
      * Read a single entry in the Parquet file.
@@ -102,7 +101,7 @@ public class LocalParquetReader<out T>(
 
         try {
             this.reader = if (filesIterator.hasNext()) {
-                factory(filesIterator.next())
+                createReader(filesIterator.next())
             } else {
                 null
             }
@@ -112,28 +111,12 @@ public class LocalParquetReader<out T>(
         }
     }
 
-    public companion object {
-        /**
-         * A factory for reading Avro Parquet files.
-         */
-        public fun <T> avro(): (InputFile) -> ParquetReader<T> {
-            return { input ->
-                AvroParquetReader
-                    .builder<T>(input)
-                    .disableCompatibility()
-                    .build()
-            }
-        }
-
-        /**
-         * A factory for reading Parquet files with custom [ReadSupport].
-         */
-        public fun <T> custom(readSupport: ReadSupport<T>): (InputFile) -> ParquetReader<T> {
-            return { input ->
-                object : ParquetReader.Builder<T>(input) {
-                    override fun getReadSupport(): ReadSupport<T> = readSupport
-                }.build()
-            }
-        }
+    /**
+     * Construct a [ParquetReader] for the specified [input] with a custom [ReadSupport].
+     */
+    private fun createReader(input: InputFile): ParquetReader<T> {
+        return object : ParquetReader.Builder<T>(input) {
+            override fun getReadSupport(): ReadSupport<@UnsafeVariance T> = this@LocalParquetReader.readSupport
+        }.build()
     }
 }
diff --git a/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt b/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt
index 8ef4d1fb..be354319 100644
--- a/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt
+++ b/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt
@@ -22,36 +22,81 @@
 
 package org.opendc.trace.util.parquet
 
-import org.apache.avro.SchemaBuilder
-import org.apache.avro.generic.GenericData
-import org.apache.parquet.avro.AvroParquetReader
-import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.hadoop.conf.Configuration
 import org.apache.parquet.hadoop.ParquetFileWriter
+import org.apache.parquet.hadoop.api.ReadSupport
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.io.api.Converter
+import org.apache.parquet.io.api.GroupConverter
+import org.apache.parquet.io.api.PrimitiveConverter
+import org.apache.parquet.io.api.RecordConsumer
+import org.apache.parquet.io.api.RecordMaterializer
+import org.apache.parquet.schema.MessageType
+import org.apache.parquet.schema.PrimitiveType
+import org.apache.parquet.schema.Type
+import org.apache.parquet.schema.Types
 import org.junit.jupiter.api.*
 import org.junit.jupiter.api.Assertions.assertEquals
-import java.io.File
 import java.nio.file.FileAlreadyExistsException
+import java.nio.file.Files
 import java.nio.file.NoSuchFileException
+import java.nio.file.Path
 
 /**
  * Test suite for the Parquet helper classes.
 */
 internal class ParquetTest {
-    private val schema = SchemaBuilder
-        .record("test")
-        .namespace("org.opendc.format.util")
-        .fields()
-        .name("field").type().intType().noDefault()
-        .endRecord()
+    private lateinit var path: Path
 
-    private lateinit var file: File
+    private val schema = Types.buildMessage()
+        .addField(
+            Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.REQUIRED)
+                .named("field")
+        )
+        .named("test")
+
+    private val writeSupport = object : WriteSupport<Int>() {
+        lateinit var recordConsumer: RecordConsumer
+
+        override fun init(configuration: Configuration): WriteContext {
+            return WriteContext(schema, emptyMap())
+        }
+
+        override fun prepareForWrite(recordConsumer: RecordConsumer) {
+            this.recordConsumer = recordConsumer
+        }
+
+        override fun write(record: Int) {
+            val consumer = recordConsumer
+
+            consumer.startMessage()
+            consumer.startField("field", 0)
+            consumer.addInteger(record)
+            consumer.endField("field", 0)
+            consumer.endMessage()
+        }
+    }
+
+    private val readSupport = object : ReadSupport<Int>() {
+        override fun init(
+            configuration: Configuration,
+            keyValueMetaData: Map<String, String>,
+            fileSchema: MessageType
+        ): ReadContext = ReadContext(fileSchema)
+
+        override fun prepareForRead(
+            configuration: Configuration,
+            keyValueMetaData: Map<String, String>,
+            fileSchema: MessageType,
+            readContext: ReadContext
+        ): RecordMaterializer<Int> = TestRecordMaterializer()
+    }
 
     /**
-     * Setup the test
+     * Set up the test
      */
     @BeforeEach
     fun setUp() {
-        file = File.createTempFile("opendc", "parquet")
+        path = Files.createTempFile("opendc", "parquet")
     }
 
     /**
@@ -59,7 +104,7 @@ internal class ParquetTest {
      */
     @AfterEach
     fun tearDown() {
-        file.delete()
+        Files.deleteIfExists(path)
     }
 
     /**
@@ -68,29 +113,24 @@ internal class ParquetTest {
     @Test
     fun testSmoke() {
         val n = 4
-        val writer = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(file))
-            .withSchema(schema)
+        val writer = LocalParquetWriter.builder(path, writeSupport)
             .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
             .build()
 
         try {
             repeat(n) { i ->
-                val record = GenericData.Record(schema)
-                record.put("field", i)
-                writer.write(record)
+                writer.write(i)
             }
         } finally {
            writer.close()
         }
 
-        val reader = AvroParquetReader.builder<GenericData.Record>(LocalInputFile(file))
-            .build()
-
+        val reader = LocalParquetReader(path, readSupport)
         var counter = 0
         try {
             while (true) {
                 val record = reader.read() ?: break
-                assertEquals(counter++, record.get("field"))
+                assertEquals(counter++, record)
             }
         } finally {
             reader.close()
         }
@@ -105,9 +145,7 @@ internal class ParquetTest {
     @Test
     fun testOverwrite() {
         assertThrows<FileAlreadyExistsException> {
-            AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(file))
-                .withSchema(schema)
-                .build()
+            LocalParquetWriter.builder(path, writeSupport).build()
         }
     }
 
@@ -116,10 +154,30 @@ internal class ParquetTest {
      */
     @Test
     fun testNonExistent() {
-        file.delete()
+        Files.deleteIfExists(path)
 
         assertThrows<NoSuchFileException> {
-            AvroParquetReader.builder<GenericData.Record>(LocalInputFile(file))
-                .build()
+            LocalParquetReader(path, readSupport)
+        }
+    }
+
+    private class TestRecordMaterializer : RecordMaterializer<Int>() {
+        private var current: Int = 0
+        private val fieldConverter = object : PrimitiveConverter() {
+            override fun addInt(value: Int) {
+                current = value
+            }
+        }
+        private val root = object : GroupConverter() {
+            override fun getConverter(fieldIndex: Int): Converter {
+                require(fieldIndex == 0)
+                return fieldConverter
+            }
+            override fun start() {}
+            override fun end() {}
+        }
+
+        override fun getCurrentRecord(): Int = current
+
+        override fun getRootConverter(): GroupConverter = root
     }
 }
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
index aae71c58..d6e42c8c 100644
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
@@ -66,8 +66,7 @@ public class WtfTraceFormat : TraceFormat {
     override fun newReader(path: Path, table: String): TableReader {
         return when (table) {
             TABLE_TASKS -> {
-                val factory = LocalParquetReader.custom(TaskReadSupport())
-                val reader = LocalParquetReader(path.resolve("tasks/schema-1.0"), factory)
+                val reader = LocalParquetReader(path.resolve("tasks/schema-1.0"), TaskReadSupport())
                 WtfTaskTableReader(reader)
             }
             else -> throw IllegalArgumentException("Table $table not supported")
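For callers, the reader-side refactor removes the factory indirection (`LocalParquetReader.avro()` / `LocalParquetReader.custom(...)`) in favor of handing the `ReadSupport` straight to the constructor. A minimal sketch of the resulting call pattern, assuming only what the diff shows; the `readAll` helper and its generic signature are illustrative and not part of this commit:

```kotlin
import org.apache.parquet.hadoop.api.ReadSupport
import org.opendc.trace.util.parquet.LocalParquetReader
import java.nio.file.Path

/**
 * Drain a Parquet file (or partitioned directory) at [path], converting
 * each record via the caller-supplied [readSupport].
 */
fun <T> readAll(path: Path, readSupport: ReadSupport<T>): List<T> {
    // The reworked constructor takes the ReadSupport directly, replacing
    // the old factory lambda of type (InputFile) -> ParquetReader<T>.
    val reader = LocalParquetReader(path, readSupport)
    val records = mutableListOf<T>()
    try {
        while (true) {
            // read() yields null once the input files are exhausted.
            val record = reader.read() ?: break
            records.add(record)
        }
    } finally {
        reader.close()
    }
    return records
}
```

Since `LocalParquetReader` wraps its underlying `ParquetReader` to support partitioned datasets, the same loop works whether `path` is a single file or a directory of part files.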
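The write side, exercised by the updated `testSmoke` and `testOverwrite`, follows the same shape through `LocalParquetWriter.builder(path, writeSupport)`. A sketch under the same caveat: `writeAll` is a hypothetical helper, and the builder's generic signature is assumed from its use in the test:

```kotlin
import org.apache.parquet.hadoop.ParquetFileWriter
import org.apache.parquet.hadoop.api.WriteSupport
import org.opendc.trace.util.parquet.LocalParquetWriter
import java.nio.file.Path

/**
 * Write [records] to a Parquet file at [path] using a caller-supplied
 * [WriteSupport], mirroring the flow of the updated testSmoke.
 */
fun <T> writeAll(path: Path, writeSupport: WriteSupport<T>, records: Iterable<T>) {
    val writer = LocalParquetWriter.builder(path, writeSupport)
        // Without OVERWRITE, writing to an existing file fails with
        // FileAlreadyExistsException, which is what testOverwrite asserts.
        .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
        .build()
    try {
        for (record in records) {
            writer.write(record)
        }
    } finally {
        writer.close()
    }
}
```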
