diff options
Diffstat (limited to 'opendc-trace/opendc-trace-calcite/src/main')
7 files changed, 608 insertions, 0 deletions
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/InsertableTable.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/InsertableTable.kt new file mode 100644 index 00000000..9c7b69a2 --- /dev/null +++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/InsertableTable.kt @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.calcite + +import org.apache.calcite.linq4j.Enumerable +import org.apache.calcite.schema.Table + +/** + * A Calcite [Table] to which rows can be inserted. + */ +internal interface InsertableTable : Table { + /** + * Insert [rows] into this table. + * + * @param rows The rows to insert into the table. + * @return The number of rows inserted. + */ + fun insert(rows: Enumerable<Array<Any?>>): Long +} diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt new file mode 100644 index 00000000..1854f262 --- /dev/null +++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.calcite + +import org.apache.calcite.linq4j.Enumerator +import org.opendc.trace.TableColumn +import org.opendc.trace.TableReader +import java.sql.Timestamp +import java.time.Duration +import java.time.Instant +import java.util.concurrent.atomic.AtomicBoolean + +/** + * An [Enumerator] for a [TableReader]. + */ +internal class TraceReaderEnumerator<E>( + private val reader: TableReader, + private val columns: List<TableColumn<*>>, + private val cancelFlag: AtomicBoolean +) : Enumerator<E> { + private val columnIndices = columns.map { reader.resolve(it) }.toIntArray() + private var current: E? = null + + override fun moveNext(): Boolean { + if (cancelFlag.get()) { + return false + } + + val reader = reader + val res = reader.nextRow() + + if (res) { + @Suppress("UNCHECKED_CAST") + current = convertRow(reader) as E + } else { + current = null + } + + return res + } + + override fun current(): E = checkNotNull(current) + + override fun reset() { + throw UnsupportedOperationException() + } + + override fun close() { + reader.close() + } + + private fun convertRow(reader: TableReader): Array<Any?> { + val res = arrayOfNulls<Any?>(columns.size) + val columnIndices = columnIndices + + for ((index, column) in columns.withIndex()) { + val columnIndex = columnIndices[index] + res[index] = convertColumn(reader, column, columnIndex) + } + return res + } + + private fun convertColumn(reader: TableReader, column: TableColumn<*>, columnIndex: Int): Any? { + val value = reader.get(columnIndex) + + return when (column.type) { + Instant::class.java -> Timestamp.from(value as Instant) + Duration::class.java -> (value as Duration).toMillis() + Set::class.java -> (value as Set<*>).toTypedArray() + else -> value + } + } +} diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt new file mode 100644 index 00000000..3249546d --- /dev/null +++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.calcite + +import org.apache.calcite.schema.Schema +import org.apache.calcite.schema.Table +import org.apache.calcite.schema.impl.AbstractSchema +import org.opendc.trace.Trace + +/** + * A Calcite [Schema] that exposes an OpenDC [Trace] into multiple SQL tables. + * + * @param trace The [Trace] to create a schema for. + */ +public class TraceSchema(private val trace: Trace) : AbstractSchema() { + /** + * The [Table]s that belong to this schema. + */ + private val tables: Map<String, TraceTable> by lazy { + trace.tables.associateWith { + val table = checkNotNull(trace.getTable(it)) { "Unexpected null table" } + TraceTable(table) + } + } + + override fun getTableMap(): Map<String, Table> = tables +} diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt new file mode 100644 index 00000000..3c6badc8 --- /dev/null +++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.calcite + +import org.apache.calcite.model.ModelHandler +import org.apache.calcite.schema.Schema +import org.apache.calcite.schema.SchemaFactory +import org.apache.calcite.schema.SchemaPlus +import org.opendc.trace.Trace +import java.io.File +import java.nio.file.Paths + +/** + * Factory that creates a [TraceSchema]. + * + * This factory allows users to include a schema that references a trace in a `model.json` file. + */ +public class TraceSchemaFactory : SchemaFactory { + override fun create(parentSchema: SchemaPlus, name: String, operand: Map<String, Any>): Schema { + val base = operand[ModelHandler.ExtraOperand.BASE_DIRECTORY.camelName] as File? + val pathParam = requireNotNull(operand["path"]) { "Trace path not specified" } as String + val path = if (base != null) File(base, pathParam).toPath() else Paths.get(pathParam) + + val format = requireNotNull(operand["format"]) { "Trace format not specified" } as String + val create = operand.getOrDefault("create", false) as Boolean + + val trace = if (create) Trace.create(path, format) else Trace.open(path, format) + return TraceSchema(trace) + } +} diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt new file mode 100644 index 00000000..8c571b82 --- /dev/null +++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt @@ -0,0 +1,176 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.calcite + +import org.apache.calcite.DataContext +import org.apache.calcite.adapter.java.AbstractQueryableTable +import org.apache.calcite.adapter.java.JavaTypeFactory +import org.apache.calcite.linq4j.* +import org.apache.calcite.plan.RelOptCluster +import org.apache.calcite.plan.RelOptTable +import org.apache.calcite.prepare.Prepare.CatalogReader +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.core.TableModify +import org.apache.calcite.rel.logical.LogicalTableModify +import org.apache.calcite.rel.type.RelDataType +import org.apache.calcite.rel.type.RelDataTypeFactory +import org.apache.calcite.rex.RexNode +import org.apache.calcite.schema.* +import org.apache.calcite.schema.impl.AbstractTableQueryable +import org.apache.calcite.sql.type.SqlTypeName +import java.time.Duration +import java.time.Instant +import java.util.concurrent.atomic.AtomicBoolean + +/** + * A Calcite [Table] that exposes an OpenDC [org.opendc.trace.Table] as SQL table. + */ +internal class TraceTable(private val table: org.opendc.trace.Table) : + AbstractQueryableTable(Array<Any?>::class.java), + ProjectableFilterableTable, + ModifiableTable, + InsertableTable { + private var rowType: RelDataType? = null + + override fun getRowType(typeFactory: RelDataTypeFactory): RelDataType { + var rowType = rowType + if (rowType == null) { + rowType = deduceRowType(typeFactory as JavaTypeFactory) + this.rowType = rowType + } + + return rowType + } + + override fun scan(root: DataContext, filters: MutableList<RexNode>, projects: IntArray?): Enumerable<Array<Any?>> { + // Filters are currently not supported by the OpenDC trace API. By keeping the filters in the list, Calcite + // assumes that they are declined and will perform the filters itself. + + val projection = projects?.map { table.columns[it] } + val cancelFlag = DataContext.Variable.CANCEL_FLAG.get<AtomicBoolean>(root) + return object : AbstractEnumerable<Array<Any?>>() { + override fun enumerator(): Enumerator<Array<Any?>> = + TraceReaderEnumerator(table.newReader(projection), projection ?: table.columns, cancelFlag) + } + } + + override fun insert(rows: Enumerable<Array<Any?>>): Long { + val table = table + val columns = table.columns + val writer = table.newWriter() + val columnIndices = columns.map { writer.resolve(it) }.toIntArray() + var rowCount = 0L + + try { + for (row in rows) { + writer.startRow() + + for ((index, value) in row.withIndex()) { + if (value == null) { + continue + } + val columnType = columns[index].type + + writer.set( + columnIndices[index], + when (columnType) { + Duration::class.java -> Duration.ofMillis(value as Long) + Instant::class.java -> Instant.ofEpochMilli(value as Long) + Set::class.java -> (value as List<*>).toSet() + else -> value + } + ) + } + + writer.endRow() + + rowCount++ + } + } finally { + writer.close() + } + + return rowCount + } + + override fun <T> asQueryable(queryProvider: QueryProvider, schema: SchemaPlus, tableName: String): Queryable<T> { + return object : AbstractTableQueryable<T>(queryProvider, schema, this@TraceTable, tableName) { + override fun enumerator(): Enumerator<T> { + val cancelFlag = AtomicBoolean(false) + return TraceReaderEnumerator( + this@TraceTable.table.newReader(), + this@TraceTable.table.columns, + cancelFlag + ) + } + + override fun toString(): String = "TraceTableQueryable[table=$tableName]" + } + } + + override fun getModifiableCollection(): MutableCollection<Any?>? = null + + override fun toModificationRel( + cluster: RelOptCluster, + table: RelOptTable, + catalogReader: CatalogReader, + child: RelNode, + operation: TableModify.Operation, + updateColumnList: MutableList<String>?, + sourceExpressionList: MutableList<RexNode>?, + flattened: Boolean + ): TableModify { + cluster.planner.addRule(TraceTableModifyRule.DEFAULT.toRule()) + + return LogicalTableModify.create( + table, + catalogReader, + child, + operation, + updateColumnList, + sourceExpressionList, + flattened + ) + } + + override fun toString(): String = "TraceTable" + + private fun deduceRowType(typeFactory: JavaTypeFactory): RelDataType { + val types = mutableListOf<RelDataType>() + val names = mutableListOf<String>() + + for (column in table.columns) { + names.add(column.name) + types.add( + when (column.type) { + Instant::class.java -> typeFactory.createSqlType(SqlTypeName.TIMESTAMP) + Duration::class.java -> typeFactory.createSqlType(SqlTypeName.BIGINT) + Set::class.java -> typeFactory.createMultisetType(typeFactory.createSqlType(SqlTypeName.UNKNOWN), -1) + else -> typeFactory.createType(column.type) + } + ) + } + + return typeFactory.createStructType(types, names) + } +} diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModify.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModify.kt new file mode 100644 index 00000000..64dc0cea --- /dev/null +++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModify.kt @@ -0,0 +1,138 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.calcite + +import org.apache.calcite.adapter.enumerable.* +import org.apache.calcite.adapter.enumerable.EnumerableRel.Prefer +import org.apache.calcite.adapter.java.JavaTypeFactory +import org.apache.calcite.linq4j.Enumerable +import org.apache.calcite.linq4j.tree.BlockBuilder +import org.apache.calcite.linq4j.tree.Expressions +import org.apache.calcite.linq4j.tree.Types +import org.apache.calcite.plan.* +import org.apache.calcite.prepare.Prepare +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.core.TableModify +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rex.RexNode +import org.apache.calcite.schema.ModifiableTable +import org.apache.calcite.util.BuiltInMethod +import java.lang.reflect.Method + +/** + * A [TableModify] expression that modifies a workload trace. + */ +internal class TraceTableModify( + cluster: RelOptCluster, + traitSet: RelTraitSet, + table: RelOptTable, + schema: Prepare.CatalogReader, + input: RelNode, + operation: Operation, + updateColumnList: List<String>?, + sourceExpressionList: List<RexNode>?, + flattened: Boolean +) : TableModify(cluster, traitSet, table, schema, input, operation, updateColumnList, sourceExpressionList, flattened), + EnumerableRel { + init { + // Make sure the table is modifiable + table.unwrap(ModifiableTable::class.java) ?: throw AssertionError() // TODO: user error in validator + } + + override fun copy(traitSet: RelTraitSet, inputs: List<RelNode>?): RelNode { + return TraceTableModify( + cluster, + traitSet, + table, + getCatalogReader(), + sole(inputs), + operation, + updateColumnList, + sourceExpressionList, + isFlattened + ) + } + + override fun computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery?): RelOptCost { + // Prefer this plan compared to the standard EnumerableTableModify. + return super.computeSelfCost(planner, mq)!!.multiplyBy(.1) + } + + override fun implement(implementor: EnumerableRelImplementor, pref: Prefer): EnumerableRel.Result { + val builder = BlockBuilder() + val result = implementor.visitChild(this, 0, getInput() as EnumerableRel, pref) + val childExp = builder.append("child", result.block) + val convertedChildExpr = if (getInput().rowType != rowType) { + val typeFactory = cluster.typeFactory as JavaTypeFactory + val format = EnumerableTableScan.deduceFormat(table) + val physType = PhysTypeImpl.of(typeFactory, table.rowType, format) + val childPhysType = result.physType + val o = Expressions.parameter(childPhysType.javaRowType, "o") + val expressionList = List(childPhysType.rowType.fieldCount) { i -> + childPhysType.fieldReference(o, i, physType.getJavaFieldType(i)) + } + + builder.append( + "convertedChild", + Expressions.call( + childExp, + BuiltInMethod.SELECT.method, + Expressions.lambda<org.apache.calcite.linq4j.function.Function<*>>(physType.record(expressionList), o) + ) + ) + } else { + childExp + } + + if (!isInsert) { + throw UnsupportedOperationException("Deletion and update not supported") + } + + val expression = table.getExpression(InsertableTable::class.java) + builder.add( + Expressions.return_( + null, + Expressions.call( + BuiltInMethod.SINGLETON_ENUMERABLE.method, + Expressions.call( + Long::class.java, + expression, + INSERT_METHOD, + convertedChildExpr, + ) + ) + ) + ) + + val rowFormat = if (pref === Prefer.ARRAY) JavaRowFormat.ARRAY else JavaRowFormat.SCALAR + val physType = PhysTypeImpl.of(implementor.typeFactory, getRowType(), rowFormat) + return implementor.result(physType, builder.toBlock()) + } + + private companion object { + /** + * Reference to [InsertableTable.insert] method. + */ + val INSERT_METHOD: Method = Types.lookupMethod(InsertableTable::class.java, "insert", Enumerable::class.java) + } +} diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModifyRule.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModifyRule.kt new file mode 100644 index 00000000..7572e381 --- /dev/null +++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModifyRule.kt @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.calcite + +import org.apache.calcite.adapter.enumerable.EnumerableConvention +import org.apache.calcite.plan.Convention +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.core.TableModify +import org.apache.calcite.rel.logical.LogicalTableModify +import org.apache.calcite.schema.ModifiableTable + +/** + * A [ConverterRule] from a [LogicalTableModify] to a [TraceTableModify]. + */ +internal class TraceTableModifyRule(config: Config) : ConverterRule(config) { + override fun convert(rel: RelNode): RelNode? { + val modify = rel as TableModify + val table = modify.table!! + + // Make sure that the table is modifiable + if (table.unwrap(ModifiableTable::class.java) == null) { + return null + } + + val traitSet = modify.traitSet.replace(EnumerableConvention.INSTANCE) + return TraceTableModify( + modify.cluster, traitSet, + table, + modify.catalogReader, + convert(modify.input, traitSet), + modify.operation, + modify.updateColumnList, + modify.sourceExpressionList, + modify.isFlattened + ) + } + + companion object { + /** Default configuration. */ + val DEFAULT: Config = Config.INSTANCE + .withConversion(LogicalTableModify::class.java, Convention.NONE, EnumerableConvention.INSTANCE, "TraceTableModificationRule") + .withRuleFactory { config: Config -> TraceTableModifyRule(config) } + } +} |
