From f9ffdfb29a3f08ac11e739494e754c81ef4f5157 Mon Sep 17 00:00:00 2001
From: Alessio Leonardo Tomei <122273875+T0mexX@users.noreply.github.com>
Date: Thu, 22 Aug 2024 16:45:22 +0200
Subject: Refactored exporters. Allows output column selection in scenario
 (#241)

---
 opendc-trace/opendc-trace-parquet/build.gradle.kts |   8 +
 .../opendc/trace/util/parquet/ParquetDataWriter.kt | 134 ++++++++++++++++
 .../trace/util/parquet/exporter/ExportColumn.kt    | 174 +++++++++++++++++++++
 .../parquet/exporter/ExportColumnSerializer.kt     | 120 ++++++++++++++
 .../trace/util/parquet/exporter/Exportable.kt      |  35 +++++
 .../opendc/trace/util/parquet/exporter/Exporter.kt | 146 +++++++++++++++++
 6 files changed, 617 insertions(+)
 create mode 100644 opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/ParquetDataWriter.kt
 create mode 100644 opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/ExportColumn.kt
 create mode 100644 opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/ExportColumnSerializer.kt
 create mode 100644 opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/Exportable.kt
 create mode 100644 opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/Exporter.kt

diff --git a/opendc-trace/opendc-trace-parquet/build.gradle.kts b/opendc-trace/opendc-trace-parquet/build.gradle.kts
index 4cdd4350..0a0507ef 100644
--- a/opendc-trace/opendc-trace-parquet/build.gradle.kts
+++ b/opendc-trace/opendc-trace-parquet/build.gradle.kts
@@ -25,9 +25,17 @@ description = "Parquet helpers for traces in OpenDC"
 // Build configuration
 plugins {
     `kotlin-library-conventions`
+    kotlin("plugin.serialization") version "1.9.22"
 }
 
 dependencies {
+    // Needed for ParquetDataWriter
+    implementation(libs.kotlin.logging)
+
+    implementation(projects.opendcCommon)
+    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.1")
+    implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.6.0")
+
     // This configuration is necessary for a slim dependency on Apache Parquet
     api(libs.parquet) {
         exclude(group = "org.apache.hadoop")
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/ParquetDataWriter.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/ParquetDataWriter.kt
new file mode 100644
index 00000000..e4b9a147
--- /dev/null
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/ParquetDataWriter.kt
@@ -0,0 +1,134 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.util.parquet
+
+import mu.KotlinLogging
+import org.apache.parquet.column.ParquetProperties
+import org.apache.parquet.hadoop.ParquetFileWriter
+import org.apache.parquet.hadoop.ParquetWriter
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import java.io.File
+import java.util.concurrent.ArrayBlockingQueue
+import java.util.concurrent.BlockingQueue
+import kotlin.concurrent.thread
+
+/**
+ * A writer that writes data in Parquet format.
+ *
+ * @param path The path to the file to write the data to.
+ * @param writeSupport The [WriteSupport] implementation for converting the records to Parquet format.
+ * @param bufferSize The size of the queue buffering records before they are written.
+ */
+public abstract class ParquetDataWriter<in T>(
+    path: File,
+    private val writeSupport: WriteSupport<@UnsafeVariance T>,
+    bufferSize: Int = 4096,
+) : AutoCloseable {
+    /**
+     * The logging instance to use.
+     */
+    private val logger = KotlinLogging.logger {}
+
+    /**
+     * The queue of records to process.
+     */
+    private val queue: BlockingQueue<@UnsafeVariance T> = ArrayBlockingQueue(bufferSize)
+
+    /**
+     * An exception to be propagated to the actual writer.
+     */
+    private var exception: Throwable? = null
+
+    /**
+     * The thread that is responsible for writing the Parquet records.
+     */
+    private val writerThread =
+        thread(start = false, name = this.toString()) {
+            val writer =
+                let {
+                    val builder =
+                        LocalParquetWriter.builder(path.toPath(), writeSupport)
+                            .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
+                            .withCompressionCodec(CompressionCodecName.ZSTD)
+                            .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+                    buildWriter(builder)
+                }
+
+            val queue = queue
+            val buf = mutableListOf<T>()
+            var shouldStop = false
+
+            try {
+                while (!shouldStop) {
+                    try {
+                        writer.write(queue.take())
+                    } catch (e: InterruptedException) {
+                        shouldStop = true
+                    }
+
+                    if (queue.drainTo(buf) > 0) {
+                        for (data in buf) {
+                            writer.write(data)
+                        }
+                        buf.clear()
+                    }
+                }
+            } catch (e: Throwable) {
+                logger.error(e) { "Failure in Parquet data writer" }
+                exception = e
+            } finally {
+                writer.close()
+            }
+        }
+
+    /**
+     * Build the [ParquetWriter] used to write the Parquet files.
+     */
+    protected open fun buildWriter(builder: LocalParquetWriter.Builder<@UnsafeVariance T>): ParquetWriter<@UnsafeVariance T> {
+        return builder.build()
+    }
+
+    /**
+     * Write the specified record to the Parquet file.
+     */
+    public fun write(data: T) {
+        val exception = exception
+        if (exception != null) {
+            throw IllegalStateException("Writer thread failed", exception)
+        }
+
+        queue.put(data)
+    }
+
+    /**
+     * Signal the writer to stop, draining any queued records before the thread exits.
+     */
+    override fun close() {
+        writerThread.interrupt()
+        writerThread.join()
+    }
+
+    init {
+        writerThread.start()
+    }
+}
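
A minimal usage sketch (not part of the patch): a concrete writer built on `ParquetDataWriter`, assuming a hypothetical `Tick` record; the `WriteSupport` callbacks shown are the standard Parquet API that the base class delegates to.

```kotlin
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.api.WriteSupport
import org.apache.parquet.io.api.RecordConsumer
import org.apache.parquet.schema.MessageType
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64
import org.apache.parquet.schema.Types
import org.opendc.trace.util.parquet.ParquetDataWriter
import java.io.File

// Hypothetical record type.
data class Tick(val timestamp: Long)

// Standard Parquet WriteSupport emitting a single required INT64 column.
private class TickWriteSupport : WriteSupport<Tick>() {
    private lateinit var consumer: RecordConsumer
    private val schema: MessageType =
        Types.buildMessage()
            .addField(Types.required(INT64).named("timestamp"))
            .named("tick")

    override fun init(configuration: Configuration): WriteContext = WriteContext(schema, emptyMap())

    override fun prepareForWrite(recordConsumer: RecordConsumer) {
        consumer = recordConsumer
    }

    override fun write(record: Tick) {
        consumer.startMessage()
        consumer.startField("timestamp", 0)
        consumer.addLong(record.timestamp)
        consumer.endField("timestamp", 0)
        consumer.endMessage()
    }
}

// Records passed to write() are buffered in the queue and written by the
// background thread; close() interrupts that thread, which drains the queue.
class TickWriter(path: File) : ParquetDataWriter<Tick>(path, TickWriteSupport())

fun main() {
    TickWriter(File("ticks.parquet")).use { writer ->
        (0L until 10L).forEach { writer.write(Tick(timestamp = it)) }
    }
}
```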
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/ExportColumn.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/ExportColumn.kt
new file mode 100644
index 00000000..90e00f4b
--- /dev/null
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/ExportColumn.kt
@@ -0,0 +1,174 @@
+/*
+ * Copyright (c) 2024 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.util.parquet.exporter
+
+import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.sync.Mutex
+import kotlinx.coroutines.sync.withLock
+import org.apache.parquet.schema.LogicalTypeAnnotation
+import org.apache.parquet.schema.PrimitiveType
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32
+import org.apache.parquet.schema.Type
+import org.opendc.common.logger.logger
+import org.slf4j.Logger
+import kotlin.reflect.KClass
+
+/**
+ * A column that can be used to build a parquet schema to export [T] records.
+ *
+ * See [columnSerializer] for deserialization of this class.
+ *
+ * ```kotlin
+ * class Foo : Exportable {
+ * ...
+ * }
+ * val MY_FIELD = ExportColumn<Foo>(
+ *     field = Types.required(PrimitiveType.PrimitiveTypeName.DOUBLE).named("my_field_name")
+ * ) { exportable: Foo -> exportable.getMyValue() }
+ * ```
+ *
+ * @param[field]
+ * The apache parquet field, which includes information such as:
+ * - whether the field is required
+ * - the field name
+ * - the [PrimitiveType] (e.g. [INT32], [DOUBLE] etc.)
+ * - the [LogicalTypeAnnotation] (e.g. TIMESTAMP, etc.)
+ *
+ * @param[getValue]
+ * Retrieves the value to be exported from the [Exportable] of [T] passed as parameter.
+ * The value returned needs to match the expected [PrimitiveType] defined in the field.
+ *
+ * A second type parameter could have been added to the class to enforce the correct type at
+ * compile time (`ExportColumn<T>` -> `ExportColumn<T, V>`), however it would have added too
+ * much complexity to the interface.
+ *
+ * @param[regex] The pattern used to determine whether a string refers to this column.
+ * The default one matches the column name with either underscores or blank
+ * spaces between words in a case-insensitive manner.
+ *
+ * @param[exportableClass]
+ * The [KClass] of the [Exportable]. Used for intuitive log messages. This class
+ * can be instantiated with the inline constructor [Companion.invoke] without providing this parameter.
+ */
+public class ExportColumn<T : Exportable>
+    @PublishedApi
+    internal constructor(
+        public val field: Type,
+        @PublishedApi internal val regex: Regex,
+        @PublishedApi internal val exportableClass: KClass<T>,
+        internal val getValue: (T) -> Any?,
+    ) {
+        /**
+         * The name of the column (e.g. "timestamp").
+         */
+        public val name: String by lazy { field.name }
+
+        /**
+         * The primitive type of the field (e.g. INT32).
+         */
+        public val primitiveTypeName: PrimitiveTypeName by lazy { field.asPrimitiveType().primitiveTypeName }
+
+        init {
+            // Adds the column among those that can be deserialized.
+            addField(this)
+        }
+
+        override fun toString(): String = "[ExportColumn: name=$name, exportable=${exportableClass.simpleName}]"
+
+        public companion object {
+            @PublishedApi
+            internal val LOG: Logger by logger()
+
+            /**
+             * Reified constructor, needed to store the [KClass] of [T] without providing it as a parameter.
+             */
+            public inline operator fun <reified T : Exportable> invoke(
+                field: Type,
+                regex: Regex = Regex("\\s*(?:${field.name}|${field.name.replace('_', ' ')})\\s*", RegexOption.IGNORE_CASE),
+                noinline getValue: (T) -> Any?,
+            ): ExportColumn<T> =
+                ExportColumn(
+                    field = field,
+                    getValue = getValue,
+                    exportableClass = T::class,
+                    regex = regex,
+                )
+
+            /**
+             * All the columns that have been instantiated. They are added in the `init` block.
+             * Keep in mind that in order to deserialize a column, that column needs to be loaded by the JVM.
+             */
+            @PublishedApi
+            internal val allColumns: MutableSet<ExportColumn<*>> = mutableSetOf()
+
+            @PublishedApi
+            internal val allColumnsLock: Mutex = Mutex()
+
+            /**
+             * Function invoked in the `init` block of each [ExportColumn].
+             * Adds the column to those that can be deserialized.
+             */
+            private fun addField(column: ExportColumn<*>): Unit =
+                runBlocking {
+                    allColumnsLock.withLock { allColumns.add(column) }
+                }
+
+            /**
+             * @return the [ExportColumn] whose [ExportColumn.regex] matches [columnName] **and**
+             * whose generic type ([Exportable]) is [T] if any, `null` otherwise.
+             *
+             * This method needs to be inlined and reified because runtime type erasure
+             * does not allow type-checking the generic class parameter.
+             */
+            @Suppress("UNCHECKED_CAST") // Needed at compile time even though the cast is nullable.
+            @PublishedApi
+            internal inline fun <reified T : Exportable> matchingColOrNull(columnName: String): ExportColumn<T>? =
+                runBlocking {
+                    val allColumns = allColumnsLock.withLock { allColumns.toSet() }
+
+                    allColumns.forEach { column ->
+                        // If it is an ExportColumn of the same type.
+                        if (column.exportableClass == T::class) {
+                            // Just a smart cast that always succeeds at runtime because
+                            // of type erasure but is needed at compile time.
+                            (column as? ExportColumn<T>)
+                                ?.regex
+                                // If columnName matches the field regex.
+                                ?.matchEntire(columnName)
+                                ?.let { return@runBlocking column }
+                        }
+                    }
+
+                    null
+                }
+
+            /**
+             * Returns all [ExportColumn]s of type [T] that have been loaded up until now.
+             */
+            @Suppress("UNCHECKED_CAST")
+            public inline fun <reified T : Exportable> getAllLoadedColumns(): List<ExportColumn<T>> =
+                runBlocking {
+                    allColumnsLock.withLock { allColumns.filter { it.exportableClass == T::class } as List<ExportColumn<T>> }
+                }
+        }
+    }
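
A short sketch of declaring a column (not part of the patch; `PowerSample` and `POWER_DRAW` are hypothetical names):

```kotlin
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE
import org.apache.parquet.schema.Types
import org.opendc.trace.util.parquet.exporter.ExportColumn
import org.opendc.trace.util.parquet.exporter.Exportable

// Hypothetical exportable record.
class PowerSample(val wattage: Double) : Exportable

// Declaring the column runs its `init` block, which registers it among the
// deserializable columns; the default regex matches "power_draw" and
// "power draw" case-insensitively.
val POWER_DRAW = ExportColumn<PowerSample>(
    field = Types.required(DOUBLE).named("power_draw"),
) { sample -> sample.wattage }

fun main() {
    // The column is now visible among the loaded columns for PowerSample.
    check(POWER_DRAW in ExportColumn.getAllLoadedColumns<PowerSample>())
}
```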
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/ExportColumnSerializer.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/ExportColumnSerializer.kt
new file mode 100644
index 00000000..e07980f9
--- /dev/null
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/ExportColumnSerializer.kt
@@ -0,0 +1,120 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.util.parquet.exporter
+
+import kotlinx.serialization.KSerializer
+import kotlinx.serialization.SerializationException
+import kotlinx.serialization.builtins.ListSerializer
+import kotlinx.serialization.descriptors.SerialDescriptor
+import kotlinx.serialization.descriptors.serialDescriptor
+import kotlinx.serialization.encoding.Decoder
+import kotlinx.serialization.encoding.Encoder
+import kotlinx.serialization.json.Json
+import kotlinx.serialization.json.JsonDecoder
+import kotlinx.serialization.json.jsonArray
+import org.opendc.common.logger.errAndNull
+import org.opendc.common.logger.logger
+
+/**
+ * Returns a serializer for [ExportColumn] of [T] based on [ExportColumn.name]. Export columns can be
+ * deserialized from string values if the string matches an [ExportColumn.regex].
+ *
+ * ###### Note:
+ * - **In order to deserialize columns, they need to be loaded at runtime.**
+ * - **The serializer needs the reified type [T], meaning static deserialization
+ * (e.g. `@Serializable`, `@Serializer`) will not work. The serializer for [ExportColumn] of [T] needs to be retrieved with this method.**
+ *
+ * It is assumed the user always knows what type of column is needed from deserialization,
+ * so columns can be encoded by their name alone, not including their type (which would be tedious to write in json).
+ *
+ * ```kotlin
+ * // Decode a column of Foo
+ * class Foo : Exportable
+ * json.decodeFromString(deserializer = columnSerializer<Foo>(), string = "\"<column-name>\"")
+ *
+ * // Decode a collection of columns of Foo
+ * json.decodeFromString(deserializer = ListSerializer(columnSerializer<Foo>()), string = "[...]")
+ * ```
+ */
+public inline fun <reified T : Exportable> columnSerializer(): KSerializer<ExportColumn<T>> =
+    object : KSerializer<ExportColumn<T>> {
+        override val descriptor: SerialDescriptor = serialDescriptor<String>()
+
+        override fun deserialize(decoder: Decoder): ExportColumn<T> {
+            val strValue = decoder.decodeString().trim('"')
+            return ExportColumn.matchingColOrNull<T>(strValue)
+                ?: throw SerializationException(
+                    "unable to deserialize export column '$strValue'. " +
+                        "Keep in mind that export columns need to be loaded by the JVM in order to be deserialized",
+                )
+        }
+
+        override fun serialize(
+            encoder: Encoder,
+            value: ExportColumn<T>,
+        ) {
+            encoder.encodeString(value.name)
+        }
+    }
+
+/**
+ * Serializer for a [List] of [ExportColumn] of [T], with the peculiarity of
+ * ignoring unrecognized column names (logging an error when an
+ * unrecognized column is encountered).
+ */
+public class ColListSerializer<T : Exportable>(
+    private val columnSerializer: KSerializer<ExportColumn<T>>,
+) : KSerializer<List<ExportColumn<T>>> {
+    private val listSerializer = ListSerializer(columnSerializer)
+    override val descriptor: SerialDescriptor = listSerializer.descriptor
+
+    /**
+     * Unrecognized columns are ignored and an error message is logged.
+     *
+     * @return the decoded list of [ExportColumn]s (might be empty).
+     * @throws[SerializationException] if the current element is not a [jsonArray] or its string representation.
+     */
+    override fun deserialize(decoder: Decoder): List<ExportColumn<T>> =
+        (decoder as? JsonDecoder)?.decodeJsonElement()?.jsonArray?.mapNotNull {
+            try {
+                Json.decodeFromJsonElement(columnSerializer, it)
+            } catch (_: Exception) {
+                LOG.errAndNull("no match found for column $it, ignoring...")
+            }
+        } ?: let {
+            val strValue = decoder.decodeString().trim('"')
+            // Basically a recursive call with a json decoder instead of the argument decoder.
+            Json.decodeFromString(this, strValue)
+        }
+
+    override fun serialize(
+        encoder: Encoder,
+        value: List<ExportColumn<T>>,
+    ) {
+        listSerializer.serialize(encoder, value)
+    }
+
+    private companion object {
+        val LOG by logger()
+    }
+}
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/Exportable.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/Exportable.kt
new file mode 100644
index 00000000..61e766d0
--- /dev/null
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/Exportable.kt
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2024 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.util.parquet.exporter
+
+/**
+ * Classes that implement this interface can be exported
+ * as records in a parquet file through an [Exporter].
+ */
+public interface Exportable {
+    public companion object {
+        public inline fun <reified T : Exportable> getAllLoadedColumns(): List<ExportColumn<T>> {
+            return ExportColumn.getAllLoadedColumns<T>()
+        }
+    }
+}
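
A sketch of decoding columns by name (not part of the patch; it assumes the hypothetical `PowerSample`/`POWER_DRAW` declarations from the earlier sketch have been loaded by the JVM):

```kotlin
import kotlinx.serialization.json.Json
import org.opendc.trace.util.parquet.exporter.ColListSerializer
import org.opendc.trace.util.parquet.exporter.columnSerializer

fun main() {
    // A single column is encoded by name only; "power draw" matches the
    // default regex of the "power_draw" column.
    val single = Json.decodeFromString(columnSerializer<PowerSample>(), "\"power draw\"")
    println(single) // [ExportColumn: name=power_draw, exportable=PowerSample]

    // The lenient list serializer logs and skips names that match no loaded
    // column instead of failing the whole decode.
    val columns = Json.decodeFromString(
        ColListSerializer(columnSerializer<PowerSample>()),
        """["power_draw", "no_such_column"]""",
    )
    println(columns.size) // 1
}
```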
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/Exporter.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/Exporter.kt
new file mode 100644
index 00000000..05f36530
--- /dev/null
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/Exporter.kt
@@ -0,0 +1,146 @@
+/*
+ * Copyright (c) 2024 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.util.parquet.exporter
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.io.api.RecordConsumer
+import org.apache.parquet.schema.MessageType
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64
+import org.apache.parquet.schema.Types
+import org.opendc.trace.util.parquet.ParquetDataWriter
+import java.io.File
+
+public class Exporter<T : Exportable>
+    @PublishedApi
+    internal constructor(
+        outputFile: File,
+        writeSupp: WriteSupport<T>,
+        bufferSize: Int,
+    ) : ParquetDataWriter<T>(
+        path = outputFile,
+        writeSupport = writeSupp,
+        bufferSize = bufferSize,
+    ) {
+        public companion object {
+            /**
+             * Reified factory that allows using the runtime [Class.getSimpleName] of [T] as the default schema name.
+             * @param[outputFile] the output file where the [Exportable]s will be written.
+             * @param[columns] the columns that will be included in the output parquet file.
+             * @param[schemaName] the name of the schema of the output parquet file.
+             */
+            public inline operator fun <reified T : Exportable> invoke(
+                outputFile: File,
+                vararg columns: ExportColumn<T> = emptyArray(),
+                schemaName: String? = null,
+                bufferSize: Int = 4096,
+            ): Exporter<T> =
+                Exporter(
+                    outputFile = outputFile,
+                    writeSupp = writeSuppFor(columns.toSet(), schemaName = schemaName ?: T::class.simpleName ?: "unknown"),
+                    bufferSize = bufferSize,
+                )
+
+            /**
+             * Reified factory that allows using the runtime [Class.getSimpleName] of [T] as the default schema name.
+             * @param[outputFile] the output file where the [Exportable]s will be written.
+             * @param[columns] the columns that will be included in the output parquet file.
+             * @param[schemaName] the name of the schema of the output parquet file.
+             */
+            public inline operator fun <reified T : Exportable> invoke(
+                outputFile: File,
+                columns: Collection<ExportColumn<T>> = emptySet(),
+                schemaName: String? = null,
+                bufferSize: Int = 4096,
+            ): Exporter<T> =
+                Exporter(
+                    outputFile = outputFile,
+                    writeSupp = writeSuppFor(columns.toSet(), schemaName = schemaName ?: T::class.simpleName ?: "unknown"),
+                    bufferSize = bufferSize,
+                )
+
+            /**
+             * @return an anonymous [WriteSupport] for [T] with only the columns included in [columns].
+             */
+            @PublishedApi
+            internal fun <T : Exportable> writeSuppFor(
+                columns: Set<ExportColumn<T>>,
+                schemaName: String,
+            ): WriteSupport<T> =
+                object : WriteSupport<T>() {
+                    private lateinit var cons: RecordConsumer
+
+                    private val schema: MessageType =
+                        Types
+                            .buildMessage()
+                            .addFields(*columns.map { it.field }.toTypedArray())
+                            .named(schemaName)
+
+                    override fun init(configuration: Configuration): WriteContext = WriteContext(schema, emptyMap())
+
+                    override fun prepareForWrite(recordConsumer: RecordConsumer) {
+                        cons = recordConsumer
+                    }
+
+                    override fun write(record: T) =
+                        with(cons) {
+                            startMessage()
+
+                            columns.forEachIndexed { idx, column ->
+                                fun <T : Any> Any.castedOrThrow(): T {
+                                    @Suppress("UNCHECKED_CAST")
+                                    return (this as? T) ?: throw TypeCastException(
+                                        "attempt to add value of type ${this::class} to export " +
+                                            "field $column which requires a different type",
+                                    )
+                                }
+
+                                val valueToAdd: Any =
+                                    column.getValue(record)
+                                        ?: return@forEachIndexed // Maybe add explicit check for optional fields.
+
+                                startField(column.name, idx)
+                                when (column.primitiveTypeName) {
+                                    INT32 -> addInteger(valueToAdd.castedOrThrow())
+                                    INT64 -> addLong(valueToAdd.castedOrThrow())
+                                    DOUBLE -> addDouble(valueToAdd.castedOrThrow())
+                                    BINARY -> addBinary(valueToAdd.castedOrThrow())
+                                    FLOAT -> addFloat(valueToAdd.castedOrThrow())
+                                    BOOLEAN -> addBoolean(valueToAdd.castedOrThrow())
+                                    else -> throw RuntimeException(
+                                        "parquet primitive type name '${column.primitiveTypeName}' is not supported",
+                                    )
+                                }
+                                endField(column.name, idx)
+                            }
+
+                            cons.endMessage()
+                        }
+                }
+        }
+    }
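
Finally, a usage sketch tying the pieces together (not part of the patch; again assuming the hypothetical `PowerSample` and `POWER_DRAW` from the sketches above):

```kotlin
import org.opendc.trace.util.parquet.exporter.Exporter
import java.io.File

fun main() {
    // The schema is built from the selected columns only, and the schema name
    // defaults to T::class.simpleName ("PowerSample" here).
    Exporter<PowerSample>(File("power.parquet"), POWER_DRAW).use { exporter ->
        exporter.write(PowerSample(wattage = 350.0))
        exporter.write(PowerSample(wattage = 360.5))
    } // close() joins the writer thread, draining any queued records.
}
```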