diff options
Diffstat (limited to 'opendc-trace/opendc-trace-calcite/src/main')
4 files changed, 272 insertions, 0 deletions
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt new file mode 100644 index 00000000..1854f262 --- /dev/null +++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.calcite + +import org.apache.calcite.linq4j.Enumerator +import org.opendc.trace.TableColumn +import org.opendc.trace.TableReader +import java.sql.Timestamp +import java.time.Duration +import java.time.Instant +import java.util.concurrent.atomic.AtomicBoolean + +/** + * An [Enumerator] for a [TableReader]. + */ +internal class TraceReaderEnumerator<E>( + private val reader: TableReader, + private val columns: List<TableColumn<*>>, + private val cancelFlag: AtomicBoolean +) : Enumerator<E> { + private val columnIndices = columns.map { reader.resolve(it) }.toIntArray() + private var current: E? = null + + override fun moveNext(): Boolean { + if (cancelFlag.get()) { + return false + } + + val reader = reader + val res = reader.nextRow() + + if (res) { + @Suppress("UNCHECKED_CAST") + current = convertRow(reader) as E + } else { + current = null + } + + return res + } + + override fun current(): E = checkNotNull(current) + + override fun reset() { + throw UnsupportedOperationException() + } + + override fun close() { + reader.close() + } + + private fun convertRow(reader: TableReader): Array<Any?> { + val res = arrayOfNulls<Any?>(columns.size) + val columnIndices = columnIndices + + for ((index, column) in columns.withIndex()) { + val columnIndex = columnIndices[index] + res[index] = convertColumn(reader, column, columnIndex) + } + return res + } + + private fun convertColumn(reader: TableReader, column: TableColumn<*>, columnIndex: Int): Any? { + val value = reader.get(columnIndex) + + return when (column.type) { + Instant::class.java -> Timestamp.from(value as Instant) + Duration::class.java -> (value as Duration).toMillis() + Set::class.java -> (value as Set<*>).toTypedArray() + else -> value + } + } +} diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt new file mode 100644 index 00000000..298a59dc --- /dev/null +++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.calcite + +import org.apache.calcite.schema.Schema +import org.apache.calcite.schema.Table +import org.apache.calcite.schema.impl.AbstractSchema +import org.opendc.trace.Trace + +/** + * A Calcite [Schema] that exposes an OpenDC [Trace] into multiple SQL tables. + * + * @param trace The [Trace] to create a schema for. + */ +public class TraceSchema(private val trace: Trace) : AbstractSchema() { + + private val tables: Map<String, TraceTable> by lazy { + trace.tables.associateWith { + val table = checkNotNull(trace.getTable(it)) { "Unexpected null table" } + TraceTable(table) + } + } + + override fun getTableMap(): Map<String, Table> = tables +} diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt new file mode 100644 index 00000000..3c6badc8 --- /dev/null +++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.calcite + +import org.apache.calcite.model.ModelHandler +import org.apache.calcite.schema.Schema +import org.apache.calcite.schema.SchemaFactory +import org.apache.calcite.schema.SchemaPlus +import org.opendc.trace.Trace +import java.io.File +import java.nio.file.Paths + +/** + * Factory that creates a [TraceSchema]. + * + * This factory allows users to include a schema that references a trace in a `model.json` file. + */ +public class TraceSchemaFactory : SchemaFactory { + override fun create(parentSchema: SchemaPlus, name: String, operand: Map<String, Any>): Schema { + val base = operand[ModelHandler.ExtraOperand.BASE_DIRECTORY.camelName] as File? + val pathParam = requireNotNull(operand["path"]) { "Trace path not specified" } as String + val path = if (base != null) File(base, pathParam).toPath() else Paths.get(pathParam) + + val format = requireNotNull(operand["format"]) { "Trace format not specified" } as String + val create = operand.getOrDefault("create", false) as Boolean + + val trace = if (create) Trace.create(path, format) else Trace.open(path, format) + return TraceSchema(trace) + } +} diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt new file mode 100644 index 00000000..8c3fe4e2 --- /dev/null +++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.calcite + +import org.apache.calcite.DataContext +import org.apache.calcite.adapter.java.JavaTypeFactory +import org.apache.calcite.linq4j.AbstractEnumerable +import org.apache.calcite.linq4j.Enumerable +import org.apache.calcite.linq4j.Enumerator +import org.apache.calcite.rel.type.RelDataType +import org.apache.calcite.rel.type.RelDataTypeFactory +import org.apache.calcite.schema.ScannableTable +import org.apache.calcite.schema.Table +import org.apache.calcite.schema.impl.AbstractTable +import org.apache.calcite.sql.type.SqlTypeName +import java.time.Duration +import java.time.Instant +import java.util.concurrent.atomic.AtomicBoolean + +/** + * A Calcite [Table] that exposes an OpenDC [org.opendc.trace.Table] as SQL table. + */ +internal class TraceTable(private val table: org.opendc.trace.Table) : AbstractTable(), ScannableTable { + + private var rowType: RelDataType? = null + + override fun getRowType(typeFactory: RelDataTypeFactory): RelDataType { + var rowType = rowType + if (rowType == null) { + rowType = deduceRowType(typeFactory as JavaTypeFactory) + this.rowType = rowType + } + + return rowType + } + + override fun scan(root: DataContext): Enumerable<Array<Any?>> { + val cancelFlag = DataContext.Variable.CANCEL_FLAG.get<AtomicBoolean>(root) + return object : AbstractEnumerable<Array<Any?>>() { + override fun enumerator(): Enumerator<Array<Any?>> = TraceReaderEnumerator(table.newReader(), table.columns, cancelFlag) + } + } + + override fun toString(): String = "TraceTable" + + private fun deduceRowType(typeFactory: JavaTypeFactory): RelDataType { + val types = mutableListOf<RelDataType>() + val names = mutableListOf<String>() + + for (column in table.columns) { + names.add(column.name) + types.add( + when (column.type) { + Instant::class.java -> typeFactory.createSqlType(SqlTypeName.TIMESTAMP) + Duration::class.java -> typeFactory.createSqlType(SqlTypeName.BIGINT) + Set::class.java -> typeFactory.createMultisetType(typeFactory.createSqlType(SqlTypeName.ANY), -1) + else -> typeFactory.createType(column.type) + } + ) + } + + return typeFactory.createStructType(types, names) + } +} |
