diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-06-06 16:21:21 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-06-07 15:46:53 +0200 |
| commit | 2358257c1080b7ce78270535f82f0b960d48261a (patch) | |
| tree | bced69c02698e85f995aa9935ddcfb54df23a64f /opendc-trace/opendc-trace-calcite | |
| parent | 61b6550d7a476ab1aae45a5b9385dfd6ca4f6b6f (diff) | |
refactor(trace/api): Introduce type system for trace API
This change updates the trace API by introducing a limited type system
for the table columns. Previously, the table columns could have any
possible type representable by the JVM. With this change, we limit the
available types to a small type system.
Diffstat (limited to 'opendc-trace/opendc-trace-calcite')
3 files changed, 164 insertions, 36 deletions
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt index 1854f262..74bd188b 100644 --- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt +++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt @@ -24,10 +24,10 @@ package org.opendc.trace.calcite import org.apache.calcite.linq4j.Enumerator import org.opendc.trace.TableColumn +import org.opendc.trace.TableColumnType import org.opendc.trace.TableReader -import java.sql.Timestamp -import java.time.Duration -import java.time.Instant +import java.nio.ByteBuffer +import java.nio.ByteOrder import java.util.concurrent.atomic.AtomicBoolean /** @@ -35,10 +35,10 @@ import java.util.concurrent.atomic.AtomicBoolean */ internal class TraceReaderEnumerator<E>( private val reader: TableReader, - private val columns: List<TableColumn<*>>, + private val columns: List<TableColumn>, private val cancelFlag: AtomicBoolean ) : Enumerator<E> { - private val columnIndices = columns.map { reader.resolve(it) }.toIntArray() + private val columnIndices = columns.map { reader.resolve(it.name) }.toIntArray() private var current: E? = null override fun moveNext(): Boolean { @@ -80,14 +80,35 @@ internal class TraceReaderEnumerator<E>( return res } - private fun convertColumn(reader: TableReader, column: TableColumn<*>, columnIndex: Int): Any? { - val value = reader.get(columnIndex) - + private fun convertColumn(reader: TableReader, column: TableColumn, columnIndex: Int): Any? { return when (column.type) { - Instant::class.java -> Timestamp.from(value as Instant) - Duration::class.java -> (value as Duration).toMillis() - Set::class.java -> (value as Set<*>).toTypedArray() - else -> value + is TableColumnType.Boolean -> reader.getBoolean(columnIndex) + is TableColumnType.Int -> reader.getInt(columnIndex) + is TableColumnType.Long -> reader.getLong(columnIndex) + is TableColumnType.Float -> reader.getFloat(columnIndex) + is TableColumnType.Double -> reader.getDouble(columnIndex) + is TableColumnType.String -> reader.getString(columnIndex) + is TableColumnType.UUID -> { + val uuid = reader.getUUID(columnIndex) + + if (uuid != null) { + val uuidBytes = ByteArray(16) + + ByteBuffer.wrap(uuidBytes) + .order(ByteOrder.BIG_ENDIAN) + .putLong(uuid.mostSignificantBits) + .putLong(uuid.leastSignificantBits) + + uuidBytes + } else { + null + } + } + is TableColumnType.Instant -> reader.getInstant(columnIndex)?.toEpochMilli() + is TableColumnType.Duration -> reader.getDuration(columnIndex)?.toMillis() ?: 0 + is TableColumnType.List -> reader.getList(columnIndex, Any::class.java)?.toTypedArray() + is TableColumnType.Set -> reader.getSet(columnIndex, Any::class.java)?.toTypedArray() + is TableColumnType.Map -> reader.getMap(columnIndex, Any::class.java, Any::class.java) } } } diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt index 8c571b82..dfcc22a3 100644 --- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt +++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt @@ -38,8 +38,11 @@ import org.apache.calcite.rex.RexNode import org.apache.calcite.schema.* import org.apache.calcite.schema.impl.AbstractTableQueryable import org.apache.calcite.sql.type.SqlTypeName +import org.opendc.trace.TableColumnType +import java.nio.ByteBuffer import java.time.Duration import java.time.Instant +import java.util.* import java.util.concurrent.atomic.AtomicBoolean /** @@ -70,7 +73,7 @@ internal class TraceTable(private val table: org.opendc.trace.Table) : val cancelFlag = DataContext.Variable.CANCEL_FLAG.get<AtomicBoolean>(root) return object : AbstractEnumerable<Array<Any?>>() { override fun enumerator(): Enumerator<Array<Any?>> = - TraceReaderEnumerator(table.newReader(projection), projection ?: table.columns, cancelFlag) + TraceReaderEnumerator(table.newReader(projection?.map { it.name }), projection ?: table.columns, cancelFlag) } } @@ -78,7 +81,7 @@ internal class TraceTable(private val table: org.opendc.trace.Table) : val table = table val columns = table.columns val writer = table.newWriter() - val columnIndices = columns.map { writer.resolve(it) }.toIntArray() + val columnIndices = columns.map { writer.resolve(it.name) }.toIntArray() var rowCount = 0L try { @@ -90,16 +93,24 @@ internal class TraceTable(private val table: org.opendc.trace.Table) : continue } val columnType = columns[index].type - - writer.set( - columnIndices[index], - when (columnType) { - Duration::class.java -> Duration.ofMillis(value as Long) - Instant::class.java -> Instant.ofEpochMilli(value as Long) - Set::class.java -> (value as List<*>).toSet() - else -> value + val columnIndex = columnIndices[index] + when (columnType) { + is TableColumnType.Boolean -> writer.setBoolean(columnIndex, value as Boolean) + is TableColumnType.Int -> writer.setInt(columnIndex, value as Int) + is TableColumnType.Long -> writer.setLong(columnIndex, value as Long) + is TableColumnType.Float -> writer.setFloat(columnIndex, value as Float) + is TableColumnType.Double -> writer.setDouble(columnIndex, value as Double) + is TableColumnType.String -> writer.setString(columnIndex, value as String) + is TableColumnType.UUID -> { + val bb = ByteBuffer.wrap(value as ByteArray) + writer.setUUID(columnIndex, UUID(bb.getLong(), bb.getLong())) } - ) + is TableColumnType.Instant -> writer.setInstant(columnIndex, Instant.ofEpochMilli(value as Long)) + is TableColumnType.Duration -> writer.setDuration(columnIndex, Duration.ofMillis(value as Long)) + is TableColumnType.List -> writer.setList(columnIndex, value as List<*>) + is TableColumnType.Set -> writer.setSet(columnIndex, (value as List<*>).toSet()) + is TableColumnType.Map -> writer.setMap(columnIndex, value as Map<*, *>) + } } writer.endRow() @@ -161,16 +172,26 @@ internal class TraceTable(private val table: org.opendc.trace.Table) : for (column in table.columns) { names.add(column.name) - types.add( - when (column.type) { - Instant::class.java -> typeFactory.createSqlType(SqlTypeName.TIMESTAMP) - Duration::class.java -> typeFactory.createSqlType(SqlTypeName.BIGINT) - Set::class.java -> typeFactory.createMultisetType(typeFactory.createSqlType(SqlTypeName.UNKNOWN), -1) - else -> typeFactory.createType(column.type) - } - ) + types.add(mapType(typeFactory, column.type)) } return typeFactory.createStructType(types, names) } + + private fun mapType(typeFactory: JavaTypeFactory, type: TableColumnType): RelDataType { + return when (type) { + is TableColumnType.Boolean -> typeFactory.createSqlType(SqlTypeName.BOOLEAN) + is TableColumnType.Int -> typeFactory.createSqlType(SqlTypeName.INTEGER) + is TableColumnType.Long -> typeFactory.createSqlType(SqlTypeName.BIGINT) + is TableColumnType.Float -> typeFactory.createSqlType(SqlTypeName.FLOAT) + is TableColumnType.Double -> typeFactory.createSqlType(SqlTypeName.DOUBLE) + is TableColumnType.String -> typeFactory.createSqlType(SqlTypeName.VARCHAR) + is TableColumnType.UUID -> typeFactory.createSqlType(SqlTypeName.BINARY, 16) + is TableColumnType.Instant -> typeFactory.createSqlType(SqlTypeName.TIMESTAMP) + is TableColumnType.Duration -> typeFactory.createSqlType(SqlTypeName.BIGINT) + is TableColumnType.List -> typeFactory.createArrayType(mapType(typeFactory, type.elementType), -1) + is TableColumnType.Set -> typeFactory.createMultisetType(mapType(typeFactory, type.elementType), -1) + is TableColumnType.Map -> typeFactory.createMapType(mapType(typeFactory, type.keyType), mapType(typeFactory, type.valueType)) + } + } } diff --git a/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt b/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt index d2877d7c..d8729034 100644 --- a/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt +++ b/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt @@ -22,16 +22,24 @@ package org.opendc.trace.calcite +import io.mockk.every +import io.mockk.mockk import org.apache.calcite.jdbc.CalciteConnection import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test +import org.opendc.trace.TableColumn +import org.opendc.trace.TableColumnType +import org.opendc.trace.TableReader import org.opendc.trace.Trace +import org.opendc.trace.conv.TABLE_RESOURCES import java.nio.file.Files import java.nio.file.Paths import java.sql.DriverManager import java.sql.ResultSet import java.sql.Statement import java.sql.Timestamp +import java.time.Duration +import java.time.Instant import java.util.* /** @@ -41,11 +49,11 @@ class CalciteTest { /** * The trace to experiment with. */ - private val trace = Trace.open(Paths.get("src/test/resources/trace"), format = "opendc-vm") + private val odcTrace = Trace.open(Paths.get("src/test/resources/trace"), format = "opendc-vm") @Test fun testResources() { - runQuery(trace, "SELECT * FROM trace.resources") { rs -> + runQuery(odcTrace, "SELECT * FROM trace.resources") { rs -> assertAll( { assertTrue(rs.next()) }, { assertEquals("1019", rs.getString("id")) }, @@ -65,7 +73,7 @@ class CalciteTest { @Test fun testResourceStates() { - runQuery(trace, "SELECT * FROM trace.resource_states") { rs -> + runQuery(odcTrace, "SELECT * FROM trace.resource_states") { rs -> assertAll( { assertTrue(rs.next()) }, { assertEquals("1019", rs.getString("id")) }, @@ -80,7 +88,7 @@ class CalciteTest { @Test fun testInterferenceGroups() { - runQuery(trace, "SELECT * FROM trace.interference_groups") { rs -> + runQuery(odcTrace, "SELECT * FROM trace.interference_groups") { rs -> assertAll( { assertTrue(rs.next()) }, { assertArrayEquals(arrayOf("1019", "1023", "1052"), rs.getArray("members").array as Array<*>) }, @@ -92,7 +100,7 @@ class CalciteTest { @Test fun testComplexQuery() { - runQuery(trace, "SELECT max(cpu_usage) as max_cpu_usage, avg(cpu_usage) as avg_cpu_usage FROM trace.resource_states") { rs -> + runQuery(odcTrace, "SELECT max(cpu_usage) as max_cpu_usage, avg(cpu_usage) as avg_cpu_usage FROM trace.resource_states") { rs -> assertAll( { assertTrue(rs.next()) }, { assertEquals(249.59993808, rs.getDouble("max_cpu_usage")) }, @@ -128,6 +136,84 @@ class CalciteTest { } } + @Test + fun testUUID() { + val trace = mockk<Trace>() + every { trace.tables } returns listOf(TABLE_RESOURCES) + every { trace.getTable(TABLE_RESOURCES)!!.columns } returns listOf( + TableColumn("id", TableColumnType.UUID) + ) + every { trace.getTable(TABLE_RESOURCES)!!.newReader() } answers { + object : TableReader { + override fun nextRow(): Boolean = true + + override fun resolve(name: String): Int { + return when (name) { + "id" -> 0 + else -> -1 + } + } + + override fun isNull(index: Int): Boolean = false + + override fun getBoolean(index: Int): Boolean { + TODO("not implemented") + } + + override fun getInt(index: Int): Int { + TODO("not implemented") + } + + override fun getLong(index: Int): Long { + TODO("not implemented") + } + + override fun getFloat(index: Int): Float { + TODO("not implemented") + } + + override fun getDouble(index: Int): Double { + TODO("not implemented") + } + + override fun getString(index: Int): String? { + TODO("not implemented") + } + + override fun getUUID(index: Int): UUID = UUID(1, 2) + + override fun getInstant(index: Int): Instant? { + TODO("not implemented") + } + + override fun getDuration(index: Int): Duration? { + TODO("not implemented") + } + + override fun <T> getList(index: Int, elementType: Class<T>): List<T>? { + TODO("not implemented") + } + + override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? { + TODO("not implemented") + } + + override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? { + TODO("not implemented") + } + + override fun close() {} + } + } + + runQuery(trace, "SELECT id FROM trace.resources") { rs -> + assertAll( + { assertTrue(rs.next()) }, + { assertArrayEquals(byteArrayOf(0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 2), rs.getBytes("id")) }, + ) + } + } + /** * Helper function to run statement for the specified trace. */ |
