From ae0b12987dca93c05e44341963511ac8cf802793 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 2 May 2022 11:06:31 +0200 Subject: refactor(trace/wtf): Do not use Avro when reading WTF trace This change updates the Workflow Trace format implementation in OpenDC to not use the `parquet-avro` library for reading trace data, but instead to use the low-level APIs to directly read the data from Parquet. This reduces the number of conversions necessary before reaching the OpenDC trace API. --- .../org/opendc/trace/wtf/WtfTaskTableReader.kt | 81 +++------- .../kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt | 5 +- .../kotlin/org/opendc/trace/wtf/parquet/Task.kt | 42 ++++++ .../opendc/trace/wtf/parquet/TaskReadSupport.kt | 99 +++++++++++++ .../trace/wtf/parquet/TaskRecordMaterializer.kt | 165 +++++++++++++++++++++ 5 files changed, 329 insertions(+), 63 deletions(-) create mode 100644 opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt create mode 100644 opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt create mode 100644 opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt (limited to 'opendc-trace/opendc-trace-wtf/src') diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt index 1e332aca..f0db78b7 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt @@ -22,38 +22,30 @@ package org.opendc.trace.wtf -import org.apache.avro.Schema -import org.apache.avro.generic.GenericRecord import org.opendc.trace.* import org.opendc.trace.conv.* import org.opendc.trace.util.parquet.LocalParquetReader -import java.time.Duration -import java.time.Instant +import
org.opendc.trace.wtf.parquet.Task /** * A [TableReader] implementation for the WTF format. */ -internal class WtfTaskTableReader(private val reader: LocalParquetReader) : TableReader { +internal class WtfTaskTableReader(private val reader: LocalParquetReader) : TableReader { /** * The current record. */ - private var record: GenericRecord? = null - - /** - * A flag to indicate that the columns have been initialized. - */ - private var hasInitializedColumns = false + private var record: Task? = null override fun nextRow(): Boolean { - val record = reader.read() - this.record = record + try { + val record = reader.read() + this.record = record - if (!hasInitializedColumns && record != null) { - initColumns(record.schema) - hasInitializedColumns = true + return record != null + } catch (e: Throwable) { + this.record = null + throw e } - - return record != null } override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 @@ -65,16 +57,15 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader (record[AVRO_COL_ID] as Long).toString() - COL_WORKFLOW_ID -> (record[AVRO_COL_WORKFLOW_ID] as Long).toString() - COL_SUBMIT_TIME -> Instant.ofEpochMilli(record[AVRO_COL_SUBMIT_TIME] as Long) - COL_WAIT_TIME -> Duration.ofMillis(record[AVRO_COL_WAIT_TIME] as Long) - COL_RUNTIME -> Duration.ofMillis(record[AVRO_COL_RUNTIME] as Long) + COL_ID -> record.id + COL_WORKFLOW_ID -> record.workflowId + COL_SUBMIT_TIME -> record.submitTime + COL_WAIT_TIME -> record.waitTime + COL_RUNTIME -> record.runtime COL_REQ_NCPUS, COL_GROUP_ID, COL_USER_ID -> getInt(index) - COL_PARENTS -> (record[AVRO_COL_PARENTS] as ArrayList).map { it["item"].toString() }.toSet() - COL_CHILDREN -> (record[AVRO_COL_CHILDREN] as ArrayList).map { it["item"].toString() }.toSet() + COL_PARENTS -> record.parents + COL_CHILDREN -> record.children else -> throw IllegalArgumentException("Invalid column") } } @@ -87,9 +78,9 @@ internal class WtfTaskTableReader(private val reader: 
LocalParquetReader (record[AVRO_COL_REQ_NCPUS] as Double).toInt() - COL_GROUP_ID -> record[AVRO_COL_GROUP_ID] as Int - COL_USER_ID -> record[AVRO_COL_USER_ID] as Int + COL_REQ_NCPUS -> record.requestedCpus + COL_GROUP_ID -> record.groupId + COL_USER_ID -> record.userId else -> throw IllegalArgumentException("Invalid column") } } @@ -106,38 +97,6 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader { - val reader = LocalParquetReader(path.resolve("tasks/schema-1.0")) + val factory = LocalParquetReader.custom(TaskReadSupport()) + val reader = LocalParquetReader(path.resolve("tasks/schema-1.0"), factory) WtfTaskTableReader(reader) } else -> throw IllegalArgumentException("Table $table not supported") diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt new file mode 100644 index 00000000..71557f96 --- /dev/null +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.wtf.parquet + +import java.time.Duration +import java.time.Instant + +/** + * A task in the Workflow Trace Format. + */ +internal data class Task( + val id: String, + val workflowId: String, + val submitTime: Instant, + val waitTime: Duration, + val runtime: Duration, + val requestedCpus: Int, + val groupId: Int, + val userId: Int, + val parents: Set, + val children: Set +) diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt new file mode 100644 index 00000000..0017a4a9 --- /dev/null +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt @@ -0,0 +1,99 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.wtf.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.hadoop.api.InitContext +import org.apache.parquet.hadoop.api.ReadSupport +import org.apache.parquet.io.api.RecordMaterializer +import org.apache.parquet.schema.* + +/** + * A [ReadSupport] instance for [Task] objects. + */ +internal class TaskReadSupport : ReadSupport() { + override fun init(context: InitContext): ReadContext { + return ReadContext(READ_SCHEMA) + } + + override fun prepareForRead( + configuration: Configuration, + keyValueMetaData: Map, + fileSchema: MessageType, + readContext: ReadContext + ): RecordMaterializer = TaskRecordMaterializer(readContext.requestedSchema) + + companion object { + /** + * Parquet read schema for the "tasks" table in the trace. 
+ */ + @JvmStatic + val READ_SCHEMA: MessageType = Types.buildMessage() + .addFields( + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .named("id"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .named("workflow_id"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("ts_submit"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .named("wait_time"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .named("runtime"), + Types + .optional(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("resource_amount_requested"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT32) + .named("user_id"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT32) + .named("group_id"), + Types + .buildGroup(Type.Repetition.OPTIONAL) + .addField( + Types.repeatedGroup() + .addField(Types.optional(PrimitiveType.PrimitiveTypeName.INT64).named("item")) + .named("list") + ) + .`as`(LogicalTypeAnnotation.listType()) + .named("children"), + Types + .buildGroup(Type.Repetition.OPTIONAL) + .addField( + Types.repeatedGroup() + .addField(Types.optional(PrimitiveType.PrimitiveTypeName.INT64).named("item")) + .named("list") + ) + .`as`(LogicalTypeAnnotation.listType()) + .named("parents"), + ) + .named("task") + } +} diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt new file mode 100644 index 00000000..08da5eaf --- /dev/null +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt @@ -0,0 +1,165 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the 
Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.wtf.parquet + +import org.apache.parquet.io.api.* +import org.apache.parquet.schema.MessageType +import java.time.Duration +import java.time.Instant +import kotlin.math.roundToInt + +/** + * A [RecordMaterializer] for [Task] records. + */ +internal class TaskRecordMaterializer(schema: MessageType) : RecordMaterializer() { + /** + * State of current record being read. + */ + private var _id = "" + private var _workflowId = "" + private var _submitTime = Instant.MIN + private var _waitTime = Duration.ZERO + private var _runtime = Duration.ZERO + private var _requestedCpus = 0 + private var _groupId = 0 + private var _userId = 0 + private var _parents = mutableSetOf() + private var _children = mutableSetOf() + + /** + * Root converter for the record. + */ + private val root = object : GroupConverter() { + /** + * The converters for the columns of the schema. 
+ */ + private val converters = schema.fields.map { type -> + when (type.name) { + "id" -> object : PrimitiveConverter() { + override fun addLong(value: Long) { + _id = value.toString() + } + } + "workflow_id" -> object : PrimitiveConverter() { + override fun addLong(value: Long) { + _workflowId = value.toString() + } + } + "ts_submit" -> object : PrimitiveConverter() { + override fun addLong(value: Long) { + _submitTime = Instant.ofEpochMilli(value) + } + } + "wait_time" -> object : PrimitiveConverter() { + override fun addLong(value: Long) { + _waitTime = Duration.ofMillis(value) + } + } + "runtime" -> object : PrimitiveConverter() { + override fun addLong(value: Long) { + _runtime = Duration.ofMillis(value) + } + } + "resource_amount_requested" -> object : PrimitiveConverter() { + override fun addDouble(value: Double) { + _requestedCpus = value.roundToInt() + } + } + "group_id" -> object : PrimitiveConverter() { + override fun addInt(value: Int) { + _groupId = value + } + } + "user_id" -> object : PrimitiveConverter() { + override fun addInt(value: Int) { + _userId = value + } + } + "children" -> RelationConverter(_children) + "parents" -> RelationConverter(_parents) + else -> error("Unknown column $type") + } + } + + override fun start() { + _id = "" + _workflowId = "" + _submitTime = Instant.MIN + _waitTime = Duration.ZERO + _runtime = Duration.ZERO + _requestedCpus = 0 + _groupId = 0 + _userId = 0 + _parents.clear() + _children.clear() + } + + override fun end() {} + + override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex] + } + + override fun getCurrentRecord(): Task = Task( + _id, + _workflowId, + _submitTime, + _waitTime, + _runtime, + _requestedCpus, + _groupId, + _userId, + _parents.toSet(), + _children.toSet() + ) + + override fun getRootConverter(): GroupConverter = root + + /** + * Helper class to convert parent and child relations and add them to [relations]. 
+ */ + private class RelationConverter(private val relations: MutableSet) : GroupConverter() { + private val entryConverter = object : PrimitiveConverter() { + override fun addLong(value: Long) { + relations.add(value.toString()) + } + } + + private val listConverter = object : GroupConverter() { + override fun getConverter(fieldIndex: Int): Converter { + require(fieldIndex == 0) + return entryConverter + } + + override fun start() {} + override fun end() {} + } + + override fun getConverter(fieldIndex: Int): Converter { + require(fieldIndex == 0) + return listConverter + } + + override fun start() {} + override fun end() {} + } +} -- cgit v1.2.3 From 9411845b3f26536a1e6ea40504e396f19d25a09a Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 2 May 2022 11:44:48 +0200 Subject: refactor(trace/parquet): Drop dependency on Avro This change updates the Parquet support library in OpenDC to not rely on Avro, but instead interface directly with Parquet's reading and writing functionality, providing less overhead. 
--- .../src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'opendc-trace/opendc-trace-wtf/src') diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt index aae71c58..d6e42c8c 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt @@ -66,8 +66,7 @@ public class WtfTraceFormat : TraceFormat { override fun newReader(path: Path, table: String): TableReader { return when (table) { TABLE_TASKS -> { - val factory = LocalParquetReader.custom(TaskReadSupport()) - val reader = LocalParquetReader(path.resolve("tasks/schema-1.0"), factory) + val reader = LocalParquetReader(path.resolve("tasks/schema-1.0"), TaskReadSupport()) WtfTaskTableReader(reader) } else -> throw IllegalArgumentException("Table $table not supported") -- cgit v1.2.3 From 670cd279ea7789e07b6d778a21fdec68347ab305 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 2 May 2022 14:17:55 +0200 Subject: feat(trace/api): Add support for projecting tables This change adds support for projecting certain columns of a table. This enables faster reading for tables with a high number of columns. Currently, we support projection in the Parquet-based workload formats. Other formats are text-based and will probably not benefit much from projection.
--- .../kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt | 4 +-- .../opendc/trace/wtf/parquet/TaskReadSupport.kt | 39 ++++++++++++++++++++-- .../org/opendc/trace/wtf/WtfTraceFormatTest.kt | 2 +- 3 files changed, 40 insertions(+), 5 deletions(-) (limited to 'opendc-trace/opendc-trace-wtf/src') diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt index d6e42c8c..e71253ac 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt @@ -63,10 +63,10 @@ public class WtfTraceFormat : TraceFormat { } } - override fun newReader(path: Path, table: String): TableReader { + override fun newReader(path: Path, table: String, projection: List>?): TableReader { return when (table) { TABLE_TASKS -> { - val reader = LocalParquetReader(path.resolve("tasks/schema-1.0"), TaskReadSupport()) + val reader = LocalParquetReader(path.resolve("tasks/schema-1.0"), TaskReadSupport(projection)) WtfTaskTableReader(reader) } else -> throw IllegalArgumentException("Table $table not supported") diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt index 0017a4a9..8e7325de 100644 --- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt +++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt @@ -27,13 +27,48 @@ import org.apache.parquet.hadoop.api.InitContext import org.apache.parquet.hadoop.api.ReadSupport import org.apache.parquet.io.api.RecordMaterializer import org.apache.parquet.schema.* +import org.opendc.trace.TableColumn +import org.opendc.trace.conv.* /** * A [ReadSupport] instance for [Task] objects. 
+ * + * @param projection The projection of the table to read. */ -internal class TaskReadSupport : ReadSupport() { +internal class TaskReadSupport(private val projection: List>?) : ReadSupport() { + /** + * Mapping of table columns to their Parquet column names. + */ + private val colMap = mapOf, String>( + TASK_ID to "id", + TASK_WORKFLOW_ID to "workflow_id", + TASK_SUBMIT_TIME to "ts_submit", + TASK_WAIT_TIME to "wait_time", + TASK_RUNTIME to "runtime", + TASK_REQ_NCPUS to "resource_amount_requested", + TASK_PARENTS to "parents", + TASK_CHILDREN to "children", + TASK_GROUP_ID to "group_id", + TASK_USER_ID to "user_id" + ) + override fun init(context: InitContext): ReadContext { - return ReadContext(READ_SCHEMA) + val projectedSchema = + if (projection != null) { + Types.buildMessage() + .apply { + val fieldByName = READ_SCHEMA.fields.associateBy { it.name } + + for (col in projection) { + val fieldName = colMap[col] ?: continue + addField(fieldByName.getValue(fieldName)) + } + } + .named(READ_SCHEMA.name) + } else { + READ_SCHEMA + } + return ReadContext(projectedSchema) } override fun prepareForRead( diff --git a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt index 0f0e422d..c0eb3f08 100644 --- a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt @@ -61,7 +61,7 @@ class WtfTraceFormatTest { @Test fun testTableReader() { val path = Paths.get("src/test/resources/wtf-trace") - val reader = format.newReader(path, TABLE_TASKS) + val reader = format.newReader(path, TABLE_TASKS, listOf(TASK_ID, TASK_WORKFLOW_ID, TASK_SUBMIT_TIME, TASK_RUNTIME, TASK_PARENTS)) assertAll( { assertTrue(reader.nextRow()) }, -- cgit v1.2.3