summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--gradle/libs.versions.toml4
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt2
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt7
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt15
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt25
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TableColumns.kt4
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt25
-rw-r--r--opendc-trace/opendc-trace-calcite/build.gradle.kts37
-rw-r--r--opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt93
-rw-r--r--opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt45
-rw-r--r--opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt50
-rw-r--r--opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt84
-rw-r--r--opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt121
-rw-r--r--opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/TraceSchemaFactoryTest.kt78
-rw-r--r--opendc-trace/opendc-trace-calcite/src/test/resources/trace/interference-model.json20
-rw-r--r--opendc-trace/opendc-trace-calcite/src/test/resources/trace/meta.parquetbin0 -> 1679 bytes
-rw-r--r--opendc-trace/opendc-trace-calcite/src/test/resources/trace/trace.parquetbin0 -> 65174 bytes
-rw-r--r--settings.gradle.kts1
18 files changed, 571 insertions, 40 deletions
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 43568067..458ac973 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -1,4 +1,5 @@
[versions]
+calcite = "1.30.0"
classgraph = "4.8.143"
clikt = "3.4.1"
config = "1.4.2"
@@ -101,6 +102,9 @@ quarkus-test-security = { module = "io.quarkus:quarkus-test-security" }
restassured-core = { module = "io.rest-assured:rest-assured" }
restassured-kotlin = { module = "io.rest-assured:kotlin-extensions" }
+# Calcite (SQL)
+calcite-core = { module = "org.apache.calcite:calcite-core", version.ref = "calcite" }
+
# Other
classgraph = { module = "io.github.classgraph:classgraph", version.ref = "classgraph" }
jakarta-validation = { module = "jakarta.validation:jakarta.validation-api", version.ref = "jakarta-validation" }
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt
index 776c40c0..b77a2982 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt
@@ -33,7 +33,7 @@ public class TableColumn<out T>(public val name: String, type: Class<T>) {
/**
* The type of the column.
*/
- private val type: Class<*> = type
+ public val type: Class<*> = type
/**
* Determine whether the type of the column is a subtype of [column].
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt
index 532f6d24..5e8859e4 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt
@@ -24,22 +24,21 @@
package org.opendc.trace.conv
import org.opendc.trace.TableColumn
-import org.opendc.trace.column
/**
* Members of the interference group.
*/
@JvmField
-public val INTERFERENCE_GROUP_MEMBERS: TableColumn<Set<String>> = column("interference_group:members")
+public val INTERFERENCE_GROUP_MEMBERS: TableColumn<Set<String>> = column("members")
/**
* Target load after which the interference occurs.
*/
@JvmField
-public val INTERFERENCE_GROUP_TARGET: TableColumn<Double> = column("interference_group:target")
+public val INTERFERENCE_GROUP_TARGET: TableColumn<Double> = column("target")
/**
* Performance score when the interference occurs.
*/
@JvmField
-public val INTERFERENCE_GROUP_SCORE: TableColumn<Double> = column("interference_group:score")
+public val INTERFERENCE_GROUP_SCORE: TableColumn<Double> = column("score")
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt
index e9fc5d44..e602e534 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt
@@ -24,47 +24,46 @@
package org.opendc.trace.conv
import org.opendc.trace.TableColumn
-import org.opendc.trace.column
import java.time.Instant
/**
* Identifier of the resource.
*/
@JvmField
-public val RESOURCE_ID: TableColumn<String> = column("resource:id")
+public val RESOURCE_ID: TableColumn<String> = column("id")
/**
* The cluster to which the resource belongs.
*/
@JvmField
-public val RESOURCE_CLUSTER_ID: TableColumn<String> = column("resource:cluster_id")
+public val RESOURCE_CLUSTER_ID: TableColumn<String> = column("cluster_id")
/**
* Start time for the resource.
*/
@JvmField
-public val RESOURCE_START_TIME: TableColumn<Instant> = column("resource:start_time")
+public val RESOURCE_START_TIME: TableColumn<Instant> = column("start_time")
/**
* End time for the resource.
*/
@JvmField
-public val RESOURCE_STOP_TIME: TableColumn<Instant> = column("resource:stop_time")
+public val RESOURCE_STOP_TIME: TableColumn<Instant> = column("stop_time")
/**
* Number of CPUs for the resource.
*/
@JvmField
-public val RESOURCE_CPU_COUNT: TableColumn<Int> = column("resource:cpu_count")
+public val RESOURCE_CPU_COUNT: TableColumn<Int> = column("cpu_count")
/**
* Total CPU capacity of the resource in MHz.
*/
@JvmField
-public val RESOURCE_CPU_CAPACITY: TableColumn<Double> = column("resource:cpu_capacity")
+public val RESOURCE_CPU_CAPACITY: TableColumn<Double> = column("cpu_capacity")
/**
* Memory capacity for the resource in KB.
*/
@JvmField
-public val RESOURCE_MEM_CAPACITY: TableColumn<Double> = column("resource:mem_capacity")
+public val RESOURCE_MEM_CAPACITY: TableColumn<Double> = column("mem_capacity")
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt
index d5bbafd7..3a44f817 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt
@@ -24,7 +24,6 @@
package org.opendc.trace.conv
import org.opendc.trace.TableColumn
-import org.opendc.trace.column
import java.time.Duration
import java.time.Instant
@@ -32,70 +31,70 @@ import java.time.Instant
* The timestamp at which the state was recorded.
*/
@JvmField
-public val RESOURCE_STATE_TIMESTAMP: TableColumn<Instant> = column("resource_state:timestamp")
+public val RESOURCE_STATE_TIMESTAMP: TableColumn<Instant> = column("timestamp")
/**
* Duration for the state.
*/
@JvmField
-public val RESOURCE_STATE_DURATION: TableColumn<Duration> = column("resource_state:duration")
+public val RESOURCE_STATE_DURATION: TableColumn<Duration> = column("duration")
/**
* A flag to indicate that the resource is powered on.
*/
@JvmField
-public val RESOURCE_STATE_POWERED_ON: TableColumn<Boolean> = column("resource_state:powered_on")
+public val RESOURCE_STATE_POWERED_ON: TableColumn<Boolean> = column("powered_on")
/**
* Total CPU usage of the resource in MHz.
*/
@JvmField
-public val RESOURCE_STATE_CPU_USAGE: TableColumn<Double> = column("resource_state:cpu_usage")
+public val RESOURCE_STATE_CPU_USAGE: TableColumn<Double> = column("cpu_usage")
/**
* Total CPU usage of the resource in percentage.
*/
@JvmField
-public val RESOURCE_STATE_CPU_USAGE_PCT: TableColumn<Double> = column("resource_state:cpu_usage_pct")
+public val RESOURCE_STATE_CPU_USAGE_PCT: TableColumn<Double> = column("cpu_usage_pct")
/**
* Total CPU demand of the resource in MHz.
*/
@JvmField
-public val RESOURCE_STATE_CPU_DEMAND: TableColumn<Double> = column("resource_state:cpu_demand")
+public val RESOURCE_STATE_CPU_DEMAND: TableColumn<Double> = column("cpu_demand")
/**
* CPU ready percentage.
*/
@JvmField
-public val RESOURCE_STATE_CPU_READY_PCT: TableColumn<Double> = column("resource_state:cpu_ready_pct")
+public val RESOURCE_STATE_CPU_READY_PCT: TableColumn<Double> = column("cpu_ready_pct")
/**
* Memory usage of the resource in KB.
*/
@JvmField
-public val RESOURCE_STATE_MEM_USAGE: TableColumn<Double> = column("resource_state:mem_usage")
+public val RESOURCE_STATE_MEM_USAGE: TableColumn<Double> = column("mem_usage")
/**
* Disk read throughput of the resource in KB/s.
*/
@JvmField
-public val RESOURCE_STATE_DISK_READ: TableColumn<Double> = column("resource_state:disk_read")
+public val RESOURCE_STATE_DISK_READ: TableColumn<Double> = column("disk_read")
/**
* Disk write throughput of the resource in KB/s.
*/
@JvmField
-public val RESOURCE_STATE_DISK_WRITE: TableColumn<Double> = column("resource_state:disk_write")
+public val RESOURCE_STATE_DISK_WRITE: TableColumn<Double> = column("disk_write")
/**
* Network receive throughput of the resource in KB/s.
*/
@JvmField
-public val RESOURCE_STATE_NET_RX: TableColumn<Double> = column("resource_state:net_rx")
+public val RESOURCE_STATE_NET_RX: TableColumn<Double> = column("net_rx")
/**
* Network transmit throughput of the resource in KB/s.
*/
@JvmField
-public val RESOURCE_STATE_NET_TX: TableColumn<Double> = column("resource_state:net_tx")
+public val RESOURCE_STATE_NET_TX: TableColumn<Double> = column("net_tx")
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TableColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TableColumns.kt
index 31a58360..a58505e9 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TableColumns.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TableColumns.kt
@@ -21,7 +21,9 @@
*/
@file:JvmName("TableColumns")
-package org.opendc.trace
+package org.opendc.trace.conv
+
+import org.opendc.trace.TableColumn
/**
* Construct a [TableColumn] with the specified [name] and type [T].
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt
index 397c0794..e6daafb7 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt
@@ -24,7 +24,6 @@
package org.opendc.trace.conv
import org.opendc.trace.TableColumn
-import org.opendc.trace.column
import java.time.Duration
import java.time.Instant
@@ -32,70 +31,70 @@ import java.time.Instant
* A column containing the task identifier.
*/
@JvmField
-public val TASK_ID: TableColumn<String> = column("task:id")
+public val TASK_ID: TableColumn<String> = column("id")
/**
* A column containing the identifier of the workflow.
*/
@JvmField
-public val TASK_WORKFLOW_ID: TableColumn<String> = column("task:workflow_id")
+public val TASK_WORKFLOW_ID: TableColumn<String> = column("workflow_id")
/**
* A column containing the submission time of the task.
*/
@JvmField
-public val TASK_SUBMIT_TIME: TableColumn<Instant> = column("task:submit_time")
+public val TASK_SUBMIT_TIME: TableColumn<Instant> = column("submit_time")
/**
* A column containing the wait time of the task.
*/
@JvmField
-public val TASK_WAIT_TIME: TableColumn<Instant> = column("task:wait_time")
+public val TASK_WAIT_TIME: TableColumn<Instant> = column("wait_time")
/**
* A column containing the runtime time of the task.
*/
@JvmField
-public val TASK_RUNTIME: TableColumn<Duration> = column("task:runtime")
+public val TASK_RUNTIME: TableColumn<Duration> = column("runtime")
/**
* A column containing the parents of a task.
*/
@JvmField
-public val TASK_PARENTS: TableColumn<Set<String>> = column("task:parents")
+public val TASK_PARENTS: TableColumn<Set<String>> = column("parents")
/**
* A column containing the children of a task.
*/
@JvmField
-public val TASK_CHILDREN: TableColumn<Set<String>> = column("task:children")
+public val TASK_CHILDREN: TableColumn<Set<String>> = column("children")
/**
* A column containing the requested CPUs of a task.
*/
@JvmField
-public val TASK_REQ_NCPUS: TableColumn<Int> = column("task:req_ncpus")
+public val TASK_REQ_NCPUS: TableColumn<Int> = column("req_ncpus")
/**
* A column containing the allocated CPUs of a task.
*/
@JvmField
-public val TASK_ALLOC_NCPUS: TableColumn<Int> = column("task:alloc_ncpus")
+public val TASK_ALLOC_NCPUS: TableColumn<Int> = column("alloc_ncpus")
/**
* A column containing the status of a task.
*/
@JvmField
-public val TASK_STATUS: TableColumn<Int> = column("task:status")
+public val TASK_STATUS: TableColumn<Int> = column("status")
/**
* A column containing the group id of a task.
*/
@JvmField
-public val TASK_GROUP_ID: TableColumn<Int> = column("task:group_id")
+public val TASK_GROUP_ID: TableColumn<Int> = column("group_id")
/**
* A column containing the user id of a task.
*/
@JvmField
-public val TASK_USER_ID: TableColumn<Int> = column("task:user_id")
+public val TASK_USER_ID: TableColumn<Int> = column("user_id")
diff --git a/opendc-trace/opendc-trace-calcite/build.gradle.kts b/opendc-trace/opendc-trace-calcite/build.gradle.kts
new file mode 100644
index 00000000..2ffdac3c
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/build.gradle.kts
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+description = "Apache Calcite (SQL) integration for the OpenDC trace library"
+
+/* Build configuration */
+plugins {
+ `kotlin-library-conventions`
+}
+
+dependencies {
+ api(projects.opendcTrace.opendcTraceApi)
+
+ api(libs.calcite.core)
+
+ testRuntimeOnly(projects.opendcTrace.opendcTraceOpendc)
+ testRuntimeOnly(libs.slf4j.simple)
+}
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt
new file mode 100644
index 00000000..1854f262
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt
@@ -0,0 +1,93 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.calcite
+
+import org.apache.calcite.linq4j.Enumerator
+import org.opendc.trace.TableColumn
+import org.opendc.trace.TableReader
+import java.sql.Timestamp
+import java.time.Duration
+import java.time.Instant
+import java.util.concurrent.atomic.AtomicBoolean
+
+/**
+ * An [Enumerator] for a [TableReader].
+ */
+internal class TraceReaderEnumerator<E>(
+ private val reader: TableReader,
+ private val columns: List<TableColumn<*>>,
+ private val cancelFlag: AtomicBoolean
+) : Enumerator<E> {
+ private val columnIndices = columns.map { reader.resolve(it) }.toIntArray()
+ private var current: E? = null
+
+ override fun moveNext(): Boolean {
+ if (cancelFlag.get()) {
+ return false
+ }
+
+ val reader = reader
+ val res = reader.nextRow()
+
+ if (res) {
+ @Suppress("UNCHECKED_CAST")
+ current = convertRow(reader) as E
+ } else {
+ current = null
+ }
+
+ return res
+ }
+
+ override fun current(): E = checkNotNull(current)
+
+ override fun reset() {
+ throw UnsupportedOperationException()
+ }
+
+ override fun close() {
+ reader.close()
+ }
+
+ private fun convertRow(reader: TableReader): Array<Any?> {
+ val res = arrayOfNulls<Any?>(columns.size)
+ val columnIndices = columnIndices
+
+ for ((index, column) in columns.withIndex()) {
+ val columnIndex = columnIndices[index]
+ res[index] = convertColumn(reader, column, columnIndex)
+ }
+ return res
+ }
+
+ private fun convertColumn(reader: TableReader, column: TableColumn<*>, columnIndex: Int): Any? {
+ val value = reader.get(columnIndex)
+
+ return when (column.type) {
+ Instant::class.java -> Timestamp.from(value as Instant)
+ Duration::class.java -> (value as Duration).toMillis()
+ Set::class.java -> (value as Set<*>).toTypedArray()
+ else -> value
+ }
+ }
+}
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt
new file mode 100644
index 00000000..298a59dc
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.calcite
+
+import org.apache.calcite.schema.Schema
+import org.apache.calcite.schema.Table
+import org.apache.calcite.schema.impl.AbstractSchema
+import org.opendc.trace.Trace
+
+/**
+ * A Calcite [Schema] that exposes an OpenDC [Trace] into multiple SQL tables.
+ *
+ * @param trace The [Trace] to create a schema for.
+ */
+public class TraceSchema(private val trace: Trace) : AbstractSchema() {
+
+ private val tables: Map<String, TraceTable> by lazy {
+ trace.tables.associateWith {
+ val table = checkNotNull(trace.getTable(it)) { "Unexpected null table" }
+ TraceTable(table)
+ }
+ }
+
+ override fun getTableMap(): Map<String, Table> = tables
+}
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt
new file mode 100644
index 00000000..3c6badc8
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.calcite
+
+import org.apache.calcite.model.ModelHandler
+import org.apache.calcite.schema.Schema
+import org.apache.calcite.schema.SchemaFactory
+import org.apache.calcite.schema.SchemaPlus
+import org.opendc.trace.Trace
+import java.io.File
+import java.nio.file.Paths
+
+/**
+ * Factory that creates a [TraceSchema].
+ *
+ * This factory allows users to include a schema that references a trace in a `model.json` file.
+ */
+public class TraceSchemaFactory : SchemaFactory {
+ override fun create(parentSchema: SchemaPlus, name: String, operand: Map<String, Any>): Schema {
+ val base = operand[ModelHandler.ExtraOperand.BASE_DIRECTORY.camelName] as File?
+ val pathParam = requireNotNull(operand["path"]) { "Trace path not specified" } as String
+ val path = if (base != null) File(base, pathParam).toPath() else Paths.get(pathParam)
+
+ val format = requireNotNull(operand["format"]) { "Trace format not specified" } as String
+ val create = operand.getOrDefault("create", false) as Boolean
+
+ val trace = if (create) Trace.create(path, format) else Trace.open(path, format)
+ return TraceSchema(trace)
+ }
+}
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt
new file mode 100644
index 00000000..8c3fe4e2
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt
@@ -0,0 +1,84 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.calcite
+
+import org.apache.calcite.DataContext
+import org.apache.calcite.adapter.java.JavaTypeFactory
+import org.apache.calcite.linq4j.AbstractEnumerable
+import org.apache.calcite.linq4j.Enumerable
+import org.apache.calcite.linq4j.Enumerator
+import org.apache.calcite.rel.type.RelDataType
+import org.apache.calcite.rel.type.RelDataTypeFactory
+import org.apache.calcite.schema.ScannableTable
+import org.apache.calcite.schema.Table
+import org.apache.calcite.schema.impl.AbstractTable
+import org.apache.calcite.sql.type.SqlTypeName
+import java.time.Duration
+import java.time.Instant
+import java.util.concurrent.atomic.AtomicBoolean
+
+/**
+ * A Calcite [Table] that exposes an OpenDC [org.opendc.trace.Table] as SQL table.
+ */
+internal class TraceTable(private val table: org.opendc.trace.Table) : AbstractTable(), ScannableTable {
+
+ private var rowType: RelDataType? = null
+
+ override fun getRowType(typeFactory: RelDataTypeFactory): RelDataType {
+ var rowType = rowType
+ if (rowType == null) {
+ rowType = deduceRowType(typeFactory as JavaTypeFactory)
+ this.rowType = rowType
+ }
+
+ return rowType
+ }
+
+ override fun scan(root: DataContext): Enumerable<Array<Any?>> {
+ val cancelFlag = DataContext.Variable.CANCEL_FLAG.get<AtomicBoolean>(root)
+ return object : AbstractEnumerable<Array<Any?>>() {
+ override fun enumerator(): Enumerator<Array<Any?>> = TraceReaderEnumerator(table.newReader(), table.columns, cancelFlag)
+ }
+ }
+
+ override fun toString(): String = "TraceTable"
+
+ private fun deduceRowType(typeFactory: JavaTypeFactory): RelDataType {
+ val types = mutableListOf<RelDataType>()
+ val names = mutableListOf<String>()
+
+ for (column in table.columns) {
+ names.add(column.name)
+ types.add(
+ when (column.type) {
+ Instant::class.java -> typeFactory.createSqlType(SqlTypeName.TIMESTAMP)
+ Duration::class.java -> typeFactory.createSqlType(SqlTypeName.BIGINT)
+ Set::class.java -> typeFactory.createMultisetType(typeFactory.createSqlType(SqlTypeName.ANY), -1)
+ else -> typeFactory.createType(column.type)
+ }
+ )
+ }
+
+ return typeFactory.createStructType(types, names)
+ }
+}
diff --git a/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt b/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt
new file mode 100644
index 00000000..f0e461e0
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt
@@ -0,0 +1,121 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.calcite
+
+import org.apache.calcite.jdbc.CalciteConnection
+import org.junit.jupiter.api.Assertions.*
+import org.junit.jupiter.api.Test
+import org.opendc.trace.Trace
+import java.nio.file.Paths
+import java.sql.DriverManager
+import java.sql.ResultSet
+import java.sql.Timestamp
+import java.util.*
+
+/**
+ * Smoke test for Apache Calcite integration.
+ */
+class CalciteTest {
+ /**
+ * The trace to experiment with.
+ */
+ private val trace = Trace.open(Paths.get("src/test/resources/trace"), format = "opendc-vm")
+
+ @Test
+ fun testResources() {
+ runQuery(trace, "SELECT * FROM trace.resources") { rs ->
+ assertAll(
+ { assertTrue(rs.next()) },
+ { assertEquals("1019", rs.getString("id")) },
+ { assertEquals(1, rs.getInt("cpu_count")) },
+ { assertEquals(Timestamp.valueOf("2013-08-12 13:40:46.0"), rs.getTimestamp("start_time")) },
+ { assertEquals(181352.0, rs.getDouble("mem_capacity")) },
+ { assertTrue(rs.next()) },
+ { assertEquals("1023", rs.getString("id")) },
+ { assertTrue(rs.next()) },
+ { assertEquals("1052", rs.getString("id")) },
+ { assertTrue(rs.next()) },
+ { assertEquals("1073", rs.getString("id")) },
+ { assertFalse(rs.next()) }
+ )
+ }
+ }
+
+ @Test
+ fun testResourceStates() {
+ runQuery(trace, "SELECT * FROM trace.resource_states") { rs ->
+ assertAll(
+ { assertTrue(rs.next()) },
+ { assertEquals("1019", rs.getString("id")) },
+ { assertEquals(Timestamp.valueOf("2013-08-12 13:40:46.0"), rs.getTimestamp("timestamp")) },
+ { assertEquals(300000, rs.getLong("duration")) },
+ { assertEquals(0.0, rs.getDouble("cpu_usage")) },
+ { assertTrue(rs.next()) },
+ { assertEquals("1019", rs.getString("id")) },
+ )
+ }
+ }
+
+ @Test
+ fun testInterferenceGroups() {
+ runQuery(trace, "SELECT * FROM trace.interference_groups") { rs ->
+ assertAll(
+ { assertTrue(rs.next()) },
+ { assertArrayEquals(arrayOf("1019", "1023", "1052"), rs.getArray("members").array as Array<*>) },
+ { assertEquals(0.0, rs.getDouble("target")) },
+ { assertEquals(0.8830158730158756, rs.getDouble("score")) },
+ )
+ }
+ }
+
+ @Test
+ fun testComplexQuery() {
+ runQuery(trace, "SELECT max(cpu_usage) as max_cpu_usage, avg(cpu_usage) as avg_cpu_usage FROM trace.resource_states") { rs ->
+ assertAll(
+ { assertTrue(rs.next()) },
+ { assertEquals(249.59993808, rs.getDouble("max_cpu_usage")) },
+ { assertEquals(5.387240309118493, rs.getDouble("avg_cpu_usage")) },
+ )
+ }
+ }
+
+ /**
+ * Helper function to run statement for the specified trace.
+ */
+ private fun runQuery(trace: Trace, query: String, block: (ResultSet) -> Unit) {
+ val info = Properties()
+ info.setProperty("lex", "JAVA")
+ val connection = DriverManager.getConnection("jdbc:calcite:", info).unwrap(CalciteConnection::class.java)
+ connection.rootSchema.add("trace", TraceSchema(trace))
+
+ val stmt = connection.createStatement()
+ val results = stmt.executeQuery(query)
+ try {
+ block(results)
+ } finally {
+ results.close()
+ stmt.close()
+ connection.close()
+ }
+ }
+}
diff --git a/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/TraceSchemaFactoryTest.kt b/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/TraceSchemaFactoryTest.kt
new file mode 100644
index 00000000..0a552e74
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/TraceSchemaFactoryTest.kt
@@ -0,0 +1,78 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.calcite
+
+import org.junit.jupiter.api.Assertions.*
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertThrows
+import java.sql.DriverManager
+import java.sql.Timestamp
+import java.util.*
+
+/**
+ * Test suite for [TraceSchemaFactory].
+ */
+class TraceSchemaFactoryTest {
+ @Test
+ fun testSmoke() {
+ val info = Properties()
+ info.setProperty("lex", "JAVA")
+ val connection = DriverManager.getConnection("jdbc:calcite:model=src/test/resources/model.json", info)
+ val stmt = connection.createStatement()
+ val rs = stmt.executeQuery("SELECT * FROM trace.resources")
+ try {
+ assertAll(
+ { assertTrue(rs.next()) },
+ { assertEquals("1019", rs.getString("id")) },
+ { assertEquals(1, rs.getInt("cpu_count")) },
+ { assertEquals(Timestamp.valueOf("2013-08-12 13:40:46.0"), rs.getTimestamp("start_time")) },
+ { assertEquals(181352.0, rs.getDouble("mem_capacity")) },
+ )
+ } finally {
+ rs.close()
+ stmt.close()
+ connection.close()
+ }
+ }
+
+ @Test
+ fun testWithoutParams() {
+ assertThrows<java.lang.RuntimeException> {
+ DriverManager.getConnection("jdbc:calcite:schemaFactory=org.opendc.trace.calcite.TraceSchemaFactory")
+ }
+ }
+
+ @Test
+ fun testWithoutPath() {
+ assertThrows<java.lang.RuntimeException> {
+ DriverManager.getConnection("jdbc:calcite:schemaFactory=org.opendc.trace.calcite.TraceSchemaFactory; schema.format=opendc-vm")
+ }
+ }
+
+ @Test
+ fun testWithoutFormat() {
+ assertThrows<java.lang.RuntimeException> {
+ DriverManager.getConnection("jdbc:calcite:schemaFactory=org.opendc.trace.calcite.TraceSchemaFactory; schema.path=trace")
+ }
+ }
+}
diff --git a/opendc-trace/opendc-trace-calcite/src/test/resources/trace/interference-model.json b/opendc-trace/opendc-trace-calcite/src/test/resources/trace/interference-model.json
new file mode 100644
index 00000000..6a0616d9
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/test/resources/trace/interference-model.json
@@ -0,0 +1,20 @@
+[
+ {
+ "vms": [
+ "1019",
+ "1023",
+ "1052"
+ ],
+ "minServerLoad": 0.0,
+ "performanceScore": 0.8830158730158756
+ },
+ {
+ "vms": [
+ "1023",
+ "1052",
+ "1073"
+ ],
+ "minServerLoad": 0.0,
+ "performanceScore": 0.7133055555552751
+ }
+]
diff --git a/opendc-trace/opendc-trace-calcite/src/test/resources/trace/meta.parquet b/opendc-trace/opendc-trace-calcite/src/test/resources/trace/meta.parquet
new file mode 100644
index 00000000..d8184945
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/test/resources/trace/meta.parquet
Binary files differ
diff --git a/opendc-trace/opendc-trace-calcite/src/test/resources/trace/trace.parquet b/opendc-trace/opendc-trace-calcite/src/test/resources/trace/trace.parquet
new file mode 100644
index 00000000..00ab5835
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/test/resources/trace/trace.parquet
Binary files differ
diff --git a/settings.gradle.kts b/settings.gradle.kts
index cc454b19..a779edcc 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -59,6 +59,7 @@ include(":opendc-trace:opendc-trace-bitbrains")
include(":opendc-trace:opendc-trace-azure")
include(":opendc-trace:opendc-trace-opendc")
include(":opendc-trace:opendc-trace-parquet")
+include(":opendc-trace:opendc-trace-calcite")
include(":opendc-trace:opendc-trace-tools")
include(":opendc-harness:opendc-harness-api")
include(":opendc-harness:opendc-harness-engine")