summaryrefslogtreecommitdiff
path: root/opendc-trace/opendc-trace-calcite/src
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-trace/opendc-trace-calcite/src')
-rw-r--r--opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/InsertableTable.kt39
-rw-r--r--opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt93
-rw-r--r--opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt47
-rw-r--r--opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt50
-rw-r--r--opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt176
-rw-r--r--opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModify.kt138
-rw-r--r--opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModifyRule.kt65
-rw-r--r--opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt158
-rw-r--r--opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/TraceSchemaFactoryTest.kt78
-rw-r--r--opendc-trace/opendc-trace-calcite/src/test/resources/model.json15
-rw-r--r--opendc-trace/opendc-trace-calcite/src/test/resources/trace/interference-model.json20
-rw-r--r--opendc-trace/opendc-trace-calcite/src/test/resources/trace/meta.parquetbin0 -> 1679 bytes
-rw-r--r--opendc-trace/opendc-trace-calcite/src/test/resources/trace/trace.parquetbin0 -> 65174 bytes
13 files changed, 879 insertions, 0 deletions
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/InsertableTable.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/InsertableTable.kt
new file mode 100644
index 00000000..9c7b69a2
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/InsertableTable.kt
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.calcite
+
+import org.apache.calcite.linq4j.Enumerable
+import org.apache.calcite.schema.Table
+
+/**
+ * A Calcite [Table] to which rows can be inserted.
+ */
+internal interface InsertableTable : Table {
+ /**
+ * Insert [rows] into this table.
+ *
+ * @param rows The rows to insert into the table.
+ * @return The number of rows inserted.
+ */
+ fun insert(rows: Enumerable<Array<Any?>>): Long
+}
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt
new file mode 100644
index 00000000..1854f262
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt
@@ -0,0 +1,93 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.calcite
+
+import org.apache.calcite.linq4j.Enumerator
+import org.opendc.trace.TableColumn
+import org.opendc.trace.TableReader
+import java.sql.Timestamp
+import java.time.Duration
+import java.time.Instant
+import java.util.concurrent.atomic.AtomicBoolean
+
+/**
+ * An [Enumerator] for a [TableReader].
+ */
+internal class TraceReaderEnumerator<E>(
+ private val reader: TableReader,
+ private val columns: List<TableColumn<*>>,
+ private val cancelFlag: AtomicBoolean
+) : Enumerator<E> {
+ private val columnIndices = columns.map { reader.resolve(it) }.toIntArray()
+ private var current: E? = null
+
+ override fun moveNext(): Boolean {
+ if (cancelFlag.get()) {
+ return false
+ }
+
+ val reader = reader
+ val res = reader.nextRow()
+
+ if (res) {
+ @Suppress("UNCHECKED_CAST")
+ current = convertRow(reader) as E
+ } else {
+ current = null
+ }
+
+ return res
+ }
+
+ override fun current(): E = checkNotNull(current)
+
+ override fun reset() {
+ throw UnsupportedOperationException()
+ }
+
+ override fun close() {
+ reader.close()
+ }
+
+ private fun convertRow(reader: TableReader): Array<Any?> {
+ val res = arrayOfNulls<Any?>(columns.size)
+ val columnIndices = columnIndices
+
+ for ((index, column) in columns.withIndex()) {
+ val columnIndex = columnIndices[index]
+ res[index] = convertColumn(reader, column, columnIndex)
+ }
+ return res
+ }
+
+ private fun convertColumn(reader: TableReader, column: TableColumn<*>, columnIndex: Int): Any? {
+ val value = reader.get(columnIndex)
+
+ return when (column.type) {
+ Instant::class.java -> Timestamp.from(value as Instant)
+ Duration::class.java -> (value as Duration).toMillis()
+ Set::class.java -> (value as Set<*>).toTypedArray()
+ else -> value
+ }
+ }
+}
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt
new file mode 100644
index 00000000..3249546d
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.calcite
+
+import org.apache.calcite.schema.Schema
+import org.apache.calcite.schema.Table
+import org.apache.calcite.schema.impl.AbstractSchema
+import org.opendc.trace.Trace
+
+/**
+ * A Calcite [Schema] that exposes an OpenDC [Trace] into multiple SQL tables.
+ *
+ * @param trace The [Trace] to create a schema for.
+ */
+public class TraceSchema(private val trace: Trace) : AbstractSchema() {
+ /**
+ * The [Table]s that belong to this schema.
+ */
+ private val tables: Map<String, TraceTable> by lazy {
+ trace.tables.associateWith {
+ val table = checkNotNull(trace.getTable(it)) { "Unexpected null table" }
+ TraceTable(table)
+ }
+ }
+
+ override fun getTableMap(): Map<String, Table> = tables
+}
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt
new file mode 100644
index 00000000..3c6badc8
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.calcite
+
+import org.apache.calcite.model.ModelHandler
+import org.apache.calcite.schema.Schema
+import org.apache.calcite.schema.SchemaFactory
+import org.apache.calcite.schema.SchemaPlus
+import org.opendc.trace.Trace
+import java.io.File
+import java.nio.file.Paths
+
+/**
+ * Factory that creates a [TraceSchema].
+ *
+ * This factory allows users to include a schema that references a trace in a `model.json` file.
+ */
+public class TraceSchemaFactory : SchemaFactory {
+ override fun create(parentSchema: SchemaPlus, name: String, operand: Map<String, Any>): Schema {
+ val base = operand[ModelHandler.ExtraOperand.BASE_DIRECTORY.camelName] as File?
+ val pathParam = requireNotNull(operand["path"]) { "Trace path not specified" } as String
+ val path = if (base != null) File(base, pathParam).toPath() else Paths.get(pathParam)
+
+ val format = requireNotNull(operand["format"]) { "Trace format not specified" } as String
+ val create = operand.getOrDefault("create", false) as Boolean
+
+ val trace = if (create) Trace.create(path, format) else Trace.open(path, format)
+ return TraceSchema(trace)
+ }
+}
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt
new file mode 100644
index 00000000..8c571b82
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt
@@ -0,0 +1,176 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.calcite
+
+import org.apache.calcite.DataContext
+import org.apache.calcite.adapter.java.AbstractQueryableTable
+import org.apache.calcite.adapter.java.JavaTypeFactory
+import org.apache.calcite.linq4j.*
+import org.apache.calcite.plan.RelOptCluster
+import org.apache.calcite.plan.RelOptTable
+import org.apache.calcite.prepare.Prepare.CatalogReader
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.TableModify
+import org.apache.calcite.rel.logical.LogicalTableModify
+import org.apache.calcite.rel.type.RelDataType
+import org.apache.calcite.rel.type.RelDataTypeFactory
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.schema.*
+import org.apache.calcite.schema.impl.AbstractTableQueryable
+import org.apache.calcite.sql.type.SqlTypeName
+import java.time.Duration
+import java.time.Instant
+import java.util.concurrent.atomic.AtomicBoolean
+
+/**
+ * A Calcite [Table] that exposes an OpenDC [org.opendc.trace.Table] as SQL table.
+ */
+internal class TraceTable(private val table: org.opendc.trace.Table) :
+ AbstractQueryableTable(Array<Any?>::class.java),
+ ProjectableFilterableTable,
+ ModifiableTable,
+ InsertableTable {
+ private var rowType: RelDataType? = null
+
+ override fun getRowType(typeFactory: RelDataTypeFactory): RelDataType {
+ var rowType = rowType
+ if (rowType == null) {
+ rowType = deduceRowType(typeFactory as JavaTypeFactory)
+ this.rowType = rowType
+ }
+
+ return rowType
+ }
+
+ override fun scan(root: DataContext, filters: MutableList<RexNode>, projects: IntArray?): Enumerable<Array<Any?>> {
+ // Filters are currently not supported by the OpenDC trace API. By keeping the filters in the list, Calcite
+ // assumes that they are declined and will perform the filters itself.
+
+ val projection = projects?.map { table.columns[it] }
+ val cancelFlag = DataContext.Variable.CANCEL_FLAG.get<AtomicBoolean>(root)
+ return object : AbstractEnumerable<Array<Any?>>() {
+ override fun enumerator(): Enumerator<Array<Any?>> =
+ TraceReaderEnumerator(table.newReader(projection), projection ?: table.columns, cancelFlag)
+ }
+ }
+
+ override fun insert(rows: Enumerable<Array<Any?>>): Long {
+ val table = table
+ val columns = table.columns
+ val writer = table.newWriter()
+ val columnIndices = columns.map { writer.resolve(it) }.toIntArray()
+ var rowCount = 0L
+
+ try {
+ for (row in rows) {
+ writer.startRow()
+
+ for ((index, value) in row.withIndex()) {
+ if (value == null) {
+ continue
+ }
+ val columnType = columns[index].type
+
+ writer.set(
+ columnIndices[index],
+ when (columnType) {
+ Duration::class.java -> Duration.ofMillis(value as Long)
+ Instant::class.java -> Instant.ofEpochMilli(value as Long)
+ Set::class.java -> (value as List<*>).toSet()
+ else -> value
+ }
+ )
+ }
+
+ writer.endRow()
+
+ rowCount++
+ }
+ } finally {
+ writer.close()
+ }
+
+ return rowCount
+ }
+
+ override fun <T> asQueryable(queryProvider: QueryProvider, schema: SchemaPlus, tableName: String): Queryable<T> {
+ return object : AbstractTableQueryable<T>(queryProvider, schema, this@TraceTable, tableName) {
+ override fun enumerator(): Enumerator<T> {
+ val cancelFlag = AtomicBoolean(false)
+ return TraceReaderEnumerator(
+ this@TraceTable.table.newReader(),
+ this@TraceTable.table.columns,
+ cancelFlag
+ )
+ }
+
+ override fun toString(): String = "TraceTableQueryable[table=$tableName]"
+ }
+ }
+
+ override fun getModifiableCollection(): MutableCollection<Any?>? = null
+
+ override fun toModificationRel(
+ cluster: RelOptCluster,
+ table: RelOptTable,
+ catalogReader: CatalogReader,
+ child: RelNode,
+ operation: TableModify.Operation,
+ updateColumnList: MutableList<String>?,
+ sourceExpressionList: MutableList<RexNode>?,
+ flattened: Boolean
+ ): TableModify {
+ cluster.planner.addRule(TraceTableModifyRule.DEFAULT.toRule())
+
+ return LogicalTableModify.create(
+ table,
+ catalogReader,
+ child,
+ operation,
+ updateColumnList,
+ sourceExpressionList,
+ flattened
+ )
+ }
+
+ override fun toString(): String = "TraceTable"
+
+ private fun deduceRowType(typeFactory: JavaTypeFactory): RelDataType {
+ val types = mutableListOf<RelDataType>()
+ val names = mutableListOf<String>()
+
+ for (column in table.columns) {
+ names.add(column.name)
+ types.add(
+ when (column.type) {
+ Instant::class.java -> typeFactory.createSqlType(SqlTypeName.TIMESTAMP)
+ Duration::class.java -> typeFactory.createSqlType(SqlTypeName.BIGINT)
+ Set::class.java -> typeFactory.createMultisetType(typeFactory.createSqlType(SqlTypeName.UNKNOWN), -1)
+ else -> typeFactory.createType(column.type)
+ }
+ )
+ }
+
+ return typeFactory.createStructType(types, names)
+ }
+}
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModify.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModify.kt
new file mode 100644
index 00000000..64dc0cea
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModify.kt
@@ -0,0 +1,138 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.calcite
+
+import org.apache.calcite.adapter.enumerable.*
+import org.apache.calcite.adapter.enumerable.EnumerableRel.Prefer
+import org.apache.calcite.adapter.java.JavaTypeFactory
+import org.apache.calcite.linq4j.Enumerable
+import org.apache.calcite.linq4j.tree.BlockBuilder
+import org.apache.calcite.linq4j.tree.Expressions
+import org.apache.calcite.linq4j.tree.Types
+import org.apache.calcite.plan.*
+import org.apache.calcite.prepare.Prepare
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.TableModify
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.schema.ModifiableTable
+import org.apache.calcite.util.BuiltInMethod
+import java.lang.reflect.Method
+
+/**
+ * A [TableModify] expression that modifies a workload trace.
+ */
+internal class TraceTableModify(
+ cluster: RelOptCluster,
+ traitSet: RelTraitSet,
+ table: RelOptTable,
+ schema: Prepare.CatalogReader,
+ input: RelNode,
+ operation: Operation,
+ updateColumnList: List<String>?,
+ sourceExpressionList: List<RexNode>?,
+ flattened: Boolean
+) : TableModify(cluster, traitSet, table, schema, input, operation, updateColumnList, sourceExpressionList, flattened),
+ EnumerableRel {
+ init {
+ // Make sure the table is modifiable
+ table.unwrap(ModifiableTable::class.java) ?: throw AssertionError() // TODO: user error in validator
+ }
+
+ override fun copy(traitSet: RelTraitSet, inputs: List<RelNode>?): RelNode {
+ return TraceTableModify(
+ cluster,
+ traitSet,
+ table,
+ getCatalogReader(),
+ sole(inputs),
+ operation,
+ updateColumnList,
+ sourceExpressionList,
+ isFlattened
+ )
+ }
+
+ override fun computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery?): RelOptCost {
+ // Prefer this plan compared to the standard EnumerableTableModify.
+ return super.computeSelfCost(planner, mq)!!.multiplyBy(.1)
+ }
+
+ override fun implement(implementor: EnumerableRelImplementor, pref: Prefer): EnumerableRel.Result {
+ val builder = BlockBuilder()
+ val result = implementor.visitChild(this, 0, getInput() as EnumerableRel, pref)
+ val childExp = builder.append("child", result.block)
+ val convertedChildExpr = if (getInput().rowType != rowType) {
+ val typeFactory = cluster.typeFactory as JavaTypeFactory
+ val format = EnumerableTableScan.deduceFormat(table)
+ val physType = PhysTypeImpl.of(typeFactory, table.rowType, format)
+ val childPhysType = result.physType
+ val o = Expressions.parameter(childPhysType.javaRowType, "o")
+ val expressionList = List(childPhysType.rowType.fieldCount) { i ->
+ childPhysType.fieldReference(o, i, physType.getJavaFieldType(i))
+ }
+
+ builder.append(
+ "convertedChild",
+ Expressions.call(
+ childExp,
+ BuiltInMethod.SELECT.method,
+ Expressions.lambda<org.apache.calcite.linq4j.function.Function<*>>(physType.record(expressionList), o)
+ )
+ )
+ } else {
+ childExp
+ }
+
+ if (!isInsert) {
+ throw UnsupportedOperationException("Deletion and update not supported")
+ }
+
+ val expression = table.getExpression(InsertableTable::class.java)
+ builder.add(
+ Expressions.return_(
+ null,
+ Expressions.call(
+ BuiltInMethod.SINGLETON_ENUMERABLE.method,
+ Expressions.call(
+ Long::class.java,
+ expression,
+ INSERT_METHOD,
+ convertedChildExpr,
+ )
+ )
+ )
+ )
+
+ val rowFormat = if (pref === Prefer.ARRAY) JavaRowFormat.ARRAY else JavaRowFormat.SCALAR
+ val physType = PhysTypeImpl.of(implementor.typeFactory, getRowType(), rowFormat)
+ return implementor.result(physType, builder.toBlock())
+ }
+
+ private companion object {
+ /**
+ * Reference to [InsertableTable.insert] method.
+ */
+ val INSERT_METHOD: Method = Types.lookupMethod(InsertableTable::class.java, "insert", Enumerable::class.java)
+ }
+}
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModifyRule.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModifyRule.kt
new file mode 100644
index 00000000..7572e381
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModifyRule.kt
@@ -0,0 +1,65 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.calcite
+
+import org.apache.calcite.adapter.enumerable.EnumerableConvention
+import org.apache.calcite.plan.Convention
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.TableModify
+import org.apache.calcite.rel.logical.LogicalTableModify
+import org.apache.calcite.schema.ModifiableTable
+
+/**
+ * A [ConverterRule] from a [LogicalTableModify] to a [TraceTableModify].
+ */
+internal class TraceTableModifyRule(config: Config) : ConverterRule(config) {
+ override fun convert(rel: RelNode): RelNode? {
+ val modify = rel as TableModify
+ val table = modify.table!!
+
+ // Make sure that the table is modifiable
+ if (table.unwrap(ModifiableTable::class.java) == null) {
+ return null
+ }
+
+ val traitSet = modify.traitSet.replace(EnumerableConvention.INSTANCE)
+ return TraceTableModify(
+ modify.cluster, traitSet,
+ table,
+ modify.catalogReader,
+ convert(modify.input, traitSet),
+ modify.operation,
+ modify.updateColumnList,
+ modify.sourceExpressionList,
+ modify.isFlattened
+ )
+ }
+
+ companion object {
+ /** Default configuration. */
+ val DEFAULT: Config = Config.INSTANCE
+ .withConversion(LogicalTableModify::class.java, Convention.NONE, EnumerableConvention.INSTANCE, "TraceTableModificationRule")
+ .withRuleFactory { config: Config -> TraceTableModifyRule(config) }
+ }
+}
diff --git a/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt b/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt
new file mode 100644
index 00000000..d2877d7c
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt
@@ -0,0 +1,158 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.calcite
+
+import org.apache.calcite.jdbc.CalciteConnection
+import org.junit.jupiter.api.Assertions.*
+import org.junit.jupiter.api.Test
+import org.opendc.trace.Trace
+import java.nio.file.Files
+import java.nio.file.Paths
+import java.sql.DriverManager
+import java.sql.ResultSet
+import java.sql.Statement
+import java.sql.Timestamp
+import java.util.*
+
+/**
+ * Smoke test for Apache Calcite integration.
+ */
+class CalciteTest {
+ /**
+ * The trace to experiment with.
+ */
+ private val trace = Trace.open(Paths.get("src/test/resources/trace"), format = "opendc-vm")
+
+ @Test
+ fun testResources() {
+ runQuery(trace, "SELECT * FROM trace.resources") { rs ->
+ assertAll(
+ { assertTrue(rs.next()) },
+ { assertEquals("1019", rs.getString("id")) },
+ { assertEquals(1, rs.getInt("cpu_count")) },
+ { assertEquals(Timestamp.valueOf("2013-08-12 13:40:46.0"), rs.getTimestamp("start_time")) },
+ { assertEquals(181352.0, rs.getDouble("mem_capacity")) },
+ { assertTrue(rs.next()) },
+ { assertEquals("1023", rs.getString("id")) },
+ { assertTrue(rs.next()) },
+ { assertEquals("1052", rs.getString("id")) },
+ { assertTrue(rs.next()) },
+ { assertEquals("1073", rs.getString("id")) },
+ { assertFalse(rs.next()) }
+ )
+ }
+ }
+
+ @Test
+ fun testResourceStates() {
+ runQuery(trace, "SELECT * FROM trace.resource_states") { rs ->
+ assertAll(
+ { assertTrue(rs.next()) },
+ { assertEquals("1019", rs.getString("id")) },
+ { assertEquals(Timestamp.valueOf("2013-08-12 13:40:46.0"), rs.getTimestamp("timestamp")) },
+ { assertEquals(300000, rs.getLong("duration")) },
+ { assertEquals(0.0, rs.getDouble("cpu_usage")) },
+ { assertTrue(rs.next()) },
+ { assertEquals("1019", rs.getString("id")) },
+ )
+ }
+ }
+
+ @Test
+ fun testInterferenceGroups() {
+ runQuery(trace, "SELECT * FROM trace.interference_groups") { rs ->
+ assertAll(
+ { assertTrue(rs.next()) },
+ { assertArrayEquals(arrayOf("1019", "1023", "1052"), rs.getArray("members").array as Array<*>) },
+ { assertEquals(0.0, rs.getDouble("target")) },
+ { assertEquals(0.8830158730158756, rs.getDouble("score")) },
+ )
+ }
+ }
+
+ @Test
+ fun testComplexQuery() {
+ runQuery(trace, "SELECT max(cpu_usage) as max_cpu_usage, avg(cpu_usage) as avg_cpu_usage FROM trace.resource_states") { rs ->
+ assertAll(
+ { assertTrue(rs.next()) },
+ { assertEquals(249.59993808, rs.getDouble("max_cpu_usage")) },
+ { assertEquals(5.387240309118493, rs.getDouble("avg_cpu_usage")) },
+ )
+ }
+ }
+
+ @Test
+ fun testInsert() {
+ val tmp = Files.createTempDirectory("opendc")
+ val newTrace = Trace.create(tmp, "opendc-vm")
+
+ runStatement(newTrace) { stmt ->
+ val count = stmt.executeUpdate(
+ """
+ INSERT INTO trace.resources (id, start_time, stop_time, cpu_count, cpu_capacity, mem_capacity)
+ VALUES (1234, '2013-08-12 13:35:46.0', '2013-09-11 13:39:58.0', 1, 2926.0, 1024.0)
+ """.trimIndent()
+ )
+ assertEquals(1, count)
+ }
+
+ runQuery(newTrace, "SELECT * FROM trace.resources") { rs ->
+ assertAll(
+ { assertTrue(rs.next()) },
+ { assertEquals("1234", rs.getString("id")) },
+ { assertEquals(1, rs.getInt("cpu_count")) },
+ { assertEquals(Timestamp.valueOf("2013-08-12 13:35:46.0"), rs.getTimestamp("start_time")) },
+ { assertEquals(2926.0, rs.getDouble("cpu_capacity")) },
+ { assertEquals(1024.0, rs.getDouble("mem_capacity")) }
+ )
+ }
+ }
+
+ /**
+ * Helper function to run statement for the specified trace.
+ */
+ private fun runQuery(trace: Trace, query: String, block: (ResultSet) -> Unit) {
+ runStatement(trace) { stmt ->
+ val rs = stmt.executeQuery(query)
+ rs.use { block(rs) }
+ }
+ }
+
+ /**
+ * Helper function to run statement for the specified trace.
+ */
+ private fun runStatement(trace: Trace, block: (Statement) -> Unit) {
+ val info = Properties()
+ info.setProperty("lex", "JAVA")
+ val connection = DriverManager.getConnection("jdbc:calcite:", info).unwrap(CalciteConnection::class.java)
+ connection.rootSchema.add("trace", TraceSchema(trace))
+
+ val stmt = connection.createStatement()
+ try {
+ block(stmt)
+ } finally {
+ stmt.close()
+ connection.close()
+ }
+ }
+}
diff --git a/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/TraceSchemaFactoryTest.kt b/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/TraceSchemaFactoryTest.kt
new file mode 100644
index 00000000..0a552e74
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/TraceSchemaFactoryTest.kt
@@ -0,0 +1,78 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.calcite
+
+import org.junit.jupiter.api.Assertions.*
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertThrows
+import java.sql.DriverManager
+import java.sql.Timestamp
+import java.util.*
+
+/**
+ * Test suite for [TraceSchemaFactory].
+ */
+class TraceSchemaFactoryTest {
+ @Test
+ fun testSmoke() {
+ val info = Properties()
+ info.setProperty("lex", "JAVA")
+ val connection = DriverManager.getConnection("jdbc:calcite:model=src/test/resources/model.json", info)
+ val stmt = connection.createStatement()
+ val rs = stmt.executeQuery("SELECT * FROM trace.resources")
+ try {
+ assertAll(
+ { assertTrue(rs.next()) },
+ { assertEquals("1019", rs.getString("id")) },
+ { assertEquals(1, rs.getInt("cpu_count")) },
+ { assertEquals(Timestamp.valueOf("2013-08-12 13:40:46.0"), rs.getTimestamp("start_time")) },
+ { assertEquals(181352.0, rs.getDouble("mem_capacity")) },
+ )
+ } finally {
+ rs.close()
+ stmt.close()
+ connection.close()
+ }
+ }
+
+ @Test
+ fun testWithoutParams() {
+ assertThrows<java.lang.RuntimeException> {
+ DriverManager.getConnection("jdbc:calcite:schemaFactory=org.opendc.trace.calcite.TraceSchemaFactory")
+ }
+ }
+
+ @Test
+ fun testWithoutPath() {
+ assertThrows<java.lang.RuntimeException> {
+ DriverManager.getConnection("jdbc:calcite:schemaFactory=org.opendc.trace.calcite.TraceSchemaFactory; schema.format=opendc-vm")
+ }
+ }
+
+ @Test
+ fun testWithoutFormat() {
+ assertThrows<java.lang.RuntimeException> {
+ DriverManager.getConnection("jdbc:calcite:schemaFactory=org.opendc.trace.calcite.TraceSchemaFactory; schema.path=trace")
+ }
+ }
+}
diff --git a/opendc-trace/opendc-trace-calcite/src/test/resources/model.json b/opendc-trace/opendc-trace-calcite/src/test/resources/model.json
new file mode 100644
index 00000000..91e2657f
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/test/resources/model.json
@@ -0,0 +1,15 @@
+{
+ "version": "1.0",
+ "defaultSchema": "trace",
+ "schemas": [
+ {
+ "name": "trace",
+ "type": "custom",
+ "factory": "org.opendc.trace.calcite.TraceSchemaFactory",
+ "operand": {
+ "path": "trace",
+ "format": "opendc-vm"
+ }
+ }
+ ]
+}
diff --git a/opendc-trace/opendc-trace-calcite/src/test/resources/trace/interference-model.json b/opendc-trace/opendc-trace-calcite/src/test/resources/trace/interference-model.json
new file mode 100644
index 00000000..6a0616d9
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/test/resources/trace/interference-model.json
@@ -0,0 +1,20 @@
+[
+ {
+ "vms": [
+ "1019",
+ "1023",
+ "1052"
+ ],
+ "minServerLoad": 0.0,
+ "performanceScore": 0.8830158730158756
+ },
+ {
+ "vms": [
+ "1023",
+ "1052",
+ "1073"
+ ],
+ "minServerLoad": 0.0,
+ "performanceScore": 0.7133055555552751
+ }
+]
diff --git a/opendc-trace/opendc-trace-calcite/src/test/resources/trace/meta.parquet b/opendc-trace/opendc-trace-calcite/src/test/resources/trace/meta.parquet
new file mode 100644
index 00000000..d8184945
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/test/resources/trace/meta.parquet
Binary files differ
diff --git a/opendc-trace/opendc-trace-calcite/src/test/resources/trace/trace.parquet b/opendc-trace/opendc-trace-calcite/src/test/resources/trace/trace.parquet
new file mode 100644
index 00000000..00ab5835
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/test/resources/trace/trace.parquet
Binary files differ