summaryrefslogtreecommitdiff
path: root/opendc-trace/opendc-trace-calcite/src/main
diff options
context:
space:
mode:
author Fabian Mastenbroek <mail.fabianm@gmail.com> 2022-04-29 22:55:07 +0200
committer Fabian Mastenbroek <mail.fabianm@gmail.com> 2022-04-30 17:45:52 +0200
commit 157fc322e6e68eb77177d13844e9793aeb6e8850 (patch)
tree 3391ac66642dcb40fe70ce9a7a3700a94ed1f627 /opendc-trace/opendc-trace-calcite/src/main
parent 412d8d597511122f114d69a4ba64c6b55dd192f9 (diff)
feat(trace/calcite): Add support for writing via SQL
This change updates the Apache Calcite integration to support writing workload traces via SQL. This enables custom conversion scripts between different workload traces.
Diffstat (limited to 'opendc-trace/opendc-trace-calcite/src/main')
-rw-r--r-- opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/InsertableTable.kt 39
-rw-r--r-- opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt 4
-rw-r--r-- opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt 107
-rw-r--r-- opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModify.kt 138
-rw-r--r-- opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModifyRule.kt 65
5 files changed, 344 insertions, 9 deletions
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/InsertableTable.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/InsertableTable.kt
new file mode 100644
index 00000000..9c7b69a2
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/InsertableTable.kt
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.calcite
+
+import org.apache.calcite.linq4j.Enumerable
+import org.apache.calcite.schema.Table
+
+/**
+ * A Calcite [Table] to which rows can be inserted.
+ *
+ * [TraceTableModify] obtains an expression for the target table typed as this interface and
+ * generates a call to [insert] on it, so a table must implement this interface to be written
+ * to through that node.
+ */
+internal interface InsertableTable : Table {
+ /**
+ * Insert [rows] into this table.
+ *
+ * Each row is an array of column values. The [TraceTable] implementation treats element `i`
+ * of a row as the value for the table's `i`-th column and leaves `null` elements unset.
+ *
+ * @param rows The rows to insert into the table.
+ * @return The number of rows inserted.
+ */
+ fun insert(rows: Enumerable<Array<Any?>>): Long
+}
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt
index 298a59dc..3249546d 100644
--- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt
+++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt
@@ -33,7 +33,9 @@ import org.opendc.trace.Trace
* @param trace The [Trace] to create a schema for.
*/
public class TraceSchema(private val trace: Trace) : AbstractSchema() {
-
+ /**
+ * The [Table]s that belong to this schema.
+ */
private val tables: Map<String, TraceTable> by lazy {
trace.tables.associateWith {
val table = checkNotNull(trace.getTable(it)) { "Unexpected null table" }
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt
index 8c3fe4e2..af521297 100644
--- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt
+++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt
@@ -23,15 +23,23 @@
package org.opendc.trace.calcite
import org.apache.calcite.DataContext
+import org.apache.calcite.adapter.java.AbstractQueryableTable
import org.apache.calcite.adapter.java.JavaTypeFactory
-import org.apache.calcite.linq4j.AbstractEnumerable
-import org.apache.calcite.linq4j.Enumerable
-import org.apache.calcite.linq4j.Enumerator
+import org.apache.calcite.linq4j.*
+import org.apache.calcite.plan.RelOptCluster
+import org.apache.calcite.plan.RelOptTable
+import org.apache.calcite.prepare.Prepare.CatalogReader
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.TableModify
+import org.apache.calcite.rel.logical.LogicalTableModify
import org.apache.calcite.rel.type.RelDataType
import org.apache.calcite.rel.type.RelDataTypeFactory
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.schema.ModifiableTable
import org.apache.calcite.schema.ScannableTable
+import org.apache.calcite.schema.SchemaPlus
import org.apache.calcite.schema.Table
-import org.apache.calcite.schema.impl.AbstractTable
+import org.apache.calcite.schema.impl.AbstractTableQueryable
import org.apache.calcite.sql.type.SqlTypeName
import java.time.Duration
import java.time.Instant
@@ -40,8 +48,11 @@ import java.util.concurrent.atomic.AtomicBoolean
/**
* A Calcite [Table] that exposes an OpenDC [org.opendc.trace.Table] as SQL table.
*/
-internal class TraceTable(private val table: org.opendc.trace.Table) : AbstractTable(), ScannableTable {
-
+internal class TraceTable(private val table: org.opendc.trace.Table) :
+ AbstractQueryableTable(Array<Any?>::class.java),
+ ScannableTable,
+ ModifiableTable,
+ InsertableTable {
private var rowType: RelDataType? = null
override fun getRowType(typeFactory: RelDataTypeFactory): RelDataType {
@@ -57,8 +68,88 @@ internal class TraceTable(private val table: org.opendc.trace.Table) : AbstractT
override fun scan(root: DataContext): Enumerable<Array<Any?>> {
val cancelFlag = DataContext.Variable.CANCEL_FLAG.get<AtomicBoolean>(root)
return object : AbstractEnumerable<Array<Any?>>() {
- override fun enumerator(): Enumerator<Array<Any?>> = TraceReaderEnumerator(table.newReader(), table.columns, cancelFlag)
+ override fun enumerator(): Enumerator<Array<Any?>> =
+ TraceReaderEnumerator(table.newReader(), table.columns, cancelFlag)
+ }
+ }
+
+    /**
+     * Write [rows] to the underlying trace table through a fresh writer.
+     *
+     * Element `i` of each row is written to the `i`-th column of the table. `null` elements are
+     * skipped, leaving that column unset for the row. Values are converted from the form in
+     * which they arrive here to the column's declared type: `Long` milliseconds become a
+     * [Duration] or an [Instant], and a `List` becomes a `Set`.
+     *
+     * The writer is always closed, including when resolving a column or writing a row fails.
+     *
+     * @param rows The rows to insert into the table.
+     * @return The number of rows written.
+     */
+    override fun insert(rows: Enumerable<Array<Any?>>): Long {
+        val columns = table.columns
+        val writer = table.newWriter()
+        var rowCount = 0L
+
+        try {
+            // Resolve the columns inside the try block so that the writer is also closed when
+            // the writer rejects one of the columns.
+            val columnIndices = columns.map { writer.resolve(it) }.toIntArray()
+
+            for (row in rows) {
+                writer.startRow()
+
+                for ((index, value) in row.withIndex()) {
+                    if (value == null) {
+                        continue
+                    }
+                    val columnType = columns[index].type
+
+                    writer.set(
+                        columnIndices[index],
+                        when (columnType) {
+                            Duration::class.java -> Duration.ofMillis(value as Long)
+                            Instant::class.java -> Instant.ofEpochMilli(value as Long)
+                            Set::class.java -> (value as List<*>).toSet()
+                            else -> value
+                        }
+                    )
+                }
+
+                writer.endRow()
+
+                rowCount++
+            }
+        } finally {
+            writer.close()
         }
+
+        return rowCount
+    }
+
+    /**
+     * Expose this table as a [Queryable] whose enumerator reads the rows of the trace table.
+     *
+     * Every call to `enumerator()` opens a new reader on the underlying table. The cancel flag
+     * handed to the enumerator is created locally and is never set by this code.
+     */
+    override fun <T> asQueryable(queryProvider: QueryProvider, schema: SchemaPlus, tableName: String): Queryable<T> =
+        object : AbstractTableQueryable<T>(queryProvider, schema, this@TraceTable, tableName) {
+            override fun toString(): String = "TraceTableQueryable[table=$tableName]"
+
+            override fun enumerator(): Enumerator<T> {
+                val source = this@TraceTable.table
+                val neverCancelled = AtomicBoolean(false)
+                return TraceReaderEnumerator(source.newReader(), source.columns, neverCancelled)
+            }
+        }
+
+    /**
+     * Always returns `null`: this table exposes no backing collection. Rows are written through
+     * [insert] instead.
+     */
+    override fun getModifiableCollection(): MutableCollection<Any?>? {
+        return null
+    }
+
+    /**
+     * Create the logical modification node for this table.
+     *
+     * As a side effect, [TraceTableModifyRule] is added to the planner of [cluster] so that the
+     * returned [LogicalTableModify] can later be converted into a [TraceTableModify].
+     */
+    override fun toModificationRel(
+        cluster: RelOptCluster,
+        table: RelOptTable,
+        catalogReader: CatalogReader,
+        child: RelNode,
+        operation: TableModify.Operation,
+        updateColumnList: MutableList<String>?,
+        sourceExpressionList: MutableList<RexNode>?,
+        flattened: Boolean
+    ): TableModify {
+        val modifyRule = TraceTableModifyRule.DEFAULT.toRule()
+        cluster.planner.addRule(modifyRule)
+
+        val logicalModify = LogicalTableModify.create(
+            table, catalogReader, child, operation, updateColumnList, sourceExpressionList, flattened
+        )
+        return logicalModify
     }
override fun toString(): String = "TraceTable"
@@ -73,7 +164,7 @@ internal class TraceTable(private val table: org.opendc.trace.Table) : AbstractT
when (column.type) {
Instant::class.java -> typeFactory.createSqlType(SqlTypeName.TIMESTAMP)
Duration::class.java -> typeFactory.createSqlType(SqlTypeName.BIGINT)
- Set::class.java -> typeFactory.createMultisetType(typeFactory.createSqlType(SqlTypeName.ANY), -1)
+ Set::class.java -> typeFactory.createMultisetType(typeFactory.createSqlType(SqlTypeName.UNKNOWN), -1)
else -> typeFactory.createType(column.type)
}
)
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModify.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModify.kt
new file mode 100644
index 00000000..64dc0cea
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModify.kt
@@ -0,0 +1,138 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.calcite
+
+import org.apache.calcite.adapter.enumerable.*
+import org.apache.calcite.adapter.enumerable.EnumerableRel.Prefer
+import org.apache.calcite.adapter.java.JavaTypeFactory
+import org.apache.calcite.linq4j.Enumerable
+import org.apache.calcite.linq4j.tree.BlockBuilder
+import org.apache.calcite.linq4j.tree.Expressions
+import org.apache.calcite.linq4j.tree.Types
+import org.apache.calcite.plan.*
+import org.apache.calcite.prepare.Prepare
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.TableModify
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.schema.ModifiableTable
+import org.apache.calcite.util.BuiltInMethod
+import java.lang.reflect.Method
+
+/**
+ * A [TableModify] expression that modifies a workload trace.
+ *
+ * Only insertion is implemented: [implement] generates a call to [InsertableTable.insert] on the
+ * target table, passing the (possibly converted) rows produced by the input. The target table
+ * must therefore be both a [ModifiableTable] and an [InsertableTable].
+ */
+internal class TraceTableModify(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    table: RelOptTable,
+    schema: Prepare.CatalogReader,
+    input: RelNode,
+    operation: Operation,
+    updateColumnList: List<String>?,
+    sourceExpressionList: List<RexNode>?,
+    flattened: Boolean
+) : TableModify(cluster, traitSet, table, schema, input, operation, updateColumnList, sourceExpressionList, flattened),
+    EnumerableRel {
+    init {
+        // Make sure the table is modifiable and can receive rows through InsertableTable, since
+        // the generated code calls InsertableTable.insert on it.
+        if (table.unwrap(ModifiableTable::class.java) == null || table.unwrap(InsertableTable::class.java) == null) {
+            // TODO: user error in validator
+            throw AssertionError("Table ${table.qualifiedName} does not support insertion of trace rows")
+        }
+    }
+
+    override fun copy(traitSet: RelTraitSet, inputs: List<RelNode>?): RelNode {
+        return TraceTableModify(
+            cluster,
+            traitSet,
+            table,
+            getCatalogReader(),
+            sole(inputs),
+            operation,
+            updateColumnList,
+            sourceExpressionList,
+            isFlattened
+        )
+    }
+
+    override fun computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery?): RelOptCost {
+        // Prefer this plan compared to the standard EnumerableTableModify.
+        val baseCost = checkNotNull(super.computeSelfCost(planner, mq)) { "No cost available for $this" }
+        return baseCost.multiplyBy(.1)
+    }
+
+    /**
+     * Generate the code that inserts the rows of the input into the table.
+     *
+     * @throws UnsupportedOperationException if the operation is not an insertion.
+     */
+    override fun implement(implementor: EnumerableRelImplementor, pref: Prefer): EnumerableRel.Result {
+        // Reject unsupported operations before any code is generated for the input.
+        if (!isInsert) {
+            throw UnsupportedOperationException("Deletion and update not supported")
+        }
+
+        val builder = BlockBuilder()
+        val result = implementor.visitChild(this, 0, getInput() as EnumerableRel, pref)
+        val childExp = builder.append("child", result.block)
+
+        // When the input row type differs from this node's row type, project every input row
+        // onto the physical row type of the table first.
+        val convertedChildExpr = if (getInput().rowType != rowType) {
+            val typeFactory = cluster.typeFactory as JavaTypeFactory
+            val format = EnumerableTableScan.deduceFormat(table)
+            val physType = PhysTypeImpl.of(typeFactory, table.rowType, format)
+            val childPhysType = result.physType
+            val o = Expressions.parameter(childPhysType.javaRowType, "o")
+            val expressionList = List(childPhysType.rowType.fieldCount) { i ->
+                childPhysType.fieldReference(o, i, physType.getJavaFieldType(i))
+            }
+
+            builder.append(
+                "convertedChild",
+                Expressions.call(
+                    childExp,
+                    BuiltInMethod.SELECT.method,
+                    Expressions.lambda<org.apache.calcite.linq4j.function.Function<*>>(physType.record(expressionList), o)
+                )
+            )
+        } else {
+            childExp
+        }
+
+        val expression = checkNotNull(table.getExpression(InsertableTable::class.java)) {
+            "Table ${table.qualifiedName} cannot be referenced as an InsertableTable"
+        }
+
+        // The generated block returns a single-element enumerable holding the value returned by
+        // InsertableTable.insert.
+        builder.add(
+            Expressions.return_(
+                null,
+                Expressions.call(
+                    BuiltInMethod.SINGLETON_ENUMERABLE.method,
+                    Expressions.call(
+                        Long::class.java,
+                        expression,
+                        INSERT_METHOD,
+                        convertedChildExpr,
+                    )
+                )
+            )
+        )
+
+        val rowFormat = if (pref === Prefer.ARRAY) JavaRowFormat.ARRAY else JavaRowFormat.SCALAR
+        val physType = PhysTypeImpl.of(implementor.typeFactory, getRowType(), rowFormat)
+        return implementor.result(physType, builder.toBlock())
+    }
+
+    private companion object {
+        /**
+         * Reference to [InsertableTable.insert] method.
+         */
+        val INSERT_METHOD: Method = Types.lookupMethod(InsertableTable::class.java, "insert", Enumerable::class.java)
+    }
+}
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModifyRule.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModifyRule.kt
new file mode 100644
index 00000000..7572e381
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModifyRule.kt
@@ -0,0 +1,65 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.calcite
+
+import org.apache.calcite.adapter.enumerable.EnumerableConvention
+import org.apache.calcite.plan.Convention
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.TableModify
+import org.apache.calcite.rel.logical.LogicalTableModify
+import org.apache.calcite.schema.ModifiableTable
+
+/**
+ * A [ConverterRule] from a [LogicalTableModify] to a [TraceTableModify].
+ *
+ * The rule only converts modifications whose target table is both a [ModifiableTable] and an
+ * [InsertableTable]. Modifications of any other table are left to other rules, because
+ * [TraceTableModify] generates a call to [InsertableTable.insert] on the target table.
+ */
+internal class TraceTableModifyRule(config: Config) : ConverterRule(config) {
+    /**
+     * Convert [rel] into a [TraceTableModify] in the enumerable convention.
+     *
+     * @return The converted node, or `null` when the target table is missing or cannot be
+     * written to via [InsertableTable].
+     */
+    override fun convert(rel: RelNode): RelNode? {
+        val modify = rel as TableModify
+        val table = modify.table ?: return null
+
+        // Make sure that the table is modifiable and supports insertion via InsertableTable.
+        // Without the second check this rule would also capture modifications of unrelated
+        // modifiable tables planned by the same planner.
+        if (table.unwrap(ModifiableTable::class.java) == null || table.unwrap(InsertableTable::class.java) == null) {
+            return null
+        }
+
+        val traitSet = modify.traitSet.replace(EnumerableConvention.INSTANCE)
+        return TraceTableModify(
+            modify.cluster,
+            traitSet,
+            table,
+            modify.catalogReader,
+            convert(modify.input, traitSet),
+            modify.operation,
+            modify.updateColumnList,
+            modify.sourceExpressionList,
+            modify.isFlattened
+        )
+    }
+
+    companion object {
+        /** Default configuration. */
+        val DEFAULT: Config = Config.INSTANCE
+            .withConversion(LogicalTableModify::class.java, Convention.NONE, EnumerableConvention.INSTANCE, "TraceTableModificationRule")
+            .withRuleFactory { config: Config -> TraceTableModifyRule(config) }
+    }
+}