author    Alessio Leonardo Tomei <122273875+T0mexX@users.noreply.github.com>    2024-08-22 16:45:22 +0200
committer GitHub <noreply@github.com>    2024-08-22 16:45:22 +0200
commit    f9ffdfb29a3f08ac11e739494e754c81ef4f5157 (patch)
tree      abc65714427e2738e5278032230ba60a9b6f0a28 /opendc-trace
parent    4f98fb2bf8204f6af52cd6eeb3313d21c6ca95bc (diff)
Refactored exporters. Allows output column selection in scenario (#241)
Diffstat (limited to 'opendc-trace')
-rw-r--r-- opendc-trace/opendc-trace-parquet/build.gradle.kts | 8
-rw-r--r-- opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/ParquetDataWriter.kt | 134
-rw-r--r-- opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/ExportColumn.kt | 174
-rw-r--r-- opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/ExportColumnSerializer.kt | 120
-rw-r--r-- opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/Exportable.kt | 35
-rw-r--r-- opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/Exporter.kt | 146
6 files changed, 617 insertions, 0 deletions
diff --git a/opendc-trace/opendc-trace-parquet/build.gradle.kts b/opendc-trace/opendc-trace-parquet/build.gradle.kts
index 4cdd4350..0a0507ef 100644
--- a/opendc-trace/opendc-trace-parquet/build.gradle.kts
+++ b/opendc-trace/opendc-trace-parquet/build.gradle.kts
@@ -25,9 +25,17 @@ description = "Parquet helpers for traces in OpenDC"
// Build configuration
plugins {
`kotlin-library-conventions`
+ kotlin("plugin.serialization") version "1.9.22"
}
dependencies {
+ // Needed for ParquetDataWriter
+ implementation(libs.kotlin.logging)
+
+ implementation(projects.opendcCommon)
+ implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.1")
+ implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.6.0")
+
// This configuration is necessary for a slim dependency on Apache Parquet
api(libs.parquet) {
exclude(group = "org.apache.hadoop")
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/ParquetDataWriter.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/ParquetDataWriter.kt
new file mode 100644
index 00000000..e4b9a147
--- /dev/null
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/ParquetDataWriter.kt
@@ -0,0 +1,134 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.util.parquet
+
+import mu.KotlinLogging
+import org.apache.parquet.column.ParquetProperties
+import org.apache.parquet.hadoop.ParquetFileWriter
+import org.apache.parquet.hadoop.ParquetWriter
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import java.io.File
+import java.util.concurrent.ArrayBlockingQueue
+import java.util.concurrent.BlockingQueue
+import kotlin.concurrent.thread
+
+/**
+ * A writer that writes data in Parquet format.
+ *
+ * @param path The path to the file to write the data to.
+ * @param writeSupport The [WriteSupport] implementation for converting the records to Parquet format.
+ * @param bufferSize The capacity of the queue that buffers records between [write] callers and the writer thread.
+ */
+public abstract class ParquetDataWriter<in T>(
+ path: File,
+ private val writeSupport: WriteSupport<T>,
+ bufferSize: Int = 4096,
+) : AutoCloseable {
+ /**
+ * The logging instance to use.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
+ * The queue of records to process.
+ */
+ private val queue: BlockingQueue<T> = ArrayBlockingQueue(bufferSize)
+
+ /**
+ * An exception thrown by the writer thread, to be propagated to callers of [write].
+ */
+ private var exception: Throwable? = null
+
+ /**
+ * The thread that is responsible for writing the Parquet records.
+ */
+ private val writerThread =
+ thread(start = false, name = this.toString()) {
+ val writer =
+ let {
+ val builder =
+ LocalParquetWriter.builder(path.toPath(), writeSupport)
+ .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
+ .withCompressionCodec(CompressionCodecName.ZSTD)
+ .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+ buildWriter(builder)
+ }
+
+ val queue = queue
+ val buf = mutableListOf<T>()
+ var shouldStop = false
+
+ try {
+ while (!shouldStop) {
+ try {
+ writer.write(queue.take())
+ } catch (e: InterruptedException) {
+ shouldStop = true
+ }
+
+ if (queue.drainTo(buf) > 0) {
+ for (data in buf) {
+ writer.write(data)
+ }
+ buf.clear()
+ }
+ }
+ } catch (e: Throwable) {
+ logger.error(e) { "Failure in Parquet data writer" }
+ exception = e
+ } finally {
+ writer.close()
+ }
+ }
+
+ /**
+ * Build the [ParquetWriter] used to write the Parquet files.
+ */
+ protected open fun buildWriter(builder: LocalParquetWriter.Builder<@UnsafeVariance T>): ParquetWriter<@UnsafeVariance T> {
+ return builder.build()
+ }
+
+ /**
+ * Write the specified record to the Parquet file, blocking if the internal queue is full.
+ */
+ public fun write(data: T) {
+ val exception = exception
+ if (exception != null) {
+ throw IllegalStateException("Writer thread failed", exception)
+ }
+
+ queue.put(data)
+ }
+
+ /**
+ * Signal the writer thread to stop and wait for it to finish.
+ */
+ override fun close() {
+ writerThread.interrupt()
+ writerThread.join()
+ }
+
+ init {
+ writerThread.start()
+ }
+}
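For reference, a minimal usage sketch of the writer above. The `Foo` record and its `WriteSupport` are hypothetical placeholders, not part of this patch:

```kotlin
import org.apache.parquet.hadoop.api.WriteSupport
import java.io.File

// Hypothetical record type; a real WriteSupport<Foo> must be supplied by the caller.
data class Foo(val value: Double)

class FooParquetWriter(path: File, writeSupport: WriteSupport<Foo>) :
    ParquetDataWriter<Foo>(path = path, writeSupport = writeSupport)

fun export(writeSupport: WriteSupport<Foo>) {
    // write() enqueues the record; the dedicated writer thread drains the queue.
    // use() calls close(), which interrupts the writer thread and joins it,
    // flushing any records still buffered.
    FooParquetWriter(File("foo.parquet"), writeSupport).use { writer ->
        writer.write(Foo(1.0))
    }
}
```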
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/ExportColumn.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/ExportColumn.kt
new file mode 100644
index 00000000..90e00f4b
--- /dev/null
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/ExportColumn.kt
@@ -0,0 +1,174 @@
+/*
+ * Copyright (c) 2024 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.util.parquet.exporter
+
+import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.sync.Mutex
+import kotlinx.coroutines.sync.withLock
+import org.apache.parquet.schema.LogicalTypeAnnotation
+import org.apache.parquet.schema.PrimitiveType
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32
+import org.apache.parquet.schema.Type
+import org.opendc.common.logger.logger
+import org.slf4j.Logger
+import kotlin.reflect.KClass
+
+/**
+ * A column that can be used to build a parquet schema to export [T] records.
+ *
+ * See [columnSerializer] for deserialization of this class.
+ *
+ * ```kotlin
+ * class Foo : Exportable { fun getMyValue(): Double = ... }
+ *
+ * val MY_FIELD = ExportColumn<Foo>(
+ *     field = Types.required(PrimitiveType.PrimitiveTypeName.DOUBLE).named("my_field_name")
+ * ) { exportable: Foo -> exportable.getMyValue() }
+ * ```
+ *
+ * @param[field]
+ * The Apache Parquet field; it includes information such as:
+ * - whether the field is required or optional
+ * - the field name
+ * - the [PrimitiveType] (e.g. [INT32], [DOUBLE])
+ * - the [LogicalTypeAnnotation] (e.g. TIMESTAMP)
+ *
+ * @param[getValue]
+ * Retrieves the value to be exported from the [T] instance passed as parameter.
+ * The returned value needs to match the [PrimitiveType] defined in [field].
+ *
+ * A second type parameter could have enforced the correct type at compile time,
+ * but it would have added too much complexity to the interface (`ExportColumn<Exportable>` -> `ExportColumn<Exportable, *>`).
+ *
+ * @param[regex] The pattern used to determine whether a string refers to this column.
+ * The default one matches the column name with either underscores or blank
+ * spaces between words in a case-insensitive manner.
+ *
+ * @param[exportableClass]
+ * The [KClass] of the [Exportable]. Used for intuitive log messages. This class
+ * can be instantiated with the inline constructor [Companion.invoke] without providing this parameter.
+ */
+public class ExportColumn<T : Exportable>
+ @PublishedApi
+ internal constructor(
+ public val field: Type,
+ @PublishedApi internal val regex: Regex,
+ @PublishedApi internal val exportableClass: KClass<T>,
+ internal val getValue: (T) -> Any?,
+ ) {
+ /**
+ * The name of the column (e.g. "timestamp").
+ */
+ public val name: String by lazy { field.name }
+
+ /**
+ * The primitive type of the field (e.g. INT32).
+ */
+ public val primitiveTypeName: PrimitiveTypeName by lazy { field.asPrimitiveType().primitiveTypeName }
+
+ init {
+ // Adds the column among those that can be deserialized.
+ addField(this)
+ }
+
+ override fun toString(): String = "[ExportColumn: name=$name, exportable=${exportableClass.simpleName}]"
+
+ public companion object {
+ @PublishedApi
+ internal val LOG: Logger by logger()
+
+ /**
+ * Reified constructor, needed to store the [T] class without it being provided as a parameter.
+ */
+ public inline operator fun <reified T : Exportable> invoke(
+ field: Type,
+ regex: Regex = Regex("\\s*(?:${field.name}|${field.name.replace('_', ' ')})\\s*", RegexOption.IGNORE_CASE),
+ noinline getValue: (T) -> Any?,
+ ): ExportColumn<T> =
+ ExportColumn(
+ field = field,
+ getValue = getValue,
+ exportableClass = T::class,
+ regex = regex,
+ )
+
+ /**
+ * All the columns that have been instantiated. They are added in the `init` block.
+ * Keep in mind that, in order to deserialize a string into a column, that column needs to have been loaded by the JVM.
+ */
+ @PublishedApi
+ internal val allColumns: MutableSet<ExportColumn<*>> = mutableSetOf()
+
+ @PublishedApi
+ internal val allColumnsLock: Mutex = Mutex()
+
+ /**
+ * Function invoked in the `init` block of each [ExportColumn].
+ * Adds the column to those that can be deserialized.
+ */
+ private fun <T : Exportable> addField(column: ExportColumn<T>): Unit =
+ runBlocking {
+ allColumnsLock.withLock { allColumns.add(column) }
+ }
+
+ /**
+ * @return the [ExportColumn] whose [ExportColumn.regex] matches [columnName] **and**
+ * whose generic type ([Exportable]) is [T], if any; `null` otherwise.
+ *
+ * This method needs to be inlined and reified because of runtime type erasure,
+ * which does not allow type-checking the generic class parameter.
+ */
+ @Suppress("UNCHECKED_CAST") // Needed even for the safe cast below: generic types cannot be checked at runtime.
+ @PublishedApi
+ internal inline fun <reified T : Exportable> matchingColOrNull(columnName: String): ExportColumn<T>? =
+ runBlocking {
+ val allColumns = allColumnsLock.withLock { allColumns.toSet() }
+
+ allColumns.forEach { column ->
+ // If it is an ExportColumn of the same exportable type.
+ if (column.exportableClass == T::class) {
+ // A cast that always succeeds at runtime because of
+ // type erasure, but is needed at compile time.
+ (column as? ExportColumn<T>)
+ ?.regex
+ // If columnName matches the column regex.
+ ?.matchEntire(columnName)
+ ?.let { return@runBlocking column }
+ }
+ }
+
+ null
+ }
+
+ /**
+ * Returns all [ExportColumn]s of type [T] that have been loaded up until now.
+ */
+ @Suppress("UNCHECKED_CAST")
+ public inline fun <reified T : Exportable> getAllLoadedColumns(): List<ExportColumn<T>> =
+ runBlocking {
+ allColumnsLock.withLock { allColumns.filter { it.exportableClass == T::class } as List<ExportColumn<T>> }
+ }
+ }
+ }
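To make the kdoc above concrete, a sketch of declaring a column for a hypothetical `Task` exportable (all names illustrative, not part of this patch):

```kotlin
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.Types

// Hypothetical Exportable.
class Task(val cpuUsage: Double) : Exportable

// Instantiating the column runs its init block, registering it for deserialization.
val CPU_USAGE: ExportColumn<Task> =
    ExportColumn(
        field = Types.required(PrimitiveTypeName.DOUBLE).named("cpu_usage"),
    ) { task -> task.cpuUsage }

// The default regex then matches e.g. "cpu_usage", "cpu usage" and "CPU Usage".
```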
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/ExportColumnSerializer.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/ExportColumnSerializer.kt
new file mode 100644
index 00000000..e07980f9
--- /dev/null
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/ExportColumnSerializer.kt
@@ -0,0 +1,120 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.util.parquet.exporter
+
+import kotlinx.serialization.KSerializer
+import kotlinx.serialization.SerializationException
+import kotlinx.serialization.builtins.ListSerializer
+import kotlinx.serialization.descriptors.SerialDescriptor
+import kotlinx.serialization.descriptors.serialDescriptor
+import kotlinx.serialization.encoding.Decoder
+import kotlinx.serialization.encoding.Encoder
+import kotlinx.serialization.json.Json
+import kotlinx.serialization.json.JsonDecoder
+import kotlinx.serialization.json.jsonArray
+import org.opendc.common.logger.errAndNull
+import org.opendc.common.logger.logger
+
+/**
+ * Returns a serializer for [ExportColumn] of [T] based on [ExportColumn.name]. Export columns can be
+ * deserialized from string values if the string matches the column's [ExportColumn.regex].
+ *
+ * ###### Note:
+ * - **In order to deserialize columns, they need to be loaded at runtime**.
+ * - **The serializer needs the reified type [T], meaning static deserialization
+ * (e.g. `@Serializable`, `@Serializer`) will not work. The serializer for [ExportColumn] of [T] needs to be retrieved with this method.**
+ *
+ * It is assumed the user always knows what type of column is needed for deserialization,
+ * so columns can be encoded by their name alone, not including their type (which would be tedious to write in JSON).
+ *
+ * ```kotlin
+ * // Decode column of Foo
+ * class Foo: Exportable
+ * json.decodeFrom<smth>(deserializer = columnSerializer<Foo>(), <smth>)
+ *
+ * // Decode a collection of columns of Foo
+ * json.decodeFrom<smth>(deserializer = ListSerializer(columnSerializer<Foo>()), <smth>)
+ * ```
+ */
+public inline fun <reified T : Exportable> columnSerializer(): KSerializer<ExportColumn<T>> =
+ object : KSerializer<ExportColumn<T>> {
+ override val descriptor: SerialDescriptor = serialDescriptor<String>()
+
+ override fun deserialize(decoder: Decoder): ExportColumn<T> {
+ val strValue = decoder.decodeString().trim('"')
+ return ExportColumn.matchingColOrNull<T>(strValue)
+ ?: throw SerializationException(
+ "unable to deserialize export column '$strValue'. " +
+ "Keep in mind that export columns need to be loaded by the JVM in order to be deserialized",
+ )
+ }
+
+ override fun serialize(
+ encoder: Encoder,
+ value: ExportColumn<T>,
+ ) {
+ encoder.encodeString(value.name)
+ }
+ }
+
+/**
+ * Serializer for a [List] of [ExportColumn] of [T] that ignores unrecognized
+ * column names, logging an error whenever one is encountered.
+ */
+public class ColListSerializer<T : Exportable>(
+ private val columnSerializer: KSerializer<ExportColumn<T>>,
+) : KSerializer<List<ExportColumn<T>>> {
+ private val listSerializer = ListSerializer(columnSerializer)
+ override val descriptor: SerialDescriptor = ListSerializer(columnSerializer).descriptor
+
+ /**
+ * Unrecognized columns are ignored and an error message is logged.
+ *
+ * @return the decoded list of [ExportColumn]s (might be empty).
+ * @throws[SerializationException] if the current element is not a JSON array or a string encoding of one.
+ */
+ override fun deserialize(decoder: Decoder): List<ExportColumn<T>> =
+ (decoder as? JsonDecoder)?.decodeJsonElement()?.jsonArray?.mapNotNull {
+ try {
+ Json.decodeFromJsonElement(columnSerializer, it)
+ } catch (_: Exception) {
+ LOG.errAndNull("no match found for column $it, ignoring...")
+ }
+ } ?: let {
+ val strValue = decoder.decodeString().trim('"')
+ // Effectively a recursive call with a JSON decoder instead of the provided decoder.
+ Json.decodeFromString(this@ColListSerializer, strValue)
+ }
+
+ override fun serialize(
+ encoder: Encoder,
+ value: List<ExportColumn<T>>,
+ ) {
+ listSerializer.serialize(encoder, value)
+ }
+
+ private companion object {
+ val LOG by logger()
+ }
+}
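A sketch of decoding columns by name with the serializers above, reusing the hypothetical `Task` and `CPU_USAGE` from the previous sketch (the columns must already be loaded by the JVM for a match to be found):

```kotlin
import kotlinx.serialization.json.Json

// Single column: the string is matched against each ExportColumn.regex.
val single: ExportColumn<Task> =
    Json.decodeFromString(columnSerializer<Task>(), "\"cpu usage\"")

// List of columns: unrecognized names are logged and skipped rather than failing.
val many: List<ExportColumn<Task>> =
    Json.decodeFromString(
        ColListSerializer(columnSerializer<Task>()),
        """["cpu_usage", "not_a_column"]""",
    )
```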
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/Exportable.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/Exportable.kt
new file mode 100644
index 00000000..61e766d0
--- /dev/null
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/Exportable.kt
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2024 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.util.parquet.exporter
+
+/**
+ * Classes that implement this interface can be exported
+ * as records in a parquet file through an [Exporter].
+ */
+public interface Exportable {
+ public companion object {
+ public inline fun <reified T : Exportable> getAllLoadedColumns(): List<ExportColumn<T>> {
+ return ExportColumn.getAllLoadedColumns()
+ }
+ }
+}
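The companion simply forwards to `ExportColumn.getAllLoadedColumns`; with the hypothetical `Task` from the sketches above:

```kotlin
// All Task columns loaded by the JVM so far (e.g. CPU_USAGE).
val loaded: List<ExportColumn<Task>> = Exportable.getAllLoadedColumns<Task>()
```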
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/Exporter.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/Exporter.kt
new file mode 100644
index 00000000..05f36530
--- /dev/null
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/exporter/Exporter.kt
@@ -0,0 +1,146 @@
+/*
+ * Copyright (c) 2024 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.util.parquet.exporter
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.io.api.RecordConsumer
+import org.apache.parquet.schema.MessageType
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64
+import org.apache.parquet.schema.Types
+import org.opendc.trace.util.parquet.ParquetDataWriter
+import java.io.File
+
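+/**
+ * A [ParquetDataWriter] that writes [Exportable] records of type [T],
+ * using an output schema restricted to the selected [ExportColumn]s.
+ */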
+public class Exporter<T : Exportable>
+ @PublishedApi
+ internal constructor(
+ outputFile: File,
+ writeSupp: WriteSupport<T>,
+ bufferSize: Int,
+ ) : ParquetDataWriter<T>(
+ path = outputFile,
+ writeSupport = writeSupp,
+ bufferSize = bufferSize,
+ ) {
+ public companion object {
+ /**
+ * Reified factory that allows using the runtime [Class.getSimpleName] of [T] as the default schema name.
+ * @param[outputFile] the output file where the [Exportable]s will be written.
+ * @param[columns] the columns that will be included in the output parquet file.
+ * @param[schemaName] the name of the schema of the output parquet file.
+ * @param[bufferSize] the capacity of the record queue shared with the writer thread.
+ */
+ public inline operator fun <reified T : Exportable> invoke(
+ outputFile: File,
+ vararg columns: ExportColumn<T> = emptyArray(),
+ schemaName: String? = null,
+ bufferSize: Int = 4096,
+ ): Exporter<T> =
+ Exporter(
+ outputFile = outputFile,
+ writeSupp = writeSuppFor(columns.toSet(), schemaName = schemaName ?: T::class.simpleName ?: "unknown"),
+ bufferSize = bufferSize,
+ )
+
+ /**
+ * Reified factory that allows using the runtime [Class.getSimpleName] of [T] as the default schema name.
+ * @param[outputFile] the output file where the [Exportable]s will be written.
+ * @param[columns] the columns that will be included in the output parquet file.
+ * @param[schemaName] the name of the schema of the output parquet file.
+ * @param[bufferSize] the capacity of the record queue shared with the writer thread.
+ */
+ public inline operator fun <reified T : Exportable> invoke(
+ outputFile: File,
+ columns: Collection<ExportColumn<T>> = emptySet(),
+ schemaName: String? = null,
+ bufferSize: Int = 4096,
+ ): Exporter<T> =
+ Exporter(
+ outputFile = outputFile,
+ writeSupp = writeSuppFor(columns.toSet(), schemaName = schemaName ?: T::class.simpleName ?: "unknown"),
+ bufferSize = bufferSize,
+ )
+
+ /**
+ * @return an anonymous [WriteSupport] for [T] with only the columns included in [columns].
+ */
+ @PublishedApi
+ internal fun <T : Exportable> writeSuppFor(
+ columns: Set<ExportColumn<T>>,
+ schemaName: String,
+ ): WriteSupport<T> =
+ object : WriteSupport<T>() {
+ private lateinit var cons: RecordConsumer
+
+ private val schema: MessageType =
+ Types
+ .buildMessage()
+ .addFields(*columns.map { it.field }.toTypedArray())
+ .named(schemaName)
+
+ override fun init(configuration: Configuration): WriteContext = WriteContext(schema, emptyMap())
+
+ override fun prepareForWrite(recordConsumer: RecordConsumer) {
+ cons = recordConsumer
+ }
+
+ override fun write(record: T) =
+ with(cons) {
+ startMessage()
+
+ columns.forEachIndexed { idx, column ->
+ fun <T> Any.castedOrThrow(): T {
+ @Suppress("UNCHECKED_CAST")
+ return (this as? T) ?: throw TypeCastException(
+ "attempt to add value of type ${this::class} to export " +
+ "field $column which requires a different type",
+ )
+ }
+ val valueToAdd: Any =
+ column.getValue(record)
+ ?: return@forEachIndexed // Skip null values; maybe add an explicit check for optional fields.
+
+ startField(column.name, idx)
+ when (column.primitiveTypeName) {
+ INT32 -> addInteger(valueToAdd.castedOrThrow())
+ INT64 -> addLong(valueToAdd.castedOrThrow())
+ DOUBLE -> addDouble(valueToAdd.castedOrThrow())
+ BINARY -> addBinary(valueToAdd.castedOrThrow())
+ FLOAT -> addFloat(valueToAdd.castedOrThrow())
+ BOOLEAN -> addBoolean(valueToAdd.castedOrThrow())
+ else -> throw RuntimeException(
+ "parquet primitive type name '${column.primitiveTypeName}' is not supported",
+ )
+ }
+ endField(column.name, idx)
+ }
+
+ cons.endMessage()
+ }
+ }
+ }
+ }
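Putting the pieces together, an end-to-end sketch with the hypothetical `Task` and `CPU_USAGE` from the earlier sketches:

```kotlin
import java.io.File

fun main() {
    // Only the selected columns end up in the output schema.
    Exporter(
        File("tasks.parquet"),
        CPU_USAGE, // vararg column selection
        schemaName = "task", // defaults to T::class.simpleName
    ).use { exporter ->
        exporter.write(Task(cpuUsage = 0.42))
    } // close() stops the writer thread and finalizes the parquet file.
}
```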