summaryrefslogtreecommitdiff
path: root/opendc-trace/opendc-trace-calcite/src/main
diff options
context:
space:
mode:
authorDante Niewenhuis <d.niewenhuis@hotmail.com>2025-07-22 15:47:44 +0200
committerGitHub <noreply@github.com>2025-07-22 15:47:44 +0200
commite22c97dcca7478d6941b78bdf7cd873bc0d23cdc (patch)
treef1859c16f4c7973d8b16ed693caad4c749d42331 /opendc-trace/opendc-trace-calcite/src/main
parent0c0cf25616771cd40a9e401edcba4a5e5016f76e (diff)
Updated workload schema (#360)
Diffstat (limited to 'opendc-trace/opendc-trace-calcite/src/main')
-rw-r--r--opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/InsertableTable.kt39
-rw-r--r--opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt118
-rw-r--r--opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt47
-rw-r--r--opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt54
-rw-r--r--opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt214
-rw-r--r--opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModify.kt157
-rw-r--r--opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModifyRule.kt71
7 files changed, 0 insertions, 700 deletions
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/InsertableTable.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/InsertableTable.kt
deleted file mode 100644
index 9c7b69a2..00000000
--- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/InsertableTable.kt
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.calcite
-
-import org.apache.calcite.linq4j.Enumerable
-import org.apache.calcite.schema.Table
-
-/**
- * A Calcite [Table] to which rows can be inserted.
- */
-internal interface InsertableTable : Table {
- /**
- * Insert [rows] into this table.
- *
- * @param rows The rows to insert into the table.
- * @return The number of rows inserted.
- */
- fun insert(rows: Enumerable<Array<Any?>>): Long
-}
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt
deleted file mode 100644
index eed52ab3..00000000
--- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.calcite
-
-import org.apache.calcite.linq4j.Enumerator
-import org.opendc.trace.TableColumn
-import org.opendc.trace.TableColumnType
-import org.opendc.trace.TableReader
-import java.nio.ByteBuffer
-import java.nio.ByteOrder
-import java.util.concurrent.atomic.AtomicBoolean
-
-/**
- * An [Enumerator] for a [TableReader].
- */
-internal class TraceReaderEnumerator<E>(
- private val reader: TableReader,
- private val columns: List<TableColumn>,
- private val cancelFlag: AtomicBoolean,
-) : Enumerator<E> {
- private val columnIndices = columns.map { reader.resolve(it.name) }.toIntArray()
- private var current: E? = null
-
- override fun moveNext(): Boolean {
- if (cancelFlag.get()) {
- return false
- }
-
- val reader = reader
- val res = reader.nextRow()
-
- if (res) {
- @Suppress("UNCHECKED_CAST")
- current = convertRow(reader) as E
- } else {
- current = null
- }
-
- return res
- }
-
- override fun current(): E = checkNotNull(current)
-
- override fun reset() {
- throw UnsupportedOperationException()
- }
-
- override fun close() {
- reader.close()
- }
-
- private fun convertRow(reader: TableReader): Array<Any?> {
- val res = arrayOfNulls<Any?>(columns.size)
- val columnIndices = columnIndices
-
- for ((index, column) in columns.withIndex()) {
- val columnIndex = columnIndices[index]
- res[index] = convertColumn(reader, column, columnIndex)
- }
- return res
- }
-
- private fun convertColumn(
- reader: TableReader,
- column: TableColumn,
- columnIndex: Int,
- ): Any? {
- return when (column.type) {
- is TableColumnType.Boolean -> reader.getBoolean(columnIndex)
- is TableColumnType.Int -> reader.getInt(columnIndex)
- is TableColumnType.Long -> reader.getLong(columnIndex)
- is TableColumnType.Float -> reader.getFloat(columnIndex)
- is TableColumnType.Double -> reader.getDouble(columnIndex)
- is TableColumnType.String -> reader.getString(columnIndex)
- is TableColumnType.UUID -> {
- val uuid = reader.getUUID(columnIndex)
-
- if (uuid != null) {
- val uuidBytes = ByteArray(16)
-
- ByteBuffer.wrap(uuidBytes)
- .order(ByteOrder.BIG_ENDIAN)
- .putLong(uuid.mostSignificantBits)
- .putLong(uuid.leastSignificantBits)
-
- uuidBytes
- } else {
- null
- }
- }
- is TableColumnType.Instant -> reader.getInstant(columnIndex)?.toEpochMilli()
- is TableColumnType.Duration -> reader.getDuration(columnIndex)?.toMillis() ?: 0
- is TableColumnType.List -> reader.getList(columnIndex, Any::class.java)?.toTypedArray()
- is TableColumnType.Set -> reader.getSet(columnIndex, Any::class.java)?.toTypedArray()
- is TableColumnType.Map -> reader.getMap(columnIndex, Any::class.java, Any::class.java)
- }
- }
-}
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt
deleted file mode 100644
index 3249546d..00000000
--- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.calcite
-
-import org.apache.calcite.schema.Schema
-import org.apache.calcite.schema.Table
-import org.apache.calcite.schema.impl.AbstractSchema
-import org.opendc.trace.Trace
-
-/**
- * A Calcite [Schema] that exposes an OpenDC [Trace] into multiple SQL tables.
- *
- * @param trace The [Trace] to create a schema for.
- */
-public class TraceSchema(private val trace: Trace) : AbstractSchema() {
- /**
- * The [Table]s that belong to this schema.
- */
- private val tables: Map<String, TraceTable> by lazy {
- trace.tables.associateWith {
- val table = checkNotNull(trace.getTable(it)) { "Unexpected null table" }
- TraceTable(table)
- }
- }
-
- override fun getTableMap(): Map<String, Table> = tables
-}
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt
deleted file mode 100644
index cbf7ec43..00000000
--- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.calcite
-
-import org.apache.calcite.model.ModelHandler
-import org.apache.calcite.schema.Schema
-import org.apache.calcite.schema.SchemaFactory
-import org.apache.calcite.schema.SchemaPlus
-import org.opendc.trace.Trace
-import java.io.File
-import java.nio.file.Paths
-
-/**
- * Factory that creates a [TraceSchema].
- *
- * This factory allows users to include a schema that references a trace in a `model.json` file.
- */
-public class TraceSchemaFactory : SchemaFactory {
- override fun create(
- parentSchema: SchemaPlus,
- name: String,
- operand: Map<String, Any>,
- ): Schema {
- val base = operand[ModelHandler.ExtraOperand.BASE_DIRECTORY.camelName] as File?
- val pathParam = requireNotNull(operand["path"]) { "Trace path not specified" } as String
- val path = if (base != null) File(base, pathParam).toPath() else Paths.get(pathParam)
-
- val format = requireNotNull(operand["format"]) { "Trace format not specified" } as String
- val create = operand.getOrDefault("create", false) as Boolean
-
- val trace = if (create) Trace.create(path, format) else Trace.open(path, format)
- return TraceSchema(trace)
- }
-}
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt
deleted file mode 100644
index e74d2ee8..00000000
--- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.calcite
-
-import org.apache.calcite.DataContext
-import org.apache.calcite.adapter.java.AbstractQueryableTable
-import org.apache.calcite.adapter.java.JavaTypeFactory
-import org.apache.calcite.linq4j.AbstractEnumerable
-import org.apache.calcite.linq4j.Enumerable
-import org.apache.calcite.linq4j.Enumerator
-import org.apache.calcite.linq4j.QueryProvider
-import org.apache.calcite.linq4j.Queryable
-import org.apache.calcite.plan.RelOptCluster
-import org.apache.calcite.plan.RelOptTable
-import org.apache.calcite.prepare.Prepare.CatalogReader
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.core.TableModify
-import org.apache.calcite.rel.logical.LogicalTableModify
-import org.apache.calcite.rel.type.RelDataType
-import org.apache.calcite.rel.type.RelDataTypeFactory
-import org.apache.calcite.rex.RexNode
-import org.apache.calcite.schema.ModifiableTable
-import org.apache.calcite.schema.ProjectableFilterableTable
-import org.apache.calcite.schema.SchemaPlus
-import org.apache.calcite.schema.impl.AbstractTableQueryable
-import org.apache.calcite.sql.type.SqlTypeName
-import org.opendc.trace.TableColumnType
-import java.nio.ByteBuffer
-import java.time.Duration
-import java.time.Instant
-import java.util.UUID
-import java.util.concurrent.atomic.AtomicBoolean
-
-/**
- * A Calcite [Table] that exposes an OpenDC [org.opendc.trace.Table] as SQL table.
- */
-internal class TraceTable(private val table: org.opendc.trace.Table) :
- AbstractQueryableTable(Array<Any?>::class.java),
- ProjectableFilterableTable,
- ModifiableTable,
- InsertableTable {
- private var rowType: RelDataType? = null
-
- override fun getRowType(typeFactory: RelDataTypeFactory): RelDataType {
- var rowType = rowType
- if (rowType == null) {
- rowType = deduceRowType(typeFactory as JavaTypeFactory)
- this.rowType = rowType
- }
-
- return rowType
- }
-
- override fun scan(
- root: DataContext,
- filters: MutableList<RexNode>,
- projects: IntArray?,
- ): Enumerable<Array<Any?>> {
- // Filters are currently not supported by the OpenDC trace API. By keeping the filters in the list, Calcite
- // assumes that they are declined and will perform the filters itself.
-
- val projection = projects?.map { table.columns[it] }
- val cancelFlag = DataContext.Variable.CANCEL_FLAG.get<AtomicBoolean>(root)
- return object : AbstractEnumerable<Array<Any?>>() {
- override fun enumerator(): Enumerator<Array<Any?>> =
- TraceReaderEnumerator(table.newReader(projection?.map { it.name }), projection ?: table.columns, cancelFlag)
- }
- }
-
- override fun insert(rows: Enumerable<Array<Any?>>): Long {
- val table = table
- val columns = table.columns
- val writer = table.newWriter()
- val columnIndices = columns.map { writer.resolve(it.name) }.toIntArray()
- var rowCount = 0L
-
- try {
- for (row in rows) {
- writer.startRow()
-
- for ((index, value) in row.withIndex()) {
- if (value == null) {
- continue
- }
- val columnType = columns[index].type
- val columnIndex = columnIndices[index]
- when (columnType) {
- is TableColumnType.Boolean -> writer.setBoolean(columnIndex, value as Boolean)
- is TableColumnType.Int -> writer.setInt(columnIndex, value as Int)
- is TableColumnType.Long -> writer.setLong(columnIndex, value as Long)
- is TableColumnType.Float -> writer.setFloat(columnIndex, value as Float)
- is TableColumnType.Double -> writer.setDouble(columnIndex, value as Double)
- is TableColumnType.String -> writer.setString(columnIndex, value as String)
- is TableColumnType.UUID -> {
- val bb = ByteBuffer.wrap(value as ByteArray)
- writer.setUUID(columnIndex, UUID(bb.getLong(), bb.getLong()))
- }
- is TableColumnType.Instant -> writer.setInstant(columnIndex, Instant.ofEpochMilli(value as Long))
- is TableColumnType.Duration -> writer.setDuration(columnIndex, Duration.ofMillis(value as Long))
- is TableColumnType.List -> writer.setList(columnIndex, value as List<*>)
- is TableColumnType.Set -> writer.setSet(columnIndex, (value as List<*>).toSet())
- is TableColumnType.Map -> writer.setMap(columnIndex, value as Map<*, *>)
- }
- }
-
- writer.endRow()
-
- rowCount++
- }
- } finally {
- writer.close()
- }
-
- return rowCount
- }
-
- override fun <T> asQueryable(
- queryProvider: QueryProvider,
- schema: SchemaPlus,
- tableName: String,
- ): Queryable<T> {
- return object : AbstractTableQueryable<T>(queryProvider, schema, this@TraceTable, tableName) {
- override fun enumerator(): Enumerator<T> {
- val cancelFlag = AtomicBoolean(false)
- return TraceReaderEnumerator(
- this@TraceTable.table.newReader(),
- this@TraceTable.table.columns,
- cancelFlag,
- )
- }
-
- override fun toString(): String = "TraceTableQueryable[table=$tableName]"
- }
- }
-
- override fun getModifiableCollection(): MutableCollection<Any?>? = null
-
- override fun toModificationRel(
- cluster: RelOptCluster,
- table: RelOptTable,
- catalogReader: CatalogReader,
- child: RelNode,
- operation: TableModify.Operation,
- updateColumnList: MutableList<String>?,
- sourceExpressionList: MutableList<RexNode>?,
- flattened: Boolean,
- ): TableModify {
- cluster.planner.addRule(TraceTableModifyRule.DEFAULT.toRule())
-
- return LogicalTableModify.create(
- table,
- catalogReader,
- child,
- operation,
- updateColumnList,
- sourceExpressionList,
- flattened,
- )
- }
-
- override fun toString(): String = "TraceTable"
-
- private fun deduceRowType(typeFactory: JavaTypeFactory): RelDataType {
- val types = mutableListOf<RelDataType>()
- val names = mutableListOf<String>()
-
- for (column in table.columns) {
- names.add(column.name)
- types.add(mapType(typeFactory, column.type))
- }
-
- return typeFactory.createStructType(types, names)
- }
-
- private fun mapType(
- typeFactory: JavaTypeFactory,
- type: TableColumnType,
- ): RelDataType {
- return when (type) {
- is TableColumnType.Boolean -> typeFactory.createSqlType(SqlTypeName.BOOLEAN)
- is TableColumnType.Int -> typeFactory.createSqlType(SqlTypeName.INTEGER)
- is TableColumnType.Long -> typeFactory.createSqlType(SqlTypeName.BIGINT)
- is TableColumnType.Float -> typeFactory.createSqlType(SqlTypeName.FLOAT)
- is TableColumnType.Double -> typeFactory.createSqlType(SqlTypeName.DOUBLE)
- is TableColumnType.String -> typeFactory.createSqlType(SqlTypeName.VARCHAR)
- is TableColumnType.UUID -> typeFactory.createSqlType(SqlTypeName.BINARY, 16)
- is TableColumnType.Instant -> typeFactory.createSqlType(SqlTypeName.TIMESTAMP)
- is TableColumnType.Duration -> typeFactory.createSqlType(SqlTypeName.BIGINT)
- is TableColumnType.List -> typeFactory.createArrayType(mapType(typeFactory, type.elementType), -1)
- is TableColumnType.Set -> typeFactory.createMultisetType(mapType(typeFactory, type.elementType), -1)
- is TableColumnType.Map -> typeFactory.createMapType(mapType(typeFactory, type.keyType), mapType(typeFactory, type.valueType))
- }
- }
-}
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModify.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModify.kt
deleted file mode 100644
index eedff00d..00000000
--- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModify.kt
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.calcite
-
-import org.apache.calcite.adapter.enumerable.EnumerableRel
-import org.apache.calcite.adapter.enumerable.EnumerableRel.Prefer
-import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor
-import org.apache.calcite.adapter.enumerable.EnumerableTableScan
-import org.apache.calcite.adapter.enumerable.JavaRowFormat
-import org.apache.calcite.adapter.enumerable.PhysTypeImpl
-import org.apache.calcite.adapter.java.JavaTypeFactory
-import org.apache.calcite.linq4j.Enumerable
-import org.apache.calcite.linq4j.tree.BlockBuilder
-import org.apache.calcite.linq4j.tree.Expressions
-import org.apache.calcite.linq4j.tree.Types
-import org.apache.calcite.plan.RelOptCluster
-import org.apache.calcite.plan.RelOptCost
-import org.apache.calcite.plan.RelOptPlanner
-import org.apache.calcite.plan.RelOptTable
-import org.apache.calcite.plan.RelTraitSet
-import org.apache.calcite.prepare.Prepare
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.core.TableModify
-import org.apache.calcite.rel.metadata.RelMetadataQuery
-import org.apache.calcite.rex.RexNode
-import org.apache.calcite.schema.ModifiableTable
-import org.apache.calcite.util.BuiltInMethod
-import java.lang.reflect.Method
-
-/**
- * A [TableModify] expression that modifies a workload trace.
- */
-internal class TraceTableModify(
- cluster: RelOptCluster,
- traitSet: RelTraitSet,
- table: RelOptTable,
- schema: Prepare.CatalogReader,
- input: RelNode,
- operation: Operation,
- updateColumnList: List<String>?,
- sourceExpressionList: List<RexNode>?,
- flattened: Boolean,
-) : TableModify(cluster, traitSet, table, schema, input, operation, updateColumnList, sourceExpressionList, flattened),
- EnumerableRel {
- init {
- // Make sure the table is modifiable
- table.unwrap(ModifiableTable::class.java) ?: throw AssertionError() // TODO: user error in validator
- }
-
- override fun copy(
- traitSet: RelTraitSet,
- inputs: List<RelNode>?,
- ): RelNode {
- return TraceTableModify(
- cluster,
- traitSet,
- table,
- getCatalogReader(),
- sole(inputs),
- operation,
- updateColumnList,
- sourceExpressionList,
- isFlattened,
- )
- }
-
- override fun computeSelfCost(
- planner: RelOptPlanner,
- mq: RelMetadataQuery?,
- ): RelOptCost {
- // Prefer this plan compared to the standard EnumerableTableModify.
- return super.computeSelfCost(planner, mq)!!.multiplyBy(.1)
- }
-
- override fun implement(
- implementor: EnumerableRelImplementor,
- pref: Prefer,
- ): EnumerableRel.Result {
- val builder = BlockBuilder()
- val result = implementor.visitChild(this, 0, getInput() as EnumerableRel, pref)
- val childExp = builder.append("child", result.block)
- val convertedChildExpr =
- if (getInput().rowType != rowType) {
- val typeFactory = cluster.typeFactory as JavaTypeFactory
- val format = EnumerableTableScan.deduceFormat(table)
- val physType = PhysTypeImpl.of(typeFactory, table.rowType, format)
- val childPhysType = result.physType
- val o = Expressions.parameter(childPhysType.javaRowType, "o")
- val expressionList =
- List(childPhysType.rowType.fieldCount) { i ->
- childPhysType.fieldReference(o, i, physType.getJavaFieldType(i))
- }
-
- builder.append(
- "convertedChild",
- Expressions.call(
- childExp,
- BuiltInMethod.SELECT.method,
- Expressions.lambda<org.apache.calcite.linq4j.function.Function<*>>(physType.record(expressionList), o),
- ),
- )
- } else {
- childExp
- }
-
- if (!isInsert) {
- throw UnsupportedOperationException("Deletion and update not supported")
- }
-
- val expression = table.getExpression(InsertableTable::class.java)
- builder.add(
- Expressions.return_(
- null,
- Expressions.call(
- BuiltInMethod.SINGLETON_ENUMERABLE.method,
- Expressions.call(
- Long::class.java,
- expression,
- INSERT_METHOD,
- convertedChildExpr,
- ),
- ),
- ),
- )
-
- val rowFormat = if (pref === Prefer.ARRAY) JavaRowFormat.ARRAY else JavaRowFormat.SCALAR
- val physType = PhysTypeImpl.of(implementor.typeFactory, getRowType(), rowFormat)
- return implementor.result(physType, builder.toBlock())
- }
-
- private companion object {
- /**
- * Reference to [InsertableTable.insert] method.
- */
- val INSERT_METHOD: Method = Types.lookupMethod(InsertableTable::class.java, "insert", Enumerable::class.java)
- }
-}
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModifyRule.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModifyRule.kt
deleted file mode 100644
index 9c560984..00000000
--- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModifyRule.kt
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.trace.calcite
-
-import org.apache.calcite.adapter.enumerable.EnumerableConvention
-import org.apache.calcite.plan.Convention
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.core.TableModify
-import org.apache.calcite.rel.logical.LogicalTableModify
-import org.apache.calcite.schema.ModifiableTable
-
-/**
- * A [ConverterRule] from a [LogicalTableModify] to a [TraceTableModify].
- */
-internal class TraceTableModifyRule(config: Config) : ConverterRule(config) {
- override fun convert(rel: RelNode): RelNode? {
- val modify = rel as TableModify
- val table = modify.table!!
-
- // Make sure that the table is modifiable
- if (table.unwrap(ModifiableTable::class.java) == null) {
- return null
- }
-
- val traitSet = modify.traitSet.replace(EnumerableConvention.INSTANCE)
- return TraceTableModify(
- modify.cluster, traitSet,
- table,
- modify.catalogReader,
- convert(modify.input, traitSet),
- modify.operation,
- modify.updateColumnList,
- modify.sourceExpressionList,
- modify.isFlattened,
- )
- }
-
- companion object {
- /** Default configuration. */
- val DEFAULT: Config =
- Config.INSTANCE
- .withConversion(
- LogicalTableModify::class.java,
- Convention.NONE,
- EnumerableConvention.INSTANCE,
- "TraceTableModificationRule",
- )
- .withRuleFactory { config: Config -> TraceTableModifyRule(config) }
- }
-}