diff options
| author | Dante Niewenhuis <d.niewenhuis@hotmail.com> | 2025-07-22 15:47:44 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-07-22 15:47:44 +0200 |
| commit | e22c97dcca7478d6941b78bdf7cd873bc0d23cdc (patch) | |
| tree | f1859c16f4c7973d8b16ed693caad4c749d42331 /opendc-trace/opendc-trace-calcite/src/main | |
| parent | 0c0cf25616771cd40a9e401edcba4a5e5016f76e (diff) | |
Updated workload schema (#360)
Diffstat (limited to 'opendc-trace/opendc-trace-calcite/src/main')
7 files changed, 0 insertions, 700 deletions
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/InsertableTable.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/InsertableTable.kt deleted file mode 100644 index 9c7b69a2..00000000 --- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/InsertableTable.kt +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.calcite - -import org.apache.calcite.linq4j.Enumerable -import org.apache.calcite.schema.Table - -/** - * A Calcite [Table] to which rows can be inserted. - */ -internal interface InsertableTable : Table { - /** - * Insert [rows] into this table. - * - * @param rows The rows to insert into the table. - * @return The number of rows inserted. - */ - fun insert(rows: Enumerable<Array<Any?>>): Long -} diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt deleted file mode 100644 index eed52ab3..00000000 --- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.calcite - -import org.apache.calcite.linq4j.Enumerator -import org.opendc.trace.TableColumn -import org.opendc.trace.TableColumnType -import org.opendc.trace.TableReader -import java.nio.ByteBuffer -import java.nio.ByteOrder -import java.util.concurrent.atomic.AtomicBoolean - -/** - * An [Enumerator] for a [TableReader]. - */ -internal class TraceReaderEnumerator<E>( - private val reader: TableReader, - private val columns: List<TableColumn>, - private val cancelFlag: AtomicBoolean, -) : Enumerator<E> { - private val columnIndices = columns.map { reader.resolve(it.name) }.toIntArray() - private var current: E? = null - - override fun moveNext(): Boolean { - if (cancelFlag.get()) { - return false - } - - val reader = reader - val res = reader.nextRow() - - if (res) { - @Suppress("UNCHECKED_CAST") - current = convertRow(reader) as E - } else { - current = null - } - - return res - } - - override fun current(): E = checkNotNull(current) - - override fun reset() { - throw UnsupportedOperationException() - } - - override fun close() { - reader.close() - } - - private fun convertRow(reader: TableReader): Array<Any?> { - val res = arrayOfNulls<Any?>(columns.size) - val columnIndices = columnIndices - - for ((index, column) in columns.withIndex()) { - val columnIndex = columnIndices[index] - res[index] = convertColumn(reader, column, columnIndex) - } - return res - } - - private fun convertColumn( - reader: TableReader, - column: TableColumn, - columnIndex: Int, - ): Any? { - return when (column.type) { - is TableColumnType.Boolean -> reader.getBoolean(columnIndex) - is TableColumnType.Int -> reader.getInt(columnIndex) - is TableColumnType.Long -> reader.getLong(columnIndex) - is TableColumnType.Float -> reader.getFloat(columnIndex) - is TableColumnType.Double -> reader.getDouble(columnIndex) - is TableColumnType.String -> reader.getString(columnIndex) - is TableColumnType.UUID -> { - val uuid = reader.getUUID(columnIndex) - - if (uuid != null) { - val uuidBytes = ByteArray(16) - - ByteBuffer.wrap(uuidBytes) - .order(ByteOrder.BIG_ENDIAN) - .putLong(uuid.mostSignificantBits) - .putLong(uuid.leastSignificantBits) - - uuidBytes - } else { - null - } - } - is TableColumnType.Instant -> reader.getInstant(columnIndex)?.toEpochMilli() - is TableColumnType.Duration -> reader.getDuration(columnIndex)?.toMillis() ?: 0 - is TableColumnType.List -> reader.getList(columnIndex, Any::class.java)?.toTypedArray() - is TableColumnType.Set -> reader.getSet(columnIndex, Any::class.java)?.toTypedArray() - is TableColumnType.Map -> reader.getMap(columnIndex, Any::class.java, Any::class.java) - } - } -} diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt deleted file mode 100644 index 3249546d..00000000 --- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.calcite - -import org.apache.calcite.schema.Schema -import org.apache.calcite.schema.Table -import org.apache.calcite.schema.impl.AbstractSchema -import org.opendc.trace.Trace - -/** - * A Calcite [Schema] that exposes an OpenDC [Trace] into multiple SQL tables. - * - * @param trace The [Trace] to create a schema for. - */ -public class TraceSchema(private val trace: Trace) : AbstractSchema() { - /** - * The [Table]s that belong to this schema. - */ - private val tables: Map<String, TraceTable> by lazy { - trace.tables.associateWith { - val table = checkNotNull(trace.getTable(it)) { "Unexpected null table" } - TraceTable(table) - } - } - - override fun getTableMap(): Map<String, Table> = tables -} diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt deleted file mode 100644 index cbf7ec43..00000000 --- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.calcite - -import org.apache.calcite.model.ModelHandler -import org.apache.calcite.schema.Schema -import org.apache.calcite.schema.SchemaFactory -import org.apache.calcite.schema.SchemaPlus -import org.opendc.trace.Trace -import java.io.File -import java.nio.file.Paths - -/** - * Factory that creates a [TraceSchema]. - * - * This factory allows users to include a schema that references a trace in a `model.json` file. - */ -public class TraceSchemaFactory : SchemaFactory { - override fun create( - parentSchema: SchemaPlus, - name: String, - operand: Map<String, Any>, - ): Schema { - val base = operand[ModelHandler.ExtraOperand.BASE_DIRECTORY.camelName] as File? - val pathParam = requireNotNull(operand["path"]) { "Trace path not specified" } as String - val path = if (base != null) File(base, pathParam).toPath() else Paths.get(pathParam) - - val format = requireNotNull(operand["format"]) { "Trace format not specified" } as String - val create = operand.getOrDefault("create", false) as Boolean - - val trace = if (create) Trace.create(path, format) else Trace.open(path, format) - return TraceSchema(trace) - } -} diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt deleted file mode 100644 index e74d2ee8..00000000 --- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt +++ /dev/null @@ -1,214 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.calcite - -import org.apache.calcite.DataContext -import org.apache.calcite.adapter.java.AbstractQueryableTable -import org.apache.calcite.adapter.java.JavaTypeFactory -import org.apache.calcite.linq4j.AbstractEnumerable -import org.apache.calcite.linq4j.Enumerable -import org.apache.calcite.linq4j.Enumerator -import org.apache.calcite.linq4j.QueryProvider -import org.apache.calcite.linq4j.Queryable -import org.apache.calcite.plan.RelOptCluster -import org.apache.calcite.plan.RelOptTable -import org.apache.calcite.prepare.Prepare.CatalogReader -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.core.TableModify -import org.apache.calcite.rel.logical.LogicalTableModify -import org.apache.calcite.rel.type.RelDataType -import org.apache.calcite.rel.type.RelDataTypeFactory -import org.apache.calcite.rex.RexNode -import org.apache.calcite.schema.ModifiableTable -import org.apache.calcite.schema.ProjectableFilterableTable -import org.apache.calcite.schema.SchemaPlus -import org.apache.calcite.schema.impl.AbstractTableQueryable -import org.apache.calcite.sql.type.SqlTypeName -import org.opendc.trace.TableColumnType -import java.nio.ByteBuffer -import java.time.Duration -import java.time.Instant -import java.util.UUID -import java.util.concurrent.atomic.AtomicBoolean - -/** - * A Calcite [Table] that exposes an OpenDC [org.opendc.trace.Table] as SQL table. - */ -internal class TraceTable(private val table: org.opendc.trace.Table) : - AbstractQueryableTable(Array<Any?>::class.java), - ProjectableFilterableTable, - ModifiableTable, - InsertableTable { - private var rowType: RelDataType? = null - - override fun getRowType(typeFactory: RelDataTypeFactory): RelDataType { - var rowType = rowType - if (rowType == null) { - rowType = deduceRowType(typeFactory as JavaTypeFactory) - this.rowType = rowType - } - - return rowType - } - - override fun scan( - root: DataContext, - filters: MutableList<RexNode>, - projects: IntArray?, - ): Enumerable<Array<Any?>> { - // Filters are currently not supported by the OpenDC trace API. By keeping the filters in the list, Calcite - // assumes that they are declined and will perform the filters itself. - - val projection = projects?.map { table.columns[it] } - val cancelFlag = DataContext.Variable.CANCEL_FLAG.get<AtomicBoolean>(root) - return object : AbstractEnumerable<Array<Any?>>() { - override fun enumerator(): Enumerator<Array<Any?>> = - TraceReaderEnumerator(table.newReader(projection?.map { it.name }), projection ?: table.columns, cancelFlag) - } - } - - override fun insert(rows: Enumerable<Array<Any?>>): Long { - val table = table - val columns = table.columns - val writer = table.newWriter() - val columnIndices = columns.map { writer.resolve(it.name) }.toIntArray() - var rowCount = 0L - - try { - for (row in rows) { - writer.startRow() - - for ((index, value) in row.withIndex()) { - if (value == null) { - continue - } - val columnType = columns[index].type - val columnIndex = columnIndices[index] - when (columnType) { - is TableColumnType.Boolean -> writer.setBoolean(columnIndex, value as Boolean) - is TableColumnType.Int -> writer.setInt(columnIndex, value as Int) - is TableColumnType.Long -> writer.setLong(columnIndex, value as Long) - is TableColumnType.Float -> writer.setFloat(columnIndex, value as Float) - is TableColumnType.Double -> writer.setDouble(columnIndex, value as Double) - is TableColumnType.String -> writer.setString(columnIndex, value as String) - is TableColumnType.UUID -> { - val bb = ByteBuffer.wrap(value as ByteArray) - writer.setUUID(columnIndex, UUID(bb.getLong(), bb.getLong())) - } - is TableColumnType.Instant -> writer.setInstant(columnIndex, Instant.ofEpochMilli(value as Long)) - is TableColumnType.Duration -> writer.setDuration(columnIndex, Duration.ofMillis(value as Long)) - is TableColumnType.List -> writer.setList(columnIndex, value as List<*>) - is TableColumnType.Set -> writer.setSet(columnIndex, (value as List<*>).toSet()) - is TableColumnType.Map -> writer.setMap(columnIndex, value as Map<*, *>) - } - } - - writer.endRow() - - rowCount++ - } - } finally { - writer.close() - } - - return rowCount - } - - override fun <T> asQueryable( - queryProvider: QueryProvider, - schema: SchemaPlus, - tableName: String, - ): Queryable<T> { - return object : AbstractTableQueryable<T>(queryProvider, schema, this@TraceTable, tableName) { - override fun enumerator(): Enumerator<T> { - val cancelFlag = AtomicBoolean(false) - return TraceReaderEnumerator( - this@TraceTable.table.newReader(), - this@TraceTable.table.columns, - cancelFlag, - ) - } - - override fun toString(): String = "TraceTableQueryable[table=$tableName]" - } - } - - override fun getModifiableCollection(): MutableCollection<Any?>? = null - - override fun toModificationRel( - cluster: RelOptCluster, - table: RelOptTable, - catalogReader: CatalogReader, - child: RelNode, - operation: TableModify.Operation, - updateColumnList: MutableList<String>?, - sourceExpressionList: MutableList<RexNode>?, - flattened: Boolean, - ): TableModify { - cluster.planner.addRule(TraceTableModifyRule.DEFAULT.toRule()) - - return LogicalTableModify.create( - table, - catalogReader, - child, - operation, - updateColumnList, - sourceExpressionList, - flattened, - ) - } - - override fun toString(): String = "TraceTable" - - private fun deduceRowType(typeFactory: JavaTypeFactory): RelDataType { - val types = mutableListOf<RelDataType>() - val names = mutableListOf<String>() - - for (column in table.columns) { - names.add(column.name) - types.add(mapType(typeFactory, column.type)) - } - - return typeFactory.createStructType(types, names) - } - - private fun mapType( - typeFactory: JavaTypeFactory, - type: TableColumnType, - ): RelDataType { - return when (type) { - is TableColumnType.Boolean -> typeFactory.createSqlType(SqlTypeName.BOOLEAN) - is TableColumnType.Int -> typeFactory.createSqlType(SqlTypeName.INTEGER) - is TableColumnType.Long -> typeFactory.createSqlType(SqlTypeName.BIGINT) - is TableColumnType.Float -> typeFactory.createSqlType(SqlTypeName.FLOAT) - is TableColumnType.Double -> typeFactory.createSqlType(SqlTypeName.DOUBLE) - is TableColumnType.String -> typeFactory.createSqlType(SqlTypeName.VARCHAR) - is TableColumnType.UUID -> typeFactory.createSqlType(SqlTypeName.BINARY, 16) - is TableColumnType.Instant -> typeFactory.createSqlType(SqlTypeName.TIMESTAMP) - is TableColumnType.Duration -> typeFactory.createSqlType(SqlTypeName.BIGINT) - is TableColumnType.List -> typeFactory.createArrayType(mapType(typeFactory, type.elementType), -1) - is TableColumnType.Set -> typeFactory.createMultisetType(mapType(typeFactory, type.elementType), -1) - is TableColumnType.Map -> typeFactory.createMapType(mapType(typeFactory, type.keyType), mapType(typeFactory, type.valueType)) - } - } -} diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModify.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModify.kt deleted file mode 100644 index eedff00d..00000000 --- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModify.kt +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.calcite - -import org.apache.calcite.adapter.enumerable.EnumerableRel -import org.apache.calcite.adapter.enumerable.EnumerableRel.Prefer -import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor -import org.apache.calcite.adapter.enumerable.EnumerableTableScan -import org.apache.calcite.adapter.enumerable.JavaRowFormat -import org.apache.calcite.adapter.enumerable.PhysTypeImpl -import org.apache.calcite.adapter.java.JavaTypeFactory -import org.apache.calcite.linq4j.Enumerable -import org.apache.calcite.linq4j.tree.BlockBuilder -import org.apache.calcite.linq4j.tree.Expressions -import org.apache.calcite.linq4j.tree.Types -import org.apache.calcite.plan.RelOptCluster -import org.apache.calcite.plan.RelOptCost -import org.apache.calcite.plan.RelOptPlanner -import org.apache.calcite.plan.RelOptTable -import org.apache.calcite.plan.RelTraitSet -import org.apache.calcite.prepare.Prepare -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.core.TableModify -import org.apache.calcite.rel.metadata.RelMetadataQuery -import org.apache.calcite.rex.RexNode -import org.apache.calcite.schema.ModifiableTable -import org.apache.calcite.util.BuiltInMethod -import java.lang.reflect.Method - -/** - * A [TableModify] expression that modifies a workload trace. - */ -internal class TraceTableModify( - cluster: RelOptCluster, - traitSet: RelTraitSet, - table: RelOptTable, - schema: Prepare.CatalogReader, - input: RelNode, - operation: Operation, - updateColumnList: List<String>?, - sourceExpressionList: List<RexNode>?, - flattened: Boolean, -) : TableModify(cluster, traitSet, table, schema, input, operation, updateColumnList, sourceExpressionList, flattened), - EnumerableRel { - init { - // Make sure the table is modifiable - table.unwrap(ModifiableTable::class.java) ?: throw AssertionError() // TODO: user error in validator - } - - override fun copy( - traitSet: RelTraitSet, - inputs: List<RelNode>?, - ): RelNode { - return TraceTableModify( - cluster, - traitSet, - table, - getCatalogReader(), - sole(inputs), - operation, - updateColumnList, - sourceExpressionList, - isFlattened, - ) - } - - override fun computeSelfCost( - planner: RelOptPlanner, - mq: RelMetadataQuery?, - ): RelOptCost { - // Prefer this plan compared to the standard EnumerableTableModify. - return super.computeSelfCost(planner, mq)!!.multiplyBy(.1) - } - - override fun implement( - implementor: EnumerableRelImplementor, - pref: Prefer, - ): EnumerableRel.Result { - val builder = BlockBuilder() - val result = implementor.visitChild(this, 0, getInput() as EnumerableRel, pref) - val childExp = builder.append("child", result.block) - val convertedChildExpr = - if (getInput().rowType != rowType) { - val typeFactory = cluster.typeFactory as JavaTypeFactory - val format = EnumerableTableScan.deduceFormat(table) - val physType = PhysTypeImpl.of(typeFactory, table.rowType, format) - val childPhysType = result.physType - val o = Expressions.parameter(childPhysType.javaRowType, "o") - val expressionList = - List(childPhysType.rowType.fieldCount) { i -> - childPhysType.fieldReference(o, i, physType.getJavaFieldType(i)) - } - - builder.append( - "convertedChild", - Expressions.call( - childExp, - BuiltInMethod.SELECT.method, - Expressions.lambda<org.apache.calcite.linq4j.function.Function<*>>(physType.record(expressionList), o), - ), - ) - } else { - childExp - } - - if (!isInsert) { - throw UnsupportedOperationException("Deletion and update not supported") - } - - val expression = table.getExpression(InsertableTable::class.java) - builder.add( - Expressions.return_( - null, - Expressions.call( - BuiltInMethod.SINGLETON_ENUMERABLE.method, - Expressions.call( - Long::class.java, - expression, - INSERT_METHOD, - convertedChildExpr, - ), - ), - ), - ) - - val rowFormat = if (pref === Prefer.ARRAY) JavaRowFormat.ARRAY else JavaRowFormat.SCALAR - val physType = PhysTypeImpl.of(implementor.typeFactory, getRowType(), rowFormat) - return implementor.result(physType, builder.toBlock()) - } - - private companion object { - /** - * Reference to [InsertableTable.insert] method. - */ - val INSERT_METHOD: Method = Types.lookupMethod(InsertableTable::class.java, "insert", Enumerable::class.java) - } -} diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModifyRule.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModifyRule.kt deleted file mode 100644 index 9c560984..00000000 --- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModifyRule.kt +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.calcite - -import org.apache.calcite.adapter.enumerable.EnumerableConvention -import org.apache.calcite.plan.Convention -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.convert.ConverterRule -import org.apache.calcite.rel.core.TableModify -import org.apache.calcite.rel.logical.LogicalTableModify -import org.apache.calcite.schema.ModifiableTable - -/** - * A [ConverterRule] from a [LogicalTableModify] to a [TraceTableModify]. - */ -internal class TraceTableModifyRule(config: Config) : ConverterRule(config) { - override fun convert(rel: RelNode): RelNode? { - val modify = rel as TableModify - val table = modify.table!! - - // Make sure that the table is modifiable - if (table.unwrap(ModifiableTable::class.java) == null) { - return null - } - - val traitSet = modify.traitSet.replace(EnumerableConvention.INSTANCE) - return TraceTableModify( - modify.cluster, traitSet, - table, - modify.catalogReader, - convert(modify.input, traitSet), - modify.operation, - modify.updateColumnList, - modify.sourceExpressionList, - modify.isFlattened, - ) - } - - companion object { - /** Default configuration. */ - val DEFAULT: Config = - Config.INSTANCE - .withConversion( - LogicalTableModify::class.java, - Convention.NONE, - EnumerableConvention.INSTANCE, - "TraceTableModificationRule", - ) - .withRuleFactory { config: Config -> TraceTableModifyRule(config) } - } -} |
