| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-05-02 16:06:44 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2022-05-02 16:06:44 +0200 |
| commit | c78285f6346236053979aa98113ba9e6d7efb21e (patch) | |
| tree | 44221b3a39516a235a0b41adf525a79a60abb998 /opendc-trace/opendc-trace-wtf/src | |
| parent | 44ddd27a745f2dfe4b6ffef1b7657d156dd61489 (diff) | |
| parent | e4d3a8add5388182cf7a12b1099678a0b769b106 (diff) | |
# merge: Add support for SQL via Apache Calcite (#78)
This pull request integrates initial support for SQL queries via Apache Calcite into the OpenDC codebase.
Our vision is that users of OpenDC should be able to use SQL queries to access and process most
of the experiment data generated by simulations.
This pull request moves toward that goal by adding the ability to query the workload traces supported
by OpenDC using SQL. We also provide a CLI in `opendc-trace-tools` for querying trace data:
```bash
opendc-trace-tools query -i data/bitbrains-small -f opendc-vm "SELECT MAX(cpu_count) FROM resource_states"
```
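Since Calcite ships a standard JDBC driver, the same query can also be issued programmatically. The Kotlin sketch below is illustrative only: the `jdbc:calcite:` URL and `CalciteConnection` are standard Calcite API, but the schema name `trace` and the commented-out schema registration are assumptions, not part of the API added by this pull request.

```kotlin
import java.sql.DriverManager
import java.util.Properties
import org.apache.calcite.jdbc.CalciteConnection

fun main() {
    // Calcite registers itself as a regular JDBC driver.
    val props = Properties().apply { setProperty("lex", "JAVA") }
    DriverManager.getConnection("jdbc:calcite:", props).use { conn ->
        val calcite = conn.unwrap(CalciteConnection::class.java)

        // Hypothetical: expose an OpenDC trace as a Calcite schema named "trace".
        // calcite.rootSchema.add("trace", openDcTraceSchema("data/bitbrains-small"))

        calcite.createStatement().use { stmt ->
            stmt.executeQuery("SELECT MAX(cpu_count) FROM trace.resource_states").use { rs ->
                while (rs.next()) println(rs.getInt(1))
            }
        }
    }
}
```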
## Implementation Notes :hammer_and_pick:
* Add Calcite (SQL) integration
* Add support for writing via SQL
* Support custom Parquet ReadSupport implementations
* Read records using low-level Parquet API
* Do not use Avro when exporting experiment data
* Do not use Avro when reading WTF trace
* Drop dependency on Avro
* Add support for projections (see the projection sketch after this list)
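Projection support works by narrowing the Parquet read schema before record assembly, so unselected columns are never decoded from disk. A minimal standalone sketch of the idea, using Parquet's `Types` builder API; the three-column `task` schema below is invented for illustration, while the real schema lives in `TaskReadSupport`:

```kotlin
import org.apache.parquet.schema.MessageType
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.Types

// Full schema of the file: three columns.
val fullSchema: MessageType = Types.buildMessage()
    .addFields(
        Types.optional(PrimitiveTypeName.INT64).named("id"),
        Types.optional(PrimitiveTypeName.INT64).named("runtime"),
        Types.optional(PrimitiveTypeName.DOUBLE).named("resource_amount_requested")
    )
    .named("task")

// Keep only the requested columns; Parquet then skips the rest entirely.
fun project(schema: MessageType, columns: List<String>): MessageType {
    val fieldByName = schema.fields.associateBy { it.name }
    return Types.buildMessage()
        .apply { columns.forEach { name -> fieldByName[name]?.let { addField(it) } } }
        .named(schema.name)
}

val projected = project(fullSchema, listOf("id", "runtime"))
```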
## External Dependencies :four_leaf_clover:
* Apache Calcite
## Breaking API Changes :warning:
* The existing code for reading Parquet traces using Apache Avro has been removed.
* `TraceFormat.newReader` now accepts a nullable `projection` parameter (see the migration example below).
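Migrating callers can pass `null` for the projection to read every column, or a list of columns to enable pushdown. A sketch mirroring the updated unit test further down in this diff:

```kotlin
import java.nio.file.Paths
import org.opendc.trace.conv.*
import org.opendc.trace.wtf.WtfTraceFormat

fun main() {
    val format = WtfTraceFormat()
    val path = Paths.get("src/test/resources/wtf-trace")

    // Read every column of the tasks table.
    val fullReader = format.newReader(path, TABLE_TASKS, null)

    // Read only a subset; unselected columns are never decoded.
    val projectedReader = format.newReader(path, TABLE_TASKS, listOf(TASK_ID, TASK_SUBMIT_TIME, TASK_RUNTIME))

    fullReader.close()
    projectedReader.close()
}
```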
## Diffstat (limited to `opendc-trace/opendc-trace-wtf/src`)
6 files changed, 365 insertions, 65 deletions
```diff
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
index 1e332aca..f0db78b7 100644
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
@@ -22,38 +22,30 @@ package org.opendc.trace.wtf
 
-import org.apache.avro.Schema
-import org.apache.avro.generic.GenericRecord
 import org.opendc.trace.*
 import org.opendc.trace.conv.*
 import org.opendc.trace.util.parquet.LocalParquetReader
-import java.time.Duration
-import java.time.Instant
+import org.opendc.trace.wtf.parquet.Task
 
 /**
  * A [TableReader] implementation for the WTF format.
  */
-internal class WtfTaskTableReader(private val reader: LocalParquetReader<GenericRecord>) : TableReader {
+internal class WtfTaskTableReader(private val reader: LocalParquetReader<Task>) : TableReader {
     /**
      * The current record.
      */
-    private var record: GenericRecord? = null
-
-    /**
-     * A flag to indicate that the columns have been initialized.
-     */
-    private var hasInitializedColumns = false
+    private var record: Task? = null
 
     override fun nextRow(): Boolean {
-        val record = reader.read()
-        this.record = record
+        try {
+            val record = reader.read()
+            this.record = record
 
-        if (!hasInitializedColumns && record != null) {
-            initColumns(record.schema)
-            hasInitializedColumns = true
+            return record != null
+        } catch (e: Throwable) {
+            this.record = null
+            throw e
         }
-
-        return record != null
     }
 
     override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
@@ -65,16 +57,15 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Generic
     override fun get(index: Int): Any? {
         val record = checkNotNull(record) { "Reader in invalid state" }
 
-        @Suppress("UNCHECKED_CAST")
         return when (index) {
-            COL_ID -> (record[AVRO_COL_ID] as Long).toString()
-            COL_WORKFLOW_ID -> (record[AVRO_COL_WORKFLOW_ID] as Long).toString()
-            COL_SUBMIT_TIME -> Instant.ofEpochMilli(record[AVRO_COL_SUBMIT_TIME] as Long)
-            COL_WAIT_TIME -> Duration.ofMillis(record[AVRO_COL_WAIT_TIME] as Long)
-            COL_RUNTIME -> Duration.ofMillis(record[AVRO_COL_RUNTIME] as Long)
+            COL_ID -> record.id
+            COL_WORKFLOW_ID -> record.workflowId
+            COL_SUBMIT_TIME -> record.submitTime
+            COL_WAIT_TIME -> record.waitTime
+            COL_RUNTIME -> record.runtime
             COL_REQ_NCPUS, COL_GROUP_ID, COL_USER_ID -> getInt(index)
-            COL_PARENTS -> (record[AVRO_COL_PARENTS] as ArrayList<GenericRecord>).map { it["item"].toString() }.toSet()
-            COL_CHILDREN -> (record[AVRO_COL_CHILDREN] as ArrayList<GenericRecord>).map { it["item"].toString() }.toSet()
+            COL_PARENTS -> record.parents
+            COL_CHILDREN -> record.children
             else -> throw IllegalArgumentException("Invalid column")
         }
     }
@@ -87,9 +78,9 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Generic
         val record = checkNotNull(record) { "Reader in invalid state" }
 
         return when (index) {
-            COL_REQ_NCPUS -> (record[AVRO_COL_REQ_NCPUS] as Double).toInt()
-            COL_GROUP_ID -> record[AVRO_COL_GROUP_ID] as Int
-            COL_USER_ID -> record[AVRO_COL_USER_ID] as Int
+            COL_REQ_NCPUS -> record.requestedCpus
+            COL_GROUP_ID -> record.groupId
+            COL_USER_ID -> record.userId
             else -> throw IllegalArgumentException("Invalid column")
         }
     }
@@ -106,38 +97,6 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Generic
         reader.close()
     }
 
-    /**
-     * Initialize the columns for the reader based on [schema].
-     */
-    private fun initColumns(schema: Schema) {
-        try {
-            AVRO_COL_ID = schema.getField("id").pos()
-            AVRO_COL_WORKFLOW_ID = schema.getField("workflow_id").pos()
-            AVRO_COL_SUBMIT_TIME = schema.getField("ts_submit").pos()
-            AVRO_COL_WAIT_TIME = schema.getField("wait_time").pos()
-            AVRO_COL_RUNTIME = schema.getField("runtime").pos()
-            AVRO_COL_REQ_NCPUS = schema.getField("resource_amount_requested").pos()
-            AVRO_COL_PARENTS = schema.getField("parents").pos()
-            AVRO_COL_CHILDREN = schema.getField("children").pos()
-            AVRO_COL_GROUP_ID = schema.getField("group_id").pos()
-            AVRO_COL_USER_ID = schema.getField("user_id").pos()
-        } catch (e: NullPointerException) {
-            // This happens when the field we are trying to access does not exist
-            throw IllegalArgumentException("Invalid schema", e)
-        }
-    }
-
-    private var AVRO_COL_ID = -1
-    private var AVRO_COL_WORKFLOW_ID = -1
-    private var AVRO_COL_SUBMIT_TIME = -1
-    private var AVRO_COL_WAIT_TIME = -1
-    private var AVRO_COL_RUNTIME = -1
-    private var AVRO_COL_REQ_NCPUS = -1
-    private var AVRO_COL_PARENTS = -1
-    private var AVRO_COL_CHILDREN = -1
-    private var AVRO_COL_GROUP_ID = -1
-    private var AVRO_COL_USER_ID = -1
-
     private val COL_ID = 0
     private val COL_WORKFLOW_ID = 1
     private val COL_SUBMIT_TIME = 2
```
```diff
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
index c8f9ecaa..e71253ac 100644
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
@@ -22,12 +22,12 @@ package org.opendc.trace.wtf
 
-import org.apache.avro.generic.GenericRecord
 import org.opendc.trace.*
 import org.opendc.trace.conv.*
 import org.opendc.trace.spi.TableDetails
 import org.opendc.trace.spi.TraceFormat
 import org.opendc.trace.util.parquet.LocalParquetReader
+import org.opendc.trace.wtf.parquet.TaskReadSupport
 import java.nio.file.Path
 
 /**
@@ -63,10 +63,10 @@ public class WtfTraceFormat : TraceFormat {
         }
     }
 
-    override fun newReader(path: Path, table: String): TableReader {
+    override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader {
         return when (table) {
             TABLE_TASKS -> {
-                val reader = LocalParquetReader<GenericRecord>(path.resolve("tasks/schema-1.0"))
+                val reader = LocalParquetReader(path.resolve("tasks/schema-1.0"), TaskReadSupport(projection))
                 WtfTaskTableReader(reader)
             }
             else -> throw IllegalArgumentException("Table $table not supported")
```
```diff
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt
new file mode 100644
index 00000000..71557f96
--- /dev/null
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.wtf.parquet
+
+import java.time.Duration
+import java.time.Instant
+
+/**
+ * A task in the Workflow Trace Format.
+ */
+internal data class Task(
+    val id: String,
+    val workflowId: String,
+    val submitTime: Instant,
+    val waitTime: Duration,
+    val runtime: Duration,
+    val requestedCpus: Int,
+    val groupId: Int,
+    val userId: Int,
+    val parents: Set<String>,
+    val children: Set<String>
+)
```
```diff
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt
new file mode 100644
index 00000000..8e7325de
--- /dev/null
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt
@@ -0,0 +1,134 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.wtf.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.api.InitContext
+import org.apache.parquet.hadoop.api.ReadSupport
+import org.apache.parquet.io.api.RecordMaterializer
+import org.apache.parquet.schema.*
+import org.opendc.trace.TableColumn
+import org.opendc.trace.conv.*
+
+/**
+ * A [ReadSupport] instance for [Task] objects.
+ *
+ * @param projection The projection of the table to read.
+ */
+internal class TaskReadSupport(private val projection: List<TableColumn<*>>?) : ReadSupport<Task>() {
+    /**
+     * Mapping of table columns to their Parquet column names.
+     */
+    private val colMap = mapOf<TableColumn<*>, String>(
+        TASK_ID to "id",
+        TASK_WORKFLOW_ID to "workflow_id",
+        TASK_SUBMIT_TIME to "ts_submit",
+        TASK_WAIT_TIME to "wait_time",
+        TASK_RUNTIME to "runtime",
+        TASK_REQ_NCPUS to "resource_amount_requested",
+        TASK_PARENTS to "parents",
+        TASK_CHILDREN to "children",
+        TASK_GROUP_ID to "group_id",
+        TASK_USER_ID to "user_id"
+    )
+
+    override fun init(context: InitContext): ReadContext {
+        val projectedSchema =
+            if (projection != null) {
+                Types.buildMessage()
+                    .apply {
+                        val fieldByName = READ_SCHEMA.fields.associateBy { it.name }
+
+                        for (col in projection) {
+                            val fieldName = colMap[col] ?: continue
+                            addField(fieldByName.getValue(fieldName))
+                        }
+                    }
+                    .named(READ_SCHEMA.name)
+            } else {
+                READ_SCHEMA
+            }
+        return ReadContext(projectedSchema)
+    }
+
+    override fun prepareForRead(
+        configuration: Configuration,
+        keyValueMetaData: Map<String, String>,
+        fileSchema: MessageType,
+        readContext: ReadContext
+    ): RecordMaterializer<Task> = TaskRecordMaterializer(readContext.requestedSchema)
+
+    companion object {
+        /**
+         * Parquet read schema for the "tasks" table in the trace.
+         */
+        @JvmStatic
+        val READ_SCHEMA: MessageType = Types.buildMessage()
+            .addFields(
+                Types
+                    .optional(PrimitiveType.PrimitiveTypeName.INT64)
+                    .named("id"),
+                Types
+                    .optional(PrimitiveType.PrimitiveTypeName.INT64)
+                    .named("workflow_id"),
+                Types
+                    .optional(PrimitiveType.PrimitiveTypeName.INT64)
+                    .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+                    .named("ts_submit"),
+                Types
+                    .optional(PrimitiveType.PrimitiveTypeName.INT64)
+                    .named("wait_time"),
+                Types
+                    .optional(PrimitiveType.PrimitiveTypeName.INT64)
+                    .named("runtime"),
+                Types
+                    .optional(PrimitiveType.PrimitiveTypeName.DOUBLE)
+                    .named("resource_amount_requested"),
+                Types
+                    .optional(PrimitiveType.PrimitiveTypeName.INT32)
+                    .named("user_id"),
+                Types
+                    .optional(PrimitiveType.PrimitiveTypeName.INT32)
+                    .named("group_id"),
+                Types
+                    .buildGroup(Type.Repetition.OPTIONAL)
+                    .addField(
+                        Types.repeatedGroup()
+                            .addField(Types.optional(PrimitiveType.PrimitiveTypeName.INT64).named("item"))
+                            .named("list")
+                    )
+                    .`as`(LogicalTypeAnnotation.listType())
+                    .named("children"),
+                Types
+                    .buildGroup(Type.Repetition.OPTIONAL)
+                    .addField(
+                        Types.repeatedGroup()
+                            .addField(Types.optional(PrimitiveType.PrimitiveTypeName.INT64).named("item"))
+                            .named("list")
+                    )
+                    .`as`(LogicalTypeAnnotation.listType())
+                    .named("parents"),
+            )
+            .named("task")
+    }
+}
```
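Outside OpenDC's `LocalParquetReader` wrapper, a `ReadSupport` like the one above plugs directly into parquet-hadoop's reader builder. A sketch, assuming a local `tasks.parquet` file; the `ParquetReader.builder(ReadSupport, Path)` overload is the classic parquet-hadoop entry point (deprecated in favour of `InputFile`-based builders in newer releases):

```kotlin
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetReader

fun readAllTasks(file: Path): List<Task> {
    val tasks = mutableListOf<Task>()
    // Build a reader that materializes Task objects without a projection.
    ParquetReader.builder(TaskReadSupport(null), file).build().use { reader ->
        while (true) {
            // read() returns null once the end of the file is reached.
            tasks += reader.read() ?: break
        }
    }
    return tasks
}
```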
```diff
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt
new file mode 100644
index 00000000..08da5eaf
--- /dev/null
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt
@@ -0,0 +1,165 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.wtf.parquet
+
+import org.apache.parquet.io.api.*
+import org.apache.parquet.schema.MessageType
+import java.time.Duration
+import java.time.Instant
+import kotlin.math.roundToInt
+
+/**
+ * A [RecordMaterializer] for [Task] records.
+ */
+internal class TaskRecordMaterializer(schema: MessageType) : RecordMaterializer<Task>() {
+    /**
+     * State of current record being read.
+     */
+    private var _id = ""
+    private var _workflowId = ""
+    private var _submitTime = Instant.MIN
+    private var _waitTime = Duration.ZERO
+    private var _runtime = Duration.ZERO
+    private var _requestedCpus = 0
+    private var _groupId = 0
+    private var _userId = 0
+    private var _parents = mutableSetOf<String>()
+    private var _children = mutableSetOf<String>()
+
+    /**
+     * Root converter for the record.
+     */
+    private val root = object : GroupConverter() {
+        /**
+         * The converters for the columns of the schema.
+         */
+        private val converters = schema.fields.map { type ->
+            when (type.name) {
+                "id" -> object : PrimitiveConverter() {
+                    override fun addLong(value: Long) {
+                        _id = value.toString()
+                    }
+                }
+                "workflow_id" -> object : PrimitiveConverter() {
+                    override fun addLong(value: Long) {
+                        _workflowId = value.toString()
+                    }
+                }
+                "ts_submit" -> object : PrimitiveConverter() {
+                    override fun addLong(value: Long) {
+                        _submitTime = Instant.ofEpochMilli(value)
+                    }
+                }
+                "wait_time" -> object : PrimitiveConverter() {
+                    override fun addLong(value: Long) {
+                        _waitTime = Duration.ofMillis(value)
+                    }
+                }
+                "runtime" -> object : PrimitiveConverter() {
+                    override fun addLong(value: Long) {
+                        _runtime = Duration.ofMillis(value)
+                    }
+                }
+                "resource_amount_requested" -> object : PrimitiveConverter() {
+                    override fun addDouble(value: Double) {
+                        _requestedCpus = value.roundToInt()
+                    }
+                }
+                "group_id" -> object : PrimitiveConverter() {
+                    override fun addInt(value: Int) {
+                        _groupId = value
+                    }
+                }
+                "user_id" -> object : PrimitiveConverter() {
+                    override fun addInt(value: Int) {
+                        _userId = value
+                    }
+                }
+                "children" -> RelationConverter(_children)
+                "parents" -> RelationConverter(_parents)
+                else -> error("Unknown column $type")
+            }
+        }
+
+        override fun start() {
+            _id = ""
+            _workflowId = ""
+            _submitTime = Instant.MIN
+            _waitTime = Duration.ZERO
+            _runtime = Duration.ZERO
+            _requestedCpus = 0
+            _groupId = 0
+            _userId = 0
+            _parents.clear()
+            _children.clear()
+        }
+
+        override fun end() {}
+
+        override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex]
+    }
+
+    override fun getCurrentRecord(): Task = Task(
+        _id,
+        _workflowId,
+        _submitTime,
+        _waitTime,
+        _runtime,
+        _requestedCpus,
+        _groupId,
+        _userId,
+        _parents.toSet(),
+        _children.toSet()
+    )
+
+    override fun getRootConverter(): GroupConverter = root
+
+    /**
+     * Helper class to convert parent and child relations and add them to [relations].
+     */
+    private class RelationConverter(private val relations: MutableSet<String>) : GroupConverter() {
+        private val entryConverter = object : PrimitiveConverter() {
+            override fun addLong(value: Long) {
+                relations.add(value.toString())
+            }
+        }
+
+        private val listConverter = object : GroupConverter() {
+            override fun getConverter(fieldIndex: Int): Converter {
+                require(fieldIndex == 0)
+                return entryConverter
+            }
+
+            override fun start() {}
+            override fun end() {}
+        }
+
+        override fun getConverter(fieldIndex: Int): Converter {
+            require(fieldIndex == 0)
+            return listConverter
+        }
+
+        override fun start() {}
+        override fun end() {}
+    }
+}
```
```diff
diff --git a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt
index 0f0e422d..c0eb3f08 100644
--- a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt
@@ -61,7 +61,7 @@ class WtfTraceFormatTest {
     @Test
     fun testTableReader() {
         val path = Paths.get("src/test/resources/wtf-trace")
-        val reader = format.newReader(path, TABLE_TASKS)
+        val reader = format.newReader(path, TABLE_TASKS, listOf(TASK_ID, TASK_WORKFLOW_ID, TASK_SUBMIT_TIME, TASK_RUNTIME, TASK_PARENTS))
 
         assertAll(
             { assertTrue(reader.nextRow()) },
```
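Putting the pieces together: the `TableReader` returned by `newReader` is consumed row by row, with `resolve` mapping a column to an index once up front. A sketch based on the reader API visible in this diff, with column constants from `org.opendc.trace.conv`:

```kotlin
fun printTasks(format: WtfTraceFormat, path: java.nio.file.Path) {
    val reader = format.newReader(path, TABLE_TASKS, listOf(TASK_ID, TASK_RUNTIME))
    val idCol = reader.resolve(TASK_ID)
    val runtimeCol = reader.resolve(TASK_RUNTIME)

    while (reader.nextRow()) {
        val id = reader.get(idCol)           // task id as a String
        val runtime = reader.get(runtimeCol) // runtime as a java.time.Duration
        println("task $id ran for $runtime")
    }
    reader.close()
}
```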
