summaryrefslogtreecommitdiff
path: root/opendc-trace/opendc-trace-calcite
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-06-06 16:21:21 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2022-06-07 15:46:53 +0200
commit2358257c1080b7ce78270535f82f0b960d48261a (patch)
treebced69c02698e85f995aa9935ddcfb54df23a64f /opendc-trace/opendc-trace-calcite
parent61b6550d7a476ab1aae45a5b9385dfd6ca4f6b6f (diff)
refactor(trace/api): Introduce type system for trace API
This change updates the trace API by introducing a limited type system for the table columns. Previously, the table columns could have any possible type representable by the JVM. With this change, we limit the available types to a small type system.
Diffstat (limited to 'opendc-trace/opendc-trace-calcite')
-rw-r--r--opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt45
-rw-r--r--opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt59
-rw-r--r--opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt96
3 files changed, 164 insertions, 36 deletions
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt
index 1854f262..74bd188b 100644
--- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt
+++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt
@@ -24,10 +24,10 @@ package org.opendc.trace.calcite
import org.apache.calcite.linq4j.Enumerator
import org.opendc.trace.TableColumn
+import org.opendc.trace.TableColumnType
import org.opendc.trace.TableReader
-import java.sql.Timestamp
-import java.time.Duration
-import java.time.Instant
+import java.nio.ByteBuffer
+import java.nio.ByteOrder
import java.util.concurrent.atomic.AtomicBoolean
/**
@@ -35,10 +35,10 @@ import java.util.concurrent.atomic.AtomicBoolean
*/
internal class TraceReaderEnumerator<E>(
private val reader: TableReader,
- private val columns: List<TableColumn<*>>,
+ private val columns: List<TableColumn>,
private val cancelFlag: AtomicBoolean
) : Enumerator<E> {
- private val columnIndices = columns.map { reader.resolve(it) }.toIntArray()
+ private val columnIndices = columns.map { reader.resolve(it.name) }.toIntArray()
private var current: E? = null
override fun moveNext(): Boolean {
@@ -80,14 +80,35 @@ internal class TraceReaderEnumerator<E>(
return res
}
- private fun convertColumn(reader: TableReader, column: TableColumn<*>, columnIndex: Int): Any? {
- val value = reader.get(columnIndex)
-
+ private fun convertColumn(reader: TableReader, column: TableColumn, columnIndex: Int): Any? {
return when (column.type) {
- Instant::class.java -> Timestamp.from(value as Instant)
- Duration::class.java -> (value as Duration).toMillis()
- Set::class.java -> (value as Set<*>).toTypedArray()
- else -> value
+ is TableColumnType.Boolean -> reader.getBoolean(columnIndex)
+ is TableColumnType.Int -> reader.getInt(columnIndex)
+ is TableColumnType.Long -> reader.getLong(columnIndex)
+ is TableColumnType.Float -> reader.getFloat(columnIndex)
+ is TableColumnType.Double -> reader.getDouble(columnIndex)
+ is TableColumnType.String -> reader.getString(columnIndex)
+ is TableColumnType.UUID -> {
+ val uuid = reader.getUUID(columnIndex)
+
+ if (uuid != null) {
+ val uuidBytes = ByteArray(16)
+
+ ByteBuffer.wrap(uuidBytes)
+ .order(ByteOrder.BIG_ENDIAN)
+ .putLong(uuid.mostSignificantBits)
+ .putLong(uuid.leastSignificantBits)
+
+ uuidBytes
+ } else {
+ null
+ }
+ }
+ is TableColumnType.Instant -> reader.getInstant(columnIndex)?.toEpochMilli()
+ is TableColumnType.Duration -> reader.getDuration(columnIndex)?.toMillis() ?: 0
+ is TableColumnType.List -> reader.getList(columnIndex, Any::class.java)?.toTypedArray()
+ is TableColumnType.Set -> reader.getSet(columnIndex, Any::class.java)?.toTypedArray()
+ is TableColumnType.Map -> reader.getMap(columnIndex, Any::class.java, Any::class.java)
}
}
}
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt
index 8c571b82..dfcc22a3 100644
--- a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt
+++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt
@@ -38,8 +38,11 @@ import org.apache.calcite.rex.RexNode
import org.apache.calcite.schema.*
import org.apache.calcite.schema.impl.AbstractTableQueryable
import org.apache.calcite.sql.type.SqlTypeName
+import org.opendc.trace.TableColumnType
+import java.nio.ByteBuffer
import java.time.Duration
import java.time.Instant
+import java.util.*
import java.util.concurrent.atomic.AtomicBoolean
/**
@@ -70,7 +73,7 @@ internal class TraceTable(private val table: org.opendc.trace.Table) :
val cancelFlag = DataContext.Variable.CANCEL_FLAG.get<AtomicBoolean>(root)
return object : AbstractEnumerable<Array<Any?>>() {
override fun enumerator(): Enumerator<Array<Any?>> =
- TraceReaderEnumerator(table.newReader(projection), projection ?: table.columns, cancelFlag)
+ TraceReaderEnumerator(table.newReader(projection?.map { it.name }), projection ?: table.columns, cancelFlag)
}
}
@@ -78,7 +81,7 @@ internal class TraceTable(private val table: org.opendc.trace.Table) :
val table = table
val columns = table.columns
val writer = table.newWriter()
- val columnIndices = columns.map { writer.resolve(it) }.toIntArray()
+ val columnIndices = columns.map { writer.resolve(it.name) }.toIntArray()
var rowCount = 0L
try {
@@ -90,16 +93,24 @@ internal class TraceTable(private val table: org.opendc.trace.Table) :
continue
}
val columnType = columns[index].type
-
- writer.set(
- columnIndices[index],
- when (columnType) {
- Duration::class.java -> Duration.ofMillis(value as Long)
- Instant::class.java -> Instant.ofEpochMilli(value as Long)
- Set::class.java -> (value as List<*>).toSet()
- else -> value
+ val columnIndex = columnIndices[index]
+ when (columnType) {
+ is TableColumnType.Boolean -> writer.setBoolean(columnIndex, value as Boolean)
+ is TableColumnType.Int -> writer.setInt(columnIndex, value as Int)
+ is TableColumnType.Long -> writer.setLong(columnIndex, value as Long)
+ is TableColumnType.Float -> writer.setFloat(columnIndex, value as Float)
+ is TableColumnType.Double -> writer.setDouble(columnIndex, value as Double)
+ is TableColumnType.String -> writer.setString(columnIndex, value as String)
+ is TableColumnType.UUID -> {
+ val bb = ByteBuffer.wrap(value as ByteArray)
+ writer.setUUID(columnIndex, UUID(bb.getLong(), bb.getLong()))
}
- )
+ is TableColumnType.Instant -> writer.setInstant(columnIndex, Instant.ofEpochMilli(value as Long))
+ is TableColumnType.Duration -> writer.setDuration(columnIndex, Duration.ofMillis(value as Long))
+ is TableColumnType.List -> writer.setList(columnIndex, value as List<*>)
+ is TableColumnType.Set -> writer.setSet(columnIndex, (value as List<*>).toSet())
+ is TableColumnType.Map -> writer.setMap(columnIndex, value as Map<*, *>)
+ }
}
writer.endRow()
@@ -161,16 +172,26 @@ internal class TraceTable(private val table: org.opendc.trace.Table) :
for (column in table.columns) {
names.add(column.name)
- types.add(
- when (column.type) {
- Instant::class.java -> typeFactory.createSqlType(SqlTypeName.TIMESTAMP)
- Duration::class.java -> typeFactory.createSqlType(SqlTypeName.BIGINT)
- Set::class.java -> typeFactory.createMultisetType(typeFactory.createSqlType(SqlTypeName.UNKNOWN), -1)
- else -> typeFactory.createType(column.type)
- }
- )
+ types.add(mapType(typeFactory, column.type))
}
return typeFactory.createStructType(types, names)
}
+
+ private fun mapType(typeFactory: JavaTypeFactory, type: TableColumnType): RelDataType {
+ return when (type) {
+ is TableColumnType.Boolean -> typeFactory.createSqlType(SqlTypeName.BOOLEAN)
+ is TableColumnType.Int -> typeFactory.createSqlType(SqlTypeName.INTEGER)
+ is TableColumnType.Long -> typeFactory.createSqlType(SqlTypeName.BIGINT)
+ is TableColumnType.Float -> typeFactory.createSqlType(SqlTypeName.FLOAT)
+ is TableColumnType.Double -> typeFactory.createSqlType(SqlTypeName.DOUBLE)
+ is TableColumnType.String -> typeFactory.createSqlType(SqlTypeName.VARCHAR)
+ is TableColumnType.UUID -> typeFactory.createSqlType(SqlTypeName.BINARY, 16)
+ is TableColumnType.Instant -> typeFactory.createSqlType(SqlTypeName.TIMESTAMP)
+ is TableColumnType.Duration -> typeFactory.createSqlType(SqlTypeName.BIGINT)
+ is TableColumnType.List -> typeFactory.createArrayType(mapType(typeFactory, type.elementType), -1)
+ is TableColumnType.Set -> typeFactory.createMultisetType(mapType(typeFactory, type.elementType), -1)
+ is TableColumnType.Map -> typeFactory.createMapType(mapType(typeFactory, type.keyType), mapType(typeFactory, type.valueType))
+ }
+ }
}
diff --git a/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt b/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt
index d2877d7c..d8729034 100644
--- a/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt
+++ b/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt
@@ -22,16 +22,24 @@
package org.opendc.trace.calcite
+import io.mockk.every
+import io.mockk.mockk
import org.apache.calcite.jdbc.CalciteConnection
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test
+import org.opendc.trace.TableColumn
+import org.opendc.trace.TableColumnType
+import org.opendc.trace.TableReader
import org.opendc.trace.Trace
+import org.opendc.trace.conv.TABLE_RESOURCES
import java.nio.file.Files
import java.nio.file.Paths
import java.sql.DriverManager
import java.sql.ResultSet
import java.sql.Statement
import java.sql.Timestamp
+import java.time.Duration
+import java.time.Instant
import java.util.*
/**
@@ -41,11 +49,11 @@ class CalciteTest {
/**
* The trace to experiment with.
*/
- private val trace = Trace.open(Paths.get("src/test/resources/trace"), format = "opendc-vm")
+ private val odcTrace = Trace.open(Paths.get("src/test/resources/trace"), format = "opendc-vm")
@Test
fun testResources() {
- runQuery(trace, "SELECT * FROM trace.resources") { rs ->
+ runQuery(odcTrace, "SELECT * FROM trace.resources") { rs ->
assertAll(
{ assertTrue(rs.next()) },
{ assertEquals("1019", rs.getString("id")) },
@@ -65,7 +73,7 @@ class CalciteTest {
@Test
fun testResourceStates() {
- runQuery(trace, "SELECT * FROM trace.resource_states") { rs ->
+ runQuery(odcTrace, "SELECT * FROM trace.resource_states") { rs ->
assertAll(
{ assertTrue(rs.next()) },
{ assertEquals("1019", rs.getString("id")) },
@@ -80,7 +88,7 @@ class CalciteTest {
@Test
fun testInterferenceGroups() {
- runQuery(trace, "SELECT * FROM trace.interference_groups") { rs ->
+ runQuery(odcTrace, "SELECT * FROM trace.interference_groups") { rs ->
assertAll(
{ assertTrue(rs.next()) },
{ assertArrayEquals(arrayOf("1019", "1023", "1052"), rs.getArray("members").array as Array<*>) },
@@ -92,7 +100,7 @@ class CalciteTest {
@Test
fun testComplexQuery() {
- runQuery(trace, "SELECT max(cpu_usage) as max_cpu_usage, avg(cpu_usage) as avg_cpu_usage FROM trace.resource_states") { rs ->
+ runQuery(odcTrace, "SELECT max(cpu_usage) as max_cpu_usage, avg(cpu_usage) as avg_cpu_usage FROM trace.resource_states") { rs ->
assertAll(
{ assertTrue(rs.next()) },
{ assertEquals(249.59993808, rs.getDouble("max_cpu_usage")) },
@@ -128,6 +136,84 @@ class CalciteTest {
}
}
+ @Test
+ fun testUUID() {
+ val trace = mockk<Trace>()
+ every { trace.tables } returns listOf(TABLE_RESOURCES)
+ every { trace.getTable(TABLE_RESOURCES)!!.columns } returns listOf(
+ TableColumn("id", TableColumnType.UUID)
+ )
+ every { trace.getTable(TABLE_RESOURCES)!!.newReader() } answers {
+ object : TableReader {
+ override fun nextRow(): Boolean = true
+
+ override fun resolve(name: String): Int {
+ return when (name) {
+ "id" -> 0
+ else -> -1
+ }
+ }
+
+ override fun isNull(index: Int): Boolean = false
+
+ override fun getBoolean(index: Int): Boolean {
+ TODO("not implemented")
+ }
+
+ override fun getInt(index: Int): Int {
+ TODO("not implemented")
+ }
+
+ override fun getLong(index: Int): Long {
+ TODO("not implemented")
+ }
+
+ override fun getFloat(index: Int): Float {
+ TODO("not implemented")
+ }
+
+ override fun getDouble(index: Int): Double {
+ TODO("not implemented")
+ }
+
+ override fun getString(index: Int): String? {
+ TODO("not implemented")
+ }
+
+ override fun getUUID(index: Int): UUID = UUID(1, 2)
+
+ override fun getInstant(index: Int): Instant? {
+ TODO("not implemented")
+ }
+
+ override fun getDuration(index: Int): Duration? {
+ TODO("not implemented")
+ }
+
+ override fun <T> getList(index: Int, elementType: Class<T>): List<T>? {
+ TODO("not implemented")
+ }
+
+ override fun <T> getSet(index: Int, elementType: Class<T>): Set<T>? {
+ TODO("not implemented")
+ }
+
+ override fun <K, V> getMap(index: Int, keyType: Class<K>, valueType: Class<V>): Map<K, V>? {
+ TODO("not implemented")
+ }
+
+ override fun close() {}
+ }
+ }
+
+ runQuery(trace, "SELECT id FROM trace.resources") { rs ->
+ assertAll(
+ { assertTrue(rs.next()) },
+ { assertArrayEquals(byteArrayOf(0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 2), rs.getBytes("id")) },
+ )
+ }
+ }
+
/**
* Helper function to run statement for the specified trace.
*/