-rw-r--r--  gradle/libs.versions.toml                                                                               2
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt            4
-rw-r--r--  opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/avro/AvroUtils.kt              44
-rw-r--r--  opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt  39
-rw-r--r--  opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt        118
-rw-r--r--  opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt                    3
6 files changed, 103 insertions(+), 107 deletions(-)
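In short: the `parquet-avro` binding is dropped in favour of plain `parquet-hadoop`, and callers now pass their `ReadSupport` directly to `LocalParquetReader`. A before/after sketch of the caller-side change, lifted from the `OdcVmTraceFormat` hunk below:

```kotlin
// Before: the reader was built from a factory produced by LocalParquetReader.custom(...):
// val reader = LocalParquetReader(path.resolve("meta.parquet"), LocalParquetReader.custom(ResourceReadSupport()))

// After: the ReadSupport is handed to the reader directly; no Avro layer in between.
val reader = LocalParquetReader(path.resolve("meta.parquet"), ResourceReadSupport())
```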
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index a5c0f184..b05af368 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -70,7 +70,7 @@ jackson-databind = { module = "com.fasterxml.jackson.core:jackson-databind", ver
jackson-module-kotlin = { module = "com.fasterxml.jackson.module:jackson-module-kotlin", version.ref = "jackson" }
jackson-datatype-jsr310 = { module = "com.fasterxml.jackson.datatype:jackson-datatype-jsr310", version.ref = "jackson" }
jackson-dataformat-csv = { module = "com.fasterxml.jackson.dataformat:jackson-dataformat-csv", version.ref = "jackson" }
-parquet = { module = "org.apache.parquet:parquet-avro", version.ref = "parquet" }
+parquet = { module = "org.apache.parquet:parquet-hadoop", version.ref = "parquet" }
config = { module = "com.typesafe:config", version.ref = "config" }
# Quarkus
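Since only the module coordinates change, the catalog alias `parquet` is untouched, so consuming modules should keep compiling as-is. A sketch of the assumed consumption pattern (this diff does not show the consuming build scripts):

```kotlin
// build.gradle.kts (sketch): the declaration stays the same; only the resolved
// artifact changes from parquet-avro to parquet-hadoop.
dependencies {
    implementation(libs.parquet)
}
```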
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
index 155f8cf3..b455a2cf 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
@@ -105,11 +105,11 @@ public class OdcVmTraceFormat : TraceFormat {
override fun newReader(path: Path, table: String): TableReader {
return when (table) {
TABLE_RESOURCES -> {
- val reader = LocalParquetReader(path.resolve("meta.parquet"), LocalParquetReader.custom(ResourceReadSupport()))
+ val reader = LocalParquetReader(path.resolve("meta.parquet"), ResourceReadSupport())
OdcVmResourceTableReader(reader)
}
TABLE_RESOURCE_STATES -> {
- val reader = LocalParquetReader(path.resolve("trace.parquet"), LocalParquetReader.custom(ResourceStateReadSupport()))
+ val reader = LocalParquetReader(path.resolve("trace.parquet"), ResourceStateReadSupport())
OdcVmResourceStateTableReader(reader)
}
TABLE_INTERFERENCE_GROUPS -> {
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/avro/AvroUtils.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/avro/AvroUtils.kt
deleted file mode 100644
index a655d39f..00000000
--- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/avro/AvroUtils.kt
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-@file:JvmName("AvroUtils")
-package org.opendc.trace.util.avro
-
-import org.apache.avro.LogicalTypes
-import org.apache.avro.Schema
-
-/**
- * Schema for UUID type.
- */
-public val UUID_SCHEMA: Schema = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING))
-
-/**
- * Schema for timestamp type.
- */
-public val TIMESTAMP_SCHEMA: Schema = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG))
-
-/**
- * Helper function to make a [Schema] field optional.
- */
-public fun Schema.optional(): Schema {
- return Schema.createUnion(Schema.create(Schema.Type.NULL), this)
-}
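This commit removes the Avro schema helpers without adding replacements, since the remaining code builds `MessageType` schemas directly. For reference, a sketch of the rough equivalents in Parquet's own schema API (assuming the `LogicalTypeAnnotation` builders from Parquet 1.11+; not part of this change):

```kotlin
import org.apache.parquet.schema.LogicalTypeAnnotation
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.Type
import org.apache.parquet.schema.Types

// UUID as an annotated string column, like Avro's LogicalTypes.uuid() on a string.
val uuidField: Type = Types.required(PrimitiveTypeName.BINARY)
    .`as`(LogicalTypeAnnotation.stringType())
    .named("id")

// Millisecond-precision timestamp, like Avro's LogicalTypes.timestampMillis().
val timestampField: Type = Types.required(PrimitiveTypeName.INT64)
    .`as`(LogicalTypeAnnotation.timestampType(/* isAdjustedToUTC = */ true, LogicalTypeAnnotation.TimeUnit.MILLIS))
    .named("timestamp")

// Optionality is a repetition level rather than a union with null.
val optionalField: Type = Types.optional(PrimitiveTypeName.INT32).named("count")
```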
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
index 3e6f19a2..eef83956 100644
--- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
@@ -22,7 +22,6 @@
package org.opendc.trace.util.parquet
-import org.apache.parquet.avro.AvroParquetReader
import org.apache.parquet.hadoop.ParquetReader
import org.apache.parquet.hadoop.api.ReadSupport
import org.apache.parquet.io.InputFile
@@ -38,11 +37,11 @@ import kotlin.io.path.isDirectory
* This class wraps a [ParquetReader] in order to support reading partitioned Parquet datasets.
*
* @param path The path to the Parquet file or directory to read.
- * @param factory Function to construct a [ParquetReader] for a local [InputFile].
+ * @param readSupport Helper class that converts Parquet records into instances of [T].
*/
public class LocalParquetReader<out T>(
path: Path,
- private val factory: (InputFile) -> ParquetReader<T> = avro()
+ private val readSupport: ReadSupport<T>
) : AutoCloseable {
/**
* The input files to process.
@@ -64,7 +63,7 @@ public class LocalParquetReader<out T>(
/**
* Construct a [LocalParquetReader] for the specified [file].
*/
- public constructor(file: File) : this(file.toPath())
+ public constructor(file: File, readSupport: ReadSupport<T>) : this(file.toPath(), readSupport)
/**
* Read a single entry in the Parquet file.
@@ -102,7 +101,7 @@ public class LocalParquetReader<out T>(
try {
this.reader = if (filesIterator.hasNext()) {
- factory(filesIterator.next())
+ createReader(filesIterator.next())
} else {
null
}
@@ -112,28 +111,12 @@ public class LocalParquetReader<out T>(
}
}
- public companion object {
- /**
- * A factory for reading Avro Parquet files.
- */
- public fun <T> avro(): (InputFile) -> ParquetReader<T> {
- return { input ->
- AvroParquetReader
- .builder<T>(input)
- .disableCompatibility()
- .build()
- }
- }
-
- /**
- * A factory for reading Parquet files with custom [ReadSupport].
- */
- public fun <T> custom(readSupport: ReadSupport<T>): (InputFile) -> ParquetReader<T> {
- return { input ->
- object : ParquetReader.Builder<T>(input) {
- override fun getReadSupport(): ReadSupport<T> = readSupport
- }.build()
- }
- }
+ /**
+ * Construct a [ParquetReader] for the specified [input] with a custom [ReadSupport].
+ */
+ private fun createReader(input: InputFile): ParquetReader<T> {
+ return object : ParquetReader.Builder<T>(input) {
+ override fun getReadSupport(): ReadSupport<@UnsafeVariance T> = this@LocalParquetReader.readSupport
+ }.build()
}
}
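Note the `@UnsafeVariance` in the new `createReader`: `T` is covariant on `LocalParquetReader`, but `ReadSupport<T>` uses it invariantly, so the override has to opt out of the variance check (the private constructor property is exempt). A usage sketch of the reworked reader; the `readAll` helper is illustrative, not part of this commit:

```kotlin
import java.nio.file.Path
import org.apache.parquet.hadoop.api.ReadSupport

// Drain a (possibly partitioned) Parquet dataset into a list.
fun <T> readAll(path: Path, readSupport: ReadSupport<T>): List<T> {
    val records = mutableListOf<T>()
    LocalParquetReader(path, readSupport).use { reader ->
        while (true) {
            // read() returns null once all files in the dataset are exhausted.
            records += reader.read() ?: break
        }
    }
    return records
}
```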
diff --git a/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt b/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt
index 8ef4d1fb..be354319 100644
--- a/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt
+++ b/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt
@@ -22,36 +22,81 @@
package org.opendc.trace.util.parquet
-import org.apache.avro.SchemaBuilder
-import org.apache.avro.generic.GenericData
-import org.apache.parquet.avro.AvroParquetReader
-import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.ParquetFileWriter
+import org.apache.parquet.hadoop.api.ReadSupport
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.io.api.Converter
+import org.apache.parquet.io.api.GroupConverter
+import org.apache.parquet.io.api.PrimitiveConverter
+import org.apache.parquet.io.api.RecordConsumer
+import org.apache.parquet.io.api.RecordMaterializer
+import org.apache.parquet.schema.MessageType
+import org.apache.parquet.schema.PrimitiveType
+import org.apache.parquet.schema.Type
+import org.apache.parquet.schema.Types
import org.junit.jupiter.api.*
import org.junit.jupiter.api.Assertions.assertEquals
-import java.io.File
import java.nio.file.FileAlreadyExistsException
+import java.nio.file.Files
import java.nio.file.NoSuchFileException
+import java.nio.file.Path
/**
* Test suite for the Parquet helper classes.
*/
internal class ParquetTest {
- private val schema = SchemaBuilder
- .record("test")
- .namespace("org.opendc.format.util")
- .fields()
- .name("field").type().intType().noDefault()
- .endRecord()
+ private lateinit var path: Path
- private lateinit var file: File
+ private val schema = Types.buildMessage()
+ .addField(
+ Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.REQUIRED)
+ .named("field")
+ )
+ .named("test")
+ private val writeSupport = object : WriteSupport<Int>() {
+ lateinit var recordConsumer: RecordConsumer
+
+ override fun init(configuration: Configuration): WriteContext {
+ return WriteContext(schema, emptyMap())
+ }
+
+ override fun prepareForWrite(recordConsumer: RecordConsumer) {
+ this.recordConsumer = recordConsumer
+ }
+
+ override fun write(record: Int) {
+ val consumer = recordConsumer
+
+ consumer.startMessage()
+ consumer.startField("field", 0)
+ consumer.addInteger(record)
+ consumer.endField("field", 0)
+ consumer.endMessage()
+ }
+ }
+
+ private val readSupport = object : ReadSupport<Int>() {
+ override fun init(
+ configuration: Configuration,
+ keyValueMetaData: Map<String, String>,
+ fileSchema: MessageType
+ ): ReadContext = ReadContext(fileSchema)
+
+ override fun prepareForRead(
+ configuration: Configuration,
+ keyValueMetaData: Map<String, String>,
+ fileSchema: MessageType,
+ readContext: ReadContext
+ ): RecordMaterializer<Int> = TestRecordMaterializer()
+ }
/**
- * Setup the test
+ * Set up the test
*/
@BeforeEach
fun setUp() {
- file = File.createTempFile("opendc", "parquet")
+ path = Files.createTempFile("opendc", "parquet")
}
/**
@@ -59,7 +104,7 @@ internal class ParquetTest {
*/
@AfterEach
fun tearDown() {
- file.delete()
+ Files.deleteIfExists(path)
}
/**
@@ -68,29 +113,24 @@ internal class ParquetTest {
@Test
fun testSmoke() {
val n = 4
- val writer = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(file))
- .withSchema(schema)
+ val writer = LocalParquetWriter.builder(path, writeSupport)
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
.build()
try {
repeat(n) { i ->
- val record = GenericData.Record(schema)
- record.put("field", i)
- writer.write(record)
+ writer.write(i)
}
} finally {
writer.close()
}
- val reader = AvroParquetReader.builder<GenericData.Record>(LocalInputFile(file))
- .build()
-
+ val reader = LocalParquetReader(path, readSupport)
var counter = 0
try {
while (true) {
val record = reader.read() ?: break
- assertEquals(counter++, record.get("field"))
+ assertEquals(counter++, record)
}
} finally {
reader.close()
@@ -105,9 +145,7 @@ internal class ParquetTest {
@Test
fun testOverwrite() {
assertThrows<FileAlreadyExistsException> {
- AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(file))
- .withSchema(schema)
- .build()
+ LocalParquetWriter.builder(path, writeSupport).build()
}
}
@@ -116,10 +154,30 @@ internal class ParquetTest {
*/
@Test
fun testNonExistent() {
- file.delete()
+ Files.deleteIfExists(path)
assertThrows<NoSuchFileException> {
- AvroParquetReader.builder<GenericData.Record>(LocalInputFile(file))
- .build()
+ LocalParquetReader(path, readSupport)
+ }
+ }
+
+ private class TestRecordMaterializer : RecordMaterializer<Int>() {
+ private var current: Int = 0
+ private val fieldConverter = object : PrimitiveConverter() {
+ override fun addInt(value: Int) {
+ current = value
+ }
+ }
+ private val root = object : GroupConverter() {
+ override fun getConverter(fieldIndex: Int): Converter {
+ require(fieldIndex == 0)
+ return fieldConverter
+ }
+ override fun start() {}
+ override fun end() {}
}
+
+ override fun getCurrentRecord(): Int = current
+
+ override fun getRootConverter(): GroupConverter = root
}
}
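One capability the custom `ReadSupport` path opens up (not exercised by this test) is column projection: `init` may return a `ReadContext` built from a pruned schema rather than the full `fileSchema`, and Parquet will then skip the unrequested column chunks. A hypothetical variant of the test's read support, reusing its `TestRecordMaterializer`:

```kotlin
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.api.ReadSupport
import org.apache.parquet.io.api.RecordMaterializer
import org.apache.parquet.schema.MessageType
import org.apache.parquet.schema.PrimitiveType
import org.apache.parquet.schema.Type
import org.apache.parquet.schema.Types

// Hypothetical: read only the "field" column, whatever else the file contains.
class ProjectingReadSupport : ReadSupport<Int>() {
    override fun init(
        configuration: Configuration,
        keyValueMetaData: Map<String, String>,
        fileSchema: MessageType
    ): ReadContext {
        // Request a one-column projection instead of the full file schema.
        val projection = Types.buildMessage()
            .addField(
                Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.REQUIRED)
                    .named("field")
            )
            .named(fileSchema.name)
        return ReadContext(projection)
    }

    override fun prepareForRead(
        configuration: Configuration,
        keyValueMetaData: Map<String, String>,
        fileSchema: MessageType,
        readContext: ReadContext
    ): RecordMaterializer<Int> = TestRecordMaterializer()
}
```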
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
index aae71c58..d6e42c8c 100644
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
@@ -66,8 +66,7 @@ public class WtfTraceFormat : TraceFormat {
override fun newReader(path: Path, table: String): TableReader {
return when (table) {
TABLE_TASKS -> {
- val factory = LocalParquetReader.custom(TaskReadSupport())
- val reader = LocalParquetReader(path.resolve("tasks/schema-1.0"), factory)
+ val reader = LocalParquetReader(path.resolve("tasks/schema-1.0"), TaskReadSupport())
WtfTaskTableReader(reader)
}
else -> throw IllegalArgumentException("Table $table not supported")