From e22c97dcca7478d6941b78bdf7cd873bc0d23cdc Mon Sep 17 00:00:00 2001 From: Dante Niewenhuis Date: Tue, 22 Jul 2025 15:47:44 +0200 Subject: Updated workload schema (#360) --- .../org/opendc/trace/calcite/InsertableTable.kt | 39 --- .../opendc/trace/calcite/TraceReaderEnumerator.kt | 118 --------- .../kotlin/org/opendc/trace/calcite/TraceSchema.kt | 47 ---- .../org/opendc/trace/calcite/TraceSchemaFactory.kt | 54 ----- .../kotlin/org/opendc/trace/calcite/TraceTable.kt | 214 ---------------- .../org/opendc/trace/calcite/TraceTableModify.kt | 157 ------------ .../opendc/trace/calcite/TraceTableModifyRule.kt | 71 ------ .../kotlin/org/opendc/trace/calcite/CalciteTest.kt | 268 --------------------- .../opendc/trace/calcite/TraceSchemaFactoryTest.kt | 80 ------ .../src/test/resources/model.json | 15 -- .../src/test/resources/trace/fragments.parquet | Bin 65174 -> 0 bytes .../test/resources/trace/interference-model.json | 20 -- .../src/test/resources/trace/tasks.parquet | Bin 1679 -> 0 bytes 13 files changed, 1083 deletions(-) delete mode 100644 opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/InsertableTable.kt delete mode 100644 opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt delete mode 100644 opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt delete mode 100644 opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt delete mode 100644 opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt delete mode 100644 opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModify.kt delete mode 100644 opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModifyRule.kt delete mode 100644 opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt delete mode 100644 opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/TraceSchemaFactoryTest.kt delete mode 100644 opendc-trace/opendc-trace-calcite/src/test/resources/model.json delete mode 100644 opendc-trace/opendc-trace-calcite/src/test/resources/trace/fragments.parquet delete mode 100644 opendc-trace/opendc-trace-calcite/src/test/resources/trace/interference-model.json delete mode 100644 opendc-trace/opendc-trace-calcite/src/test/resources/trace/tasks.parquet (limited to 'opendc-trace/opendc-trace-calcite/src') diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/InsertableTable.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/InsertableTable.kt deleted file mode 100644 index 9c7b69a2..00000000 --- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/InsertableTable.kt +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.calcite - -import org.apache.calcite.linq4j.Enumerable -import org.apache.calcite.schema.Table - -/** - * A Calcite [Table] to which rows can be inserted. - */ -internal interface InsertableTable : Table { - /** - * Insert [rows] into this table. - * - * @param rows The rows to insert into the table. - * @return The number of rows inserted. - */ - fun insert(rows: Enumerable>): Long -} diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt deleted file mode 100644 index eed52ab3..00000000 --- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.calcite - -import org.apache.calcite.linq4j.Enumerator -import org.opendc.trace.TableColumn -import org.opendc.trace.TableColumnType -import org.opendc.trace.TableReader -import java.nio.ByteBuffer -import java.nio.ByteOrder -import java.util.concurrent.atomic.AtomicBoolean - -/** - * An [Enumerator] for a [TableReader]. - */ -internal class TraceReaderEnumerator( - private val reader: TableReader, - private val columns: List, - private val cancelFlag: AtomicBoolean, -) : Enumerator { - private val columnIndices = columns.map { reader.resolve(it.name) }.toIntArray() - private var current: E? = null - - override fun moveNext(): Boolean { - if (cancelFlag.get()) { - return false - } - - val reader = reader - val res = reader.nextRow() - - if (res) { - @Suppress("UNCHECKED_CAST") - current = convertRow(reader) as E - } else { - current = null - } - - return res - } - - override fun current(): E = checkNotNull(current) - - override fun reset() { - throw UnsupportedOperationException() - } - - override fun close() { - reader.close() - } - - private fun convertRow(reader: TableReader): Array { - val res = arrayOfNulls(columns.size) - val columnIndices = columnIndices - - for ((index, column) in columns.withIndex()) { - val columnIndex = columnIndices[index] - res[index] = convertColumn(reader, column, columnIndex) - } - return res - } - - private fun convertColumn( - reader: TableReader, - column: TableColumn, - columnIndex: Int, - ): Any? { - return when (column.type) { - is TableColumnType.Boolean -> reader.getBoolean(columnIndex) - is TableColumnType.Int -> reader.getInt(columnIndex) - is TableColumnType.Long -> reader.getLong(columnIndex) - is TableColumnType.Float -> reader.getFloat(columnIndex) - is TableColumnType.Double -> reader.getDouble(columnIndex) - is TableColumnType.String -> reader.getString(columnIndex) - is TableColumnType.UUID -> { - val uuid = reader.getUUID(columnIndex) - - if (uuid != null) { - val uuidBytes = ByteArray(16) - - ByteBuffer.wrap(uuidBytes) - .order(ByteOrder.BIG_ENDIAN) - .putLong(uuid.mostSignificantBits) - .putLong(uuid.leastSignificantBits) - - uuidBytes - } else { - null - } - } - is TableColumnType.Instant -> reader.getInstant(columnIndex)?.toEpochMilli() - is TableColumnType.Duration -> reader.getDuration(columnIndex)?.toMillis() ?: 0 - is TableColumnType.List -> reader.getList(columnIndex, Any::class.java)?.toTypedArray() - is TableColumnType.Set -> reader.getSet(columnIndex, Any::class.java)?.toTypedArray() - is TableColumnType.Map -> reader.getMap(columnIndex, Any::class.java, Any::class.java) - } - } -} diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt deleted file mode 100644 index 3249546d..00000000 --- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.calcite - -import org.apache.calcite.schema.Schema -import org.apache.calcite.schema.Table -import org.apache.calcite.schema.impl.AbstractSchema -import org.opendc.trace.Trace - -/** - * A Calcite [Schema] that exposes an OpenDC [Trace] into multiple SQL tables. - * - * @param trace The [Trace] to create a schema for. - */ -public class TraceSchema(private val trace: Trace) : AbstractSchema() { - /** - * The [Table]s that belong to this schema. - */ - private val tables: Map by lazy { - trace.tables.associateWith { - val table = checkNotNull(trace.getTable(it)) { "Unexpected null table" } - TraceTable(table) - } - } - - override fun getTableMap(): Map = tables -} diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt deleted file mode 100644 index cbf7ec43..00000000 --- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.calcite - -import org.apache.calcite.model.ModelHandler -import org.apache.calcite.schema.Schema -import org.apache.calcite.schema.SchemaFactory -import org.apache.calcite.schema.SchemaPlus -import org.opendc.trace.Trace -import java.io.File -import java.nio.file.Paths - -/** - * Factory that creates a [TraceSchema]. - * - * This factory allows users to include a schema that references a trace in a `model.json` file. - */ -public class TraceSchemaFactory : SchemaFactory { - override fun create( - parentSchema: SchemaPlus, - name: String, - operand: Map, - ): Schema { - val base = operand[ModelHandler.ExtraOperand.BASE_DIRECTORY.camelName] as File? - val pathParam = requireNotNull(operand["path"]) { "Trace path not specified" } as String - val path = if (base != null) File(base, pathParam).toPath() else Paths.get(pathParam) - - val format = requireNotNull(operand["format"]) { "Trace format not specified" } as String - val create = operand.getOrDefault("create", false) as Boolean - - val trace = if (create) Trace.create(path, format) else Trace.open(path, format) - return TraceSchema(trace) - } -} diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt deleted file mode 100644 index e74d2ee8..00000000 --- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt +++ /dev/null @@ -1,214 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.calcite - -import org.apache.calcite.DataContext -import org.apache.calcite.adapter.java.AbstractQueryableTable -import org.apache.calcite.adapter.java.JavaTypeFactory -import org.apache.calcite.linq4j.AbstractEnumerable -import org.apache.calcite.linq4j.Enumerable -import org.apache.calcite.linq4j.Enumerator -import org.apache.calcite.linq4j.QueryProvider -import org.apache.calcite.linq4j.Queryable -import org.apache.calcite.plan.RelOptCluster -import org.apache.calcite.plan.RelOptTable -import org.apache.calcite.prepare.Prepare.CatalogReader -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.core.TableModify -import org.apache.calcite.rel.logical.LogicalTableModify -import org.apache.calcite.rel.type.RelDataType -import org.apache.calcite.rel.type.RelDataTypeFactory -import org.apache.calcite.rex.RexNode -import org.apache.calcite.schema.ModifiableTable -import org.apache.calcite.schema.ProjectableFilterableTable -import org.apache.calcite.schema.SchemaPlus -import org.apache.calcite.schema.impl.AbstractTableQueryable -import org.apache.calcite.sql.type.SqlTypeName -import org.opendc.trace.TableColumnType -import java.nio.ByteBuffer -import java.time.Duration -import java.time.Instant -import java.util.UUID -import java.util.concurrent.atomic.AtomicBoolean - -/** - * A Calcite [Table] that exposes an OpenDC [org.opendc.trace.Table] as SQL table. - */ -internal class TraceTable(private val table: org.opendc.trace.Table) : - AbstractQueryableTable(Array::class.java), - ProjectableFilterableTable, - ModifiableTable, - InsertableTable { - private var rowType: RelDataType? = null - - override fun getRowType(typeFactory: RelDataTypeFactory): RelDataType { - var rowType = rowType - if (rowType == null) { - rowType = deduceRowType(typeFactory as JavaTypeFactory) - this.rowType = rowType - } - - return rowType - } - - override fun scan( - root: DataContext, - filters: MutableList, - projects: IntArray?, - ): Enumerable> { - // Filters are currently not supported by the OpenDC trace API. By keeping the filters in the list, Calcite - // assumes that they are declined and will perform the filters itself. - - val projection = projects?.map { table.columns[it] } - val cancelFlag = DataContext.Variable.CANCEL_FLAG.get(root) - return object : AbstractEnumerable>() { - override fun enumerator(): Enumerator> = - TraceReaderEnumerator(table.newReader(projection?.map { it.name }), projection ?: table.columns, cancelFlag) - } - } - - override fun insert(rows: Enumerable>): Long { - val table = table - val columns = table.columns - val writer = table.newWriter() - val columnIndices = columns.map { writer.resolve(it.name) }.toIntArray() - var rowCount = 0L - - try { - for (row in rows) { - writer.startRow() - - for ((index, value) in row.withIndex()) { - if (value == null) { - continue - } - val columnType = columns[index].type - val columnIndex = columnIndices[index] - when (columnType) { - is TableColumnType.Boolean -> writer.setBoolean(columnIndex, value as Boolean) - is TableColumnType.Int -> writer.setInt(columnIndex, value as Int) - is TableColumnType.Long -> writer.setLong(columnIndex, value as Long) - is TableColumnType.Float -> writer.setFloat(columnIndex, value as Float) - is TableColumnType.Double -> writer.setDouble(columnIndex, value as Double) - is TableColumnType.String -> writer.setString(columnIndex, value as String) - is TableColumnType.UUID -> { - val bb = ByteBuffer.wrap(value as ByteArray) - writer.setUUID(columnIndex, UUID(bb.getLong(), bb.getLong())) - } - is TableColumnType.Instant -> writer.setInstant(columnIndex, Instant.ofEpochMilli(value as Long)) - is TableColumnType.Duration -> writer.setDuration(columnIndex, Duration.ofMillis(value as Long)) - is TableColumnType.List -> writer.setList(columnIndex, value as List<*>) - is TableColumnType.Set -> writer.setSet(columnIndex, (value as List<*>).toSet()) - is TableColumnType.Map -> writer.setMap(columnIndex, value as Map<*, *>) - } - } - - writer.endRow() - - rowCount++ - } - } finally { - writer.close() - } - - return rowCount - } - - override fun asQueryable( - queryProvider: QueryProvider, - schema: SchemaPlus, - tableName: String, - ): Queryable { - return object : AbstractTableQueryable(queryProvider, schema, this@TraceTable, tableName) { - override fun enumerator(): Enumerator { - val cancelFlag = AtomicBoolean(false) - return TraceReaderEnumerator( - this@TraceTable.table.newReader(), - this@TraceTable.table.columns, - cancelFlag, - ) - } - - override fun toString(): String = "TraceTableQueryable[table=$tableName]" - } - } - - override fun getModifiableCollection(): MutableCollection? = null - - override fun toModificationRel( - cluster: RelOptCluster, - table: RelOptTable, - catalogReader: CatalogReader, - child: RelNode, - operation: TableModify.Operation, - updateColumnList: MutableList?, - sourceExpressionList: MutableList?, - flattened: Boolean, - ): TableModify { - cluster.planner.addRule(TraceTableModifyRule.DEFAULT.toRule()) - - return LogicalTableModify.create( - table, - catalogReader, - child, - operation, - updateColumnList, - sourceExpressionList, - flattened, - ) - } - - override fun toString(): String = "TraceTable" - - private fun deduceRowType(typeFactory: JavaTypeFactory): RelDataType { - val types = mutableListOf() - val names = mutableListOf() - - for (column in table.columns) { - names.add(column.name) - types.add(mapType(typeFactory, column.type)) - } - - return typeFactory.createStructType(types, names) - } - - private fun mapType( - typeFactory: JavaTypeFactory, - type: TableColumnType, - ): RelDataType { - return when (type) { - is TableColumnType.Boolean -> typeFactory.createSqlType(SqlTypeName.BOOLEAN) - is TableColumnType.Int -> typeFactory.createSqlType(SqlTypeName.INTEGER) - is TableColumnType.Long -> typeFactory.createSqlType(SqlTypeName.BIGINT) - is TableColumnType.Float -> typeFactory.createSqlType(SqlTypeName.FLOAT) - is TableColumnType.Double -> typeFactory.createSqlType(SqlTypeName.DOUBLE) - is TableColumnType.String -> typeFactory.createSqlType(SqlTypeName.VARCHAR) - is TableColumnType.UUID -> typeFactory.createSqlType(SqlTypeName.BINARY, 16) - is TableColumnType.Instant -> typeFactory.createSqlType(SqlTypeName.TIMESTAMP) - is TableColumnType.Duration -> typeFactory.createSqlType(SqlTypeName.BIGINT) - is TableColumnType.List -> typeFactory.createArrayType(mapType(typeFactory, type.elementType), -1) - is TableColumnType.Set -> typeFactory.createMultisetType(mapType(typeFactory, type.elementType), -1) - is TableColumnType.Map -> typeFactory.createMapType(mapType(typeFactory, type.keyType), mapType(typeFactory, type.valueType)) - } - } -} diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModify.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModify.kt deleted file mode 100644 index eedff00d..00000000 --- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModify.kt +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.calcite - -import org.apache.calcite.adapter.enumerable.EnumerableRel -import org.apache.calcite.adapter.enumerable.EnumerableRel.Prefer -import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor -import org.apache.calcite.adapter.enumerable.EnumerableTableScan -import org.apache.calcite.adapter.enumerable.JavaRowFormat -import org.apache.calcite.adapter.enumerable.PhysTypeImpl -import org.apache.calcite.adapter.java.JavaTypeFactory -import org.apache.calcite.linq4j.Enumerable -import org.apache.calcite.linq4j.tree.BlockBuilder -import org.apache.calcite.linq4j.tree.Expressions -import org.apache.calcite.linq4j.tree.Types -import org.apache.calcite.plan.RelOptCluster -import org.apache.calcite.plan.RelOptCost -import org.apache.calcite.plan.RelOptPlanner -import org.apache.calcite.plan.RelOptTable -import org.apache.calcite.plan.RelTraitSet -import org.apache.calcite.prepare.Prepare -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.core.TableModify -import org.apache.calcite.rel.metadata.RelMetadataQuery -import org.apache.calcite.rex.RexNode -import org.apache.calcite.schema.ModifiableTable -import org.apache.calcite.util.BuiltInMethod -import java.lang.reflect.Method - -/** - * A [TableModify] expression that modifies a workload trace. - */ -internal class TraceTableModify( - cluster: RelOptCluster, - traitSet: RelTraitSet, - table: RelOptTable, - schema: Prepare.CatalogReader, - input: RelNode, - operation: Operation, - updateColumnList: List?, - sourceExpressionList: List?, - flattened: Boolean, -) : TableModify(cluster, traitSet, table, schema, input, operation, updateColumnList, sourceExpressionList, flattened), - EnumerableRel { - init { - // Make sure the table is modifiable - table.unwrap(ModifiableTable::class.java) ?: throw AssertionError() // TODO: user error in validator - } - - override fun copy( - traitSet: RelTraitSet, - inputs: List?, - ): RelNode { - return TraceTableModify( - cluster, - traitSet, - table, - getCatalogReader(), - sole(inputs), - operation, - updateColumnList, - sourceExpressionList, - isFlattened, - ) - } - - override fun computeSelfCost( - planner: RelOptPlanner, - mq: RelMetadataQuery?, - ): RelOptCost { - // Prefer this plan compared to the standard EnumerableTableModify. - return super.computeSelfCost(planner, mq)!!.multiplyBy(.1) - } - - override fun implement( - implementor: EnumerableRelImplementor, - pref: Prefer, - ): EnumerableRel.Result { - val builder = BlockBuilder() - val result = implementor.visitChild(this, 0, getInput() as EnumerableRel, pref) - val childExp = builder.append("child", result.block) - val convertedChildExpr = - if (getInput().rowType != rowType) { - val typeFactory = cluster.typeFactory as JavaTypeFactory - val format = EnumerableTableScan.deduceFormat(table) - val physType = PhysTypeImpl.of(typeFactory, table.rowType, format) - val childPhysType = result.physType - val o = Expressions.parameter(childPhysType.javaRowType, "o") - val expressionList = - List(childPhysType.rowType.fieldCount) { i -> - childPhysType.fieldReference(o, i, physType.getJavaFieldType(i)) - } - - builder.append( - "convertedChild", - Expressions.call( - childExp, - BuiltInMethod.SELECT.method, - Expressions.lambda>(physType.record(expressionList), o), - ), - ) - } else { - childExp - } - - if (!isInsert) { - throw UnsupportedOperationException("Deletion and update not supported") - } - - val expression = table.getExpression(InsertableTable::class.java) - builder.add( - Expressions.return_( - null, - Expressions.call( - BuiltInMethod.SINGLETON_ENUMERABLE.method, - Expressions.call( - Long::class.java, - expression, - INSERT_METHOD, - convertedChildExpr, - ), - ), - ), - ) - - val rowFormat = if (pref === Prefer.ARRAY) JavaRowFormat.ARRAY else JavaRowFormat.SCALAR - val physType = PhysTypeImpl.of(implementor.typeFactory, getRowType(), rowFormat) - return implementor.result(physType, builder.toBlock()) - } - - private companion object { - /** - * Reference to [InsertableTable.insert] method. - */ - val INSERT_METHOD: Method = Types.lookupMethod(InsertableTable::class.java, "insert", Enumerable::class.java) - } -} diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModifyRule.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModifyRule.kt deleted file mode 100644 index 9c560984..00000000 --- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModifyRule.kt +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.calcite - -import org.apache.calcite.adapter.enumerable.EnumerableConvention -import org.apache.calcite.plan.Convention -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.convert.ConverterRule -import org.apache.calcite.rel.core.TableModify -import org.apache.calcite.rel.logical.LogicalTableModify -import org.apache.calcite.schema.ModifiableTable - -/** - * A [ConverterRule] from a [LogicalTableModify] to a [TraceTableModify]. - */ -internal class TraceTableModifyRule(config: Config) : ConverterRule(config) { - override fun convert(rel: RelNode): RelNode? { - val modify = rel as TableModify - val table = modify.table!! - - // Make sure that the table is modifiable - if (table.unwrap(ModifiableTable::class.java) == null) { - return null - } - - val traitSet = modify.traitSet.replace(EnumerableConvention.INSTANCE) - return TraceTableModify( - modify.cluster, traitSet, - table, - modify.catalogReader, - convert(modify.input, traitSet), - modify.operation, - modify.updateColumnList, - modify.sourceExpressionList, - modify.isFlattened, - ) - } - - companion object { - /** Default configuration. */ - val DEFAULT: Config = - Config.INSTANCE - .withConversion( - LogicalTableModify::class.java, - Convention.NONE, - EnumerableConvention.INSTANCE, - "TraceTableModificationRule", - ) - .withRuleFactory { config: Config -> TraceTableModifyRule(config) } - } -} diff --git a/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt b/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt deleted file mode 100644 index 6a945580..00000000 --- a/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt +++ /dev/null @@ -1,268 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.calcite - -import io.mockk.every -import io.mockk.mockk -import org.apache.calcite.jdbc.CalciteConnection -import org.junit.jupiter.api.Assertions.assertAll -import org.junit.jupiter.api.Assertions.assertArrayEquals -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Assertions.assertFalse -import org.junit.jupiter.api.Assertions.assertTrue -import org.junit.jupiter.api.Test -import org.opendc.trace.TableColumn -import org.opendc.trace.TableColumnType -import org.opendc.trace.TableReader -import org.opendc.trace.Trace -import org.opendc.trace.conv.TABLE_RESOURCES -import java.nio.file.Files -import java.nio.file.Paths -import java.sql.DriverManager -import java.sql.ResultSet -import java.sql.Statement -import java.sql.Timestamp -import java.time.Duration -import java.time.Instant -import java.util.Properties -import java.util.UUID - -/** - * Smoke test for Apache Calcite integration. - */ -class CalciteTest { - /** - * The trace to experiment with. - */ - private val odcTrace = Trace.open(Paths.get("src/test/resources/trace"), format = "opendc-vm") - -// @Test - fun testResources() { - runQuery(odcTrace, "SELECT * FROM trace.resources") { rs -> - assertAll( - { assertTrue(rs.next()) }, - { assertEquals("1019", rs.getString("id")) }, - { assertEquals(1, rs.getInt("cpu_count")) }, - { assertEquals(Timestamp.valueOf("2013-08-12 13:40:46.0"), rs.getTimestamp("start_time")) }, - { assertEquals(181352.0, rs.getDouble("mem_capacity")) }, - { assertTrue(rs.next()) }, - { assertEquals("1023", rs.getString("id")) }, - { assertTrue(rs.next()) }, - { assertEquals("1052", rs.getString("id")) }, - { assertTrue(rs.next()) }, - { assertEquals("1073", rs.getString("id")) }, - { assertFalse(rs.next()) }, - ) - } - } - - @Test - fun testResourceStates() { - runQuery(odcTrace, "SELECT * FROM trace.resource_states") { rs -> - assertAll( - { assertTrue(rs.next()) }, - { assertEquals("1019", rs.getString("id")) }, - { assertEquals(Timestamp.valueOf("2013-08-12 13:40:46.0"), rs.getTimestamp("timestamp")) }, - { assertEquals(300000, rs.getLong("duration")) }, - { assertEquals(0.0, rs.getDouble("cpu_usage")) }, - { assertTrue(rs.next()) }, - { assertEquals("1019", rs.getString("id")) }, - ) - } - } - - @Test - fun testInterferenceGroups() { - runQuery(odcTrace, "SELECT * FROM trace.interference_groups") { rs -> - assertAll( - { assertTrue(rs.next()) }, - { assertArrayEquals(arrayOf("1019", "1023", "1052"), rs.getArray("members").array as Array<*>) }, - { assertEquals(0.0, rs.getDouble("target")) }, - { assertEquals(0.8830158730158756, rs.getDouble("score")) }, - ) - } - } - - @Test - fun testComplexQuery() { - runQuery(odcTrace, "SELECT max(cpu_usage) as max_cpu_usage, avg(cpu_usage) as avg_cpu_usage FROM trace.resource_states") { rs -> - assertAll( - { assertTrue(rs.next()) }, - { assertEquals(249.59993808, rs.getDouble("max_cpu_usage")) }, - { assertEquals(5.387240309118493, rs.getDouble("avg_cpu_usage")) }, - ) - } - } - -// @Test - fun testInsert() { - val tmp = Files.createTempDirectory("opendc") - val newTrace = Trace.create(tmp, "opendc-vm") - - runStatement(newTrace) { stmt -> - val count = - stmt.executeUpdate( - """ - INSERT INTO trace.resources (id, start_time, stop_time, cpu_count, cpu_capacity, mem_capacity) - VALUES (1234, '2013-08-12 13:35:46.0', '2013-09-11 13:39:58.0', 1, 2926.0, 1024.0) - """.trimIndent(), - ) - assertEquals(1, count) - } - - runQuery(newTrace, "SELECT * FROM trace.resources") { rs -> - assertAll( - { assertTrue(rs.next()) }, - { assertEquals("1234", rs.getString("id")) }, - { assertEquals(1, rs.getInt("cpu_count")) }, - { assertEquals(Timestamp.valueOf("2013-08-12 13:35:46.0"), rs.getTimestamp("start_time")) }, - { assertEquals(2926.0, rs.getDouble("cpu_capacity")) }, - { assertEquals(1024.0, rs.getDouble("mem_capacity")) }, - ) - } - } - - @Test - fun testUUID() { - val trace = mockk() - every { trace.tables } returns listOf(TABLE_RESOURCES) - every { trace.getTable(TABLE_RESOURCES)!!.columns } returns - listOf( - TableColumn("id", TableColumnType.UUID), - ) - every { trace.getTable(TABLE_RESOURCES)!!.newReader() } answers { - object : TableReader { - override fun nextRow(): Boolean = true - - override fun resolve(name: String): Int { - return when (name) { - "id" -> 0 - else -> -1 - } - } - - override fun isNull(index: Int): Boolean = false - - override fun getBoolean(index: Int): Boolean { - TODO("not implemented") - } - - override fun getInt(index: Int): Int { - TODO("not implemented") - } - - override fun getLong(index: Int): Long { - TODO("not implemented") - } - - override fun getFloat(index: Int): Float { - TODO("not implemented") - } - - override fun getDouble(index: Int): Double { - TODO("not implemented") - } - - override fun getString(index: Int): String? { - TODO("not implemented") - } - - override fun getUUID(index: Int): UUID = UUID(1, 2) - - override fun getInstant(index: Int): Instant? { - TODO("not implemented") - } - - override fun getDuration(index: Int): Duration? { - TODO("not implemented") - } - - override fun getList( - index: Int, - elementType: Class, - ): List? { - TODO("not implemented") - } - - override fun getSet( - index: Int, - elementType: Class, - ): Set? { - TODO("not implemented") - } - - override fun getMap( - index: Int, - keyType: Class, - valueType: Class, - ): Map? { - TODO("not implemented") - } - - override fun close() {} - } - } - - runQuery(trace, "SELECT id FROM trace.resources") { rs -> - assertAll( - { assertTrue(rs.next()) }, - { assertArrayEquals(byteArrayOf(0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 2), rs.getBytes("id")) }, - ) - } - } - - /** - * Helper function to run statement for the specified trace. - */ - private fun runQuery( - trace: Trace, - query: String, - block: (ResultSet) -> Unit, - ) { - runStatement(trace) { stmt -> - val rs = stmt.executeQuery(query) - rs.use { block(rs) } - } - } - - /** - * Helper function to run statement for the specified trace. - */ - private fun runStatement( - trace: Trace, - block: (Statement) -> Unit, - ) { - val info = Properties() - info.setProperty("lex", "JAVA") - val connection = DriverManager.getConnection("jdbc:calcite:", info).unwrap(CalciteConnection::class.java) - connection.rootSchema.add("trace", TraceSchema(trace)) - - val stmt = connection.createStatement() - try { - block(stmt) - } finally { - stmt.close() - connection.close() - } - } -} diff --git a/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/TraceSchemaFactoryTest.kt b/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/TraceSchemaFactoryTest.kt deleted file mode 100644 index ddf325e8..00000000 --- a/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/TraceSchemaFactoryTest.kt +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.trace.calcite - -import org.junit.jupiter.api.Assertions.assertAll -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Assertions.assertTrue -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertThrows -import java.sql.DriverManager -import java.sql.Timestamp -import java.util.Properties - -/** - * Test suite for [TraceSchemaFactory]. - */ -class TraceSchemaFactoryTest { -// @Test - fun testSmoke() { - val info = Properties() - info.setProperty("lex", "JAVA") - val connection = DriverManager.getConnection("jdbc:calcite:model=src/test/resources/model.json", info) - val stmt = connection.createStatement() - val rs = stmt.executeQuery("SELECT * FROM trace.resources") - try { - assertAll( - { assertTrue(rs.next()) }, - { assertEquals("1019", rs.getString("id")) }, - { assertEquals(1, rs.getInt("cpu_count")) }, - { assertEquals(Timestamp.valueOf("2013-08-12 13:40:46.0"), rs.getTimestamp("start_time")) }, - { assertEquals(181352.0, rs.getDouble("mem_capacity")) }, - ) - } finally { - rs.close() - stmt.close() - connection.close() - } - } - - @Test - fun testWithoutParams() { - assertThrows { - DriverManager.getConnection("jdbc:calcite:schemaFactory=org.opendc.trace.calcite.TraceSchemaFactory") - } - } - - @Test - fun testWithoutPath() { - assertThrows { - DriverManager.getConnection("jdbc:calcite:schemaFactory=org.opendc.trace.calcite.TraceSchemaFactory; schema.format=opendc-vm") - } - } - - @Test - fun testWithoutFormat() { - assertThrows { - DriverManager.getConnection("jdbc:calcite:schemaFactory=org.opendc.trace.calcite.TraceSchemaFactory; schema.path=trace") - } - } -} diff --git a/opendc-trace/opendc-trace-calcite/src/test/resources/model.json b/opendc-trace/opendc-trace-calcite/src/test/resources/model.json deleted file mode 100644 index 91e2657f..00000000 --- a/opendc-trace/opendc-trace-calcite/src/test/resources/model.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "version": "1.0", - "defaultSchema": "trace", - "schemas": [ - { - "name": "trace", - "type": "custom", - "factory": "org.opendc.trace.calcite.TraceSchemaFactory", - "operand": { - "path": "trace", - "format": "opendc-vm" - } - } - ] -} diff --git a/opendc-trace/opendc-trace-calcite/src/test/resources/trace/fragments.parquet b/opendc-trace/opendc-trace-calcite/src/test/resources/trace/fragments.parquet deleted file mode 100644 index 00ab5835..00000000 Binary files a/opendc-trace/opendc-trace-calcite/src/test/resources/trace/fragments.parquet and /dev/null differ diff --git a/opendc-trace/opendc-trace-calcite/src/test/resources/trace/interference-model.json b/opendc-trace/opendc-trace-calcite/src/test/resources/trace/interference-model.json deleted file mode 100644 index 6a0616d9..00000000 --- a/opendc-trace/opendc-trace-calcite/src/test/resources/trace/interference-model.json +++ /dev/null @@ -1,20 +0,0 @@ -[ - { - "vms": [ - "1019", - "1023", - "1052" - ], - "minServerLoad": 0.0, - "performanceScore": 0.8830158730158756 - }, - { - "vms": [ - "1023", - "1052", - "1073" - ], - "minServerLoad": 0.0, - "performanceScore": 0.7133055555552751 - } -] diff --git a/opendc-trace/opendc-trace-calcite/src/test/resources/trace/tasks.parquet b/opendc-trace/opendc-trace-calcite/src/test/resources/trace/tasks.parquet deleted file mode 100644 index d8184945..00000000 Binary files a/opendc-trace/opendc-trace-calcite/src/test/resources/trace/tasks.parquet and /dev/null differ -- cgit v1.2.3