74 files changed, 3426 insertions, 604 deletions
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 43568067..b05af368 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -1,4 +1,5 @@ [versions] +calcite = "1.30.0" classgraph = "4.8.143" clikt = "3.4.1" config = "1.4.2" @@ -8,6 +9,7 @@ gradle-node = "3.2.1" hadoop = "3.3.1" jackson = "2.13.2" jandex-gradle = "0.12.0" +jline = "3.21.0" jmh-gradle = "0.6.6" jakarta-validation = "2.0.2" junit-jupiter = "5.8.2" @@ -68,7 +70,7 @@ jackson-databind = { module = "com.fasterxml.jackson.core:jackson-databind", ver jackson-module-kotlin = { module = "com.fasterxml.jackson.module:jackson-module-kotlin", version.ref = "jackson" } jackson-datatype-jsr310 = { module = "com.fasterxml.jackson.datatype:jackson-datatype-jsr310", version.ref = "jackson" } jackson-dataformat-csv = { module = "com.fasterxml.jackson.dataformat:jackson-dataformat-csv", version.ref = "jackson" } -parquet = { module = "org.apache.parquet:parquet-avro", version.ref = "parquet" } +parquet = { module = "org.apache.parquet:parquet-hadoop", version.ref = "parquet" } config = { module = "com.typesafe:config", version.ref = "config" } # Quarkus @@ -101,6 +103,10 @@ quarkus-test-security = { module = "io.quarkus:quarkus-test-security" } restassured-core = { module = "io.rest-assured:rest-assured" } restassured-kotlin = { module = "io.rest-assured:kotlin-extensions" } +# Calcite (SQL) +calcite-core = { module = "org.apache.calcite:calcite-core", version.ref = "calcite" } +jline = { module = "org.jline:jline", version.ref = "jline" } + # Other classgraph = { module = "io.github.classgraph:classgraph", version.ref = "classgraph" } jakarta-validation = { module = "jakarta.validation:jakarta.validation-api", version.ref = "jakarta-validation" } diff --git a/opendc-compute/opendc-compute-workload/build.gradle.kts b/opendc-compute/opendc-compute-workload/build.gradle.kts index 9ced95a7..319b2ae3 100644 --- a/opendc-compute/opendc-compute-workload/build.gradle.kts +++ b/opendc-compute/opendc-compute-workload/build.gradle.kts @@ -39,4 +39,6 @@ dependencies { implementation(libs.opentelemetry.semconv) implementation(libs.kotlin.logging) + + testImplementation(libs.slf4j.simple) } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt index 84387bbc..c854d874 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt @@ -23,14 +23,12 @@ package org.opendc.compute.workload.export.parquet import mu.KotlinLogging -import org.apache.avro.Schema -import org.apache.avro.generic.GenericData -import org.apache.avro.generic.GenericRecordBuilder -import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.column.ParquetProperties import org.apache.parquet.hadoop.ParquetFileWriter import org.apache.parquet.hadoop.ParquetWriter +import org.apache.parquet.hadoop.api.WriteSupport import org.apache.parquet.hadoop.metadata.CompressionCodecName -import org.opendc.trace.util.parquet.LocalOutputFile +import org.opendc.trace.util.parquet.LocalParquetWriter import java.io.File import java.util.concurrent.ArrayBlockingQueue import java.util.concurrent.BlockingQueue @@ -38,10 +36,13 @@ import kotlin.concurrent.thread /** * 
A writer that writes data in Parquet format. + * + * @param path The path to the file to write the data to. + * @param writeSupport The [WriteSupport] implementation for converting the records to Parquet format. */ public abstract class ParquetDataWriter<in T>( path: File, - private val schema: Schema, + private val writeSupport: WriteSupport<T>, bufferSize: Int = 4096 ) : AutoCloseable { /** @@ -52,7 +53,7 @@ public abstract class ParquetDataWriter<in T>( /** * The queue of records to process. */ - private val queue: BlockingQueue<GenericData.Record> = ArrayBlockingQueue(bufferSize) + private val queue: BlockingQueue<T> = ArrayBlockingQueue(bufferSize) /** * An exception to be propagated to the actual writer. @@ -64,15 +65,15 @@ public abstract class ParquetDataWriter<in T>( */ private val writerThread = thread(start = false, name = this.toString()) { val writer = let { - val builder = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(path)) - .withSchema(schema) + val builder = LocalParquetWriter.builder(path.toPath(), writeSupport) + .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0) .withCompressionCodec(CompressionCodecName.ZSTD) .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) buildWriter(builder) } val queue = queue - val buf = mutableListOf<GenericData.Record>() + val buf = mutableListOf<T>() var shouldStop = false try { @@ -101,16 +102,11 @@ public abstract class ParquetDataWriter<in T>( /** * Build the [ParquetWriter] used to write the Parquet files. */ - protected open fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> { + protected open fun buildWriter(builder: LocalParquetWriter.Builder<@UnsafeVariance T>): ParquetWriter<@UnsafeVariance T> { return builder.build() } /** - * Convert the specified [data] into a Parquet record. - */ - protected abstract fun convert(builder: GenericRecordBuilder, data: T) - - /** * Write the specified metrics to the database. 
*/ public fun write(data: T) { @@ -119,9 +115,7 @@ public abstract class ParquetDataWriter<in T>( throw IllegalStateException("Writer thread failed", exception) } - val builder = GenericRecordBuilder(schema) - convert(builder, data) - queue.put(builder.build()) + queue.put(data) } /** diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt index 2b7cac8f..0d5b6b34 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt @@ -22,81 +22,189 @@ package org.opendc.compute.workload.export.parquet -import org.apache.avro.Schema -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericData -import org.apache.avro.generic.GenericRecordBuilder -import org.apache.parquet.avro.AvroParquetWriter +import org.apache.hadoop.conf.Configuration import org.apache.parquet.hadoop.ParquetWriter +import org.apache.parquet.hadoop.api.WriteSupport +import org.apache.parquet.io.api.RecordConsumer +import org.apache.parquet.schema.* import org.opendc.telemetry.compute.table.HostTableReader -import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA -import org.opendc.trace.util.parquet.UUID_SCHEMA -import org.opendc.trace.util.parquet.optional +import org.opendc.trace.util.parquet.LocalParquetWriter import java.io.File +import java.util.* /** * A Parquet event writer for [HostTableReader]s. */ public class ParquetHostDataWriter(path: File, bufferSize: Int) : - ParquetDataWriter<HostTableReader>(path, SCHEMA, bufferSize) { + ParquetDataWriter<HostTableReader>(path, HostDataWriteSupport(), bufferSize) { - override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> { + override fun buildWriter(builder: LocalParquetWriter.Builder<HostTableReader>): ParquetWriter<HostTableReader> { return builder .withDictionaryEncoding("host_id", true) .build() } - override fun convert(builder: GenericRecordBuilder, data: HostTableReader) { - builder["timestamp"] = data.timestamp.toEpochMilli() + override fun toString(): String = "host-writer" - builder["host_id"] = data.host.id + /** + * A [WriteSupport] implementation for a [HostTableReader]. 
+ */ + private class HostDataWriteSupport : WriteSupport<HostTableReader>() { + lateinit var recordConsumer: RecordConsumer - builder["uptime"] = data.uptime - builder["downtime"] = data.downtime - val bootTime = data.bootTime - builder["boot_time"] = bootTime?.toEpochMilli() + override fun init(configuration: Configuration): WriteContext { + return WriteContext(SCHEMA, emptyMap()) + } - builder["cpu_count"] = data.host.cpuCount - builder["cpu_limit"] = data.cpuLimit - builder["cpu_time_active"] = data.cpuActiveTime - builder["cpu_time_idle"] = data.cpuIdleTime - builder["cpu_time_steal"] = data.cpuStealTime - builder["cpu_time_lost"] = data.cpuLostTime + override fun prepareForWrite(recordConsumer: RecordConsumer) { + this.recordConsumer = recordConsumer + } - builder["mem_limit"] = data.host.memCapacity + override fun write(record: HostTableReader) { + write(recordConsumer, record) + } - builder["power_total"] = data.powerTotal + private fun write(consumer: RecordConsumer, data: HostTableReader) { + consumer.startMessage() - builder["guests_terminated"] = data.guestsTerminated - builder["guests_running"] = data.guestsRunning - builder["guests_error"] = data.guestsError - builder["guests_invalid"] = data.guestsInvalid - } + consumer.startField("timestamp", 0) + consumer.addLong(data.timestamp.toEpochMilli()) + consumer.endField("timestamp", 0) - override fun toString(): String = "host-writer" + consumer.startField("host_id", 1) + consumer.addBinary(UUID.fromString(data.host.id).toBinary()) + consumer.endField("host_id", 1) + + consumer.startField("uptime", 2) + consumer.addLong(data.uptime) + consumer.endField("uptime", 2) + + consumer.startField("downtime", 3) + consumer.addLong(data.downtime) + consumer.endField("downtime", 3) + + val bootTime = data.bootTime + if (bootTime != null) { + consumer.startField("boot_time", 4) + consumer.addLong(bootTime.toEpochMilli()) + consumer.endField("boot_time", 4) + } + + consumer.startField("cpu_count", 5) + consumer.addInteger(data.host.cpuCount) + consumer.endField("cpu_count", 5) + + consumer.startField("cpu_limit", 6) + consumer.addDouble(data.cpuLimit) + consumer.endField("cpu_limit", 6) + + consumer.startField("cpu_time_active", 7) + consumer.addLong(data.cpuActiveTime) + consumer.endField("cpu_time_active", 7) + + consumer.startField("cpu_time_idle", 8) + consumer.addLong(data.cpuIdleTime) + consumer.endField("cpu_time_idle", 8) + + consumer.startField("cpu_time_steal", 9) + consumer.addLong(data.cpuStealTime) + consumer.endField("cpu_time_steal", 9) + + consumer.startField("cpu_time_lost", 10) + consumer.addLong(data.cpuLostTime) + consumer.endField("cpu_time_lost", 10) + + consumer.startField("mem_limit", 11) + consumer.addLong(data.host.memCapacity) + consumer.endField("mem_limit", 11) + + consumer.startField("power_total", 12) + consumer.addDouble(data.powerTotal) + consumer.endField("power_total", 12) + + consumer.startField("guests_terminated", 13) + consumer.addInteger(data.guestsTerminated) + consumer.endField("guests_terminated", 13) + + consumer.startField("guests_running", 14) + consumer.addInteger(data.guestsRunning) + consumer.endField("guests_running", 14) + + consumer.startField("guests_error", 15) + consumer.addInteger(data.guestsError) + consumer.endField("guests_error", 15) + + consumer.startField("guests_invalid", 16) + consumer.addInteger(data.guestsInvalid) + consumer.endField("guests_invalid", 16) + + consumer.endMessage() + } + } private companion object { - private val SCHEMA: Schema = SchemaBuilder - .record("host") - 
.namespace("org.opendc.telemetry.compute") - .fields() - .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault() - .name("host_id").type(UUID_SCHEMA).noDefault() - .requiredLong("uptime") - .requiredLong("downtime") - .name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault() - .requiredInt("cpu_count") - .requiredDouble("cpu_limit") - .requiredLong("cpu_time_active") - .requiredLong("cpu_time_idle") - .requiredLong("cpu_time_steal") - .requiredLong("cpu_time_lost") - .requiredLong("mem_limit") - .requiredDouble("power_total") - .requiredInt("guests_terminated") - .requiredInt("guests_running") - .requiredInt("guests_error") - .requiredInt("guests_invalid") - .endRecord() + /** + * The schema of the host data. + */ + val SCHEMA: MessageType = Types + .buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("timestamp"), + Types + .required(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) + .length(16) + .`as`(LogicalTypeAnnotation.uuidType()) + .named("host_id"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("uptime"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("downtime"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("boot_time"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("cpu_count"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpu_limit"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("cpu_time_active"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("cpu_time_idle"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("cpu_time_steal"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("cpu_time_lost"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("mem_limit"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("power_total"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("guests_terminated"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("guests_running"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("guests_error"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("guests_invalid"), + ) + .named("host") } } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt index 144b6624..5d11629b 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt @@ -22,73 +22,177 @@ package org.opendc.compute.workload.export.parquet -import org.apache.avro.Schema -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericData -import org.apache.avro.generic.GenericRecordBuilder -import org.apache.parquet.avro.AvroParquetWriter +import org.apache.hadoop.conf.Configuration import org.apache.parquet.hadoop.ParquetWriter +import org.apache.parquet.hadoop.api.WriteSupport +import org.apache.parquet.io.api.RecordConsumer +import 
org.apache.parquet.schema.* import org.opendc.telemetry.compute.table.ServerTableReader -import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA -import org.opendc.trace.util.parquet.UUID_SCHEMA -import org.opendc.trace.util.parquet.optional +import org.opendc.trace.util.parquet.LocalParquetWriter import java.io.File +import java.util.* /** * A Parquet event writer for [ServerTableReader]s. */ public class ParquetServerDataWriter(path: File, bufferSize: Int) : - ParquetDataWriter<ServerTableReader>(path, SCHEMA, bufferSize) { + ParquetDataWriter<ServerTableReader>(path, ServerDataWriteSupport(), bufferSize) { - override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> { + override fun buildWriter(builder: LocalParquetWriter.Builder<ServerTableReader>): ParquetWriter<ServerTableReader> { return builder .withDictionaryEncoding("server_id", true) .withDictionaryEncoding("host_id", true) .build() } - override fun convert(builder: GenericRecordBuilder, data: ServerTableReader) { - builder["timestamp"] = data.timestamp.toEpochMilli() + override fun toString(): String = "server-writer" - builder["server_id"] = data.server.id - builder["host_id"] = data.host?.id + /** + * A [WriteSupport] implementation for a [ServerTableReader]. + */ + private class ServerDataWriteSupport : WriteSupport<ServerTableReader>() { + lateinit var recordConsumer: RecordConsumer - builder["uptime"] = data.uptime - builder["downtime"] = data.downtime - builder["boot_time"] = data.bootTime?.toEpochMilli() - builder["provision_time"] = data.provisionTime?.toEpochMilli() + override fun init(configuration: Configuration): WriteContext { + return WriteContext(SCHEMA, emptyMap()) + } - builder["cpu_count"] = data.server.cpuCount - builder["cpu_limit"] = data.cpuLimit - builder["cpu_time_active"] = data.cpuActiveTime - builder["cpu_time_idle"] = data.cpuIdleTime - builder["cpu_time_steal"] = data.cpuStealTime - builder["cpu_time_lost"] = data.cpuLostTime + override fun prepareForWrite(recordConsumer: RecordConsumer) { + this.recordConsumer = recordConsumer + } - builder["mem_limit"] = data.server.memCapacity - } + override fun write(record: ServerTableReader) { + write(recordConsumer, record) + } - override fun toString(): String = "server-writer" + private fun write(consumer: RecordConsumer, data: ServerTableReader) { + consumer.startMessage() + + consumer.startField("timestamp", 0) + consumer.addLong(data.timestamp.toEpochMilli()) + consumer.endField("timestamp", 0) + + consumer.startField("server_id", 1) + consumer.addBinary(UUID.fromString(data.server.id).toBinary()) + consumer.endField("server_id", 1) + + val hostId = data.host?.id + if (hostId != null) { + consumer.startField("host_id", 2) + consumer.addBinary(UUID.fromString(hostId).toBinary()) + consumer.endField("host_id", 2) + } + + consumer.startField("uptime", 3) + consumer.addLong(data.uptime) + consumer.endField("uptime", 3) + + consumer.startField("downtime", 4) + consumer.addLong(data.downtime) + consumer.endField("downtime", 4) + + val bootTime = data.bootTime + if (bootTime != null) { + consumer.startField("boot_time", 5) + consumer.addLong(bootTime.toEpochMilli()) + consumer.endField("boot_time", 5) + } + + val provisionTime = data.provisionTime + if (provisionTime != null) { + consumer.startField("provision_time", 6) + consumer.addLong(provisionTime.toEpochMilli()) + consumer.endField("provision_time", 6) + } + + consumer.startField("cpu_count", 7) + consumer.addInteger(data.server.cpuCount) + 
consumer.endField("cpu_count", 7) + + consumer.startField("cpu_limit", 8) + consumer.addDouble(data.cpuLimit) + consumer.endField("cpu_limit", 8) + + consumer.startField("cpu_time_active", 9) + consumer.addLong(data.cpuActiveTime) + consumer.endField("cpu_time_active", 9) + + consumer.startField("cpu_time_idle", 10) + consumer.addLong(data.cpuIdleTime) + consumer.endField("cpu_time_idle", 10) + + consumer.startField("cpu_time_steal", 11) + consumer.addLong(data.cpuStealTime) + consumer.endField("cpu_time_steal", 11) + + consumer.startField("cpu_time_lost", 12) + consumer.addLong(data.cpuLostTime) + consumer.endField("cpu_time_lost", 12) + + consumer.startField("mem_limit", 13) + consumer.addLong(data.server.memCapacity) + consumer.endField("mem_limit", 13) + + consumer.endMessage() + } + } private companion object { - private val SCHEMA: Schema = SchemaBuilder - .record("server") - .namespace("org.opendc.telemetry.compute") - .fields() - .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault() - .name("server_id").type(UUID_SCHEMA).noDefault() - .name("host_id").type(UUID_SCHEMA.optional()).noDefault() - .requiredLong("uptime") - .requiredLong("downtime") - .name("provision_time").type(TIMESTAMP_SCHEMA.optional()).noDefault() - .name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault() - .requiredInt("cpu_count") - .requiredDouble("cpu_limit") - .requiredLong("cpu_time_active") - .requiredLong("cpu_time_idle") - .requiredLong("cpu_time_steal") - .requiredLong("cpu_time_lost") - .requiredLong("mem_limit") - .endRecord() + /** + * The schema of the server data. + */ + val SCHEMA: MessageType = Types.buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("timestamp"), + Types + .required(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) + .length(16) + .`as`(LogicalTypeAnnotation.uuidType()) + .named("server_id"), + Types + .optional(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) + .length(16) + .`as`(LogicalTypeAnnotation.uuidType()) + .named("host_id"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("uptime"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("downtime"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("provision_time"), + Types + .optional(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("boot_time"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("cpu_count"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpu_limit"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("cpu_time_active"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("cpu_time_idle"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("cpu_time_steal"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("cpu_time_lost"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("mem_limit") + ) + .named("server") } } diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt index ec8a2b65..5ad3b95e 
100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt
@@ -22,45 +22,107 @@
 package org.opendc.compute.workload.export.parquet
 
-import org.apache.avro.Schema
-import org.apache.avro.SchemaBuilder
-import org.apache.avro.generic.GenericRecordBuilder
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.io.api.RecordConsumer
+import org.apache.parquet.schema.*
 import org.opendc.telemetry.compute.table.ServiceTableReader
-import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA
 import java.io.File
 
 /**
  * A Parquet event writer for [ServiceTableReader]s.
  */
 public class ParquetServiceDataWriter(path: File, bufferSize: Int) :
-    ParquetDataWriter<ServiceTableReader>(path, SCHEMA, bufferSize) {
-
-    override fun convert(builder: GenericRecordBuilder, data: ServiceTableReader) {
-        builder["timestamp"] = data.timestamp.toEpochMilli()
-        builder["hosts_up"] = data.hostsUp
-        builder["hosts_down"] = data.hostsDown
-        builder["servers_pending"] = data.serversPending
-        builder["servers_active"] = data.serversActive
-        builder["attempts_success"] = data.attemptsSuccess
-        builder["attempts_failure"] = data.attemptsFailure
-        builder["attempts_error"] = data.attemptsError
-    }
+    ParquetDataWriter<ServiceTableReader>(path, ServiceDataWriteSupport(), bufferSize) {
 
     override fun toString(): String = "service-writer"
 
+    /**
+     * A [WriteSupport] implementation for a [ServiceTableReader].
+     */
+    private class ServiceDataWriteSupport : WriteSupport<ServiceTableReader>() {
+        lateinit var recordConsumer: RecordConsumer
+
+        override fun init(configuration: Configuration): WriteContext {
+            return WriteContext(SCHEMA, emptyMap())
+        }
+
+        override fun prepareForWrite(recordConsumer: RecordConsumer) {
+            this.recordConsumer = recordConsumer
+        }
+
+        override fun write(record: ServiceTableReader) {
+            write(recordConsumer, record)
+        }
+
+        private fun write(consumer: RecordConsumer, data: ServiceTableReader) {
+            consumer.startMessage()
+
+            consumer.startField("timestamp", 0)
+            consumer.addLong(data.timestamp.toEpochMilli())
+            consumer.endField("timestamp", 0)
+
+            consumer.startField("hosts_up", 1)
+            consumer.addInteger(data.hostsUp)
+            consumer.endField("hosts_up", 1)
+
+            consumer.startField("hosts_down", 2)
+            consumer.addInteger(data.hostsDown)
+            consumer.endField("hosts_down", 2)
+
+            consumer.startField("servers_pending", 3)
+            consumer.addInteger(data.serversPending)
+            consumer.endField("servers_pending", 3)
+
+            consumer.startField("servers_active", 4)
+            consumer.addInteger(data.serversActive)
+            consumer.endField("servers_active", 4)
+
+            consumer.startField("attempts_success", 5)
+            consumer.addInteger(data.attemptsSuccess)
+            consumer.endField("attempts_success", 5)
+
+            consumer.startField("attempts_failure", 6)
+            consumer.addInteger(data.attemptsFailure)
+            consumer.endField("attempts_failure", 6)
+
+            consumer.startField("attempts_error", 7)
+            consumer.addInteger(data.attemptsError)
+            consumer.endField("attempts_error", 7)
+
+            consumer.endMessage()
+        }
+    }
+
     private companion object {
-        private val SCHEMA: Schema = SchemaBuilder
-            .record("service")
-            .namespace("org.opendc.telemetry.compute")
-            .fields()
-            .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault()
-            .requiredInt("hosts_up")
-            .requiredInt("hosts_down")
-            .requiredInt("servers_pending")
-            .requiredInt("servers_active")
-            .requiredInt("attempts_success")
-            .requiredInt("attempts_failure")
-            .requiredInt("attempts_error")
-            .endRecord()
+        private val SCHEMA: MessageType = Types.buildMessage()
+            .addFields(
+                Types
+                    .required(PrimitiveType.PrimitiveTypeName.INT64)
+                    .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+                    .named("timestamp"),
+                Types
+                    .required(PrimitiveType.PrimitiveTypeName.INT32)
+                    .named("hosts_up"),
+                Types
+                    .required(PrimitiveType.PrimitiveTypeName.INT32)
+                    .named("hosts_down"),
+                Types
+                    .required(PrimitiveType.PrimitiveTypeName.INT32)
+                    .named("servers_pending"),
+                Types
+                    .required(PrimitiveType.PrimitiveTypeName.INT32)
+                    .named("servers_active"),
+                Types
+                    .required(PrimitiveType.PrimitiveTypeName.INT32)
+                    .named("attempts_success"),
+                Types
+                    .required(PrimitiveType.PrimitiveTypeName.INT32)
+                    .named("attempts_failure"),
+                Types
+                    .required(PrimitiveType.PrimitiveTypeName.INT32)
+                    .named("attempts_error"),
+            )
+            .named("service")
     }
 }
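All three writers above follow the same parquet-hadoop contract: a WriteSupport announces its schema from init(), captures the RecordConsumer in prepareForWrite(), and streams every record as a startMessage/startField/add*/endField/endMessage sequence. A minimal self-contained sketch of that contract follows; the Sample record type, schema, and names are illustrative only and are not part of this change:

    import org.apache.hadoop.conf.Configuration
    import org.apache.parquet.hadoop.api.WriteSupport
    import org.apache.parquet.io.api.RecordConsumer
    import org.apache.parquet.schema.MessageType
    import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
    import org.apache.parquet.schema.Types

    /* Hypothetical record type, for illustration only. */
    data class Sample(val value: Long)

    class SampleWriteSupport : WriteSupport<Sample>() {
        private lateinit var consumer: RecordConsumer

        /* A single required INT64 column named "value". */
        private val schema: MessageType = Types.buildMessage()
            .addFields(Types.required(PrimitiveTypeName.INT64).named("value"))
            .named("sample")

        override fun init(configuration: Configuration): WriteContext =
            WriteContext(schema, emptyMap())

        override fun prepareForWrite(recordConsumer: RecordConsumer) {
            consumer = recordConsumer
        }

        override fun write(record: Sample) {
            consumer.startMessage()
            // The name and index passed to endField must match startField exactly.
            consumer.startField("value", 0)
            consumer.addLong(record.value)
            consumer.endField("value", 0)
            consumer.endMessage()
        }
    }

Optional fields (such as boot_time above) are simply skipped when the value is null: Parquet infers a null from the absence of the field between startMessage and endMessage.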
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/Utils.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/Utils.kt
new file mode 100644
index 00000000..9921f5b8
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/Utils.kt
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.export.parquet
+
+import org.apache.parquet.io.api.Binary
+import java.nio.ByteBuffer
+import java.util.UUID
+
+/**
+ * Convert this [UUID] into the 16-byte big-endian [Binary] representation used for Parquet UUID columns.
+ * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
+ */
+internal fun UUID.toBinary(): Binary {
+    val bb = ByteBuffer.wrap(ByteArray(16))
+    bb.putLong(mostSignificantBits)
+    bb.putLong(leastSignificantBits)
+    return Binary.fromConstantByteBuffer(bb)
+}
diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/HostDataWriterTest.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/HostDataWriterTest.kt
new file mode 100644
index 00000000..dae03513
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/HostDataWriterTest.kt
@@ -0,0 +1,79 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.export.parquet
+
+import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertDoesNotThrow
+import org.opendc.telemetry.compute.table.HostInfo
+import org.opendc.telemetry.compute.table.HostTableReader
+import java.nio.file.Files
+import java.time.Instant
+
+/**
+ * Test suite for [ParquetHostDataWriter]
+ */
+class HostDataWriterTest {
+    /**
+     * The path to write the data file to.
+     */
+    private val path = Files.createTempFile("opendc", "parquet")
+
+    /**
+     * The writer used to write the data.
+     */
+    private val writer = ParquetHostDataWriter(path.toFile(), bufferSize = 4096)
+
+    @AfterEach
+    fun tearDown() {
+        writer.close()
+        Files.deleteIfExists(path)
+    }
+
+    @Test
+    fun testSmoke() {
+        assertDoesNotThrow {
+            writer.write(object : HostTableReader {
+                override val timestamp: Instant = Instant.now()
+                override val host: HostInfo = HostInfo("00000000-0000-0000-0000-000000000000", "test", "x86", 4, 4096)
+                override val guestsTerminated: Int = 0
+                override val guestsRunning: Int = 0
+                override val guestsError: Int = 0
+                override val guestsInvalid: Int = 0
+                override val cpuLimit: Double = 4096.0
+                override val cpuUsage: Double = 1.0
+                override val cpuDemand: Double = 1.0
+                override val cpuUtilization: Double = 0.0
+                override val cpuActiveTime: Long = 1
+                override val cpuIdleTime: Long = 1
+                override val cpuStealTime: Long = 1
+                override val cpuLostTime: Long = 1
+                override val powerUsage: Double = 1.0
+                override val powerTotal: Double = 1.0
+                override val uptime: Long = 1
+                override val downtime: Long = 1
+                override val bootTime: Instant? = null
+            })
+        }
+    }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServerDataWriterTest.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServerDataWriterTest.kt
new file mode 100644
index 00000000..280f5ef8
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServerDataWriterTest.kt
@@ -0,0 +1,73 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.export.parquet
+
+import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertDoesNotThrow
+import org.opendc.telemetry.compute.table.HostInfo
+import org.opendc.telemetry.compute.table.ServerInfo
+import org.opendc.telemetry.compute.table.ServerTableReader
+import java.nio.file.Files
+import java.time.Instant
+
+/**
+ * Test suite for [ParquetServerDataWriter]
+ */
+class ServerDataWriterTest {
+    /**
+     * The path to write the data file to.
+     */
+    private val path = Files.createTempFile("opendc", "parquet")
+
+    /**
+     * The writer used to write the data.
+     */
+    private val writer = ParquetServerDataWriter(path.toFile(), bufferSize = 4096)
+
+    @AfterEach
+    fun tearDown() {
+        writer.close()
+        Files.deleteIfExists(path)
+    }
+
+    @Test
+    fun testSmoke() {
+        assertDoesNotThrow {
+            writer.write(object : ServerTableReader {
+                override val timestamp: Instant = Instant.now()
+                override val server: ServerInfo = ServerInfo("00000000-0000-0000-0000-000000000000", "test", "vm", "x86", "test", "test", 2, 4096)
+                override val host: HostInfo = HostInfo("00000000-0000-0000-0000-000000000000", "test", "x86", 4, 4096)
+                override val cpuLimit: Double = 4096.0
+                override val cpuActiveTime: Long = 1
+                override val cpuIdleTime: Long = 1
+                override val cpuStealTime: Long = 1
+                override val cpuLostTime: Long = 1
+                override val uptime: Long = 1
+                override val downtime: Long = 1
+                override val provisionTime: Instant = timestamp
+                override val bootTime: Instant? = null
+            })
+        }
+    }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServiceDataWriterTest.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServiceDataWriterTest.kt
new file mode 100644
index 00000000..7ffa7186
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServiceDataWriterTest.kt
@@ -0,0 +1,67 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.export.parquet
+
+import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertDoesNotThrow
+import org.opendc.telemetry.compute.table.ServiceTableReader
+import java.nio.file.Files
+import java.time.Instant
+
+/**
+ * Test suite for [ParquetServiceDataWriter]
+ */
+class ServiceDataWriterTest {
+    /**
+     * The path to write the data file to.
+     */
+    private val path = Files.createTempFile("opendc", "parquet")
+
+    /**
+     * The writer used to write the data.
+ */ + private val writer = ParquetServiceDataWriter(path.toFile(), bufferSize = 4096) + + @AfterEach + fun tearDown() { + writer.close() + Files.deleteIfExists(path) + } + + @Test + fun testSmoke() { + assertDoesNotThrow { + writer.write(object : ServiceTableReader { + override val timestamp: Instant = Instant.now() + override val hostsUp: Int = 1 + override val hostsDown: Int = 0 + override val serversPending: Int = 1 + override val serversActive: Int = 1 + override val attemptsSuccess: Int = 1 + override val attemptsFailure: Int = 0 + override val attemptsError: Int = 0 + }) + } + } +} diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt index b0181cbc..05d0234a 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt @@ -42,9 +42,11 @@ public interface Table { public val partitionKeys: List<TableColumn<*>> /** - * Open a [TableReader] for this table. + * Open a [TableReader] for a projection of this table. + * + * @param projection The list of columns to fetch from the table or `null` if no projection is performed. */ - public fun newReader(): TableReader + public fun newReader(projection: List<TableColumn<*>>? = null): TableReader /** * Open a [TableWriter] for this table. diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt index 776c40c0..b77a2982 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt @@ -33,7 +33,7 @@ public class TableColumn<out T>(public val name: String, type: Class<T>) { /** * The type of the column. */ - private val type: Class<*> = type + public val type: Class<*> = type /** * Determine whether the type of the column is a subtype of [column]. diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt index 532f6d24..5e8859e4 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt @@ -24,22 +24,21 @@ package org.opendc.trace.conv import org.opendc.trace.TableColumn -import org.opendc.trace.column /** * Members of the interference group. */ @JvmField -public val INTERFERENCE_GROUP_MEMBERS: TableColumn<Set<String>> = column("interference_group:members") +public val INTERFERENCE_GROUP_MEMBERS: TableColumn<Set<String>> = column("members") /** * Target load after which the interference occurs. */ @JvmField -public val INTERFERENCE_GROUP_TARGET: TableColumn<Double> = column("interference_group:target") +public val INTERFERENCE_GROUP_TARGET: TableColumn<Double> = column("target") /** * Performance score when the interference occurs. 
*/ @JvmField -public val INTERFERENCE_GROUP_SCORE: TableColumn<Double> = column("interference_group:score") +public val INTERFERENCE_GROUP_SCORE: TableColumn<Double> = column("score") diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt index e9fc5d44..e602e534 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt @@ -24,47 +24,46 @@ package org.opendc.trace.conv import org.opendc.trace.TableColumn -import org.opendc.trace.column import java.time.Instant /** * Identifier of the resource. */ @JvmField -public val RESOURCE_ID: TableColumn<String> = column("resource:id") +public val RESOURCE_ID: TableColumn<String> = column("id") /** * The cluster to which the resource belongs. */ @JvmField -public val RESOURCE_CLUSTER_ID: TableColumn<String> = column("resource:cluster_id") +public val RESOURCE_CLUSTER_ID: TableColumn<String> = column("cluster_id") /** * Start time for the resource. */ @JvmField -public val RESOURCE_START_TIME: TableColumn<Instant> = column("resource:start_time") +public val RESOURCE_START_TIME: TableColumn<Instant> = column("start_time") /** * End time for the resource. */ @JvmField -public val RESOURCE_STOP_TIME: TableColumn<Instant> = column("resource:stop_time") +public val RESOURCE_STOP_TIME: TableColumn<Instant> = column("stop_time") /** * Number of CPUs for the resource. */ @JvmField -public val RESOURCE_CPU_COUNT: TableColumn<Int> = column("resource:cpu_count") +public val RESOURCE_CPU_COUNT: TableColumn<Int> = column("cpu_count") /** * Total CPU capacity of the resource in MHz. */ @JvmField -public val RESOURCE_CPU_CAPACITY: TableColumn<Double> = column("resource:cpu_capacity") +public val RESOURCE_CPU_CAPACITY: TableColumn<Double> = column("cpu_capacity") /** * Memory capacity for the resource in KB. */ @JvmField -public val RESOURCE_MEM_CAPACITY: TableColumn<Double> = column("resource:mem_capacity") +public val RESOURCE_MEM_CAPACITY: TableColumn<Double> = column("mem_capacity") diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt index d5bbafd7..3a44f817 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt @@ -24,7 +24,6 @@ package org.opendc.trace.conv import org.opendc.trace.TableColumn -import org.opendc.trace.column import java.time.Duration import java.time.Instant @@ -32,70 +31,70 @@ import java.time.Instant * The timestamp at which the state was recorded. */ @JvmField -public val RESOURCE_STATE_TIMESTAMP: TableColumn<Instant> = column("resource_state:timestamp") +public val RESOURCE_STATE_TIMESTAMP: TableColumn<Instant> = column("timestamp") /** * Duration for the state. */ @JvmField -public val RESOURCE_STATE_DURATION: TableColumn<Duration> = column("resource_state:duration") +public val RESOURCE_STATE_DURATION: TableColumn<Duration> = column("duration") /** * A flag to indicate that the resource is powered on. 
*/ @JvmField -public val RESOURCE_STATE_POWERED_ON: TableColumn<Boolean> = column("resource_state:powered_on") +public val RESOURCE_STATE_POWERED_ON: TableColumn<Boolean> = column("powered_on") /** * Total CPU usage of the resource in MHz. */ @JvmField -public val RESOURCE_STATE_CPU_USAGE: TableColumn<Double> = column("resource_state:cpu_usage") +public val RESOURCE_STATE_CPU_USAGE: TableColumn<Double> = column("cpu_usage") /** * Total CPU usage of the resource in percentage. */ @JvmField -public val RESOURCE_STATE_CPU_USAGE_PCT: TableColumn<Double> = column("resource_state:cpu_usage_pct") +public val RESOURCE_STATE_CPU_USAGE_PCT: TableColumn<Double> = column("cpu_usage_pct") /** * Total CPU demand of the resource in MHz. */ @JvmField -public val RESOURCE_STATE_CPU_DEMAND: TableColumn<Double> = column("resource_state:cpu_demand") +public val RESOURCE_STATE_CPU_DEMAND: TableColumn<Double> = column("cpu_demand") /** * CPU ready percentage. */ @JvmField -public val RESOURCE_STATE_CPU_READY_PCT: TableColumn<Double> = column("resource_state:cpu_ready_pct") +public val RESOURCE_STATE_CPU_READY_PCT: TableColumn<Double> = column("cpu_ready_pct") /** * Memory usage of the resource in KB. */ @JvmField -public val RESOURCE_STATE_MEM_USAGE: TableColumn<Double> = column("resource_state:mem_usage") +public val RESOURCE_STATE_MEM_USAGE: TableColumn<Double> = column("mem_usage") /** * Disk read throughput of the resource in KB/s. */ @JvmField -public val RESOURCE_STATE_DISK_READ: TableColumn<Double> = column("resource_state:disk_read") +public val RESOURCE_STATE_DISK_READ: TableColumn<Double> = column("disk_read") /** * Disk write throughput of the resource in KB/s. */ @JvmField -public val RESOURCE_STATE_DISK_WRITE: TableColumn<Double> = column("resource_state:disk_write") +public val RESOURCE_STATE_DISK_WRITE: TableColumn<Double> = column("disk_write") /** * Network receive throughput of the resource in KB/s. */ @JvmField -public val RESOURCE_STATE_NET_RX: TableColumn<Double> = column("resource_state:net_rx") +public val RESOURCE_STATE_NET_RX: TableColumn<Double> = column("net_rx") /** * Network transmit throughput of the resource in KB/s. */ @JvmField -public val RESOURCE_STATE_NET_TX: TableColumn<Double> = column("resource_state:net_tx") +public val RESOURCE_STATE_NET_TX: TableColumn<Double> = column("net_tx") diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TableColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TableColumns.kt index 31a58360..a58505e9 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TableColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TableColumns.kt @@ -21,7 +21,9 @@ */ @file:JvmName("TableColumns") -package org.opendc.trace +package org.opendc.trace.conv + +import org.opendc.trace.TableColumn /** * Construct a [TableColumn] with the specified [name] and type [T]. 
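The TableColumns.kt hunk above ends at the KDoc of the column factory, whose body lies outside the diff context. Judging from its call sites (column("id"), column("timestamp"), and so on, now without table prefixes), it is presumably a reified factory along these lines; this is an inference from the surrounding code, not a line from the commit:

    inline fun <reified T> column(name: String): TableColumn<T> = TableColumn(name, T::class.java)

which matches the TableColumn(name: String, type: Class<T>) constructor shown earlier in this diff.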
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt index 397c0794..e6daafb7 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt @@ -24,7 +24,6 @@ package org.opendc.trace.conv import org.opendc.trace.TableColumn -import org.opendc.trace.column import java.time.Duration import java.time.Instant @@ -32,70 +31,70 @@ import java.time.Instant * A column containing the task identifier. */ @JvmField -public val TASK_ID: TableColumn<String> = column("task:id") +public val TASK_ID: TableColumn<String> = column("id") /** * A column containing the identifier of the workflow. */ @JvmField -public val TASK_WORKFLOW_ID: TableColumn<String> = column("task:workflow_id") +public val TASK_WORKFLOW_ID: TableColumn<String> = column("workflow_id") /** * A column containing the submission time of the task. */ @JvmField -public val TASK_SUBMIT_TIME: TableColumn<Instant> = column("task:submit_time") +public val TASK_SUBMIT_TIME: TableColumn<Instant> = column("submit_time") /** * A column containing the wait time of the task. */ @JvmField -public val TASK_WAIT_TIME: TableColumn<Instant> = column("task:wait_time") +public val TASK_WAIT_TIME: TableColumn<Instant> = column("wait_time") /** * A column containing the runtime time of the task. */ @JvmField -public val TASK_RUNTIME: TableColumn<Duration> = column("task:runtime") +public val TASK_RUNTIME: TableColumn<Duration> = column("runtime") /** * A column containing the parents of a task. */ @JvmField -public val TASK_PARENTS: TableColumn<Set<String>> = column("task:parents") +public val TASK_PARENTS: TableColumn<Set<String>> = column("parents") /** * A column containing the children of a task. */ @JvmField -public val TASK_CHILDREN: TableColumn<Set<String>> = column("task:children") +public val TASK_CHILDREN: TableColumn<Set<String>> = column("children") /** * A column containing the requested CPUs of a task. */ @JvmField -public val TASK_REQ_NCPUS: TableColumn<Int> = column("task:req_ncpus") +public val TASK_REQ_NCPUS: TableColumn<Int> = column("req_ncpus") /** * A column containing the allocated CPUs of a task. */ @JvmField -public val TASK_ALLOC_NCPUS: TableColumn<Int> = column("task:alloc_ncpus") +public val TASK_ALLOC_NCPUS: TableColumn<Int> = column("alloc_ncpus") /** * A column containing the status of a task. */ @JvmField -public val TASK_STATUS: TableColumn<Int> = column("task:status") +public val TASK_STATUS: TableColumn<Int> = column("status") /** * A column containing the group id of a task. */ @JvmField -public val TASK_GROUP_ID: TableColumn<Int> = column("task:group_id") +public val TASK_GROUP_ID: TableColumn<Int> = column("group_id") /** * A column containing the user id of a task. 
*/ @JvmField -public val TASK_USER_ID: TableColumn<Int> = column("task:user_id") +public val TASK_USER_ID: TableColumn<Int> = column("user_id") diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt index 24551edb..b848e19a 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt @@ -43,7 +43,9 @@ internal class TableImpl(val trace: TraceImpl, override val name: String) : Tabl override val partitionKeys: List<TableColumn<*>> get() = details.partitionKeys - override fun newReader(): TableReader = trace.format.newReader(trace.path, name) + override fun newReader(projection: List<TableColumn<*>>?): TableReader { + return trace.format.newReader(trace.path, name, projection) + } override fun newWriter(): TableWriter = trace.format.newWriter(trace.path, name) diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt index f2e610db..47761e0f 100644 --- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt +++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt @@ -22,6 +22,7 @@ package org.opendc.trace.spi +import org.opendc.trace.TableColumn import org.opendc.trace.TableReader import org.opendc.trace.TableWriter import java.nio.file.Path @@ -68,10 +69,11 @@ public interface TraceFormat { * * @param path The path to the trace to open. * @param table The name of the table to open a [TableReader] for. + * @param projection The list of [TableColumn]s to project or `null` if no projection is performed. * @throws IllegalArgumentException If [table] does not exist. * @return A [TableReader] instance for the table. */ - public fun newReader(path: Path, table: String): TableReader + public fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader /** * Open a [TableWriter] for the specified [table]. 
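Every TraceFormat implementation now receives the projection explicitly. A hedged usage sketch mirroring the updated tests follows (imports elided; the table and column constants are the ones defined in this diff):

    val format = AzureTraceFormat()
    val path = Paths.get("src/test/resources/trace")

    // Ask only for the columns we need; passing null requests all columns.
    val reader = format.newReader(path, TABLE_RESOURCES, listOf(RESOURCE_ID, RESOURCE_CPU_COUNT))
    while (reader.nextRow()) {
        println("${reader.get(RESOURCE_ID)}: ${reader.get(RESOURCE_CPU_COUNT)} vCPUs")
    }
    reader.close()

The projection is a hint: columnar backends such as the OpenDC Parquet format can use it to avoid materializing unused columns, while the row-oriented CSV formats below simply ignore it.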
diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt index 8e3e60cc..73978990 100644 --- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt +++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt @@ -81,7 +81,7 @@ public class AzureTraceFormat : TraceFormat { } } - override fun newReader(path: Path, table: String): TableReader { + override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader { return when (table) { TABLE_RESOURCES -> { val stream = GZIPInputStream(path.resolve("vmtable/vmtable.csv.gz").inputStream()) diff --git a/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt b/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt index 56f9a940..263d26ce 100644 --- a/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt @@ -57,7 +57,7 @@ class AzureTraceFormatTest { @Test fun testResources() { val path = Paths.get("src/test/resources/trace") - val reader = format.newReader(path, TABLE_RESOURCES) + val reader = format.newReader(path, TABLE_RESOURCES, null) assertAll( { assertTrue(reader.nextRow()) }, { assertEquals("x/XsOfHO4ocsV99i4NluqKDuxctW2MMVmwqOPAlg4wp8mqbBOe3wxBlQo0+Qx+uf", reader.get(RESOURCE_ID)) }, @@ -71,7 +71,7 @@ class AzureTraceFormatTest { @Test fun testSmoke() { val path = Paths.get("src/test/resources/trace") - val reader = format.newReader(path, TABLE_RESOURCE_STATES) + val reader = format.newReader(path, TABLE_RESOURCE_STATES, null) assertAll( { assertTrue(reader.nextRow()) }, diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt index 11d21a04..82e454ad 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt @@ -72,7 +72,7 @@ public class BitbrainsExTraceFormat : TraceFormat { } } - override fun newReader(path: Path, table: String): TableReader { + override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader { return when (table) { TABLE_RESOURCE_STATES -> newResourceStateReader(path) else -> throw IllegalArgumentException("Table $table not supported") diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt index e1e7604a..a374e951 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt @@ -81,7 +81,7 @@ public class BitbrainsTraceFormat : TraceFormat { } } - override fun newReader(path: Path, table: String): TableReader { + override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader { return when (table) { TABLE_RESOURCES -> { val vms = 
Files.walk(path, 1) diff --git a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt index 77429e3e..c944cb98 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt @@ -59,7 +59,7 @@ internal class BitbrainsExTraceFormatTest { @Test fun testSmoke() { val path = Paths.get("src/test/resources/vm.txt") - val reader = format.newReader(path, TABLE_RESOURCE_STATES) + val reader = format.newReader(path, TABLE_RESOURCE_STATES, null) assertAll( { assertTrue(reader.nextRow()) }, diff --git a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt index 9309beb1..841801e6 100644 --- a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt @@ -57,7 +57,7 @@ class BitbrainsTraceFormatTest { @Test fun testResources() { val path = Paths.get("src/test/resources/bitbrains.csv") - val reader = format.newReader(path, TABLE_RESOURCES) + val reader = format.newReader(path, TABLE_RESOURCES, null) assertAll( { assertTrue(reader.nextRow()) }, @@ -71,7 +71,7 @@ class BitbrainsTraceFormatTest { @Test fun testSmoke() { val path = Paths.get("src/test/resources/bitbrains.csv") - val reader = format.newReader(path, TABLE_RESOURCE_STATES) + val reader = format.newReader(path, TABLE_RESOURCE_STATES, null) assertAll( { assertTrue(reader.nextRow()) }, diff --git a/opendc-trace/opendc-trace-calcite/build.gradle.kts b/opendc-trace/opendc-trace-calcite/build.gradle.kts new file mode 100644 index 00000000..2ffdac3c --- /dev/null +++ b/opendc-trace/opendc-trace-calcite/build.gradle.kts @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +description = "Apache Calcite (SQL) integration for the OpenDC trace library" + +/* Build configuration */ +plugins { + `kotlin-library-conventions` +} + +dependencies { + api(projects.opendcTrace.opendcTraceApi) + + api(libs.calcite.core) + + testRuntimeOnly(projects.opendcTrace.opendcTraceOpendc) + testRuntimeOnly(libs.slf4j.simple) +} diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/InsertableTable.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/InsertableTable.kt new file mode 100644 index 00000000..9c7b69a2 --- /dev/null +++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/InsertableTable.kt @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.calcite + +import org.apache.calcite.linq4j.Enumerable +import org.apache.calcite.schema.Table + +/** + * A Calcite [Table] to which rows can be inserted. + */ +internal interface InsertableTable : Table { + /** + * Insert [rows] into this table. + * + * @param rows The rows to insert into the table. + * @return The number of rows inserted. + */ + fun insert(rows: Enumerable<Array<Any?>>): Long +} diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt new file mode 100644 index 00000000..1854f262 --- /dev/null +++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. 
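
[Editor's annotation] InsertableTable.insert receives a Calcite Enumerable of positional rows, one Any? slot per table column. A sketch of a caller; the row values and the Linq4j.asEnumerable helper usage are illustrative assumptions, not part of this change:

    import org.apache.calcite.linq4j.Linq4j

    fun insertTwoRows(table: InsertableTable): Long {
        // One Array<Any?> per row, ordered by the table's column layout.
        val rows = Linq4j.asEnumerable(
            listOf(
                arrayOf<Any?>("vm-1", 1),
                arrayOf<Any?>("vm-2", 2)
            )
        )
        return table.insert(rows) // number of rows written
    }
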
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.calcite + +import org.apache.calcite.linq4j.Enumerator +import org.opendc.trace.TableColumn +import org.opendc.trace.TableReader +import java.sql.Timestamp +import java.time.Duration +import java.time.Instant +import java.util.concurrent.atomic.AtomicBoolean + +/** + * An [Enumerator] for a [TableReader]. + */ +internal class TraceReaderEnumerator<E>( + private val reader: TableReader, + private val columns: List<TableColumn<*>>, + private val cancelFlag: AtomicBoolean +) : Enumerator<E> { + private val columnIndices = columns.map { reader.resolve(it) }.toIntArray() + private var current: E? = null + + override fun moveNext(): Boolean { + if (cancelFlag.get()) { + return false + } + + val reader = reader + val res = reader.nextRow() + + if (res) { + @Suppress("UNCHECKED_CAST") + current = convertRow(reader) as E + } else { + current = null + } + + return res + } + + override fun current(): E = checkNotNull(current) + + override fun reset() { + throw UnsupportedOperationException() + } + + override fun close() { + reader.close() + } + + private fun convertRow(reader: TableReader): Array<Any?> { + val res = arrayOfNulls<Any?>(columns.size) + val columnIndices = columnIndices + + for ((index, column) in columns.withIndex()) { + val columnIndex = columnIndices[index] + res[index] = convertColumn(reader, column, columnIndex) + } + return res + } + + private fun convertColumn(reader: TableReader, column: TableColumn<*>, columnIndex: Int): Any? { + val value = reader.get(columnIndex) + + return when (column.type) { + Instant::class.java -> Timestamp.from(value as Instant) + Duration::class.java -> (value as Duration).toMillis() + Set::class.java -> (value as Set<*>).toTypedArray() + else -> value + } + } +} diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt new file mode 100644 index 00000000..3249546d --- /dev/null +++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
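
[Editor's annotation] The enumerator below adapts OpenDC's pull-based TableReader to Calcite's Enumerator protocol, converting Instant to java.sql.Timestamp, Duration to epoch milliseconds, and Set to an array on the way out. Driving it by hand looks roughly like this (Calcite normally does the iteration itself; reader and columns are assumed to come from an OpenDC table):

    // Sketch only; requires java.util.concurrent.atomic.AtomicBoolean.
    val cancelFlag = AtomicBoolean(false)
    val enumerator = TraceReaderEnumerator<Array<Any?>>(reader, columns, cancelFlag)
    try {
        while (enumerator.moveNext()) {
            println(enumerator.current().joinToString())
        }
    } finally {
        enumerator.close() // also closes the underlying TableReader
    }
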
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.calcite + +import org.apache.calcite.schema.Schema +import org.apache.calcite.schema.Table +import org.apache.calcite.schema.impl.AbstractSchema +import org.opendc.trace.Trace + +/** + * A Calcite [Schema] that exposes an OpenDC [Trace] as multiple SQL tables. + * + * @param trace The [Trace] to create a schema for. + */ +public class TraceSchema(private val trace: Trace) : AbstractSchema() { + /** + * The [Table]s that belong to this schema. + */ + private val tables: Map<String, TraceTable> by lazy { + trace.tables.associateWith { + val table = checkNotNull(trace.getTable(it)) { "Unexpected null table" } + TraceTable(table) + } + } + + override fun getTableMap(): Map<String, Table> = tables +} diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt new file mode 100644 index 00000000..3c6badc8 --- /dev/null +++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.calcite + +import org.apache.calcite.model.ModelHandler +import org.apache.calcite.schema.Schema +import org.apache.calcite.schema.SchemaFactory +import org.apache.calcite.schema.SchemaPlus +import org.opendc.trace.Trace +import java.io.File +import java.nio.file.Paths + +/** + * Factory that creates a [TraceSchema]. + * + * This factory allows users to include a schema that references a trace in a `model.json` file. + */ +public class TraceSchemaFactory : SchemaFactory { + override fun create(parentSchema: SchemaPlus, name: String, operand: Map<String, Any>): Schema { + val base = operand[ModelHandler.ExtraOperand.BASE_DIRECTORY.camelName] as File?
+ val pathParam = requireNotNull(operand["path"]) { "Trace path not specified" } as String + val path = if (base != null) File(base, pathParam).toPath() else Paths.get(pathParam) + + val format = requireNotNull(operand["format"]) { "Trace format not specified" } as String + val create = operand.getOrDefault("create", false) as Boolean + + val trace = if (create) Trace.create(path, format) else Trace.open(path, format) + return TraceSchema(trace) + } +} diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt new file mode 100644 index 00000000..8c571b82 --- /dev/null +++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt @@ -0,0 +1,176 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.calcite + +import org.apache.calcite.DataContext +import org.apache.calcite.adapter.java.AbstractQueryableTable +import org.apache.calcite.adapter.java.JavaTypeFactory +import org.apache.calcite.linq4j.* +import org.apache.calcite.plan.RelOptCluster +import org.apache.calcite.plan.RelOptTable +import org.apache.calcite.prepare.Prepare.CatalogReader +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.core.TableModify +import org.apache.calcite.rel.logical.LogicalTableModify +import org.apache.calcite.rel.type.RelDataType +import org.apache.calcite.rel.type.RelDataTypeFactory +import org.apache.calcite.rex.RexNode +import org.apache.calcite.schema.* +import org.apache.calcite.schema.impl.AbstractTableQueryable +import org.apache.calcite.sql.type.SqlTypeName +import java.time.Duration +import java.time.Instant +import java.util.concurrent.atomic.AtomicBoolean + +/** + * A Calcite [Table] that exposes an OpenDC [org.opendc.trace.Table] as SQL table. + */ +internal class TraceTable(private val table: org.opendc.trace.Table) : + AbstractQueryableTable(Array<Any?>::class.java), + ProjectableFilterableTable, + ModifiableTable, + InsertableTable { + private var rowType: RelDataType? 
= null + + override fun getRowType(typeFactory: RelDataTypeFactory): RelDataType { + var rowType = rowType + if (rowType == null) { + rowType = deduceRowType(typeFactory as JavaTypeFactory) + this.rowType = rowType + } + + return rowType + } + + override fun scan(root: DataContext, filters: MutableList<RexNode>, projects: IntArray?): Enumerable<Array<Any?>> { + // Filters are currently not supported by the OpenDC trace API. By keeping the filters in the list, Calcite + // assumes that they are declined and will perform the filters itself. + + val projection = projects?.map { table.columns[it] } + val cancelFlag = DataContext.Variable.CANCEL_FLAG.get<AtomicBoolean>(root) + return object : AbstractEnumerable<Array<Any?>>() { + override fun enumerator(): Enumerator<Array<Any?>> = + TraceReaderEnumerator(table.newReader(projection), projection ?: table.columns, cancelFlag) + } + } + + override fun insert(rows: Enumerable<Array<Any?>>): Long { + val table = table + val columns = table.columns + val writer = table.newWriter() + val columnIndices = columns.map { writer.resolve(it) }.toIntArray() + var rowCount = 0L + + try { + for (row in rows) { + writer.startRow() + + for ((index, value) in row.withIndex()) { + if (value == null) { + continue + } + val columnType = columns[index].type + + writer.set( + columnIndices[index], + when (columnType) { + Duration::class.java -> Duration.ofMillis(value as Long) + Instant::class.java -> Instant.ofEpochMilli(value as Long) + Set::class.java -> (value as List<*>).toSet() + else -> value + } + ) + } + + writer.endRow() + + rowCount++ + } + } finally { + writer.close() + } + + return rowCount + } + + override fun <T> asQueryable(queryProvider: QueryProvider, schema: SchemaPlus, tableName: String): Queryable<T> { + return object : AbstractTableQueryable<T>(queryProvider, schema, this@TraceTable, tableName) { + override fun enumerator(): Enumerator<T> { + val cancelFlag = AtomicBoolean(false) + return TraceReaderEnumerator( + this@TraceTable.table.newReader(), + this@TraceTable.table.columns, + cancelFlag + ) + } + + override fun toString(): String = "TraceTableQueryable[table=$tableName]" + } + } + + override fun getModifiableCollection(): MutableCollection<Any?>? 
= null + + override fun toModificationRel( + cluster: RelOptCluster, + table: RelOptTable, + catalogReader: CatalogReader, + child: RelNode, + operation: TableModify.Operation, + updateColumnList: MutableList<String>?, + sourceExpressionList: MutableList<RexNode>?, + flattened: Boolean + ): TableModify { + cluster.planner.addRule(TraceTableModifyRule.DEFAULT.toRule()) + + return LogicalTableModify.create( + table, + catalogReader, + child, + operation, + updateColumnList, + sourceExpressionList, + flattened + ) + } + + override fun toString(): String = "TraceTable" + + private fun deduceRowType(typeFactory: JavaTypeFactory): RelDataType { + val types = mutableListOf<RelDataType>() + val names = mutableListOf<String>() + + for (column in table.columns) { + names.add(column.name) + types.add( + when (column.type) { + Instant::class.java -> typeFactory.createSqlType(SqlTypeName.TIMESTAMP) + Duration::class.java -> typeFactory.createSqlType(SqlTypeName.BIGINT) + Set::class.java -> typeFactory.createMultisetType(typeFactory.createSqlType(SqlTypeName.UNKNOWN), -1) + else -> typeFactory.createType(column.type) + } + ) + } + + return typeFactory.createStructType(types, names) + } +} diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModify.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModify.kt new file mode 100644 index 00000000..64dc0cea --- /dev/null +++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModify.kt @@ -0,0 +1,138 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
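
[Editor's annotation] Because TraceTable implements ProjectableFilterableTable, Calcite pushes the column projection into scan while keeping the filters for itself: leaving the filter list untouched signals that the table declined them. In SQL terms, for a hypothetical session (a stmt obtained as in the test suite further down):

    // Only the id and cpu_count columns are read from the trace files;
    // the WHERE clause is evaluated by Calcite on top of the scan.
    val rs = stmt.executeQuery("SELECT id, cpu_count FROM trace.resources WHERE cpu_count > 1")
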
+ */ + +package org.opendc.trace.calcite + +import org.apache.calcite.adapter.enumerable.* +import org.apache.calcite.adapter.enumerable.EnumerableRel.Prefer +import org.apache.calcite.adapter.java.JavaTypeFactory +import org.apache.calcite.linq4j.Enumerable +import org.apache.calcite.linq4j.tree.BlockBuilder +import org.apache.calcite.linq4j.tree.Expressions +import org.apache.calcite.linq4j.tree.Types +import org.apache.calcite.plan.* +import org.apache.calcite.prepare.Prepare +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.core.TableModify +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rex.RexNode +import org.apache.calcite.schema.ModifiableTable +import org.apache.calcite.util.BuiltInMethod +import java.lang.reflect.Method + +/** + * A [TableModify] expression that modifies a workload trace. + */ +internal class TraceTableModify( + cluster: RelOptCluster, + traitSet: RelTraitSet, + table: RelOptTable, + schema: Prepare.CatalogReader, + input: RelNode, + operation: Operation, + updateColumnList: List<String>?, + sourceExpressionList: List<RexNode>?, + flattened: Boolean +) : TableModify(cluster, traitSet, table, schema, input, operation, updateColumnList, sourceExpressionList, flattened), + EnumerableRel { + init { + // Make sure the table is modifiable + table.unwrap(ModifiableTable::class.java) ?: throw AssertionError() // TODO: user error in validator + } + + override fun copy(traitSet: RelTraitSet, inputs: List<RelNode>?): RelNode { + return TraceTableModify( + cluster, + traitSet, + table, + getCatalogReader(), + sole(inputs), + operation, + updateColumnList, + sourceExpressionList, + isFlattened + ) + } + + override fun computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery?): RelOptCost { + // Prefer this plan compared to the standard EnumerableTableModify. 
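+ // A relative cost well below the default nudges the planner to pick this
+ // node during optimization; the 0.1 discount is otherwise arbitrary.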
+ return super.computeSelfCost(planner, mq)!!.multiplyBy(.1) + } + + override fun implement(implementor: EnumerableRelImplementor, pref: Prefer): EnumerableRel.Result { + val builder = BlockBuilder() + val result = implementor.visitChild(this, 0, getInput() as EnumerableRel, pref) + val childExp = builder.append("child", result.block) + val convertedChildExpr = if (getInput().rowType != rowType) { + val typeFactory = cluster.typeFactory as JavaTypeFactory + val format = EnumerableTableScan.deduceFormat(table) + val physType = PhysTypeImpl.of(typeFactory, table.rowType, format) + val childPhysType = result.physType + val o = Expressions.parameter(childPhysType.javaRowType, "o") + val expressionList = List(childPhysType.rowType.fieldCount) { i -> + childPhysType.fieldReference(o, i, physType.getJavaFieldType(i)) + } + + builder.append( + "convertedChild", + Expressions.call( + childExp, + BuiltInMethod.SELECT.method, + Expressions.lambda<org.apache.calcite.linq4j.function.Function<*>>(physType.record(expressionList), o) + ) + ) + } else { + childExp + } + + if (!isInsert) { + throw UnsupportedOperationException("Deletion and update not supported") + } + + val expression = table.getExpression(InsertableTable::class.java) + builder.add( + Expressions.return_( + null, + Expressions.call( + BuiltInMethod.SINGLETON_ENUMERABLE.method, + Expressions.call( + Long::class.java, + expression, + INSERT_METHOD, + convertedChildExpr, + ) + ) + ) + ) + + val rowFormat = if (pref === Prefer.ARRAY) JavaRowFormat.ARRAY else JavaRowFormat.SCALAR + val physType = PhysTypeImpl.of(implementor.typeFactory, getRowType(), rowFormat) + return implementor.result(physType, builder.toBlock()) + } + + private companion object { + /** + * Reference to [InsertableTable.insert] method. + */ + val INSERT_METHOD: Method = Types.lookupMethod(InsertableTable::class.java, "insert", Enumerable::class.java) + } +} diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModifyRule.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModifyRule.kt new file mode 100644 index 00000000..7572e381 --- /dev/null +++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModifyRule.kt @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
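
[Editor's annotation] At execution time, the block built in implement is equivalent to the following pseudocode: reorder the child rows into the table's physical layout if the row types differ, invoke InsertableTable.insert via INSERT_METHOD, and wrap the resulting row count in a singleton enumerable.

    // Pseudocode sketch of the generated linq4j program, not literal output:
    // val converted = child.select { row -> reorderToPhysicalLayout(row) }
    // return singletonEnumerable(table.insert(converted))
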
+ */ + +package org.opendc.trace.calcite + +import org.apache.calcite.adapter.enumerable.EnumerableConvention +import org.apache.calcite.plan.Convention +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.core.TableModify +import org.apache.calcite.rel.logical.LogicalTableModify +import org.apache.calcite.schema.ModifiableTable + +/** + * A [ConverterRule] from a [LogicalTableModify] to a [TraceTableModify]. + */ +internal class TraceTableModifyRule(config: Config) : ConverterRule(config) { + override fun convert(rel: RelNode): RelNode? { + val modify = rel as TableModify + val table = modify.table!! + + // Make sure that the table is modifiable + if (table.unwrap(ModifiableTable::class.java) == null) { + return null + } + + val traitSet = modify.traitSet.replace(EnumerableConvention.INSTANCE) + return TraceTableModify( + modify.cluster, traitSet, + table, + modify.catalogReader, + convert(modify.input, traitSet), + modify.operation, + modify.updateColumnList, + modify.sourceExpressionList, + modify.isFlattened + ) + } + + companion object { + /** Default configuration. */ + val DEFAULT: Config = Config.INSTANCE + .withConversion(LogicalTableModify::class.java, Convention.NONE, EnumerableConvention.INSTANCE, "TraceTableModificationRule") + .withRuleFactory { config: Config -> TraceTableModifyRule(config) } + } +} diff --git a/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt b/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt new file mode 100644 index 00000000..d2877d7c --- /dev/null +++ b/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt @@ -0,0 +1,158 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.calcite + +import org.apache.calcite.jdbc.CalciteConnection +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Test +import org.opendc.trace.Trace +import java.nio.file.Files +import java.nio.file.Paths +import java.sql.DriverManager +import java.sql.ResultSet +import java.sql.Statement +import java.sql.Timestamp +import java.util.* + +/** + * Smoke test for Apache Calcite integration. + */ +class CalciteTest { + /** + * The trace to experiment with. 
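+ *
+ * The tests below exercise both the read path (projection, aggregation) and
+ * the write path (SQL INSERT into a freshly created trace).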
+ */ + private val trace = Trace.open(Paths.get("src/test/resources/trace"), format = "opendc-vm") + + @Test + fun testResources() { + runQuery(trace, "SELECT * FROM trace.resources") { rs -> + assertAll( + { assertTrue(rs.next()) }, + { assertEquals("1019", rs.getString("id")) }, + { assertEquals(1, rs.getInt("cpu_count")) }, + { assertEquals(Timestamp.valueOf("2013-08-12 13:40:46.0"), rs.getTimestamp("start_time")) }, + { assertEquals(181352.0, rs.getDouble("mem_capacity")) }, + { assertTrue(rs.next()) }, + { assertEquals("1023", rs.getString("id")) }, + { assertTrue(rs.next()) }, + { assertEquals("1052", rs.getString("id")) }, + { assertTrue(rs.next()) }, + { assertEquals("1073", rs.getString("id")) }, + { assertFalse(rs.next()) } + ) + } + } + + @Test + fun testResourceStates() { + runQuery(trace, "SELECT * FROM trace.resource_states") { rs -> + assertAll( + { assertTrue(rs.next()) }, + { assertEquals("1019", rs.getString("id")) }, + { assertEquals(Timestamp.valueOf("2013-08-12 13:40:46.0"), rs.getTimestamp("timestamp")) }, + { assertEquals(300000, rs.getLong("duration")) }, + { assertEquals(0.0, rs.getDouble("cpu_usage")) }, + { assertTrue(rs.next()) }, + { assertEquals("1019", rs.getString("id")) }, + ) + } + } + + @Test + fun testInterferenceGroups() { + runQuery(trace, "SELECT * FROM trace.interference_groups") { rs -> + assertAll( + { assertTrue(rs.next()) }, + { assertArrayEquals(arrayOf("1019", "1023", "1052"), rs.getArray("members").array as Array<*>) }, + { assertEquals(0.0, rs.getDouble("target")) }, + { assertEquals(0.8830158730158756, rs.getDouble("score")) }, + ) + } + } + + @Test + fun testComplexQuery() { + runQuery(trace, "SELECT max(cpu_usage) as max_cpu_usage, avg(cpu_usage) as avg_cpu_usage FROM trace.resource_states") { rs -> + assertAll( + { assertTrue(rs.next()) }, + { assertEquals(249.59993808, rs.getDouble("max_cpu_usage")) }, + { assertEquals(5.387240309118493, rs.getDouble("avg_cpu_usage")) }, + ) + } + } + + @Test + fun testInsert() { + val tmp = Files.createTempDirectory("opendc") + val newTrace = Trace.create(tmp, "opendc-vm") + + runStatement(newTrace) { stmt -> + val count = stmt.executeUpdate( + """ + INSERT INTO trace.resources (id, start_time, stop_time, cpu_count, cpu_capacity, mem_capacity) + VALUES (1234, '2013-08-12 13:35:46.0', '2013-09-11 13:39:58.0', 1, 2926.0, 1024.0) + """.trimIndent() + ) + assertEquals(1, count) + } + + runQuery(newTrace, "SELECT * FROM trace.resources") { rs -> + assertAll( + { assertTrue(rs.next()) }, + { assertEquals("1234", rs.getString("id")) }, + { assertEquals(1, rs.getInt("cpu_count")) }, + { assertEquals(Timestamp.valueOf("2013-08-12 13:35:46.0"), rs.getTimestamp("start_time")) }, + { assertEquals(2926.0, rs.getDouble("cpu_capacity")) }, + { assertEquals(1024.0, rs.getDouble("mem_capacity")) } + ) + } + } + + /** + * Helper function to run statement for the specified trace. + */ + private fun runQuery(trace: Trace, query: String, block: (ResultSet) -> Unit) { + runStatement(trace) { stmt -> + val rs = stmt.executeQuery(query) + rs.use { block(rs) } + } + } + + /** + * Helper function to run statement for the specified trace. 
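+ *
+ * Opens an in-memory Calcite connection with `lex=JAVA`, registers the trace
+ * under the `trace` schema name, and closes the statement and connection
+ * when the block completes.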
+ */ + private fun runStatement(trace: Trace, block: (Statement) -> Unit) { + val info = Properties() + info.setProperty("lex", "JAVA") + val connection = DriverManager.getConnection("jdbc:calcite:", info).unwrap(CalciteConnection::class.java) + connection.rootSchema.add("trace", TraceSchema(trace)) + + val stmt = connection.createStatement() + try { + block(stmt) + } finally { + stmt.close() + connection.close() + } + } +} diff --git a/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/TraceSchemaFactoryTest.kt b/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/TraceSchemaFactoryTest.kt new file mode 100644 index 00000000..0a552e74 --- /dev/null +++ b/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/TraceSchemaFactoryTest.kt @@ -0,0 +1,78 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.calcite + +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import java.sql.DriverManager +import java.sql.Timestamp +import java.util.* + +/** + * Test suite for [TraceSchemaFactory]. 
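
[Editor's annotation] The connection bootstrap used by runStatement works just as well outside of tests; a self-contained sketch (the trace path and format here are assumptions):

    import org.apache.calcite.jdbc.CalciteConnection
    import org.opendc.trace.Trace
    import java.nio.file.Paths
    import java.sql.DriverManager
    import java.util.Properties

    fun main() {
        val info = Properties().apply { setProperty("lex", "JAVA") }
        val connection = DriverManager.getConnection("jdbc:calcite:", info)
            .unwrap(CalciteConnection::class.java)
        connection.rootSchema.add("trace", TraceSchema(Trace.open(Paths.get("my-trace"), "opendc-vm")))

        connection.createStatement().use { stmt ->
            stmt.executeQuery("SELECT count(*) AS n FROM trace.resources").use { rs ->
                while (rs.next()) println(rs.getLong("n"))
            }
        }
        connection.close()
    }
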
+ */ +class TraceSchemaFactoryTest { + @Test + fun testSmoke() { + val info = Properties() + info.setProperty("lex", "JAVA") + val connection = DriverManager.getConnection("jdbc:calcite:model=src/test/resources/model.json", info) + val stmt = connection.createStatement() + val rs = stmt.executeQuery("SELECT * FROM trace.resources") + try { + assertAll( + { assertTrue(rs.next()) }, + { assertEquals("1019", rs.getString("id")) }, + { assertEquals(1, rs.getInt("cpu_count")) }, + { assertEquals(Timestamp.valueOf("2013-08-12 13:40:46.0"), rs.getTimestamp("start_time")) }, + { assertEquals(181352.0, rs.getDouble("mem_capacity")) }, + ) + } finally { + rs.close() + stmt.close() + connection.close() + } + } + + @Test + fun testWithoutParams() { + assertThrows<java.lang.RuntimeException> { + DriverManager.getConnection("jdbc:calcite:schemaFactory=org.opendc.trace.calcite.TraceSchemaFactory") + } + } + + @Test + fun testWithoutPath() { + assertThrows<java.lang.RuntimeException> { + DriverManager.getConnection("jdbc:calcite:schemaFactory=org.opendc.trace.calcite.TraceSchemaFactory; schema.format=opendc-vm") + } + } + + @Test + fun testWithoutFormat() { + assertThrows<java.lang.RuntimeException> { + DriverManager.getConnection("jdbc:calcite:schemaFactory=org.opendc.trace.calcite.TraceSchemaFactory; schema.path=trace") + } + } +} diff --git a/opendc-trace/opendc-trace-calcite/src/test/resources/model.json b/opendc-trace/opendc-trace-calcite/src/test/resources/model.json new file mode 100644 index 00000000..91e2657f --- /dev/null +++ b/opendc-trace/opendc-trace-calcite/src/test/resources/model.json @@ -0,0 +1,15 @@ +{ + "version": "1.0", + "defaultSchema": "trace", + "schemas": [ + { + "name": "trace", + "type": "custom", + "factory": "org.opendc.trace.calcite.TraceSchemaFactory", + "operand": { + "path": "trace", + "format": "opendc-vm" + } + } + ] +} diff --git a/opendc-trace/opendc-trace-calcite/src/test/resources/trace/interference-model.json b/opendc-trace/opendc-trace-calcite/src/test/resources/trace/interference-model.json new file mode 100644 index 00000000..6a0616d9 --- /dev/null +++ b/opendc-trace/opendc-trace-calcite/src/test/resources/trace/interference-model.json @@ -0,0 +1,20 @@ +[ + { + "vms": [ + "1019", + "1023", + "1052" + ], + "minServerLoad": 0.0, + "performanceScore": 0.8830158730158756 + }, + { + "vms": [ + "1023", + "1052", + "1073" + ], + "minServerLoad": 0.0, + "performanceScore": 0.7133055555552751 + } +] diff --git a/opendc-trace/opendc-trace-calcite/src/test/resources/trace/meta.parquet b/opendc-trace/opendc-trace-calcite/src/test/resources/trace/meta.parquet Binary files differnew file mode 100644 index 00000000..d8184945 --- /dev/null +++ b/opendc-trace/opendc-trace-calcite/src/test/resources/trace/meta.parquet diff --git a/opendc-trace/opendc-trace-calcite/src/test/resources/trace/trace.parquet b/opendc-trace/opendc-trace-calcite/src/test/resources/trace/trace.parquet Binary files differnew file mode 100644 index 00000000..00ab5835 --- /dev/null +++ b/opendc-trace/opendc-trace-calcite/src/test/resources/trace/trace.parquet diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt index 63688523..8d9eab82 100644 --- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt +++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt @@ -70,7 +70,7 @@ public class GwfTraceFormat : TraceFormat 
{ } } - override fun newReader(path: Path, table: String): TableReader { + override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader { return when (table) { TABLE_TASKS -> GwfTaskTableReader(factory.createParser(path.toFile())) else -> throw IllegalArgumentException("Table $table not supported") diff --git a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt index 9bf28ad7..411d45d0 100644 --- a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt @@ -58,7 +58,7 @@ internal class GwfTraceFormatTest { @Test fun testTableReader() { val path = Paths.get(checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")).toURI()) - val reader = format.newReader(path, TABLE_TASKS) + val reader = format.newReader(path, TABLE_TASKS, null) assertAll( { assertTrue(reader.nextRow()) }, @@ -73,7 +73,7 @@ internal class GwfTraceFormatTest { @Test fun testReadingRowWithDependencies() { val path = Paths.get(checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")).toURI()) - val reader = format.newReader(path, TABLE_TASKS) + val reader = format.newReader(path, TABLE_TASKS, null) // Move to row 7 for (x in 1..6) @@ -85,7 +85,7 @@ internal class GwfTraceFormatTest { { assertEquals("7", reader.get(TASK_ID)) }, { assertEquals(Instant.ofEpochSecond(87), reader.get(TASK_SUBMIT_TIME)) }, { assertEquals(Duration.ofSeconds(11), reader.get(TASK_RUNTIME)) }, - { assertEquals(setOf<String>("4", "5", "6"), reader.get(TASK_PARENTS)) }, + { assertEquals(setOf("4", "5", "6"), reader.get(TASK_PARENTS)) }, ) } } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt index b82da888..7a01b881 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt @@ -22,38 +22,30 @@ package org.opendc.trace.opendc -import org.apache.avro.Schema -import org.apache.avro.generic.GenericRecord import org.opendc.trace.* import org.opendc.trace.conv.* +import org.opendc.trace.opendc.parquet.ResourceState import org.opendc.trace.util.parquet.LocalParquetReader -import java.time.Duration -import java.time.Instant /** * A [TableReader] implementation for the OpenDC virtual machine trace format. */ -internal class OdcVmResourceStateTableReader(private val reader: LocalParquetReader<GenericRecord>) : TableReader { +internal class OdcVmResourceStateTableReader(private val reader: LocalParquetReader<ResourceState>) : TableReader { /** * The current record. */ - private var record: GenericRecord? = null - - /** - * A flag to indicate that the columns have been initialized. - */ - private var hasInitializedColumns = false + private var record: ResourceState? 
= null override fun nextRow(): Boolean { - val record = reader.read() - this.record = record + try { + val record = reader.read() + this.record = record - if (!hasInitializedColumns && record != null) { - initColumns(record.schema) - hasInitializedColumns = true + return record != null + } catch (e: Throwable) { + this.record = null + throw e } - - return record != null } override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 @@ -67,36 +59,36 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea val record = checkNotNull(record) { "Reader in invalid state" } return when (index) { - COL_ID -> record[AVRO_COL_ID].toString() - COL_TIMESTAMP -> Instant.ofEpochMilli(record[AVRO_COL_TIMESTAMP] as Long) - COL_DURATION -> Duration.ofMillis(record[AVRO_COL_DURATION] as Long) - COL_CPU_COUNT -> getInt(index) - COL_CPU_USAGE -> getDouble(index) - else -> throw IllegalArgumentException("Invalid column") + COL_ID -> record.id + COL_TIMESTAMP -> record.timestamp + COL_DURATION -> record.duration + COL_CPU_COUNT -> record.cpuCount + COL_CPU_USAGE -> record.cpuUsage + else -> throw IllegalArgumentException("Invalid column index $index") } } override fun getBoolean(index: Int): Boolean { - throw IllegalArgumentException("Invalid column") + throw IllegalArgumentException("Invalid column or type [index $index]") } override fun getInt(index: Int): Int { val record = checkNotNull(record) { "Reader in invalid state" } return when (index) { - COL_CPU_COUNT -> record[AVRO_COL_CPU_COUNT] as Int - else -> throw IllegalArgumentException("Invalid column") + COL_CPU_COUNT -> record.cpuCount + else -> throw IllegalArgumentException("Invalid column or type [index $index]") } } override fun getLong(index: Int): Long { - throw IllegalArgumentException("Invalid column") + throw IllegalArgumentException("Invalid column or type [index $index]") } override fun getDouble(index: Int): Double { val record = checkNotNull(record) { "Reader in invalid state" } return when (index) { - COL_CPU_USAGE -> (record[AVRO_COL_CPU_USAGE] as Number).toDouble() - else -> throw IllegalArgumentException("Invalid column") + COL_CPU_USAGE -> record.cpuUsage + else -> throw IllegalArgumentException("Invalid column or type [index $index]") } } @@ -106,28 +98,6 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea override fun toString(): String = "OdcVmResourceStateTableReader" - /** - * Initialize the columns for the reader based on [schema]. 
- */ - private fun initColumns(schema: Schema) { - try { - AVRO_COL_ID = schema.getField("id").pos() - AVRO_COL_TIMESTAMP = (schema.getField("timestamp") ?: schema.getField("time")).pos() - AVRO_COL_DURATION = schema.getField("duration").pos() - AVRO_COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("cores")).pos() - AVRO_COL_CPU_USAGE = (schema.getField("cpu_usage") ?: schema.getField("cpuUsage")).pos() - } catch (e: NullPointerException) { - // This happens when the field we are trying to access does not exist - throw IllegalArgumentException("Invalid schema", e) - } - } - - private var AVRO_COL_ID = -1 - private var AVRO_COL_TIMESTAMP = -1 - private var AVRO_COL_DURATION = -1 - private var AVRO_COL_CPU_COUNT = -1 - private var AVRO_COL_CPU_USAGE = -1 - private val COL_ID = 0 private val COL_TIMESTAMP = 1 private val COL_DURATION = 2 diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt index 01b9750c..97af5b59 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt @@ -22,84 +22,85 @@ package org.opendc.trace.opendc -import org.apache.avro.Schema -import org.apache.avro.generic.GenericRecord -import org.apache.avro.generic.GenericRecordBuilder import org.apache.parquet.hadoop.ParquetWriter import org.opendc.trace.* import org.opendc.trace.conv.* +import org.opendc.trace.opendc.parquet.ResourceState import java.time.Duration import java.time.Instant /** * A [TableWriter] implementation for the OpenDC virtual machine trace format. */ -internal class OdcVmResourceStateTableWriter( - private val writer: ParquetWriter<GenericRecord>, - private val schema: Schema -) : TableWriter { +internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter<ResourceState>) : TableWriter { /** - * The current builder for the record that is being written. + * The current state for the record that is being written. */ - private var builder: GenericRecordBuilder? = null - - /** - * The fields belonging to the resource state schema. 
- */ - private val fields = schema.fields + private var _isActive = false + private var _id: String = "" + private var _timestamp: Instant = Instant.MIN + private var _duration: Duration = Duration.ZERO + private var _cpuCount: Int = 0 + private var _cpuUsage: Double = Double.NaN override fun startRow() { - builder = GenericRecordBuilder(schema) + _isActive = true + _id = "" + _timestamp = Instant.MIN + _duration = Duration.ZERO + _cpuCount = 0 + _cpuUsage = Double.NaN } override fun endRow() { - val builder = checkNotNull(builder) { "No active row" } - this.builder = null - - val record = builder.build() - val id = record[COL_ID] as String - val timestamp = record[COL_TIMESTAMP] as Long + check(_isActive) { "No active row" } + _isActive = false - check(lastId != id || timestamp >= lastTimestamp) { "Records need to be ordered by (id, timestamp)" } + check(lastId != _id || _timestamp >= lastTimestamp) { "Records need to be ordered by (id, timestamp)" } - writer.write(builder.build()) + writer.write(ResourceState(_id, _timestamp, _duration, _cpuCount, _cpuUsage)) - lastId = id - lastTimestamp = timestamp + lastId = _id + lastTimestamp = _timestamp } - override fun resolve(column: TableColumn<*>): Int { - val schema = schema - return when (column) { - RESOURCE_ID -> schema.getField("id").pos() - RESOURCE_STATE_TIMESTAMP -> (schema.getField("timestamp") ?: schema.getField("time")).pos() - RESOURCE_STATE_DURATION -> schema.getField("duration").pos() - RESOURCE_CPU_COUNT -> (schema.getField("cpu_count") ?: schema.getField("cores")).pos() - RESOURCE_STATE_CPU_USAGE -> (schema.getField("cpu_usage") ?: schema.getField("cpuUsage")).pos() - else -> -1 - } - } + override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 override fun set(index: Int, value: Any) { - val builder = checkNotNull(builder) { "No active row" } - - builder.set( - fields[index], - when (index) { - COL_TIMESTAMP -> (value as Instant).toEpochMilli() - COL_DURATION -> (value as Duration).toMillis() - else -> value - } - ) + check(_isActive) { "No active row" } + + when (index) { + COL_ID -> _id = value as String + COL_TIMESTAMP -> _timestamp = value as Instant + COL_DURATION -> _duration = value as Duration + COL_CPU_COUNT -> _cpuCount = value as Int + COL_CPU_USAGE -> _cpuUsage = value as Double + } } - override fun setBoolean(index: Int, value: Boolean) = set(index, value) + override fun setBoolean(index: Int, value: Boolean) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } - override fun setInt(index: Int, value: Int) = set(index, value) + override fun setInt(index: Int, value: Int) { + check(_isActive) { "No active row" } + when (index) { + COL_CPU_COUNT -> _cpuCount = value + else -> throw IllegalArgumentException("Invalid column or type [index $index]") + } + } - override fun setLong(index: Int, value: Long) = set(index, value) + override fun setLong(index: Int, value: Long) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } - override fun setDouble(index: Int, value: Double) = set(index, value) + override fun setDouble(index: Int, value: Double) { + check(_isActive) { "No active row" } + when (index) { + COL_CPU_USAGE -> _cpuUsage = value + else -> throw IllegalArgumentException("Invalid column or type [index $index]") + } + } override fun flush() { // Not available @@ -113,12 +114,19 @@ internal class OdcVmResourceStateTableWriter( * Last column values that are used to check for correct partitioning. */ private var lastId: String? 
= null - private var lastTimestamp: Long = Long.MIN_VALUE - - /** - * Columns with special behavior. - */ - private val COL_ID = resolve(RESOURCE_ID) - private val COL_TIMESTAMP = resolve(RESOURCE_STATE_TIMESTAMP) - private val COL_DURATION = resolve(RESOURCE_STATE_DURATION) + private var lastTimestamp: Instant = Instant.MAX + + private val COL_ID = 0 + private val COL_TIMESTAMP = 1 + private val COL_DURATION = 2 + private val COL_CPU_COUNT = 3 + private val COL_CPU_USAGE = 4 + + private val columns = mapOf( + RESOURCE_ID to COL_ID, + RESOURCE_STATE_TIMESTAMP to COL_TIMESTAMP, + RESOURCE_STATE_DURATION to COL_DURATION, + RESOURCE_CPU_COUNT to COL_CPU_COUNT, + RESOURCE_STATE_CPU_USAGE to COL_CPU_USAGE, + ) } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt index 4909e70e..6102332f 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt @@ -22,37 +22,30 @@ package org.opendc.trace.opendc -import org.apache.avro.Schema -import org.apache.avro.generic.GenericRecord import org.opendc.trace.* import org.opendc.trace.conv.* +import org.opendc.trace.opendc.parquet.Resource import org.opendc.trace.util.parquet.LocalParquetReader -import java.time.Instant /** - * A [TableReader] implementation for the resources table in the OpenDC virtual machine trace format. + * A [TableReader] implementation for the "resources table" in the OpenDC virtual machine trace format. */ -internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<GenericRecord>) : TableReader { +internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<Resource>) : TableReader { /** * The current record. */ - private var record: GenericRecord? = null - - /** - * A flag to indicate that the columns have been initialized. - */ - private var hasInitializedColumns = false + private var record: Resource? 
= null override fun nextRow(): Boolean { - val record = reader.read() - this.record = record + try { + val record = reader.read() + this.record = record - if (!hasInitializedColumns && record != null) { - initColumns(record.schema) - hasInitializedColumns = true + return record != null + } catch (e: Throwable) { + this.record = null + throw e } - - return record != null } override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 @@ -66,9 +59,9 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G val record = checkNotNull(record) { "Reader in invalid state" } return when (index) { - COL_ID -> record[AVRO_COL_ID].toString() - COL_START_TIME -> Instant.ofEpochMilli(record[AVRO_COL_START_TIME] as Long) - COL_STOP_TIME -> Instant.ofEpochMilli(record[AVRO_COL_STOP_TIME] as Long) + COL_ID -> record.id + COL_START_TIME -> record.startTime + COL_STOP_TIME -> record.stopTime COL_CPU_COUNT -> getInt(index) COL_CPU_CAPACITY -> getDouble(index) COL_MEM_CAPACITY -> getDouble(index) @@ -84,7 +77,7 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G val record = checkNotNull(record) { "Reader in invalid state" } return when (index) { - COL_CPU_COUNT -> record[AVRO_COL_CPU_COUNT] as Int + COL_CPU_COUNT -> record.cpuCount else -> throw IllegalArgumentException("Invalid column") } } @@ -97,8 +90,8 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G val record = checkNotNull(record) { "Reader in invalid state" } return when (index) { - COL_CPU_CAPACITY -> if (AVRO_COL_CPU_CAPACITY >= 0) (record[AVRO_COL_CPU_CAPACITY] as Number).toDouble() else 0.0 - COL_MEM_CAPACITY -> (record[AVRO_COL_MEM_CAPACITY] as Number).toDouble() + COL_CPU_CAPACITY -> record.cpuCapacity + COL_MEM_CAPACITY -> record.memCapacity else -> throw IllegalArgumentException("Invalid column") } } @@ -109,30 +102,6 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G override fun toString(): String = "OdcVmResourceTableReader" - /** - * Initialize the columns for the reader based on [schema]. 
- */ - private fun initColumns(schema: Schema) { - try { - AVRO_COL_ID = schema.getField("id").pos() - AVRO_COL_START_TIME = (schema.getField("start_time") ?: schema.getField("submissionTime")).pos() - AVRO_COL_STOP_TIME = (schema.getField("stop_time") ?: schema.getField("endTime")).pos() - AVRO_COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("maxCores")).pos() - AVRO_COL_CPU_CAPACITY = schema.getField("cpu_capacity")?.pos() ?: -1 - AVRO_COL_MEM_CAPACITY = (schema.getField("mem_capacity") ?: schema.getField("requiredMemory")).pos() - } catch (e: NullPointerException) { - // This happens when the field we are trying to access does not exist - throw IllegalArgumentException("Invalid schema") - } - } - - private var AVRO_COL_ID = -1 - private var AVRO_COL_START_TIME = -1 - private var AVRO_COL_STOP_TIME = -1 - private var AVRO_COL_CPU_COUNT = -1 - private var AVRO_COL_CPU_CAPACITY = -1 - private var AVRO_COL_MEM_CAPACITY = -1 - private val COL_ID = 0 private val COL_START_TIME = 1 private val COL_STOP_TIME = 2 diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt index edc89ee6..cae65faa 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt @@ -22,74 +22,82 @@ package org.opendc.trace.opendc -import org.apache.avro.Schema -import org.apache.avro.generic.GenericRecord -import org.apache.avro.generic.GenericRecordBuilder import org.apache.parquet.hadoop.ParquetWriter import org.opendc.trace.* import org.opendc.trace.conv.* +import org.opendc.trace.opendc.parquet.Resource import java.time.Instant -import kotlin.math.roundToLong /** * A [TableWriter] implementation for the OpenDC virtual machine trace format. */ -internal class OdcVmResourceTableWriter( - private val writer: ParquetWriter<GenericRecord>, - private val schema: Schema -) : TableWriter { +internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resource>) : TableWriter { /** - * The current builder for the record that is being written. + * The current state for the record that is being written. */ - private var builder: GenericRecordBuilder? = null - - /** - * The fields belonging to the resource schema. 
- */ - private val fields = schema.fields + private var _isActive = false + private var _id: String = "" + private var _startTime: Instant = Instant.MIN + private var _stopTime: Instant = Instant.MIN + private var _cpuCount: Int = 0 + private var _cpuCapacity: Double = Double.NaN + private var _memCapacity: Double = Double.NaN override fun startRow() { - builder = GenericRecordBuilder(schema) + _isActive = true + _id = "" + _startTime = Instant.MIN + _stopTime = Instant.MIN + _cpuCount = 0 + _cpuCapacity = Double.NaN + _memCapacity = Double.NaN } override fun endRow() { - val builder = checkNotNull(builder) { "No active row" } - this.builder = null - writer.write(builder.build()) + check(_isActive) { "No active row" } + _isActive = false + writer.write(Resource(_id, _startTime, _stopTime, _cpuCount, _cpuCapacity, _memCapacity)) } - override fun resolve(column: TableColumn<*>): Int { - val schema = schema - return when (column) { - RESOURCE_ID -> schema.getField("id").pos() - RESOURCE_START_TIME -> (schema.getField("start_time") ?: schema.getField("submissionTime")).pos() - RESOURCE_STOP_TIME -> (schema.getField("stop_time") ?: schema.getField("endTime")).pos() - RESOURCE_CPU_COUNT -> (schema.getField("cpu_count") ?: schema.getField("maxCores")).pos() - RESOURCE_CPU_CAPACITY -> schema.getField("cpu_capacity").pos() - RESOURCE_MEM_CAPACITY -> (schema.getField("mem_capacity") ?: schema.getField("requiredMemory")).pos() - else -> -1 - } - } + override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1 override fun set(index: Int, value: Any) { - val builder = checkNotNull(builder) { "No active row" } - builder.set( - fields[index], - when (index) { - COL_START_TIME, COL_STOP_TIME -> (value as Instant).toEpochMilli() - COL_MEM_CAPACITY -> (value as Double).roundToLong() - else -> value - } - ) + check(_isActive) { "No active row" } + when (index) { + COL_ID -> _id = value as String + COL_START_TIME -> _startTime = value as Instant + COL_STOP_TIME -> _stopTime = value as Instant + COL_CPU_COUNT -> _cpuCount = value as Int + COL_CPU_CAPACITY -> _cpuCapacity = value as Double + COL_MEM_CAPACITY -> _memCapacity = value as Double + else -> throw IllegalArgumentException("Invalid column index $index") + } } - override fun setBoolean(index: Int, value: Boolean) = set(index, value) + override fun setBoolean(index: Int, value: Boolean) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } - override fun setInt(index: Int, value: Int) = set(index, value) + override fun setInt(index: Int, value: Int) { + check(_isActive) { "No active row" } + when (index) { + COL_CPU_COUNT -> _cpuCount = value + else -> throw IllegalArgumentException("Invalid column or type [index $index]") + } + } - override fun setLong(index: Int, value: Long) = set(index, value) + override fun setLong(index: Int, value: Long) { + throw IllegalArgumentException("Invalid column or type [index $index]") + } - override fun setDouble(index: Int, value: Double) = set(index, value) + override fun setDouble(index: Int, value: Double) { + check(_isActive) { "No active row" } + when (index) { + COL_CPU_CAPACITY -> _cpuCapacity = value + COL_MEM_CAPACITY -> _memCapacity = value + else -> throw IllegalArgumentException("Invalid column or type [index $index]") + } + } override fun flush() { // Not available @@ -99,10 +107,19 @@ internal class OdcVmResourceTableWriter( writer.close() } - /** - * Columns with special behavior. 
- */ - private val COL_START_TIME = resolve(RESOURCE_START_TIME) - private val COL_STOP_TIME = resolve(RESOURCE_STOP_TIME) - private val COL_MEM_CAPACITY = resolve(RESOURCE_MEM_CAPACITY) + private val COL_ID = 0 + private val COL_START_TIME = 1 + private val COL_STOP_TIME = 2 + private val COL_CPU_COUNT = 3 + private val COL_CPU_CAPACITY = 4 + private val COL_MEM_CAPACITY = 5 + + private val columns = mapOf( + RESOURCE_ID to COL_ID, + RESOURCE_START_TIME to COL_START_TIME, + RESOURCE_STOP_TIME to COL_STOP_TIME, + RESOURCE_CPU_COUNT to COL_CPU_COUNT, + RESOURCE_CPU_CAPACITY to COL_CPU_CAPACITY, + RESOURCE_MEM_CAPACITY to COL_MEM_CAPACITY, + ) } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt index 36a1b4a0..d45910c6 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt @@ -22,19 +22,19 @@ package org.opendc.trace.opendc -import org.apache.avro.Schema -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericRecord -import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.column.ParquetProperties import org.apache.parquet.hadoop.ParquetFileWriter import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.opendc.trace.* import org.opendc.trace.conv.* +import org.opendc.trace.opendc.parquet.ResourceReadSupport +import org.opendc.trace.opendc.parquet.ResourceStateReadSupport +import org.opendc.trace.opendc.parquet.ResourceStateWriteSupport +import org.opendc.trace.opendc.parquet.ResourceWriteSupport import org.opendc.trace.spi.TableDetails import org.opendc.trace.spi.TraceFormat -import org.opendc.trace.util.parquet.LocalOutputFile import org.opendc.trace.util.parquet.LocalParquetReader -import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA +import org.opendc.trace.util.parquet.LocalParquetWriter import shaded.parquet.com.fasterxml.jackson.core.JsonEncoding import shaded.parquet.com.fasterxml.jackson.core.JsonFactory import java.nio.file.Files @@ -102,14 +102,14 @@ public class OdcVmTraceFormat : TraceFormat { } } - override fun newReader(path: Path, table: String): TableReader { + override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader { return when (table) { TABLE_RESOURCES -> { - val reader = LocalParquetReader<GenericRecord>(path.resolve("meta.parquet")) + val reader = LocalParquetReader(path.resolve("meta.parquet"), ResourceReadSupport(projection)) OdcVmResourceTableReader(reader) } TABLE_RESOURCE_STATES -> { - val reader = LocalParquetReader<GenericRecord>(path.resolve("trace.parquet")) + val reader = LocalParquetReader(path.resolve("trace.parquet"), ResourceStateReadSupport(projection)) OdcVmResourceStateTableReader(reader) } TABLE_INTERFERENCE_GROUPS -> { @@ -128,24 +128,24 @@ public class OdcVmTraceFormat : TraceFormat { override fun newWriter(path: Path, table: String): TableWriter { return when (table) { TABLE_RESOURCES -> { - val schema = RESOURCES_SCHEMA - val writer = AvroParquetWriter.builder<GenericRecord>(LocalOutputFile(path.resolve("meta.parquet"))) - .withSchema(schema) + val writer = LocalParquetWriter.builder(path.resolve("meta.parquet"), ResourceWriteSupport()) .withCompressionCodec(CompressionCodecName.ZSTD) + .withPageWriteChecksumEnabled(true) + 
.withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0) .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) .build() - OdcVmResourceTableWriter(writer, schema) + OdcVmResourceTableWriter(writer) } TABLE_RESOURCE_STATES -> { - val schema = RESOURCE_STATES_SCHEMA - val writer = AvroParquetWriter.builder<GenericRecord>(LocalOutputFile(path.resolve("trace.parquet"))) - .withSchema(schema) + val writer = LocalParquetWriter.builder(path.resolve("trace.parquet"), ResourceStateWriteSupport()) .withCompressionCodec(CompressionCodecName.ZSTD) .withDictionaryEncoding("id", true) .withBloomFilterEnabled("id", true) + .withPageWriteChecksumEnabled(true) + .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0) .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) .build() - OdcVmResourceStateTableWriter(writer, schema) + OdcVmResourceStateTableWriter(writer) } TABLE_INTERFERENCE_GROUPS -> { val generator = jsonFactory.createGenerator(path.resolve("interference-model.json").toFile(), JsonEncoding.UTF8) @@ -154,37 +154,4 @@ public class OdcVmTraceFormat : TraceFormat { else -> throw IllegalArgumentException("Table $table not supported") } } - - public companion object { - /** - * Schema for the resources table in the trace. - */ - @JvmStatic - public val RESOURCES_SCHEMA: Schema = SchemaBuilder - .record("resource") - .namespace("org.opendc.trace.opendc") - .fields() - .requiredString("id") - .name("start_time").type(TIMESTAMP_SCHEMA).noDefault() - .name("stop_time").type(TIMESTAMP_SCHEMA).noDefault() - .requiredInt("cpu_count") - .requiredDouble("cpu_capacity") - .requiredLong("mem_capacity") - .endRecord() - - /** - * Schema for the resource states table in the trace. - */ - @JvmStatic - public val RESOURCE_STATES_SCHEMA: Schema = SchemaBuilder - .record("resource_state") - .namespace("org.opendc.trace.opendc") - .fields() - .requiredString("id") - .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault() - .requiredLong("duration") - .requiredInt("cpu_count") - .requiredDouble("cpu_usage") - .endRecord() - } } diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/Resource.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/Resource.kt new file mode 100644 index 00000000..c6db45b5 --- /dev/null +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/Resource.kt @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package org.opendc.trace.opendc.parquet + +import java.time.Instant + +/** + * A description of a resource in a trace. + */ +internal data class Resource( + val id: String, + val startTime: Instant, + val stopTime: Instant, + val cpuCount: Int, + val cpuCapacity: Double, + val memCapacity: Double +) diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt new file mode 100644 index 00000000..0d70446d --- /dev/null +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt @@ -0,0 +1,147 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.opendc.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.hadoop.api.InitContext +import org.apache.parquet.hadoop.api.ReadSupport +import org.apache.parquet.io.api.RecordMaterializer +import org.apache.parquet.schema.* +import org.opendc.trace.TableColumn +import org.opendc.trace.conv.* + +/** + * A [ReadSupport] instance for [Resource] objects. + */ +internal class ResourceReadSupport(private val projection: List<TableColumn<*>>?) : ReadSupport<Resource>() { + /** + * Mapping from field names to [TableColumn]s. 
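+     *
+     * Both naming generations map onto the same logical columns, so callers can
+     * project over [TableColumn]s without knowing which generation a file uses.
+     * A minimal sketch, mirroring how [OdcVmTraceFormat.newReader] wires this up:
+     *
+     * ```
+     * val support = ResourceReadSupport(listOf(RESOURCE_ID, RESOURCE_START_TIME))
+     * val reader = LocalParquetReader(path.resolve("meta.parquet"), support)
+     * val resource = reader.read()
+     * ```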
+ */ + private val fieldMap = mapOf<String, TableColumn<*>>( + "id" to RESOURCE_ID, + "submissionTime" to RESOURCE_START_TIME, + "start_time" to RESOURCE_START_TIME, + "endTime" to RESOURCE_STOP_TIME, + "stop_time" to RESOURCE_STOP_TIME, + "maxCores" to RESOURCE_CPU_COUNT, + "cpu_count" to RESOURCE_CPU_COUNT, + "cpu_capacity" to RESOURCE_CPU_CAPACITY, + "requiredMemory" to RESOURCE_MEM_CAPACITY, + "mem_capacity" to RESOURCE_MEM_CAPACITY, + ) + + override fun init(context: InitContext): ReadContext { + val projectedSchema = + if (projection != null) { + Types.buildMessage() + .apply { + val projectionSet = projection.toSet() + + for (field in READ_SCHEMA.fields) { + val col = fieldMap[field.name] ?: continue + if (col in projectionSet) { + addField(field) + } + } + } + .named(READ_SCHEMA.name) + } else { + READ_SCHEMA + } + + return ReadContext(projectedSchema) + } + + override fun prepareForRead( + configuration: Configuration, + keyValueMetaData: Map<String, String>, + fileSchema: MessageType, + readContext: ReadContext + ): RecordMaterializer<Resource> = ResourceRecordMaterializer(readContext.requestedSchema) + + companion object { + /** + * Parquet read schema (version 2.0) for the "resources" table in the trace. + */ + @JvmStatic + val READ_SCHEMA_V2_0: MessageType = Types.buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("id"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("submissionTime"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("endTime"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("maxCores"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("requiredMemory"), + ) + .named("resource") + + /** + * Parquet read schema (version 2.1) for the "resources" table in the trace. + */ + @JvmStatic + val READ_SCHEMA_V2_1: MessageType = Types.buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("id"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("start_time"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("stop_time"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("cpu_count"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpu_capacity"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("mem_capacity"), + ) + .named("resource") + + /** + * Parquet read schema for the "resources" table in the trace. 
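+         *
+         * This is the union of the v2.0 and v2.1 schemas, so the merged message
+         * type carries both naming generations of each field; illustratively:
+         *
+         * ```
+         * READ_SCHEMA.containsField("submissionTime") // true (v2.0 name)
+         * READ_SCHEMA.containsField("start_time")     // true (v2.1 name)
+         * ```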
+ */ + @JvmStatic + val READ_SCHEMA: MessageType = READ_SCHEMA_V2_0 + .union(READ_SCHEMA_V2_1) + } +} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceRecordMaterializer.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceRecordMaterializer.kt new file mode 100644 index 00000000..3adb0709 --- /dev/null +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceRecordMaterializer.kt @@ -0,0 +1,107 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.opendc.parquet + +import org.apache.parquet.io.api.* +import org.apache.parquet.schema.MessageType +import java.time.Instant + +/** + * A [RecordMaterializer] for [Resource] records. + */ +internal class ResourceRecordMaterializer(schema: MessageType) : RecordMaterializer<Resource>() { + /** + * State of current record being read. + */ + private var _id = "" + private var _startTime = Instant.MIN + private var _stopTime = Instant.MIN + private var _cpuCount = 0 + private var _cpuCapacity = 0.0 + private var _memCapacity = 0.0 + + /** + * Root converter for the record. + */ + private val root = object : GroupConverter() { + /** + * The converters for the columns of the schema. 
+ */ + private val converters = schema.fields.map { type -> + when (type.name) { + "id" -> object : PrimitiveConverter() { + override fun addBinary(value: Binary) { + _id = value.toStringUsingUTF8() + } + } + "start_time", "submissionTime" -> object : PrimitiveConverter() { + override fun addLong(value: Long) { + _startTime = Instant.ofEpochMilli(value) + } + } + "stop_time", "endTime" -> object : PrimitiveConverter() { + override fun addLong(value: Long) { + _stopTime = Instant.ofEpochMilli(value) + } + } + "cpu_count", "maxCores" -> object : PrimitiveConverter() { + override fun addInt(value: Int) { + _cpuCount = value + } + } + "cpu_capacity" -> object : PrimitiveConverter() { + override fun addDouble(value: Double) { + _cpuCapacity = value + } + } + "mem_capacity", "requiredMemory" -> object : PrimitiveConverter() { + override fun addDouble(value: Double) { + _memCapacity = value + } + + override fun addLong(value: Long) { + _memCapacity = value.toDouble() + } + } + else -> error("Unknown column $type") + } + } + + override fun start() { + _id = "" + _startTime = Instant.MIN + _stopTime = Instant.MIN + _cpuCount = 0 + _cpuCapacity = 0.0 + _memCapacity = 0.0 + } + + override fun end() {} + + override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex] + } + + override fun getCurrentRecord(): Resource = Resource(_id, _startTime, _stopTime, _cpuCount, _cpuCapacity, _memCapacity) + + override fun getRootConverter(): GroupConverter = root +} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceState.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceState.kt new file mode 100644 index 00000000..9ad58764 --- /dev/null +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceState.kt @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */
+
+package org.opendc.trace.opendc.parquet
+
+import java.time.Duration
+import java.time.Instant
+
+/**
+ * A single state sample of a resource in a trace.
+ */
+internal class ResourceState(
+    val id: String,
+    val timestamp: Instant,
+    val duration: Duration,
+    val cpuCount: Int,
+    val cpuUsage: Double
+)
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt
new file mode 100644
index 00000000..97aa00b2
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt
@@ -0,0 +1,139 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.opendc.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.api.InitContext
+import org.apache.parquet.hadoop.api.ReadSupport
+import org.apache.parquet.io.api.RecordMaterializer
+import org.apache.parquet.schema.*
+import org.opendc.trace.TableColumn
+import org.opendc.trace.conv.*
+
+/**
+ * A [ReadSupport] instance for [ResourceState] objects.
+ */
+internal class ResourceStateReadSupport(private val projection: List<TableColumn<*>>?) : ReadSupport<ResourceState>() {
+    /**
+     * Mapping from field names to [TableColumn]s.
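+     *
+     * As with the resources table, projections are expressed over logical
+     * [TableColumn]s; a minimal sketch, mirroring the projection used in the
+     * test suite:
+     *
+     * ```
+     * val support = ResourceStateReadSupport(listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP, RESOURCE_STATE_CPU_USAGE))
+     * val reader = LocalParquetReader(path.resolve("trace.parquet"), support)
+     * ```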
+ */ + private val fieldMap = mapOf<String, TableColumn<*>>( + "id" to RESOURCE_ID, + "time" to RESOURCE_STATE_TIMESTAMP, + "timestamp" to RESOURCE_STATE_TIMESTAMP, + "duration" to RESOURCE_STATE_DURATION, + "cores" to RESOURCE_CPU_COUNT, + "cpu_count" to RESOURCE_CPU_COUNT, + "cpuUsage" to RESOURCE_STATE_CPU_USAGE, + "cpu_usage" to RESOURCE_STATE_CPU_USAGE, + ) + + override fun init(context: InitContext): ReadContext { + val projectedSchema = + if (projection != null) { + Types.buildMessage() + .apply { + val projectionSet = projection.toSet() + + for (field in READ_SCHEMA.fields) { + val col = fieldMap[field.name] ?: continue + if (col in projectionSet) { + addField(field) + } + } + } + .named(READ_SCHEMA.name) + } else { + READ_SCHEMA + } + + return ReadContext(projectedSchema) + } + + override fun prepareForRead( + configuration: Configuration, + keyValueMetaData: Map<String, String>, + fileSchema: MessageType, + readContext: ReadContext + ): RecordMaterializer<ResourceState> = ResourceStateRecordMaterializer(readContext.requestedSchema) + + companion object { + /** + * Parquet read schema (version 2.0) for the "resource states" table in the trace. + */ + @JvmStatic + val READ_SCHEMA_V2_0: MessageType = Types.buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("id"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("time"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("duration"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("cores"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpuUsage") + ) + .named("resource_state") + + /** + * Parquet read schema (version 2.1) for the "resource states" table in the trace. + */ + @JvmStatic + val READ_SCHEMA_V2_1: MessageType = Types.buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("id"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("timestamp"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("duration"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("cpu_count"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpu_usage") + ) + .named("resource_state") + + /** + * Parquet read schema for the "resource states" table in the trace. 
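+         *
+         * The union of both format generations; e.g. the v2.0 "time" field and
+         * its v2.1 counterpart "timestamp" are both present (illustrative):
+         *
+         * ```
+         * READ_SCHEMA.containsField("time")      // true (v2.0 name)
+         * READ_SCHEMA.containsField("timestamp") // true (v2.1 name)
+         * ```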
+ */ + @JvmStatic + val READ_SCHEMA: MessageType = READ_SCHEMA_V2_0.union(READ_SCHEMA_V2_1) + } +} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateRecordMaterializer.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateRecordMaterializer.kt new file mode 100644 index 00000000..f8b0c3c2 --- /dev/null +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateRecordMaterializer.kt @@ -0,0 +1,102 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.opendc.parquet + +import org.apache.parquet.io.api.* +import org.apache.parquet.schema.MessageType +import java.time.Duration +import java.time.Instant + +/** + * A [RecordMaterializer] for [ResourceState] records. + */ +internal class ResourceStateRecordMaterializer(schema: MessageType) : RecordMaterializer<ResourceState>() { + /** + * State of current record being read. + */ + private var _id = "" + private var _timestamp = Instant.MIN + private var _duration = Duration.ZERO + private var _cpuCount = 0 + private var _cpuUsage = 0.0 + + /** + * Root converter for the record. + */ + private val root = object : GroupConverter() { + /** + * The converters for the columns of the schema. 
+         */
+        private val converters = schema.fields.map { type ->
+            when (type.name) {
+                "id" -> object : PrimitiveConverter() {
+                    override fun addBinary(value: Binary) {
+                        _id = value.toStringUsingUTF8()
+                    }
+                }
+                "timestamp", "time" -> object : PrimitiveConverter() {
+                    override fun addLong(value: Long) {
+                        _timestamp = Instant.ofEpochMilli(value)
+                    }
+                }
+                "duration" -> object : PrimitiveConverter() {
+                    override fun addLong(value: Long) {
+                        _duration = Duration.ofMillis(value)
+                    }
+                }
+                "cpu_count", "cores" -> object : PrimitiveConverter() {
+                    override fun addInt(value: Int) {
+                        _cpuCount = value
+                    }
+                }
+                "cpu_usage", "cpuUsage" -> object : PrimitiveConverter() {
+                    override fun addDouble(value: Double) {
+                        _cpuUsage = value
+                    }
+                }
+                "flops" -> object : PrimitiveConverter() {
+                    override fun addLong(value: Long) {
+                        // Ignore to support v1 format
+                    }
+                }
+                else -> error("Unknown column $type")
+            }
+        }
+
+        override fun start() {
+            _id = ""
+            _timestamp = Instant.MIN
+            _duration = Duration.ZERO
+            _cpuCount = 0
+            _cpuUsage = 0.0
+        }
+
+        override fun end() {}
+
+        override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex]
+    }
+
+    override fun getCurrentRecord(): ResourceState = ResourceState(_id, _timestamp, _duration, _cpuCount, _cpuUsage)
+
+    override fun getRootConverter(): GroupConverter = root
+}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateWriteSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateWriteSupport.kt
new file mode 100644
index 00000000..e2f3df31
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateWriteSupport.kt
@@ -0,0 +1,105 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.opendc.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.io.api.Binary
+import org.apache.parquet.io.api.RecordConsumer
+import org.apache.parquet.schema.*
+
+/**
+ * Support for writing [ResourceState] instances to Parquet format.
+ */
+internal class ResourceStateWriteSupport : WriteSupport<ResourceState>() {
+    /**
+     * The current active record consumer.
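+     *
+     * Parquet supplies the consumer via [prepareForWrite] before any call to
+     * [write]. A minimal sketch of wiring this support class up, mirroring
+     * [OdcVmTraceFormat.newWriter] (the record values are illustrative):
+     *
+     * ```
+     * val writer = LocalParquetWriter.builder(path.resolve("trace.parquet"), ResourceStateWriteSupport())
+     *     .withCompressionCodec(CompressionCodecName.ZSTD)
+     *     .build()
+     * writer.write(ResourceState("1019", Instant.EPOCH, Duration.ofMinutes(5), 1, 23.0))
+     * writer.close()
+     * ```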
+ */ + private lateinit var recordConsumer: RecordConsumer + + override fun init(configuration: Configuration): WriteContext { + return WriteContext(WRITE_SCHEMA, emptyMap()) + } + + override fun prepareForWrite(recordConsumer: RecordConsumer) { + this.recordConsumer = recordConsumer + } + + override fun write(record: ResourceState) { + write(recordConsumer, record) + } + + private fun write(consumer: RecordConsumer, record: ResourceState) { + consumer.startMessage() + + consumer.startField("id", 0) + consumer.addBinary(Binary.fromCharSequence(record.id)) + consumer.endField("id", 0) + + consumer.startField("timestamp", 1) + consumer.addLong(record.timestamp.toEpochMilli()) + consumer.endField("timestamp", 1) + + consumer.startField("duration", 2) + consumer.addLong(record.duration.toMillis()) + consumer.endField("duration", 2) + + consumer.startField("cpu_count", 3) + consumer.addInteger(record.cpuCount) + consumer.endField("cpu_count", 3) + + consumer.startField("cpu_usage", 4) + consumer.addDouble(record.cpuUsage) + consumer.endField("cpu_usage", 4) + + consumer.endMessage() + } + + companion object { + /** + * Parquet schema for the "resource states" table in the trace. + */ + @JvmStatic + val WRITE_SCHEMA: MessageType = Types.buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("id"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("timestamp"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("duration"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("cpu_count"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpu_usage") + ) + .named("resource_state") + } +} diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceWriteSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceWriteSupport.kt new file mode 100644 index 00000000..14cadabb --- /dev/null +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceWriteSupport.kt @@ -0,0 +1,114 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package org.opendc.trace.opendc.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.hadoop.api.WriteSupport +import org.apache.parquet.io.api.Binary +import org.apache.parquet.io.api.RecordConsumer +import org.apache.parquet.schema.* +import kotlin.math.roundToLong + +/** + * Support for writing [Resource] instances to Parquet format. + */ +internal class ResourceWriteSupport : WriteSupport<Resource>() { + /** + * The current active record consumer. + */ + private lateinit var recordConsumer: RecordConsumer + + override fun init(configuration: Configuration): WriteContext { + return WriteContext(WRITE_SCHEMA, emptyMap()) + } + + override fun prepareForWrite(recordConsumer: RecordConsumer) { + this.recordConsumer = recordConsumer + } + + override fun write(record: Resource) { + write(recordConsumer, record) + } + + private fun write(consumer: RecordConsumer, record: Resource) { + consumer.startMessage() + + consumer.startField("id", 0) + consumer.addBinary(Binary.fromCharSequence(record.id)) + consumer.endField("id", 0) + + consumer.startField("start_time", 1) + consumer.addLong(record.startTime.toEpochMilli()) + consumer.endField("start_time", 1) + + consumer.startField("stop_time", 2) + consumer.addLong(record.stopTime.toEpochMilli()) + consumer.endField("stop_time", 2) + + consumer.startField("cpu_count", 3) + consumer.addInteger(record.cpuCount) + consumer.endField("cpu_count", 3) + + consumer.startField("cpu_capacity", 4) + consumer.addDouble(record.cpuCapacity) + consumer.endField("cpu_capacity", 4) + + consumer.startField("mem_capacity", 5) + consumer.addLong(record.memCapacity.roundToLong()) + consumer.endField("mem_capacity", 5) + + consumer.endMessage() + } + + companion object { + /** + * Parquet schema for the "resources" table in the trace. + */ + @JvmStatic + val WRITE_SCHEMA: MessageType = Types.buildMessage() + .addFields( + Types + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .`as`(LogicalTypeAnnotation.stringType()) + .named("id"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("start_time"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("stop_time"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT32) + .named("cpu_count"), + Types + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("cpu_capacity"), + Types + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("mem_capacity"), + ) + .named("resource") + } +} diff --git a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt index c8742624..1f4f6195 100644 --- a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt @@ -29,7 +29,9 @@ import org.junit.jupiter.api.assertThrows import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource import org.opendc.trace.conv.* +import java.nio.file.Files import java.nio.file.Paths +import java.time.Instant /** * Test suite for the [OdcVmTraceFormat] implementation. 
@@ -61,11 +63,12 @@ internal class OdcVmTraceFormatTest { @ValueSource(strings = ["trace-v2.0", "trace-v2.1"]) fun testResources(name: String) { val path = Paths.get("src/test/resources/$name") - val reader = format.newReader(path, TABLE_RESOURCES) + val reader = format.newReader(path, TABLE_RESOURCES, listOf(RESOURCE_ID, RESOURCE_START_TIME)) assertAll( { assertTrue(reader.nextRow()) }, { assertEquals("1019", reader.get(RESOURCE_ID)) }, + { assertEquals(Instant.ofEpochMilli(1376314846000), reader.get(RESOURCE_START_TIME)) }, { assertTrue(reader.nextRow()) }, { assertEquals("1023", reader.get(RESOURCE_ID)) }, { assertTrue(reader.nextRow()) }, @@ -78,11 +81,46 @@ internal class OdcVmTraceFormatTest { reader.close() } + @Test + fun testResourcesWrite() { + val path = Files.createTempDirectory("opendc") + val writer = format.newWriter(path, TABLE_RESOURCES) + + writer.startRow() + writer.set(RESOURCE_ID, "1019") + writer.set(RESOURCE_START_TIME, Instant.EPOCH) + writer.set(RESOURCE_STOP_TIME, Instant.EPOCH) + writer.setInt(RESOURCE_CPU_COUNT, 1) + writer.setDouble(RESOURCE_CPU_CAPACITY, 1024.0) + writer.setDouble(RESOURCE_MEM_CAPACITY, 1024.0) + writer.endRow() + writer.close() + + val reader = format.newReader(path, TABLE_RESOURCES, null) + + assertAll( + { assertTrue(reader.nextRow()) }, + { assertEquals("1019", reader.get(RESOURCE_ID)) }, + { assertEquals(Instant.EPOCH, reader.get(RESOURCE_START_TIME)) }, + { assertEquals(Instant.EPOCH, reader.get(RESOURCE_STOP_TIME)) }, + { assertEquals(1, reader.getInt(RESOURCE_CPU_COUNT)) }, + { assertEquals(1024.0, reader.getDouble(RESOURCE_CPU_CAPACITY)) }, + { assertEquals(1024.0, reader.getDouble(RESOURCE_MEM_CAPACITY)) }, + { assertFalse(reader.nextRow()) }, + ) + + reader.close() + } + @ParameterizedTest @ValueSource(strings = ["trace-v2.0", "trace-v2.1"]) fun testSmoke(name: String) { val path = Paths.get("src/test/resources/$name") - val reader = format.newReader(path, TABLE_RESOURCE_STATES) + val reader = format.newReader( + path, + TABLE_RESOURCE_STATES, + listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP, RESOURCE_STATE_CPU_USAGE) + ) assertAll( { assertTrue(reader.nextRow()) }, @@ -95,9 +133,40 @@ internal class OdcVmTraceFormatTest { } @Test + fun testResourceStatesWrite() { + val path = Files.createTempDirectory("opendc") + val writer = format.newWriter(path, TABLE_RESOURCE_STATES) + + writer.startRow() + writer.set(RESOURCE_ID, "1019") + writer.set(RESOURCE_STATE_TIMESTAMP, Instant.EPOCH) + writer.setDouble(RESOURCE_STATE_CPU_USAGE, 23.0) + writer.setInt(RESOURCE_CPU_COUNT, 1) + writer.endRow() + writer.close() + + val reader = format.newReader(path, TABLE_RESOURCE_STATES, null) + + assertAll( + { assertTrue(reader.nextRow()) }, + { assertEquals("1019", reader.get(RESOURCE_ID)) }, + { assertEquals(Instant.EPOCH, reader.get(RESOURCE_STATE_TIMESTAMP)) }, + { assertEquals(1, reader.getInt(RESOURCE_CPU_COUNT)) }, + { assertEquals(23.0, reader.getDouble(RESOURCE_STATE_CPU_USAGE)) }, + { assertFalse(reader.nextRow()) }, + ) + + reader.close() + } + + @Test fun testInterferenceGroups() { val path = Paths.get("src/test/resources/trace-v2.1") - val reader = format.newReader(path, TABLE_INTERFERENCE_GROUPS) + val reader = format.newReader( + path, + TABLE_INTERFERENCE_GROUPS, + listOf(INTERFERENCE_GROUP_MEMBERS, INTERFERENCE_GROUP_TARGET, INTERFERENCE_GROUP_SCORE) + ) assertAll( { assertTrue(reader.nextRow()) }, @@ -117,7 +186,7 @@ internal class OdcVmTraceFormatTest { @Test fun testInterferenceGroupsEmpty() { val path = 
Paths.get("src/test/resources/trace-v2.0") - val reader = format.newReader(path, TABLE_INTERFERENCE_GROUPS) + val reader = format.newReader(path, TABLE_INTERFERENCE_GROUPS, listOf(INTERFERENCE_GROUP_MEMBERS)) assertFalse(reader.nextRow()) reader.close() diff --git a/opendc-trace/opendc-trace-parquet/build.gradle.kts b/opendc-trace/opendc-trace-parquet/build.gradle.kts index 302c0b14..e6415586 100644 --- a/opendc-trace/opendc-trace-parquet/build.gradle.kts +++ b/opendc-trace/opendc-trace-parquet/build.gradle.kts @@ -32,7 +32,7 @@ dependencies { api(libs.parquet) { exclude(group = "org.apache.hadoop") } - runtimeOnly(libs.hadoop.common) { + api(libs.hadoop.common) { exclude(group = "org.slf4j", module = "slf4j-log4j12") exclude(group = "log4j") exclude(group = "org.apache.hadoop") diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt index ef9eaeb3..eef83956 100644 --- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt +++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt @@ -22,8 +22,8 @@ package org.opendc.trace.util.parquet -import org.apache.parquet.avro.AvroParquetReader import org.apache.parquet.hadoop.ParquetReader +import org.apache.parquet.hadoop.api.ReadSupport import org.apache.parquet.io.InputFile import java.io.File import java.io.IOException @@ -32,11 +32,17 @@ import java.nio.file.Path import kotlin.io.path.isDirectory /** - * A helper class to read Parquet files. + * A helper class to read Parquet files from the filesystem. + * + * This class wraps a [ParquetReader] in order to support reading partitioned Parquet datasets. * * @param path The path to the Parquet file or directory to read. + * @param readSupport Helper class to perform conversion from Parquet to [T]. */ -public class LocalParquetReader<out T>(path: Path) : AutoCloseable { +public class LocalParquetReader<out T>( + path: Path, + private val readSupport: ReadSupport<T> +) : AutoCloseable { /** * The input files to process. */ @@ -57,7 +63,7 @@ public class LocalParquetReader<out T>(path: Path) : AutoCloseable { /** * Construct a [LocalParquetReader] for the specified [file]. */ - public constructor(file: File) : this(file.toPath()) + public constructor(file: File, readSupport: ReadSupport<T>) : this(file.toPath(), readSupport) /** * Read a single entry in the Parquet file. @@ -93,20 +99,24 @@ public class LocalParquetReader<out T>(path: Path) : AutoCloseable { private fun initReader() { reader?.close() - this.reader = if (filesIterator.hasNext()) { - createReader(filesIterator.next()) - } else { - null + try { + this.reader = if (filesIterator.hasNext()) { + createReader(filesIterator.next()) + } else { + null + } + } catch (e: Throwable) { + this.reader = null + throw e } } /** - * Create a Parquet reader for the specified file. + * Construct a [ParquetReader] for the specified [input] with a custom [ReadSupport]. 
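+     *
+     * Record assembly is delegated entirely to the supplied [ReadSupport]; a
+     * minimal sketch of reading through this class (assuming a `readSupport`
+     * for the record type, as in the accompanying tests):
+     *
+     * ```
+     * val reader = LocalParquetReader(path, readSupport)
+     * try {
+     *     while (true) {
+     *         val record = reader.read() ?: break
+     *         // process the record
+     *     }
+     * } finally {
+     *     reader.close()
+     * }
+     * ```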
*/ private fun createReader(input: InputFile): ParquetReader<T> { - return AvroParquetReader - .builder<T>(input) - .disableCompatibility() - .build() + return object : ParquetReader.Builder<T>(input) { + override fun getReadSupport(): ReadSupport<@UnsafeVariance T> = this@LocalParquetReader.readSupport + }.build() } } diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt new file mode 100644 index 00000000..b5eb1deb --- /dev/null +++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.trace.util.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.hadoop.ParquetWriter +import org.apache.parquet.hadoop.api.WriteSupport +import org.apache.parquet.io.OutputFile +import java.nio.file.Path + +/** + * Helper class for writing Parquet records to local disk. + */ +public class LocalParquetWriter { + /** + * A [ParquetWriter.Builder] implementation supporting custom [OutputFile]s and [WriteSupport] implementations. + */ + public class Builder<T> internal constructor( + output: OutputFile, + private val writeSupport: WriteSupport<T> + ) : ParquetWriter.Builder<T, Builder<T>>(output) { + override fun self(): Builder<T> = this + + override fun getWriteSupport(conf: Configuration): WriteSupport<T> = writeSupport + } + + public companion object { + /** + * Create a [Builder] instance that writes a Parquet file at the specified [path]. 
+ */ + @JvmStatic + public fun <T> builder(path: Path, writeSupport: WriteSupport<T>): Builder<T> = + Builder(LocalOutputFile(path), writeSupport) + } +} diff --git a/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt b/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt index 8ef4d1fb..be354319 100644 --- a/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt +++ b/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt @@ -22,36 +22,81 @@ package org.opendc.trace.util.parquet -import org.apache.avro.SchemaBuilder -import org.apache.avro.generic.GenericData -import org.apache.parquet.avro.AvroParquetReader -import org.apache.parquet.avro.AvroParquetWriter +import org.apache.hadoop.conf.Configuration import org.apache.parquet.hadoop.ParquetFileWriter +import org.apache.parquet.hadoop.api.ReadSupport +import org.apache.parquet.hadoop.api.WriteSupport +import org.apache.parquet.io.api.Converter +import org.apache.parquet.io.api.GroupConverter +import org.apache.parquet.io.api.PrimitiveConverter +import org.apache.parquet.io.api.RecordConsumer +import org.apache.parquet.io.api.RecordMaterializer +import org.apache.parquet.schema.MessageType +import org.apache.parquet.schema.PrimitiveType +import org.apache.parquet.schema.Type +import org.apache.parquet.schema.Types import org.junit.jupiter.api.* import org.junit.jupiter.api.Assertions.assertEquals -import java.io.File import java.nio.file.FileAlreadyExistsException +import java.nio.file.Files import java.nio.file.NoSuchFileException +import java.nio.file.Path /** * Test suite for the Parquet helper classes. */ internal class ParquetTest { - private val schema = SchemaBuilder - .record("test") - .namespace("org.opendc.format.util") - .fields() - .name("field").type().intType().noDefault() - .endRecord() + private lateinit var path: Path - private lateinit var file: File + private val schema = Types.buildMessage() + .addField( + Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.REQUIRED) + .named("field") + ) + .named("test") + private val writeSupport = object : WriteSupport<Int>() { + lateinit var recordConsumer: RecordConsumer + + override fun init(configuration: Configuration): WriteContext { + return WriteContext(schema, emptyMap()) + } + + override fun prepareForWrite(recordConsumer: RecordConsumer) { + this.recordConsumer = recordConsumer + } + + override fun write(record: Int) { + val consumer = recordConsumer + + consumer.startMessage() + consumer.startField("field", 0) + consumer.addInteger(record) + consumer.endField("field", 0) + consumer.endMessage() + } + } + + private val readSupport = object : ReadSupport<Int>() { + override fun init( + configuration: Configuration, + keyValueMetaData: Map<String, String>, + fileSchema: MessageType + ): ReadContext = ReadContext(fileSchema) + + override fun prepareForRead( + configuration: Configuration, + keyValueMetaData: Map<String, String>, + fileSchema: MessageType, + readContext: ReadContext + ): RecordMaterializer<Int> = TestRecordMaterializer() + } /** - * Setup the test + * Set up the test */ @BeforeEach fun setUp() { - file = File.createTempFile("opendc", "parquet") + path = Files.createTempFile("opendc", "parquet") } /** @@ -59,7 +104,7 @@ internal class ParquetTest { */ @AfterEach fun tearDown() { - file.delete() + Files.deleteIfExists(path) } /** @@ -68,29 +113,24 @@ internal class ParquetTest { 
@Test fun testSmoke() { val n = 4 - val writer = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(file)) - .withSchema(schema) + val writer = LocalParquetWriter.builder(path, writeSupport) .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) .build() try { repeat(n) { i -> - val record = GenericData.Record(schema) - record.put("field", i) - writer.write(record) + writer.write(i) } } finally { writer.close() } - val reader = AvroParquetReader.builder<GenericData.Record>(LocalInputFile(file)) - .build() - + val reader = LocalParquetReader(path, readSupport) var counter = 0 try { while (true) { val record = reader.read() ?: break - assertEquals(counter++, record.get("field")) + assertEquals(counter++, record) } } finally { reader.close() @@ -105,9 +145,7 @@ internal class ParquetTest { @Test fun testOverwrite() { assertThrows<FileAlreadyExistsException> { - AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(file)) - .withSchema(schema) - .build() + LocalParquetWriter.builder(path, writeSupport).build() } } @@ -116,10 +154,30 @@ internal class ParquetTest { */ @Test fun testNonExistent() { - file.delete() + Files.deleteIfExists(path) assertThrows<NoSuchFileException> { - AvroParquetReader.builder<GenericData.Record>(LocalInputFile(file)) - .build() + LocalParquetReader(path, readSupport) + } + } + + private class TestRecordMaterializer : RecordMaterializer<Int>() { + private var current: Int = 0 + private val fieldConverter = object : PrimitiveConverter() { + override fun addInt(value: Int) { + current = value + } + } + private val root = object : GroupConverter() { + override fun getConverter(fieldIndex: Int): Converter { + require(fieldIndex == 0) + return fieldConverter + } + override fun start() {} + override fun end() {} } + + override fun getCurrentRecord(): Int = current + + override fun getRootConverter(): GroupConverter = root } } diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt index b969f3ef..916a5eca 100644 --- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt +++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt @@ -64,7 +64,7 @@ public class SwfTraceFormat : TraceFormat { } } - override fun newReader(path: Path, table: String): TableReader { + override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader { return when (table) { TABLE_TASKS -> SwfTaskTableReader(path.bufferedReader()) else -> throw IllegalArgumentException("Table $table not supported") diff --git a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt index 1698f644..c3d644e8 100644 --- a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt @@ -58,7 +58,7 @@ internal class SwfTraceFormatTest { @Test fun testReader() { val path = Paths.get(checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")).toURI()) - val reader = format.newReader(path, TABLE_TASKS) + val reader = format.newReader(path, TABLE_TASKS, null) assertAll( { assertTrue(reader.nextRow()) }, diff --git a/opendc-trace/opendc-trace-tools/build.gradle.kts b/opendc-trace/opendc-trace-tools/build.gradle.kts index 
0c1e179e..e98fb932 100644 --- a/opendc-trace/opendc-trace-tools/build.gradle.kts +++ b/opendc-trace/opendc-trace-tools/build.gradle.kts @@ -29,16 +29,22 @@ plugins { } application { - mainClass.set("org.opendc.trace.tools.TraceConverter") + mainClass.set("org.opendc.trace.tools.TraceTools") } dependencies { implementation(projects.opendcTrace.opendcTraceApi) + implementation(projects.opendcTrace.opendcTraceCalcite) implementation(libs.kotlin.logging) implementation(libs.clikt) + implementation(libs.jline) runtimeOnly(projects.opendcTrace.opendcTraceOpendc) runtimeOnly(projects.opendcTrace.opendcTraceBitbrains) runtimeOnly(projects.opendcTrace.opendcTraceAzure) + runtimeOnly(projects.opendcTrace.opendcTraceGwf) + runtimeOnly(projects.opendcTrace.opendcTraceSwf) + runtimeOnly(projects.opendcTrace.opendcTraceWfformat) + runtimeOnly(projects.opendcTrace.opendcTraceWtf) runtimeOnly(libs.log4j.slf4j) } diff --git a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/ConvertCommand.kt index c71035d4..970de0f4 100644 --- a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt +++ b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/ConvertCommand.kt @@ -20,7 +20,6 @@ * SOFTWARE. */ -@file:JvmName("TraceConverter") package org.opendc.trace.tools import com.github.ajalt.clikt.core.CliktCommand @@ -44,14 +43,9 @@ import kotlin.math.max import kotlin.math.min /** - * A script to convert a trace in text format into a Parquet trace. + * A [CliktCommand] that can convert between workload trace formats. */ -fun main(args: Array<String>): Unit = TraceConverterCli().main(args) - -/** - * Represents the command for converting traces - */ -internal class TraceConverterCli : CliktCommand(name = "trace-converter") { +internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert between workload trace formats") { /** * The logger instance for the converter. */ @@ -73,7 +67,7 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") { /** * The input format of the trace. */ - private val inputFormat by option("-f", "--input-format", help = "format of output trace") + private val inputFormat by option("-f", "--input-format", help = "format of input trace") .required() /** diff --git a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/QueryCommand.kt b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/QueryCommand.kt new file mode 100644 index 00000000..b0f95de2 --- /dev/null +++ b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/QueryCommand.kt @@ -0,0 +1,159 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. 
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.tools
+
+import com.github.ajalt.clikt.core.CliktCommand
+import com.github.ajalt.clikt.parameters.arguments.argument
+import com.github.ajalt.clikt.parameters.options.option
+import com.github.ajalt.clikt.parameters.options.required
+import com.github.ajalt.clikt.parameters.types.file
+import org.apache.calcite.jdbc.CalciteConnection
+import org.jline.builtins.Styles
+import org.jline.console.Printer
+import org.jline.console.impl.DefaultPrinter
+import org.jline.terminal.Terminal
+import org.jline.terminal.TerminalBuilder
+import org.jline.utils.AttributedStringBuilder
+import org.opendc.trace.Trace
+import org.opendc.trace.calcite.TraceSchema
+import java.nio.charset.StandardCharsets
+import java.sql.DriverManager
+import java.sql.ResultSet
+import java.sql.ResultSetMetaData
+import java.util.*
+
+/**
+ * A [CliktCommand] that allows users to query workload traces using SQL.
+ */
+internal class QueryCommand : CliktCommand(name = "query", help = "Query workload traces") {
+    /**
+     * The trace to open.
+     */
+    private val input by option("-i", "--input")
+        .file(mustExist = true)
+        .required()
+
+    /**
+     * The input format of the trace.
+     */
+    private val inputFormat by option("-f", "--format", help = "format of the trace")
+        .required()
+
+    /**
+     * The query to execute.
+     */
+    private val query by argument()
+
+    /**
+     * Access to the terminal.
+     */
+    private val terminal = TerminalBuilder.builder()
+        .system(false)
+        .streams(System.`in`, System.out)
+        .encoding(StandardCharsets.UTF_8)
+        .build()
+
+    /**
+     * Helper class to print results to console.
+     */
+    private val printer = QueryPrinter(terminal)
+
+    override fun run() {
+        val inputTrace = Trace.open(input, format = inputFormat)
+        val info = Properties().apply { this["lex"] = "JAVA" }
+        val connection = DriverManager.getConnection("jdbc:calcite:", info).unwrap(CalciteConnection::class.java)
+        connection.rootSchema.add("trace", TraceSchema(inputTrace))
+        connection.schema = "trace"
+
+        val stmt = connection.createStatement()
+
+        val start = System.currentTimeMillis()
+        val hasResults = stmt.execute(query)
+
+        try {
+            if (hasResults) {
+                do {
+                    stmt.resultSet.use { rs ->
+                        val count: Int = printResults(rs)
+                        val duration = (System.currentTimeMillis() - start) / 1000.0
+                        printer.println("$count rows selected (${"%.3f".format(duration)} seconds)")
+                    }
+                } while (stmt.moreResults)
+            } else {
+                val count: Int = stmt.updateCount
+                val duration = (System.currentTimeMillis() - start) / 1000.0
+
+                printer.println("$count rows affected (${"%.3f".format(duration)} seconds)")
+            }
+        } finally {
+            stmt.close()
+            connection.close()
+        }
+    }
+
+    /**
+     * Helper function to print the results to console.
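+     *
+     * The JLine [Printer] receives the column names under [Printer.COLUMNS] and
+     * each row as a map from column name to value; illustratively:
+     *
+     * ```
+     * printer.println(
+     *     mapOf(Printer.COLUMNS to listOf("id", "cpu_count")),
+     *     listOf(mapOf("id" to "1019", "cpu_count" to 1))
+     * )
+     * ```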
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceTools.kt
index 086b900b..b480484b 100644
--- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt
+++ b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceTools.kt
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2021 AtLarge Research
+ * Copyright (c) 2022 AtLarge Research
  *
  * Permission is hereby granted, free of charge, to any person obtaining a copy
  * of this software and associated documentation files (the "Software"), to deal
@@ -20,25 +20,25 @@
  * SOFTWARE.
  */
 
-@file:JvmName("AvroUtils")
-package org.opendc.trace.util.parquet
+@file:JvmName("TraceTools")
+package org.opendc.trace.tools
 
-import org.apache.avro.LogicalTypes
-import org.apache.avro.Schema
+import com.github.ajalt.clikt.core.CliktCommand
+import com.github.ajalt.clikt.core.subcommands
 
 /**
- * Schema for UUID type.
+ * A script for querying and manipulating workload traces supported by OpenDC.
  */
-public val UUID_SCHEMA: Schema = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING))
+fun main(args: Array<String>): Unit = TraceToolsCli().main(args)
 
 /**
- * Schema for timestamp type.
+ * The primary [CliktCommand] for the trace tools offered by OpenDC.
  */
-public val TIMESTAMP_SCHEMA: Schema = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG))
+class TraceToolsCli : CliktCommand(name = "trace-tools") {
+    init {
+        subcommands(QueryCommand())
+        subcommands(ConvertCommand())
+    }
 
-/**
- * Helper function to make a [Schema] field optional.
- */
-public fun Schema.optional(): Schema {
-    return Schema.createUnion(Schema.create(Schema.Type.NULL), this)
+    override fun run() {}
 }
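Note: with both subcommands registered on TraceToolsCli, the tool is driven from the shell. A hypothetical invocation, using the -i/-f options and query argument defined in QueryCommand above; the trace path and the "tasks" table name are assumptions:

    trace-tools query -i traces/wtf-trace -f wtf "SELECT * FROM tasks LIMIT 10"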
diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt
index bc175b58..8db4c169 100644
--- a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt
+++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt
@@ -63,7 +63,7 @@ public class WfFormatTraceFormat : TraceFormat {
         }
     }
 
-    override fun newReader(path: Path, table: String): TableReader {
+    override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader {
         return when (table) {
             TABLE_TASKS -> WfFormatTaskTableReader(factory.createParser(path.toFile()))
             else -> throw IllegalArgumentException("Table $table not supported")
diff --git a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt
index 710de88e..4a8b2792 100644
--- a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt
@@ -62,7 +62,7 @@ class WfFormatTraceFormatTest {
     @Test
     fun testTableReader() {
         val path = Paths.get("src/test/resources/trace.json")
-        val reader = format.newReader(path, TABLE_TASKS)
+        val reader = format.newReader(path, TABLE_TASKS, null)
 
         assertAll(
             { assertTrue(reader.nextRow()) },
@@ -89,7 +89,7 @@ class WfFormatTraceFormatTest {
     @Test
     fun testTableReaderFull() {
         val path = Paths.get("src/test/resources/trace.json")
-        val reader = format.newReader(path, TABLE_TASKS)
+        val reader = format.newReader(path, TABLE_TASKS, null)
 
         assertDoesNotThrow {
             while (reader.nextRow()) {
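Note: the newReader signature change threads an optional column projection through every TraceFormat. WfFormatTraceFormat accepts the parameter but does not yet exploit it (its reader still parses the full JSON document), whereas the WTF reader below pushes it down to Parquet. A caller-side sketch; the trace path is hypothetical and the column constants are assumed to live in org.opendc.trace.conv, as the imports in this patch suggest:

    import org.opendc.trace.conv.TABLE_TASKS
    import org.opendc.trace.conv.TASK_ID
    import org.opendc.trace.conv.TASK_RUNTIME
    import org.opendc.trace.wfformat.WfFormatTraceFormat
    import java.nio.file.Paths

    // Hypothetical: request only two columns; formats without pushdown may ignore the hint.
    fun main() {
        val reader = WfFormatTraceFormat().newReader(Paths.get("trace.json"), TABLE_TASKS, listOf(TASK_ID, TASK_RUNTIME))
        while (reader.nextRow()) {
            println(reader.get(reader.resolve(TASK_ID)))
        }
        reader.close()
    }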
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
index 1e332aca..f0db78b7 100644
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
@@ -22,38 +22,30 @@
 
 package org.opendc.trace.wtf
 
-import org.apache.avro.Schema
-import org.apache.avro.generic.GenericRecord
 import org.opendc.trace.*
 import org.opendc.trace.conv.*
 import org.opendc.trace.util.parquet.LocalParquetReader
-import java.time.Duration
-import java.time.Instant
+import org.opendc.trace.wtf.parquet.Task
 
 /**
  * A [TableReader] implementation for the WTF format.
  */
-internal class WtfTaskTableReader(private val reader: LocalParquetReader<GenericRecord>) : TableReader {
+internal class WtfTaskTableReader(private val reader: LocalParquetReader<Task>) : TableReader {
     /**
      * The current record.
      */
-    private var record: GenericRecord? = null
-
-    /**
-     * A flag to indicate that the columns have been initialized.
-     */
-    private var hasInitializedColumns = false
+    private var record: Task? = null
 
     override fun nextRow(): Boolean {
-        val record = reader.read()
-        this.record = record
+        try {
+            val record = reader.read()
+            this.record = record
 
-        if (!hasInitializedColumns && record != null) {
-            initColumns(record.schema)
-            hasInitializedColumns = true
+            return record != null
+        } catch (e: Throwable) {
+            this.record = null
+            throw e
         }
-
-        return record != null
     }
 
     override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
@@ -65,16 +57,15 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Generic
     override fun get(index: Int): Any? {
         val record = checkNotNull(record) { "Reader in invalid state" }
 
-        @Suppress("UNCHECKED_CAST")
         return when (index) {
-            COL_ID -> (record[AVRO_COL_ID] as Long).toString()
-            COL_WORKFLOW_ID -> (record[AVRO_COL_WORKFLOW_ID] as Long).toString()
-            COL_SUBMIT_TIME -> Instant.ofEpochMilli(record[AVRO_COL_SUBMIT_TIME] as Long)
-            COL_WAIT_TIME -> Duration.ofMillis(record[AVRO_COL_WAIT_TIME] as Long)
-            COL_RUNTIME -> Duration.ofMillis(record[AVRO_COL_RUNTIME] as Long)
+            COL_ID -> record.id
+            COL_WORKFLOW_ID -> record.workflowId
+            COL_SUBMIT_TIME -> record.submitTime
+            COL_WAIT_TIME -> record.waitTime
+            COL_RUNTIME -> record.runtime
             COL_REQ_NCPUS, COL_GROUP_ID, COL_USER_ID -> getInt(index)
-            COL_PARENTS -> (record[AVRO_COL_PARENTS] as ArrayList<GenericRecord>).map { it["item"].toString() }.toSet()
-            COL_CHILDREN -> (record[AVRO_COL_CHILDREN] as ArrayList<GenericRecord>).map { it["item"].toString() }.toSet()
+            COL_PARENTS -> record.parents
+            COL_CHILDREN -> record.children
             else -> throw IllegalArgumentException("Invalid column")
         }
     }
@@ -87,9 +78,9 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Generic
         val record = checkNotNull(record) { "Reader in invalid state" }
 
         return when (index) {
-            COL_REQ_NCPUS -> (record[AVRO_COL_REQ_NCPUS] as Double).toInt()
-            COL_GROUP_ID -> record[AVRO_COL_GROUP_ID] as Int
-            COL_USER_ID -> record[AVRO_COL_USER_ID] as Int
+            COL_REQ_NCPUS -> record.requestedCpus
+            COL_GROUP_ID -> record.groupId
+            COL_USER_ID -> record.userId
             else -> throw IllegalArgumentException("Invalid column")
        }
     }
@@ -106,38 +97,6 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Generic
         reader.close()
     }
 
-    /**
-     * Initialize the columns for the reader based on [schema].
-     */
-    private fun initColumns(schema: Schema) {
-        try {
-            AVRO_COL_ID = schema.getField("id").pos()
-            AVRO_COL_WORKFLOW_ID = schema.getField("workflow_id").pos()
-            AVRO_COL_SUBMIT_TIME = schema.getField("ts_submit").pos()
-            AVRO_COL_WAIT_TIME = schema.getField("wait_time").pos()
-            AVRO_COL_RUNTIME = schema.getField("runtime").pos()
-            AVRO_COL_REQ_NCPUS = schema.getField("resource_amount_requested").pos()
-            AVRO_COL_PARENTS = schema.getField("parents").pos()
-            AVRO_COL_CHILDREN = schema.getField("children").pos()
-            AVRO_COL_GROUP_ID = schema.getField("group_id").pos()
-            AVRO_COL_USER_ID = schema.getField("user_id").pos()
-        } catch (e: NullPointerException) {
-            // This happens when the field we are trying to access does not exist
-            throw IllegalArgumentException("Invalid schema", e)
-        }
-    }
-
-    private var AVRO_COL_ID = -1
-    private var AVRO_COL_WORKFLOW_ID = -1
-    private var AVRO_COL_SUBMIT_TIME = -1
-    private var AVRO_COL_WAIT_TIME = -1
-    private var AVRO_COL_RUNTIME = -1
-    private var AVRO_COL_REQ_NCPUS = -1
-    private var AVRO_COL_PARENTS = -1
-    private var AVRO_COL_CHILDREN = -1
-    private var AVRO_COL_GROUP_ID = -1
-    private var AVRO_COL_USER_ID = -1
-
     private val COL_ID = 0
     private val COL_WORKFLOW_ID = 1
     private val COL_SUBMIT_TIME = 2
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
index c8f9ecaa..e71253ac 100644
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
@@ -22,12 +22,12 @@
 
 package org.opendc.trace.wtf
 
-import org.apache.avro.generic.GenericRecord
 import org.opendc.trace.*
 import org.opendc.trace.conv.*
 import org.opendc.trace.spi.TableDetails
 import org.opendc.trace.spi.TraceFormat
 import org.opendc.trace.util.parquet.LocalParquetReader
+import org.opendc.trace.wtf.parquet.TaskReadSupport
 import java.nio.file.Path
 
 /**
@@ -63,10 +63,10 @@ public class WtfTraceFormat : TraceFormat {
         }
     }
 
-    override fun newReader(path: Path, table: String): TableReader {
+    override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader {
         return when (table) {
             TABLE_TASKS -> {
-                val reader = LocalParquetReader<GenericRecord>(path.resolve("tasks/schema-1.0"))
+                val reader = LocalParquetReader(path.resolve("tasks/schema-1.0"), TaskReadSupport(projection))
                 WtfTaskTableReader(reader)
             }
             else -> throw IllegalArgumentException("Table $table not supported")
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt
new file mode 100644
index 00000000..71557f96
--- /dev/null
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.wtf.parquet
+
+import java.time.Duration
+import java.time.Instant
+
+/**
+ * A task in the Workflow Trace Format.
+ */
+internal data class Task(
+    val id: String,
+    val workflowId: String,
+    val submitTime: Instant,
+    val waitTime: Duration,
+    val runtime: Duration,
+    val requestedCpus: Int,
+    val groupId: Int,
+    val userId: Int,
+    val parents: Set<String>,
+    val children: Set<String>
+)
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt
new file mode 100644
index 00000000..8e7325de
--- /dev/null
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt
@@ -0,0 +1,134 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.wtf.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.api.InitContext
+import org.apache.parquet.hadoop.api.ReadSupport
+import org.apache.parquet.io.api.RecordMaterializer
+import org.apache.parquet.schema.*
+import org.opendc.trace.TableColumn
+import org.opendc.trace.conv.*
+
+/**
+ * A [ReadSupport] instance for [Task] objects.
+ *
+ * @param projection The projection of the table to read.
+ */
+internal class TaskReadSupport(private val projection: List<TableColumn<*>>?) : ReadSupport<Task>() {
+    /**
+     * Mapping of table columns to their Parquet column names.
+     */
+    private val colMap = mapOf<TableColumn<*>, String>(
+        TASK_ID to "id",
+        TASK_WORKFLOW_ID to "workflow_id",
+        TASK_SUBMIT_TIME to "ts_submit",
+        TASK_WAIT_TIME to "wait_time",
+        TASK_RUNTIME to "runtime",
+        TASK_REQ_NCPUS to "resource_amount_requested",
+        TASK_PARENTS to "parents",
+        TASK_CHILDREN to "children",
+        TASK_GROUP_ID to "group_id",
+        TASK_USER_ID to "user_id"
+    )
+
+    override fun init(context: InitContext): ReadContext {
+        val projectedSchema =
+            if (projection != null) {
+                Types.buildMessage()
+                    .apply {
+                        val fieldByName = READ_SCHEMA.fields.associateBy { it.name }
+
+                        for (col in projection) {
+                            val fieldName = colMap[col] ?: continue
+                            addField(fieldByName.getValue(fieldName))
+                        }
+                    }
+                    .named(READ_SCHEMA.name)
+            } else {
+                READ_SCHEMA
+            }
+        return ReadContext(projectedSchema)
+    }
+
+    override fun prepareForRead(
+        configuration: Configuration,
+        keyValueMetaData: Map<String, String>,
+        fileSchema: MessageType,
+        readContext: ReadContext
+    ): RecordMaterializer<Task> = TaskRecordMaterializer(readContext.requestedSchema)
+
+    companion object {
+        /**
+         * Parquet read schema for the "tasks" table in the trace.
+         */
+        @JvmStatic
+        val READ_SCHEMA: MessageType = Types.buildMessage()
+            .addFields(
+                Types
+                    .optional(PrimitiveType.PrimitiveTypeName.INT64)
+                    .named("id"),
+                Types
+                    .optional(PrimitiveType.PrimitiveTypeName.INT64)
+                    .named("workflow_id"),
+                Types
+                    .optional(PrimitiveType.PrimitiveTypeName.INT64)
+                    .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+                    .named("ts_submit"),
+                Types
+                    .optional(PrimitiveType.PrimitiveTypeName.INT64)
+                    .named("wait_time"),
+                Types
+                    .optional(PrimitiveType.PrimitiveTypeName.INT64)
+                    .named("runtime"),
+                Types
+                    .optional(PrimitiveType.PrimitiveTypeName.DOUBLE)
+                    .named("resource_amount_requested"),
+                Types
+                    .optional(PrimitiveType.PrimitiveTypeName.INT32)
+                    .named("user_id"),
+                Types
+                    .optional(PrimitiveType.PrimitiveTypeName.INT32)
+                    .named("group_id"),
+                Types
+                    .buildGroup(Type.Repetition.OPTIONAL)
+                    .addField(
+                        Types.repeatedGroup()
+                            .addField(Types.optional(PrimitiveType.PrimitiveTypeName.INT64).named("item"))
+                            .named("list")
+                    )
+                    .`as`(LogicalTypeAnnotation.listType())
+                    .named("children"),
+                Types
+                    .buildGroup(Type.Repetition.OPTIONAL)
+                    .addField(
+                        Types.repeatedGroup()
+                            .addField(Types.optional(PrimitiveType.PrimitiveTypeName.INT64).named("item"))
+                            .named("list")
+                    )
+                    .`as`(LogicalTypeAnnotation.listType())
+                    .named("parents"),
+            )
+            .named("task")
+    }
+}
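Note: because init() prunes the read schema to the projected columns, Parquet decodes only those column pages. A minimal sketch of driving this read support directly; the on-disk layout (tasks/schema-1.0) is taken from WtfTraceFormat above, the trace path is hypothetical, and since TaskReadSupport is internal, this would have to live inside the opendc-trace-wtf module:

    import org.opendc.trace.conv.TASK_ID
    import org.opendc.trace.conv.TASK_RUNTIME
    import org.opendc.trace.util.parquet.LocalParquetReader
    import java.nio.file.Paths

    // Hypothetical: only the "id" and "runtime" Parquet columns are decoded here.
    fun main() {
        val reader = LocalParquetReader(Paths.get("wtf-trace/tasks/schema-1.0"), TaskReadSupport(listOf(TASK_ID, TASK_RUNTIME)))
        try {
            var task = reader.read()
            while (task != null) {
                println("task ${task.id} ran for ${task.runtime}")
                task = reader.read()
            }
        } finally {
            reader.close()
        }
    }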
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt
new file mode 100644
index 00000000..08da5eaf
--- /dev/null
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt
@@ -0,0 +1,165 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.wtf.parquet
+
+import org.apache.parquet.io.api.*
+import org.apache.parquet.schema.MessageType
+import java.time.Duration
+import java.time.Instant
+import kotlin.math.roundToInt
+
+/**
+ * A [RecordMaterializer] for [Task] records.
+ */
+internal class TaskRecordMaterializer(schema: MessageType) : RecordMaterializer<Task>() {
+    /**
+     * State of current record being read.
+     */
+    private var _id = ""
+    private var _workflowId = ""
+    private var _submitTime = Instant.MIN
+    private var _waitTime = Duration.ZERO
+    private var _runtime = Duration.ZERO
+    private var _requestedCpus = 0
+    private var _groupId = 0
+    private var _userId = 0
+    private var _parents = mutableSetOf<String>()
+    private var _children = mutableSetOf<String>()
+
+    /**
+     * Root converter for the record.
+     */
+    private val root = object : GroupConverter() {
+        /**
+         * The converters for the columns of the schema.
+         */
+        private val converters = schema.fields.map { type ->
+            when (type.name) {
+                "id" -> object : PrimitiveConverter() {
+                    override fun addLong(value: Long) {
+                        _id = value.toString()
+                    }
+                }
+                "workflow_id" -> object : PrimitiveConverter() {
+                    override fun addLong(value: Long) {
+                        _workflowId = value.toString()
+                    }
+                }
+                "ts_submit" -> object : PrimitiveConverter() {
+                    override fun addLong(value: Long) {
+                        _submitTime = Instant.ofEpochMilli(value)
+                    }
+                }
+                "wait_time" -> object : PrimitiveConverter() {
+                    override fun addLong(value: Long) {
+                        _waitTime = Duration.ofMillis(value)
+                    }
+                }
+                "runtime" -> object : PrimitiveConverter() {
+                    override fun addLong(value: Long) {
+                        _runtime = Duration.ofMillis(value)
+                    }
+                }
+                "resource_amount_requested" -> object : PrimitiveConverter() {
+                    override fun addDouble(value: Double) {
+                        _requestedCpus = value.roundToInt()
+                    }
+                }
+                "group_id" -> object : PrimitiveConverter() {
+                    override fun addInt(value: Int) {
+                        _groupId = value
+                    }
+                }
+                "user_id" -> object : PrimitiveConverter() {
+                    override fun addInt(value: Int) {
+                        _userId = value
+                    }
+                }
+                "children" -> RelationConverter(_children)
+                "parents" -> RelationConverter(_parents)
+                else -> error("Unknown column $type")
+            }
+        }
+
+        override fun start() {
+            _id = ""
+            _workflowId = ""
+            _submitTime = Instant.MIN
+            _waitTime = Duration.ZERO
+            _runtime = Duration.ZERO
+            _requestedCpus = 0
+            _groupId = 0
+            _userId = 0
+            _parents.clear()
+            _children.clear()
+        }
+
+        override fun end() {}
+
+        override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex]
+    }
+
+    override fun getCurrentRecord(): Task = Task(
+        _id,
+        _workflowId,
+        _submitTime,
+        _waitTime,
+        _runtime,
+        _requestedCpus,
+        _groupId,
+        _userId,
+        _parents.toSet(),
+        _children.toSet()
+    )
+
+    override fun getRootConverter(): GroupConverter = root
+
+    /**
+     * Helper class to convert parent and child relations and add them to [relations].
+     */
+    private class RelationConverter(private val relations: MutableSet<String>) : GroupConverter() {
+        private val entryConverter = object : PrimitiveConverter() {
+            override fun addLong(value: Long) {
+                relations.add(value.toString())
+            }
+        }
+
+        private val listConverter = object : GroupConverter() {
+            override fun getConverter(fieldIndex: Int): Converter {
+                require(fieldIndex == 0)
+                return entryConverter
+            }
+
+            override fun start() {}
+            override fun end() {}
+        }
+
+        override fun getConverter(fieldIndex: Int): Converter {
+            require(fieldIndex == 0)
+            return listConverter
+        }
+
+        override fun start() {}
+        override fun end() {}
+    }
+}
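Note: the parents/children converters above unwrap the standard Parquet LIST encoding, one GroupConverter per nesting level (outer group, repeated "list" group, "item" primitive). A small sketch that makes the traversed shape visible by printing the field declared in READ_SCHEMA; like the previous sketch, it assumes it runs inside the opendc-trace-wtf module because the types are internal:

    // Print the LIST shape that RelationConverter walks; the commented output is
    // reproduced from the schema declared in TaskReadSupport above.
    fun main() {
        println(TaskReadSupport.READ_SCHEMA.getType("parents"))
        // optional group parents (LIST) {
        //   repeated group list {
        //     optional int64 item;
        //   }
        // }
    }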
diff --git a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt
index 0f0e422d..c0eb3f08 100644
--- a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt
@@ -61,7 +61,7 @@ class WtfTraceFormatTest {
     @Test
     fun testTableReader() {
         val path = Paths.get("src/test/resources/wtf-trace")
-        val reader = format.newReader(path, TABLE_TASKS)
+        val reader = format.newReader(path, TABLE_TASKS, listOf(TASK_ID, TASK_WORKFLOW_ID, TASK_SUBMIT_TIME, TASK_RUNTIME, TASK_PARENTS))
 
         assertAll(
             { assertTrue(reader.nextRow()) },
diff --git a/settings.gradle.kts b/settings.gradle.kts
index cc454b19..a779edcc 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -59,6 +59,7 @@ include(":opendc-trace:opendc-trace-bitbrains")
 include(":opendc-trace:opendc-trace-azure")
 include(":opendc-trace:opendc-trace-opendc")
 include(":opendc-trace:opendc-trace-parquet")
+include(":opendc-trace:opendc-trace-calcite")
 include(":opendc-trace:opendc-trace-tools")
 include(":opendc-harness:opendc-harness-api")
 include(":opendc-harness:opendc-harness-engine")
