summaryrefslogtreecommitdiff
path: root/opendc-trace/opendc-trace-calcite/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-trace/opendc-trace-calcite/src/main')
-rw-r--r--opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/InsertableTable.kt39
-rw-r--r--opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt4
-rw-r--r--opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt107
-rw-r--r--opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModify.kt138
-rw-r--r--opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModifyRule.kt65
5 files changed, 344 insertions, 9 deletions
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/InsertableTable.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/InsertableTable.kt
new file mode 100644
index 00000000..9c7b69a2
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/InsertableTable.kt
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.calcite
+
+import org.apache.calcite.linq4j.Enumerable
+import org.apache.calcite.schema.Table
+
+/**
+ * A Calcite [Table] to which rows can be inserted.
+ */
+internal interface InsertableTable : Table {
+ /**
+ * Insert [rows] into this table.
+ *
+ * @param rows The rows to insert into the table.
+ * @return The number of rows inserted.
+ */
+ fun insert(rows: Enumerable<Array<Any?>>): Long
+}
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt
index 298a59dc..3249546d 100644
--- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt
+++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt
@@ -33,7 +33,9 @@ import org.opendc.trace.Trace
* @param trace The [Trace] to create a schema for.
*/
public class TraceSchema(private val trace: Trace) : AbstractSchema() {
-
+ /**
+ * The [Table]s that belong to this schema.
+ */
private val tables: Map<String, TraceTable> by lazy {
trace.tables.associateWith {
val table = checkNotNull(trace.getTable(it)) { "Unexpected null table" }
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt
index 8c3fe4e2..af521297 100644
--- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt
+++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt
@@ -23,15 +23,23 @@
package org.opendc.trace.calcite
import org.apache.calcite.DataContext
+import org.apache.calcite.adapter.java.AbstractQueryableTable
import org.apache.calcite.adapter.java.JavaTypeFactory
-import org.apache.calcite.linq4j.AbstractEnumerable
-import org.apache.calcite.linq4j.Enumerable
-import org.apache.calcite.linq4j.Enumerator
+import org.apache.calcite.linq4j.*
+import org.apache.calcite.plan.RelOptCluster
+import org.apache.calcite.plan.RelOptTable
+import org.apache.calcite.prepare.Prepare.CatalogReader
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.TableModify
+import org.apache.calcite.rel.logical.LogicalTableModify
import org.apache.calcite.rel.type.RelDataType
import org.apache.calcite.rel.type.RelDataTypeFactory
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.schema.ModifiableTable
import org.apache.calcite.schema.ScannableTable
+import org.apache.calcite.schema.SchemaPlus
import org.apache.calcite.schema.Table
-import org.apache.calcite.schema.impl.AbstractTable
+import org.apache.calcite.schema.impl.AbstractTableQueryable
import org.apache.calcite.sql.type.SqlTypeName
import java.time.Duration
import java.time.Instant
@@ -40,8 +48,11 @@ import java.util.concurrent.atomic.AtomicBoolean
/**
* A Calcite [Table] that exposes an OpenDC [org.opendc.trace.Table] as SQL table.
*/
-internal class TraceTable(private val table: org.opendc.trace.Table) : AbstractTable(), ScannableTable {
-
+internal class TraceTable(private val table: org.opendc.trace.Table) :
+ AbstractQueryableTable(Array<Any?>::class.java),
+ ScannableTable,
+ ModifiableTable,
+ InsertableTable {
private var rowType: RelDataType? = null
override fun getRowType(typeFactory: RelDataTypeFactory): RelDataType {
@@ -57,8 +68,88 @@ internal class TraceTable(private val table: org.opendc.trace.Table) : AbstractT
override fun scan(root: DataContext): Enumerable<Array<Any?>> {
val cancelFlag = DataContext.Variable.CANCEL_FLAG.get<AtomicBoolean>(root)
return object : AbstractEnumerable<Array<Any?>>() {
- override fun enumerator(): Enumerator<Array<Any?>> = TraceReaderEnumerator(table.newReader(), table.columns, cancelFlag)
+ override fun enumerator(): Enumerator<Array<Any?>> =
+ TraceReaderEnumerator(table.newReader(), table.columns, cancelFlag)
+ }
+ }
+
+ override fun insert(rows: Enumerable<Array<Any?>>): Long {
+ val table = table
+ val columns = table.columns
+ val writer = table.newWriter()
+ val columnIndices = columns.map { writer.resolve(it) }.toIntArray()
+ var rowCount = 0L
+
+ try {
+ for (row in rows) {
+ writer.startRow()
+
+ for ((index, value) in row.withIndex()) {
+ if (value == null) {
+ continue
+ }
+ val columnType = columns[index].type
+
+ writer.set(
+ columnIndices[index],
+ when (columnType) {
+ Duration::class.java -> Duration.ofMillis(value as Long)
+ Instant::class.java -> Instant.ofEpochMilli(value as Long)
+ Set::class.java -> (value as List<*>).toSet()
+ else -> value
+ }
+ )
+ }
+
+ writer.endRow()
+
+ rowCount++
+ }
+ } finally {
+ writer.close()
}
+
+ return rowCount
+ }
+
+ override fun <T> asQueryable(queryProvider: QueryProvider, schema: SchemaPlus, tableName: String): Queryable<T> {
+ return object : AbstractTableQueryable<T>(queryProvider, schema, this@TraceTable, tableName) {
+ override fun enumerator(): Enumerator<T> {
+ val cancelFlag = AtomicBoolean(false)
+ return TraceReaderEnumerator(
+ this@TraceTable.table.newReader(),
+ this@TraceTable.table.columns,
+ cancelFlag
+ )
+ }
+
+ override fun toString(): String = "TraceTableQueryable[table=$tableName]"
+ }
+ }
+
+ override fun getModifiableCollection(): MutableCollection<Any?>? = null
+
+ override fun toModificationRel(
+ cluster: RelOptCluster,
+ table: RelOptTable,
+ catalogReader: CatalogReader,
+ child: RelNode,
+ operation: TableModify.Operation,
+ updateColumnList: MutableList<String>?,
+ sourceExpressionList: MutableList<RexNode>?,
+ flattened: Boolean
+ ): TableModify {
+ cluster.planner.addRule(TraceTableModifyRule.DEFAULT.toRule())
+
+ return LogicalTableModify.create(
+ table,
+ catalogReader,
+ child,
+ operation,
+ updateColumnList,
+ sourceExpressionList,
+ flattened
+ )
}
override fun toString(): String = "TraceTable"
@@ -73,7 +164,7 @@ internal class TraceTable(private val table: org.opendc.trace.Table) : AbstractT
when (column.type) {
Instant::class.java -> typeFactory.createSqlType(SqlTypeName.TIMESTAMP)
Duration::class.java -> typeFactory.createSqlType(SqlTypeName.BIGINT)
- Set::class.java -> typeFactory.createMultisetType(typeFactory.createSqlType(SqlTypeName.ANY), -1)
+ Set::class.java -> typeFactory.createMultisetType(typeFactory.createSqlType(SqlTypeName.UNKNOWN), -1)
else -> typeFactory.createType(column.type)
}
)
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModify.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModify.kt
new file mode 100644
index 00000000..64dc0cea
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModify.kt
@@ -0,0 +1,138 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.calcite
+
+import org.apache.calcite.adapter.enumerable.*
+import org.apache.calcite.adapter.enumerable.EnumerableRel.Prefer
+import org.apache.calcite.adapter.java.JavaTypeFactory
+import org.apache.calcite.linq4j.Enumerable
+import org.apache.calcite.linq4j.tree.BlockBuilder
+import org.apache.calcite.linq4j.tree.Expressions
+import org.apache.calcite.linq4j.tree.Types
+import org.apache.calcite.plan.*
+import org.apache.calcite.prepare.Prepare
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.TableModify
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.schema.ModifiableTable
+import org.apache.calcite.util.BuiltInMethod
+import java.lang.reflect.Method
+
+/**
+ * A [TableModify] expression that modifies a workload trace.
+ */
+internal class TraceTableModify(
+ cluster: RelOptCluster,
+ traitSet: RelTraitSet,
+ table: RelOptTable,
+ schema: Prepare.CatalogReader,
+ input: RelNode,
+ operation: Operation,
+ updateColumnList: List<String>?,
+ sourceExpressionList: List<RexNode>?,
+ flattened: Boolean
+) : TableModify(cluster, traitSet, table, schema, input, operation, updateColumnList, sourceExpressionList, flattened),
+ EnumerableRel {
+ init {
+ // Make sure the table is modifiable
+ table.unwrap(ModifiableTable::class.java) ?: throw AssertionError() // TODO: user error in validator
+ }
+
+ override fun copy(traitSet: RelTraitSet, inputs: List<RelNode>?): RelNode {
+ return TraceTableModify(
+ cluster,
+ traitSet,
+ table,
+ getCatalogReader(),
+ sole(inputs),
+ operation,
+ updateColumnList,
+ sourceExpressionList,
+ isFlattened
+ )
+ }
+
+ override fun computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery?): RelOptCost {
+ // Prefer this plan compared to the standard EnumerableTableModify.
+ return super.computeSelfCost(planner, mq)!!.multiplyBy(.1)
+ }
+
+ override fun implement(implementor: EnumerableRelImplementor, pref: Prefer): EnumerableRel.Result {
+ val builder = BlockBuilder()
+ val result = implementor.visitChild(this, 0, getInput() as EnumerableRel, pref)
+ val childExp = builder.append("child", result.block)
+ val convertedChildExpr = if (getInput().rowType != rowType) {
+ val typeFactory = cluster.typeFactory as JavaTypeFactory
+ val format = EnumerableTableScan.deduceFormat(table)
+ val physType = PhysTypeImpl.of(typeFactory, table.rowType, format)
+ val childPhysType = result.physType
+ val o = Expressions.parameter(childPhysType.javaRowType, "o")
+ val expressionList = List(childPhysType.rowType.fieldCount) { i ->
+ childPhysType.fieldReference(o, i, physType.getJavaFieldType(i))
+ }
+
+ builder.append(
+ "convertedChild",
+ Expressions.call(
+ childExp,
+ BuiltInMethod.SELECT.method,
+ Expressions.lambda<org.apache.calcite.linq4j.function.Function<*>>(physType.record(expressionList), o)
+ )
+ )
+ } else {
+ childExp
+ }
+
+ if (!isInsert) {
+ throw UnsupportedOperationException("Deletion and update not supported")
+ }
+
+ val expression = table.getExpression(InsertableTable::class.java)
+ builder.add(
+ Expressions.return_(
+ null,
+ Expressions.call(
+ BuiltInMethod.SINGLETON_ENUMERABLE.method,
+ Expressions.call(
+ Long::class.java,
+ expression,
+ INSERT_METHOD,
+ convertedChildExpr,
+ )
+ )
+ )
+ )
+
+ val rowFormat = if (pref === Prefer.ARRAY) JavaRowFormat.ARRAY else JavaRowFormat.SCALAR
+ val physType = PhysTypeImpl.of(implementor.typeFactory, getRowType(), rowFormat)
+ return implementor.result(physType, builder.toBlock())
+ }
+
+ private companion object {
+ /**
+ * Reference to [InsertableTable.insert] method.
+ */
+ val INSERT_METHOD: Method = Types.lookupMethod(InsertableTable::class.java, "insert", Enumerable::class.java)
+ }
+}
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModifyRule.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModifyRule.kt
new file mode 100644
index 00000000..7572e381
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModifyRule.kt
@@ -0,0 +1,65 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.calcite
+
+import org.apache.calcite.adapter.enumerable.EnumerableConvention
+import org.apache.calcite.plan.Convention
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.TableModify
+import org.apache.calcite.rel.logical.LogicalTableModify
+import org.apache.calcite.schema.ModifiableTable
+
+/**
+ * A [ConverterRule] from a [LogicalTableModify] to a [TraceTableModify].
+ */
+internal class TraceTableModifyRule(config: Config) : ConverterRule(config) {
+ override fun convert(rel: RelNode): RelNode? {
+ val modify = rel as TableModify
+ val table = modify.table!!
+
+ // Make sure that the table is modifiable
+ if (table.unwrap(ModifiableTable::class.java) == null) {
+ return null
+ }
+
+ val traitSet = modify.traitSet.replace(EnumerableConvention.INSTANCE)
+ return TraceTableModify(
+ modify.cluster, traitSet,
+ table,
+ modify.catalogReader,
+ convert(modify.input, traitSet),
+ modify.operation,
+ modify.updateColumnList,
+ modify.sourceExpressionList,
+ modify.isFlattened
+ )
+ }
+
+ companion object {
+ /** Default configuration. */
+ val DEFAULT: Config = Config.INSTANCE
+ .withConversion(LogicalTableModify::class.java, Convention.NONE, EnumerableConvention.INSTANCE, "TraceTableModificationRule")
+ .withRuleFactory { config: Config -> TraceTableModifyRule(config) }
+ }
+}