summaryrefslogtreecommitdiff
path: root/opendc-trace/opendc-trace-calcite/src/main
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-06-06 16:21:21 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2022-06-07 15:46:53 +0200
commit2358257c1080b7ce78270535f82f0b960d48261a (patch)
treebced69c02698e85f995aa9935ddcfb54df23a64f /opendc-trace/opendc-trace-calcite/src/main
parent61b6550d7a476ab1aae45a5b9385dfd6ca4f6b6f (diff)
refactor(trace/api): Introduce type system for trace API
This change updates the trace API by introducing a limited type system for the table columns. Previously, the table columns could have any possible type representable by the JVM. With this change, we limit the available types to a small type system.
Diffstat (limited to 'opendc-trace/opendc-trace-calcite/src/main')
-rw-r--r--opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt45
-rw-r--r--opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt59
2 files changed, 73 insertions, 31 deletions
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt
index 1854f262..74bd188b 100644
--- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt
+++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt
@@ -24,10 +24,10 @@ package org.opendc.trace.calcite
import org.apache.calcite.linq4j.Enumerator
import org.opendc.trace.TableColumn
+import org.opendc.trace.TableColumnType
import org.opendc.trace.TableReader
-import java.sql.Timestamp
-import java.time.Duration
-import java.time.Instant
+import java.nio.ByteBuffer
+import java.nio.ByteOrder
import java.util.concurrent.atomic.AtomicBoolean
/**
@@ -35,10 +35,10 @@ import java.util.concurrent.atomic.AtomicBoolean
*/
internal class TraceReaderEnumerator<E>(
private val reader: TableReader,
- private val columns: List<TableColumn<*>>,
+ private val columns: List<TableColumn>,
private val cancelFlag: AtomicBoolean
) : Enumerator<E> {
- private val columnIndices = columns.map { reader.resolve(it) }.toIntArray()
+ private val columnIndices = columns.map { reader.resolve(it.name) }.toIntArray()
private var current: E? = null
override fun moveNext(): Boolean {
@@ -80,14 +80,35 @@ internal class TraceReaderEnumerator<E>(
return res
}
- private fun convertColumn(reader: TableReader, column: TableColumn<*>, columnIndex: Int): Any? {
- val value = reader.get(columnIndex)
-
+ private fun convertColumn(reader: TableReader, column: TableColumn, columnIndex: Int): Any? {
return when (column.type) {
- Instant::class.java -> Timestamp.from(value as Instant)
- Duration::class.java -> (value as Duration).toMillis()
- Set::class.java -> (value as Set<*>).toTypedArray()
- else -> value
+ is TableColumnType.Boolean -> reader.getBoolean(columnIndex)
+ is TableColumnType.Int -> reader.getInt(columnIndex)
+ is TableColumnType.Long -> reader.getLong(columnIndex)
+ is TableColumnType.Float -> reader.getFloat(columnIndex)
+ is TableColumnType.Double -> reader.getDouble(columnIndex)
+ is TableColumnType.String -> reader.getString(columnIndex)
+ is TableColumnType.UUID -> {
+ val uuid = reader.getUUID(columnIndex)
+
+ if (uuid != null) {
+ val uuidBytes = ByteArray(16)
+
+ ByteBuffer.wrap(uuidBytes)
+ .order(ByteOrder.BIG_ENDIAN)
+ .putLong(uuid.mostSignificantBits)
+ .putLong(uuid.leastSignificantBits)
+
+ uuidBytes
+ } else {
+ null
+ }
+ }
+ is TableColumnType.Instant -> reader.getInstant(columnIndex)?.toEpochMilli()
+ is TableColumnType.Duration -> reader.getDuration(columnIndex)?.toMillis() ?: 0
+ is TableColumnType.List -> reader.getList(columnIndex, Any::class.java)?.toTypedArray()
+ is TableColumnType.Set -> reader.getSet(columnIndex, Any::class.java)?.toTypedArray()
+ is TableColumnType.Map -> reader.getMap(columnIndex, Any::class.java, Any::class.java)
}
}
}
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt
index 8c571b82..dfcc22a3 100644
--- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt
+++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt
@@ -38,8 +38,11 @@ import org.apache.calcite.rex.RexNode
import org.apache.calcite.schema.*
import org.apache.calcite.schema.impl.AbstractTableQueryable
import org.apache.calcite.sql.type.SqlTypeName
+import org.opendc.trace.TableColumnType
+import java.nio.ByteBuffer
import java.time.Duration
import java.time.Instant
+import java.util.*
import java.util.concurrent.atomic.AtomicBoolean
/**
@@ -70,7 +73,7 @@ internal class TraceTable(private val table: org.opendc.trace.Table) :
val cancelFlag = DataContext.Variable.CANCEL_FLAG.get<AtomicBoolean>(root)
return object : AbstractEnumerable<Array<Any?>>() {
override fun enumerator(): Enumerator<Array<Any?>> =
- TraceReaderEnumerator(table.newReader(projection), projection ?: table.columns, cancelFlag)
+ TraceReaderEnumerator(table.newReader(projection?.map { it.name }), projection ?: table.columns, cancelFlag)
}
}
@@ -78,7 +81,7 @@ internal class TraceTable(private val table: org.opendc.trace.Table) :
val table = table
val columns = table.columns
val writer = table.newWriter()
- val columnIndices = columns.map { writer.resolve(it) }.toIntArray()
+ val columnIndices = columns.map { writer.resolve(it.name) }.toIntArray()
var rowCount = 0L
try {
@@ -90,16 +93,24 @@ internal class TraceTable(private val table: org.opendc.trace.Table) :
continue
}
val columnType = columns[index].type
-
- writer.set(
- columnIndices[index],
- when (columnType) {
- Duration::class.java -> Duration.ofMillis(value as Long)
- Instant::class.java -> Instant.ofEpochMilli(value as Long)
- Set::class.java -> (value as List<*>).toSet()
- else -> value
+ val columnIndex = columnIndices[index]
+ when (columnType) {
+ is TableColumnType.Boolean -> writer.setBoolean(columnIndex, value as Boolean)
+ is TableColumnType.Int -> writer.setInt(columnIndex, value as Int)
+ is TableColumnType.Long -> writer.setLong(columnIndex, value as Long)
+ is TableColumnType.Float -> writer.setFloat(columnIndex, value as Float)
+ is TableColumnType.Double -> writer.setDouble(columnIndex, value as Double)
+ is TableColumnType.String -> writer.setString(columnIndex, value as String)
+ is TableColumnType.UUID -> {
+ val bb = ByteBuffer.wrap(value as ByteArray)
+ writer.setUUID(columnIndex, UUID(bb.getLong(), bb.getLong()))
}
- )
+ is TableColumnType.Instant -> writer.setInstant(columnIndex, Instant.ofEpochMilli(value as Long))
+ is TableColumnType.Duration -> writer.setDuration(columnIndex, Duration.ofMillis(value as Long))
+ is TableColumnType.List -> writer.setList(columnIndex, value as List<*>)
+ is TableColumnType.Set -> writer.setSet(columnIndex, (value as List<*>).toSet())
+ is TableColumnType.Map -> writer.setMap(columnIndex, value as Map<*, *>)
+ }
}
writer.endRow()
@@ -161,16 +172,26 @@ internal class TraceTable(private val table: org.opendc.trace.Table) :
for (column in table.columns) {
names.add(column.name)
- types.add(
- when (column.type) {
- Instant::class.java -> typeFactory.createSqlType(SqlTypeName.TIMESTAMP)
- Duration::class.java -> typeFactory.createSqlType(SqlTypeName.BIGINT)
- Set::class.java -> typeFactory.createMultisetType(typeFactory.createSqlType(SqlTypeName.UNKNOWN), -1)
- else -> typeFactory.createType(column.type)
- }
- )
+ types.add(mapType(typeFactory, column.type))
}
return typeFactory.createStructType(types, names)
}
+
+ private fun mapType(typeFactory: JavaTypeFactory, type: TableColumnType): RelDataType {
+ return when (type) {
+ is TableColumnType.Boolean -> typeFactory.createSqlType(SqlTypeName.BOOLEAN)
+ is TableColumnType.Int -> typeFactory.createSqlType(SqlTypeName.INTEGER)
+ is TableColumnType.Long -> typeFactory.createSqlType(SqlTypeName.BIGINT)
+ is TableColumnType.Float -> typeFactory.createSqlType(SqlTypeName.FLOAT)
+ is TableColumnType.Double -> typeFactory.createSqlType(SqlTypeName.DOUBLE)
+ is TableColumnType.String -> typeFactory.createSqlType(SqlTypeName.VARCHAR)
+ is TableColumnType.UUID -> typeFactory.createSqlType(SqlTypeName.BINARY, 16)
+ is TableColumnType.Instant -> typeFactory.createSqlType(SqlTypeName.TIMESTAMP)
+ is TableColumnType.Duration -> typeFactory.createSqlType(SqlTypeName.BIGINT)
+ is TableColumnType.List -> typeFactory.createArrayType(mapType(typeFactory, type.elementType), -1)
+ is TableColumnType.Set -> typeFactory.createMultisetType(mapType(typeFactory, type.elementType), -1)
+ is TableColumnType.Map -> typeFactory.createMapType(mapType(typeFactory, type.keyType), mapType(typeFactory, type.valueType))
+ }
+ }
}