diff options
Diffstat (limited to 'opendc-trace')
16 files changed, 566 insertions, 40 deletions
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt index 776c40c0..b77a2982 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt @@ -33,7 +33,7 @@ public class TableColumn<out T>(public val name: String, type: Class<T>) { /** * The type of the column. */ - private val type: Class<*> = type + public val type: Class<*> = type /** * Determine whether the type of the column is a subtype of [column]. diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt index 532f6d24..5e8859e4 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt @@ -24,22 +24,21 @@ package org.opendc.trace.conv import org.opendc.trace.TableColumn -import org.opendc.trace.column /** * Members of the interference group. */ @JvmField -public val INTERFERENCE_GROUP_MEMBERS: TableColumn<Set<String>> = column("interference_group:members") +public val INTERFERENCE_GROUP_MEMBERS: TableColumn<Set<String>> = column("members") /** * Target load after which the interference occurs. */ @JvmField -public val INTERFERENCE_GROUP_TARGET: TableColumn<Double> = column("interference_group:target") +public val INTERFERENCE_GROUP_TARGET: TableColumn<Double> = column("target") /** * Performance score when the interference occurs. */ @JvmField -public val INTERFERENCE_GROUP_SCORE: TableColumn<Double> = column("interference_group:score") +public val INTERFERENCE_GROUP_SCORE: TableColumn<Double> = column("score") diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt index e9fc5d44..e602e534 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt @@ -24,47 +24,46 @@ package org.opendc.trace.conv import org.opendc.trace.TableColumn -import org.opendc.trace.column import java.time.Instant /** * Identifier of the resource. */ @JvmField -public val RESOURCE_ID: TableColumn<String> = column("resource:id") +public val RESOURCE_ID: TableColumn<String> = column("id") /** * The cluster to which the resource belongs. */ @JvmField -public val RESOURCE_CLUSTER_ID: TableColumn<String> = column("resource:cluster_id") +public val RESOURCE_CLUSTER_ID: TableColumn<String> = column("cluster_id") /** * Start time for the resource. */ @JvmField -public val RESOURCE_START_TIME: TableColumn<Instant> = column("resource:start_time") +public val RESOURCE_START_TIME: TableColumn<Instant> = column("start_time") /** * End time for the resource. */ @JvmField -public val RESOURCE_STOP_TIME: TableColumn<Instant> = column("resource:stop_time") +public val RESOURCE_STOP_TIME: TableColumn<Instant> = column("stop_time") /** * Number of CPUs for the resource. */ @JvmField -public val RESOURCE_CPU_COUNT: TableColumn<Int> = column("resource:cpu_count") +public val RESOURCE_CPU_COUNT: TableColumn<Int> = column("cpu_count") /** * Total CPU capacity of the resource in MHz. */ @JvmField -public val RESOURCE_CPU_CAPACITY: TableColumn<Double> = column("resource:cpu_capacity") +public val RESOURCE_CPU_CAPACITY: TableColumn<Double> = column("cpu_capacity") /** * Memory capacity for the resource in KB. */ @JvmField -public val RESOURCE_MEM_CAPACITY: TableColumn<Double> = column("resource:mem_capacity") +public val RESOURCE_MEM_CAPACITY: TableColumn<Double> = column("mem_capacity") diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt index d5bbafd7..3a44f817 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt @@ -24,7 +24,6 @@ package org.opendc.trace.conv import org.opendc.trace.TableColumn -import org.opendc.trace.column import java.time.Duration import java.time.Instant @@ -32,70 +31,70 @@ import java.time.Instant * The timestamp at which the state was recorded. */ @JvmField -public val RESOURCE_STATE_TIMESTAMP: TableColumn<Instant> = column("resource_state:timestamp") +public val RESOURCE_STATE_TIMESTAMP: TableColumn<Instant> = column("timestamp") /** * Duration for the state. */ @JvmField -public val RESOURCE_STATE_DURATION: TableColumn<Duration> = column("resource_state:duration") +public val RESOURCE_STATE_DURATION: TableColumn<Duration> = column("duration") /** * A flag to indicate that the resource is powered on. */ @JvmField -public val RESOURCE_STATE_POWERED_ON: TableColumn<Boolean> = column("resource_state:powered_on") +public val RESOURCE_STATE_POWERED_ON: TableColumn<Boolean> = column("powered_on") /** * Total CPU usage of the resource in MHz. */ @JvmField -public val RESOURCE_STATE_CPU_USAGE: TableColumn<Double> = column("resource_state:cpu_usage") +public val RESOURCE_STATE_CPU_USAGE: TableColumn<Double> = column("cpu_usage") /** * Total CPU usage of the resource in percentage. */ @JvmField -public val RESOURCE_STATE_CPU_USAGE_PCT: TableColumn<Double> = column("resource_state:cpu_usage_pct") +public val RESOURCE_STATE_CPU_USAGE_PCT: TableColumn<Double> = column("cpu_usage_pct") /** * Total CPU demand of the resource in MHz. */ @JvmField -public val RESOURCE_STATE_CPU_DEMAND: TableColumn<Double> = column("resource_state:cpu_demand") +public val RESOURCE_STATE_CPU_DEMAND: TableColumn<Double> = column("cpu_demand") /** * CPU ready percentage. */ @JvmField -public val RESOURCE_STATE_CPU_READY_PCT: TableColumn<Double> = column("resource_state:cpu_ready_pct") +public val RESOURCE_STATE_CPU_READY_PCT: TableColumn<Double> = column("cpu_ready_pct") /** * Memory usage of the resource in KB. */ @JvmField -public val RESOURCE_STATE_MEM_USAGE: TableColumn<Double> = column("resource_state:mem_usage") +public val RESOURCE_STATE_MEM_USAGE: TableColumn<Double> = column("mem_usage") /** * Disk read throughput of the resource in KB/s. */ @JvmField -public val RESOURCE_STATE_DISK_READ: TableColumn<Double> = column("resource_state:disk_read") +public val RESOURCE_STATE_DISK_READ: TableColumn<Double> = column("disk_read") /** * Disk write throughput of the resource in KB/s. */ @JvmField -public val RESOURCE_STATE_DISK_WRITE: TableColumn<Double> = column("resource_state:disk_write") +public val RESOURCE_STATE_DISK_WRITE: TableColumn<Double> = column("disk_write") /** * Network receive throughput of the resource in KB/s. */ @JvmField -public val RESOURCE_STATE_NET_RX: TableColumn<Double> = column("resource_state:net_rx") +public val RESOURCE_STATE_NET_RX: TableColumn<Double> = column("net_rx") /** * Network transmit throughput of the resource in KB/s. */ @JvmField -public val RESOURCE_STATE_NET_TX: TableColumn<Double> = column("resource_state:net_tx") +public val RESOURCE_STATE_NET_TX: TableColumn<Double> = column("net_tx") diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TableColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TableColumns.kt index 31a58360..a58505e9 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TableColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TableColumns.kt @@ -21,7 +21,9 @@ */ @file:JvmName("TableColumns") -package org.opendc.trace +package org.opendc.trace.conv + +import org.opendc.trace.TableColumn /** * Construct a [TableColumn] with the specified [name] and type [T]. diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt index 397c0794..e6daafb7 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt @@ -24,7 +24,6 @@ package org.opendc.trace.conv import org.opendc.trace.TableColumn -import org.opendc.trace.column import java.time.Duration import java.time.Instant @@ -32,70 +31,70 @@ import java.time.Instant * A column containing the task identifier. */ @JvmField -public val TASK_ID: TableColumn<String> = column("task:id") +public val TASK_ID: TableColumn<String> = column("id") /** * A column containing the identifier of the workflow. */ @JvmField -public val TASK_WORKFLOW_ID: TableColumn<String> = column("task:workflow_id") +public val TASK_WORKFLOW_ID: TableColumn<String> = column("workflow_id") /** * A column containing the submission time of the task. */ @JvmField -public val TASK_SUBMIT_TIME: TableColumn<Instant> = column("task:submit_time") +public val TASK_SUBMIT_TIME: TableColumn<Instant> = column("submit_time") /** * A column containing the wait time of the task. */ @JvmField -public val TASK_WAIT_TIME: TableColumn<Instant> = column("task:wait_time") +public val TASK_WAIT_TIME: TableColumn<Instant> = column("wait_time") /** * A column containing the runtime time of the task. */ @JvmField -public val TASK_RUNTIME: TableColumn<Duration> = column("task:runtime") +public val TASK_RUNTIME: TableColumn<Duration> = column("runtime") /** * A column containing the parents of a task. */ @JvmField -public val TASK_PARENTS: TableColumn<Set<String>> = column("task:parents") +public val TASK_PARENTS: TableColumn<Set<String>> = column("parents") /** * A column containing the children of a task. */ @JvmField -public val TASK_CHILDREN: TableColumn<Set<String>> = column("task:children") +public val TASK_CHILDREN: TableColumn<Set<String>> = column("children") /** * A column containing the requested CPUs of a task. */ @JvmField -public val TASK_REQ_NCPUS: TableColumn<Int> = column("task:req_ncpus") +public val TASK_REQ_NCPUS: TableColumn<Int> = column("req_ncpus") /** * A column containing the allocated CPUs of a task. */ @JvmField -public val TASK_ALLOC_NCPUS: TableColumn<Int> = column("task:alloc_ncpus") +public val TASK_ALLOC_NCPUS: TableColumn<Int> = column("alloc_ncpus") /** * A column containing the status of a task. */ @JvmField -public val TASK_STATUS: TableColumn<Int> = column("task:status") +public val TASK_STATUS: TableColumn<Int> = column("status") /** * A column containing the group id of a task. */ @JvmField -public val TASK_GROUP_ID: TableColumn<Int> = column("task:group_id") +public val TASK_GROUP_ID: TableColumn<Int> = column("group_id") /** * A column containing the user id of a task. */ @JvmField -public val TASK_USER_ID: TableColumn<Int> = column("task:user_id") +public val TASK_USER_ID: TableColumn<Int> = column("user_id") diff --git a/opendc-trace/opendc-trace-calcite/build.gradle.kts b/opendc-trace/opendc-trace-calcite/build.gradle.kts new file mode 100644 index 00000000..2ffdac3c --- /dev/null +++ b/opendc-trace/opendc-trace-calcite/build.gradle.kts @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +description = "Apache Calcite (SQL) integration for the OpenDC trace library" + +/* Build configuration */ +plugins { + `kotlin-library-conventions` +} + +dependencies { + api(projects.opendcTrace.opendcTraceApi) + + api(libs.calcite.core) + + testRuntimeOnly(projects.opendcTrace.opendcTraceOpendc) + testRuntimeOnly(libs.slf4j.simple) +} diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt new file mode 100644 index 00000000..1854f262 --- /dev/null +++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.calcite + +import org.apache.calcite.linq4j.Enumerator +import org.opendc.trace.TableColumn +import org.opendc.trace.TableReader +import java.sql.Timestamp +import java.time.Duration +import java.time.Instant +import java.util.concurrent.atomic.AtomicBoolean + +/** + * An [Enumerator] for a [TableReader]. + */ +internal class TraceReaderEnumerator<E>( + private val reader: TableReader, + private val columns: List<TableColumn<*>>, + private val cancelFlag: AtomicBoolean +) : Enumerator<E> { + private val columnIndices = columns.map { reader.resolve(it) }.toIntArray() + private var current: E? = null + + override fun moveNext(): Boolean { + if (cancelFlag.get()) { + return false + } + + val reader = reader + val res = reader.nextRow() + + if (res) { + @Suppress("UNCHECKED_CAST") + current = convertRow(reader) as E + } else { + current = null + } + + return res + } + + override fun current(): E = checkNotNull(current) + + override fun reset() { + throw UnsupportedOperationException() + } + + override fun close() { + reader.close() + } + + private fun convertRow(reader: TableReader): Array<Any?> { + val res = arrayOfNulls<Any?>(columns.size) + val columnIndices = columnIndices + + for ((index, column) in columns.withIndex()) { + val columnIndex = columnIndices[index] + res[index] = convertColumn(reader, column, columnIndex) + } + return res + } + + private fun convertColumn(reader: TableReader, column: TableColumn<*>, columnIndex: Int): Any? { + val value = reader.get(columnIndex) + + return when (column.type) { + Instant::class.java -> Timestamp.from(value as Instant) + Duration::class.java -> (value as Duration).toMillis() + Set::class.java -> (value as Set<*>).toTypedArray() + else -> value + } + } +} diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt new file mode 100644 index 00000000..298a59dc --- /dev/null +++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.calcite + +import org.apache.calcite.schema.Schema +import org.apache.calcite.schema.Table +import org.apache.calcite.schema.impl.AbstractSchema +import org.opendc.trace.Trace + +/** + * A Calcite [Schema] that exposes an OpenDC [Trace] into multiple SQL tables. + * + * @param trace The [Trace] to create a schema for. + */ +public class TraceSchema(private val trace: Trace) : AbstractSchema() { + + private val tables: Map<String, TraceTable> by lazy { + trace.tables.associateWith { + val table = checkNotNull(trace.getTable(it)) { "Unexpected null table" } + TraceTable(table) + } + } + + override fun getTableMap(): Map<String, Table> = tables +} diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt new file mode 100644 index 00000000..3c6badc8 --- /dev/null +++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.calcite + +import org.apache.calcite.model.ModelHandler +import org.apache.calcite.schema.Schema +import org.apache.calcite.schema.SchemaFactory +import org.apache.calcite.schema.SchemaPlus +import org.opendc.trace.Trace +import java.io.File +import java.nio.file.Paths + +/** + * Factory that creates a [TraceSchema]. + * + * This factory allows users to include a schema that references a trace in a `model.json` file. + */ +public class TraceSchemaFactory : SchemaFactory { + override fun create(parentSchema: SchemaPlus, name: String, operand: Map<String, Any>): Schema { + val base = operand[ModelHandler.ExtraOperand.BASE_DIRECTORY.camelName] as File? + val pathParam = requireNotNull(operand["path"]) { "Trace path not specified" } as String + val path = if (base != null) File(base, pathParam).toPath() else Paths.get(pathParam) + + val format = requireNotNull(operand["format"]) { "Trace format not specified" } as String + val create = operand.getOrDefault("create", false) as Boolean + + val trace = if (create) Trace.create(path, format) else Trace.open(path, format) + return TraceSchema(trace) + } +} diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt new file mode 100644 index 00000000..8c3fe4e2 --- /dev/null +++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.calcite + +import org.apache.calcite.DataContext +import org.apache.calcite.adapter.java.JavaTypeFactory +import org.apache.calcite.linq4j.AbstractEnumerable +import org.apache.calcite.linq4j.Enumerable +import org.apache.calcite.linq4j.Enumerator +import org.apache.calcite.rel.type.RelDataType +import org.apache.calcite.rel.type.RelDataTypeFactory +import org.apache.calcite.schema.ScannableTable +import org.apache.calcite.schema.Table +import org.apache.calcite.schema.impl.AbstractTable +import org.apache.calcite.sql.type.SqlTypeName +import java.time.Duration +import java.time.Instant +import java.util.concurrent.atomic.AtomicBoolean + +/** + * A Calcite [Table] that exposes an OpenDC [org.opendc.trace.Table] as SQL table. + */ +internal class TraceTable(private val table: org.opendc.trace.Table) : AbstractTable(), ScannableTable { + + private var rowType: RelDataType? = null + + override fun getRowType(typeFactory: RelDataTypeFactory): RelDataType { + var rowType = rowType + if (rowType == null) { + rowType = deduceRowType(typeFactory as JavaTypeFactory) + this.rowType = rowType + } + + return rowType + } + + override fun scan(root: DataContext): Enumerable<Array<Any?>> { + val cancelFlag = DataContext.Variable.CANCEL_FLAG.get<AtomicBoolean>(root) + return object : AbstractEnumerable<Array<Any?>>() { + override fun enumerator(): Enumerator<Array<Any?>> = TraceReaderEnumerator(table.newReader(), table.columns, cancelFlag) + } + } + + override fun toString(): String = "TraceTable" + + private fun deduceRowType(typeFactory: JavaTypeFactory): RelDataType { + val types = mutableListOf<RelDataType>() + val names = mutableListOf<String>() + + for (column in table.columns) { + names.add(column.name) + types.add( + when (column.type) { + Instant::class.java -> typeFactory.createSqlType(SqlTypeName.TIMESTAMP) + Duration::class.java -> typeFactory.createSqlType(SqlTypeName.BIGINT) + Set::class.java -> typeFactory.createMultisetType(typeFactory.createSqlType(SqlTypeName.ANY), -1) + else -> typeFactory.createType(column.type) + } + ) + } + + return typeFactory.createStructType(types, names) + } +} diff --git a/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt b/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt new file mode 100644 index 00000000..f0e461e0 --- /dev/null +++ b/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt @@ -0,0 +1,121 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.calcite + +import org.apache.calcite.jdbc.CalciteConnection +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Test +import org.opendc.trace.Trace +import java.nio.file.Paths +import java.sql.DriverManager +import java.sql.ResultSet +import java.sql.Timestamp +import java.util.* + +/** + * Smoke test for Apache Calcite integration. + */ +class CalciteTest { + /** + * The trace to experiment with. + */ + private val trace = Trace.open(Paths.get("src/test/resources/trace"), format = "opendc-vm") + + @Test + fun testResources() { + runQuery(trace, "SELECT * FROM trace.resources") { rs -> + assertAll( + { assertTrue(rs.next()) }, + { assertEquals("1019", rs.getString("id")) }, + { assertEquals(1, rs.getInt("cpu_count")) }, + { assertEquals(Timestamp.valueOf("2013-08-12 13:40:46.0"), rs.getTimestamp("start_time")) }, + { assertEquals(181352.0, rs.getDouble("mem_capacity")) }, + { assertTrue(rs.next()) }, + { assertEquals("1023", rs.getString("id")) }, + { assertTrue(rs.next()) }, + { assertEquals("1052", rs.getString("id")) }, + { assertTrue(rs.next()) }, + { assertEquals("1073", rs.getString("id")) }, + { assertFalse(rs.next()) } + ) + } + } + + @Test + fun testResourceStates() { + runQuery(trace, "SELECT * FROM trace.resource_states") { rs -> + assertAll( + { assertTrue(rs.next()) }, + { assertEquals("1019", rs.getString("id")) }, + { assertEquals(Timestamp.valueOf("2013-08-12 13:40:46.0"), rs.getTimestamp("timestamp")) }, + { assertEquals(300000, rs.getLong("duration")) }, + { assertEquals(0.0, rs.getDouble("cpu_usage")) }, + { assertTrue(rs.next()) }, + { assertEquals("1019", rs.getString("id")) }, + ) + } + } + + @Test + fun testInterferenceGroups() { + runQuery(trace, "SELECT * FROM trace.interference_groups") { rs -> + assertAll( + { assertTrue(rs.next()) }, + { assertArrayEquals(arrayOf("1019", "1023", "1052"), rs.getArray("members").array as Array<*>) }, + { assertEquals(0.0, rs.getDouble("target")) }, + { assertEquals(0.8830158730158756, rs.getDouble("score")) }, + ) + } + } + + @Test + fun testComplexQuery() { + runQuery(trace, "SELECT max(cpu_usage) as max_cpu_usage, avg(cpu_usage) as avg_cpu_usage FROM trace.resource_states") { rs -> + assertAll( + { assertTrue(rs.next()) }, + { assertEquals(249.59993808, rs.getDouble("max_cpu_usage")) }, + { assertEquals(5.387240309118493, rs.getDouble("avg_cpu_usage")) }, + ) + } + } + + /** + * Helper function to run statement for the specified trace. + */ + private fun runQuery(trace: Trace, query: String, block: (ResultSet) -> Unit) { + val info = Properties() + info.setProperty("lex", "JAVA") + val connection = DriverManager.getConnection("jdbc:calcite:", info).unwrap(CalciteConnection::class.java) + connection.rootSchema.add("trace", TraceSchema(trace)) + + val stmt = connection.createStatement() + val results = stmt.executeQuery(query) + try { + block(results) + } finally { + results.close() + stmt.close() + connection.close() + } + } +} diff --git a/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/TraceSchemaFactoryTest.kt b/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/TraceSchemaFactoryTest.kt new file mode 100644 index 00000000..0a552e74 --- /dev/null +++ b/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/TraceSchemaFactoryTest.kt @@ -0,0 +1,78 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.calcite + +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import java.sql.DriverManager +import java.sql.Timestamp +import java.util.* + +/** + * Test suite for [TraceSchemaFactory]. + */ +class TraceSchemaFactoryTest { + @Test + fun testSmoke() { + val info = Properties() + info.setProperty("lex", "JAVA") + val connection = DriverManager.getConnection("jdbc:calcite:model=src/test/resources/model.json", info) + val stmt = connection.createStatement() + val rs = stmt.executeQuery("SELECT * FROM trace.resources") + try { + assertAll( + { assertTrue(rs.next()) }, + { assertEquals("1019", rs.getString("id")) }, + { assertEquals(1, rs.getInt("cpu_count")) }, + { assertEquals(Timestamp.valueOf("2013-08-12 13:40:46.0"), rs.getTimestamp("start_time")) }, + { assertEquals(181352.0, rs.getDouble("mem_capacity")) }, + ) + } finally { + rs.close() + stmt.close() + connection.close() + } + } + + @Test + fun testWithoutParams() { + assertThrows<java.lang.RuntimeException> { + DriverManager.getConnection("jdbc:calcite:schemaFactory=org.opendc.trace.calcite.TraceSchemaFactory") + } + } + + @Test + fun testWithoutPath() { + assertThrows<java.lang.RuntimeException> { + DriverManager.getConnection("jdbc:calcite:schemaFactory=org.opendc.trace.calcite.TraceSchemaFactory; schema.format=opendc-vm") + } + } + + @Test + fun testWithoutFormat() { + assertThrows<java.lang.RuntimeException> { + DriverManager.getConnection("jdbc:calcite:schemaFactory=org.opendc.trace.calcite.TraceSchemaFactory; schema.path=trace") + } + } +} diff --git a/opendc-trace/opendc-trace-calcite/src/test/resources/trace/interference-model.json b/opendc-trace/opendc-trace-calcite/src/test/resources/trace/interference-model.json new file mode 100644 index 00000000..6a0616d9 --- /dev/null +++ b/opendc-trace/opendc-trace-calcite/src/test/resources/trace/interference-model.json @@ -0,0 +1,20 @@ +[ + { + "vms": [ + "1019", + "1023", + "1052" + ], + "minServerLoad": 0.0, + "performanceScore": 0.8830158730158756 + }, + { + "vms": [ + "1023", + "1052", + "1073" + ], + "minServerLoad": 0.0, + "performanceScore": 0.7133055555552751 + } +] diff --git a/opendc-trace/opendc-trace-calcite/src/test/resources/trace/meta.parquet b/opendc-trace/opendc-trace-calcite/src/test/resources/trace/meta.parquet Binary files differnew file mode 100644 index 00000000..d8184945 --- /dev/null +++ b/opendc-trace/opendc-trace-calcite/src/test/resources/trace/meta.parquet diff --git a/opendc-trace/opendc-trace-calcite/src/test/resources/trace/trace.parquet b/opendc-trace/opendc-trace-calcite/src/test/resources/trace/trace.parquet Binary files differnew file mode 100644 index 00000000..00ab5835 --- /dev/null +++ b/opendc-trace/opendc-trace-calcite/src/test/resources/trace/trace.parquet |
