author    Fabian Mastenbroek <mail.fabianm@gmail.com>    2022-05-02 11:44:48 +0200
committer Fabian Mastenbroek <mail.fabianm@gmail.com>    2022-05-02 15:37:03 +0200
commit    9411845b3f26536a1e6ea40504e396f19d25a09a (patch)
tree      eccf44c25f0074a40b42797c2a3147f213ddab86 /opendc-trace
parent    ae0b12987dca93c05e44341963511ac8cf802793 (diff)
refactor(trace/parquet): Drop dependency on Avro
This change updates the Parquet support library in OpenDC so that it no longer relies on Avro and instead interfaces directly with Parquet's reading and writing functionality, reducing overhead.
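
For illustration, a minimal sketch of the resulting call site (based on the OdcVmTraceFormat.kt hunk below; the wrapping function is hypothetical and the OpenDC-internal classes ResourceReadSupport and OdcVmResourceTableReader live alongside OdcVmTraceFormat, so no extra imports are shown for them):

    import java.nio.file.Path
    import org.opendc.trace.util.parquet.LocalParquetReader

    // Hypothetical helper showing the new API: the ReadSupport implementation is
    // passed directly to LocalParquetReader, so no Avro schema or reader factory
    // (LocalParquetReader.custom / LocalParquetReader.avro) is needed anymore.
    fun openResourceTable(path: Path): OdcVmResourceTableReader {
        val reader = LocalParquetReader(path.resolve("meta.parquet"), ResourceReadSupport())
        return OdcVmResourceTableReader(reader)
    }
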
Diffstat (limited to 'opendc-trace')
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt        |   4
-rw-r--r--  opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/avro/AvroUtils.kt           |  44
-rw-r--r--  opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt |  39
-rw-r--r--  opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt      | 118
-rw-r--r--  opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt                |   3
5 files changed, 102 insertions, 106 deletions
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
index 155f8cf3..b455a2cf 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
@@ -105,11 +105,11 @@ public class OdcVmTraceFormat : TraceFormat {
override fun newReader(path: Path, table: String): TableReader {
return when (table) {
TABLE_RESOURCES -> {
- val reader = LocalParquetReader(path.resolve("meta.parquet"), LocalParquetReader.custom(ResourceReadSupport()))
+ val reader = LocalParquetReader(path.resolve("meta.parquet"), ResourceReadSupport())
OdcVmResourceTableReader(reader)
}
TABLE_RESOURCE_STATES -> {
- val reader = LocalParquetReader(path.resolve("trace.parquet"), LocalParquetReader.custom(ResourceStateReadSupport()))
+ val reader = LocalParquetReader(path.resolve("trace.parquet"), ResourceStateReadSupport())
OdcVmResourceStateTableReader(reader)
}
TABLE_INTERFERENCE_GROUPS -> {
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/avro/AvroUtils.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/avro/AvroUtils.kt
deleted file mode 100644
index a655d39f..00000000
--- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/avro/AvroUtils.kt
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-@file:JvmName("AvroUtils")
-package org.opendc.trace.util.avro
-
-import org.apache.avro.LogicalTypes
-import org.apache.avro.Schema
-
-/**
- * Schema for UUID type.
- */
-public val UUID_SCHEMA: Schema = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING))
-
-/**
- * Schema for timestamp type.
- */
-public val TIMESTAMP_SCHEMA: Schema = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG))
-
-/**
- * Helper function to make a [Schema] field optional.
- */
-public fun Schema.optional(): Schema {
- return Schema.createUnion(Schema.create(Schema.Type.NULL), this)
-}
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
index 3e6f19a2..eef83956 100644
--- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
@@ -22,7 +22,6 @@
package org.opendc.trace.util.parquet
-import org.apache.parquet.avro.AvroParquetReader
import org.apache.parquet.hadoop.ParquetReader
import org.apache.parquet.hadoop.api.ReadSupport
import org.apache.parquet.io.InputFile
@@ -38,11 +37,11 @@ import kotlin.io.path.isDirectory
* This class wraps a [ParquetReader] in order to support reading partitioned Parquet datasets.
*
* @param path The path to the Parquet file or directory to read.
- * @param factory Function to construct a [ParquetReader] for a local [InputFile].
+ * @param readSupport Helper class to perform conversion from Parquet to [T].
*/
public class LocalParquetReader<out T>(
path: Path,
- private val factory: (InputFile) -> ParquetReader<T> = avro()
+ private val readSupport: ReadSupport<T>
) : AutoCloseable {
/**
* The input files to process.
@@ -64,7 +63,7 @@ public class LocalParquetReader<out T>(
/**
* Construct a [LocalParquetReader] for the specified [file].
*/
- public constructor(file: File) : this(file.toPath())
+ public constructor(file: File, readSupport: ReadSupport<T>) : this(file.toPath(), readSupport)
/**
* Read a single entry in the Parquet file.
@@ -102,7 +101,7 @@ public class LocalParquetReader<out T>(
try {
this.reader = if (filesIterator.hasNext()) {
- factory(filesIterator.next())
+ createReader(filesIterator.next())
} else {
null
}
@@ -112,28 +111,12 @@ public class LocalParquetReader<out T>(
}
}
- public companion object {
- /**
- * A factory for reading Avro Parquet files.
- */
- public fun <T> avro(): (InputFile) -> ParquetReader<T> {
- return { input ->
- AvroParquetReader
- .builder<T>(input)
- .disableCompatibility()
- .build()
- }
- }
-
- /**
- * A factory for reading Parquet files with custom [ReadSupport].
- */
- public fun <T> custom(readSupport: ReadSupport<T>): (InputFile) -> ParquetReader<T> {
- return { input ->
- object : ParquetReader.Builder<T>(input) {
- override fun getReadSupport(): ReadSupport<T> = readSupport
- }.build()
- }
- }
+ /**
+ * Construct a [ParquetReader] for the specified [input] with a custom [ReadSupport].
+ */
+ private fun createReader(input: InputFile): ParquetReader<T> {
+ return object : ParquetReader.Builder<T>(input) {
+ override fun getReadSupport(): ReadSupport<@UnsafeVariance T> = this@LocalParquetReader.readSupport
+ }.build()
}
}
diff --git a/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt b/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt
index 8ef4d1fb..be354319 100644
--- a/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt
+++ b/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt
@@ -22,36 +22,81 @@
package org.opendc.trace.util.parquet
-import org.apache.avro.SchemaBuilder
-import org.apache.avro.generic.GenericData
-import org.apache.parquet.avro.AvroParquetReader
-import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.ParquetFileWriter
+import org.apache.parquet.hadoop.api.ReadSupport
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.io.api.Converter
+import org.apache.parquet.io.api.GroupConverter
+import org.apache.parquet.io.api.PrimitiveConverter
+import org.apache.parquet.io.api.RecordConsumer
+import org.apache.parquet.io.api.RecordMaterializer
+import org.apache.parquet.schema.MessageType
+import org.apache.parquet.schema.PrimitiveType
+import org.apache.parquet.schema.Type
+import org.apache.parquet.schema.Types
import org.junit.jupiter.api.*
import org.junit.jupiter.api.Assertions.assertEquals
-import java.io.File
import java.nio.file.FileAlreadyExistsException
+import java.nio.file.Files
import java.nio.file.NoSuchFileException
+import java.nio.file.Path
/**
* Test suite for the Parquet helper classes.
*/
internal class ParquetTest {
- private val schema = SchemaBuilder
- .record("test")
- .namespace("org.opendc.format.util")
- .fields()
- .name("field").type().intType().noDefault()
- .endRecord()
+ private lateinit var path: Path
- private lateinit var file: File
+ private val schema = Types.buildMessage()
+ .addField(
+ Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.REQUIRED)
+ .named("field")
+ )
+ .named("test")
+ private val writeSupport = object : WriteSupport<Int>() {
+ lateinit var recordConsumer: RecordConsumer
+
+ override fun init(configuration: Configuration): WriteContext {
+ return WriteContext(schema, emptyMap())
+ }
+
+ override fun prepareForWrite(recordConsumer: RecordConsumer) {
+ this.recordConsumer = recordConsumer
+ }
+
+ override fun write(record: Int) {
+ val consumer = recordConsumer
+
+ consumer.startMessage()
+ consumer.startField("field", 0)
+ consumer.addInteger(record)
+ consumer.endField("field", 0)
+ consumer.endMessage()
+ }
+ }
+
+ private val readSupport = object : ReadSupport<Int>() {
+ override fun init(
+ configuration: Configuration,
+ keyValueMetaData: Map<String, String>,
+ fileSchema: MessageType
+ ): ReadContext = ReadContext(fileSchema)
+
+ override fun prepareForRead(
+ configuration: Configuration,
+ keyValueMetaData: Map<String, String>,
+ fileSchema: MessageType,
+ readContext: ReadContext
+ ): RecordMaterializer<Int> = TestRecordMaterializer()
+ }
/**
- * Setup the test
+ * Set up the test
*/
@BeforeEach
fun setUp() {
- file = File.createTempFile("opendc", "parquet")
+ path = Files.createTempFile("opendc", "parquet")
}
/**
@@ -59,7 +104,7 @@ internal class ParquetTest {
*/
@AfterEach
fun tearDown() {
- file.delete()
+ Files.deleteIfExists(path)
}
/**
@@ -68,29 +113,24 @@ internal class ParquetTest {
@Test
fun testSmoke() {
val n = 4
- val writer = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(file))
- .withSchema(schema)
+ val writer = LocalParquetWriter.builder(path, writeSupport)
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
.build()
try {
repeat(n) { i ->
- val record = GenericData.Record(schema)
- record.put("field", i)
- writer.write(record)
+ writer.write(i)
}
} finally {
writer.close()
}
- val reader = AvroParquetReader.builder<GenericData.Record>(LocalInputFile(file))
- .build()
-
+ val reader = LocalParquetReader(path, readSupport)
var counter = 0
try {
while (true) {
val record = reader.read() ?: break
- assertEquals(counter++, record.get("field"))
+ assertEquals(counter++, record)
}
} finally {
reader.close()
@@ -105,9 +145,7 @@ internal class ParquetTest {
@Test
fun testOverwrite() {
assertThrows<FileAlreadyExistsException> {
- AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(file))
- .withSchema(schema)
- .build()
+ LocalParquetWriter.builder(path, writeSupport).build()
}
}
@@ -116,10 +154,30 @@ internal class ParquetTest {
*/
@Test
fun testNonExistent() {
- file.delete()
+ Files.deleteIfExists(path)
assertThrows<NoSuchFileException> {
- AvroParquetReader.builder<GenericData.Record>(LocalInputFile(file))
- .build()
+ LocalParquetReader(path, readSupport)
+ }
+ }
+
+ private class TestRecordMaterializer : RecordMaterializer<Int>() {
+ private var current: Int = 0
+ private val fieldConverter = object : PrimitiveConverter() {
+ override fun addInt(value: Int) {
+ current = value
+ }
+ }
+ private val root = object : GroupConverter() {
+ override fun getConverter(fieldIndex: Int): Converter {
+ require(fieldIndex == 0)
+ return fieldConverter
+ }
+ override fun start() {}
+ override fun end() {}
}
+
+ override fun getCurrentRecord(): Int = current
+
+ override fun getRootConverter(): GroupConverter = root
}
}
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
index aae71c58..d6e42c8c 100644
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
@@ -66,8 +66,7 @@ public class WtfTraceFormat : TraceFormat {
override fun newReader(path: Path, table: String): TableReader {
return when (table) {
TABLE_TASKS -> {
- val factory = LocalParquetReader.custom(TaskReadSupport())
- val reader = LocalParquetReader(path.resolve("tasks/schema-1.0"), factory)
+ val reader = LocalParquetReader(path.resolve("tasks/schema-1.0"), TaskReadSupport())
WtfTaskTableReader(reader)
}
else -> throw IllegalArgumentException("Table $table not supported")