author     Fabian Mastenbroek <mail.fabianm@gmail.com>   2022-05-02 16:06:44 +0200
committer  GitHub <noreply@github.com>                    2022-05-02 16:06:44 +0200
commit     c78285f6346236053979aa98113ba9e6d7efb21e (patch)
tree       44221b3a39516a235a0b41adf525a79a60abb998
parent     44ddd27a745f2dfe4b6ffef1b7657d156dd61489 (diff)
parent     e4d3a8add5388182cf7a12b1099678a0b769b106 (diff)
merge: Add support for SQL via Apache Calcite (#78)
This pull request integrates initial support for SQL queries via Apache Calcite into the OpenDC codebase. Our vision is that users of OpenDC should be able to use SQL queries to access and process most of the experiment data generated by simulations. This pull request moves towards this goal by adding the ability to query workload traces supported by OpenDC using SQL. We also provide a CLI for querying the data in workload traces via `opendc-trace-tools`:

```bash
opendc-trace-tools query -i data/bitbrains-small -f opendc-vm "SELECT MAX(cpu_count) FROM resource_states"
```

## Implementation Notes :hammer_and_pick:

* Add Calcite (SQL) integration
* Add support for writing via SQL
* Support custom Parquet ReadSupport implementations
* Read records using low-level Parquet API
* Do not use Avro when exporting experiment data
* Do not use Avro when reading WTF trace
* Drop dependency on Avro
* Add support for projections

## External Dependencies :four_leaf_clover:

* Apache Calcite

## Breaking API Changes :warning:

* The existing code for reading Parquet traces using Apache Avro has been removed.
* `TraceFormat.newReader` now accepts a nullable `projection` parameter
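Since the new `opendc-trace-calcite` module exposes traces through Calcite's standard JDBC driver, a trace can also be queried programmatically. The sketch below is a minimal illustration from Kotlin using an inline Calcite model: the `TraceSchemaFactory` class comes from this change, but the schema name and the `path`/`format` operand keys are assumptions for illustration only (see `opendc-trace-calcite/src/test/resources/model.json` for the actual configuration); the query itself mirrors the CLI example above.

```kotlin
import java.sql.DriverManager
import java.util.Properties

fun main() {
    // Inline Calcite model pointing at a workload trace on disk.
    // NOTE: the operand keys ("path", "format") and the schema name are
    // assumptions; consult the test resource model.json for the exact keys.
    val model = """
        inline:{
          "version": "1.0",
          "defaultSchema": "trace",
          "schemas": [
            {
              "name": "trace",
              "type": "custom",
              "factory": "org.opendc.trace.calcite.TraceSchemaFactory",
              "operand": {
                "path": "data/bitbrains-small",
                "format": "opendc-vm"
              }
            }
          ]
        }
    """.trimIndent()

    // calcite-core registers the JDBC driver via ServiceLoader, so no
    // explicit Class.forName is required.
    val props = Properties().apply { setProperty("model", model) }
    DriverManager.getConnection("jdbc:calcite:", props).use { conn ->
        conn.createStatement().use { stmt ->
            stmt.executeQuery("SELECT MAX(cpu_count) FROM resource_states").use { rs ->
                while (rs.next()) {
                    println(rs.getInt(1))
                }
            }
        }
    }
}
```

The `query` command added to `opendc-trace-tools` presumably sets up a similar Calcite connection under the hood before executing the user-supplied SQL.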
-rw-r--r--gradle/libs.versions.toml8
-rw-r--r--opendc-compute/opendc-compute-workload/build.gradle.kts2
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt32
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt214
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt196
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt121
-rw-r--r--opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/Utils.kt38
-rw-r--r--opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/HostDataWriterTest.kt79
-rw-r--r--opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServerDataWriterTest.kt73
-rw-r--r--opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServiceDataWriterTest.kt67
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt6
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt2
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt7
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt15
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt25
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TableColumns.kt4
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt25
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt4
-rw-r--r--opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt4
-rw-r--r--opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt2
-rw-r--r--opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt4
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt2
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt2
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt2
-rw-r--r--opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt4
-rw-r--r--opendc-trace/opendc-trace-calcite/build.gradle.kts37
-rw-r--r--opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/InsertableTable.kt39
-rw-r--r--opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt93
-rw-r--r--opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt47
-rw-r--r--opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt50
-rw-r--r--opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt176
-rw-r--r--opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModify.kt138
-rw-r--r--opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModifyRule.kt65
-rw-r--r--opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt158
-rw-r--r--opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/TraceSchemaFactoryTest.kt78
-rw-r--r--opendc-trace/opendc-trace-calcite/src/test/resources/model.json15
-rw-r--r--opendc-trace/opendc-trace-calcite/src/test/resources/trace/interference-model.json20
-rw-r--r--opendc-trace/opendc-trace-calcite/src/test/resources/trace/meta.parquet (bin) 0 -> 1679 bytes
-rw-r--r--opendc-trace/opendc-trace-calcite/src/test/resources/trace/trace.parquet (bin) 0 -> 65174 bytes
-rw-r--r--opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt2
-rw-r--r--opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt6
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt74
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt124
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt65
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt117
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt67
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/Resource.kt37
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt147
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceRecordMaterializer.kt107
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceState.kt34
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt139
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateRecordMaterializer.kt102
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateWriteSupport.kt105
-rw-r--r--opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceWriteSupport.kt114
-rw-r--r--opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt77
-rw-r--r--opendc-trace/opendc-trace-parquet/build.gradle.kts2
-rw-r--r--opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt36
-rw-r--r--opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt55
-rw-r--r--opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt118
-rw-r--r--opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt2
-rw-r--r--opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt2
-rw-r--r--opendc-trace/opendc-trace-tools/build.gradle.kts8
-rw-r--r--opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/ConvertCommand.kt (renamed from opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt)12
-rw-r--r--opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/QueryCommand.kt159
-rw-r--r--opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceTools.kt (renamed from opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt)28
-rw-r--r--opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt2
-rw-r--r--opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt4
-rw-r--r--opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt81
-rw-r--r--opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt6
-rw-r--r--opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt42
-rw-r--r--opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt134
-rw-r--r--opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt165
-rw-r--r--opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt2
-rw-r--r--settings.gradle.kts1
74 files changed, 3426 insertions, 604 deletions
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 43568067..b05af368 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -1,4 +1,5 @@
[versions]
+calcite = "1.30.0"
classgraph = "4.8.143"
clikt = "3.4.1"
config = "1.4.2"
@@ -8,6 +9,7 @@ gradle-node = "3.2.1"
hadoop = "3.3.1"
jackson = "2.13.2"
jandex-gradle = "0.12.0"
+jline = "3.21.0"
jmh-gradle = "0.6.6"
jakarta-validation = "2.0.2"
junit-jupiter = "5.8.2"
@@ -68,7 +70,7 @@ jackson-databind = { module = "com.fasterxml.jackson.core:jackson-databind", ver
jackson-module-kotlin = { module = "com.fasterxml.jackson.module:jackson-module-kotlin", version.ref = "jackson" }
jackson-datatype-jsr310 = { module = "com.fasterxml.jackson.datatype:jackson-datatype-jsr310", version.ref = "jackson" }
jackson-dataformat-csv = { module = "com.fasterxml.jackson.dataformat:jackson-dataformat-csv", version.ref = "jackson" }
-parquet = { module = "org.apache.parquet:parquet-avro", version.ref = "parquet" }
+parquet = { module = "org.apache.parquet:parquet-hadoop", version.ref = "parquet" }
config = { module = "com.typesafe:config", version.ref = "config" }
# Quarkus
@@ -101,6 +103,10 @@ quarkus-test-security = { module = "io.quarkus:quarkus-test-security" }
restassured-core = { module = "io.rest-assured:rest-assured" }
restassured-kotlin = { module = "io.rest-assured:kotlin-extensions" }
+# Calcite (SQL)
+calcite-core = { module = "org.apache.calcite:calcite-core", version.ref = "calcite" }
+jline = { module = "org.jline:jline", version.ref = "jline" }
+
# Other
classgraph = { module = "io.github.classgraph:classgraph", version.ref = "classgraph" }
jakarta-validation = { module = "jakarta.validation:jakarta.validation-api", version.ref = "jakarta-validation" }
diff --git a/opendc-compute/opendc-compute-workload/build.gradle.kts b/opendc-compute/opendc-compute-workload/build.gradle.kts
index 9ced95a7..319b2ae3 100644
--- a/opendc-compute/opendc-compute-workload/build.gradle.kts
+++ b/opendc-compute/opendc-compute-workload/build.gradle.kts
@@ -39,4 +39,6 @@ dependencies {
implementation(libs.opentelemetry.semconv)
implementation(libs.kotlin.logging)
+
+ testImplementation(libs.slf4j.simple)
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt
index 84387bbc..c854d874 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt
@@ -23,14 +23,12 @@
package org.opendc.compute.workload.export.parquet
import mu.KotlinLogging
-import org.apache.avro.Schema
-import org.apache.avro.generic.GenericData
-import org.apache.avro.generic.GenericRecordBuilder
-import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.column.ParquetProperties
import org.apache.parquet.hadoop.ParquetFileWriter
import org.apache.parquet.hadoop.ParquetWriter
+import org.apache.parquet.hadoop.api.WriteSupport
import org.apache.parquet.hadoop.metadata.CompressionCodecName
-import org.opendc.trace.util.parquet.LocalOutputFile
+import org.opendc.trace.util.parquet.LocalParquetWriter
import java.io.File
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.BlockingQueue
@@ -38,10 +36,13 @@ import kotlin.concurrent.thread
/**
* A writer that writes data in Parquet format.
+ *
+ * @param path The path to the file to write the data to.
+ * @param writeSupport The [WriteSupport] implementation for converting the records to Parquet format.
*/
public abstract class ParquetDataWriter<in T>(
path: File,
- private val schema: Schema,
+ private val writeSupport: WriteSupport<T>,
bufferSize: Int = 4096
) : AutoCloseable {
/**
@@ -52,7 +53,7 @@ public abstract class ParquetDataWriter<in T>(
/**
* The queue of records to process.
*/
- private val queue: BlockingQueue<GenericData.Record> = ArrayBlockingQueue(bufferSize)
+ private val queue: BlockingQueue<T> = ArrayBlockingQueue(bufferSize)
/**
* An exception to be propagated to the actual writer.
@@ -64,15 +65,15 @@ public abstract class ParquetDataWriter<in T>(
*/
private val writerThread = thread(start = false, name = this.toString()) {
val writer = let {
- val builder = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(path))
- .withSchema(schema)
+ val builder = LocalParquetWriter.builder(path.toPath(), writeSupport)
+ .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
.withCompressionCodec(CompressionCodecName.ZSTD)
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
buildWriter(builder)
}
val queue = queue
- val buf = mutableListOf<GenericData.Record>()
+ val buf = mutableListOf<T>()
var shouldStop = false
try {
@@ -101,16 +102,11 @@ public abstract class ParquetDataWriter<in T>(
/**
* Build the [ParquetWriter] used to write the Parquet files.
*/
- protected open fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> {
+ protected open fun buildWriter(builder: LocalParquetWriter.Builder<@UnsafeVariance T>): ParquetWriter<@UnsafeVariance T> {
return builder.build()
}
/**
- * Convert the specified [data] into a Parquet record.
- */
- protected abstract fun convert(builder: GenericRecordBuilder, data: T)
-
- /**
* Write the specified metrics to the database.
*/
public fun write(data: T) {
@@ -119,9 +115,7 @@ public abstract class ParquetDataWriter<in T>(
throw IllegalStateException("Writer thread failed", exception)
}
- val builder = GenericRecordBuilder(schema)
- convert(builder, data)
- queue.put(builder.build())
+ queue.put(data)
}
/**
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt
index 2b7cac8f..0d5b6b34 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt
@@ -22,81 +22,189 @@
package org.opendc.compute.workload.export.parquet
-import org.apache.avro.Schema
-import org.apache.avro.SchemaBuilder
-import org.apache.avro.generic.GenericData
-import org.apache.avro.generic.GenericRecordBuilder
-import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.ParquetWriter
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.io.api.RecordConsumer
+import org.apache.parquet.schema.*
import org.opendc.telemetry.compute.table.HostTableReader
-import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA
-import org.opendc.trace.util.parquet.UUID_SCHEMA
-import org.opendc.trace.util.parquet.optional
+import org.opendc.trace.util.parquet.LocalParquetWriter
import java.io.File
+import java.util.*
/**
* A Parquet event writer for [HostTableReader]s.
*/
public class ParquetHostDataWriter(path: File, bufferSize: Int) :
- ParquetDataWriter<HostTableReader>(path, SCHEMA, bufferSize) {
+ ParquetDataWriter<HostTableReader>(path, HostDataWriteSupport(), bufferSize) {
- override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> {
+ override fun buildWriter(builder: LocalParquetWriter.Builder<HostTableReader>): ParquetWriter<HostTableReader> {
return builder
.withDictionaryEncoding("host_id", true)
.build()
}
- override fun convert(builder: GenericRecordBuilder, data: HostTableReader) {
- builder["timestamp"] = data.timestamp.toEpochMilli()
+ override fun toString(): String = "host-writer"
- builder["host_id"] = data.host.id
+ /**
+ * A [WriteSupport] implementation for a [HostTableReader].
+ */
+ private class HostDataWriteSupport : WriteSupport<HostTableReader>() {
+ lateinit var recordConsumer: RecordConsumer
- builder["uptime"] = data.uptime
- builder["downtime"] = data.downtime
- val bootTime = data.bootTime
- builder["boot_time"] = bootTime?.toEpochMilli()
+ override fun init(configuration: Configuration): WriteContext {
+ return WriteContext(SCHEMA, emptyMap())
+ }
- builder["cpu_count"] = data.host.cpuCount
- builder["cpu_limit"] = data.cpuLimit
- builder["cpu_time_active"] = data.cpuActiveTime
- builder["cpu_time_idle"] = data.cpuIdleTime
- builder["cpu_time_steal"] = data.cpuStealTime
- builder["cpu_time_lost"] = data.cpuLostTime
+ override fun prepareForWrite(recordConsumer: RecordConsumer) {
+ this.recordConsumer = recordConsumer
+ }
- builder["mem_limit"] = data.host.memCapacity
+ override fun write(record: HostTableReader) {
+ write(recordConsumer, record)
+ }
- builder["power_total"] = data.powerTotal
+ private fun write(consumer: RecordConsumer, data: HostTableReader) {
+ consumer.startMessage()
- builder["guests_terminated"] = data.guestsTerminated
- builder["guests_running"] = data.guestsRunning
- builder["guests_error"] = data.guestsError
- builder["guests_invalid"] = data.guestsInvalid
- }
+ consumer.startField("timestamp", 0)
+ consumer.addLong(data.timestamp.toEpochMilli())
+ consumer.endField("timestamp", 0)
- override fun toString(): String = "host-writer"
+ consumer.startField("host_id", 1)
+ consumer.addBinary(UUID.fromString(data.host.id).toBinary())
+ consumer.endField("host_id", 1)
+
+ consumer.startField("uptime", 2)
+ consumer.addLong(data.uptime)
+ consumer.endField("uptime", 2)
+
+ consumer.startField("downtime", 3)
+ consumer.addLong(data.downtime)
+ consumer.endField("downtime", 3)
+
+ val bootTime = data.bootTime
+ if (bootTime != null) {
+ consumer.startField("boot_time", 4)
+ consumer.addLong(bootTime.toEpochMilli())
+ consumer.endField("boot_time", 4)
+ }
+
+ consumer.startField("cpu_count", 5)
+ consumer.addInteger(data.host.cpuCount)
+ consumer.endField("cpu_count", 5)
+
+ consumer.startField("cpu_limit", 6)
+ consumer.addDouble(data.cpuLimit)
+ consumer.endField("cpu_limit", 6)
+
+ consumer.startField("cpu_time_active", 7)
+ consumer.addLong(data.cpuActiveTime)
+ consumer.endField("cpu_time_active", 7)
+
+ consumer.startField("cpu_time_idle", 8)
+ consumer.addLong(data.cpuIdleTime)
+ consumer.endField("cpu_time_idle", 8)
+
+ consumer.startField("cpu_time_steal", 9)
+ consumer.addLong(data.cpuStealTime)
+ consumer.endField("cpu_time_steal", 9)
+
+ consumer.startField("cpu_time_lost", 10)
+ consumer.addLong(data.cpuLostTime)
+ consumer.endField("cpu_time_lost", 10)
+
+ consumer.startField("mem_limit", 11)
+ consumer.addLong(data.host.memCapacity)
+ consumer.endField("mem_limit", 11)
+
+ consumer.startField("power_total", 12)
+ consumer.addDouble(data.powerTotal)
+ consumer.endField("power_total", 12)
+
+ consumer.startField("guests_terminated", 13)
+ consumer.addInteger(data.guestsTerminated)
+ consumer.endField("guests_terminated", 13)
+
+ consumer.startField("guests_running", 14)
+ consumer.addInteger(data.guestsRunning)
+ consumer.endField("guests_running", 14)
+
+ consumer.startField("guests_error", 15)
+ consumer.addInteger(data.guestsError)
+ consumer.endField("guests_error", 15)
+
+ consumer.startField("guests_invalid", 16)
+ consumer.addInteger(data.guestsInvalid)
+ consumer.endField("guests_invalid", 16)
+
+ consumer.endMessage()
+ }
+ }
private companion object {
- private val SCHEMA: Schema = SchemaBuilder
- .record("host")
- .namespace("org.opendc.telemetry.compute")
- .fields()
- .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault()
- .name("host_id").type(UUID_SCHEMA).noDefault()
- .requiredLong("uptime")
- .requiredLong("downtime")
- .name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault()
- .requiredInt("cpu_count")
- .requiredDouble("cpu_limit")
- .requiredLong("cpu_time_active")
- .requiredLong("cpu_time_idle")
- .requiredLong("cpu_time_steal")
- .requiredLong("cpu_time_lost")
- .requiredLong("mem_limit")
- .requiredDouble("power_total")
- .requiredInt("guests_terminated")
- .requiredInt("guests_running")
- .requiredInt("guests_error")
- .requiredInt("guests_invalid")
- .endRecord()
+ /**
+ * The schema of the host data.
+ */
+ val SCHEMA: MessageType = Types
+ .buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("timestamp"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY)
+ .length(16)
+ .`as`(LogicalTypeAnnotation.uuidType())
+ .named("host_id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("uptime"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("downtime"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("boot_time"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("cpu_count"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpu_limit"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("cpu_time_active"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("cpu_time_idle"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("cpu_time_steal"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("cpu_time_lost"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("mem_limit"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("power_total"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("guests_terminated"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("guests_running"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("guests_error"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("guests_invalid"),
+ )
+ .named("host")
}
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt
index 144b6624..5d11629b 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt
@@ -22,73 +22,177 @@
package org.opendc.compute.workload.export.parquet
-import org.apache.avro.Schema
-import org.apache.avro.SchemaBuilder
-import org.apache.avro.generic.GenericData
-import org.apache.avro.generic.GenericRecordBuilder
-import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.ParquetWriter
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.io.api.RecordConsumer
+import org.apache.parquet.schema.*
import org.opendc.telemetry.compute.table.ServerTableReader
-import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA
-import org.opendc.trace.util.parquet.UUID_SCHEMA
-import org.opendc.trace.util.parquet.optional
+import org.opendc.trace.util.parquet.LocalParquetWriter
import java.io.File
+import java.util.*
/**
* A Parquet event writer for [ServerTableReader]s.
*/
public class ParquetServerDataWriter(path: File, bufferSize: Int) :
- ParquetDataWriter<ServerTableReader>(path, SCHEMA, bufferSize) {
+ ParquetDataWriter<ServerTableReader>(path, ServerDataWriteSupport(), bufferSize) {
- override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> {
+ override fun buildWriter(builder: LocalParquetWriter.Builder<ServerTableReader>): ParquetWriter<ServerTableReader> {
return builder
.withDictionaryEncoding("server_id", true)
.withDictionaryEncoding("host_id", true)
.build()
}
- override fun convert(builder: GenericRecordBuilder, data: ServerTableReader) {
- builder["timestamp"] = data.timestamp.toEpochMilli()
+ override fun toString(): String = "server-writer"
- builder["server_id"] = data.server.id
- builder["host_id"] = data.host?.id
+ /**
+ * A [WriteSupport] implementation for a [ServerTableReader].
+ */
+ private class ServerDataWriteSupport : WriteSupport<ServerTableReader>() {
+ lateinit var recordConsumer: RecordConsumer
- builder["uptime"] = data.uptime
- builder["downtime"] = data.downtime
- builder["boot_time"] = data.bootTime?.toEpochMilli()
- builder["provision_time"] = data.provisionTime?.toEpochMilli()
+ override fun init(configuration: Configuration): WriteContext {
+ return WriteContext(SCHEMA, emptyMap())
+ }
- builder["cpu_count"] = data.server.cpuCount
- builder["cpu_limit"] = data.cpuLimit
- builder["cpu_time_active"] = data.cpuActiveTime
- builder["cpu_time_idle"] = data.cpuIdleTime
- builder["cpu_time_steal"] = data.cpuStealTime
- builder["cpu_time_lost"] = data.cpuLostTime
+ override fun prepareForWrite(recordConsumer: RecordConsumer) {
+ this.recordConsumer = recordConsumer
+ }
- builder["mem_limit"] = data.server.memCapacity
- }
+ override fun write(record: ServerTableReader) {
+ write(recordConsumer, record)
+ }
- override fun toString(): String = "server-writer"
+ private fun write(consumer: RecordConsumer, data: ServerTableReader) {
+ consumer.startMessage()
+
+ consumer.startField("timestamp", 0)
+ consumer.addLong(data.timestamp.toEpochMilli())
+ consumer.endField("timestamp", 0)
+
+ consumer.startField("server_id", 1)
+ consumer.addBinary(UUID.fromString(data.server.id).toBinary())
+ consumer.endField("server_id", 1)
+
+ val hostId = data.host?.id
+ if (hostId != null) {
+ consumer.startField("host_id", 2)
+ consumer.addBinary(UUID.fromString(hostId).toBinary())
+ consumer.endField("host_id", 2)
+ }
+
+ consumer.startField("uptime", 3)
+ consumer.addLong(data.uptime)
+ consumer.endField("uptime", 3)
+
+ consumer.startField("downtime", 4)
+ consumer.addLong(data.downtime)
+ consumer.endField("downtime", 4)
+
+ val bootTime = data.bootTime
+ if (bootTime != null) {
+ consumer.startField("boot_time", 5)
+ consumer.addLong(bootTime.toEpochMilli())
+ consumer.endField("boot_time", 5)
+ }
+
+ val provisionTime = data.provisionTime
+ if (provisionTime != null) {
+ consumer.startField("provision_time", 6)
+ consumer.addLong(provisionTime.toEpochMilli())
+ consumer.endField("provision_time", 6)
+ }
+
+ consumer.startField("cpu_count", 7)
+ consumer.addInteger(data.server.cpuCount)
+ consumer.endField("cpu_count", 7)
+
+ consumer.startField("cpu_limit", 8)
+ consumer.addDouble(data.cpuLimit)
+ consumer.endField("cpu_limit", 8)
+
+ consumer.startField("cpu_time_active", 9)
+ consumer.addLong(data.cpuActiveTime)
+ consumer.endField("cpu_time_active", 9)
+
+ consumer.startField("cpu_time_idle", 10)
+ consumer.addLong(data.cpuIdleTime)
+ consumer.endField("cpu_time_idle", 10)
+
+ consumer.startField("cpu_time_steal", 11)
+ consumer.addLong(data.cpuStealTime)
+ consumer.endField("cpu_time_steal", 11)
+
+ consumer.startField("cpu_time_lost", 12)
+ consumer.addLong(data.cpuLostTime)
+ consumer.endField("cpu_time_lost", 12)
+
+ consumer.startField("mem_limit", 13)
+ consumer.addLong(data.server.memCapacity)
+ consumer.endField("mem_limit", 13)
+
+ consumer.endMessage()
+ }
+ }
private companion object {
- private val SCHEMA: Schema = SchemaBuilder
- .record("server")
- .namespace("org.opendc.telemetry.compute")
- .fields()
- .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault()
- .name("server_id").type(UUID_SCHEMA).noDefault()
- .name("host_id").type(UUID_SCHEMA.optional()).noDefault()
- .requiredLong("uptime")
- .requiredLong("downtime")
- .name("provision_time").type(TIMESTAMP_SCHEMA.optional()).noDefault()
- .name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault()
- .requiredInt("cpu_count")
- .requiredDouble("cpu_limit")
- .requiredLong("cpu_time_active")
- .requiredLong("cpu_time_idle")
- .requiredLong("cpu_time_steal")
- .requiredLong("cpu_time_lost")
- .requiredLong("mem_limit")
- .endRecord()
+ /**
+ * The schema of the server data.
+ */
+ val SCHEMA: MessageType = Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("timestamp"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY)
+ .length(16)
+ .`as`(LogicalTypeAnnotation.uuidType())
+ .named("server_id"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY)
+ .length(16)
+ .`as`(LogicalTypeAnnotation.uuidType())
+ .named("host_id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("uptime"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("downtime"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("provision_time"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("boot_time"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("cpu_count"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpu_limit"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("cpu_time_active"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("cpu_time_idle"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("cpu_time_steal"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("cpu_time_lost"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("mem_limit")
+ )
+ .named("server")
}
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt
index ec8a2b65..5ad3b95e 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt
@@ -22,45 +22,108 @@
package org.opendc.compute.workload.export.parquet
-import org.apache.avro.Schema
-import org.apache.avro.SchemaBuilder
-import org.apache.avro.generic.GenericRecordBuilder
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.io.api.RecordConsumer
+import org.apache.parquet.schema.*
import org.opendc.telemetry.compute.table.ServiceTableReader
-import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA
import java.io.File
/**
* A Parquet event writer for [ServiceTableReader]s.
*/
public class ParquetServiceDataWriter(path: File, bufferSize: Int) :
- ParquetDataWriter<ServiceTableReader>(path, SCHEMA, bufferSize) {
-
- override fun convert(builder: GenericRecordBuilder, data: ServiceTableReader) {
- builder["timestamp"] = data.timestamp.toEpochMilli()
- builder["hosts_up"] = data.hostsUp
- builder["hosts_down"] = data.hostsDown
- builder["servers_pending"] = data.serversPending
- builder["servers_active"] = data.serversActive
- builder["attempts_success"] = data.attemptsSuccess
- builder["attempts_failure"] = data.attemptsFailure
- builder["attempts_error"] = data.attemptsError
- }
+ ParquetDataWriter<ServiceTableReader>(path, ServiceDataWriteSupport(), bufferSize) {
override fun toString(): String = "service-writer"
+ /**
+ * A [WriteSupport] implementation for a [ServiceTableReader].
+ */
+ private class ServiceDataWriteSupport : WriteSupport<ServiceTableReader>() {
+ lateinit var recordConsumer: RecordConsumer
+
+ override fun init(configuration: Configuration): WriteContext {
+ return WriteContext(SCHEMA, emptyMap())
+ }
+
+ override fun prepareForWrite(recordConsumer: RecordConsumer) {
+ this.recordConsumer = recordConsumer
+ }
+
+ override fun write(record: ServiceTableReader) {
+ write(recordConsumer, record)
+ }
+
+ private fun write(consumer: RecordConsumer, data: ServiceTableReader) {
+ consumer.startMessage()
+
+ consumer.startField("timestamp", 0)
+ consumer.addLong(data.timestamp.toEpochMilli())
+ consumer.endField("timestamp", 0)
+
+ consumer.startField("hosts_up", 1)
+ consumer.addInteger(data.hostsUp)
+ consumer.endField("hosts_up", 1)
+
+ consumer.startField("hosts_down", 2)
+ consumer.addInteger(data.hostsDown)
+ consumer.endField("hosts_down", 2)
+
+ consumer.startField("servers_pending", 3)
+ consumer.addInteger(data.serversPending)
+ consumer.endField("servers_pending", 3)
+
+ consumer.startField("servers_active", 4)
+ consumer.addInteger(data.serversActive)
+ consumer.endField("servers_active", 4)
+
+ consumer.startField("attempts_success", 5)
+ consumer.addInteger(data.attemptsSuccess)
+ consumer.endField("attempts_pending", 5)
+
+ consumer.startField("attempts_failure", 6)
+ consumer.addInteger(data.attemptsFailure)
+ consumer.endField("attempts_failure", 6)
+
+ consumer.startField("attempts_error", 7)
+ consumer.addInteger(data.attemptsError)
+ consumer.endField("attempts_error", 7)
+
+ consumer.endMessage()
+ }
+ }
+
private companion object {
- private val SCHEMA: Schema = SchemaBuilder
- .record("service")
- .namespace("org.opendc.telemetry.compute")
- .fields()
- .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault()
- .requiredInt("hosts_up")
- .requiredInt("hosts_down")
- .requiredInt("servers_pending")
- .requiredInt("servers_active")
- .requiredInt("attempts_success")
- .requiredInt("attempts_failure")
- .requiredInt("attempts_error")
- .endRecord()
+ private val SCHEMA: MessageType = Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("timestamp"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("hosts_up"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("hosts_down"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("servers_pending"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("servers_active"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("attempts_success"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("attempts_failure"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("attempts_error"),
+ )
+ .named("service")
}
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/Utils.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/Utils.kt
new file mode 100644
index 00000000..9921f5b8
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/Utils.kt
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.export.parquet
+
+import org.apache.parquet.io.api.Binary
+import java.nio.ByteBuffer
+import java.util.UUID
+
+/**
+ * Convert this [UUID] into a 16-byte Parquet [Binary] value (most significant bits first).
+ *
+ * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
+ */
+internal fun UUID.toBinary(): Binary {
+ val bb = ByteBuffer.wrap(ByteArray(16))
+ bb.putLong(mostSignificantBits)
+ bb.putLong(leastSignificantBits)
+ return Binary.fromConstantByteBuffer(bb)
+}
diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/HostDataWriterTest.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/HostDataWriterTest.kt
new file mode 100644
index 00000000..dae03513
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/HostDataWriterTest.kt
@@ -0,0 +1,79 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.export.parquet
+
+import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertDoesNotThrow
+import org.opendc.telemetry.compute.table.HostInfo
+import org.opendc.telemetry.compute.table.HostTableReader
+import java.nio.file.Files
+import java.time.Instant
+
+/**
+ * Test suite for [ParquetHostDataWriter]
+ */
+class HostDataWriterTest {
+ /**
+ * The path to write the data file to.
+ */
+ private val path = Files.createTempFile("opendc", "parquet")
+
+ /**
+ * The writer used to write the data.
+ */
+ private val writer = ParquetHostDataWriter(path.toFile(), bufferSize = 4096)
+
+ @AfterEach
+ fun tearDown() {
+ writer.close()
+ Files.deleteIfExists(path)
+ }
+
+ @Test
+ fun testSmoke() {
+ assertDoesNotThrow {
+ writer.write(object : HostTableReader {
+ override val timestamp: Instant = Instant.now()
+ override val host: HostInfo = HostInfo("id", "test", "x86", 4, 4096)
+ override val guestsTerminated: Int = 0
+ override val guestsRunning: Int = 0
+ override val guestsError: Int = 0
+ override val guestsInvalid: Int = 0
+ override val cpuLimit: Double = 4096.0
+ override val cpuUsage: Double = 1.0
+ override val cpuDemand: Double = 1.0
+ override val cpuUtilization: Double = 0.0
+ override val cpuActiveTime: Long = 1
+ override val cpuIdleTime: Long = 1
+ override val cpuStealTime: Long = 1
+ override val cpuLostTime: Long = 1
+ override val powerUsage: Double = 1.0
+ override val powerTotal: Double = 1.0
+ override val uptime: Long = 1
+ override val downtime: Long = 1
+ override val bootTime: Instant? = null
+ })
+ }
+ }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServerDataWriterTest.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServerDataWriterTest.kt
new file mode 100644
index 00000000..280f5ef8
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServerDataWriterTest.kt
@@ -0,0 +1,73 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.export.parquet
+
+import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertDoesNotThrow
+import org.opendc.telemetry.compute.table.HostInfo
+import org.opendc.telemetry.compute.table.ServerInfo
+import org.opendc.telemetry.compute.table.ServerTableReader
+import java.nio.file.Files
+import java.time.Instant
+
+/**
+ * Test suite for [ParquetServerDataWriter]
+ */
+class ServerDataWriterTest {
+ /**
+ * The path to write the data file to.
+ */
+ private val path = Files.createTempFile("opendc", "parquet")
+
+ /**
+ * The writer used to write the data.
+ */
+ private val writer = ParquetServerDataWriter(path.toFile(), bufferSize = 4096)
+
+ @AfterEach
+ fun tearDown() {
+ writer.close()
+ Files.deleteIfExists(path)
+ }
+
+ @Test
+ fun testSmoke() {
+ assertDoesNotThrow {
+ writer.write(object : ServerTableReader {
+ override val timestamp: Instant = Instant.now()
+ override val server: ServerInfo = ServerInfo("id", "test", "vm", "x86", "test", "test", 2, 4096)
+ override val host: HostInfo = HostInfo("id", "test", "x86", 4, 4096)
+ override val cpuLimit: Double = 4096.0
+ override val cpuActiveTime: Long = 1
+ override val cpuIdleTime: Long = 1
+ override val cpuStealTime: Long = 1
+ override val cpuLostTime: Long = 1
+ override val uptime: Long = 1
+ override val downtime: Long = 1
+ override val provisionTime: Instant = timestamp
+ override val bootTime: Instant? = null
+ })
+ }
+ }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServiceDataWriterTest.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServiceDataWriterTest.kt
new file mode 100644
index 00000000..7ffa7186
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServiceDataWriterTest.kt
@@ -0,0 +1,67 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.export.parquet
+
+import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertDoesNotThrow
+import org.opendc.telemetry.compute.table.ServiceTableReader
+import java.nio.file.Files
+import java.time.Instant
+
+/**
+ * Test suite for [ParquetServiceDataWriter]
+ */
+class ServiceDataWriterTest {
+ /**
+ * The path to write the data file to.
+ */
+ private val path = Files.createTempFile("opendc", "parquet")
+
+ /**
+ * The writer used to write the data.
+ */
+ private val writer = ParquetServiceDataWriter(path.toFile(), bufferSize = 4096)
+
+ @AfterEach
+ fun tearDown() {
+ writer.close()
+ Files.deleteIfExists(path)
+ }
+
+ @Test
+ fun testSmoke() {
+ assertDoesNotThrow {
+ writer.write(object : ServiceTableReader {
+ override val timestamp: Instant = Instant.now()
+ override val hostsUp: Int = 1
+ override val hostsDown: Int = 0
+ override val serversPending: Int = 1
+ override val serversActive: Int = 1
+ override val attemptsSuccess: Int = 1
+ override val attemptsFailure: Int = 0
+ override val attemptsError: Int = 0
+ })
+ }
+ }
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt
index b0181cbc..05d0234a 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt
@@ -42,9 +42,11 @@ public interface Table {
public val partitionKeys: List<TableColumn<*>>
/**
- * Open a [TableReader] for this table.
+ * Open a [TableReader] for a projection of this table.
+ *
+ * @param projection The list of columns to fetch from the table or `null` if no projection is performed.
*/
- public fun newReader(): TableReader
+ public fun newReader(projection: List<TableColumn<*>>? = null): TableReader
/**
* Open a [TableWriter] for this table.
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt
index 776c40c0..b77a2982 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt
@@ -33,7 +33,7 @@ public class TableColumn<out T>(public val name: String, type: Class<T>) {
/**
* The type of the column.
*/
- private val type: Class<*> = type
+ public val type: Class<*> = type
/**
* Determine whether the type of the column is a subtype of [column].
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt
index 532f6d24..5e8859e4 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt
@@ -24,22 +24,21 @@
package org.opendc.trace.conv
import org.opendc.trace.TableColumn
-import org.opendc.trace.column
/**
* Members of the interference group.
*/
@JvmField
-public val INTERFERENCE_GROUP_MEMBERS: TableColumn<Set<String>> = column("interference_group:members")
+public val INTERFERENCE_GROUP_MEMBERS: TableColumn<Set<String>> = column("members")
/**
* Target load after which the interference occurs.
*/
@JvmField
-public val INTERFERENCE_GROUP_TARGET: TableColumn<Double> = column("interference_group:target")
+public val INTERFERENCE_GROUP_TARGET: TableColumn<Double> = column("target")
/**
* Performance score when the interference occurs.
*/
@JvmField
-public val INTERFERENCE_GROUP_SCORE: TableColumn<Double> = column("interference_group:score")
+public val INTERFERENCE_GROUP_SCORE: TableColumn<Double> = column("score")
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt
index e9fc5d44..e602e534 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt
@@ -24,47 +24,46 @@
package org.opendc.trace.conv
import org.opendc.trace.TableColumn
-import org.opendc.trace.column
import java.time.Instant
/**
* Identifier of the resource.
*/
@JvmField
-public val RESOURCE_ID: TableColumn<String> = column("resource:id")
+public val RESOURCE_ID: TableColumn<String> = column("id")
/**
* The cluster to which the resource belongs.
*/
@JvmField
-public val RESOURCE_CLUSTER_ID: TableColumn<String> = column("resource:cluster_id")
+public val RESOURCE_CLUSTER_ID: TableColumn<String> = column("cluster_id")
/**
* Start time for the resource.
*/
@JvmField
-public val RESOURCE_START_TIME: TableColumn<Instant> = column("resource:start_time")
+public val RESOURCE_START_TIME: TableColumn<Instant> = column("start_time")
/**
* End time for the resource.
*/
@JvmField
-public val RESOURCE_STOP_TIME: TableColumn<Instant> = column("resource:stop_time")
+public val RESOURCE_STOP_TIME: TableColumn<Instant> = column("stop_time")
/**
* Number of CPUs for the resource.
*/
@JvmField
-public val RESOURCE_CPU_COUNT: TableColumn<Int> = column("resource:cpu_count")
+public val RESOURCE_CPU_COUNT: TableColumn<Int> = column("cpu_count")
/**
* Total CPU capacity of the resource in MHz.
*/
@JvmField
-public val RESOURCE_CPU_CAPACITY: TableColumn<Double> = column("resource:cpu_capacity")
+public val RESOURCE_CPU_CAPACITY: TableColumn<Double> = column("cpu_capacity")
/**
* Memory capacity for the resource in KB.
*/
@JvmField
-public val RESOURCE_MEM_CAPACITY: TableColumn<Double> = column("resource:mem_capacity")
+public val RESOURCE_MEM_CAPACITY: TableColumn<Double> = column("mem_capacity")
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt
index d5bbafd7..3a44f817 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt
@@ -24,7 +24,6 @@
package org.opendc.trace.conv
import org.opendc.trace.TableColumn
-import org.opendc.trace.column
import java.time.Duration
import java.time.Instant
@@ -32,70 +31,70 @@ import java.time.Instant
* The timestamp at which the state was recorded.
*/
@JvmField
-public val RESOURCE_STATE_TIMESTAMP: TableColumn<Instant> = column("resource_state:timestamp")
+public val RESOURCE_STATE_TIMESTAMP: TableColumn<Instant> = column("timestamp")
/**
* Duration for the state.
*/
@JvmField
-public val RESOURCE_STATE_DURATION: TableColumn<Duration> = column("resource_state:duration")
+public val RESOURCE_STATE_DURATION: TableColumn<Duration> = column("duration")
/**
* A flag to indicate that the resource is powered on.
*/
@JvmField
-public val RESOURCE_STATE_POWERED_ON: TableColumn<Boolean> = column("resource_state:powered_on")
+public val RESOURCE_STATE_POWERED_ON: TableColumn<Boolean> = column("powered_on")
/**
* Total CPU usage of the resource in MHz.
*/
@JvmField
-public val RESOURCE_STATE_CPU_USAGE: TableColumn<Double> = column("resource_state:cpu_usage")
+public val RESOURCE_STATE_CPU_USAGE: TableColumn<Double> = column("cpu_usage")
/**
* Total CPU usage of the resource in percentage.
*/
@JvmField
-public val RESOURCE_STATE_CPU_USAGE_PCT: TableColumn<Double> = column("resource_state:cpu_usage_pct")
+public val RESOURCE_STATE_CPU_USAGE_PCT: TableColumn<Double> = column("cpu_usage_pct")
/**
* Total CPU demand of the resource in MHz.
*/
@JvmField
-public val RESOURCE_STATE_CPU_DEMAND: TableColumn<Double> = column("resource_state:cpu_demand")
+public val RESOURCE_STATE_CPU_DEMAND: TableColumn<Double> = column("cpu_demand")
/**
* CPU ready percentage.
*/
@JvmField
-public val RESOURCE_STATE_CPU_READY_PCT: TableColumn<Double> = column("resource_state:cpu_ready_pct")
+public val RESOURCE_STATE_CPU_READY_PCT: TableColumn<Double> = column("cpu_ready_pct")
/**
* Memory usage of the resource in KB.
*/
@JvmField
-public val RESOURCE_STATE_MEM_USAGE: TableColumn<Double> = column("resource_state:mem_usage")
+public val RESOURCE_STATE_MEM_USAGE: TableColumn<Double> = column("mem_usage")
/**
* Disk read throughput of the resource in KB/s.
*/
@JvmField
-public val RESOURCE_STATE_DISK_READ: TableColumn<Double> = column("resource_state:disk_read")
+public val RESOURCE_STATE_DISK_READ: TableColumn<Double> = column("disk_read")
/**
* Disk write throughput of the resource in KB/s.
*/
@JvmField
-public val RESOURCE_STATE_DISK_WRITE: TableColumn<Double> = column("resource_state:disk_write")
+public val RESOURCE_STATE_DISK_WRITE: TableColumn<Double> = column("disk_write")
/**
* Network receive throughput of the resource in KB/s.
*/
@JvmField
-public val RESOURCE_STATE_NET_RX: TableColumn<Double> = column("resource_state:net_rx")
+public val RESOURCE_STATE_NET_RX: TableColumn<Double> = column("net_rx")
/**
* Network transmit throughput of the resource in KB/s.
*/
@JvmField
-public val RESOURCE_STATE_NET_TX: TableColumn<Double> = column("resource_state:net_tx")
+public val RESOURCE_STATE_NET_TX: TableColumn<Double> = column("net_tx")
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TableColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TableColumns.kt
index 31a58360..a58505e9 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TableColumns.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TableColumns.kt
@@ -21,7 +21,9 @@
*/
@file:JvmName("TableColumns")
-package org.opendc.trace
+package org.opendc.trace.conv
+
+import org.opendc.trace.TableColumn
/**
* Construct a [TableColumn] with the specified [name] and type [T].
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt
index 397c0794..e6daafb7 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt
@@ -24,7 +24,6 @@
package org.opendc.trace.conv
import org.opendc.trace.TableColumn
-import org.opendc.trace.column
import java.time.Duration
import java.time.Instant
@@ -32,70 +31,70 @@ import java.time.Instant
* A column containing the task identifier.
*/
@JvmField
-public val TASK_ID: TableColumn<String> = column("task:id")
+public val TASK_ID: TableColumn<String> = column("id")
/**
* A column containing the identifier of the workflow.
*/
@JvmField
-public val TASK_WORKFLOW_ID: TableColumn<String> = column("task:workflow_id")
+public val TASK_WORKFLOW_ID: TableColumn<String> = column("workflow_id")
/**
* A column containing the submission time of the task.
*/
@JvmField
-public val TASK_SUBMIT_TIME: TableColumn<Instant> = column("task:submit_time")
+public val TASK_SUBMIT_TIME: TableColumn<Instant> = column("submit_time")
/**
* A column containing the wait time of the task.
*/
@JvmField
-public val TASK_WAIT_TIME: TableColumn<Instant> = column("task:wait_time")
+public val TASK_WAIT_TIME: TableColumn<Instant> = column("wait_time")
/**
 * A column containing the runtime of the task.
*/
@JvmField
-public val TASK_RUNTIME: TableColumn<Duration> = column("task:runtime")
+public val TASK_RUNTIME: TableColumn<Duration> = column("runtime")
/**
* A column containing the parents of a task.
*/
@JvmField
-public val TASK_PARENTS: TableColumn<Set<String>> = column("task:parents")
+public val TASK_PARENTS: TableColumn<Set<String>> = column("parents")
/**
* A column containing the children of a task.
*/
@JvmField
-public val TASK_CHILDREN: TableColumn<Set<String>> = column("task:children")
+public val TASK_CHILDREN: TableColumn<Set<String>> = column("children")
/**
* A column containing the requested CPUs of a task.
*/
@JvmField
-public val TASK_REQ_NCPUS: TableColumn<Int> = column("task:req_ncpus")
+public val TASK_REQ_NCPUS: TableColumn<Int> = column("req_ncpus")
/**
* A column containing the allocated CPUs of a task.
*/
@JvmField
-public val TASK_ALLOC_NCPUS: TableColumn<Int> = column("task:alloc_ncpus")
+public val TASK_ALLOC_NCPUS: TableColumn<Int> = column("alloc_ncpus")
/**
* A column containing the status of a task.
*/
@JvmField
-public val TASK_STATUS: TableColumn<Int> = column("task:status")
+public val TASK_STATUS: TableColumn<Int> = column("status")
/**
* A column containing the group id of a task.
*/
@JvmField
-public val TASK_GROUP_ID: TableColumn<Int> = column("task:group_id")
+public val TASK_GROUP_ID: TableColumn<Int> = column("group_id")
/**
* A column containing the user id of a task.
*/
@JvmField
-public val TASK_USER_ID: TableColumn<Int> = column("task:user_id")
+public val TASK_USER_ID: TableColumn<Int> = column("user_id")
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt
index 24551edb..b848e19a 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt
@@ -43,7 +43,9 @@ internal class TableImpl(val trace: TraceImpl, override val name: String) : Tabl
override val partitionKeys: List<TableColumn<*>>
get() = details.partitionKeys
- override fun newReader(): TableReader = trace.format.newReader(trace.path, name)
+ override fun newReader(projection: List<TableColumn<*>>?): TableReader {
+ return trace.format.newReader(trace.path, name, projection)
+ }
override fun newWriter(): TableWriter = trace.format.newWriter(trace.path, name)
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt
index f2e610db..47761e0f 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt
@@ -22,6 +22,7 @@
package org.opendc.trace.spi
+import org.opendc.trace.TableColumn
import org.opendc.trace.TableReader
import org.opendc.trace.TableWriter
import java.nio.file.Path
@@ -68,10 +69,11 @@ public interface TraceFormat {
*
* @param path The path to the trace to open.
* @param table The name of the table to open a [TableReader] for.
+ * @param projection The list of [TableColumn]s to project or `null` if no projection is performed.
* @throws IllegalArgumentException If [table] does not exist.
* @return A [TableReader] instance for the table.
*/
- public fun newReader(path: Path, table: String): TableReader
+ public fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader
/**
* Open a [TableWriter] for the specified [table].
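A minimal sketch of the projection-aware reader contract introduced above (hedged: `format` and `path` stand for any `TraceFormat` implementation and trace location, and the table/column constants are the ones used throughout the tests in this patch):

```kotlin
// Sketch only: open a reader that materialises just the projected columns.
// Passing null instead of a projection list keeps the previous read-everything behaviour.
val reader = format.newReader(path, TABLE_RESOURCE_STATES, listOf(RESOURCE_ID, RESOURCE_STATE_CPU_USAGE))
while (reader.nextRow()) {
    val id = reader.get(RESOURCE_ID)
    val usage = reader.getDouble(reader.resolve(RESOURCE_STATE_CPU_USAGE))
    println("$id -> $usage")
}
reader.close()
```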
diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt
index 8e3e60cc..73978990 100644
--- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt
+++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt
@@ -81,7 +81,7 @@ public class AzureTraceFormat : TraceFormat {
}
}
- override fun newReader(path: Path, table: String): TableReader {
+ override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader {
return when (table) {
TABLE_RESOURCES -> {
val stream = GZIPInputStream(path.resolve("vmtable/vmtable.csv.gz").inputStream())
diff --git a/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt b/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt
index 56f9a940..263d26ce 100644
--- a/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt
@@ -57,7 +57,7 @@ class AzureTraceFormatTest {
@Test
fun testResources() {
val path = Paths.get("src/test/resources/trace")
- val reader = format.newReader(path, TABLE_RESOURCES)
+ val reader = format.newReader(path, TABLE_RESOURCES, null)
assertAll(
{ assertTrue(reader.nextRow()) },
{ assertEquals("x/XsOfHO4ocsV99i4NluqKDuxctW2MMVmwqOPAlg4wp8mqbBOe3wxBlQo0+Qx+uf", reader.get(RESOURCE_ID)) },
@@ -71,7 +71,7 @@ class AzureTraceFormatTest {
@Test
fun testSmoke() {
val path = Paths.get("src/test/resources/trace")
- val reader = format.newReader(path, TABLE_RESOURCE_STATES)
+ val reader = format.newReader(path, TABLE_RESOURCE_STATES, null)
assertAll(
{ assertTrue(reader.nextRow()) },
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt
index 11d21a04..82e454ad 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt
@@ -72,7 +72,7 @@ public class BitbrainsExTraceFormat : TraceFormat {
}
}
- override fun newReader(path: Path, table: String): TableReader {
+ override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader {
return when (table) {
TABLE_RESOURCE_STATES -> newResourceStateReader(path)
else -> throw IllegalArgumentException("Table $table not supported")
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt
index e1e7604a..a374e951 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt
@@ -81,7 +81,7 @@ public class BitbrainsTraceFormat : TraceFormat {
}
}
- override fun newReader(path: Path, table: String): TableReader {
+ override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader {
return when (table) {
TABLE_RESOURCES -> {
val vms = Files.walk(path, 1)
diff --git a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt
index 77429e3e..c944cb98 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt
@@ -59,7 +59,7 @@ internal class BitbrainsExTraceFormatTest {
@Test
fun testSmoke() {
val path = Paths.get("src/test/resources/vm.txt")
- val reader = format.newReader(path, TABLE_RESOURCE_STATES)
+ val reader = format.newReader(path, TABLE_RESOURCE_STATES, null)
assertAll(
{ assertTrue(reader.nextRow()) },
diff --git a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt
index 9309beb1..841801e6 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt
@@ -57,7 +57,7 @@ class BitbrainsTraceFormatTest {
@Test
fun testResources() {
val path = Paths.get("src/test/resources/bitbrains.csv")
- val reader = format.newReader(path, TABLE_RESOURCES)
+ val reader = format.newReader(path, TABLE_RESOURCES, null)
assertAll(
{ assertTrue(reader.nextRow()) },
@@ -71,7 +71,7 @@ class BitbrainsTraceFormatTest {
@Test
fun testSmoke() {
val path = Paths.get("src/test/resources/bitbrains.csv")
- val reader = format.newReader(path, TABLE_RESOURCE_STATES)
+ val reader = format.newReader(path, TABLE_RESOURCE_STATES, null)
assertAll(
{ assertTrue(reader.nextRow()) },
diff --git a/opendc-trace/opendc-trace-calcite/build.gradle.kts b/opendc-trace/opendc-trace-calcite/build.gradle.kts
new file mode 100644
index 00000000..2ffdac3c
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/build.gradle.kts
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+description = "Apache Calcite (SQL) integration for the OpenDC trace library"
+
+/* Build configuration */
+plugins {
+ `kotlin-library-conventions`
+}
+
+dependencies {
+ api(projects.opendcTrace.opendcTraceApi)
+
+ api(libs.calcite.core)
+
+ testRuntimeOnly(projects.opendcTrace.opendcTraceOpendc)
+ testRuntimeOnly(libs.slf4j.simple)
+}
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/InsertableTable.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/InsertableTable.kt
new file mode 100644
index 00000000..9c7b69a2
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/InsertableTable.kt
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.calcite
+
+import org.apache.calcite.linq4j.Enumerable
+import org.apache.calcite.schema.Table
+
+/**
+ * A Calcite [Table] to which rows can be inserted.
+ */
+internal interface InsertableTable : Table {
+ /**
+ * Insert [rows] into this table.
+ *
+ * @param rows The rows to insert into the table.
+ * @return The number of rows inserted.
+ */
+ fun insert(rows: Enumerable<Array<Any?>>): Long
+}
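As a hedged illustration of this contract: the caller is normally the code that Calcite generates for an `INSERT` statement (see `TraceTableModify` further down), but any `Enumerable` of rows works. The row layout and the elided values below are assumptions for illustration, not part of this patch; `table` stands for any `InsertableTable`.

```kotlin
import org.apache.calcite.linq4j.Linq4j

// Sketch only: each row is an array laid out in the table's column order.
val rows = Linq4j.asEnumerable(
    listOf(
        arrayOf<Any?>("1019" /* remaining column values elided */),
        arrayOf<Any?>("1023" /* remaining column values elided */)
    )
)
val inserted = table.insert(rows) // number of rows written to the trace
```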
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt
new file mode 100644
index 00000000..1854f262
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt
@@ -0,0 +1,93 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.calcite
+
+import org.apache.calcite.linq4j.Enumerator
+import org.opendc.trace.TableColumn
+import org.opendc.trace.TableReader
+import java.sql.Timestamp
+import java.time.Duration
+import java.time.Instant
+import java.util.concurrent.atomic.AtomicBoolean
+
+/**
+ * An [Enumerator] for a [TableReader].
+ */
+internal class TraceReaderEnumerator<E>(
+ private val reader: TableReader,
+ private val columns: List<TableColumn<*>>,
+ private val cancelFlag: AtomicBoolean
+) : Enumerator<E> {
+ private val columnIndices = columns.map { reader.resolve(it) }.toIntArray()
+ private var current: E? = null
+
+ override fun moveNext(): Boolean {
+ if (cancelFlag.get()) {
+ return false
+ }
+
+ val reader = reader
+ val res = reader.nextRow()
+
+ if (res) {
+ @Suppress("UNCHECKED_CAST")
+ current = convertRow(reader) as E
+ } else {
+ current = null
+ }
+
+ return res
+ }
+
+ override fun current(): E = checkNotNull(current)
+
+ override fun reset() {
+ throw UnsupportedOperationException()
+ }
+
+ override fun close() {
+ reader.close()
+ }
+
+ private fun convertRow(reader: TableReader): Array<Any?> {
+ val res = arrayOfNulls<Any?>(columns.size)
+ val columnIndices = columnIndices
+
+ for ((index, column) in columns.withIndex()) {
+ val columnIndex = columnIndices[index]
+ res[index] = convertColumn(reader, column, columnIndex)
+ }
+ return res
+ }
+
+ private fun convertColumn(reader: TableReader, column: TableColumn<*>, columnIndex: Int): Any? {
+ val value = reader.get(columnIndex)
+
+ return when (column.type) {
+ Instant::class.java -> Timestamp.from(value as Instant)
+ Duration::class.java -> (value as Duration).toMillis()
+ Set::class.java -> (value as Set<*>).toTypedArray()
+ else -> value
+ }
+ }
+}
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt
new file mode 100644
index 00000000..3249546d
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.calcite
+
+import org.apache.calcite.schema.Schema
+import org.apache.calcite.schema.Table
+import org.apache.calcite.schema.impl.AbstractSchema
+import org.opendc.trace.Trace
+
+/**
+ * A Calcite [Schema] that exposes an OpenDC [Trace] as multiple SQL tables.
+ *
+ * @param trace The [Trace] to create a schema for.
+ */
+public class TraceSchema(private val trace: Trace) : AbstractSchema() {
+ /**
+ * The [Table]s that belong to this schema.
+ */
+ private val tables: Map<String, TraceTable> by lazy {
+ trace.tables.associateWith {
+ val table = checkNotNull(trace.getTable(it)) { "Unexpected null table" }
+ TraceTable(table)
+ }
+ }
+
+ override fun getTableMap(): Map<String, Table> = tables
+}
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt
new file mode 100644
index 00000000..3c6badc8
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.calcite
+
+import org.apache.calcite.model.ModelHandler
+import org.apache.calcite.schema.Schema
+import org.apache.calcite.schema.SchemaFactory
+import org.apache.calcite.schema.SchemaPlus
+import org.opendc.trace.Trace
+import java.io.File
+import java.nio.file.Paths
+
+/**
+ * Factory that creates a [TraceSchema].
+ *
+ * This factory allows users to include a schema that references a trace in a `model.json` file.
+ */
+public class TraceSchemaFactory : SchemaFactory {
+ override fun create(parentSchema: SchemaPlus, name: String, operand: Map<String, Any>): Schema {
+ val base = operand[ModelHandler.ExtraOperand.BASE_DIRECTORY.camelName] as File?
+ val pathParam = requireNotNull(operand["path"]) { "Trace path not specified" } as String
+ val path = if (base != null) File(base, pathParam).toPath() else Paths.get(pathParam)
+
+ val format = requireNotNull(operand["format"]) { "Trace format not specified" } as String
+ val create = operand.getOrDefault("create", false) as Boolean
+
+ val trace = if (create) Trace.create(path, format) else Trace.open(path, format)
+ return TraceSchema(trace)
+ }
+}
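A hedged sketch of wiring the factory up straight from a JDBC URL, without a `model.json` (the inline `schemaFactory`/`schema.*` operand style mirrors the tests added further down; the trace path and the query are placeholders):

```kotlin
import java.sql.DriverManager

// Sketch only: "path/to/trace" is a placeholder; lex=JAVA keeps lower-case identifiers intact.
val connection = DriverManager.getConnection(
    "jdbc:calcite:schemaFactory=org.opendc.trace.calcite.TraceSchemaFactory; " +
        "schema.path=path/to/trace; schema.format=opendc-vm; lex=JAVA"
)
val stmt = connection.createStatement()
try {
    val rs = stmt.executeQuery("SELECT COUNT(*) FROM resource_states")
    while (rs.next()) {
        println(rs.getLong(1))
    }
} finally {
    stmt.close()
    connection.close()
}
```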
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt
new file mode 100644
index 00000000..8c571b82
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt
@@ -0,0 +1,176 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.calcite
+
+import org.apache.calcite.DataContext
+import org.apache.calcite.adapter.java.AbstractQueryableTable
+import org.apache.calcite.adapter.java.JavaTypeFactory
+import org.apache.calcite.linq4j.*
+import org.apache.calcite.plan.RelOptCluster
+import org.apache.calcite.plan.RelOptTable
+import org.apache.calcite.prepare.Prepare.CatalogReader
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.TableModify
+import org.apache.calcite.rel.logical.LogicalTableModify
+import org.apache.calcite.rel.type.RelDataType
+import org.apache.calcite.rel.type.RelDataTypeFactory
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.schema.*
+import org.apache.calcite.schema.impl.AbstractTableQueryable
+import org.apache.calcite.sql.type.SqlTypeName
+import java.time.Duration
+import java.time.Instant
+import java.util.concurrent.atomic.AtomicBoolean
+
+/**
+ * A Calcite [Table] that exposes an OpenDC [org.opendc.trace.Table] as a SQL table.
+ */
+internal class TraceTable(private val table: org.opendc.trace.Table) :
+ AbstractQueryableTable(Array<Any?>::class.java),
+ ProjectableFilterableTable,
+ ModifiableTable,
+ InsertableTable {
+ private var rowType: RelDataType? = null
+
+ override fun getRowType(typeFactory: RelDataTypeFactory): RelDataType {
+ var rowType = rowType
+ if (rowType == null) {
+ rowType = deduceRowType(typeFactory as JavaTypeFactory)
+ this.rowType = rowType
+ }
+
+ return rowType
+ }
+
+ override fun scan(root: DataContext, filters: MutableList<RexNode>, projects: IntArray?): Enumerable<Array<Any?>> {
+        // Filters are currently not supported by the OpenDC trace API. By leaving the filters in the list, we signal
+        // to Calcite that they were not handled, so Calcite applies them itself.
+
+ val projection = projects?.map { table.columns[it] }
+ val cancelFlag = DataContext.Variable.CANCEL_FLAG.get<AtomicBoolean>(root)
+ return object : AbstractEnumerable<Array<Any?>>() {
+ override fun enumerator(): Enumerator<Array<Any?>> =
+ TraceReaderEnumerator(table.newReader(projection), projection ?: table.columns, cancelFlag)
+ }
+ }
+
+ override fun insert(rows: Enumerable<Array<Any?>>): Long {
+ val table = table
+ val columns = table.columns
+ val writer = table.newWriter()
+ val columnIndices = columns.map { writer.resolve(it) }.toIntArray()
+ var rowCount = 0L
+
+ try {
+ for (row in rows) {
+ writer.startRow()
+
+ for ((index, value) in row.withIndex()) {
+ if (value == null) {
+ continue
+ }
+ val columnType = columns[index].type
+
+ writer.set(
+ columnIndices[index],
+ when (columnType) {
+ Duration::class.java -> Duration.ofMillis(value as Long)
+ Instant::class.java -> Instant.ofEpochMilli(value as Long)
+ Set::class.java -> (value as List<*>).toSet()
+ else -> value
+ }
+ )
+ }
+
+ writer.endRow()
+
+ rowCount++
+ }
+ } finally {
+ writer.close()
+ }
+
+ return rowCount
+ }
+
+ override fun <T> asQueryable(queryProvider: QueryProvider, schema: SchemaPlus, tableName: String): Queryable<T> {
+ return object : AbstractTableQueryable<T>(queryProvider, schema, this@TraceTable, tableName) {
+ override fun enumerator(): Enumerator<T> {
+ val cancelFlag = AtomicBoolean(false)
+ return TraceReaderEnumerator(
+ this@TraceTable.table.newReader(),
+ this@TraceTable.table.columns,
+ cancelFlag
+ )
+ }
+
+ override fun toString(): String = "TraceTableQueryable[table=$tableName]"
+ }
+ }
+
+ override fun getModifiableCollection(): MutableCollection<Any?>? = null
+
+ override fun toModificationRel(
+ cluster: RelOptCluster,
+ table: RelOptTable,
+ catalogReader: CatalogReader,
+ child: RelNode,
+ operation: TableModify.Operation,
+ updateColumnList: MutableList<String>?,
+ sourceExpressionList: MutableList<RexNode>?,
+ flattened: Boolean
+ ): TableModify {
+ cluster.planner.addRule(TraceTableModifyRule.DEFAULT.toRule())
+
+ return LogicalTableModify.create(
+ table,
+ catalogReader,
+ child,
+ operation,
+ updateColumnList,
+ sourceExpressionList,
+ flattened
+ )
+ }
+
+ override fun toString(): String = "TraceTable"
+
+ private fun deduceRowType(typeFactory: JavaTypeFactory): RelDataType {
+ val types = mutableListOf<RelDataType>()
+ val names = mutableListOf<String>()
+
+ for (column in table.columns) {
+ names.add(column.name)
+ types.add(
+ when (column.type) {
+ Instant::class.java -> typeFactory.createSqlType(SqlTypeName.TIMESTAMP)
+ Duration::class.java -> typeFactory.createSqlType(SqlTypeName.BIGINT)
+ Set::class.java -> typeFactory.createMultisetType(typeFactory.createSqlType(SqlTypeName.UNKNOWN), -1)
+ else -> typeFactory.createType(column.type)
+ }
+ )
+ }
+
+ return typeFactory.createStructType(types, names)
+ }
+}
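To tie the `scan` contract above to user-visible behaviour, a hedged example: for the query below, Calcite pushes the two projected column indices into `scan`, so only those columns are read from the trace, while the `WHERE` predicate stays in the filter list and is applied by Calcite on the returned rows. `stmt` is assumed to be a `java.sql.Statement` on a connection whose root schema has the trace registered under `trace`, as in the tests further down.

```kotlin
// Sketch only: projection ("id", "cpu_usage") is handled by TraceTable.scan;
// the cpu_usage predicate is evaluated by Calcite afterwards.
val rs = stmt.executeQuery(
    "SELECT id, cpu_usage FROM trace.resource_states WHERE cpu_usage > 100.0"
)
while (rs.next()) {
    println("${rs.getString("id")} -> ${rs.getDouble("cpu_usage")}")
}
```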
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModify.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModify.kt
new file mode 100644
index 00000000..64dc0cea
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModify.kt
@@ -0,0 +1,138 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.calcite
+
+import org.apache.calcite.adapter.enumerable.*
+import org.apache.calcite.adapter.enumerable.EnumerableRel.Prefer
+import org.apache.calcite.adapter.java.JavaTypeFactory
+import org.apache.calcite.linq4j.Enumerable
+import org.apache.calcite.linq4j.tree.BlockBuilder
+import org.apache.calcite.linq4j.tree.Expressions
+import org.apache.calcite.linq4j.tree.Types
+import org.apache.calcite.plan.*
+import org.apache.calcite.prepare.Prepare
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.TableModify
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.schema.ModifiableTable
+import org.apache.calcite.util.BuiltInMethod
+import java.lang.reflect.Method
+
+/**
+ * A [TableModify] expression that modifies a workload trace.
+ */
+internal class TraceTableModify(
+ cluster: RelOptCluster,
+ traitSet: RelTraitSet,
+ table: RelOptTable,
+ schema: Prepare.CatalogReader,
+ input: RelNode,
+ operation: Operation,
+ updateColumnList: List<String>?,
+ sourceExpressionList: List<RexNode>?,
+ flattened: Boolean
+) : TableModify(cluster, traitSet, table, schema, input, operation, updateColumnList, sourceExpressionList, flattened),
+ EnumerableRel {
+ init {
+ // Make sure the table is modifiable
+ table.unwrap(ModifiableTable::class.java) ?: throw AssertionError() // TODO: user error in validator
+ }
+
+ override fun copy(traitSet: RelTraitSet, inputs: List<RelNode>?): RelNode {
+ return TraceTableModify(
+ cluster,
+ traitSet,
+ table,
+ getCatalogReader(),
+ sole(inputs),
+ operation,
+ updateColumnList,
+ sourceExpressionList,
+ isFlattened
+ )
+ }
+
+ override fun computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery?): RelOptCost {
+ // Prefer this plan compared to the standard EnumerableTableModify.
+ return super.computeSelfCost(planner, mq)!!.multiplyBy(.1)
+ }
+
+ override fun implement(implementor: EnumerableRelImplementor, pref: Prefer): EnumerableRel.Result {
+ val builder = BlockBuilder()
+ val result = implementor.visitChild(this, 0, getInput() as EnumerableRel, pref)
+ val childExp = builder.append("child", result.block)
+ val convertedChildExpr = if (getInput().rowType != rowType) {
+ val typeFactory = cluster.typeFactory as JavaTypeFactory
+ val format = EnumerableTableScan.deduceFormat(table)
+ val physType = PhysTypeImpl.of(typeFactory, table.rowType, format)
+ val childPhysType = result.physType
+ val o = Expressions.parameter(childPhysType.javaRowType, "o")
+ val expressionList = List(childPhysType.rowType.fieldCount) { i ->
+ childPhysType.fieldReference(o, i, physType.getJavaFieldType(i))
+ }
+
+ builder.append(
+ "convertedChild",
+ Expressions.call(
+ childExp,
+ BuiltInMethod.SELECT.method,
+ Expressions.lambda<org.apache.calcite.linq4j.function.Function<*>>(physType.record(expressionList), o)
+ )
+ )
+ } else {
+ childExp
+ }
+
+ if (!isInsert) {
+ throw UnsupportedOperationException("Deletion and update not supported")
+ }
+
+ val expression = table.getExpression(InsertableTable::class.java)
+ builder.add(
+ Expressions.return_(
+ null,
+ Expressions.call(
+ BuiltInMethod.SINGLETON_ENUMERABLE.method,
+ Expressions.call(
+ Long::class.java,
+ expression,
+ INSERT_METHOD,
+ convertedChildExpr,
+ )
+ )
+ )
+ )
+
+ val rowFormat = if (pref === Prefer.ARRAY) JavaRowFormat.ARRAY else JavaRowFormat.SCALAR
+ val physType = PhysTypeImpl.of(implementor.typeFactory, getRowType(), rowFormat)
+ return implementor.result(physType, builder.toBlock())
+ }
+
+ private companion object {
+ /**
+ * Reference to [InsertableTable.insert] method.
+ */
+ val INSERT_METHOD: Method = Types.lookupMethod(InsertableTable::class.java, "insert", Enumerable::class.java)
+ }
+}
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModifyRule.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModifyRule.kt
new file mode 100644
index 00000000..7572e381
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModifyRule.kt
@@ -0,0 +1,65 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.calcite
+
+import org.apache.calcite.adapter.enumerable.EnumerableConvention
+import org.apache.calcite.plan.Convention
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.TableModify
+import org.apache.calcite.rel.logical.LogicalTableModify
+import org.apache.calcite.schema.ModifiableTable
+
+/**
+ * A [ConverterRule] from a [LogicalTableModify] to a [TraceTableModify].
+ */
+internal class TraceTableModifyRule(config: Config) : ConverterRule(config) {
+ override fun convert(rel: RelNode): RelNode? {
+ val modify = rel as TableModify
+ val table = modify.table!!
+
+ // Make sure that the table is modifiable
+ if (table.unwrap(ModifiableTable::class.java) == null) {
+ return null
+ }
+
+ val traitSet = modify.traitSet.replace(EnumerableConvention.INSTANCE)
+ return TraceTableModify(
+ modify.cluster, traitSet,
+ table,
+ modify.catalogReader,
+ convert(modify.input, traitSet),
+ modify.operation,
+ modify.updateColumnList,
+ modify.sourceExpressionList,
+ modify.isFlattened
+ )
+ }
+
+ companion object {
+ /** Default configuration. */
+ val DEFAULT: Config = Config.INSTANCE
+ .withConversion(LogicalTableModify::class.java, Convention.NONE, EnumerableConvention.INSTANCE, "TraceTableModificationRule")
+ .withRuleFactory { config: Config -> TraceTableModifyRule(config) }
+ }
+}
diff --git a/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt b/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt
new file mode 100644
index 00000000..d2877d7c
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt
@@ -0,0 +1,158 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.calcite
+
+import org.apache.calcite.jdbc.CalciteConnection
+import org.junit.jupiter.api.Assertions.*
+import org.junit.jupiter.api.Test
+import org.opendc.trace.Trace
+import java.nio.file.Files
+import java.nio.file.Paths
+import java.sql.DriverManager
+import java.sql.ResultSet
+import java.sql.Statement
+import java.sql.Timestamp
+import java.util.*
+
+/**
+ * Smoke test for Apache Calcite integration.
+ */
+class CalciteTest {
+ /**
+ * The trace to experiment with.
+ */
+ private val trace = Trace.open(Paths.get("src/test/resources/trace"), format = "opendc-vm")
+
+ @Test
+ fun testResources() {
+ runQuery(trace, "SELECT * FROM trace.resources") { rs ->
+ assertAll(
+ { assertTrue(rs.next()) },
+ { assertEquals("1019", rs.getString("id")) },
+ { assertEquals(1, rs.getInt("cpu_count")) },
+ { assertEquals(Timestamp.valueOf("2013-08-12 13:40:46.0"), rs.getTimestamp("start_time")) },
+ { assertEquals(181352.0, rs.getDouble("mem_capacity")) },
+ { assertTrue(rs.next()) },
+ { assertEquals("1023", rs.getString("id")) },
+ { assertTrue(rs.next()) },
+ { assertEquals("1052", rs.getString("id")) },
+ { assertTrue(rs.next()) },
+ { assertEquals("1073", rs.getString("id")) },
+ { assertFalse(rs.next()) }
+ )
+ }
+ }
+
+ @Test
+ fun testResourceStates() {
+ runQuery(trace, "SELECT * FROM trace.resource_states") { rs ->
+ assertAll(
+ { assertTrue(rs.next()) },
+ { assertEquals("1019", rs.getString("id")) },
+ { assertEquals(Timestamp.valueOf("2013-08-12 13:40:46.0"), rs.getTimestamp("timestamp")) },
+ { assertEquals(300000, rs.getLong("duration")) },
+ { assertEquals(0.0, rs.getDouble("cpu_usage")) },
+ { assertTrue(rs.next()) },
+ { assertEquals("1019", rs.getString("id")) },
+ )
+ }
+ }
+
+ @Test
+ fun testInterferenceGroups() {
+ runQuery(trace, "SELECT * FROM trace.interference_groups") { rs ->
+ assertAll(
+ { assertTrue(rs.next()) },
+ { assertArrayEquals(arrayOf("1019", "1023", "1052"), rs.getArray("members").array as Array<*>) },
+ { assertEquals(0.0, rs.getDouble("target")) },
+ { assertEquals(0.8830158730158756, rs.getDouble("score")) },
+ )
+ }
+ }
+
+ @Test
+ fun testComplexQuery() {
+ runQuery(trace, "SELECT max(cpu_usage) as max_cpu_usage, avg(cpu_usage) as avg_cpu_usage FROM trace.resource_states") { rs ->
+ assertAll(
+ { assertTrue(rs.next()) },
+ { assertEquals(249.59993808, rs.getDouble("max_cpu_usage")) },
+ { assertEquals(5.387240309118493, rs.getDouble("avg_cpu_usage")) },
+ )
+ }
+ }
+
+ @Test
+ fun testInsert() {
+ val tmp = Files.createTempDirectory("opendc")
+ val newTrace = Trace.create(tmp, "opendc-vm")
+
+ runStatement(newTrace) { stmt ->
+ val count = stmt.executeUpdate(
+ """
+ INSERT INTO trace.resources (id, start_time, stop_time, cpu_count, cpu_capacity, mem_capacity)
+ VALUES (1234, '2013-08-12 13:35:46.0', '2013-09-11 13:39:58.0', 1, 2926.0, 1024.0)
+ """.trimIndent()
+ )
+ assertEquals(1, count)
+ }
+
+ runQuery(newTrace, "SELECT * FROM trace.resources") { rs ->
+ assertAll(
+ { assertTrue(rs.next()) },
+ { assertEquals("1234", rs.getString("id")) },
+ { assertEquals(1, rs.getInt("cpu_count")) },
+ { assertEquals(Timestamp.valueOf("2013-08-12 13:35:46.0"), rs.getTimestamp("start_time")) },
+ { assertEquals(2926.0, rs.getDouble("cpu_capacity")) },
+ { assertEquals(1024.0, rs.getDouble("mem_capacity")) }
+ )
+ }
+ }
+
+ /**
+     * Helper function to run a query against the specified trace.
+ */
+ private fun runQuery(trace: Trace, query: String, block: (ResultSet) -> Unit) {
+ runStatement(trace) { stmt ->
+ val rs = stmt.executeQuery(query)
+ rs.use { block(rs) }
+ }
+ }
+
+ /**
+     * Helper function to run a statement against the specified trace.
+ */
+ private fun runStatement(trace: Trace, block: (Statement) -> Unit) {
+ val info = Properties()
+ info.setProperty("lex", "JAVA")
+ val connection = DriverManager.getConnection("jdbc:calcite:", info).unwrap(CalciteConnection::class.java)
+ connection.rootSchema.add("trace", TraceSchema(trace))
+
+ val stmt = connection.createStatement()
+ try {
+ block(stmt)
+ } finally {
+ stmt.close()
+ connection.close()
+ }
+ }
+}
diff --git a/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/TraceSchemaFactoryTest.kt b/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/TraceSchemaFactoryTest.kt
new file mode 100644
index 00000000..0a552e74
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/TraceSchemaFactoryTest.kt
@@ -0,0 +1,78 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.calcite
+
+import org.junit.jupiter.api.Assertions.*
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertThrows
+import java.sql.DriverManager
+import java.sql.Timestamp
+import java.util.*
+
+/**
+ * Test suite for [TraceSchemaFactory].
+ */
+class TraceSchemaFactoryTest {
+ @Test
+ fun testSmoke() {
+ val info = Properties()
+ info.setProperty("lex", "JAVA")
+ val connection = DriverManager.getConnection("jdbc:calcite:model=src/test/resources/model.json", info)
+ val stmt = connection.createStatement()
+ val rs = stmt.executeQuery("SELECT * FROM trace.resources")
+ try {
+ assertAll(
+ { assertTrue(rs.next()) },
+ { assertEquals("1019", rs.getString("id")) },
+ { assertEquals(1, rs.getInt("cpu_count")) },
+ { assertEquals(Timestamp.valueOf("2013-08-12 13:40:46.0"), rs.getTimestamp("start_time")) },
+ { assertEquals(181352.0, rs.getDouble("mem_capacity")) },
+ )
+ } finally {
+ rs.close()
+ stmt.close()
+ connection.close()
+ }
+ }
+
+ @Test
+ fun testWithoutParams() {
+ assertThrows<java.lang.RuntimeException> {
+ DriverManager.getConnection("jdbc:calcite:schemaFactory=org.opendc.trace.calcite.TraceSchemaFactory")
+ }
+ }
+
+ @Test
+ fun testWithoutPath() {
+ assertThrows<java.lang.RuntimeException> {
+ DriverManager.getConnection("jdbc:calcite:schemaFactory=org.opendc.trace.calcite.TraceSchemaFactory; schema.format=opendc-vm")
+ }
+ }
+
+ @Test
+ fun testWithoutFormat() {
+ assertThrows<java.lang.RuntimeException> {
+ DriverManager.getConnection("jdbc:calcite:schemaFactory=org.opendc.trace.calcite.TraceSchemaFactory; schema.path=trace")
+ }
+ }
+}
diff --git a/opendc-trace/opendc-trace-calcite/src/test/resources/model.json b/opendc-trace/opendc-trace-calcite/src/test/resources/model.json
new file mode 100644
index 00000000..91e2657f
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/test/resources/model.json
@@ -0,0 +1,15 @@
+{
+ "version": "1.0",
+ "defaultSchema": "trace",
+ "schemas": [
+ {
+ "name": "trace",
+ "type": "custom",
+ "factory": "org.opendc.trace.calcite.TraceSchemaFactory",
+ "operand": {
+ "path": "trace",
+ "format": "opendc-vm"
+ }
+ }
+ ]
+}
diff --git a/opendc-trace/opendc-trace-calcite/src/test/resources/trace/interference-model.json b/opendc-trace/opendc-trace-calcite/src/test/resources/trace/interference-model.json
new file mode 100644
index 00000000..6a0616d9
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/test/resources/trace/interference-model.json
@@ -0,0 +1,20 @@
+[
+ {
+ "vms": [
+ "1019",
+ "1023",
+ "1052"
+ ],
+ "minServerLoad": 0.0,
+ "performanceScore": 0.8830158730158756
+ },
+ {
+ "vms": [
+ "1023",
+ "1052",
+ "1073"
+ ],
+ "minServerLoad": 0.0,
+ "performanceScore": 0.7133055555552751
+ }
+]
diff --git a/opendc-trace/opendc-trace-calcite/src/test/resources/trace/meta.parquet b/opendc-trace/opendc-trace-calcite/src/test/resources/trace/meta.parquet
new file mode 100644
index 00000000..d8184945
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/test/resources/trace/meta.parquet
Binary files differ
diff --git a/opendc-trace/opendc-trace-calcite/src/test/resources/trace/trace.parquet b/opendc-trace/opendc-trace-calcite/src/test/resources/trace/trace.parquet
new file mode 100644
index 00000000..00ab5835
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/test/resources/trace/trace.parquet
Binary files differ
diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt
index 63688523..8d9eab82 100644
--- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt
+++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt
@@ -70,7 +70,7 @@ public class GwfTraceFormat : TraceFormat {
}
}
- override fun newReader(path: Path, table: String): TableReader {
+ override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader {
return when (table) {
TABLE_TASKS -> GwfTaskTableReader(factory.createParser(path.toFile()))
else -> throw IllegalArgumentException("Table $table not supported")
diff --git a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt
index 9bf28ad7..411d45d0 100644
--- a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt
@@ -58,7 +58,7 @@ internal class GwfTraceFormatTest {
@Test
fun testTableReader() {
val path = Paths.get(checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")).toURI())
- val reader = format.newReader(path, TABLE_TASKS)
+ val reader = format.newReader(path, TABLE_TASKS, null)
assertAll(
{ assertTrue(reader.nextRow()) },
@@ -73,7 +73,7 @@ internal class GwfTraceFormatTest {
@Test
fun testReadingRowWithDependencies() {
val path = Paths.get(checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")).toURI())
- val reader = format.newReader(path, TABLE_TASKS)
+ val reader = format.newReader(path, TABLE_TASKS, null)
// Move to row 7
for (x in 1..6)
@@ -85,7 +85,7 @@ internal class GwfTraceFormatTest {
{ assertEquals("7", reader.get(TASK_ID)) },
{ assertEquals(Instant.ofEpochSecond(87), reader.get(TASK_SUBMIT_TIME)) },
{ assertEquals(Duration.ofSeconds(11), reader.get(TASK_RUNTIME)) },
- { assertEquals(setOf<String>("4", "5", "6"), reader.get(TASK_PARENTS)) },
+ { assertEquals(setOf("4", "5", "6"), reader.get(TASK_PARENTS)) },
)
}
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
index b82da888..7a01b881 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
@@ -22,38 +22,30 @@
package org.opendc.trace.opendc
-import org.apache.avro.Schema
-import org.apache.avro.generic.GenericRecord
import org.opendc.trace.*
import org.opendc.trace.conv.*
+import org.opendc.trace.opendc.parquet.ResourceState
import org.opendc.trace.util.parquet.LocalParquetReader
-import java.time.Duration
-import java.time.Instant
/**
* A [TableReader] implementation for the OpenDC virtual machine trace format.
*/
-internal class OdcVmResourceStateTableReader(private val reader: LocalParquetReader<GenericRecord>) : TableReader {
+internal class OdcVmResourceStateTableReader(private val reader: LocalParquetReader<ResourceState>) : TableReader {
/**
* The current record.
*/
- private var record: GenericRecord? = null
-
- /**
- * A flag to indicate that the columns have been initialized.
- */
- private var hasInitializedColumns = false
+ private var record: ResourceState? = null
override fun nextRow(): Boolean {
- val record = reader.read()
- this.record = record
+ try {
+ val record = reader.read()
+ this.record = record
- if (!hasInitializedColumns && record != null) {
- initColumns(record.schema)
- hasInitializedColumns = true
+ return record != null
+ } catch (e: Throwable) {
+ this.record = null
+ throw e
}
-
- return record != null
}
override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
@@ -67,36 +59,36 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_ID -> record[AVRO_COL_ID].toString()
- COL_TIMESTAMP -> Instant.ofEpochMilli(record[AVRO_COL_TIMESTAMP] as Long)
- COL_DURATION -> Duration.ofMillis(record[AVRO_COL_DURATION] as Long)
- COL_CPU_COUNT -> getInt(index)
- COL_CPU_USAGE -> getDouble(index)
- else -> throw IllegalArgumentException("Invalid column")
+ COL_ID -> record.id
+ COL_TIMESTAMP -> record.timestamp
+ COL_DURATION -> record.duration
+ COL_CPU_COUNT -> record.cpuCount
+ COL_CPU_USAGE -> record.cpuUsage
+ else -> throw IllegalArgumentException("Invalid column index $index")
}
}
override fun getBoolean(index: Int): Boolean {
- throw IllegalArgumentException("Invalid column")
+ throw IllegalArgumentException("Invalid column or type [index $index]")
}
override fun getInt(index: Int): Int {
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_CPU_COUNT -> record[AVRO_COL_CPU_COUNT] as Int
- else -> throw IllegalArgumentException("Invalid column")
+ COL_CPU_COUNT -> record.cpuCount
+ else -> throw IllegalArgumentException("Invalid column or type [index $index]")
}
}
override fun getLong(index: Int): Long {
- throw IllegalArgumentException("Invalid column")
+ throw IllegalArgumentException("Invalid column or type [index $index]")
}
override fun getDouble(index: Int): Double {
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_CPU_USAGE -> (record[AVRO_COL_CPU_USAGE] as Number).toDouble()
- else -> throw IllegalArgumentException("Invalid column")
+ COL_CPU_USAGE -> record.cpuUsage
+ else -> throw IllegalArgumentException("Invalid column or type [index $index]")
}
}
@@ -106,28 +98,6 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
override fun toString(): String = "OdcVmResourceStateTableReader"
- /**
- * Initialize the columns for the reader based on [schema].
- */
- private fun initColumns(schema: Schema) {
- try {
- AVRO_COL_ID = schema.getField("id").pos()
- AVRO_COL_TIMESTAMP = (schema.getField("timestamp") ?: schema.getField("time")).pos()
- AVRO_COL_DURATION = schema.getField("duration").pos()
- AVRO_COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("cores")).pos()
- AVRO_COL_CPU_USAGE = (schema.getField("cpu_usage") ?: schema.getField("cpuUsage")).pos()
- } catch (e: NullPointerException) {
- // This happens when the field we are trying to access does not exist
- throw IllegalArgumentException("Invalid schema", e)
- }
- }
-
- private var AVRO_COL_ID = -1
- private var AVRO_COL_TIMESTAMP = -1
- private var AVRO_COL_DURATION = -1
- private var AVRO_COL_CPU_COUNT = -1
- private var AVRO_COL_CPU_USAGE = -1
-
private val COL_ID = 0
private val COL_TIMESTAMP = 1
private val COL_DURATION = 2
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt
index 01b9750c..97af5b59 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt
@@ -22,84 +22,85 @@
package org.opendc.trace.opendc
-import org.apache.avro.Schema
-import org.apache.avro.generic.GenericRecord
-import org.apache.avro.generic.GenericRecordBuilder
import org.apache.parquet.hadoop.ParquetWriter
import org.opendc.trace.*
import org.opendc.trace.conv.*
+import org.opendc.trace.opendc.parquet.ResourceState
import java.time.Duration
import java.time.Instant
/**
* A [TableWriter] implementation for the OpenDC virtual machine trace format.
*/
-internal class OdcVmResourceStateTableWriter(
- private val writer: ParquetWriter<GenericRecord>,
- private val schema: Schema
-) : TableWriter {
+internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter<ResourceState>) : TableWriter {
/**
- * The current builder for the record that is being written.
+ * The current state for the record that is being written.
*/
- private var builder: GenericRecordBuilder? = null
-
- /**
- * The fields belonging to the resource state schema.
- */
- private val fields = schema.fields
+ private var _isActive = false
+ private var _id: String = ""
+ private var _timestamp: Instant = Instant.MIN
+ private var _duration: Duration = Duration.ZERO
+ private var _cpuCount: Int = 0
+ private var _cpuUsage: Double = Double.NaN
override fun startRow() {
- builder = GenericRecordBuilder(schema)
+ _isActive = true
+ _id = ""
+ _timestamp = Instant.MIN
+ _duration = Duration.ZERO
+ _cpuCount = 0
+ _cpuUsage = Double.NaN
}
override fun endRow() {
- val builder = checkNotNull(builder) { "No active row" }
- this.builder = null
-
- val record = builder.build()
- val id = record[COL_ID] as String
- val timestamp = record[COL_TIMESTAMP] as Long
+ check(_isActive) { "No active row" }
+ _isActive = false
- check(lastId != id || timestamp >= lastTimestamp) { "Records need to be ordered by (id, timestamp)" }
+ check(lastId != _id || _timestamp >= lastTimestamp) { "Records need to be ordered by (id, timestamp)" }
- writer.write(builder.build())
+ writer.write(ResourceState(_id, _timestamp, _duration, _cpuCount, _cpuUsage))
- lastId = id
- lastTimestamp = timestamp
+ lastId = _id
+ lastTimestamp = _timestamp
}
- override fun resolve(column: TableColumn<*>): Int {
- val schema = schema
- return when (column) {
- RESOURCE_ID -> schema.getField("id").pos()
- RESOURCE_STATE_TIMESTAMP -> (schema.getField("timestamp") ?: schema.getField("time")).pos()
- RESOURCE_STATE_DURATION -> schema.getField("duration").pos()
- RESOURCE_CPU_COUNT -> (schema.getField("cpu_count") ?: schema.getField("cores")).pos()
- RESOURCE_STATE_CPU_USAGE -> (schema.getField("cpu_usage") ?: schema.getField("cpuUsage")).pos()
- else -> -1
- }
- }
+ override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
override fun set(index: Int, value: Any) {
- val builder = checkNotNull(builder) { "No active row" }
-
- builder.set(
- fields[index],
- when (index) {
- COL_TIMESTAMP -> (value as Instant).toEpochMilli()
- COL_DURATION -> (value as Duration).toMillis()
- else -> value
- }
- )
+ check(_isActive) { "No active row" }
+
+ when (index) {
+ COL_ID -> _id = value as String
+ COL_TIMESTAMP -> _timestamp = value as Instant
+ COL_DURATION -> _duration = value as Duration
+ COL_CPU_COUNT -> _cpuCount = value as Int
+ COL_CPU_USAGE -> _cpuUsage = value as Double
+ }
}
- override fun setBoolean(index: Int, value: Boolean) = set(index, value)
+ override fun setBoolean(index: Int, value: Boolean) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
- override fun setInt(index: Int, value: Int) = set(index, value)
+ override fun setInt(index: Int, value: Int) {
+ check(_isActive) { "No active row" }
+ when (index) {
+ COL_CPU_COUNT -> _cpuCount = value
+ else -> throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+ }
- override fun setLong(index: Int, value: Long) = set(index, value)
+ override fun setLong(index: Int, value: Long) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
- override fun setDouble(index: Int, value: Double) = set(index, value)
+ override fun setDouble(index: Int, value: Double) {
+ check(_isActive) { "No active row" }
+ when (index) {
+ COL_CPU_USAGE -> _cpuUsage = value
+ else -> throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+ }
override fun flush() {
// Not available
@@ -113,12 +114,19 @@ internal class OdcVmResourceStateTableWriter(
* Last column values that are used to check for correct partitioning.
*/
private var lastId: String? = null
- private var lastTimestamp: Long = Long.MIN_VALUE
-
- /**
- * Columns with special behavior.
- */
- private val COL_ID = resolve(RESOURCE_ID)
- private val COL_TIMESTAMP = resolve(RESOURCE_STATE_TIMESTAMP)
- private val COL_DURATION = resolve(RESOURCE_STATE_DURATION)
+ private var lastTimestamp: Instant = Instant.MAX
+
+ private val COL_ID = 0
+ private val COL_TIMESTAMP = 1
+ private val COL_DURATION = 2
+ private val COL_CPU_COUNT = 3
+ private val COL_CPU_USAGE = 4
+
+ private val columns = mapOf(
+ RESOURCE_ID to COL_ID,
+ RESOURCE_STATE_TIMESTAMP to COL_TIMESTAMP,
+ RESOURCE_STATE_DURATION to COL_DURATION,
+ RESOURCE_CPU_COUNT to COL_CPU_COUNT,
+ RESOURCE_STATE_CPU_USAGE to COL_CPU_USAGE,
+ )
}
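
The resource-state writer above enforces an ordering contract in `endRow()`: rows for the same resource id must never go backwards in time. Below is a minimal sketch of how that surfaces through the public `TableWriter` API, assuming the column-keyed `set`/`setInt`/`setDouble` overloads exercised by the tests later in this diff; the temporary directory and record values are illustrative.

```kotlin
import org.opendc.trace.conv.*
import org.opendc.trace.opendc.OdcVmTraceFormat
import java.nio.file.Files
import java.time.Duration
import java.time.Instant

fun main() {
    val writer = OdcVmTraceFormat().newWriter(Files.createTempDirectory("opendc"), TABLE_RESOURCE_STATES)

    // First row for "vm-1" at t = 1000 ms.
    writer.startRow()
    writer.set(RESOURCE_ID, "vm-1")
    writer.set(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(1_000))
    writer.set(RESOURCE_STATE_DURATION, Duration.ofMinutes(5))
    writer.setInt(RESOURCE_CPU_COUNT, 2)
    writer.setDouble(RESOURCE_STATE_CPU_USAGE, 1500.0)
    writer.endRow()

    // Second row for the same id at an earlier timestamp: the ordering check
    // in endRow() fails and an IllegalStateException is thrown.
    writer.startRow()
    writer.set(RESOURCE_ID, "vm-1")
    writer.set(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(500))
    writer.endRow()
}
```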
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
index 4909e70e..6102332f 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
@@ -22,37 +22,30 @@
package org.opendc.trace.opendc
-import org.apache.avro.Schema
-import org.apache.avro.generic.GenericRecord
import org.opendc.trace.*
import org.opendc.trace.conv.*
+import org.opendc.trace.opendc.parquet.Resource
import org.opendc.trace.util.parquet.LocalParquetReader
-import java.time.Instant
/**
- * A [TableReader] implementation for the resources table in the OpenDC virtual machine trace format.
+ * A [TableReader] implementation for the "resources table" in the OpenDC virtual machine trace format.
*/
-internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<GenericRecord>) : TableReader {
+internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<Resource>) : TableReader {
/**
* The current record.
*/
- private var record: GenericRecord? = null
-
- /**
- * A flag to indicate that the columns have been initialized.
- */
- private var hasInitializedColumns = false
+ private var record: Resource? = null
override fun nextRow(): Boolean {
- val record = reader.read()
- this.record = record
+ try {
+ val record = reader.read()
+ this.record = record
- if (!hasInitializedColumns && record != null) {
- initColumns(record.schema)
- hasInitializedColumns = true
+ return record != null
+ } catch (e: Throwable) {
+ this.record = null
+ throw e
}
-
- return record != null
}
override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
@@ -66,9 +59,9 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_ID -> record[AVRO_COL_ID].toString()
- COL_START_TIME -> Instant.ofEpochMilli(record[AVRO_COL_START_TIME] as Long)
- COL_STOP_TIME -> Instant.ofEpochMilli(record[AVRO_COL_STOP_TIME] as Long)
+ COL_ID -> record.id
+ COL_START_TIME -> record.startTime
+ COL_STOP_TIME -> record.stopTime
COL_CPU_COUNT -> getInt(index)
COL_CPU_CAPACITY -> getDouble(index)
COL_MEM_CAPACITY -> getDouble(index)
@@ -84,7 +77,7 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_CPU_COUNT -> record[AVRO_COL_CPU_COUNT] as Int
+ COL_CPU_COUNT -> record.cpuCount
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -97,8 +90,8 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_CPU_CAPACITY -> if (AVRO_COL_CPU_CAPACITY >= 0) (record[AVRO_COL_CPU_CAPACITY] as Number).toDouble() else 0.0
- COL_MEM_CAPACITY -> (record[AVRO_COL_MEM_CAPACITY] as Number).toDouble()
+ COL_CPU_CAPACITY -> record.cpuCapacity
+ COL_MEM_CAPACITY -> record.memCapacity
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -109,30 +102,6 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G
override fun toString(): String = "OdcVmResourceTableReader"
- /**
- * Initialize the columns for the reader based on [schema].
- */
- private fun initColumns(schema: Schema) {
- try {
- AVRO_COL_ID = schema.getField("id").pos()
- AVRO_COL_START_TIME = (schema.getField("start_time") ?: schema.getField("submissionTime")).pos()
- AVRO_COL_STOP_TIME = (schema.getField("stop_time") ?: schema.getField("endTime")).pos()
- AVRO_COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("maxCores")).pos()
- AVRO_COL_CPU_CAPACITY = schema.getField("cpu_capacity")?.pos() ?: -1
- AVRO_COL_MEM_CAPACITY = (schema.getField("mem_capacity") ?: schema.getField("requiredMemory")).pos()
- } catch (e: NullPointerException) {
- // This happens when the field we are trying to access does not exist
- throw IllegalArgumentException("Invalid schema")
- }
- }
-
- private var AVRO_COL_ID = -1
- private var AVRO_COL_START_TIME = -1
- private var AVRO_COL_STOP_TIME = -1
- private var AVRO_COL_CPU_COUNT = -1
- private var AVRO_COL_CPU_CAPACITY = -1
- private var AVRO_COL_MEM_CAPACITY = -1
-
private val COL_ID = 0
private val COL_START_TIME = 1
private val COL_STOP_TIME = 2
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt
index edc89ee6..cae65faa 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt
@@ -22,74 +22,82 @@
package org.opendc.trace.opendc
-import org.apache.avro.Schema
-import org.apache.avro.generic.GenericRecord
-import org.apache.avro.generic.GenericRecordBuilder
import org.apache.parquet.hadoop.ParquetWriter
import org.opendc.trace.*
import org.opendc.trace.conv.*
+import org.opendc.trace.opendc.parquet.Resource
import java.time.Instant
-import kotlin.math.roundToLong
/**
* A [TableWriter] implementation for the OpenDC virtual machine trace format.
*/
-internal class OdcVmResourceTableWriter(
- private val writer: ParquetWriter<GenericRecord>,
- private val schema: Schema
-) : TableWriter {
+internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resource>) : TableWriter {
/**
- * The current builder for the record that is being written.
+ * The current state for the record that is being written.
*/
- private var builder: GenericRecordBuilder? = null
-
- /**
- * The fields belonging to the resource schema.
- */
- private val fields = schema.fields
+ private var _isActive = false
+ private var _id: String = ""
+ private var _startTime: Instant = Instant.MIN
+ private var _stopTime: Instant = Instant.MIN
+ private var _cpuCount: Int = 0
+ private var _cpuCapacity: Double = Double.NaN
+ private var _memCapacity: Double = Double.NaN
override fun startRow() {
- builder = GenericRecordBuilder(schema)
+ _isActive = true
+ _id = ""
+ _startTime = Instant.MIN
+ _stopTime = Instant.MIN
+ _cpuCount = 0
+ _cpuCapacity = Double.NaN
+ _memCapacity = Double.NaN
}
override fun endRow() {
- val builder = checkNotNull(builder) { "No active row" }
- this.builder = null
- writer.write(builder.build())
+ check(_isActive) { "No active row" }
+ _isActive = false
+ writer.write(Resource(_id, _startTime, _stopTime, _cpuCount, _cpuCapacity, _memCapacity))
}
- override fun resolve(column: TableColumn<*>): Int {
- val schema = schema
- return when (column) {
- RESOURCE_ID -> schema.getField("id").pos()
- RESOURCE_START_TIME -> (schema.getField("start_time") ?: schema.getField("submissionTime")).pos()
- RESOURCE_STOP_TIME -> (schema.getField("stop_time") ?: schema.getField("endTime")).pos()
- RESOURCE_CPU_COUNT -> (schema.getField("cpu_count") ?: schema.getField("maxCores")).pos()
- RESOURCE_CPU_CAPACITY -> schema.getField("cpu_capacity").pos()
- RESOURCE_MEM_CAPACITY -> (schema.getField("mem_capacity") ?: schema.getField("requiredMemory")).pos()
- else -> -1
- }
- }
+ override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
override fun set(index: Int, value: Any) {
- val builder = checkNotNull(builder) { "No active row" }
- builder.set(
- fields[index],
- when (index) {
- COL_START_TIME, COL_STOP_TIME -> (value as Instant).toEpochMilli()
- COL_MEM_CAPACITY -> (value as Double).roundToLong()
- else -> value
- }
- )
+ check(_isActive) { "No active row" }
+ when (index) {
+ COL_ID -> _id = value as String
+ COL_START_TIME -> _startTime = value as Instant
+ COL_STOP_TIME -> _stopTime = value as Instant
+ COL_CPU_COUNT -> _cpuCount = value as Int
+ COL_CPU_CAPACITY -> _cpuCapacity = value as Double
+ COL_MEM_CAPACITY -> _memCapacity = value as Double
+ else -> throw IllegalArgumentException("Invalid column index $index")
+ }
}
- override fun setBoolean(index: Int, value: Boolean) = set(index, value)
+ override fun setBoolean(index: Int, value: Boolean) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
- override fun setInt(index: Int, value: Int) = set(index, value)
+ override fun setInt(index: Int, value: Int) {
+ check(_isActive) { "No active row" }
+ when (index) {
+ COL_CPU_COUNT -> _cpuCount = value
+ else -> throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+ }
- override fun setLong(index: Int, value: Long) = set(index, value)
+ override fun setLong(index: Int, value: Long) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
- override fun setDouble(index: Int, value: Double) = set(index, value)
+ override fun setDouble(index: Int, value: Double) {
+ check(_isActive) { "No active row" }
+ when (index) {
+ COL_CPU_CAPACITY -> _cpuCapacity = value
+ COL_MEM_CAPACITY -> _memCapacity = value
+ else -> throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+ }
override fun flush() {
// Not available
@@ -99,10 +107,19 @@ internal class OdcVmResourceTableWriter(
writer.close()
}
- /**
- * Columns with special behavior.
- */
- private val COL_START_TIME = resolve(RESOURCE_START_TIME)
- private val COL_STOP_TIME = resolve(RESOURCE_STOP_TIME)
- private val COL_MEM_CAPACITY = resolve(RESOURCE_MEM_CAPACITY)
+ private val COL_ID = 0
+ private val COL_START_TIME = 1
+ private val COL_STOP_TIME = 2
+ private val COL_CPU_COUNT = 3
+ private val COL_CPU_CAPACITY = 4
+ private val COL_MEM_CAPACITY = 5
+
+ private val columns = mapOf(
+ RESOURCE_ID to COL_ID,
+ RESOURCE_START_TIME to COL_START_TIME,
+ RESOURCE_STOP_TIME to COL_STOP_TIME,
+ RESOURCE_CPU_COUNT to COL_CPU_COUNT,
+ RESOURCE_CPU_CAPACITY to COL_CPU_CAPACITY,
+ RESOURCE_MEM_CAPACITY to COL_MEM_CAPACITY,
+ )
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
index 36a1b4a0..d45910c6 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
@@ -22,19 +22,19 @@
package org.opendc.trace.opendc
-import org.apache.avro.Schema
-import org.apache.avro.SchemaBuilder
-import org.apache.avro.generic.GenericRecord
-import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.column.ParquetProperties
import org.apache.parquet.hadoop.ParquetFileWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.opendc.trace.*
import org.opendc.trace.conv.*
+import org.opendc.trace.opendc.parquet.ResourceReadSupport
+import org.opendc.trace.opendc.parquet.ResourceStateReadSupport
+import org.opendc.trace.opendc.parquet.ResourceStateWriteSupport
+import org.opendc.trace.opendc.parquet.ResourceWriteSupport
import org.opendc.trace.spi.TableDetails
import org.opendc.trace.spi.TraceFormat
-import org.opendc.trace.util.parquet.LocalOutputFile
import org.opendc.trace.util.parquet.LocalParquetReader
-import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA
+import org.opendc.trace.util.parquet.LocalParquetWriter
import shaded.parquet.com.fasterxml.jackson.core.JsonEncoding
import shaded.parquet.com.fasterxml.jackson.core.JsonFactory
import java.nio.file.Files
@@ -102,14 +102,14 @@ public class OdcVmTraceFormat : TraceFormat {
}
}
- override fun newReader(path: Path, table: String): TableReader {
+ override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader {
return when (table) {
TABLE_RESOURCES -> {
- val reader = LocalParquetReader<GenericRecord>(path.resolve("meta.parquet"))
+ val reader = LocalParquetReader(path.resolve("meta.parquet"), ResourceReadSupport(projection))
OdcVmResourceTableReader(reader)
}
TABLE_RESOURCE_STATES -> {
- val reader = LocalParquetReader<GenericRecord>(path.resolve("trace.parquet"))
+ val reader = LocalParquetReader(path.resolve("trace.parquet"), ResourceStateReadSupport(projection))
OdcVmResourceStateTableReader(reader)
}
TABLE_INTERFERENCE_GROUPS -> {
@@ -128,24 +128,24 @@ public class OdcVmTraceFormat : TraceFormat {
override fun newWriter(path: Path, table: String): TableWriter {
return when (table) {
TABLE_RESOURCES -> {
- val schema = RESOURCES_SCHEMA
- val writer = AvroParquetWriter.builder<GenericRecord>(LocalOutputFile(path.resolve("meta.parquet")))
- .withSchema(schema)
+ val writer = LocalParquetWriter.builder(path.resolve("meta.parquet"), ResourceWriteSupport())
.withCompressionCodec(CompressionCodecName.ZSTD)
+ .withPageWriteChecksumEnabled(true)
+ .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
.build()
- OdcVmResourceTableWriter(writer, schema)
+ OdcVmResourceTableWriter(writer)
}
TABLE_RESOURCE_STATES -> {
- val schema = RESOURCE_STATES_SCHEMA
- val writer = AvroParquetWriter.builder<GenericRecord>(LocalOutputFile(path.resolve("trace.parquet")))
- .withSchema(schema)
+ val writer = LocalParquetWriter.builder(path.resolve("trace.parquet"), ResourceStateWriteSupport())
.withCompressionCodec(CompressionCodecName.ZSTD)
.withDictionaryEncoding("id", true)
.withBloomFilterEnabled("id", true)
+ .withPageWriteChecksumEnabled(true)
+ .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
.build()
- OdcVmResourceStateTableWriter(writer, schema)
+ OdcVmResourceStateTableWriter(writer)
}
TABLE_INTERFERENCE_GROUPS -> {
val generator = jsonFactory.createGenerator(path.resolve("interference-model.json").toFile(), JsonEncoding.UTF8)
@@ -154,37 +154,4 @@ public class OdcVmTraceFormat : TraceFormat {
else -> throw IllegalArgumentException("Table $table not supported")
}
}
-
- public companion object {
- /**
- * Schema for the resources table in the trace.
- */
- @JvmStatic
- public val RESOURCES_SCHEMA: Schema = SchemaBuilder
- .record("resource")
- .namespace("org.opendc.trace.opendc")
- .fields()
- .requiredString("id")
- .name("start_time").type(TIMESTAMP_SCHEMA).noDefault()
- .name("stop_time").type(TIMESTAMP_SCHEMA).noDefault()
- .requiredInt("cpu_count")
- .requiredDouble("cpu_capacity")
- .requiredLong("mem_capacity")
- .endRecord()
-
- /**
- * Schema for the resource states table in the trace.
- */
- @JvmStatic
- public val RESOURCE_STATES_SCHEMA: Schema = SchemaBuilder
- .record("resource_state")
- .namespace("org.opendc.trace.opendc")
- .fields()
- .requiredString("id")
- .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault()
- .requiredLong("duration")
- .requiredInt("cpu_count")
- .requiredDouble("cpu_usage")
- .endRecord()
- }
}
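
With the Avro schemas gone, `newReader` now takes an optional projection that the read supports below translate into a pruned Parquet schema; passing `null` reads all columns. A minimal usage sketch against the test traces shipped with this module (the path mirrors the tests further down):

```kotlin
import org.opendc.trace.conv.*
import org.opendc.trace.opendc.OdcVmTraceFormat
import java.nio.file.Paths

fun main() {
    val format = OdcVmTraceFormat()
    // Only the id and CPU usage columns are materialized from trace.parquet.
    val reader = format.newReader(
        Paths.get("src/test/resources/trace-v2.1"),
        TABLE_RESOURCE_STATES,
        listOf(RESOURCE_ID, RESOURCE_STATE_CPU_USAGE)
    )

    while (reader.nextRow()) {
        println("${reader.get(RESOURCE_ID)} -> ${reader.getDouble(RESOURCE_STATE_CPU_USAGE)}")
    }
    reader.close()
}
```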
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/Resource.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/Resource.kt
new file mode 100644
index 00000000..c6db45b5
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/Resource.kt
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.opendc.parquet
+
+import java.time.Instant
+
+/**
+ * A description of a resource in a trace.
+ */
+internal data class Resource(
+ val id: String,
+ val startTime: Instant,
+ val stopTime: Instant,
+ val cpuCount: Int,
+ val cpuCapacity: Double,
+ val memCapacity: Double
+)
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt
new file mode 100644
index 00000000..0d70446d
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt
@@ -0,0 +1,147 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.opendc.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.api.InitContext
+import org.apache.parquet.hadoop.api.ReadSupport
+import org.apache.parquet.io.api.RecordMaterializer
+import org.apache.parquet.schema.*
+import org.opendc.trace.TableColumn
+import org.opendc.trace.conv.*
+
+/**
+ * A [ReadSupport] instance for [Resource] objects.
+ */
+internal class ResourceReadSupport(private val projection: List<TableColumn<*>>?) : ReadSupport<Resource>() {
+ /**
+ * Mapping from field names to [TableColumn]s.
+ */
+ private val fieldMap = mapOf<String, TableColumn<*>>(
+ "id" to RESOURCE_ID,
+ "submissionTime" to RESOURCE_START_TIME,
+ "start_time" to RESOURCE_START_TIME,
+ "endTime" to RESOURCE_STOP_TIME,
+ "stop_time" to RESOURCE_STOP_TIME,
+ "maxCores" to RESOURCE_CPU_COUNT,
+ "cpu_count" to RESOURCE_CPU_COUNT,
+ "cpu_capacity" to RESOURCE_CPU_CAPACITY,
+ "requiredMemory" to RESOURCE_MEM_CAPACITY,
+ "mem_capacity" to RESOURCE_MEM_CAPACITY,
+ )
+
+ override fun init(context: InitContext): ReadContext {
+ val projectedSchema =
+ if (projection != null) {
+ Types.buildMessage()
+ .apply {
+ val projectionSet = projection.toSet()
+
+ for (field in READ_SCHEMA.fields) {
+ val col = fieldMap[field.name] ?: continue
+ if (col in projectionSet) {
+ addField(field)
+ }
+ }
+ }
+ .named(READ_SCHEMA.name)
+ } else {
+ READ_SCHEMA
+ }
+
+ return ReadContext(projectedSchema)
+ }
+
+ override fun prepareForRead(
+ configuration: Configuration,
+ keyValueMetaData: Map<String, String>,
+ fileSchema: MessageType,
+ readContext: ReadContext
+ ): RecordMaterializer<Resource> = ResourceRecordMaterializer(readContext.requestedSchema)
+
+ companion object {
+ /**
+ * Parquet read schema (version 2.0) for the "resources" table in the trace.
+ */
+ @JvmStatic
+ val READ_SCHEMA_V2_0: MessageType = Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("submissionTime"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("endTime"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("maxCores"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("requiredMemory"),
+ )
+ .named("resource")
+
+ /**
+ * Parquet read schema (version 2.1) for the "resources" table in the trace.
+ */
+ @JvmStatic
+ val READ_SCHEMA_V2_1: MessageType = Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("start_time"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("stop_time"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("cpu_count"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpu_capacity"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("mem_capacity"),
+ )
+ .named("resource")
+
+ /**
+ * Parquet read schema for the "resources" table in the trace.
+ */
+ @JvmStatic
+ val READ_SCHEMA: MessageType = READ_SCHEMA_V2_0
+ .union(READ_SCHEMA_V2_1)
+ }
+}
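
`READ_SCHEMA` above is the union of the v2.0 and v2.1 message types, so a single read support serves traces written with either generation of column names. A small sketch of what `MessageType.union` contributes, using one made-up column per version:

```kotlin
import org.apache.parquet.schema.PrimitiveType
import org.apache.parquet.schema.Types

val v20 = Types.buildMessage()
    .addField(Types.required(PrimitiveType.PrimitiveTypeName.INT32).named("maxCores"))
    .named("resource")
val v21 = Types.buildMessage()
    .addField(Types.required(PrimitiveType.PrimitiveTypeName.INT32).named("cpu_count"))
    .named("resource")

// The merged message type carries both field names, so files using either
// naming convention can be projected and materialized by the same ReadSupport.
val merged = v20.union(v21)
println(merged)
```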
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceRecordMaterializer.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceRecordMaterializer.kt
new file mode 100644
index 00000000..3adb0709
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceRecordMaterializer.kt
@@ -0,0 +1,107 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.opendc.parquet
+
+import org.apache.parquet.io.api.*
+import org.apache.parquet.schema.MessageType
+import java.time.Instant
+
+/**
+ * A [RecordMaterializer] for [Resource] records.
+ */
+internal class ResourceRecordMaterializer(schema: MessageType) : RecordMaterializer<Resource>() {
+ /**
+ * State of current record being read.
+ */
+ private var _id = ""
+ private var _startTime = Instant.MIN
+ private var _stopTime = Instant.MIN
+ private var _cpuCount = 0
+ private var _cpuCapacity = 0.0
+ private var _memCapacity = 0.0
+
+ /**
+ * Root converter for the record.
+ */
+ private val root = object : GroupConverter() {
+ /**
+ * The converters for the columns of the schema.
+ */
+ private val converters = schema.fields.map { type ->
+ when (type.name) {
+ "id" -> object : PrimitiveConverter() {
+ override fun addBinary(value: Binary) {
+ _id = value.toStringUsingUTF8()
+ }
+ }
+ "start_time", "submissionTime" -> object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ _startTime = Instant.ofEpochMilli(value)
+ }
+ }
+ "stop_time", "endTime" -> object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ _stopTime = Instant.ofEpochMilli(value)
+ }
+ }
+ "cpu_count", "maxCores" -> object : PrimitiveConverter() {
+ override fun addInt(value: Int) {
+ _cpuCount = value
+ }
+ }
+ "cpu_capacity" -> object : PrimitiveConverter() {
+ override fun addDouble(value: Double) {
+ _cpuCapacity = value
+ }
+ }
+ "mem_capacity", "requiredMemory" -> object : PrimitiveConverter() {
+ override fun addDouble(value: Double) {
+ _memCapacity = value
+ }
+
+ override fun addLong(value: Long) {
+ _memCapacity = value.toDouble()
+ }
+ }
+ else -> error("Unknown column $type")
+ }
+ }
+
+ override fun start() {
+ _id = ""
+ _startTime = Instant.MIN
+ _stopTime = Instant.MIN
+ _cpuCount = 0
+ _cpuCapacity = 0.0
+ _memCapacity = 0.0
+ }
+
+ override fun end() {}
+
+ override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex]
+ }
+
+ override fun getCurrentRecord(): Resource = Resource(_id, _startTime, _stopTime, _cpuCount, _cpuCapacity, _memCapacity)
+
+ override fun getRootConverter(): GroupConverter = root
+}
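
Both materializers in this patch follow the same low-level contract: Parquet drives the root `GroupConverter` with `start()`, per-field `addX()` calls and `end()`, then pulls the assembled object via `getCurrentRecord()`. A stripped-down sketch for a hypothetical single-column `Int` record:

```kotlin
import org.apache.parquet.io.api.Converter
import org.apache.parquet.io.api.GroupConverter
import org.apache.parquet.io.api.PrimitiveConverter
import org.apache.parquet.io.api.RecordMaterializer

internal class IntRecordMaterializer : RecordMaterializer<Int>() {
    private var _value = 0

    private val root = object : GroupConverter() {
        // Single converter for the one int32 column of this sketch.
        private val converter = object : PrimitiveConverter() {
            override fun addInt(value: Int) {
                _value = value
            }
        }

        override fun start() {
            _value = 0
        }

        override fun end() {}

        override fun getConverter(fieldIndex: Int): Converter = converter
    }

    override fun getCurrentRecord(): Int = _value

    override fun getRootConverter(): GroupConverter = root
}
```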
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceState.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceState.kt
new file mode 100644
index 00000000..9ad58764
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceState.kt
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.opendc.parquet
+
+import java.time.Duration
+import java.time.Instant
+
+internal class ResourceState(
+ val id: String,
+ val timestamp: Instant,
+ val duration: Duration,
+ val cpuCount: Int,
+ val cpuUsage: Double
+)
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt
new file mode 100644
index 00000000..97aa00b2
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt
@@ -0,0 +1,139 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.opendc.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.api.InitContext
+import org.apache.parquet.hadoop.api.ReadSupport
+import org.apache.parquet.io.api.RecordMaterializer
+import org.apache.parquet.schema.*
+import org.opendc.trace.TableColumn
+import org.opendc.trace.conv.*
+
+/**
+ * A [ReadSupport] instance for [ResourceState] objects.
+ */
+internal class ResourceStateReadSupport(private val projection: List<TableColumn<*>>?) : ReadSupport<ResourceState>() {
+ /**
+ * Mapping from field names to [TableColumn]s.
+ */
+ private val fieldMap = mapOf<String, TableColumn<*>>(
+ "id" to RESOURCE_ID,
+ "time" to RESOURCE_STATE_TIMESTAMP,
+ "timestamp" to RESOURCE_STATE_TIMESTAMP,
+ "duration" to RESOURCE_STATE_DURATION,
+ "cores" to RESOURCE_CPU_COUNT,
+ "cpu_count" to RESOURCE_CPU_COUNT,
+ "cpuUsage" to RESOURCE_STATE_CPU_USAGE,
+ "cpu_usage" to RESOURCE_STATE_CPU_USAGE,
+ )
+
+ override fun init(context: InitContext): ReadContext {
+ val projectedSchema =
+ if (projection != null) {
+ Types.buildMessage()
+ .apply {
+ val projectionSet = projection.toSet()
+
+ for (field in READ_SCHEMA.fields) {
+ val col = fieldMap[field.name] ?: continue
+ if (col in projectionSet) {
+ addField(field)
+ }
+ }
+ }
+ .named(READ_SCHEMA.name)
+ } else {
+ READ_SCHEMA
+ }
+
+ return ReadContext(projectedSchema)
+ }
+
+ override fun prepareForRead(
+ configuration: Configuration,
+ keyValueMetaData: Map<String, String>,
+ fileSchema: MessageType,
+ readContext: ReadContext
+ ): RecordMaterializer<ResourceState> = ResourceStateRecordMaterializer(readContext.requestedSchema)
+
+ companion object {
+ /**
+ * Parquet read schema (version 2.0) for the "resource states" table in the trace.
+ */
+ @JvmStatic
+ val READ_SCHEMA_V2_0: MessageType = Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("time"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("duration"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("cores"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpuUsage")
+ )
+ .named("resource_state")
+
+ /**
+ * Parquet read schema (version 2.1) for the "resource states" table in the trace.
+ */
+ @JvmStatic
+ val READ_SCHEMA_V2_1: MessageType = Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("timestamp"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("duration"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("cpu_count"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpu_usage")
+ )
+ .named("resource_state")
+
+ /**
+ * Parquet read schema for the "resource states" table in the trace.
+ */
+ @JvmStatic
+ val READ_SCHEMA: MessageType = READ_SCHEMA_V2_0.union(READ_SCHEMA_V2_1)
+ }
+}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateRecordMaterializer.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateRecordMaterializer.kt
new file mode 100644
index 00000000..f8b0c3c2
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateRecordMaterializer.kt
@@ -0,0 +1,102 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.opendc.parquet
+
+import org.apache.parquet.io.api.*
+import org.apache.parquet.schema.MessageType
+import java.time.Duration
+import java.time.Instant
+
+/**
+ * A [RecordMaterializer] for [ResourceState] records.
+ */
+internal class ResourceStateRecordMaterializer(schema: MessageType) : RecordMaterializer<ResourceState>() {
+ /**
+ * State of current record being read.
+ */
+ private var _id = ""
+ private var _timestamp = Instant.MIN
+ private var _duration = Duration.ZERO
+ private var _cpuCount = 0
+ private var _cpuUsage = 0.0
+
+ /**
+ * Root converter for the record.
+ */
+ private val root = object : GroupConverter() {
+ /**
+ * The converters for the columns of the schema.
+ */
+ private val converters = schema.fields.map { type ->
+ when (type.name) {
+ "id" -> object : PrimitiveConverter() {
+ override fun addBinary(value: Binary) {
+ _id = value.toStringUsingUTF8()
+ }
+ }
+ "timestamp", "time" -> object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ _timestamp = Instant.ofEpochMilli(value)
+ }
+ }
+ "duration" -> object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ _duration = Duration.ofMillis(value)
+ }
+ }
+ "cpu_count", "cores" -> object : PrimitiveConverter() {
+ override fun addInt(value: Int) {
+ _cpuCount = value
+ }
+ }
+ "cpu_usage", "cpuUsage" -> object : PrimitiveConverter() {
+ override fun addDouble(value: Double) {
+ _cpuUsage = value
+ }
+ }
+ "flops" -> object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ // Ignore to support v1 format
+ }
+ }
+ else -> error("Unknown column $type")
+ }
+ }
+
+ override fun start() {
+ _id = ""
+ _timestamp = Instant.MIN
+ _duration = Duration.ZERO
+ _cpuCount = 0
+ _cpuUsage = 0.0
+ }
+
+ override fun end() {}
+
+ override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex]
+ }
+
+ override fun getCurrentRecord(): ResourceState = ResourceState(_id, _timestamp, _duration, _cpuCount, _cpuUsage)
+
+ override fun getRootConverter(): GroupConverter = root
+}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateWriteSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateWriteSupport.kt
new file mode 100644
index 00000000..e2f3df31
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateWriteSupport.kt
@@ -0,0 +1,105 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.opendc.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.io.api.Binary
+import org.apache.parquet.io.api.RecordConsumer
+import org.apache.parquet.schema.*
+
+/**
+ * Support for writing [ResourceState] instances to Parquet format.
+ */
+internal class ResourceStateWriteSupport : WriteSupport<ResourceState>() {
+ /**
+ * The current active record consumer.
+ */
+ private lateinit var recordConsumer: RecordConsumer
+
+ override fun init(configuration: Configuration): WriteContext {
+ return WriteContext(WRITE_SCHEMA, emptyMap())
+ }
+
+ override fun prepareForWrite(recordConsumer: RecordConsumer) {
+ this.recordConsumer = recordConsumer
+ }
+
+ override fun write(record: ResourceState) {
+ write(recordConsumer, record)
+ }
+
+ private fun write(consumer: RecordConsumer, record: ResourceState) {
+ consumer.startMessage()
+
+ consumer.startField("id", 0)
+ consumer.addBinary(Binary.fromCharSequence(record.id))
+ consumer.endField("id", 0)
+
+ consumer.startField("timestamp", 1)
+ consumer.addLong(record.timestamp.toEpochMilli())
+ consumer.endField("timestamp", 1)
+
+ consumer.startField("duration", 2)
+ consumer.addLong(record.duration.toMillis())
+ consumer.endField("duration", 2)
+
+ consumer.startField("cpu_count", 3)
+ consumer.addInteger(record.cpuCount)
+ consumer.endField("cpu_count", 3)
+
+ consumer.startField("cpu_usage", 4)
+ consumer.addDouble(record.cpuUsage)
+ consumer.endField("cpu_usage", 4)
+
+ consumer.endMessage()
+ }
+
+ companion object {
+ /**
+ * Parquet schema for the "resource states" table in the trace.
+ */
+ @JvmStatic
+ val WRITE_SCHEMA: MessageType = Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("timestamp"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("duration"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("cpu_count"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpu_usage")
+ )
+ .named("resource_state")
+ }
+}
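
Within this module the write support can also be used directly with the `LocalParquetWriter` helper added later in this patch, bypassing the `TableWriter` facade. A minimal sketch; the output path and record values are placeholders.

```kotlin
package org.opendc.trace.opendc.parquet

import org.apache.parquet.hadoop.ParquetFileWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.opendc.trace.util.parquet.LocalParquetWriter
import java.nio.file.Paths
import java.time.Duration
import java.time.Instant

fun main() {
    val writer = LocalParquetWriter.builder(Paths.get("trace.parquet"), ResourceStateWriteSupport())
        .withCompressionCodec(CompressionCodecName.ZSTD)
        .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
        .build()

    // One resource state row: (id, timestamp, duration, cpu_count, cpu_usage).
    writer.write(ResourceState("vm-1", Instant.EPOCH, Duration.ofMinutes(5), 2, 1500.0))
    writer.close()
}
```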
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceWriteSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceWriteSupport.kt
new file mode 100644
index 00000000..14cadabb
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceWriteSupport.kt
@@ -0,0 +1,114 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.opendc.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.io.api.Binary
+import org.apache.parquet.io.api.RecordConsumer
+import org.apache.parquet.schema.*
+import kotlin.math.roundToLong
+
+/**
+ * Support for writing [Resource] instances to Parquet format.
+ */
+internal class ResourceWriteSupport : WriteSupport<Resource>() {
+ /**
+ * The current active record consumer.
+ */
+ private lateinit var recordConsumer: RecordConsumer
+
+ override fun init(configuration: Configuration): WriteContext {
+ return WriteContext(WRITE_SCHEMA, emptyMap())
+ }
+
+ override fun prepareForWrite(recordConsumer: RecordConsumer) {
+ this.recordConsumer = recordConsumer
+ }
+
+ override fun write(record: Resource) {
+ write(recordConsumer, record)
+ }
+
+ private fun write(consumer: RecordConsumer, record: Resource) {
+ consumer.startMessage()
+
+ consumer.startField("id", 0)
+ consumer.addBinary(Binary.fromCharSequence(record.id))
+ consumer.endField("id", 0)
+
+ consumer.startField("start_time", 1)
+ consumer.addLong(record.startTime.toEpochMilli())
+ consumer.endField("start_time", 1)
+
+ consumer.startField("stop_time", 2)
+ consumer.addLong(record.stopTime.toEpochMilli())
+ consumer.endField("stop_time", 2)
+
+ consumer.startField("cpu_count", 3)
+ consumer.addInteger(record.cpuCount)
+ consumer.endField("cpu_count", 3)
+
+ consumer.startField("cpu_capacity", 4)
+ consumer.addDouble(record.cpuCapacity)
+ consumer.endField("cpu_capacity", 4)
+
+ consumer.startField("mem_capacity", 5)
+ consumer.addLong(record.memCapacity.roundToLong())
+ consumer.endField("mem_capacity", 5)
+
+ consumer.endMessage()
+ }
+
+ companion object {
+ /**
+ * Parquet schema for the "resources" table in the trace.
+ */
+ @JvmStatic
+ val WRITE_SCHEMA: MessageType = Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("start_time"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("stop_time"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("cpu_count"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpu_capacity"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("mem_capacity"),
+ )
+ .named("resource")
+ }
+}
diff --git a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt
index c8742624..1f4f6195 100644
--- a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt
@@ -29,7 +29,9 @@ import org.junit.jupiter.api.assertThrows
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.opendc.trace.conv.*
+import java.nio.file.Files
import java.nio.file.Paths
+import java.time.Instant
/**
* Test suite for the [OdcVmTraceFormat] implementation.
@@ -61,11 +63,12 @@ internal class OdcVmTraceFormatTest {
@ValueSource(strings = ["trace-v2.0", "trace-v2.1"])
fun testResources(name: String) {
val path = Paths.get("src/test/resources/$name")
- val reader = format.newReader(path, TABLE_RESOURCES)
+ val reader = format.newReader(path, TABLE_RESOURCES, listOf(RESOURCE_ID, RESOURCE_START_TIME))
assertAll(
{ assertTrue(reader.nextRow()) },
{ assertEquals("1019", reader.get(RESOURCE_ID)) },
+ { assertEquals(Instant.ofEpochMilli(1376314846000), reader.get(RESOURCE_START_TIME)) },
{ assertTrue(reader.nextRow()) },
{ assertEquals("1023", reader.get(RESOURCE_ID)) },
{ assertTrue(reader.nextRow()) },
@@ -78,11 +81,46 @@ internal class OdcVmTraceFormatTest {
reader.close()
}
+ @Test
+ fun testResourcesWrite() {
+ val path = Files.createTempDirectory("opendc")
+ val writer = format.newWriter(path, TABLE_RESOURCES)
+
+ writer.startRow()
+ writer.set(RESOURCE_ID, "1019")
+ writer.set(RESOURCE_START_TIME, Instant.EPOCH)
+ writer.set(RESOURCE_STOP_TIME, Instant.EPOCH)
+ writer.setInt(RESOURCE_CPU_COUNT, 1)
+ writer.setDouble(RESOURCE_CPU_CAPACITY, 1024.0)
+ writer.setDouble(RESOURCE_MEM_CAPACITY, 1024.0)
+ writer.endRow()
+ writer.close()
+
+ val reader = format.newReader(path, TABLE_RESOURCES, null)
+
+ assertAll(
+ { assertTrue(reader.nextRow()) },
+ { assertEquals("1019", reader.get(RESOURCE_ID)) },
+ { assertEquals(Instant.EPOCH, reader.get(RESOURCE_START_TIME)) },
+ { assertEquals(Instant.EPOCH, reader.get(RESOURCE_STOP_TIME)) },
+ { assertEquals(1, reader.getInt(RESOURCE_CPU_COUNT)) },
+ { assertEquals(1024.0, reader.getDouble(RESOURCE_CPU_CAPACITY)) },
+ { assertEquals(1024.0, reader.getDouble(RESOURCE_MEM_CAPACITY)) },
+ { assertFalse(reader.nextRow()) },
+ )
+
+ reader.close()
+ }
+
@ParameterizedTest
@ValueSource(strings = ["trace-v2.0", "trace-v2.1"])
fun testSmoke(name: String) {
val path = Paths.get("src/test/resources/$name")
- val reader = format.newReader(path, TABLE_RESOURCE_STATES)
+ val reader = format.newReader(
+ path,
+ TABLE_RESOURCE_STATES,
+ listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP, RESOURCE_STATE_CPU_USAGE)
+ )
assertAll(
{ assertTrue(reader.nextRow()) },
@@ -95,9 +133,40 @@ internal class OdcVmTraceFormatTest {
}
@Test
+ fun testResourceStatesWrite() {
+ val path = Files.createTempDirectory("opendc")
+ val writer = format.newWriter(path, TABLE_RESOURCE_STATES)
+
+ writer.startRow()
+ writer.set(RESOURCE_ID, "1019")
+ writer.set(RESOURCE_STATE_TIMESTAMP, Instant.EPOCH)
+ writer.setDouble(RESOURCE_STATE_CPU_USAGE, 23.0)
+ writer.setInt(RESOURCE_CPU_COUNT, 1)
+ writer.endRow()
+ writer.close()
+
+ val reader = format.newReader(path, TABLE_RESOURCE_STATES, null)
+
+ assertAll(
+ { assertTrue(reader.nextRow()) },
+ { assertEquals("1019", reader.get(RESOURCE_ID)) },
+ { assertEquals(Instant.EPOCH, reader.get(RESOURCE_STATE_TIMESTAMP)) },
+ { assertEquals(1, reader.getInt(RESOURCE_CPU_COUNT)) },
+ { assertEquals(23.0, reader.getDouble(RESOURCE_STATE_CPU_USAGE)) },
+ { assertFalse(reader.nextRow()) },
+ )
+
+ reader.close()
+ }
+
+ @Test
fun testInterferenceGroups() {
val path = Paths.get("src/test/resources/trace-v2.1")
- val reader = format.newReader(path, TABLE_INTERFERENCE_GROUPS)
+ val reader = format.newReader(
+ path,
+ TABLE_INTERFERENCE_GROUPS,
+ listOf(INTERFERENCE_GROUP_MEMBERS, INTERFERENCE_GROUP_TARGET, INTERFERENCE_GROUP_SCORE)
+ )
assertAll(
{ assertTrue(reader.nextRow()) },
@@ -117,7 +186,7 @@ internal class OdcVmTraceFormatTest {
@Test
fun testInterferenceGroupsEmpty() {
val path = Paths.get("src/test/resources/trace-v2.0")
- val reader = format.newReader(path, TABLE_INTERFERENCE_GROUPS)
+ val reader = format.newReader(path, TABLE_INTERFERENCE_GROUPS, listOf(INTERFERENCE_GROUP_MEMBERS))
assertFalse(reader.nextRow())
reader.close()
diff --git a/opendc-trace/opendc-trace-parquet/build.gradle.kts b/opendc-trace/opendc-trace-parquet/build.gradle.kts
index 302c0b14..e6415586 100644
--- a/opendc-trace/opendc-trace-parquet/build.gradle.kts
+++ b/opendc-trace/opendc-trace-parquet/build.gradle.kts
@@ -32,7 +32,7 @@ dependencies {
api(libs.parquet) {
exclude(group = "org.apache.hadoop")
}
- runtimeOnly(libs.hadoop.common) {
+ api(libs.hadoop.common) {
exclude(group = "org.slf4j", module = "slf4j-log4j12")
exclude(group = "log4j")
exclude(group = "org.apache.hadoop")
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
index ef9eaeb3..eef83956 100644
--- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
@@ -22,8 +22,8 @@
package org.opendc.trace.util.parquet
-import org.apache.parquet.avro.AvroParquetReader
import org.apache.parquet.hadoop.ParquetReader
+import org.apache.parquet.hadoop.api.ReadSupport
import org.apache.parquet.io.InputFile
import java.io.File
import java.io.IOException
@@ -32,11 +32,17 @@ import java.nio.file.Path
import kotlin.io.path.isDirectory
/**
- * A helper class to read Parquet files.
+ * A helper class to read Parquet files from the filesystem.
+ *
+ * This class wraps a [ParquetReader] in order to support reading partitioned Parquet datasets.
*
* @param path The path to the Parquet file or directory to read.
+ * @param readSupport Helper class to perform conversion from Parquet to [T].
*/
-public class LocalParquetReader<out T>(path: Path) : AutoCloseable {
+public class LocalParquetReader<out T>(
+ path: Path,
+ private val readSupport: ReadSupport<T>
+) : AutoCloseable {
/**
* The input files to process.
*/
@@ -57,7 +63,7 @@ public class LocalParquetReader<out T>(path: Path) : AutoCloseable {
/**
* Construct a [LocalParquetReader] for the specified [file].
*/
- public constructor(file: File) : this(file.toPath())
+ public constructor(file: File, readSupport: ReadSupport<T>) : this(file.toPath(), readSupport)
/**
* Read a single entry in the Parquet file.
@@ -93,20 +99,24 @@ public class LocalParquetReader<out T>(path: Path) : AutoCloseable {
private fun initReader() {
reader?.close()
- this.reader = if (filesIterator.hasNext()) {
- createReader(filesIterator.next())
- } else {
- null
+ try {
+ this.reader = if (filesIterator.hasNext()) {
+ createReader(filesIterator.next())
+ } else {
+ null
+ }
+ } catch (e: Throwable) {
+ this.reader = null
+ throw e
}
}
/**
- * Create a Parquet reader for the specified file.
+ * Construct a [ParquetReader] for the specified [input] with a custom [ReadSupport].
*/
private fun createReader(input: InputFile): ParquetReader<T> {
- return AvroParquetReader
- .builder<T>(input)
- .disableCompatibility()
- .build()
+ return object : ParquetReader.Builder<T>(input) {
+ override fun getReadSupport(): ReadSupport<@UnsafeVariance T> = this@LocalParquetReader.readSupport
+ }.build()
}
}
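
A short sketch of the new read path: the caller supplies a `ReadSupport` and iterates until `read()` returns `null`; when `path` is a directory, the reader walks each partition file in turn. The sketch uses the module-internal `ResourceReadSupport` from earlier in this patch, so it only compiles inside `opendc-trace-opendc`; the path is a placeholder.

```kotlin
import org.opendc.trace.opendc.parquet.Resource
import org.opendc.trace.opendc.parquet.ResourceReadSupport
import org.opendc.trace.util.parquet.LocalParquetReader
import java.nio.file.Paths

fun main() {
    // A null projection reads every column of the resources table.
    val reader = LocalParquetReader(Paths.get("trace/meta.parquet"), ResourceReadSupport(null))
    var resource: Resource? = reader.read()
    while (resource != null) {
        println("${resource.id}: ${resource.cpuCount} vCPUs")
        resource = reader.read()
    }
    reader.close()
}
```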
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt
new file mode 100644
index 00000000..b5eb1deb
--- /dev/null
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt
@@ -0,0 +1,55 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.util.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.ParquetWriter
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.io.OutputFile
+import java.nio.file.Path
+
+/**
+ * Helper class for writing Parquet records to local disk.
+ */
+public class LocalParquetWriter {
+ /**
+ * A [ParquetWriter.Builder] implementation supporting custom [OutputFile]s and [WriteSupport] implementations.
+ */
+ public class Builder<T> internal constructor(
+ output: OutputFile,
+ private val writeSupport: WriteSupport<T>
+ ) : ParquetWriter.Builder<T, Builder<T>>(output) {
+ override fun self(): Builder<T> = this
+
+ override fun getWriteSupport(conf: Configuration): WriteSupport<T> = writeSupport
+ }
+
+ public companion object {
+ /**
+ * Create a [Builder] instance that writes a Parquet file at the specified [path].
+ */
+ @JvmStatic
+ public fun <T> builder(path: Path, writeSupport: WriteSupport<T>): Builder<T> =
+ Builder(LocalOutputFile(path), writeSupport)
+ }
+}
diff --git a/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt b/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt
index 8ef4d1fb..be354319 100644
--- a/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt
+++ b/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt
@@ -22,36 +22,81 @@
package org.opendc.trace.util.parquet
-import org.apache.avro.SchemaBuilder
-import org.apache.avro.generic.GenericData
-import org.apache.parquet.avro.AvroParquetReader
-import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.ParquetFileWriter
+import org.apache.parquet.hadoop.api.ReadSupport
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.io.api.Converter
+import org.apache.parquet.io.api.GroupConverter
+import org.apache.parquet.io.api.PrimitiveConverter
+import org.apache.parquet.io.api.RecordConsumer
+import org.apache.parquet.io.api.RecordMaterializer
+import org.apache.parquet.schema.MessageType
+import org.apache.parquet.schema.PrimitiveType
+import org.apache.parquet.schema.Type
+import org.apache.parquet.schema.Types
import org.junit.jupiter.api.*
import org.junit.jupiter.api.Assertions.assertEquals
-import java.io.File
import java.nio.file.FileAlreadyExistsException
+import java.nio.file.Files
import java.nio.file.NoSuchFileException
+import java.nio.file.Path
/**
* Test suite for the Parquet helper classes.
*/
internal class ParquetTest {
- private val schema = SchemaBuilder
- .record("test")
- .namespace("org.opendc.format.util")
- .fields()
- .name("field").type().intType().noDefault()
- .endRecord()
+ private lateinit var path: Path
- private lateinit var file: File
+ private val schema = Types.buildMessage()
+ .addField(
+ Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.REQUIRED)
+ .named("field")
+ )
+ .named("test")
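+
+ // Minimal WriteSupport that writes each Int as the single required "field" column;
+ // paired with the ReadSupport below for the write/read round trip in testSmoke.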
+ private val writeSupport = object : WriteSupport<Int>() {
+ lateinit var recordConsumer: RecordConsumer
+
+ override fun init(configuration: Configuration): WriteContext {
+ return WriteContext(schema, emptyMap())
+ }
+
+ override fun prepareForWrite(recordConsumer: RecordConsumer) {
+ this.recordConsumer = recordConsumer
+ }
+
+ override fun write(record: Int) {
+ val consumer = recordConsumer
+
+ consumer.startMessage()
+ consumer.startField("field", 0)
+ consumer.addInteger(record)
+ consumer.endField("field", 0)
+ consumer.endMessage()
+ }
+ }
+
+ private val readSupport = object : ReadSupport<Int>() {
+ override fun init(
+ configuration: Configuration,
+ keyValueMetaData: Map<String, String>,
+ fileSchema: MessageType
+ ): ReadContext = ReadContext(fileSchema)
+
+ override fun prepareForRead(
+ configuration: Configuration,
+ keyValueMetaData: Map<String, String>,
+ fileSchema: MessageType,
+ readContext: ReadContext
+ ): RecordMaterializer<Int> = TestRecordMaterializer()
+ }
/**
- * Setup the test
+ * Set up the test
*/
@BeforeEach
fun setUp() {
- file = File.createTempFile("opendc", "parquet")
+ path = Files.createTempFile("opendc", "parquet")
}
/**
@@ -59,7 +104,7 @@ internal class ParquetTest {
*/
@AfterEach
fun tearDown() {
- file.delete()
+ Files.deleteIfExists(path)
}
/**
@@ -68,29 +113,24 @@ internal class ParquetTest {
@Test
fun testSmoke() {
val n = 4
- val writer = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(file))
- .withSchema(schema)
+ val writer = LocalParquetWriter.builder(path, writeSupport)
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
.build()
try {
repeat(n) { i ->
- val record = GenericData.Record(schema)
- record.put("field", i)
- writer.write(record)
+ writer.write(i)
}
} finally {
writer.close()
}
- val reader = AvroParquetReader.builder<GenericData.Record>(LocalInputFile(file))
- .build()
-
+ val reader = LocalParquetReader(path, readSupport)
var counter = 0
try {
while (true) {
val record = reader.read() ?: break
- assertEquals(counter++, record.get("field"))
+ assertEquals(counter++, record)
}
} finally {
reader.close()
@@ -105,9 +145,7 @@ internal class ParquetTest {
@Test
fun testOverwrite() {
assertThrows<FileAlreadyExistsException> {
- AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(file))
- .withSchema(schema)
- .build()
+ LocalParquetWriter.builder(path, writeSupport).build()
}
}
@@ -116,10 +154,30 @@ internal class ParquetTest {
*/
@Test
fun testNonExistent() {
- file.delete()
+ Files.deleteIfExists(path)
assertThrows<NoSuchFileException> {
- AvroParquetReader.builder<GenericData.Record>(LocalInputFile(file))
- .build()
+ LocalParquetReader(path, readSupport)
+ }
+ }
+
+ private class TestRecordMaterializer : RecordMaterializer<Int>() {
+ private var current: Int = 0
+ private val fieldConverter = object : PrimitiveConverter() {
+ override fun addInt(value: Int) {
+ current = value
+ }
+ }
+ private val root = object : GroupConverter() {
+ override fun getConverter(fieldIndex: Int): Converter {
+ require(fieldIndex == 0)
+ return fieldConverter
+ }
+ override fun start() {}
+ override fun end() {}
}
+
+ override fun getCurrentRecord(): Int = current
+
+ override fun getRootConverter(): GroupConverter = root
}
}
diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt
index b969f3ef..916a5eca 100644
--- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt
+++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt
@@ -64,7 +64,7 @@ public class SwfTraceFormat : TraceFormat {
}
}
- override fun newReader(path: Path, table: String): TableReader {
+ override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader {
return when (table) {
TABLE_TASKS -> SwfTaskTableReader(path.bufferedReader())
else -> throw IllegalArgumentException("Table $table not supported")
diff --git a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt
index 1698f644..c3d644e8 100644
--- a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt
@@ -58,7 +58,7 @@ internal class SwfTraceFormatTest {
@Test
fun testReader() {
val path = Paths.get(checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")).toURI())
- val reader = format.newReader(path, TABLE_TASKS)
+ val reader = format.newReader(path, TABLE_TASKS, null)
assertAll(
{ assertTrue(reader.nextRow()) },
diff --git a/opendc-trace/opendc-trace-tools/build.gradle.kts b/opendc-trace/opendc-trace-tools/build.gradle.kts
index 0c1e179e..e98fb932 100644
--- a/opendc-trace/opendc-trace-tools/build.gradle.kts
+++ b/opendc-trace/opendc-trace-tools/build.gradle.kts
@@ -29,16 +29,22 @@ plugins {
}
application {
- mainClass.set("org.opendc.trace.tools.TraceConverter")
+ mainClass.set("org.opendc.trace.tools.TraceTools")
}
dependencies {
implementation(projects.opendcTrace.opendcTraceApi)
+ implementation(projects.opendcTrace.opendcTraceCalcite)
implementation(libs.kotlin.logging)
implementation(libs.clikt)
+ implementation(libs.jline)
runtimeOnly(projects.opendcTrace.opendcTraceOpendc)
runtimeOnly(projects.opendcTrace.opendcTraceBitbrains)
runtimeOnly(projects.opendcTrace.opendcTraceAzure)
+ runtimeOnly(projects.opendcTrace.opendcTraceGwf)
+ runtimeOnly(projects.opendcTrace.opendcTraceSwf)
+ runtimeOnly(projects.opendcTrace.opendcTraceWfformat)
+ runtimeOnly(projects.opendcTrace.opendcTraceWtf)
runtimeOnly(libs.log4j.slf4j)
}
diff --git a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/ConvertCommand.kt
index c71035d4..970de0f4 100644
--- a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt
+++ b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/ConvertCommand.kt
@@ -20,7 +20,6 @@
* SOFTWARE.
*/
-@file:JvmName("TraceConverter")
package org.opendc.trace.tools
import com.github.ajalt.clikt.core.CliktCommand
@@ -44,14 +43,9 @@ import kotlin.math.max
import kotlin.math.min
/**
- * A script to convert a trace in text format into a Parquet trace.
+ * A [CliktCommand] that can convert between workload trace formats.
*/
-fun main(args: Array<String>): Unit = TraceConverterCli().main(args)
-
-/**
- * Represents the command for converting traces
- */
-internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
+internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert between workload trace formats") {
/**
* The logger instance for the converter.
*/
@@ -73,7 +67,7 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
/**
* The input format of the trace.
*/
- private val inputFormat by option("-f", "--input-format", help = "format of output trace")
+ private val inputFormat by option("-f", "--input-format", help = "format of input trace")
.required()
/**
diff --git a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/QueryCommand.kt b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/QueryCommand.kt
new file mode 100644
index 00000000..b0f95de2
--- /dev/null
+++ b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/QueryCommand.kt
@@ -0,0 +1,159 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.tools
+
+import com.github.ajalt.clikt.core.CliktCommand
+import com.github.ajalt.clikt.parameters.arguments.argument
+import com.github.ajalt.clikt.parameters.options.option
+import com.github.ajalt.clikt.parameters.options.required
+import com.github.ajalt.clikt.parameters.types.file
+import org.apache.calcite.jdbc.CalciteConnection
+import org.jline.builtins.Styles
+import org.jline.console.Printer
+import org.jline.console.impl.DefaultPrinter
+import org.jline.terminal.Terminal
+import org.jline.terminal.TerminalBuilder
+import org.jline.utils.AttributedStringBuilder
+import org.opendc.trace.Trace
+import org.opendc.trace.calcite.TraceSchema
+import java.nio.charset.StandardCharsets
+import java.sql.DriverManager
+import java.sql.ResultSet
+import java.sql.ResultSetMetaData
+import java.util.*
+
+/**
+ * A [CliktCommand] that allows users to query workload traces using SQL.
+ */
+internal class QueryCommand : CliktCommand(name = "query", help = "Query workload traces") {
+ /**
+ * The trace to open.
+ */
+ private val input by option("-i", "--input")
+ .file(mustExist = true)
+ .required()
+
+ /**
+ * The input format of the trace.
+ */
+ private val inputFormat by option("-f", "--format", help = "format of the trace")
+ .required()
+
+ /**
+ * The query to execute.
+ */
+ private val query by argument()
+
+ /**
+ * Access to the terminal.
+ */
+ private val terminal = TerminalBuilder.builder()
+ .system(false)
+ .streams(System.`in`, System.out)
+ .encoding(StandardCharsets.UTF_8)
+ .build()
+
+ /**
+ * Helper class to print results to the console.
+ */
+ private val printer = QueryPrinter(terminal)
+
+ override fun run() {
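+ // Open the trace and register it with Calcite as a schema named "trace",
+ // so that table names in the SQL query resolve against the trace's tables.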
+ val inputTrace = Trace.open(input, format = inputFormat)
+ val info = Properties().apply { this["lex"] = "JAVA" }
+ val connection = DriverManager.getConnection("jdbc:calcite:", info).unwrap(CalciteConnection::class.java)
+ connection.rootSchema.add("trace", TraceSchema(inputTrace))
+ connection.schema = "trace"
+
+ val stmt = connection.createStatement()
+
+ val start = System.currentTimeMillis()
+ val hasResults = stmt.execute(query)
+
+ try {
+ if (hasResults) {
+ do {
+ stmt.resultSet.use { rs ->
+ val count: Int = printResults(rs)
+ val duration = (System.currentTimeMillis() - start) / 1000.0
+ printer.println("$count rows selected (${"%.3f".format(duration)} seconds)")
+ }
+ } while (stmt.moreResults)
+ } else {
+ val count: Int = stmt.updateCount
+ val duration = (System.currentTimeMillis() - start) / 1000.0
+
+ printer.println("$count rows affected (${"%.3f".format(duration)} seconds)")
+ }
+ } finally {
+ stmt.close()
+ connection.close()
+ }
+ }
+
+ /**
+ * Helper function to print the results to the console.
+ */
+ private fun printResults(rs: ResultSet): Int {
+ var count = 0
+ val meta: ResultSetMetaData = rs.metaData
+
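+ // Collect each row into a map keyed by column name; JLine's Printer renders
+ // the list of maps as a table using the COLUMNS and BORDER options below.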
+ val options = mapOf(
+ Printer.COLUMNS to List(meta.columnCount) { meta.getColumnName(it + 1) },
+ Printer.BORDER to "|",
+ )
+ val data = mutableListOf<Map<String, Any>>()
+
+ while (rs.next()) {
+ val row = mutableMapOf<String, Any>()
+ for (i in 1..meta.columnCount) {
+ row[meta.getColumnName(i)] = rs.getObject(i)
+ }
+ data.add(row)
+
+ count++
+ }
+
+ printer.println(options, data)
+
+ return count
+ }
+
+ /**
+ * Helper class to print the results of the query.
+ */
+ private class QueryPrinter(private val terminal: Terminal) : DefaultPrinter(null) {
+ override fun terminal(): Terminal = terminal
+
+ override fun highlightAndPrint(options: MutableMap<String, Any>, exception: Throwable) {
+ if (options.getOrDefault("exception", "stack") == "stack") {
+ exception.printStackTrace()
+ } else {
+ val asb = AttributedStringBuilder()
+ asb.append(exception.message, Styles.prntStyle().resolve(".em"))
+ asb.toAttributedString().println(terminal())
+ }
+ }
+ }
+}
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceTools.kt
index 086b900b..b480484b 100644
--- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt
+++ b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceTools.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021 AtLarge Research
+ * Copyright (c) 2022 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,25 +20,25 @@
* SOFTWARE.
*/
-@file:JvmName("AvroUtils")
-package org.opendc.trace.util.parquet
+@file:JvmName("TraceTools")
+package org.opendc.trace.tools
-import org.apache.avro.LogicalTypes
-import org.apache.avro.Schema
+import com.github.ajalt.clikt.core.CliktCommand
+import com.github.ajalt.clikt.core.subcommands
/**
- * Schema for UUID type.
+ * A script for querying and manipulating workload traces supported by OpenDC.
*/
-public val UUID_SCHEMA: Schema = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING))
+fun main(args: Array<String>): Unit = TraceToolsCli().main(args)
/**
- * Schema for timestamp type.
+ * The primary [CliktCommand] for the trace tools offered by OpenDC.
*/
-public val TIMESTAMP_SCHEMA: Schema = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG))
+class TraceToolsCli : CliktCommand(name = "trace-tools") {
+ init {
+ subcommands(QueryCommand(), ConvertCommand())
+ }
-/**
- * Helper function to make a [Schema] field optional.
- */
-public fun Schema.optional(): Schema {
- return Schema.createUnion(Schema.create(Schema.Type.NULL), this)
+ override fun run() {}
}
diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt
index bc175b58..8db4c169 100644
--- a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt
+++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt
@@ -63,7 +63,7 @@ public class WfFormatTraceFormat : TraceFormat {
}
}
- override fun newReader(path: Path, table: String): TableReader {
+ override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader {
return when (table) {
TABLE_TASKS -> WfFormatTaskTableReader(factory.createParser(path.toFile()))
else -> throw IllegalArgumentException("Table $table not supported")
diff --git a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt
index 710de88e..4a8b2792 100644
--- a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt
@@ -62,7 +62,7 @@ class WfFormatTraceFormatTest {
@Test
fun testTableReader() {
val path = Paths.get("src/test/resources/trace.json")
- val reader = format.newReader(path, TABLE_TASKS)
+ val reader = format.newReader(path, TABLE_TASKS, null)
assertAll(
{ assertTrue(reader.nextRow()) },
@@ -89,7 +89,7 @@ class WfFormatTraceFormatTest {
@Test
fun testTableReaderFull() {
val path = Paths.get("src/test/resources/trace.json")
- val reader = format.newReader(path, TABLE_TASKS)
+ val reader = format.newReader(path, TABLE_TASKS, null)
assertDoesNotThrow {
while (reader.nextRow()) {
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
index 1e332aca..f0db78b7 100644
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
@@ -22,38 +22,30 @@
package org.opendc.trace.wtf
-import org.apache.avro.Schema
-import org.apache.avro.generic.GenericRecord
import org.opendc.trace.*
import org.opendc.trace.conv.*
import org.opendc.trace.util.parquet.LocalParquetReader
-import java.time.Duration
-import java.time.Instant
+import org.opendc.trace.wtf.parquet.Task
/**
* A [TableReader] implementation for the WTF format.
*/
-internal class WtfTaskTableReader(private val reader: LocalParquetReader<GenericRecord>) : TableReader {
+internal class WtfTaskTableReader(private val reader: LocalParquetReader<Task>) : TableReader {
/**
* The current record.
*/
- private var record: GenericRecord? = null
-
- /**
- * A flag to indicate that the columns have been initialized.
- */
- private var hasInitializedColumns = false
+ private var record: Task? = null
override fun nextRow(): Boolean {
- val record = reader.read()
- this.record = record
+ try {
+ val record = reader.read()
+ this.record = record
- if (!hasInitializedColumns && record != null) {
- initColumns(record.schema)
- hasInitializedColumns = true
+ return record != null
+ } catch (e: Throwable) {
+ this.record = null
+ throw e
}
-
- return record != null
}
override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
@@ -65,16 +57,15 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Generic
override fun get(index: Int): Any? {
val record = checkNotNull(record) { "Reader in invalid state" }
- @Suppress("UNCHECKED_CAST")
return when (index) {
- COL_ID -> (record[AVRO_COL_ID] as Long).toString()
- COL_WORKFLOW_ID -> (record[AVRO_COL_WORKFLOW_ID] as Long).toString()
- COL_SUBMIT_TIME -> Instant.ofEpochMilli(record[AVRO_COL_SUBMIT_TIME] as Long)
- COL_WAIT_TIME -> Duration.ofMillis(record[AVRO_COL_WAIT_TIME] as Long)
- COL_RUNTIME -> Duration.ofMillis(record[AVRO_COL_RUNTIME] as Long)
+ COL_ID -> record.id
+ COL_WORKFLOW_ID -> record.workflowId
+ COL_SUBMIT_TIME -> record.submitTime
+ COL_WAIT_TIME -> record.waitTime
+ COL_RUNTIME -> record.runtime
COL_REQ_NCPUS, COL_GROUP_ID, COL_USER_ID -> getInt(index)
- COL_PARENTS -> (record[AVRO_COL_PARENTS] as ArrayList<GenericRecord>).map { it["item"].toString() }.toSet()
- COL_CHILDREN -> (record[AVRO_COL_CHILDREN] as ArrayList<GenericRecord>).map { it["item"].toString() }.toSet()
+ COL_PARENTS -> record.parents
+ COL_CHILDREN -> record.children
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -87,9 +78,9 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Generic
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_REQ_NCPUS -> (record[AVRO_COL_REQ_NCPUS] as Double).toInt()
- COL_GROUP_ID -> record[AVRO_COL_GROUP_ID] as Int
- COL_USER_ID -> record[AVRO_COL_USER_ID] as Int
+ COL_REQ_NCPUS -> record.requestedCpus
+ COL_GROUP_ID -> record.groupId
+ COL_USER_ID -> record.userId
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -106,38 +97,6 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Generic
reader.close()
}
- /**
- * Initialize the columns for the reader based on [schema].
- */
- private fun initColumns(schema: Schema) {
- try {
- AVRO_COL_ID = schema.getField("id").pos()
- AVRO_COL_WORKFLOW_ID = schema.getField("workflow_id").pos()
- AVRO_COL_SUBMIT_TIME = schema.getField("ts_submit").pos()
- AVRO_COL_WAIT_TIME = schema.getField("wait_time").pos()
- AVRO_COL_RUNTIME = schema.getField("runtime").pos()
- AVRO_COL_REQ_NCPUS = schema.getField("resource_amount_requested").pos()
- AVRO_COL_PARENTS = schema.getField("parents").pos()
- AVRO_COL_CHILDREN = schema.getField("children").pos()
- AVRO_COL_GROUP_ID = schema.getField("group_id").pos()
- AVRO_COL_USER_ID = schema.getField("user_id").pos()
- } catch (e: NullPointerException) {
- // This happens when the field we are trying to access does not exist
- throw IllegalArgumentException("Invalid schema", e)
- }
- }
-
- private var AVRO_COL_ID = -1
- private var AVRO_COL_WORKFLOW_ID = -1
- private var AVRO_COL_SUBMIT_TIME = -1
- private var AVRO_COL_WAIT_TIME = -1
- private var AVRO_COL_RUNTIME = -1
- private var AVRO_COL_REQ_NCPUS = -1
- private var AVRO_COL_PARENTS = -1
- private var AVRO_COL_CHILDREN = -1
- private var AVRO_COL_GROUP_ID = -1
- private var AVRO_COL_USER_ID = -1
-
private val COL_ID = 0
private val COL_WORKFLOW_ID = 1
private val COL_SUBMIT_TIME = 2
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
index c8f9ecaa..e71253ac 100644
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
@@ -22,12 +22,12 @@
package org.opendc.trace.wtf
-import org.apache.avro.generic.GenericRecord
import org.opendc.trace.*
import org.opendc.trace.conv.*
import org.opendc.trace.spi.TableDetails
import org.opendc.trace.spi.TraceFormat
import org.opendc.trace.util.parquet.LocalParquetReader
+import org.opendc.trace.wtf.parquet.TaskReadSupport
import java.nio.file.Path
/**
@@ -63,10 +63,10 @@ public class WtfTraceFormat : TraceFormat {
}
}
- override fun newReader(path: Path, table: String): TableReader {
+ override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader {
return when (table) {
TABLE_TASKS -> {
- val reader = LocalParquetReader<GenericRecord>(path.resolve("tasks/schema-1.0"))
+ val reader = LocalParquetReader(path.resolve("tasks/schema-1.0"), TaskReadSupport(projection))
WtfTaskTableReader(reader)
}
else -> throw IllegalArgumentException("Table $table not supported")
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt
new file mode 100644
index 00000000..71557f96
--- /dev/null
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.wtf.parquet
+
+import java.time.Duration
+import java.time.Instant
+
+/**
+ * A task in the Workflow Trace Format.
+ */
+internal data class Task(
+ val id: String,
+ val workflowId: String,
+ val submitTime: Instant,
+ val waitTime: Duration,
+ val runtime: Duration,
+ val requestedCpus: Int,
+ val groupId: Int,
+ val userId: Int,
+ val parents: Set<String>,
+ val children: Set<String>
+)
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt
new file mode 100644
index 00000000..8e7325de
--- /dev/null
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt
@@ -0,0 +1,134 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.wtf.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.api.InitContext
+import org.apache.parquet.hadoop.api.ReadSupport
+import org.apache.parquet.io.api.RecordMaterializer
+import org.apache.parquet.schema.*
+import org.opendc.trace.TableColumn
+import org.opendc.trace.conv.*
+
+/**
+ * A [ReadSupport] instance for [Task] objects.
+ *
+ * @param projection The projection of the table to read.
+ */
+internal class TaskReadSupport(private val projection: List<TableColumn<*>>?) : ReadSupport<Task>() {
+ /**
+ * Mapping of table columns to their Parquet column names.
+ */
+ private val colMap = mapOf<TableColumn<*>, String>(
+ TASK_ID to "id",
+ TASK_WORKFLOW_ID to "workflow_id",
+ TASK_SUBMIT_TIME to "ts_submit",
+ TASK_WAIT_TIME to "wait_time",
+ TASK_RUNTIME to "runtime",
+ TASK_REQ_NCPUS to "resource_amount_requested",
+ TASK_PARENTS to "parents",
+ TASK_CHILDREN to "children",
+ TASK_GROUP_ID to "group_id",
+ TASK_USER_ID to "user_id"
+ )
+
+ override fun init(context: InitContext): ReadContext {
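+ // If a projection is requested, prune READ_SCHEMA down to the projected columns
+ // so that Parquet only reads the column chunks that are actually needed.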
+ val projectedSchema =
+ if (projection != null) {
+ Types.buildMessage()
+ .apply {
+ val fieldByName = READ_SCHEMA.fields.associateBy { it.name }
+
+ for (col in projection) {
+ val fieldName = colMap[col] ?: continue
+ addField(fieldByName.getValue(fieldName))
+ }
+ }
+ .named(READ_SCHEMA.name)
+ } else {
+ READ_SCHEMA
+ }
+ return ReadContext(projectedSchema)
+ }
+
+ override fun prepareForRead(
+ configuration: Configuration,
+ keyValueMetaData: Map<String, String>,
+ fileSchema: MessageType,
+ readContext: ReadContext
+ ): RecordMaterializer<Task> = TaskRecordMaterializer(readContext.requestedSchema)
+
+ companion object {
+ /**
+ * Parquet read schema for the "tasks" table in the trace.
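+ *
+ * The parents and children columns use the three-level Parquet list layout
+ * (optional group, repeated "list" group, "item" field), mirrored by the
+ * converters in [TaskRecordMaterializer].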
+ */
+ @JvmStatic
+ val READ_SCHEMA: MessageType = Types.buildMessage()
+ .addFields(
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("id"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("workflow_id"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("ts_submit"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("wait_time"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("runtime"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("resource_amount_requested"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("user_id"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("group_id"),
+ Types
+ .buildGroup(Type.Repetition.OPTIONAL)
+ .addField(
+ Types.repeatedGroup()
+ .addField(Types.optional(PrimitiveType.PrimitiveTypeName.INT64).named("item"))
+ .named("list")
+ )
+ .`as`(LogicalTypeAnnotation.listType())
+ .named("children"),
+ Types
+ .buildGroup(Type.Repetition.OPTIONAL)
+ .addField(
+ Types.repeatedGroup()
+ .addField(Types.optional(PrimitiveType.PrimitiveTypeName.INT64).named("item"))
+ .named("list")
+ )
+ .`as`(LogicalTypeAnnotation.listType())
+ .named("parents"),
+ )
+ .named("task")
+ }
+}
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt
new file mode 100644
index 00000000..08da5eaf
--- /dev/null
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt
@@ -0,0 +1,165 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.wtf.parquet
+
+import org.apache.parquet.io.api.*
+import org.apache.parquet.schema.MessageType
+import java.time.Duration
+import java.time.Instant
+import kotlin.math.roundToInt
+
+/**
+ * A [RecordMaterializer] for [Task] records.
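+ *
+ * Primitive values are delivered through the converter tree rooted at [getRootConverter]
+ * and accumulated in the fields of this class; [getCurrentRecord] assembles them into a [Task].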
+ */
+internal class TaskRecordMaterializer(schema: MessageType) : RecordMaterializer<Task>() {
+ /**
+ * State of current record being read.
+ */
+ private var _id = ""
+ private var _workflowId = ""
+ private var _submitTime = Instant.MIN
+ private var _waitTime = Duration.ZERO
+ private var _runtime = Duration.ZERO
+ private var _requestedCpus = 0
+ private var _groupId = 0
+ private var _userId = 0
+ private var _parents = mutableSetOf<String>()
+ private var _children = mutableSetOf<String>()
+
+ /**
+ * Root converter for the record.
+ */
+ private val root = object : GroupConverter() {
+ /**
+ * The converters for the columns of the schema.
+ */
+ private val converters = schema.fields.map { type ->
+ when (type.name) {
+ "id" -> object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ _id = value.toString()
+ }
+ }
+ "workflow_id" -> object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ _workflowId = value.toString()
+ }
+ }
+ "ts_submit" -> object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ _submitTime = Instant.ofEpochMilli(value)
+ }
+ }
+ "wait_time" -> object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ _waitTime = Duration.ofMillis(value)
+ }
+ }
+ "runtime" -> object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ _runtime = Duration.ofMillis(value)
+ }
+ }
+ "resource_amount_requested" -> object : PrimitiveConverter() {
+ override fun addDouble(value: Double) {
+ _requestedCpus = value.roundToInt()
+ }
+ }
+ "group_id" -> object : PrimitiveConverter() {
+ override fun addInt(value: Int) {
+ _groupId = value
+ }
+ }
+ "user_id" -> object : PrimitiveConverter() {
+ override fun addInt(value: Int) {
+ _userId = value
+ }
+ }
+ "children" -> RelationConverter(_children)
+ "parents" -> RelationConverter(_parents)
+ else -> error("Unknown column $type")
+ }
+ }
+
+ override fun start() {
+ _id = ""
+ _workflowId = ""
+ _submitTime = Instant.MIN
+ _waitTime = Duration.ZERO
+ _runtime = Duration.ZERO
+ _requestedCpus = 0
+ _groupId = 0
+ _userId = 0
+ _parents.clear()
+ _children.clear()
+ }
+
+ override fun end() {}
+
+ override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex]
+ }
+
+ override fun getCurrentRecord(): Task = Task(
+ _id,
+ _workflowId,
+ _submitTime,
+ _waitTime,
+ _runtime,
+ _requestedCpus,
+ _groupId,
+ _userId,
+ _parents.toSet(),
+ _children.toSet()
+ )
+
+ override fun getRootConverter(): GroupConverter = root
+
+ /**
+ * Helper class to convert parent and child relations and add them to [relations].
+ */
+ private class RelationConverter(private val relations: MutableSet<String>) : GroupConverter() {
+ private val entryConverter = object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ relations.add(value.toString())
+ }
+ }
+
+ private val listConverter = object : GroupConverter() {
+ override fun getConverter(fieldIndex: Int): Converter {
+ require(fieldIndex == 0)
+ return entryConverter
+ }
+
+ override fun start() {}
+ override fun end() {}
+ }
+
+ override fun getConverter(fieldIndex: Int): Converter {
+ require(fieldIndex == 0)
+ return listConverter
+ }
+
+ override fun start() {}
+ override fun end() {}
+ }
+}
diff --git a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt
index 0f0e422d..c0eb3f08 100644
--- a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt
@@ -61,7 +61,7 @@ class WtfTraceFormatTest {
@Test
fun testTableReader() {
val path = Paths.get("src/test/resources/wtf-trace")
- val reader = format.newReader(path, TABLE_TASKS)
+ val reader = format.newReader(path, TABLE_TASKS, listOf(TASK_ID, TASK_WORKFLOW_ID, TASK_SUBMIT_TIME, TASK_RUNTIME, TASK_PARENTS))
assertAll(
{ assertTrue(reader.nextRow()) },
diff --git a/settings.gradle.kts b/settings.gradle.kts
index cc454b19..a779edcc 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -59,6 +59,7 @@ include(":opendc-trace:opendc-trace-bitbrains")
include(":opendc-trace:opendc-trace-azure")
include(":opendc-trace:opendc-trace-opendc")
include(":opendc-trace:opendc-trace-parquet")
+include(":opendc-trace:opendc-trace-calcite")
include(":opendc-trace:opendc-trace-tools")
include(":opendc-harness:opendc-harness-api")
include(":opendc-harness:opendc-harness-engine")