From 2358257c1080b7ce78270535f82f0b960d48261a Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 6 Jun 2022 16:21:21 +0200 Subject: refactor(trace/api): Introduce type system for trace API This change updates the trace API by introducing a limited type system for the table columns. Previously, the table columns could have any possible type representable by the JVM. With this change, we limit the available types to a small type system. --- .../opendc/trace/calcite/TraceReaderEnumerator.kt | 45 ++++++++++++----- .../kotlin/org/opendc/trace/calcite/TraceTable.kt | 59 +++++++++++++++------- 2 files changed, 73 insertions(+), 31 deletions(-) (limited to 'opendc-trace/opendc-trace-calcite/src/main') diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt index 1854f262..74bd188b 100644 --- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt +++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt @@ -24,10 +24,10 @@ package org.opendc.trace.calcite import org.apache.calcite.linq4j.Enumerator import org.opendc.trace.TableColumn +import org.opendc.trace.TableColumnType import org.opendc.trace.TableReader -import java.sql.Timestamp -import java.time.Duration -import java.time.Instant +import java.nio.ByteBuffer +import java.nio.ByteOrder import java.util.concurrent.atomic.AtomicBoolean /** @@ -35,10 +35,10 @@ import java.util.concurrent.atomic.AtomicBoolean */ internal class TraceReaderEnumerator( private val reader: TableReader, - private val columns: List>, + private val columns: List, private val cancelFlag: AtomicBoolean ) : Enumerator { - private val columnIndices = columns.map { reader.resolve(it) }.toIntArray() + private val columnIndices = columns.map { reader.resolve(it.name) }.toIntArray() private var current: E? = null override fun moveNext(): Boolean { @@ -80,14 +80,35 @@ internal class TraceReaderEnumerator( return res } - private fun convertColumn(reader: TableReader, column: TableColumn<*>, columnIndex: Int): Any? { - val value = reader.get(columnIndex) - + private fun convertColumn(reader: TableReader, column: TableColumn, columnIndex: Int): Any? { return when (column.type) { - Instant::class.java -> Timestamp.from(value as Instant) - Duration::class.java -> (value as Duration).toMillis() - Set::class.java -> (value as Set<*>).toTypedArray() - else -> value + is TableColumnType.Boolean -> reader.getBoolean(columnIndex) + is TableColumnType.Int -> reader.getInt(columnIndex) + is TableColumnType.Long -> reader.getLong(columnIndex) + is TableColumnType.Float -> reader.getFloat(columnIndex) + is TableColumnType.Double -> reader.getDouble(columnIndex) + is TableColumnType.String -> reader.getString(columnIndex) + is TableColumnType.UUID -> { + val uuid = reader.getUUID(columnIndex) + + if (uuid != null) { + val uuidBytes = ByteArray(16) + + ByteBuffer.wrap(uuidBytes) + .order(ByteOrder.BIG_ENDIAN) + .putLong(uuid.mostSignificantBits) + .putLong(uuid.leastSignificantBits) + + uuidBytes + } else { + null + } + } + is TableColumnType.Instant -> reader.getInstant(columnIndex)?.toEpochMilli() + is TableColumnType.Duration -> reader.getDuration(columnIndex)?.toMillis() ?: 0 + is TableColumnType.List -> reader.getList(columnIndex, Any::class.java)?.toTypedArray() + is TableColumnType.Set -> reader.getSet(columnIndex, Any::class.java)?.toTypedArray() + is TableColumnType.Map -> reader.getMap(columnIndex, Any::class.java, Any::class.java) } } } diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt index 8c571b82..dfcc22a3 100644 --- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt +++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt @@ -38,8 +38,11 @@ import org.apache.calcite.rex.RexNode import org.apache.calcite.schema.* import org.apache.calcite.schema.impl.AbstractTableQueryable import org.apache.calcite.sql.type.SqlTypeName +import org.opendc.trace.TableColumnType +import java.nio.ByteBuffer import java.time.Duration import java.time.Instant +import java.util.* import java.util.concurrent.atomic.AtomicBoolean /** @@ -70,7 +73,7 @@ internal class TraceTable(private val table: org.opendc.trace.Table) : val cancelFlag = DataContext.Variable.CANCEL_FLAG.get(root) return object : AbstractEnumerable>() { override fun enumerator(): Enumerator> = - TraceReaderEnumerator(table.newReader(projection), projection ?: table.columns, cancelFlag) + TraceReaderEnumerator(table.newReader(projection?.map { it.name }), projection ?: table.columns, cancelFlag) } } @@ -78,7 +81,7 @@ internal class TraceTable(private val table: org.opendc.trace.Table) : val table = table val columns = table.columns val writer = table.newWriter() - val columnIndices = columns.map { writer.resolve(it) }.toIntArray() + val columnIndices = columns.map { writer.resolve(it.name) }.toIntArray() var rowCount = 0L try { @@ -90,16 +93,24 @@ internal class TraceTable(private val table: org.opendc.trace.Table) : continue } val columnType = columns[index].type - - writer.set( - columnIndices[index], - when (columnType) { - Duration::class.java -> Duration.ofMillis(value as Long) - Instant::class.java -> Instant.ofEpochMilli(value as Long) - Set::class.java -> (value as List<*>).toSet() - else -> value + val columnIndex = columnIndices[index] + when (columnType) { + is TableColumnType.Boolean -> writer.setBoolean(columnIndex, value as Boolean) + is TableColumnType.Int -> writer.setInt(columnIndex, value as Int) + is TableColumnType.Long -> writer.setLong(columnIndex, value as Long) + is TableColumnType.Float -> writer.setFloat(columnIndex, value as Float) + is TableColumnType.Double -> writer.setDouble(columnIndex, value as Double) + is TableColumnType.String -> writer.setString(columnIndex, value as String) + is TableColumnType.UUID -> { + val bb = ByteBuffer.wrap(value as ByteArray) + writer.setUUID(columnIndex, UUID(bb.getLong(), bb.getLong())) } - ) + is TableColumnType.Instant -> writer.setInstant(columnIndex, Instant.ofEpochMilli(value as Long)) + is TableColumnType.Duration -> writer.setDuration(columnIndex, Duration.ofMillis(value as Long)) + is TableColumnType.List -> writer.setList(columnIndex, value as List<*>) + is TableColumnType.Set -> writer.setSet(columnIndex, (value as List<*>).toSet()) + is TableColumnType.Map -> writer.setMap(columnIndex, value as Map<*, *>) + } } writer.endRow() @@ -161,16 +172,26 @@ internal class TraceTable(private val table: org.opendc.trace.Table) : for (column in table.columns) { names.add(column.name) - types.add( - when (column.type) { - Instant::class.java -> typeFactory.createSqlType(SqlTypeName.TIMESTAMP) - Duration::class.java -> typeFactory.createSqlType(SqlTypeName.BIGINT) - Set::class.java -> typeFactory.createMultisetType(typeFactory.createSqlType(SqlTypeName.UNKNOWN), -1) - else -> typeFactory.createType(column.type) - } - ) + types.add(mapType(typeFactory, column.type)) } return typeFactory.createStructType(types, names) } + + private fun mapType(typeFactory: JavaTypeFactory, type: TableColumnType): RelDataType { + return when (type) { + is TableColumnType.Boolean -> typeFactory.createSqlType(SqlTypeName.BOOLEAN) + is TableColumnType.Int -> typeFactory.createSqlType(SqlTypeName.INTEGER) + is TableColumnType.Long -> typeFactory.createSqlType(SqlTypeName.BIGINT) + is TableColumnType.Float -> typeFactory.createSqlType(SqlTypeName.FLOAT) + is TableColumnType.Double -> typeFactory.createSqlType(SqlTypeName.DOUBLE) + is TableColumnType.String -> typeFactory.createSqlType(SqlTypeName.VARCHAR) + is TableColumnType.UUID -> typeFactory.createSqlType(SqlTypeName.BINARY, 16) + is TableColumnType.Instant -> typeFactory.createSqlType(SqlTypeName.TIMESTAMP) + is TableColumnType.Duration -> typeFactory.createSqlType(SqlTypeName.BIGINT) + is TableColumnType.List -> typeFactory.createArrayType(mapType(typeFactory, type.elementType), -1) + is TableColumnType.Set -> typeFactory.createMultisetType(mapType(typeFactory, type.elementType), -1) + is TableColumnType.Map -> typeFactory.createMapType(mapType(typeFactory, type.keyType), mapType(typeFactory, type.valueType)) + } + } } -- cgit v1.2.3