summaryrefslogtreecommitdiff
path: root/opendc-trace/opendc-trace-parquet/src/main
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-05-02 16:06:44 +0200
committerGitHub <noreply@github.com>2022-05-02 16:06:44 +0200
commitc78285f6346236053979aa98113ba9e6d7efb21e (patch)
tree44221b3a39516a235a0b41adf525a79a60abb998 /opendc-trace/opendc-trace-parquet/src/main
parent44ddd27a745f2dfe4b6ffef1b7657d156dd61489 (diff)
parente4d3a8add5388182cf7a12b1099678a0b769b106 (diff)
merge: Add support for SQL via Apache Calcite (#78)
This pull request integrates initial support for SQL queries via Apache Calcite into the OpenDC codebase. Our vision is that users of OpenDC should be able to use SQL queries to access and process most of the experiment data generated by simulations. This pull request moves towards this goal by adding the ability to query workload traces supported by OpenDC using SQL. We also provide a CLI for querying the data in workload traces via `opendc-trace-tools`: ```bash opendc-trace-tools query -i data/bitbrains-small -f opendc-vm "SELECT MAX(cpu_count) FROM resource_states" ``` ## Implementation Notes :hammer_and_pick: * Add Calcite (SQL) integration * Add support for reading via SQL * Add support for writing via SQL * Support custom Parquet ReadSupport implementations * Read records using low-level Parquet API * Do not use Avro when exporting experiment data * Do not use Avro when reading WTF trace * Drop dependency on Avro * Add support for projections ## External Dependencies :four_leaf_clover: * Apache Calcite ## Breaking API Changes :warning: * The existing code for reading Parquet traces using Apache Avro has been removed. * `TraceFormat.newReader` now accepts a nullable `projection` parameter
Diffstat (limited to 'opendc-trace/opendc-trace-parquet/src/main')
-rw-r--r--opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt36
-rw-r--r--opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt (renamed from opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt)41
2 files changed, 49 insertions, 28 deletions
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
index ef9eaeb3..eef83956 100644
--- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
@@ -22,8 +22,8 @@
package org.opendc.trace.util.parquet
-import org.apache.parquet.avro.AvroParquetReader
import org.apache.parquet.hadoop.ParquetReader
+import org.apache.parquet.hadoop.api.ReadSupport
import org.apache.parquet.io.InputFile
import java.io.File
import java.io.IOException
@@ -32,11 +32,17 @@ import java.nio.file.Path
import kotlin.io.path.isDirectory
/**
- * A helper class to read Parquet files.
+ * A helper class to read Parquet files from the filesystem.
+ *
+ * This class wraps a [ParquetReader] in order to support reading partitioned Parquet datasets.
*
* @param path The path to the Parquet file or directory to read.
+ * @param readSupport Helper class to perform conversion from Parquet to [T].
*/
-public class LocalParquetReader<out T>(path: Path) : AutoCloseable {
+public class LocalParquetReader<out T>(
+ path: Path,
+ private val readSupport: ReadSupport<T>
+) : AutoCloseable {
/**
* The input files to process.
*/
@@ -57,7 +63,7 @@ public class LocalParquetReader<out T>(path: Path) : AutoCloseable {
/**
* Construct a [LocalParquetReader] for the specified [file].
*/
- public constructor(file: File) : this(file.toPath())
+ public constructor(file: File, readSupport: ReadSupport<T>) : this(file.toPath(), readSupport)
/**
* Read a single entry in the Parquet file.
@@ -93,20 +99,24 @@ public class LocalParquetReader<out T>(path: Path) : AutoCloseable {
private fun initReader() {
reader?.close()
- this.reader = if (filesIterator.hasNext()) {
- createReader(filesIterator.next())
- } else {
- null
+ try {
+ this.reader = if (filesIterator.hasNext()) {
+ createReader(filesIterator.next())
+ } else {
+ null
+ }
+ } catch (e: Throwable) {
+ this.reader = null
+ throw e
}
}
/**
- * Create a Parquet reader for the specified file.
+ * Construct a [ParquetReader] for the specified [input] with a custom [ReadSupport].
*/
private fun createReader(input: InputFile): ParquetReader<T> {
- return AvroParquetReader
- .builder<T>(input)
- .disableCompatibility()
- .build()
+ return object : ParquetReader.Builder<T>(input) {
+ override fun getReadSupport(): ReadSupport<@UnsafeVariance T> = this@LocalParquetReader.readSupport
+ }.build()
}
}
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt
index 086b900b..b5eb1deb 100644
--- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021 AtLarge Research
+ * Copyright (c) 2022 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,25 +20,36 @@
* SOFTWARE.
*/
-@file:JvmName("AvroUtils")
package org.opendc.trace.util.parquet
-import org.apache.avro.LogicalTypes
-import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.ParquetWriter
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.io.OutputFile
+import java.nio.file.Path
/**
- * Schema for UUID type.
+ * Helper class for writing Parquet records to local disk.
*/
-public val UUID_SCHEMA: Schema = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING))
+public class LocalParquetWriter {
+ /**
+ * A [ParquetWriter.Builder] implementation supporting custom [OutputFile]s and [WriteSupport] implementations.
+ */
+ public class Builder<T> internal constructor(
+ output: OutputFile,
+ private val writeSupport: WriteSupport<T>
+ ) : ParquetWriter.Builder<T, Builder<T>>(output) {
+ override fun self(): Builder<T> = this
-/**
- * Schema for timestamp type.
- */
-public val TIMESTAMP_SCHEMA: Schema = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG))
+ override fun getWriteSupport(conf: Configuration): WriteSupport<T> = writeSupport
+ }
-/**
- * Helper function to make a [Schema] field optional.
- */
-public fun Schema.optional(): Schema {
- return Schema.createUnion(Schema.create(Schema.Type.NULL), this)
+ public companion object {
+ /**
+ * Create a [Builder] instance that writes a Parquet file at the specified [path].
+ */
+ @JvmStatic
+ public fun <T> builder(path: Path, writeSupport: WriteSupport<T>): Builder<T> =
+ Builder(LocalOutputFile(path), writeSupport)
+ }
}