-rw-r--r--  gradle/libs.versions.toml | 8
-rw-r--r--  opendc-compute/opendc-compute-workload/build.gradle.kts | 2
-rw-r--r--  opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt | 32
-rw-r--r--  opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt | 214
-rw-r--r--  opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt | 196
-rw-r--r--  opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt | 121
-rw-r--r--  opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/Utils.kt | 38
-rw-r--r--  opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/HostDataWriterTest.kt | 79
-rw-r--r--  opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServerDataWriterTest.kt | 73
-rw-r--r--  opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServiceDataWriterTest.kt | 67
-rw-r--r--  opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt | 6
-rw-r--r--  opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt | 2
-rw-r--r--  opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt | 7
-rw-r--r--  opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt | 15
-rw-r--r--  opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt | 25
-rw-r--r--  opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TableColumns.kt | 4
-rw-r--r--  opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt | 25
-rw-r--r--  opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt | 4
-rw-r--r--  opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt | 4
-rw-r--r--  opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt | 2
-rw-r--r--  opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt | 4
-rw-r--r--  opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt | 2
-rw-r--r--  opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt | 2
-rw-r--r--  opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt | 2
-rw-r--r--  opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt | 4
-rw-r--r--  opendc-trace/opendc-trace-calcite/build.gradle.kts | 37
-rw-r--r--  opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/InsertableTable.kt | 39
-rw-r--r--  opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt | 93
-rw-r--r--  opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt | 47
-rw-r--r--  opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt | 50
-rw-r--r--  opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt | 176
-rw-r--r--  opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModify.kt | 138
-rw-r--r--  opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModifyRule.kt | 65
-rw-r--r--  opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt | 158
-rw-r--r--  opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/TraceSchemaFactoryTest.kt | 78
-rw-r--r--  opendc-trace/opendc-trace-calcite/src/test/resources/model.json | 15
-rw-r--r--  opendc-trace/opendc-trace-calcite/src/test/resources/trace/interference-model.json | 20
-rw-r--r--  opendc-trace/opendc-trace-calcite/src/test/resources/trace/meta.parquet | bin 0 -> 1679 bytes
-rw-r--r--  opendc-trace/opendc-trace-calcite/src/test/resources/trace/trace.parquet | bin 0 -> 65174 bytes
-rw-r--r--  opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt | 2
-rw-r--r--  opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt | 6
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt | 74
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt | 124
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt | 65
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt | 117
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt | 67
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/Resource.kt | 37
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt | 147
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceRecordMaterializer.kt | 107
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceState.kt | 34
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt | 139
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateRecordMaterializer.kt | 102
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateWriteSupport.kt | 105
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceWriteSupport.kt | 114
-rw-r--r--  opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt | 77
-rw-r--r--  opendc-trace/opendc-trace-parquet/build.gradle.kts | 2
-rw-r--r--  opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt | 36
-rw-r--r--  opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt | 55
-rw-r--r--  opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt | 118
-rw-r--r--  opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt | 2
-rw-r--r--  opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt | 2
-rw-r--r--  opendc-trace/opendc-trace-tools/build.gradle.kts | 8
-rw-r--r--  opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/ConvertCommand.kt (renamed from opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt) | 12
-rw-r--r--  opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/QueryCommand.kt | 159
-rw-r--r--  opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceTools.kt (renamed from opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt) | 28
-rw-r--r--  opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt | 2
-rw-r--r--  opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt | 4
-rw-r--r--  opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt | 81
-rw-r--r--  opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt | 6
-rw-r--r--  opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt | 42
-rw-r--r--  opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt | 134
-rw-r--r--  opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt | 165
-rw-r--r--  opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt | 2
-rw-r--r--  settings.gradle.kts | 1
74 files changed, 3426 insertions, 604 deletions
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 43568067..b05af368 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -1,4 +1,5 @@
[versions]
+calcite = "1.30.0"
classgraph = "4.8.143"
clikt = "3.4.1"
config = "1.4.2"
@@ -8,6 +9,7 @@ gradle-node = "3.2.1"
hadoop = "3.3.1"
jackson = "2.13.2"
jandex-gradle = "0.12.0"
+jline = "3.21.0"
jmh-gradle = "0.6.6"
jakarta-validation = "2.0.2"
junit-jupiter = "5.8.2"
@@ -68,7 +70,7 @@ jackson-databind = { module = "com.fasterxml.jackson.core:jackson-databind", ver
jackson-module-kotlin = { module = "com.fasterxml.jackson.module:jackson-module-kotlin", version.ref = "jackson" }
jackson-datatype-jsr310 = { module = "com.fasterxml.jackson.datatype:jackson-datatype-jsr310", version.ref = "jackson" }
jackson-dataformat-csv = { module = "com.fasterxml.jackson.dataformat:jackson-dataformat-csv", version.ref = "jackson" }
-parquet = { module = "org.apache.parquet:parquet-avro", version.ref = "parquet" }
+parquet = { module = "org.apache.parquet:parquet-hadoop", version.ref = "parquet" }
config = { module = "com.typesafe:config", version.ref = "config" }
# Quarkus
@@ -101,6 +103,10 @@ quarkus-test-security = { module = "io.quarkus:quarkus-test-security" }
restassured-core = { module = "io.rest-assured:rest-assured" }
restassured-kotlin = { module = "io.rest-assured:kotlin-extensions" }
+# Calcite (SQL)
+calcite-core = { module = "org.apache.calcite:calcite-core", version.ref = "calcite" }
+jline = { module = "org.jline:jline", version.ref = "jline" }
+
# Other
classgraph = { module = "io.github.classgraph:classgraph", version.ref = "classgraph" }
jakarta-validation = { module = "jakarta.validation:jakarta.validation-api", version.ref = "jakarta-validation" }
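Note: the new calcite-core and jline entries are consumed from module build scripts through the generated type-safe catalog accessors. A minimal sketch of how a consuming module might declare them; the dependency configurations below are illustrative, not copied from this change:

dependencies {
    implementation(libs.calcite.core) // Apache Calcite for SQL queries over trace tables
    implementation(libs.jline)        // line editing for the interactive query shell
}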
diff --git a/opendc-compute/opendc-compute-workload/build.gradle.kts b/opendc-compute/opendc-compute-workload/build.gradle.kts
index 9ced95a7..319b2ae3 100644
--- a/opendc-compute/opendc-compute-workload/build.gradle.kts
+++ b/opendc-compute/opendc-compute-workload/build.gradle.kts
@@ -39,4 +39,6 @@ dependencies {
implementation(libs.opentelemetry.semconv)
implementation(libs.kotlin.logging)
+
+ testImplementation(libs.slf4j.simple)
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt
index 84387bbc..c854d874 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetDataWriter.kt
@@ -23,14 +23,12 @@
package org.opendc.compute.workload.export.parquet
import mu.KotlinLogging
-import org.apache.avro.Schema
-import org.apache.avro.generic.GenericData
-import org.apache.avro.generic.GenericRecordBuilder
-import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.column.ParquetProperties
import org.apache.parquet.hadoop.ParquetFileWriter
import org.apache.parquet.hadoop.ParquetWriter
+import org.apache.parquet.hadoop.api.WriteSupport
import org.apache.parquet.hadoop.metadata.CompressionCodecName
-import org.opendc.trace.util.parquet.LocalOutputFile
+import org.opendc.trace.util.parquet.LocalParquetWriter
import java.io.File
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.BlockingQueue
@@ -38,10 +36,13 @@ import kotlin.concurrent.thread
/**
* A writer that writes data in Parquet format.
+ *
+ * @param path The path to the file to write the data to.
+ * @param writeSupport The [WriteSupport] implementation for converting the records to Parquet format.
*/
public abstract class ParquetDataWriter<in T>(
path: File,
- private val schema: Schema,
+ private val writeSupport: WriteSupport<T>,
bufferSize: Int = 4096
) : AutoCloseable {
/**
@@ -52,7 +53,7 @@ public abstract class ParquetDataWriter<in T>(
/**
* The queue of records to process.
*/
- private val queue: BlockingQueue<GenericData.Record> = ArrayBlockingQueue(bufferSize)
+ private val queue: BlockingQueue<T> = ArrayBlockingQueue(bufferSize)
/**
* An exception to be propagated to the actual writer.
@@ -64,15 +65,15 @@ public abstract class ParquetDataWriter<in T>(
*/
private val writerThread = thread(start = false, name = this.toString()) {
val writer = let {
- val builder = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(path))
- .withSchema(schema)
+ val builder = LocalParquetWriter.builder(path.toPath(), writeSupport)
+ .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
.withCompressionCodec(CompressionCodecName.ZSTD)
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
buildWriter(builder)
}
val queue = queue
- val buf = mutableListOf<GenericData.Record>()
+ val buf = mutableListOf<T>()
var shouldStop = false
try {
@@ -101,16 +102,11 @@ public abstract class ParquetDataWriter<in T>(
/**
* Build the [ParquetWriter] used to write the Parquet files.
*/
- protected open fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> {
+ protected open fun buildWriter(builder: LocalParquetWriter.Builder<@UnsafeVariance T>): ParquetWriter<@UnsafeVariance T> {
return builder.build()
}
/**
- * Convert the specified [data] into a Parquet record.
- */
- protected abstract fun convert(builder: GenericRecordBuilder, data: T)
-
- /**
* Write the specified metrics to the database.
*/
public fun write(data: T) {
@@ -119,9 +115,7 @@ public abstract class ParquetDataWriter<in T>(
throw IllegalStateException("Writer thread failed", exception)
}
- val builder = GenericRecordBuilder(schema)
- convert(builder, data)
- queue.put(builder.build())
+ queue.put(data)
}
/**
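With the Avro dependency removed, a subclass now passes a WriteSupport<T> to the constructor instead of an Avro Schema plus a convert() override. A minimal sketch of a custom writer against the new constructor, assuming the hypothetical record type MyRecord and the made-up names MyRecordWriteSupport and MyRecordDataWriter:

package org.opendc.compute.workload.export.parquet

import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.api.WriteSupport
import org.apache.parquet.io.api.RecordConsumer
import org.apache.parquet.schema.LogicalTypeAnnotation
import org.apache.parquet.schema.MessageType
import org.apache.parquet.schema.PrimitiveType
import org.apache.parquet.schema.Types
import java.io.File

/** Hypothetical record type, used only for this sketch. */
data class MyRecord(val timestamp: Long)

/** Converts [MyRecord] instances into Parquet records with a single INT64 timestamp column. */
class MyRecordWriteSupport : WriteSupport<MyRecord>() {
    private lateinit var consumer: RecordConsumer

    override fun init(configuration: Configuration): WriteContext = WriteContext(SCHEMA, emptyMap())

    override fun prepareForWrite(recordConsumer: RecordConsumer) {
        consumer = recordConsumer
    }

    override fun write(record: MyRecord) {
        consumer.startMessage()
        consumer.startField("timestamp", 0)
        consumer.addLong(record.timestamp)
        consumer.endField("timestamp", 0)
        consumer.endMessage()
    }

    private companion object {
        val SCHEMA: MessageType = Types.buildMessage()
            .addFields(
                Types.required(PrimitiveType.PrimitiveTypeName.INT64)
                    .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
                    .named("timestamp")
            )
            .named("my_record")
    }
}

/** Overriding buildWriter() is only needed for extra tuning, e.g. dictionary encoding. */
class MyRecordDataWriter(path: File) : ParquetDataWriter<MyRecord>(path, MyRecordWriteSupport())

Records are then queued with write() and flushed on close(), exactly as before; only the conversion step moved from convert() into the WriteSupport.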
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt
index 2b7cac8f..0d5b6b34 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetHostDataWriter.kt
@@ -22,81 +22,189 @@
package org.opendc.compute.workload.export.parquet
-import org.apache.avro.Schema
-import org.apache.avro.SchemaBuilder
-import org.apache.avro.generic.GenericData
-import org.apache.avro.generic.GenericRecordBuilder
-import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.ParquetWriter
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.io.api.RecordConsumer
+import org.apache.parquet.schema.*
import org.opendc.telemetry.compute.table.HostTableReader
-import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA
-import org.opendc.trace.util.parquet.UUID_SCHEMA
-import org.opendc.trace.util.parquet.optional
+import org.opendc.trace.util.parquet.LocalParquetWriter
import java.io.File
+import java.util.*
/**
* A Parquet event writer for [HostTableReader]s.
*/
public class ParquetHostDataWriter(path: File, bufferSize: Int) :
- ParquetDataWriter<HostTableReader>(path, SCHEMA, bufferSize) {
+ ParquetDataWriter<HostTableReader>(path, HostDataWriteSupport(), bufferSize) {
- override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> {
+ override fun buildWriter(builder: LocalParquetWriter.Builder<HostTableReader>): ParquetWriter<HostTableReader> {
return builder
.withDictionaryEncoding("host_id", true)
.build()
}
- override fun convert(builder: GenericRecordBuilder, data: HostTableReader) {
- builder["timestamp"] = data.timestamp.toEpochMilli()
+ override fun toString(): String = "host-writer"
- builder["host_id"] = data.host.id
+ /**
+ * A [WriteSupport] implementation for a [HostTableReader].
+ */
+ private class HostDataWriteSupport : WriteSupport<HostTableReader>() {
+ lateinit var recordConsumer: RecordConsumer
- builder["uptime"] = data.uptime
- builder["downtime"] = data.downtime
- val bootTime = data.bootTime
- builder["boot_time"] = bootTime?.toEpochMilli()
+ override fun init(configuration: Configuration): WriteContext {
+ return WriteContext(SCHEMA, emptyMap())
+ }
- builder["cpu_count"] = data.host.cpuCount
- builder["cpu_limit"] = data.cpuLimit
- builder["cpu_time_active"] = data.cpuActiveTime
- builder["cpu_time_idle"] = data.cpuIdleTime
- builder["cpu_time_steal"] = data.cpuStealTime
- builder["cpu_time_lost"] = data.cpuLostTime
+ override fun prepareForWrite(recordConsumer: RecordConsumer) {
+ this.recordConsumer = recordConsumer
+ }
- builder["mem_limit"] = data.host.memCapacity
+ override fun write(record: HostTableReader) {
+ write(recordConsumer, record)
+ }
- builder["power_total"] = data.powerTotal
+ private fun write(consumer: RecordConsumer, data: HostTableReader) {
+ consumer.startMessage()
- builder["guests_terminated"] = data.guestsTerminated
- builder["guests_running"] = data.guestsRunning
- builder["guests_error"] = data.guestsError
- builder["guests_invalid"] = data.guestsInvalid
- }
+ consumer.startField("timestamp", 0)
+ consumer.addLong(data.timestamp.toEpochMilli())
+ consumer.endField("timestamp", 0)
- override fun toString(): String = "host-writer"
+ consumer.startField("host_id", 1)
+ consumer.addBinary(UUID.fromString(data.host.id).toBinary())
+ consumer.endField("host_id", 1)
+
+ consumer.startField("uptime", 2)
+ consumer.addLong(data.uptime)
+ consumer.endField("uptime", 2)
+
+ consumer.startField("downtime", 3)
+ consumer.addLong(data.downtime)
+ consumer.endField("downtime", 3)
+
+ val bootTime = data.bootTime
+ if (bootTime != null) {
+ consumer.startField("boot_time", 4)
+ consumer.addLong(bootTime.toEpochMilli())
+ consumer.endField("boot_time", 4)
+ }
+
+ consumer.startField("cpu_count", 5)
+ consumer.addInteger(data.host.cpuCount)
+ consumer.endField("cpu_count", 5)
+
+ consumer.startField("cpu_limit", 6)
+ consumer.addDouble(data.cpuLimit)
+ consumer.endField("cpu_limit", 6)
+
+ consumer.startField("cpu_time_active", 7)
+ consumer.addLong(data.cpuActiveTime)
+ consumer.endField("cpu_time_active", 7)
+
+ consumer.startField("cpu_time_idle", 8)
+ consumer.addLong(data.cpuIdleTime)
+ consumer.endField("cpu_time_idle", 8)
+
+ consumer.startField("cpu_time_steal", 9)
+ consumer.addLong(data.cpuStealTime)
+ consumer.endField("cpu_time_steal", 9)
+
+ consumer.startField("cpu_time_lost", 10)
+ consumer.addLong(data.cpuLostTime)
+ consumer.endField("cpu_time_lost", 10)
+
+ consumer.startField("mem_limit", 11)
+ consumer.addLong(data.host.memCapacity)
+ consumer.endField("mem_limit", 11)
+
+ consumer.startField("power_total", 12)
+ consumer.addDouble(data.powerTotal)
+ consumer.endField("power_total", 12)
+
+ consumer.startField("guests_terminated", 13)
+ consumer.addInteger(data.guestsTerminated)
+ consumer.endField("guests_terminated", 13)
+
+ consumer.startField("guests_running", 14)
+ consumer.addInteger(data.guestsRunning)
+ consumer.endField("guests_running", 14)
+
+ consumer.startField("guests_error", 15)
+ consumer.addInteger(data.guestsError)
+ consumer.endField("guests_error", 15)
+
+ consumer.startField("guests_invalid", 16)
+ consumer.addInteger(data.guestsInvalid)
+ consumer.endField("guests_invalid", 16)
+
+ consumer.endMessage()
+ }
+ }
private companion object {
- private val SCHEMA: Schema = SchemaBuilder
- .record("host")
- .namespace("org.opendc.telemetry.compute")
- .fields()
- .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault()
- .name("host_id").type(UUID_SCHEMA).noDefault()
- .requiredLong("uptime")
- .requiredLong("downtime")
- .name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault()
- .requiredInt("cpu_count")
- .requiredDouble("cpu_limit")
- .requiredLong("cpu_time_active")
- .requiredLong("cpu_time_idle")
- .requiredLong("cpu_time_steal")
- .requiredLong("cpu_time_lost")
- .requiredLong("mem_limit")
- .requiredDouble("power_total")
- .requiredInt("guests_terminated")
- .requiredInt("guests_running")
- .requiredInt("guests_error")
- .requiredInt("guests_invalid")
- .endRecord()
+ /**
+ * The schema of the host data.
+ */
+ val SCHEMA: MessageType = Types
+ .buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("timestamp"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY)
+ .length(16)
+ .`as`(LogicalTypeAnnotation.uuidType())
+ .named("host_id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("uptime"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("downtime"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("boot_time"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("cpu_count"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpu_limit"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("cpu_time_active"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("cpu_time_idle"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("cpu_time_steal"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("cpu_time_lost"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("mem_limit"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("power_total"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("guests_terminated"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("guests_running"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("guests_error"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("guests_invalid"),
+ )
+ .named("host")
}
}

diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt
index 144b6624..5d11629b 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServerDataWriter.kt
@@ -22,73 +22,177 @@
package org.opendc.compute.workload.export.parquet
-import org.apache.avro.Schema
-import org.apache.avro.SchemaBuilder
-import org.apache.avro.generic.GenericData
-import org.apache.avro.generic.GenericRecordBuilder
-import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.ParquetWriter
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.io.api.RecordConsumer
+import org.apache.parquet.schema.*
import org.opendc.telemetry.compute.table.ServerTableReader
-import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA
-import org.opendc.trace.util.parquet.UUID_SCHEMA
-import org.opendc.trace.util.parquet.optional
+import org.opendc.trace.util.parquet.LocalParquetWriter
import java.io.File
+import java.util.*
/**
* A Parquet event writer for [ServerTableReader]s.
*/
public class ParquetServerDataWriter(path: File, bufferSize: Int) :
- ParquetDataWriter<ServerTableReader>(path, SCHEMA, bufferSize) {
+ ParquetDataWriter<ServerTableReader>(path, ServerDataWriteSupport(), bufferSize) {
- override fun buildWriter(builder: AvroParquetWriter.Builder<GenericData.Record>): ParquetWriter<GenericData.Record> {
+ override fun buildWriter(builder: LocalParquetWriter.Builder<ServerTableReader>): ParquetWriter<ServerTableReader> {
return builder
.withDictionaryEncoding("server_id", true)
.withDictionaryEncoding("host_id", true)
.build()
}
- override fun convert(builder: GenericRecordBuilder, data: ServerTableReader) {
- builder["timestamp"] = data.timestamp.toEpochMilli()
+ override fun toString(): String = "server-writer"
- builder["server_id"] = data.server.id
- builder["host_id"] = data.host?.id
+ /**
+ * A [WriteSupport] implementation for a [ServerTableReader].
+ */
+ private class ServerDataWriteSupport : WriteSupport<ServerTableReader>() {
+ lateinit var recordConsumer: RecordConsumer
- builder["uptime"] = data.uptime
- builder["downtime"] = data.downtime
- builder["boot_time"] = data.bootTime?.toEpochMilli()
- builder["provision_time"] = data.provisionTime?.toEpochMilli()
+ override fun init(configuration: Configuration): WriteContext {
+ return WriteContext(SCHEMA, emptyMap())
+ }
- builder["cpu_count"] = data.server.cpuCount
- builder["cpu_limit"] = data.cpuLimit
- builder["cpu_time_active"] = data.cpuActiveTime
- builder["cpu_time_idle"] = data.cpuIdleTime
- builder["cpu_time_steal"] = data.cpuStealTime
- builder["cpu_time_lost"] = data.cpuLostTime
+ override fun prepareForWrite(recordConsumer: RecordConsumer) {
+ this.recordConsumer = recordConsumer
+ }
- builder["mem_limit"] = data.server.memCapacity
- }
+ override fun write(record: ServerTableReader) {
+ write(recordConsumer, record)
+ }
- override fun toString(): String = "server-writer"
+ private fun write(consumer: RecordConsumer, data: ServerTableReader) {
+ consumer.startMessage()
+
+ consumer.startField("timestamp", 0)
+ consumer.addLong(data.timestamp.toEpochMilli())
+ consumer.endField("timestamp", 0)
+
+ consumer.startField("server_id", 1)
+ consumer.addBinary(UUID.fromString(data.server.id).toBinary())
+ consumer.endField("server_id", 1)
+
+ val hostId = data.host?.id
+ if (hostId != null) {
+ consumer.startField("host_id", 2)
+ consumer.addBinary(UUID.fromString(hostId).toBinary())
+ consumer.endField("host_id", 2)
+ }
+
+ consumer.startField("uptime", 3)
+ consumer.addLong(data.uptime)
+ consumer.endField("uptime", 3)
+
+ consumer.startField("downtime", 4)
+ consumer.addLong(data.downtime)
+ consumer.endField("downtime", 4)
+
+ val bootTime = data.bootTime
+ if (bootTime != null) {
+ consumer.startField("boot_time", 5)
+ consumer.addLong(bootTime.toEpochMilli())
+ consumer.endField("boot_time", 5)
+ }
+
+ val provisionTime = data.provisionTime
+ if (provisionTime != null) {
+ consumer.startField("provision_time", 6)
+ consumer.addLong(provisionTime.toEpochMilli())
+ consumer.endField("provision_time", 6)
+ }
+
+ consumer.startField("cpu_count", 7)
+ consumer.addInteger(data.server.cpuCount)
+ consumer.endField("cpu_count", 7)
+
+ consumer.startField("cpu_limit", 8)
+ consumer.addDouble(data.cpuLimit)
+ consumer.endField("cpu_limit", 8)
+
+ consumer.startField("cpu_time_active", 9)
+ consumer.addLong(data.cpuActiveTime)
+ consumer.endField("cpu_time_active", 9)
+
+ consumer.startField("cpu_time_idle", 10)
+ consumer.addLong(data.cpuIdleTime)
+ consumer.endField("cpu_time_idle", 10)
+
+ consumer.startField("cpu_time_steal", 11)
+ consumer.addLong(data.cpuStealTime)
+ consumer.endField("cpu_time_steal", 11)
+
+ consumer.startField("cpu_time_lost", 12)
+ consumer.addLong(data.cpuLostTime)
+ consumer.endField("cpu_time_lost", 12)
+
+ consumer.startField("mem_limit", 13)
+ consumer.addLong(data.server.memCapacity)
+ consumer.endField("mem_limit", 13)
+
+ consumer.endMessage()
+ }
+ }
private companion object {
- private val SCHEMA: Schema = SchemaBuilder
- .record("server")
- .namespace("org.opendc.telemetry.compute")
- .fields()
- .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault()
- .name("server_id").type(UUID_SCHEMA).noDefault()
- .name("host_id").type(UUID_SCHEMA.optional()).noDefault()
- .requiredLong("uptime")
- .requiredLong("downtime")
- .name("provision_time").type(TIMESTAMP_SCHEMA.optional()).noDefault()
- .name("boot_time").type(TIMESTAMP_SCHEMA.optional()).noDefault()
- .requiredInt("cpu_count")
- .requiredDouble("cpu_limit")
- .requiredLong("cpu_time_active")
- .requiredLong("cpu_time_idle")
- .requiredLong("cpu_time_steal")
- .requiredLong("cpu_time_lost")
- .requiredLong("mem_limit")
- .endRecord()
+ /**
+ * The schema of the server data.
+ */
+ val SCHEMA: MessageType = Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("timestamp"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY)
+ .length(16)
+ .`as`(LogicalTypeAnnotation.uuidType())
+ .named("server_id"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY)
+ .length(16)
+ .`as`(LogicalTypeAnnotation.uuidType())
+ .named("host_id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("uptime"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("downtime"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("provision_time"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("boot_time"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("cpu_count"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpu_limit"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("cpu_time_active"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("cpu_time_idle"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("cpu_time_steal"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("cpu_time_lost"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("mem_limit")
+ )
+ .named("server")
}
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt
index ec8a2b65..5ad3b95e 100644
--- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/ParquetServiceDataWriter.kt
@@ -22,45 +22,108 @@
package org.opendc.compute.workload.export.parquet
-import org.apache.avro.Schema
-import org.apache.avro.SchemaBuilder
-import org.apache.avro.generic.GenericRecordBuilder
+import io.opentelemetry.context.ContextKey.named
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.io.api.RecordConsumer
+import org.apache.parquet.schema.*
import org.opendc.telemetry.compute.table.ServiceTableReader
-import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA
import java.io.File
/**
* A Parquet event writer for [ServiceTableReader]s.
*/
public class ParquetServiceDataWriter(path: File, bufferSize: Int) :
- ParquetDataWriter<ServiceTableReader>(path, SCHEMA, bufferSize) {
-
- override fun convert(builder: GenericRecordBuilder, data: ServiceTableReader) {
- builder["timestamp"] = data.timestamp.toEpochMilli()
- builder["hosts_up"] = data.hostsUp
- builder["hosts_down"] = data.hostsDown
- builder["servers_pending"] = data.serversPending
- builder["servers_active"] = data.serversActive
- builder["attempts_success"] = data.attemptsSuccess
- builder["attempts_failure"] = data.attemptsFailure
- builder["attempts_error"] = data.attemptsError
- }
+ ParquetDataWriter<ServiceTableReader>(path, ServiceDataWriteSupport(), bufferSize) {
override fun toString(): String = "service-writer"
+ /**
+ * A [WriteSupport] implementation for a [ServiceTableReader].
+ */
+ private class ServiceDataWriteSupport : WriteSupport<ServiceTableReader>() {
+ lateinit var recordConsumer: RecordConsumer
+
+ override fun init(configuration: Configuration): WriteContext {
+ return WriteContext(SCHEMA, emptyMap())
+ }
+
+ override fun prepareForWrite(recordConsumer: RecordConsumer) {
+ this.recordConsumer = recordConsumer
+ }
+
+ override fun write(record: ServiceTableReader) {
+ write(recordConsumer, record)
+ }
+
+ private fun write(consumer: RecordConsumer, data: ServiceTableReader) {
+ consumer.startMessage()
+
+ consumer.startField("timestamp", 0)
+ consumer.addLong(data.timestamp.toEpochMilli())
+ consumer.endField("timestamp", 0)
+
+ consumer.startField("hosts_up", 1)
+ consumer.addInteger(data.hostsUp)
+ consumer.endField("hosts_up", 1)
+
+ consumer.startField("hosts_down", 2)
+ consumer.addInteger(data.hostsDown)
+ consumer.endField("hosts_down", 2)
+
+ consumer.startField("servers_pending", 3)
+ consumer.addInteger(data.serversPending)
+ consumer.endField("servers_pending", 3)
+
+ consumer.startField("servers_active", 4)
+ consumer.addInteger(data.serversActive)
+ consumer.endField("servers_active", 4)
+
+ consumer.startField("attempts_success", 5)
+ consumer.addInteger(data.attemptsSuccess)
+            consumer.endField("attempts_success", 5)
+
+ consumer.startField("attempts_failure", 6)
+ consumer.addInteger(data.attemptsFailure)
+ consumer.endField("attempts_failure", 6)
+
+ consumer.startField("attempts_error", 7)
+ consumer.addInteger(data.attemptsError)
+ consumer.endField("attempts_error", 7)
+
+ consumer.endMessage()
+ }
+ }
+
private companion object {
- private val SCHEMA: Schema = SchemaBuilder
- .record("service")
- .namespace("org.opendc.telemetry.compute")
- .fields()
- .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault()
- .requiredInt("hosts_up")
- .requiredInt("hosts_down")
- .requiredInt("servers_pending")
- .requiredInt("servers_active")
- .requiredInt("attempts_success")
- .requiredInt("attempts_failure")
- .requiredInt("attempts_error")
- .endRecord()
+ private val SCHEMA: MessageType = Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("timestamp"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("hosts_up"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("hosts_down"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("servers_pending"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("servers_active"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("attempts_success"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("attempts_failure"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("attempts_error"),
+ )
+ .named("service")
}
}
diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/Utils.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/Utils.kt
new file mode 100644
index 00000000..9921f5b8
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/export/parquet/Utils.kt
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.export.parquet
+
+import org.apache.parquet.io.api.Binary
+import java.nio.ByteBuffer
+import java.util.UUID
+
+/**
+ * Convert this [UUID] into a 16-byte Parquet [Binary] value.
+ * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
+ */
+internal fun UUID.toBinary(): Binary {
+ val bb = ByteBuffer.wrap(ByteArray(16))
+ bb.putLong(mostSignificantBits)
+ bb.putLong(leastSignificantBits)
+ return Binary.fromConstantByteBuffer(bb)
+}
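The helper complements the FIXED_LEN_BYTE_ARRAY(16) UUID columns declared in the writer schemas above. A small sketch of how a write path would use it; writeUuidField is a made-up name for illustration only:

package org.opendc.compute.workload.export.parquet

import org.apache.parquet.io.api.RecordConsumer
import java.util.UUID

/** Hypothetical helper: emit a UUID-typed column value at the given field index. */
internal fun writeUuidField(consumer: RecordConsumer, name: String, index: Int, id: UUID) {
    consumer.startField(name, index)
    consumer.addBinary(id.toBinary()) // 16 bytes, matching FIXED_LEN_BYTE_ARRAY(16) with the UUID logical type
    consumer.endField(name, index)
}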
diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/HostDataWriterTest.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/HostDataWriterTest.kt
new file mode 100644
index 00000000..dae03513
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/HostDataWriterTest.kt
@@ -0,0 +1,79 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.export.parquet
+
+import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertDoesNotThrow
+import org.opendc.telemetry.compute.table.HostInfo
+import org.opendc.telemetry.compute.table.HostTableReader
+import java.nio.file.Files
+import java.time.Instant
+
+/**
+ * Test suite for [ParquetHostDataWriter]
+ */
+class HostDataWriterTest {
+ /**
+ * The path to write the data file to.
+ */
+ private val path = Files.createTempFile("opendc", "parquet")
+
+ /**
+ * The writer used to write the data.
+ */
+ private val writer = ParquetHostDataWriter(path.toFile(), bufferSize = 4096)
+
+ @AfterEach
+ fun tearDown() {
+ writer.close()
+ Files.deleteIfExists(path)
+ }
+
+ @Test
+ fun testSmoke() {
+ assertDoesNotThrow {
+ writer.write(object : HostTableReader {
+ override val timestamp: Instant = Instant.now()
+ override val host: HostInfo = HostInfo("id", "test", "x86", 4, 4096)
+ override val guestsTerminated: Int = 0
+ override val guestsRunning: Int = 0
+ override val guestsError: Int = 0
+ override val guestsInvalid: Int = 0
+ override val cpuLimit: Double = 4096.0
+ override val cpuUsage: Double = 1.0
+ override val cpuDemand: Double = 1.0
+ override val cpuUtilization: Double = 0.0
+ override val cpuActiveTime: Long = 1
+ override val cpuIdleTime: Long = 1
+ override val cpuStealTime: Long = 1
+ override val cpuLostTime: Long = 1
+ override val powerUsage: Double = 1.0
+ override val powerTotal: Double = 1.0
+ override val uptime: Long = 1
+ override val downtime: Long = 1
+ override val bootTime: Instant? = null
+ })
+ }
+ }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServerDataWriterTest.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServerDataWriterTest.kt
new file mode 100644
index 00000000..280f5ef8
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServerDataWriterTest.kt
@@ -0,0 +1,73 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.export.parquet
+
+import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertDoesNotThrow
+import org.opendc.telemetry.compute.table.HostInfo
+import org.opendc.telemetry.compute.table.ServerInfo
+import org.opendc.telemetry.compute.table.ServerTableReader
+import java.nio.file.Files
+import java.time.Instant
+
+/**
+ * Test suite for [ParquetServerDataWriter]
+ */
+class ServerDataWriterTest {
+ /**
+ * The path to write the data file to.
+ */
+ private val path = Files.createTempFile("opendc", "parquet")
+
+ /**
+ * The writer used to write the data.
+ */
+ private val writer = ParquetServerDataWriter(path.toFile(), bufferSize = 4096)
+
+ @AfterEach
+ fun tearDown() {
+ writer.close()
+ Files.deleteIfExists(path)
+ }
+
+ @Test
+ fun testSmoke() {
+ assertDoesNotThrow {
+ writer.write(object : ServerTableReader {
+ override val timestamp: Instant = Instant.now()
+ override val server: ServerInfo = ServerInfo("id", "test", "vm", "x86", "test", "test", 2, 4096)
+ override val host: HostInfo = HostInfo("id", "test", "x86", 4, 4096)
+ override val cpuLimit: Double = 4096.0
+ override val cpuActiveTime: Long = 1
+ override val cpuIdleTime: Long = 1
+ override val cpuStealTime: Long = 1
+ override val cpuLostTime: Long = 1
+ override val uptime: Long = 1
+ override val downtime: Long = 1
+ override val provisionTime: Instant = timestamp
+ override val bootTime: Instant? = null
+ })
+ }
+ }
+}
diff --git a/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServiceDataWriterTest.kt b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServiceDataWriterTest.kt
new file mode 100644
index 00000000..7ffa7186
--- /dev/null
+++ b/opendc-compute/opendc-compute-workload/src/test/kotlin/org/opendc/compute/workload/export/parquet/ServiceDataWriterTest.kt
@@ -0,0 +1,67 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.compute.workload.export.parquet
+
+import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertDoesNotThrow
+import org.opendc.telemetry.compute.table.ServiceTableReader
+import java.nio.file.Files
+import java.time.Instant
+
+/**
+ * Test suite for [ParquetServiceDataWriter]
+ */
+class ServiceDataWriterTest {
+ /**
+ * The path to write the data file to.
+ */
+ private val path = Files.createTempFile("opendc", "parquet")
+
+ /**
+ * The writer used to write the data.
+ */
+ private val writer = ParquetServiceDataWriter(path.toFile(), bufferSize = 4096)
+
+ @AfterEach
+ fun tearDown() {
+ writer.close()
+ Files.deleteIfExists(path)
+ }
+
+ @Test
+ fun testSmoke() {
+ assertDoesNotThrow {
+ writer.write(object : ServiceTableReader {
+ override val timestamp: Instant = Instant.now()
+ override val hostsUp: Int = 1
+ override val hostsDown: Int = 0
+ override val serversPending: Int = 1
+ override val serversActive: Int = 1
+ override val attemptsSuccess: Int = 1
+ override val attemptsFailure: Int = 0
+ override val attemptsError: Int = 0
+ })
+ }
+ }
+}
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt
index b0181cbc..05d0234a 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/Table.kt
@@ -42,9 +42,11 @@ public interface Table {
public val partitionKeys: List<TableColumn<*>>
/**
- * Open a [TableReader] for this table.
+ * Open a [TableReader] for a projection of this table.
+ *
+ * @param projection The list of columns to fetch from the table or `null` if no projection is performed.
*/
- public fun newReader(): TableReader
+ public fun newReader(projection: List<TableColumn<*>>? = null): TableReader
/**
* Open a [TableWriter] for this table.
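Callers can now restrict a read to the columns they actually consume. A minimal sketch of the caller side, assuming the existing TableReader contract (nextRow() and close()); the function name is made up:

import org.opendc.trace.Table
import org.opendc.trace.conv.RESOURCE_ID
import org.opendc.trace.conv.RESOURCE_STATE_CPU_USAGE
import org.opendc.trace.conv.RESOURCE_STATE_TIMESTAMP

/** Counts the rows of a resource-state table while asking the format to materialize only three columns. */
fun countResourceStates(table: Table): Int {
    // Passing null (the default) preserves the old behaviour of reading every column.
    val reader = table.newReader(listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP, RESOURCE_STATE_CPU_USAGE))
    var rows = 0
    try {
        while (reader.nextRow()) {
            rows++ // column values would be pulled through the reader's typed accessors here
        }
    } finally {
        reader.close()
    }
    return rows
}

Formats that cannot push the projection down may simply ignore the argument and keep returning all columns.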
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt
index 776c40c0..b77a2982 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/TableColumn.kt
@@ -33,7 +33,7 @@ public class TableColumn<out T>(public val name: String, type: Class<T>) {
/**
* The type of the column.
*/
- private val type: Class<*> = type
+ public val type: Class<*> = type
/**
* Determine whether the type of the column is a subtype of [column].
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt
index 532f6d24..5e8859e4 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/InterferenceGroupColumns.kt
@@ -24,22 +24,21 @@
package org.opendc.trace.conv
import org.opendc.trace.TableColumn
-import org.opendc.trace.column
/**
* Members of the interference group.
*/
@JvmField
-public val INTERFERENCE_GROUP_MEMBERS: TableColumn<Set<String>> = column("interference_group:members")
+public val INTERFERENCE_GROUP_MEMBERS: TableColumn<Set<String>> = column("members")
/**
* Target load after which the interference occurs.
*/
@JvmField
-public val INTERFERENCE_GROUP_TARGET: TableColumn<Double> = column("interference_group:target")
+public val INTERFERENCE_GROUP_TARGET: TableColumn<Double> = column("target")
/**
* Performance score when the interference occurs.
*/
@JvmField
-public val INTERFERENCE_GROUP_SCORE: TableColumn<Double> = column("interference_group:score")
+public val INTERFERENCE_GROUP_SCORE: TableColumn<Double> = column("score")
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt
index e9fc5d44..e602e534 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceColumns.kt
@@ -24,47 +24,46 @@
package org.opendc.trace.conv
import org.opendc.trace.TableColumn
-import org.opendc.trace.column
import java.time.Instant
/**
* Identifier of the resource.
*/
@JvmField
-public val RESOURCE_ID: TableColumn<String> = column("resource:id")
+public val RESOURCE_ID: TableColumn<String> = column("id")
/**
* The cluster to which the resource belongs.
*/
@JvmField
-public val RESOURCE_CLUSTER_ID: TableColumn<String> = column("resource:cluster_id")
+public val RESOURCE_CLUSTER_ID: TableColumn<String> = column("cluster_id")
/**
* Start time for the resource.
*/
@JvmField
-public val RESOURCE_START_TIME: TableColumn<Instant> = column("resource:start_time")
+public val RESOURCE_START_TIME: TableColumn<Instant> = column("start_time")
/**
* End time for the resource.
*/
@JvmField
-public val RESOURCE_STOP_TIME: TableColumn<Instant> = column("resource:stop_time")
+public val RESOURCE_STOP_TIME: TableColumn<Instant> = column("stop_time")
/**
* Number of CPUs for the resource.
*/
@JvmField
-public val RESOURCE_CPU_COUNT: TableColumn<Int> = column("resource:cpu_count")
+public val RESOURCE_CPU_COUNT: TableColumn<Int> = column("cpu_count")
/**
* Total CPU capacity of the resource in MHz.
*/
@JvmField
-public val RESOURCE_CPU_CAPACITY: TableColumn<Double> = column("resource:cpu_capacity")
+public val RESOURCE_CPU_CAPACITY: TableColumn<Double> = column("cpu_capacity")
/**
* Memory capacity for the resource in KB.
*/
@JvmField
-public val RESOURCE_MEM_CAPACITY: TableColumn<Double> = column("resource:mem_capacity")
+public val RESOURCE_MEM_CAPACITY: TableColumn<Double> = column("mem_capacity")
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt
index d5bbafd7..3a44f817 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/ResourceStateColumns.kt
@@ -24,7 +24,6 @@
package org.opendc.trace.conv
import org.opendc.trace.TableColumn
-import org.opendc.trace.column
import java.time.Duration
import java.time.Instant
@@ -32,70 +31,70 @@ import java.time.Instant
* The timestamp at which the state was recorded.
*/
@JvmField
-public val RESOURCE_STATE_TIMESTAMP: TableColumn<Instant> = column("resource_state:timestamp")
+public val RESOURCE_STATE_TIMESTAMP: TableColumn<Instant> = column("timestamp")
/**
* Duration for the state.
*/
@JvmField
-public val RESOURCE_STATE_DURATION: TableColumn<Duration> = column("resource_state:duration")
+public val RESOURCE_STATE_DURATION: TableColumn<Duration> = column("duration")
/**
* A flag to indicate that the resource is powered on.
*/
@JvmField
-public val RESOURCE_STATE_POWERED_ON: TableColumn<Boolean> = column("resource_state:powered_on")
+public val RESOURCE_STATE_POWERED_ON: TableColumn<Boolean> = column("powered_on")
/**
* Total CPU usage of the resource in MHz.
*/
@JvmField
-public val RESOURCE_STATE_CPU_USAGE: TableColumn<Double> = column("resource_state:cpu_usage")
+public val RESOURCE_STATE_CPU_USAGE: TableColumn<Double> = column("cpu_usage")
/**
* Total CPU usage of the resource in percentage.
*/
@JvmField
-public val RESOURCE_STATE_CPU_USAGE_PCT: TableColumn<Double> = column("resource_state:cpu_usage_pct")
+public val RESOURCE_STATE_CPU_USAGE_PCT: TableColumn<Double> = column("cpu_usage_pct")
/**
* Total CPU demand of the resource in MHz.
*/
@JvmField
-public val RESOURCE_STATE_CPU_DEMAND: TableColumn<Double> = column("resource_state:cpu_demand")
+public val RESOURCE_STATE_CPU_DEMAND: TableColumn<Double> = column("cpu_demand")
/**
* CPU ready percentage.
*/
@JvmField
-public val RESOURCE_STATE_CPU_READY_PCT: TableColumn<Double> = column("resource_state:cpu_ready_pct")
+public val RESOURCE_STATE_CPU_READY_PCT: TableColumn<Double> = column("cpu_ready_pct")
/**
* Memory usage of the resource in KB.
*/
@JvmField
-public val RESOURCE_STATE_MEM_USAGE: TableColumn<Double> = column("resource_state:mem_usage")
+public val RESOURCE_STATE_MEM_USAGE: TableColumn<Double> = column("mem_usage")
/**
* Disk read throughput of the resource in KB/s.
*/
@JvmField
-public val RESOURCE_STATE_DISK_READ: TableColumn<Double> = column("resource_state:disk_read")
+public val RESOURCE_STATE_DISK_READ: TableColumn<Double> = column("disk_read")
/**
* Disk write throughput of the resource in KB/s.
*/
@JvmField
-public val RESOURCE_STATE_DISK_WRITE: TableColumn<Double> = column("resource_state:disk_write")
+public val RESOURCE_STATE_DISK_WRITE: TableColumn<Double> = column("disk_write")
/**
* Network receive throughput of the resource in KB/s.
*/
@JvmField
-public val RESOURCE_STATE_NET_RX: TableColumn<Double> = column("resource_state:net_rx")
+public val RESOURCE_STATE_NET_RX: TableColumn<Double> = column("net_rx")
/**
* Network transmit throughput of the resource in KB/s.
*/
@JvmField
-public val RESOURCE_STATE_NET_TX: TableColumn<Double> = column("resource_state:net_tx")
+public val RESOURCE_STATE_NET_TX: TableColumn<Double> = column("net_tx")
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TableColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TableColumns.kt
index 31a58360..a58505e9 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TableColumns.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TableColumns.kt
@@ -21,7 +21,9 @@
*/
@file:JvmName("TableColumns")
-package org.opendc.trace
+package org.opendc.trace.conv
+
+import org.opendc.trace.TableColumn
/**
* Construct a [TableColumn] with the specified [name] and type [T].
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt
index 397c0794..e6daafb7 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/conv/TaskColumns.kt
@@ -24,7 +24,6 @@
package org.opendc.trace.conv
import org.opendc.trace.TableColumn
-import org.opendc.trace.column
import java.time.Duration
import java.time.Instant
@@ -32,70 +31,70 @@ import java.time.Instant
* A column containing the task identifier.
*/
@JvmField
-public val TASK_ID: TableColumn<String> = column("task:id")
+public val TASK_ID: TableColumn<String> = column("id")
/**
* A column containing the identifier of the workflow.
*/
@JvmField
-public val TASK_WORKFLOW_ID: TableColumn<String> = column("task:workflow_id")
+public val TASK_WORKFLOW_ID: TableColumn<String> = column("workflow_id")
/**
* A column containing the submission time of the task.
*/
@JvmField
-public val TASK_SUBMIT_TIME: TableColumn<Instant> = column("task:submit_time")
+public val TASK_SUBMIT_TIME: TableColumn<Instant> = column("submit_time")
/**
* A column containing the wait time of the task.
*/
@JvmField
-public val TASK_WAIT_TIME: TableColumn<Instant> = column("task:wait_time")
+public val TASK_WAIT_TIME: TableColumn<Instant> = column("wait_time")
/**
* A column containing the runtime time of the task.
*/
@JvmField
-public val TASK_RUNTIME: TableColumn<Duration> = column("task:runtime")
+public val TASK_RUNTIME: TableColumn<Duration> = column("runtime")
/**
* A column containing the parents of a task.
*/
@JvmField
-public val TASK_PARENTS: TableColumn<Set<String>> = column("task:parents")
+public val TASK_PARENTS: TableColumn<Set<String>> = column("parents")
/**
* A column containing the children of a task.
*/
@JvmField
-public val TASK_CHILDREN: TableColumn<Set<String>> = column("task:children")
+public val TASK_CHILDREN: TableColumn<Set<String>> = column("children")
/**
* A column containing the requested CPUs of a task.
*/
@JvmField
-public val TASK_REQ_NCPUS: TableColumn<Int> = column("task:req_ncpus")
+public val TASK_REQ_NCPUS: TableColumn<Int> = column("req_ncpus")
/**
* A column containing the allocated CPUs of a task.
*/
@JvmField
-public val TASK_ALLOC_NCPUS: TableColumn<Int> = column("task:alloc_ncpus")
+public val TASK_ALLOC_NCPUS: TableColumn<Int> = column("alloc_ncpus")
/**
* A column containing the status of a task.
*/
@JvmField
-public val TASK_STATUS: TableColumn<Int> = column("task:status")
+public val TASK_STATUS: TableColumn<Int> = column("status")
/**
* A column containing the group id of a task.
*/
@JvmField
-public val TASK_GROUP_ID: TableColumn<Int> = column("task:group_id")
+public val TASK_GROUP_ID: TableColumn<Int> = column("group_id")
/**
* A column containing the user id of a task.
*/
@JvmField
-public val TASK_USER_ID: TableColumn<Int> = column("task:user_id")
+public val TASK_USER_ID: TableColumn<Int> = column("user_id")
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt
index 24551edb..b848e19a 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/internal/TableImpl.kt
@@ -43,7 +43,9 @@ internal class TableImpl(val trace: TraceImpl, override val name: String) : Tabl
override val partitionKeys: List<TableColumn<*>>
get() = details.partitionKeys
- override fun newReader(): TableReader = trace.format.newReader(trace.path, name)
+ override fun newReader(projection: List<TableColumn<*>>?): TableReader {
+ return trace.format.newReader(trace.path, name, projection)
+ }
override fun newWriter(): TableWriter = trace.format.newWriter(trace.path, name)
diff --git a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt
index f2e610db..47761e0f 100644
--- a/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt
+++ b/opendc-trace/opendc-trace-api/src/main/kotlin/org/opendc/trace/spi/TraceFormat.kt
@@ -22,6 +22,7 @@
package org.opendc.trace.spi
+import org.opendc.trace.TableColumn
import org.opendc.trace.TableReader
import org.opendc.trace.TableWriter
import java.nio.file.Path
@@ -68,10 +69,11 @@ public interface TraceFormat {
*
* @param path The path to the trace to open.
* @param table The name of the table to open a [TableReader] for.
+ * @param projection The list of [TableColumn]s to project, or `null` if no projection is performed.
* @throws IllegalArgumentException If [table] does not exist.
* @return A [TableReader] instance for the table.
*/
- public fun newReader(path: Path, table: String): TableReader
+ public fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader
/**
* Open a [TableWriter] for the specified [table].
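As a rough illustration of the projection parameter introduced above, the sketch below shows how a caller might open a reader that only materializes a subset of columns through the Trace API. It is not part of this change set: the trace path, format and column choice are placeholders, and the wildcard imports follow the style used elsewhere in this module; formats that ignore the projection simply return all columns.

    import org.opendc.trace.*
    import org.opendc.trace.conv.*
    import java.nio.file.Paths

    fun main() {
        // Open an example trace (path and format are placeholders).
        val trace = Trace.open(Paths.get("src/test/resources/trace"), format = "opendc-vm")
        val table = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)) { "Table not found" }

        // Request only the id and cpu_usage columns; readers that support projection
        // can skip decoding the remaining columns.
        val reader = table.newReader(listOf(RESOURCE_ID, RESOURCE_STATE_CPU_USAGE))
        try {
            while (reader.nextRow()) {
                println("${reader.get(RESOURCE_ID)}: ${reader.get(RESOURCE_STATE_CPU_USAGE)}")
            }
        } finally {
            reader.close()
        }
    }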
diff --git a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt
index 8e3e60cc..73978990 100644
--- a/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt
+++ b/opendc-trace/opendc-trace-azure/src/main/kotlin/org/opendc/trace/azure/AzureTraceFormat.kt
@@ -81,7 +81,7 @@ public class AzureTraceFormat : TraceFormat {
}
}
- override fun newReader(path: Path, table: String): TableReader {
+ override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader {
return when (table) {
TABLE_RESOURCES -> {
val stream = GZIPInputStream(path.resolve("vmtable/vmtable.csv.gz").inputStream())
diff --git a/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt b/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt
index 56f9a940..263d26ce 100644
--- a/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-azure/src/test/kotlin/org/opendc/trace/azure/AzureTraceFormatTest.kt
@@ -57,7 +57,7 @@ class AzureTraceFormatTest {
@Test
fun testResources() {
val path = Paths.get("src/test/resources/trace")
- val reader = format.newReader(path, TABLE_RESOURCES)
+ val reader = format.newReader(path, TABLE_RESOURCES, null)
assertAll(
{ assertTrue(reader.nextRow()) },
{ assertEquals("x/XsOfHO4ocsV99i4NluqKDuxctW2MMVmwqOPAlg4wp8mqbBOe3wxBlQo0+Qx+uf", reader.get(RESOURCE_ID)) },
@@ -71,7 +71,7 @@ class AzureTraceFormatTest {
@Test
fun testSmoke() {
val path = Paths.get("src/test/resources/trace")
- val reader = format.newReader(path, TABLE_RESOURCE_STATES)
+ val reader = format.newReader(path, TABLE_RESOURCE_STATES, null)
assertAll(
{ assertTrue(reader.nextRow()) },
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt
index 11d21a04..82e454ad 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormat.kt
@@ -72,7 +72,7 @@ public class BitbrainsExTraceFormat : TraceFormat {
}
}
- override fun newReader(path: Path, table: String): TableReader {
+ override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader {
return when (table) {
TABLE_RESOURCE_STATES -> newResourceStateReader(path)
else -> throw IllegalArgumentException("Table $table not supported")
diff --git a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt
index e1e7604a..a374e951 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/main/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormat.kt
@@ -81,7 +81,7 @@ public class BitbrainsTraceFormat : TraceFormat {
}
}
- override fun newReader(path: Path, table: String): TableReader {
+ override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader {
return when (table) {
TABLE_RESOURCES -> {
val vms = Files.walk(path, 1)
diff --git a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt
index 77429e3e..c944cb98 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsExTraceFormatTest.kt
@@ -59,7 +59,7 @@ internal class BitbrainsExTraceFormatTest {
@Test
fun testSmoke() {
val path = Paths.get("src/test/resources/vm.txt")
- val reader = format.newReader(path, TABLE_RESOURCE_STATES)
+ val reader = format.newReader(path, TABLE_RESOURCE_STATES, null)
assertAll(
{ assertTrue(reader.nextRow()) },
diff --git a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt
index 9309beb1..841801e6 100644
--- a/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-bitbrains/src/test/kotlin/org/opendc/trace/bitbrains/BitbrainsTraceFormatTest.kt
@@ -57,7 +57,7 @@ class BitbrainsTraceFormatTest {
@Test
fun testResources() {
val path = Paths.get("src/test/resources/bitbrains.csv")
- val reader = format.newReader(path, TABLE_RESOURCES)
+ val reader = format.newReader(path, TABLE_RESOURCES, null)
assertAll(
{ assertTrue(reader.nextRow()) },
@@ -71,7 +71,7 @@ class BitbrainsTraceFormatTest {
@Test
fun testSmoke() {
val path = Paths.get("src/test/resources/bitbrains.csv")
- val reader = format.newReader(path, TABLE_RESOURCE_STATES)
+ val reader = format.newReader(path, TABLE_RESOURCE_STATES, null)
assertAll(
{ assertTrue(reader.nextRow()) },
diff --git a/opendc-trace/opendc-trace-calcite/build.gradle.kts b/opendc-trace/opendc-trace-calcite/build.gradle.kts
new file mode 100644
index 00000000..2ffdac3c
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/build.gradle.kts
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+description = "Apache Calcite (SQL) integration for the OpenDC trace library"
+
+/* Build configuration */
+plugins {
+ `kotlin-library-conventions`
+}
+
+dependencies {
+ api(projects.opendcTrace.opendcTraceApi)
+
+ api(libs.calcite.core)
+
+ testRuntimeOnly(projects.opendcTrace.opendcTraceOpendc)
+ testRuntimeOnly(libs.slf4j.simple)
+}
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/InsertableTable.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/InsertableTable.kt
new file mode 100644
index 00000000..9c7b69a2
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/InsertableTable.kt
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.calcite
+
+import org.apache.calcite.linq4j.Enumerable
+import org.apache.calcite.schema.Table
+
+/**
+ * A Calcite [Table] to which rows can be inserted.
+ */
+internal interface InsertableTable : Table {
+ /**
+ * Insert [rows] into this table.
+ *
+ * @param rows The rows to insert into the table.
+ * @return The number of rows inserted.
+ */
+ fun insert(rows: Enumerable<Array<Any?>>): Long
+}
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt
new file mode 100644
index 00000000..1854f262
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceReaderEnumerator.kt
@@ -0,0 +1,93 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.calcite
+
+import org.apache.calcite.linq4j.Enumerator
+import org.opendc.trace.TableColumn
+import org.opendc.trace.TableReader
+import java.sql.Timestamp
+import java.time.Duration
+import java.time.Instant
+import java.util.concurrent.atomic.AtomicBoolean
+
+/**
+ * An [Enumerator] for a [TableReader].
+ */
+internal class TraceReaderEnumerator<E>(
+ private val reader: TableReader,
+ private val columns: List<TableColumn<*>>,
+ private val cancelFlag: AtomicBoolean
+) : Enumerator<E> {
+ private val columnIndices = columns.map { reader.resolve(it) }.toIntArray()
+ private var current: E? = null
+
+ override fun moveNext(): Boolean {
+ if (cancelFlag.get()) {
+ return false
+ }
+
+ val reader = reader
+ val res = reader.nextRow()
+
+ if (res) {
+ @Suppress("UNCHECKED_CAST")
+ current = convertRow(reader) as E
+ } else {
+ current = null
+ }
+
+ return res
+ }
+
+ override fun current(): E = checkNotNull(current)
+
+ override fun reset() {
+ throw UnsupportedOperationException()
+ }
+
+ override fun close() {
+ reader.close()
+ }
+
+ private fun convertRow(reader: TableReader): Array<Any?> {
+ val res = arrayOfNulls<Any?>(columns.size)
+ val columnIndices = columnIndices
+
+ for ((index, column) in columns.withIndex()) {
+ val columnIndex = columnIndices[index]
+ res[index] = convertColumn(reader, column, columnIndex)
+ }
+ return res
+ }
+
+ private fun convertColumn(reader: TableReader, column: TableColumn<*>, columnIndex: Int): Any? {
+ val value = reader.get(columnIndex)
+
+ return when (column.type) {
+ Instant::class.java -> Timestamp.from(value as Instant)
+ Duration::class.java -> (value as Duration).toMillis()
+ Set::class.java -> (value as Set<*>).toTypedArray()
+ else -> value
+ }
+ }
+}
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt
new file mode 100644
index 00000000..3249546d
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchema.kt
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.calcite
+
+import org.apache.calcite.schema.Schema
+import org.apache.calcite.schema.Table
+import org.apache.calcite.schema.impl.AbstractSchema
+import org.opendc.trace.Trace
+
+/**
+ * A Calcite [Schema] that exposes an OpenDC [Trace] as multiple SQL tables.
+ *
+ * @param trace The [Trace] to create a schema for.
+ */
+public class TraceSchema(private val trace: Trace) : AbstractSchema() {
+ /**
+ * The [Table]s that belong to this schema.
+ */
+ private val tables: Map<String, TraceTable> by lazy {
+ trace.tables.associateWith {
+ val table = checkNotNull(trace.getTable(it)) { "Unexpected null table" }
+ TraceTable(table)
+ }
+ }
+
+ override fun getTableMap(): Map<String, Table> = tables
+}
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt
new file mode 100644
index 00000000..3c6badc8
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceSchemaFactory.kt
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.calcite
+
+import org.apache.calcite.model.ModelHandler
+import org.apache.calcite.schema.Schema
+import org.apache.calcite.schema.SchemaFactory
+import org.apache.calcite.schema.SchemaPlus
+import org.opendc.trace.Trace
+import java.io.File
+import java.nio.file.Paths
+
+/**
+ * Factory that creates a [TraceSchema].
+ *
+ * This factory allows users to define a schema in a `model.json` file that references a trace.
+ */
+public class TraceSchemaFactory : SchemaFactory {
+ override fun create(parentSchema: SchemaPlus, name: String, operand: Map<String, Any>): Schema {
+ val base = operand[ModelHandler.ExtraOperand.BASE_DIRECTORY.camelName] as File?
+ val pathParam = requireNotNull(operand["path"]) { "Trace path not specified" } as String
+ val path = if (base != null) File(base, pathParam).toPath() else Paths.get(pathParam)
+
+ val format = requireNotNull(operand["format"]) { "Trace format not specified" } as String
+ val create = operand.getOrDefault("create", false) as Boolean
+
+ val trace = if (create) Trace.create(path, format) else Trace.open(path, format)
+ return TraceSchema(trace)
+ }
+}
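For reference, a minimal sketch of how this factory might be used over JDBC, mirroring the `model.json` and test suite added further below in this change; the model path is an assumption, and the queried columns match the `opendc-vm` resources table used in those tests.

    import java.sql.DriverManager
    import java.util.Properties

    fun main() {
        val info = Properties()
        info.setProperty("lex", "JAVA")

        // Assumes a model.json whose schema entry points TraceSchemaFactory at an opendc-vm trace,
        // as in the test resources added later in this change.
        DriverManager.getConnection("jdbc:calcite:model=src/test/resources/model.json", info).use { connection ->
            connection.createStatement().use { stmt ->
                stmt.executeQuery("SELECT id, cpu_count FROM trace.resources").use { rs ->
                    while (rs.next()) {
                        println("${rs.getString("id")}: ${rs.getInt("cpu_count")}")
                    }
                }
            }
        }
    }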
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt
new file mode 100644
index 00000000..8c571b82
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTable.kt
@@ -0,0 +1,176 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.calcite
+
+import org.apache.calcite.DataContext
+import org.apache.calcite.adapter.java.AbstractQueryableTable
+import org.apache.calcite.adapter.java.JavaTypeFactory
+import org.apache.calcite.linq4j.*
+import org.apache.calcite.plan.RelOptCluster
+import org.apache.calcite.plan.RelOptTable
+import org.apache.calcite.prepare.Prepare.CatalogReader
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.TableModify
+import org.apache.calcite.rel.logical.LogicalTableModify
+import org.apache.calcite.rel.type.RelDataType
+import org.apache.calcite.rel.type.RelDataTypeFactory
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.schema.*
+import org.apache.calcite.schema.impl.AbstractTableQueryable
+import org.apache.calcite.sql.type.SqlTypeName
+import java.time.Duration
+import java.time.Instant
+import java.util.concurrent.atomic.AtomicBoolean
+
+/**
+ * A Calcite [Table] that exposes an OpenDC [org.opendc.trace.Table] as a SQL table.
+ */
+internal class TraceTable(private val table: org.opendc.trace.Table) :
+ AbstractQueryableTable(Array<Any?>::class.java),
+ ProjectableFilterableTable,
+ ModifiableTable,
+ InsertableTable {
+ private var rowType: RelDataType? = null
+
+ override fun getRowType(typeFactory: RelDataTypeFactory): RelDataType {
+ var rowType = rowType
+ if (rowType == null) {
+ rowType = deduceRowType(typeFactory as JavaTypeFactory)
+ this.rowType = rowType
+ }
+
+ return rowType
+ }
+
+ override fun scan(root: DataContext, filters: MutableList<RexNode>, projects: IntArray?): Enumerable<Array<Any?>> {
+ // Filters are currently not supported by the OpenDC trace API. By leaving the filters in the list, we signal
+ // to Calcite that they are not handled here, so Calcite applies them itself.
+
+ val projection = projects?.map { table.columns[it] }
+ val cancelFlag = DataContext.Variable.CANCEL_FLAG.get<AtomicBoolean>(root)
+ return object : AbstractEnumerable<Array<Any?>>() {
+ override fun enumerator(): Enumerator<Array<Any?>> =
+ TraceReaderEnumerator(table.newReader(projection), projection ?: table.columns, cancelFlag)
+ }
+ }
+
+ override fun insert(rows: Enumerable<Array<Any?>>): Long {
+ val table = table
+ val columns = table.columns
+ val writer = table.newWriter()
+ val columnIndices = columns.map { writer.resolve(it) }.toIntArray()
+ var rowCount = 0L
+
+ try {
+ for (row in rows) {
+ writer.startRow()
+
+ for ((index, value) in row.withIndex()) {
+ if (value == null) {
+ continue
+ }
+ val columnType = columns[index].type
+
+ writer.set(
+ columnIndices[index],
+ when (columnType) {
+ Duration::class.java -> Duration.ofMillis(value as Long)
+ Instant::class.java -> Instant.ofEpochMilli(value as Long)
+ Set::class.java -> (value as List<*>).toSet()
+ else -> value
+ }
+ )
+ }
+
+ writer.endRow()
+
+ rowCount++
+ }
+ } finally {
+ writer.close()
+ }
+
+ return rowCount
+ }
+
+ override fun <T> asQueryable(queryProvider: QueryProvider, schema: SchemaPlus, tableName: String): Queryable<T> {
+ return object : AbstractTableQueryable<T>(queryProvider, schema, this@TraceTable, tableName) {
+ override fun enumerator(): Enumerator<T> {
+ val cancelFlag = AtomicBoolean(false)
+ return TraceReaderEnumerator(
+ this@TraceTable.table.newReader(),
+ this@TraceTable.table.columns,
+ cancelFlag
+ )
+ }
+
+ override fun toString(): String = "TraceTableQueryable[table=$tableName]"
+ }
+ }
+
+ override fun getModifiableCollection(): MutableCollection<Any?>? = null
+
+ override fun toModificationRel(
+ cluster: RelOptCluster,
+ table: RelOptTable,
+ catalogReader: CatalogReader,
+ child: RelNode,
+ operation: TableModify.Operation,
+ updateColumnList: MutableList<String>?,
+ sourceExpressionList: MutableList<RexNode>?,
+ flattened: Boolean
+ ): TableModify {
+ cluster.planner.addRule(TraceTableModifyRule.DEFAULT.toRule())
+
+ return LogicalTableModify.create(
+ table,
+ catalogReader,
+ child,
+ operation,
+ updateColumnList,
+ sourceExpressionList,
+ flattened
+ )
+ }
+
+ override fun toString(): String = "TraceTable"
+
+ private fun deduceRowType(typeFactory: JavaTypeFactory): RelDataType {
+ val types = mutableListOf<RelDataType>()
+ val names = mutableListOf<String>()
+
+ for (column in table.columns) {
+ names.add(column.name)
+ types.add(
+ when (column.type) {
+ Instant::class.java -> typeFactory.createSqlType(SqlTypeName.TIMESTAMP)
+ Duration::class.java -> typeFactory.createSqlType(SqlTypeName.BIGINT)
+ Set::class.java -> typeFactory.createMultisetType(typeFactory.createSqlType(SqlTypeName.UNKNOWN), -1)
+ else -> typeFactory.createType(column.type)
+ }
+ )
+ }
+
+ return typeFactory.createStructType(types, names)
+ }
+}
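To make the division of labor in scan() concrete, here is a minimal sketch, modeled on the CalciteTest helpers added below, of a query whose projection is handed to the table reader while the WHERE clause is evaluated by Calcite; the trace path and the threshold are assumptions for illustration.

    import org.apache.calcite.jdbc.CalciteConnection
    import org.opendc.trace.Trace
    import org.opendc.trace.calcite.TraceSchema
    import java.nio.file.Paths
    import java.sql.DriverManager
    import java.util.Properties

    fun main() {
        // Placeholder trace; any trace supported by the Trace API works here.
        val trace = Trace.open(Paths.get("src/test/resources/trace"), format = "opendc-vm")

        val info = Properties()
        info.setProperty("lex", "JAVA")
        val connection = DriverManager.getConnection("jdbc:calcite:", info).unwrap(CalciteConnection::class.java)
        connection.rootSchema.add("trace", TraceSchema(trace))

        connection.createStatement().use { stmt ->
            // The (id, cpu_usage) projection reaches TraceTable.scan() and is forwarded to the
            // table reader; the filter below stays in the plan and is applied by Calcite.
            stmt.executeQuery("SELECT id, cpu_usage FROM trace.resource_states WHERE cpu_usage > 0").use { rs ->
                while (rs.next()) {
                    println("${rs.getString("id")}: ${rs.getDouble("cpu_usage")}")
                }
            }
        }
        connection.close()
    }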
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModify.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModify.kt
new file mode 100644
index 00000000..64dc0cea
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModify.kt
@@ -0,0 +1,138 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.calcite
+
+import org.apache.calcite.adapter.enumerable.*
+import org.apache.calcite.adapter.enumerable.EnumerableRel.Prefer
+import org.apache.calcite.adapter.java.JavaTypeFactory
+import org.apache.calcite.linq4j.Enumerable
+import org.apache.calcite.linq4j.tree.BlockBuilder
+import org.apache.calcite.linq4j.tree.Expressions
+import org.apache.calcite.linq4j.tree.Types
+import org.apache.calcite.plan.*
+import org.apache.calcite.prepare.Prepare
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.TableModify
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.schema.ModifiableTable
+import org.apache.calcite.util.BuiltInMethod
+import java.lang.reflect.Method
+
+/**
+ * A [TableModify] expression that modifies a workload trace.
+ */
+internal class TraceTableModify(
+ cluster: RelOptCluster,
+ traitSet: RelTraitSet,
+ table: RelOptTable,
+ schema: Prepare.CatalogReader,
+ input: RelNode,
+ operation: Operation,
+ updateColumnList: List<String>?,
+ sourceExpressionList: List<RexNode>?,
+ flattened: Boolean
+) : TableModify(cluster, traitSet, table, schema, input, operation, updateColumnList, sourceExpressionList, flattened),
+ EnumerableRel {
+ init {
+ // Make sure the table is modifiable
+ table.unwrap(ModifiableTable::class.java) ?: throw AssertionError() // TODO: user error in validator
+ }
+
+ override fun copy(traitSet: RelTraitSet, inputs: List<RelNode>?): RelNode {
+ return TraceTableModify(
+ cluster,
+ traitSet,
+ table,
+ getCatalogReader(),
+ sole(inputs),
+ operation,
+ updateColumnList,
+ sourceExpressionList,
+ isFlattened
+ )
+ }
+
+ override fun computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery?): RelOptCost {
+ // Prefer this plan over the standard EnumerableTableModify.
+ return super.computeSelfCost(planner, mq)!!.multiplyBy(.1)
+ }
+
+ override fun implement(implementor: EnumerableRelImplementor, pref: Prefer): EnumerableRel.Result {
+ val builder = BlockBuilder()
+ val result = implementor.visitChild(this, 0, getInput() as EnumerableRel, pref)
+ val childExp = builder.append("child", result.block)
+ val convertedChildExpr = if (getInput().rowType != rowType) {
+ val typeFactory = cluster.typeFactory as JavaTypeFactory
+ val format = EnumerableTableScan.deduceFormat(table)
+ val physType = PhysTypeImpl.of(typeFactory, table.rowType, format)
+ val childPhysType = result.physType
+ val o = Expressions.parameter(childPhysType.javaRowType, "o")
+ val expressionList = List(childPhysType.rowType.fieldCount) { i ->
+ childPhysType.fieldReference(o, i, physType.getJavaFieldType(i))
+ }
+
+ builder.append(
+ "convertedChild",
+ Expressions.call(
+ childExp,
+ BuiltInMethod.SELECT.method,
+ Expressions.lambda<org.apache.calcite.linq4j.function.Function<*>>(physType.record(expressionList), o)
+ )
+ )
+ } else {
+ childExp
+ }
+
+ if (!isInsert) {
+ throw UnsupportedOperationException("Deletion and update not supported")
+ }
+
+ val expression = table.getExpression(InsertableTable::class.java)
+ builder.add(
+ Expressions.return_(
+ null,
+ Expressions.call(
+ BuiltInMethod.SINGLETON_ENUMERABLE.method,
+ Expressions.call(
+ Long::class.java,
+ expression,
+ INSERT_METHOD,
+ convertedChildExpr,
+ )
+ )
+ )
+ )
+
+ val rowFormat = if (pref === Prefer.ARRAY) JavaRowFormat.ARRAY else JavaRowFormat.SCALAR
+ val physType = PhysTypeImpl.of(implementor.typeFactory, getRowType(), rowFormat)
+ return implementor.result(physType, builder.toBlock())
+ }
+
+ private companion object {
+ /**
+ * Reference to the [InsertableTable.insert] method.
+ */
+ val INSERT_METHOD: Method = Types.lookupMethod(InsertableTable::class.java, "insert", Enumerable::class.java)
+ }
+}
diff --git a/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModifyRule.kt b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModifyRule.kt
new file mode 100644
index 00000000..7572e381
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/main/kotlin/org/opendc/trace/calcite/TraceTableModifyRule.kt
@@ -0,0 +1,65 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.calcite
+
+import org.apache.calcite.adapter.enumerable.EnumerableConvention
+import org.apache.calcite.plan.Convention
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.TableModify
+import org.apache.calcite.rel.logical.LogicalTableModify
+import org.apache.calcite.schema.ModifiableTable
+
+/**
+ * A [ConverterRule] from a [LogicalTableModify] to a [TraceTableModify].
+ */
+internal class TraceTableModifyRule(config: Config) : ConverterRule(config) {
+ override fun convert(rel: RelNode): RelNode? {
+ val modify = rel as TableModify
+ val table = modify.table!!
+
+ // Make sure that the table is modifiable
+ if (table.unwrap(ModifiableTable::class.java) == null) {
+ return null
+ }
+
+ val traitSet = modify.traitSet.replace(EnumerableConvention.INSTANCE)
+ return TraceTableModify(
+ modify.cluster, traitSet,
+ table,
+ modify.catalogReader,
+ convert(modify.input, traitSet),
+ modify.operation,
+ modify.updateColumnList,
+ modify.sourceExpressionList,
+ modify.isFlattened
+ )
+ }
+
+ companion object {
+ /** Default configuration. */
+ val DEFAULT: Config = Config.INSTANCE
+ .withConversion(LogicalTableModify::class.java, Convention.NONE, EnumerableConvention.INSTANCE, "TraceTableModificationRule")
+ .withRuleFactory { config: Config -> TraceTableModifyRule(config) }
+ }
+}
diff --git a/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt b/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt
new file mode 100644
index 00000000..d2877d7c
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/CalciteTest.kt
@@ -0,0 +1,158 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.calcite
+
+import org.apache.calcite.jdbc.CalciteConnection
+import org.junit.jupiter.api.Assertions.*
+import org.junit.jupiter.api.Test
+import org.opendc.trace.Trace
+import java.nio.file.Files
+import java.nio.file.Paths
+import java.sql.DriverManager
+import java.sql.ResultSet
+import java.sql.Statement
+import java.sql.Timestamp
+import java.util.*
+
+/**
+ * Smoke test for Apache Calcite integration.
+ */
+class CalciteTest {
+ /**
+ * The trace to experiment with.
+ */
+ private val trace = Trace.open(Paths.get("src/test/resources/trace"), format = "opendc-vm")
+
+ @Test
+ fun testResources() {
+ runQuery(trace, "SELECT * FROM trace.resources") { rs ->
+ assertAll(
+ { assertTrue(rs.next()) },
+ { assertEquals("1019", rs.getString("id")) },
+ { assertEquals(1, rs.getInt("cpu_count")) },
+ { assertEquals(Timestamp.valueOf("2013-08-12 13:40:46.0"), rs.getTimestamp("start_time")) },
+ { assertEquals(181352.0, rs.getDouble("mem_capacity")) },
+ { assertTrue(rs.next()) },
+ { assertEquals("1023", rs.getString("id")) },
+ { assertTrue(rs.next()) },
+ { assertEquals("1052", rs.getString("id")) },
+ { assertTrue(rs.next()) },
+ { assertEquals("1073", rs.getString("id")) },
+ { assertFalse(rs.next()) }
+ )
+ }
+ }
+
+ @Test
+ fun testResourceStates() {
+ runQuery(trace, "SELECT * FROM trace.resource_states") { rs ->
+ assertAll(
+ { assertTrue(rs.next()) },
+ { assertEquals("1019", rs.getString("id")) },
+ { assertEquals(Timestamp.valueOf("2013-08-12 13:40:46.0"), rs.getTimestamp("timestamp")) },
+ { assertEquals(300000, rs.getLong("duration")) },
+ { assertEquals(0.0, rs.getDouble("cpu_usage")) },
+ { assertTrue(rs.next()) },
+ { assertEquals("1019", rs.getString("id")) },
+ )
+ }
+ }
+
+ @Test
+ fun testInterferenceGroups() {
+ runQuery(trace, "SELECT * FROM trace.interference_groups") { rs ->
+ assertAll(
+ { assertTrue(rs.next()) },
+ { assertArrayEquals(arrayOf("1019", "1023", "1052"), rs.getArray("members").array as Array<*>) },
+ { assertEquals(0.0, rs.getDouble("target")) },
+ { assertEquals(0.8830158730158756, rs.getDouble("score")) },
+ )
+ }
+ }
+
+ @Test
+ fun testComplexQuery() {
+ runQuery(trace, "SELECT max(cpu_usage) as max_cpu_usage, avg(cpu_usage) as avg_cpu_usage FROM trace.resource_states") { rs ->
+ assertAll(
+ { assertTrue(rs.next()) },
+ { assertEquals(249.59993808, rs.getDouble("max_cpu_usage")) },
+ { assertEquals(5.387240309118493, rs.getDouble("avg_cpu_usage")) },
+ )
+ }
+ }
+
+ @Test
+ fun testInsert() {
+ val tmp = Files.createTempDirectory("opendc")
+ val newTrace = Trace.create(tmp, "opendc-vm")
+
+ runStatement(newTrace) { stmt ->
+ val count = stmt.executeUpdate(
+ """
+ INSERT INTO trace.resources (id, start_time, stop_time, cpu_count, cpu_capacity, mem_capacity)
+ VALUES (1234, '2013-08-12 13:35:46.0', '2013-09-11 13:39:58.0', 1, 2926.0, 1024.0)
+ """.trimIndent()
+ )
+ assertEquals(1, count)
+ }
+
+ runQuery(newTrace, "SELECT * FROM trace.resources") { rs ->
+ assertAll(
+ { assertTrue(rs.next()) },
+ { assertEquals("1234", rs.getString("id")) },
+ { assertEquals(1, rs.getInt("cpu_count")) },
+ { assertEquals(Timestamp.valueOf("2013-08-12 13:35:46.0"), rs.getTimestamp("start_time")) },
+ { assertEquals(2926.0, rs.getDouble("cpu_capacity")) },
+ { assertEquals(1024.0, rs.getDouble("mem_capacity")) }
+ )
+ }
+ }
+
+ /**
+ * Helper function to run a query against the specified trace.
+ */
+ private fun runQuery(trace: Trace, query: String, block: (ResultSet) -> Unit) {
+ runStatement(trace) { stmt ->
+ val rs = stmt.executeQuery(query)
+ rs.use { block(rs) }
+ }
+ }
+
+ /**
+ * Helper function to run a statement against the specified trace.
+ */
+ private fun runStatement(trace: Trace, block: (Statement) -> Unit) {
+ val info = Properties()
+ info.setProperty("lex", "JAVA")
+ val connection = DriverManager.getConnection("jdbc:calcite:", info).unwrap(CalciteConnection::class.java)
+ connection.rootSchema.add("trace", TraceSchema(trace))
+
+ val stmt = connection.createStatement()
+ try {
+ block(stmt)
+ } finally {
+ stmt.close()
+ connection.close()
+ }
+ }
+}
diff --git a/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/TraceSchemaFactoryTest.kt b/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/TraceSchemaFactoryTest.kt
new file mode 100644
index 00000000..0a552e74
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/test/kotlin/org/opendc/trace/calcite/TraceSchemaFactoryTest.kt
@@ -0,0 +1,78 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.calcite
+
+import org.junit.jupiter.api.Assertions.*
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertThrows
+import java.sql.DriverManager
+import java.sql.Timestamp
+import java.util.*
+
+/**
+ * Test suite for [TraceSchemaFactory].
+ */
+class TraceSchemaFactoryTest {
+ @Test
+ fun testSmoke() {
+ val info = Properties()
+ info.setProperty("lex", "JAVA")
+ val connection = DriverManager.getConnection("jdbc:calcite:model=src/test/resources/model.json", info)
+ val stmt = connection.createStatement()
+ val rs = stmt.executeQuery("SELECT * FROM trace.resources")
+ try {
+ assertAll(
+ { assertTrue(rs.next()) },
+ { assertEquals("1019", rs.getString("id")) },
+ { assertEquals(1, rs.getInt("cpu_count")) },
+ { assertEquals(Timestamp.valueOf("2013-08-12 13:40:46.0"), rs.getTimestamp("start_time")) },
+ { assertEquals(181352.0, rs.getDouble("mem_capacity")) },
+ )
+ } finally {
+ rs.close()
+ stmt.close()
+ connection.close()
+ }
+ }
+
+ @Test
+ fun testWithoutParams() {
+ assertThrows<java.lang.RuntimeException> {
+ DriverManager.getConnection("jdbc:calcite:schemaFactory=org.opendc.trace.calcite.TraceSchemaFactory")
+ }
+ }
+
+ @Test
+ fun testWithoutPath() {
+ assertThrows<java.lang.RuntimeException> {
+ DriverManager.getConnection("jdbc:calcite:schemaFactory=org.opendc.trace.calcite.TraceSchemaFactory; schema.format=opendc-vm")
+ }
+ }
+
+ @Test
+ fun testWithoutFormat() {
+ assertThrows<java.lang.RuntimeException> {
+ DriverManager.getConnection("jdbc:calcite:schemaFactory=org.opendc.trace.calcite.TraceSchemaFactory; schema.path=trace")
+ }
+ }
+}
diff --git a/opendc-trace/opendc-trace-calcite/src/test/resources/model.json b/opendc-trace/opendc-trace-calcite/src/test/resources/model.json
new file mode 100644
index 00000000..91e2657f
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/test/resources/model.json
@@ -0,0 +1,15 @@
+{
+ "version": "1.0",
+ "defaultSchema": "trace",
+ "schemas": [
+ {
+ "name": "trace",
+ "type": "custom",
+ "factory": "org.opendc.trace.calcite.TraceSchemaFactory",
+ "operand": {
+ "path": "trace",
+ "format": "opendc-vm"
+ }
+ }
+ ]
+}
diff --git a/opendc-trace/opendc-trace-calcite/src/test/resources/trace/interference-model.json b/opendc-trace/opendc-trace-calcite/src/test/resources/trace/interference-model.json
new file mode 100644
index 00000000..6a0616d9
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/test/resources/trace/interference-model.json
@@ -0,0 +1,20 @@
+[
+ {
+ "vms": [
+ "1019",
+ "1023",
+ "1052"
+ ],
+ "minServerLoad": 0.0,
+ "performanceScore": 0.8830158730158756
+ },
+ {
+ "vms": [
+ "1023",
+ "1052",
+ "1073"
+ ],
+ "minServerLoad": 0.0,
+ "performanceScore": 0.7133055555552751
+ }
+]
diff --git a/opendc-trace/opendc-trace-calcite/src/test/resources/trace/meta.parquet b/opendc-trace/opendc-trace-calcite/src/test/resources/trace/meta.parquet
new file mode 100644
index 00000000..d8184945
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/test/resources/trace/meta.parquet
Binary files differ
diff --git a/opendc-trace/opendc-trace-calcite/src/test/resources/trace/trace.parquet b/opendc-trace/opendc-trace-calcite/src/test/resources/trace/trace.parquet
new file mode 100644
index 00000000..00ab5835
--- /dev/null
+++ b/opendc-trace/opendc-trace-calcite/src/test/resources/trace/trace.parquet
Binary files differ
diff --git a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt
index 63688523..8d9eab82 100644
--- a/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt
+++ b/opendc-trace/opendc-trace-gwf/src/main/kotlin/org/opendc/trace/gwf/GwfTraceFormat.kt
@@ -70,7 +70,7 @@ public class GwfTraceFormat : TraceFormat {
}
}
- override fun newReader(path: Path, table: String): TableReader {
+ override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader {
return when (table) {
TABLE_TASKS -> GwfTaskTableReader(factory.createParser(path.toFile()))
else -> throw IllegalArgumentException("Table $table not supported")
diff --git a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt
index 9bf28ad7..411d45d0 100644
--- a/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-gwf/src/test/kotlin/org/opendc/trace/gwf/GwfTraceFormatTest.kt
@@ -58,7 +58,7 @@ internal class GwfTraceFormatTest {
@Test
fun testTableReader() {
val path = Paths.get(checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")).toURI())
- val reader = format.newReader(path, TABLE_TASKS)
+ val reader = format.newReader(path, TABLE_TASKS, null)
assertAll(
{ assertTrue(reader.nextRow()) },
@@ -73,7 +73,7 @@ internal class GwfTraceFormatTest {
@Test
fun testReadingRowWithDependencies() {
val path = Paths.get(checkNotNull(GwfTraceFormatTest::class.java.getResource("/trace.gwf")).toURI())
- val reader = format.newReader(path, TABLE_TASKS)
+ val reader = format.newReader(path, TABLE_TASKS, null)
// Move to row 7
for (x in 1..6)
@@ -85,7 +85,7 @@ internal class GwfTraceFormatTest {
{ assertEquals("7", reader.get(TASK_ID)) },
{ assertEquals(Instant.ofEpochSecond(87), reader.get(TASK_SUBMIT_TIME)) },
{ assertEquals(Duration.ofSeconds(11), reader.get(TASK_RUNTIME)) },
- { assertEquals(setOf<String>("4", "5", "6"), reader.get(TASK_PARENTS)) },
+ { assertEquals(setOf("4", "5", "6"), reader.get(TASK_PARENTS)) },
)
}
}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
index b82da888..7a01b881 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableReader.kt
@@ -22,38 +22,30 @@
package org.opendc.trace.opendc
-import org.apache.avro.Schema
-import org.apache.avro.generic.GenericRecord
import org.opendc.trace.*
import org.opendc.trace.conv.*
+import org.opendc.trace.opendc.parquet.ResourceState
import org.opendc.trace.util.parquet.LocalParquetReader
-import java.time.Duration
-import java.time.Instant
/**
* A [TableReader] implementation for the OpenDC virtual machine trace format.
*/
-internal class OdcVmResourceStateTableReader(private val reader: LocalParquetReader<GenericRecord>) : TableReader {
+internal class OdcVmResourceStateTableReader(private val reader: LocalParquetReader<ResourceState>) : TableReader {
/**
* The current record.
*/
- private var record: GenericRecord? = null
-
- /**
- * A flag to indicate that the columns have been initialized.
- */
- private var hasInitializedColumns = false
+ private var record: ResourceState? = null
override fun nextRow(): Boolean {
- val record = reader.read()
- this.record = record
+ try {
+ val record = reader.read()
+ this.record = record
- if (!hasInitializedColumns && record != null) {
- initColumns(record.schema)
- hasInitializedColumns = true
+ return record != null
+ } catch (e: Throwable) {
+ this.record = null
+ throw e
}
-
- return record != null
}
override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
@@ -67,36 +59,36 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_ID -> record[AVRO_COL_ID].toString()
- COL_TIMESTAMP -> Instant.ofEpochMilli(record[AVRO_COL_TIMESTAMP] as Long)
- COL_DURATION -> Duration.ofMillis(record[AVRO_COL_DURATION] as Long)
- COL_CPU_COUNT -> getInt(index)
- COL_CPU_USAGE -> getDouble(index)
- else -> throw IllegalArgumentException("Invalid column")
+ COL_ID -> record.id
+ COL_TIMESTAMP -> record.timestamp
+ COL_DURATION -> record.duration
+ COL_CPU_COUNT -> record.cpuCount
+ COL_CPU_USAGE -> record.cpuUsage
+ else -> throw IllegalArgumentException("Invalid column index $index")
}
}
override fun getBoolean(index: Int): Boolean {
- throw IllegalArgumentException("Invalid column")
+ throw IllegalArgumentException("Invalid column or type [index $index]")
}
override fun getInt(index: Int): Int {
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_CPU_COUNT -> record[AVRO_COL_CPU_COUNT] as Int
- else -> throw IllegalArgumentException("Invalid column")
+ COL_CPU_COUNT -> record.cpuCount
+ else -> throw IllegalArgumentException("Invalid column or type [index $index]")
}
}
override fun getLong(index: Int): Long {
- throw IllegalArgumentException("Invalid column")
+ throw IllegalArgumentException("Invalid column or type [index $index]")
}
override fun getDouble(index: Int): Double {
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_CPU_USAGE -> (record[AVRO_COL_CPU_USAGE] as Number).toDouble()
- else -> throw IllegalArgumentException("Invalid column")
+ COL_CPU_USAGE -> record.cpuUsage
+ else -> throw IllegalArgumentException("Invalid column or type [index $index]")
}
}
@@ -106,28 +98,6 @@ internal class OdcVmResourceStateTableReader(private val reader: LocalParquetRea
override fun toString(): String = "OdcVmResourceStateTableReader"
- /**
- * Initialize the columns for the reader based on [schema].
- */
- private fun initColumns(schema: Schema) {
- try {
- AVRO_COL_ID = schema.getField("id").pos()
- AVRO_COL_TIMESTAMP = (schema.getField("timestamp") ?: schema.getField("time")).pos()
- AVRO_COL_DURATION = schema.getField("duration").pos()
- AVRO_COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("cores")).pos()
- AVRO_COL_CPU_USAGE = (schema.getField("cpu_usage") ?: schema.getField("cpuUsage")).pos()
- } catch (e: NullPointerException) {
- // This happens when the field we are trying to access does not exist
- throw IllegalArgumentException("Invalid schema", e)
- }
- }
-
- private var AVRO_COL_ID = -1
- private var AVRO_COL_TIMESTAMP = -1
- private var AVRO_COL_DURATION = -1
- private var AVRO_COL_CPU_COUNT = -1
- private var AVRO_COL_CPU_USAGE = -1
-
private val COL_ID = 0
private val COL_TIMESTAMP = 1
private val COL_DURATION = 2
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt
index 01b9750c..97af5b59 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceStateTableWriter.kt
@@ -22,84 +22,85 @@
package org.opendc.trace.opendc
-import org.apache.avro.Schema
-import org.apache.avro.generic.GenericRecord
-import org.apache.avro.generic.GenericRecordBuilder
import org.apache.parquet.hadoop.ParquetWriter
import org.opendc.trace.*
import org.opendc.trace.conv.*
+import org.opendc.trace.opendc.parquet.ResourceState
import java.time.Duration
import java.time.Instant
/**
* A [TableWriter] implementation for the OpenDC virtual machine trace format.
*/
-internal class OdcVmResourceStateTableWriter(
- private val writer: ParquetWriter<GenericRecord>,
- private val schema: Schema
-) : TableWriter {
+internal class OdcVmResourceStateTableWriter(private val writer: ParquetWriter<ResourceState>) : TableWriter {
/**
- * The current builder for the record that is being written.
+ * The current state of the record that is being written.
*/
- private var builder: GenericRecordBuilder? = null
-
- /**
- * The fields belonging to the resource state schema.
- */
- private val fields = schema.fields
+ private var _isActive = false
+ private var _id: String = ""
+ private var _timestamp: Instant = Instant.MIN
+ private var _duration: Duration = Duration.ZERO
+ private var _cpuCount: Int = 0
+ private var _cpuUsage: Double = Double.NaN
override fun startRow() {
- builder = GenericRecordBuilder(schema)
+ _isActive = true
+ _id = ""
+ _timestamp = Instant.MIN
+ _duration = Duration.ZERO
+ _cpuCount = 0
+ _cpuUsage = Double.NaN
}
override fun endRow() {
- val builder = checkNotNull(builder) { "No active row" }
- this.builder = null
-
- val record = builder.build()
- val id = record[COL_ID] as String
- val timestamp = record[COL_TIMESTAMP] as Long
+ check(_isActive) { "No active row" }
+ _isActive = false
- check(lastId != id || timestamp >= lastTimestamp) { "Records need to be ordered by (id, timestamp)" }
+ check(lastId != _id || _timestamp >= lastTimestamp) { "Records need to be ordered by (id, timestamp)" }
- writer.write(builder.build())
+ writer.write(ResourceState(_id, _timestamp, _duration, _cpuCount, _cpuUsage))
- lastId = id
- lastTimestamp = timestamp
+ lastId = _id
+ lastTimestamp = _timestamp
}
- override fun resolve(column: TableColumn<*>): Int {
- val schema = schema
- return when (column) {
- RESOURCE_ID -> schema.getField("id").pos()
- RESOURCE_STATE_TIMESTAMP -> (schema.getField("timestamp") ?: schema.getField("time")).pos()
- RESOURCE_STATE_DURATION -> schema.getField("duration").pos()
- RESOURCE_CPU_COUNT -> (schema.getField("cpu_count") ?: schema.getField("cores")).pos()
- RESOURCE_STATE_CPU_USAGE -> (schema.getField("cpu_usage") ?: schema.getField("cpuUsage")).pos()
- else -> -1
- }
- }
+ override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
override fun set(index: Int, value: Any) {
- val builder = checkNotNull(builder) { "No active row" }
-
- builder.set(
- fields[index],
- when (index) {
- COL_TIMESTAMP -> (value as Instant).toEpochMilli()
- COL_DURATION -> (value as Duration).toMillis()
- else -> value
- }
- )
+ check(_isActive) { "No active row" }
+
+ when (index) {
+ COL_ID -> _id = value as String
+ COL_TIMESTAMP -> _timestamp = value as Instant
+ COL_DURATION -> _duration = value as Duration
+ COL_CPU_COUNT -> _cpuCount = value as Int
+ COL_CPU_USAGE -> _cpuUsage = value as Double
+ }
}
- override fun setBoolean(index: Int, value: Boolean) = set(index, value)
+ override fun setBoolean(index: Int, value: Boolean) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
- override fun setInt(index: Int, value: Int) = set(index, value)
+ override fun setInt(index: Int, value: Int) {
+ check(_isActive) { "No active row" }
+ when (index) {
+ COL_CPU_COUNT -> _cpuCount = value
+ else -> throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+ }
- override fun setLong(index: Int, value: Long) = set(index, value)
+ override fun setLong(index: Int, value: Long) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
- override fun setDouble(index: Int, value: Double) = set(index, value)
+ override fun setDouble(index: Int, value: Double) {
+ check(_isActive) { "No active row" }
+ when (index) {
+ COL_CPU_USAGE -> _cpuUsage = value
+ else -> throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+ }
override fun flush() {
// Not available
@@ -113,12 +114,19 @@ internal class OdcVmResourceStateTableWriter(
* Last column values that are used to check for correct partitioning.
*/
private var lastId: String? = null
- private var lastTimestamp: Long = Long.MIN_VALUE
-
- /**
- * Columns with special behavior.
- */
- private val COL_ID = resolve(RESOURCE_ID)
- private val COL_TIMESTAMP = resolve(RESOURCE_STATE_TIMESTAMP)
- private val COL_DURATION = resolve(RESOURCE_STATE_DURATION)
+ private var lastTimestamp: Instant = Instant.MAX
+
+ private val COL_ID = 0
+ private val COL_TIMESTAMP = 1
+ private val COL_DURATION = 2
+ private val COL_CPU_COUNT = 3
+ private val COL_CPU_USAGE = 4
+
+ private val columns = mapOf(
+ RESOURCE_ID to COL_ID,
+ RESOURCE_STATE_TIMESTAMP to COL_TIMESTAMP,
+ RESOURCE_STATE_DURATION to COL_DURATION,
+ RESOURCE_CPU_COUNT to COL_CPU_COUNT,
+ RESOURCE_STATE_CPU_USAGE to COL_CPU_USAGE,
+ )
}
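
For reference, a minimal sketch of how this writer is driven through the generic TableWriter row protocol, mirroring the testResourceStatesWrite case added further down; the column-based set overloads and the temporary path are taken from those tests and are illustrative only:

    val format = OdcVmTraceFormat()
    val writer = format.newWriter(Files.createTempDirectory("opendc"), TABLE_RESOURCE_STATES)
    writer.startRow()
    writer.set(RESOURCE_ID, "1019")                         // resolves to COL_ID = 0
    writer.set(RESOURCE_STATE_TIMESTAMP, Instant.EPOCH)
    writer.set(RESOURCE_STATE_DURATION, Duration.ofMinutes(5))
    writer.setInt(RESOURCE_CPU_COUNT, 1)
    writer.setDouble(RESOURCE_STATE_CPU_USAGE, 23.0)
    writer.endRow()                                         // emits a single ResourceState record
    writer.close()
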
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
index 4909e70e..6102332f 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableReader.kt
@@ -22,37 +22,30 @@
package org.opendc.trace.opendc
-import org.apache.avro.Schema
-import org.apache.avro.generic.GenericRecord
import org.opendc.trace.*
import org.opendc.trace.conv.*
+import org.opendc.trace.opendc.parquet.Resource
import org.opendc.trace.util.parquet.LocalParquetReader
-import java.time.Instant
/**
- * A [TableReader] implementation for the resources table in the OpenDC virtual machine trace format.
+ * A [TableReader] implementation for the "resources" table in the OpenDC virtual machine trace format.
*/
-internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<GenericRecord>) : TableReader {
+internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<Resource>) : TableReader {
/**
* The current record.
*/
- private var record: GenericRecord? = null
-
- /**
- * A flag to indicate that the columns have been initialized.
- */
- private var hasInitializedColumns = false
+ private var record: Resource? = null
override fun nextRow(): Boolean {
- val record = reader.read()
- this.record = record
+ try {
+ val record = reader.read()
+ this.record = record
- if (!hasInitializedColumns && record != null) {
- initColumns(record.schema)
- hasInitializedColumns = true
+ return record != null
+ } catch (e: Throwable) {
+ this.record = null
+ throw e
}
-
- return record != null
}
override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
@@ -66,9 +59,9 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_ID -> record[AVRO_COL_ID].toString()
- COL_START_TIME -> Instant.ofEpochMilli(record[AVRO_COL_START_TIME] as Long)
- COL_STOP_TIME -> Instant.ofEpochMilli(record[AVRO_COL_STOP_TIME] as Long)
+ COL_ID -> record.id
+ COL_START_TIME -> record.startTime
+ COL_STOP_TIME -> record.stopTime
COL_CPU_COUNT -> getInt(index)
COL_CPU_CAPACITY -> getDouble(index)
COL_MEM_CAPACITY -> getDouble(index)
@@ -84,7 +77,7 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_CPU_COUNT -> record[AVRO_COL_CPU_COUNT] as Int
+ COL_CPU_COUNT -> record.cpuCount
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -97,8 +90,8 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_CPU_CAPACITY -> if (AVRO_COL_CPU_CAPACITY >= 0) (record[AVRO_COL_CPU_CAPACITY] as Number).toDouble() else 0.0
- COL_MEM_CAPACITY -> (record[AVRO_COL_MEM_CAPACITY] as Number).toDouble()
+ COL_CPU_CAPACITY -> record.cpuCapacity
+ COL_MEM_CAPACITY -> record.memCapacity
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -109,30 +102,6 @@ internal class OdcVmResourceTableReader(private val reader: LocalParquetReader<G
override fun toString(): String = "OdcVmResourceTableReader"
- /**
- * Initialize the columns for the reader based on [schema].
- */
- private fun initColumns(schema: Schema) {
- try {
- AVRO_COL_ID = schema.getField("id").pos()
- AVRO_COL_START_TIME = (schema.getField("start_time") ?: schema.getField("submissionTime")).pos()
- AVRO_COL_STOP_TIME = (schema.getField("stop_time") ?: schema.getField("endTime")).pos()
- AVRO_COL_CPU_COUNT = (schema.getField("cpu_count") ?: schema.getField("maxCores")).pos()
- AVRO_COL_CPU_CAPACITY = schema.getField("cpu_capacity")?.pos() ?: -1
- AVRO_COL_MEM_CAPACITY = (schema.getField("mem_capacity") ?: schema.getField("requiredMemory")).pos()
- } catch (e: NullPointerException) {
- // This happens when the field we are trying to access does not exist
- throw IllegalArgumentException("Invalid schema")
- }
- }
-
- private var AVRO_COL_ID = -1
- private var AVRO_COL_START_TIME = -1
- private var AVRO_COL_STOP_TIME = -1
- private var AVRO_COL_CPU_COUNT = -1
- private var AVRO_COL_CPU_CAPACITY = -1
- private var AVRO_COL_MEM_CAPACITY = -1
-
private val COL_ID = 0
private val COL_START_TIME = 1
private val COL_STOP_TIME = 2
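
A matching read-side sketch, following the testResources pattern below; the column-based get overload is assumed to resolve to the fixed indices declared above:

    val reader = OdcVmTraceFormat().newReader(
        Paths.get("src/test/resources/trace-v2.1"),
        TABLE_RESOURCES,
        listOf(RESOURCE_ID, RESOURCE_START_TIME)
    )
    while (reader.nextRow()) {
        println("${reader.get(RESOURCE_ID)} started at ${reader.get(RESOURCE_START_TIME)}")
    }
    reader.close()
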
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt
index edc89ee6..cae65faa 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmResourceTableWriter.kt
@@ -22,74 +22,82 @@
package org.opendc.trace.opendc
-import org.apache.avro.Schema
-import org.apache.avro.generic.GenericRecord
-import org.apache.avro.generic.GenericRecordBuilder
import org.apache.parquet.hadoop.ParquetWriter
import org.opendc.trace.*
import org.opendc.trace.conv.*
+import org.opendc.trace.opendc.parquet.Resource
import java.time.Instant
-import kotlin.math.roundToLong
/**
 * A [TableWriter] implementation for the resources table in the OpenDC virtual machine trace format.
*/
-internal class OdcVmResourceTableWriter(
- private val writer: ParquetWriter<GenericRecord>,
- private val schema: Schema
-) : TableWriter {
+internal class OdcVmResourceTableWriter(private val writer: ParquetWriter<Resource>) : TableWriter {
/**
- * The current builder for the record that is being written.
+ * The current state for the record that is being written.
*/
- private var builder: GenericRecordBuilder? = null
-
- /**
- * The fields belonging to the resource schema.
- */
- private val fields = schema.fields
+ private var _isActive = false
+ private var _id: String = ""
+ private var _startTime: Instant = Instant.MIN
+ private var _stopTime: Instant = Instant.MIN
+ private var _cpuCount: Int = 0
+ private var _cpuCapacity: Double = Double.NaN
+ private var _memCapacity: Double = Double.NaN
override fun startRow() {
- builder = GenericRecordBuilder(schema)
+ _isActive = true
+ _id = ""
+ _startTime = Instant.MIN
+ _stopTime = Instant.MIN
+ _cpuCount = 0
+ _cpuCapacity = Double.NaN
+ _memCapacity = Double.NaN
}
override fun endRow() {
- val builder = checkNotNull(builder) { "No active row" }
- this.builder = null
- writer.write(builder.build())
+ check(_isActive) { "No active row" }
+ _isActive = false
+ writer.write(Resource(_id, _startTime, _stopTime, _cpuCount, _cpuCapacity, _memCapacity))
}
- override fun resolve(column: TableColumn<*>): Int {
- val schema = schema
- return when (column) {
- RESOURCE_ID -> schema.getField("id").pos()
- RESOURCE_START_TIME -> (schema.getField("start_time") ?: schema.getField("submissionTime")).pos()
- RESOURCE_STOP_TIME -> (schema.getField("stop_time") ?: schema.getField("endTime")).pos()
- RESOURCE_CPU_COUNT -> (schema.getField("cpu_count") ?: schema.getField("maxCores")).pos()
- RESOURCE_CPU_CAPACITY -> schema.getField("cpu_capacity").pos()
- RESOURCE_MEM_CAPACITY -> (schema.getField("mem_capacity") ?: schema.getField("requiredMemory")).pos()
- else -> -1
- }
- }
+ override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
override fun set(index: Int, value: Any) {
- val builder = checkNotNull(builder) { "No active row" }
- builder.set(
- fields[index],
- when (index) {
- COL_START_TIME, COL_STOP_TIME -> (value as Instant).toEpochMilli()
- COL_MEM_CAPACITY -> (value as Double).roundToLong()
- else -> value
- }
- )
+ check(_isActive) { "No active row" }
+ when (index) {
+ COL_ID -> _id = value as String
+ COL_START_TIME -> _startTime = value as Instant
+ COL_STOP_TIME -> _stopTime = value as Instant
+ COL_CPU_COUNT -> _cpuCount = value as Int
+ COL_CPU_CAPACITY -> _cpuCapacity = value as Double
+ COL_MEM_CAPACITY -> _memCapacity = value as Double
+ else -> throw IllegalArgumentException("Invalid column index $index")
+ }
}
- override fun setBoolean(index: Int, value: Boolean) = set(index, value)
+ override fun setBoolean(index: Int, value: Boolean) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
- override fun setInt(index: Int, value: Int) = set(index, value)
+ override fun setInt(index: Int, value: Int) {
+ check(_isActive) { "No active row" }
+ when (index) {
+ COL_CPU_COUNT -> _cpuCount = value
+ else -> throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+ }
- override fun setLong(index: Int, value: Long) = set(index, value)
+ override fun setLong(index: Int, value: Long) {
+ throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
- override fun setDouble(index: Int, value: Double) = set(index, value)
+ override fun setDouble(index: Int, value: Double) {
+ check(_isActive) { "No active row" }
+ when (index) {
+ COL_CPU_CAPACITY -> _cpuCapacity = value
+ COL_MEM_CAPACITY -> _memCapacity = value
+ else -> throw IllegalArgumentException("Invalid column or type [index $index]")
+ }
+ }
override fun flush() {
// Not available
@@ -99,10 +107,19 @@ internal class OdcVmResourceTableWriter(
writer.close()
}
- /**
- * Columns with special behavior.
- */
- private val COL_START_TIME = resolve(RESOURCE_START_TIME)
- private val COL_STOP_TIME = resolve(RESOURCE_STOP_TIME)
- private val COL_MEM_CAPACITY = resolve(RESOURCE_MEM_CAPACITY)
+ private val COL_ID = 0
+ private val COL_START_TIME = 1
+ private val COL_STOP_TIME = 2
+ private val COL_CPU_COUNT = 3
+ private val COL_CPU_CAPACITY = 4
+ private val COL_MEM_CAPACITY = 5
+
+ private val columns = mapOf(
+ RESOURCE_ID to COL_ID,
+ RESOURCE_START_TIME to COL_START_TIME,
+ RESOURCE_STOP_TIME to COL_STOP_TIME,
+ RESOURCE_CPU_COUNT to COL_CPU_COUNT,
+ RESOURCE_CPU_CAPACITY to COL_CPU_CAPACITY,
+ RESOURCE_MEM_CAPACITY to COL_MEM_CAPACITY,
+ )
}
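
Because the column map is now fixed, resolve can be called once up front; a hypothetical caller pre-resolving an index for the typed setters (writer obtained as in the sketch above):

    val idx = writer.resolve(RESOURCE_CPU_CAPACITY)   // always 4 for the resources table
    check(idx >= 0) { "column not supported by this table" }
    writer.setDouble(idx, 1024.0)                     // only valid between startRow() and endRow()
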
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
index 36a1b4a0..d45910c6 100644
--- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt
@@ -22,19 +22,19 @@
package org.opendc.trace.opendc
-import org.apache.avro.Schema
-import org.apache.avro.SchemaBuilder
-import org.apache.avro.generic.GenericRecord
-import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.column.ParquetProperties
import org.apache.parquet.hadoop.ParquetFileWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.opendc.trace.*
import org.opendc.trace.conv.*
+import org.opendc.trace.opendc.parquet.ResourceReadSupport
+import org.opendc.trace.opendc.parquet.ResourceStateReadSupport
+import org.opendc.trace.opendc.parquet.ResourceStateWriteSupport
+import org.opendc.trace.opendc.parquet.ResourceWriteSupport
import org.opendc.trace.spi.TableDetails
import org.opendc.trace.spi.TraceFormat
-import org.opendc.trace.util.parquet.LocalOutputFile
import org.opendc.trace.util.parquet.LocalParquetReader
-import org.opendc.trace.util.parquet.TIMESTAMP_SCHEMA
+import org.opendc.trace.util.parquet.LocalParquetWriter
import shaded.parquet.com.fasterxml.jackson.core.JsonEncoding
import shaded.parquet.com.fasterxml.jackson.core.JsonFactory
import java.nio.file.Files
@@ -102,14 +102,14 @@ public class OdcVmTraceFormat : TraceFormat {
}
}
- override fun newReader(path: Path, table: String): TableReader {
+ override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader {
return when (table) {
TABLE_RESOURCES -> {
- val reader = LocalParquetReader<GenericRecord>(path.resolve("meta.parquet"))
+ val reader = LocalParquetReader(path.resolve("meta.parquet"), ResourceReadSupport(projection))
OdcVmResourceTableReader(reader)
}
TABLE_RESOURCE_STATES -> {
- val reader = LocalParquetReader<GenericRecord>(path.resolve("trace.parquet"))
+ val reader = LocalParquetReader(path.resolve("trace.parquet"), ResourceStateReadSupport(projection))
OdcVmResourceStateTableReader(reader)
}
TABLE_INTERFERENCE_GROUPS -> {
@@ -128,24 +128,24 @@ public class OdcVmTraceFormat : TraceFormat {
override fun newWriter(path: Path, table: String): TableWriter {
return when (table) {
TABLE_RESOURCES -> {
- val schema = RESOURCES_SCHEMA
- val writer = AvroParquetWriter.builder<GenericRecord>(LocalOutputFile(path.resolve("meta.parquet")))
- .withSchema(schema)
+ val writer = LocalParquetWriter.builder(path.resolve("meta.parquet"), ResourceWriteSupport())
.withCompressionCodec(CompressionCodecName.ZSTD)
+ .withPageWriteChecksumEnabled(true)
+ .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
.build()
- OdcVmResourceTableWriter(writer, schema)
+ OdcVmResourceTableWriter(writer)
}
TABLE_RESOURCE_STATES -> {
- val schema = RESOURCE_STATES_SCHEMA
- val writer = AvroParquetWriter.builder<GenericRecord>(LocalOutputFile(path.resolve("trace.parquet")))
- .withSchema(schema)
+ val writer = LocalParquetWriter.builder(path.resolve("trace.parquet"), ResourceStateWriteSupport())
.withCompressionCodec(CompressionCodecName.ZSTD)
.withDictionaryEncoding("id", true)
.withBloomFilterEnabled("id", true)
+ .withPageWriteChecksumEnabled(true)
+ .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
.build()
- OdcVmResourceStateTableWriter(writer, schema)
+ OdcVmResourceStateTableWriter(writer)
}
TABLE_INTERFERENCE_GROUPS -> {
val generator = jsonFactory.createGenerator(path.resolve("interference-model.json").toFile(), JsonEncoding.UTF8)
@@ -154,37 +154,4 @@ public class OdcVmTraceFormat : TraceFormat {
else -> throw IllegalArgumentException("Table $table not supported")
}
}
-
- public companion object {
- /**
- * Schema for the resources table in the trace.
- */
- @JvmStatic
- public val RESOURCES_SCHEMA: Schema = SchemaBuilder
- .record("resource")
- .namespace("org.opendc.trace.opendc")
- .fields()
- .requiredString("id")
- .name("start_time").type(TIMESTAMP_SCHEMA).noDefault()
- .name("stop_time").type(TIMESTAMP_SCHEMA).noDefault()
- .requiredInt("cpu_count")
- .requiredDouble("cpu_capacity")
- .requiredLong("mem_capacity")
- .endRecord()
-
- /**
- * Schema for the resource states table in the trace.
- */
- @JvmStatic
- public val RESOURCE_STATES_SCHEMA: Schema = SchemaBuilder
- .record("resource_state")
- .namespace("org.opendc.trace.opendc")
- .fields()
- .requiredString("id")
- .name("timestamp").type(TIMESTAMP_SCHEMA).noDefault()
- .requiredLong("duration")
- .requiredInt("cpu_count")
- .requiredDouble("cpu_usage")
- .endRecord()
- }
}
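
The Parquet writer configuration now lives in the format itself; an equivalent standalone sketch using the same options as the hunk above (record values illustrative):

    val writer = LocalParquetWriter.builder(path.resolve("trace.parquet"), ResourceStateWriteSupport())
        .withCompressionCodec(CompressionCodecName.ZSTD)
        .withDictionaryEncoding("id", true)
        .withBloomFilterEnabled("id", true)
        .withPageWriteChecksumEnabled(true)
        .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
        .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
        .build()
    writer.write(ResourceState("1019", Instant.EPOCH, Duration.ofMinutes(5), 1, 23.0))
    writer.close()
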
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/Resource.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/Resource.kt
new file mode 100644
index 00000000..c6db45b5
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/Resource.kt
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.opendc.parquet
+
+import java.time.Instant
+
+/**
+ * A description of a resource in a trace.
+ */
+internal data class Resource(
+ val id: String,
+ val startTime: Instant,
+ val stopTime: Instant,
+ val cpuCount: Int,
+ val cpuCapacity: Double,
+ val memCapacity: Double
+)
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt
new file mode 100644
index 00000000..0d70446d
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt
@@ -0,0 +1,147 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.opendc.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.api.InitContext
+import org.apache.parquet.hadoop.api.ReadSupport
+import org.apache.parquet.io.api.RecordMaterializer
+import org.apache.parquet.schema.*
+import org.opendc.trace.TableColumn
+import org.opendc.trace.conv.*
+
+/**
+ * A [ReadSupport] instance for [Resource] objects.
+ */
+internal class ResourceReadSupport(private val projection: List<TableColumn<*>>?) : ReadSupport<Resource>() {
+ /**
+ * Mapping from field names to [TableColumn]s.
+ */
+ private val fieldMap = mapOf<String, TableColumn<*>>(
+ "id" to RESOURCE_ID,
+ "submissionTime" to RESOURCE_START_TIME,
+ "start_time" to RESOURCE_START_TIME,
+ "endTime" to RESOURCE_STOP_TIME,
+ "stop_time" to RESOURCE_STOP_TIME,
+ "maxCores" to RESOURCE_CPU_COUNT,
+ "cpu_count" to RESOURCE_CPU_COUNT,
+ "cpu_capacity" to RESOURCE_CPU_CAPACITY,
+ "requiredMemory" to RESOURCE_MEM_CAPACITY,
+ "mem_capacity" to RESOURCE_MEM_CAPACITY,
+ )
+
+ override fun init(context: InitContext): ReadContext {
+ val projectedSchema =
+ if (projection != null) {
+ Types.buildMessage()
+ .apply {
+ val projectionSet = projection.toSet()
+
+ for (field in READ_SCHEMA.fields) {
+ val col = fieldMap[field.name] ?: continue
+ if (col in projectionSet) {
+ addField(field)
+ }
+ }
+ }
+ .named(READ_SCHEMA.name)
+ } else {
+ READ_SCHEMA
+ }
+
+ return ReadContext(projectedSchema)
+ }
+
+ override fun prepareForRead(
+ configuration: Configuration,
+ keyValueMetaData: Map<String, String>,
+ fileSchema: MessageType,
+ readContext: ReadContext
+ ): RecordMaterializer<Resource> = ResourceRecordMaterializer(readContext.requestedSchema)
+
+ companion object {
+ /**
+ * Parquet read schema (version 2.0) for the "resources" table in the trace.
+ */
+ @JvmStatic
+ val READ_SCHEMA_V2_0: MessageType = Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("submissionTime"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("endTime"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("maxCores"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("requiredMemory"),
+ )
+ .named("resource")
+
+ /**
+ * Parquet read schema (version 2.1) for the "resources" table in the trace.
+ */
+ @JvmStatic
+ val READ_SCHEMA_V2_1: MessageType = Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("start_time"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("stop_time"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("cpu_count"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpu_capacity"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("mem_capacity"),
+ )
+ .named("resource")
+
+ /**
+ * Parquet read schema for the "resources" table in the trace.
+ */
+ @JvmStatic
+ val READ_SCHEMA: MessageType = READ_SCHEMA_V2_0
+ .union(READ_SCHEMA_V2_1)
+ }
+}
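
A sketch of the projection push-down this ReadSupport enables, as wired up in OdcVmTraceFormat.newReader; columns omitted from the projection simply keep their defaults when the record is materialized:

    val reader = LocalParquetReader(
        path.resolve("meta.parquet"),
        ResourceReadSupport(listOf(RESOURCE_ID, RESOURCE_START_TIME))
    )
    val first: Resource? = reader.read()   // only the id and start-time columns are decoded
    reader.close()
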
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceRecordMaterializer.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceRecordMaterializer.kt
new file mode 100644
index 00000000..3adb0709
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceRecordMaterializer.kt
@@ -0,0 +1,107 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.opendc.parquet
+
+import org.apache.parquet.io.api.*
+import org.apache.parquet.schema.MessageType
+import java.time.Instant
+
+/**
+ * A [RecordMaterializer] for [Resource] records.
+ */
+internal class ResourceRecordMaterializer(schema: MessageType) : RecordMaterializer<Resource>() {
+ /**
+ * State of current record being read.
+ */
+ private var _id = ""
+ private var _startTime = Instant.MIN
+ private var _stopTime = Instant.MIN
+ private var _cpuCount = 0
+ private var _cpuCapacity = 0.0
+ private var _memCapacity = 0.0
+
+ /**
+ * Root converter for the record.
+ */
+ private val root = object : GroupConverter() {
+ /**
+ * The converters for the columns of the schema.
+ */
+ private val converters = schema.fields.map { type ->
+ when (type.name) {
+ "id" -> object : PrimitiveConverter() {
+ override fun addBinary(value: Binary) {
+ _id = value.toStringUsingUTF8()
+ }
+ }
+ "start_time", "submissionTime" -> object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ _startTime = Instant.ofEpochMilli(value)
+ }
+ }
+ "stop_time", "endTime" -> object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ _stopTime = Instant.ofEpochMilli(value)
+ }
+ }
+ "cpu_count", "maxCores" -> object : PrimitiveConverter() {
+ override fun addInt(value: Int) {
+ _cpuCount = value
+ }
+ }
+ "cpu_capacity" -> object : PrimitiveConverter() {
+ override fun addDouble(value: Double) {
+ _cpuCapacity = value
+ }
+ }
+ "mem_capacity", "requiredMemory" -> object : PrimitiveConverter() {
+ override fun addDouble(value: Double) {
+ _memCapacity = value
+ }
+
+ override fun addLong(value: Long) {
+ _memCapacity = value.toDouble()
+ }
+ }
+ else -> error("Unknown column $type")
+ }
+ }
+
+ override fun start() {
+ _id = ""
+ _startTime = Instant.MIN
+ _stopTime = Instant.MIN
+ _cpuCount = 0
+ _cpuCapacity = 0.0
+ _memCapacity = 0.0
+ }
+
+ override fun end() {}
+
+ override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex]
+ }
+
+ override fun getCurrentRecord(): Resource = Resource(_id, _startTime, _stopTime, _cpuCount, _cpuCapacity, _memCapacity)
+
+ override fun getRootConverter(): GroupConverter = root
+}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceState.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceState.kt
new file mode 100644
index 00000000..9ad58764
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceState.kt
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.opendc.parquet
+
+import java.time.Duration
+import java.time.Instant
+
+/**
+ * A single usage state of a resource in a trace.
+ */
+internal class ResourceState(
+ val id: String,
+ val timestamp: Instant,
+ val duration: Duration,
+ val cpuCount: Int,
+ val cpuUsage: Double
+)
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt
new file mode 100644
index 00000000..97aa00b2
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt
@@ -0,0 +1,139 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.opendc.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.api.InitContext
+import org.apache.parquet.hadoop.api.ReadSupport
+import org.apache.parquet.io.api.RecordMaterializer
+import org.apache.parquet.schema.*
+import org.opendc.trace.TableColumn
+import org.opendc.trace.conv.*
+
+/**
+ * A [ReadSupport] instance for [ResourceState] objects.
+ */
+internal class ResourceStateReadSupport(private val projection: List<TableColumn<*>>?) : ReadSupport<ResourceState>() {
+ /**
+ * Mapping from field names to [TableColumn]s.
+ */
+ private val fieldMap = mapOf<String, TableColumn<*>>(
+ "id" to RESOURCE_ID,
+ "time" to RESOURCE_STATE_TIMESTAMP,
+ "timestamp" to RESOURCE_STATE_TIMESTAMP,
+ "duration" to RESOURCE_STATE_DURATION,
+ "cores" to RESOURCE_CPU_COUNT,
+ "cpu_count" to RESOURCE_CPU_COUNT,
+ "cpuUsage" to RESOURCE_STATE_CPU_USAGE,
+ "cpu_usage" to RESOURCE_STATE_CPU_USAGE,
+ )
+
+ override fun init(context: InitContext): ReadContext {
+ val projectedSchema =
+ if (projection != null) {
+ Types.buildMessage()
+ .apply {
+ val projectionSet = projection.toSet()
+
+ for (field in READ_SCHEMA.fields) {
+ val col = fieldMap[field.name] ?: continue
+ if (col in projectionSet) {
+ addField(field)
+ }
+ }
+ }
+ .named(READ_SCHEMA.name)
+ } else {
+ READ_SCHEMA
+ }
+
+ return ReadContext(projectedSchema)
+ }
+
+ override fun prepareForRead(
+ configuration: Configuration,
+ keyValueMetaData: Map<String, String>,
+ fileSchema: MessageType,
+ readContext: ReadContext
+ ): RecordMaterializer<ResourceState> = ResourceStateRecordMaterializer(readContext.requestedSchema)
+
+ companion object {
+ /**
+ * Parquet read schema (version 2.0) for the "resource states" table in the trace.
+ */
+ @JvmStatic
+ val READ_SCHEMA_V2_0: MessageType = Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("time"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("duration"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("cores"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpuUsage")
+ )
+ .named("resource_state")
+
+ /**
+ * Parquet read schema (version 2.1) for the "resource states" table in the trace.
+ */
+ @JvmStatic
+ val READ_SCHEMA_V2_1: MessageType = Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("timestamp"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("duration"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("cpu_count"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpu_usage")
+ )
+ .named("resource_state")
+
+ /**
+ * Parquet read schema for the "resource states" table in the trace.
+ */
+ @JvmStatic
+ val READ_SCHEMA: MessageType = READ_SCHEMA_V2_0.union(READ_SCHEMA_V2_1)
+ }
+}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateRecordMaterializer.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateRecordMaterializer.kt
new file mode 100644
index 00000000..f8b0c3c2
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateRecordMaterializer.kt
@@ -0,0 +1,102 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.opendc.parquet
+
+import org.apache.parquet.io.api.*
+import org.apache.parquet.schema.MessageType
+import java.time.Duration
+import java.time.Instant
+
+/**
+ * A [RecordMaterializer] for [ResourceState] records.
+ */
+internal class ResourceStateRecordMaterializer(schema: MessageType) : RecordMaterializer<ResourceState>() {
+ /**
+ * State of current record being read.
+ */
+ private var _id = ""
+ private var _timestamp = Instant.MIN
+ private var _duration = Duration.ZERO
+ private var _cpuCount = 0
+ private var _cpuUsage = 0.0
+
+ /**
+ * Root converter for the record.
+ */
+ private val root = object : GroupConverter() {
+ /**
+ * The converters for the columns of the schema.
+ */
+ private val converters = schema.fields.map { type ->
+ when (type.name) {
+ "id" -> object : PrimitiveConverter() {
+ override fun addBinary(value: Binary) {
+ _id = value.toStringUsingUTF8()
+ }
+ }
+ "timestamp", "time" -> object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ _timestamp = Instant.ofEpochMilli(value)
+ }
+ }
+ "duration" -> object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ _duration = Duration.ofMillis(value)
+ }
+ }
+ "cpu_count", "cores" -> object : PrimitiveConverter() {
+ override fun addInt(value: Int) {
+ _cpuCount = value
+ }
+ }
+ "cpu_usage", "cpuUsage" -> object : PrimitiveConverter() {
+ override fun addDouble(value: Double) {
+ _cpuUsage = value
+ }
+ }
+ "flops" -> object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ // Ignore to support v1 format
+ }
+ }
+ else -> error("Unknown column $type")
+ }
+ }
+
+ override fun start() {
+ _id = ""
+ _timestamp = Instant.MIN
+ _duration = Duration.ZERO
+ _cpuCount = 0
+ _cpuUsage = 0.0
+ }
+
+ override fun end() {}
+
+ override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex]
+ }
+
+ override fun getCurrentRecord(): ResourceState = ResourceState(_id, _timestamp, _duration, _cpuCount, _cpuUsage)
+
+ override fun getRootConverter(): GroupConverter = root
+}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateWriteSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateWriteSupport.kt
new file mode 100644
index 00000000..e2f3df31
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateWriteSupport.kt
@@ -0,0 +1,105 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.opendc.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.io.api.Binary
+import org.apache.parquet.io.api.RecordConsumer
+import org.apache.parquet.schema.*
+
+/**
+ * Support for writing [ResourceState] instances to Parquet format.
+ */
+internal class ResourceStateWriteSupport : WriteSupport<ResourceState>() {
+ /**
+ * The current active record consumer.
+ */
+ private lateinit var recordConsumer: RecordConsumer
+
+ override fun init(configuration: Configuration): WriteContext {
+ return WriteContext(WRITE_SCHEMA, emptyMap())
+ }
+
+ override fun prepareForWrite(recordConsumer: RecordConsumer) {
+ this.recordConsumer = recordConsumer
+ }
+
+ override fun write(record: ResourceState) {
+ write(recordConsumer, record)
+ }
+
+ private fun write(consumer: RecordConsumer, record: ResourceState) {
+ consumer.startMessage()
+
+ consumer.startField("id", 0)
+ consumer.addBinary(Binary.fromCharSequence(record.id))
+ consumer.endField("id", 0)
+
+ consumer.startField("timestamp", 1)
+ consumer.addLong(record.timestamp.toEpochMilli())
+ consumer.endField("timestamp", 1)
+
+ consumer.startField("duration", 2)
+ consumer.addLong(record.duration.toMillis())
+ consumer.endField("duration", 2)
+
+ consumer.startField("cpu_count", 3)
+ consumer.addInteger(record.cpuCount)
+ consumer.endField("cpu_count", 3)
+
+ consumer.startField("cpu_usage", 4)
+ consumer.addDouble(record.cpuUsage)
+ consumer.endField("cpu_usage", 4)
+
+ consumer.endMessage()
+ }
+
+ companion object {
+ /**
+ * Parquet schema for the "resource states" table in the trace.
+ */
+ @JvmStatic
+ val WRITE_SCHEMA: MessageType = Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("timestamp"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("duration"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("cpu_count"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpu_usage")
+ )
+ .named("resource_state")
+ }
+}
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceWriteSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceWriteSupport.kt
new file mode 100644
index 00000000..14cadabb
--- /dev/null
+++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceWriteSupport.kt
@@ -0,0 +1,114 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.opendc.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.io.api.Binary
+import org.apache.parquet.io.api.RecordConsumer
+import org.apache.parquet.schema.*
+import kotlin.math.roundToLong
+
+/**
+ * Support for writing [Resource] instances to Parquet format.
+ */
+internal class ResourceWriteSupport : WriteSupport<Resource>() {
+ /**
+ * The current active record consumer.
+ */
+ private lateinit var recordConsumer: RecordConsumer
+
+ override fun init(configuration: Configuration): WriteContext {
+ return WriteContext(WRITE_SCHEMA, emptyMap())
+ }
+
+ override fun prepareForWrite(recordConsumer: RecordConsumer) {
+ this.recordConsumer = recordConsumer
+ }
+
+ override fun write(record: Resource) {
+ write(recordConsumer, record)
+ }
+
+ private fun write(consumer: RecordConsumer, record: Resource) {
+ consumer.startMessage()
+
+ consumer.startField("id", 0)
+ consumer.addBinary(Binary.fromCharSequence(record.id))
+ consumer.endField("id", 0)
+
+ consumer.startField("start_time", 1)
+ consumer.addLong(record.startTime.toEpochMilli())
+ consumer.endField("start_time", 1)
+
+ consumer.startField("stop_time", 2)
+ consumer.addLong(record.stopTime.toEpochMilli())
+ consumer.endField("stop_time", 2)
+
+ consumer.startField("cpu_count", 3)
+ consumer.addInteger(record.cpuCount)
+ consumer.endField("cpu_count", 3)
+
+ consumer.startField("cpu_capacity", 4)
+ consumer.addDouble(record.cpuCapacity)
+ consumer.endField("cpu_capacity", 4)
+
+ consumer.startField("mem_capacity", 5)
+ consumer.addLong(record.memCapacity.roundToLong())
+ consumer.endField("mem_capacity", 5)
+
+ consumer.endMessage()
+ }
+
+ companion object {
+ /**
+ * Parquet schema for the "resources" table in the trace.
+ */
+ @JvmStatic
+ val WRITE_SCHEMA: MessageType = Types.buildMessage()
+ .addFields(
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.BINARY)
+ .`as`(LogicalTypeAnnotation.stringType())
+ .named("id"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("start_time"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("stop_time"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("cpu_count"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("cpu_capacity"),
+ Types
+ .required(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("mem_capacity"),
+ )
+ .named("resource")
+ }
+}
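
Note the asymmetry for mem_capacity: the write schema declares it as INT64 and rounds the double held by Resource, while ResourceRecordMaterializer widens it back to a double on read. A hypothetical round trip:

    val original = Resource("1019", Instant.EPOCH, Instant.EPOCH, 1, 1024.0, 181352.4)
    // write path: addLong(181352.4.roundToLong()) stores 181352 in the mem_capacity column
    // read path:  addLong(181352) materializes memCapacity = 181352.0
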
diff --git a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt
index c8742624..1f4f6195 100644
--- a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt
@@ -29,7 +29,9 @@ import org.junit.jupiter.api.assertThrows
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.opendc.trace.conv.*
+import java.nio.file.Files
import java.nio.file.Paths
+import java.time.Instant
/**
* Test suite for the [OdcVmTraceFormat] implementation.
@@ -61,11 +63,12 @@ internal class OdcVmTraceFormatTest {
@ValueSource(strings = ["trace-v2.0", "trace-v2.1"])
fun testResources(name: String) {
val path = Paths.get("src/test/resources/$name")
- val reader = format.newReader(path, TABLE_RESOURCES)
+ val reader = format.newReader(path, TABLE_RESOURCES, listOf(RESOURCE_ID, RESOURCE_START_TIME))
assertAll(
{ assertTrue(reader.nextRow()) },
{ assertEquals("1019", reader.get(RESOURCE_ID)) },
+ { assertEquals(Instant.ofEpochMilli(1376314846000), reader.get(RESOURCE_START_TIME)) },
{ assertTrue(reader.nextRow()) },
{ assertEquals("1023", reader.get(RESOURCE_ID)) },
{ assertTrue(reader.nextRow()) },
@@ -78,11 +81,46 @@ internal class OdcVmTraceFormatTest {
reader.close()
}
+ @Test
+ fun testResourcesWrite() {
+ val path = Files.createTempDirectory("opendc")
+ val writer = format.newWriter(path, TABLE_RESOURCES)
+
+ writer.startRow()
+ writer.set(RESOURCE_ID, "1019")
+ writer.set(RESOURCE_START_TIME, Instant.EPOCH)
+ writer.set(RESOURCE_STOP_TIME, Instant.EPOCH)
+ writer.setInt(RESOURCE_CPU_COUNT, 1)
+ writer.setDouble(RESOURCE_CPU_CAPACITY, 1024.0)
+ writer.setDouble(RESOURCE_MEM_CAPACITY, 1024.0)
+ writer.endRow()
+ writer.close()
+
+ val reader = format.newReader(path, TABLE_RESOURCES, null)
+
+ assertAll(
+ { assertTrue(reader.nextRow()) },
+ { assertEquals("1019", reader.get(RESOURCE_ID)) },
+ { assertEquals(Instant.EPOCH, reader.get(RESOURCE_START_TIME)) },
+ { assertEquals(Instant.EPOCH, reader.get(RESOURCE_STOP_TIME)) },
+ { assertEquals(1, reader.getInt(RESOURCE_CPU_COUNT)) },
+ { assertEquals(1024.0, reader.getDouble(RESOURCE_CPU_CAPACITY)) },
+ { assertEquals(1024.0, reader.getDouble(RESOURCE_MEM_CAPACITY)) },
+ { assertFalse(reader.nextRow()) },
+ )
+
+ reader.close()
+ }
+
@ParameterizedTest
@ValueSource(strings = ["trace-v2.0", "trace-v2.1"])
fun testSmoke(name: String) {
val path = Paths.get("src/test/resources/$name")
- val reader = format.newReader(path, TABLE_RESOURCE_STATES)
+ val reader = format.newReader(
+ path,
+ TABLE_RESOURCE_STATES,
+ listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP, RESOURCE_STATE_CPU_USAGE)
+ )
assertAll(
{ assertTrue(reader.nextRow()) },
@@ -95,9 +133,40 @@ internal class OdcVmTraceFormatTest {
}
@Test
+ fun testResourceStatesWrite() {
+ val path = Files.createTempDirectory("opendc")
+ val writer = format.newWriter(path, TABLE_RESOURCE_STATES)
+
+ writer.startRow()
+ writer.set(RESOURCE_ID, "1019")
+ writer.set(RESOURCE_STATE_TIMESTAMP, Instant.EPOCH)
+ writer.setDouble(RESOURCE_STATE_CPU_USAGE, 23.0)
+ writer.setInt(RESOURCE_CPU_COUNT, 1)
+ writer.endRow()
+ writer.close()
+
+ val reader = format.newReader(path, TABLE_RESOURCE_STATES, null)
+
+ assertAll(
+ { assertTrue(reader.nextRow()) },
+ { assertEquals("1019", reader.get(RESOURCE_ID)) },
+ { assertEquals(Instant.EPOCH, reader.get(RESOURCE_STATE_TIMESTAMP)) },
+ { assertEquals(1, reader.getInt(RESOURCE_CPU_COUNT)) },
+ { assertEquals(23.0, reader.getDouble(RESOURCE_STATE_CPU_USAGE)) },
+ { assertFalse(reader.nextRow()) },
+ )
+
+ reader.close()
+ }
+
+ @Test
fun testInterferenceGroups() {
val path = Paths.get("src/test/resources/trace-v2.1")
- val reader = format.newReader(path, TABLE_INTERFERENCE_GROUPS)
+ val reader = format.newReader(
+ path,
+ TABLE_INTERFERENCE_GROUPS,
+ listOf(INTERFERENCE_GROUP_MEMBERS, INTERFERENCE_GROUP_TARGET, INTERFERENCE_GROUP_SCORE)
+ )
assertAll(
{ assertTrue(reader.nextRow()) },
@@ -117,7 +186,7 @@ internal class OdcVmTraceFormatTest {
@Test
fun testInterferenceGroupsEmpty() {
val path = Paths.get("src/test/resources/trace-v2.0")
- val reader = format.newReader(path, TABLE_INTERFERENCE_GROUPS)
+ val reader = format.newReader(path, TABLE_INTERFERENCE_GROUPS, listOf(INTERFERENCE_GROUP_MEMBERS))
assertFalse(reader.nextRow())
reader.close()
diff --git a/opendc-trace/opendc-trace-parquet/build.gradle.kts b/opendc-trace/opendc-trace-parquet/build.gradle.kts
index 302c0b14..e6415586 100644
--- a/opendc-trace/opendc-trace-parquet/build.gradle.kts
+++ b/opendc-trace/opendc-trace-parquet/build.gradle.kts
@@ -32,7 +32,7 @@ dependencies {
api(libs.parquet) {
exclude(group = "org.apache.hadoop")
}
- runtimeOnly(libs.hadoop.common) {
+ api(libs.hadoop.common) {
exclude(group = "org.slf4j", module = "slf4j-log4j12")
exclude(group = "log4j")
exclude(group = "org.apache.hadoop")
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
index ef9eaeb3..eef83956 100644
--- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetReader.kt
@@ -22,8 +22,8 @@
package org.opendc.trace.util.parquet
-import org.apache.parquet.avro.AvroParquetReader
import org.apache.parquet.hadoop.ParquetReader
+import org.apache.parquet.hadoop.api.ReadSupport
import org.apache.parquet.io.InputFile
import java.io.File
import java.io.IOException
@@ -32,11 +32,17 @@ import java.nio.file.Path
import kotlin.io.path.isDirectory
/**
- * A helper class to read Parquet files.
+ * A helper class to read Parquet files from the filesystem.
+ *
+ * This class wraps a [ParquetReader] in order to support reading partitioned Parquet datasets.
*
* @param path The path to the Parquet file or directory to read.
+ * @param readSupport Helper class to perform conversion from Parquet to [T].
*/
-public class LocalParquetReader<out T>(path: Path) : AutoCloseable {
+public class LocalParquetReader<out T>(
+ path: Path,
+ private val readSupport: ReadSupport<T>
+) : AutoCloseable {
/**
* The input files to process.
*/
@@ -57,7 +63,7 @@ public class LocalParquetReader<out T>(path: Path) : AutoCloseable {
/**
* Construct a [LocalParquetReader] for the specified [file].
*/
- public constructor(file: File) : this(file.toPath())
+ public constructor(file: File, readSupport: ReadSupport<T>) : this(file.toPath(), readSupport)
/**
* Read a single entry in the Parquet file.
@@ -93,20 +99,24 @@ public class LocalParquetReader<out T>(path: Path) : AutoCloseable {
private fun initReader() {
reader?.close()
- this.reader = if (filesIterator.hasNext()) {
- createReader(filesIterator.next())
- } else {
- null
+ try {
+ this.reader = if (filesIterator.hasNext()) {
+ createReader(filesIterator.next())
+ } else {
+ null
+ }
+ } catch (e: Throwable) {
+ this.reader = null
+ throw e
}
}
/**
- * Create a Parquet reader for the specified file.
+ * Construct a [ParquetReader] for the specified [input] with a custom [ReadSupport].
*/
private fun createReader(input: InputFile): ParquetReader<T> {
- return AvroParquetReader
- .builder<T>(input)
- .disableCompatibility()
- .build()
+ return object : ParquetReader.Builder<T>(input) {
+ override fun getReadSupport(): ReadSupport<@UnsafeVariance T> = this@LocalParquetReader.readSupport
+ }.build()
}
}
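
The reader keeps its support for partitioned datasets: pointing it at a directory makes it iterate over the contained Parquet files one after another. A sketch with an explicit ReadSupport (directory path illustrative):

    val reader = LocalParquetReader(Paths.get("trace-parts"), ResourceStateReadSupport(null))
    var rows = 0
    while (reader.read() != null) {
        rows++
    }
    reader.close()
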
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt
new file mode 100644
index 00000000..b5eb1deb
--- /dev/null
+++ b/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/LocalParquetWriter.kt
@@ -0,0 +1,55 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.util.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.ParquetWriter
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.io.OutputFile
+import java.nio.file.Path
+
+/**
+ * Helper class for writing Parquet records to local disk.
+ */
+public class LocalParquetWriter {
+ /**
+ * A [ParquetWriter.Builder] implementation supporting custom [OutputFile]s and [WriteSupport] implementations.
+ */
+ public class Builder<T> internal constructor(
+ output: OutputFile,
+ private val writeSupport: WriteSupport<T>
+ ) : ParquetWriter.Builder<T, Builder<T>>(output) {
+ override fun self(): Builder<T> = this
+
+ override fun getWriteSupport(conf: Configuration): WriteSupport<T> = writeSupport
+ }
+
+ public companion object {
+ /**
+ * Create a [Builder] instance that writes a Parquet file at the specified [path].
+ */
+ @JvmStatic
+ public fun <T> builder(path: Path, writeSupport: WriteSupport<T>): Builder<T> =
+ Builder(LocalOutputFile(path), writeSupport)
+ }
+}
diff --git a/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt b/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt
index 8ef4d1fb..be354319 100644
--- a/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt
+++ b/opendc-trace/opendc-trace-parquet/src/test/kotlin/org/opendc/trace/util/parquet/ParquetTest.kt
@@ -22,36 +22,81 @@
package org.opendc.trace.util.parquet
-import org.apache.avro.SchemaBuilder
-import org.apache.avro.generic.GenericData
-import org.apache.parquet.avro.AvroParquetReader
-import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.ParquetFileWriter
+import org.apache.parquet.hadoop.api.ReadSupport
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.io.api.Converter
+import org.apache.parquet.io.api.GroupConverter
+import org.apache.parquet.io.api.PrimitiveConverter
+import org.apache.parquet.io.api.RecordConsumer
+import org.apache.parquet.io.api.RecordMaterializer
+import org.apache.parquet.schema.MessageType
+import org.apache.parquet.schema.PrimitiveType
+import org.apache.parquet.schema.Type
+import org.apache.parquet.schema.Types
import org.junit.jupiter.api.*
import org.junit.jupiter.api.Assertions.assertEquals
-import java.io.File
import java.nio.file.FileAlreadyExistsException
+import java.nio.file.Files
import java.nio.file.NoSuchFileException
+import java.nio.file.Path
/**
* Test suite for the Parquet helper classes.
*/
internal class ParquetTest {
- private val schema = SchemaBuilder
- .record("test")
- .namespace("org.opendc.format.util")
- .fields()
- .name("field").type().intType().noDefault()
- .endRecord()
+ private lateinit var path: Path
- private lateinit var file: File
+ private val schema = Types.buildMessage()
+ .addField(
+ Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.REQUIRED)
+ .named("field")
+ )
+ .named("test")
+ private val writeSupport = object : WriteSupport<Int>() {
+ lateinit var recordConsumer: RecordConsumer
+
+ override fun init(configuration: Configuration): WriteContext {
+ return WriteContext(schema, emptyMap())
+ }
+
+ override fun prepareForWrite(recordConsumer: RecordConsumer) {
+ this.recordConsumer = recordConsumer
+ }
+
+ override fun write(record: Int) {
+ val consumer = recordConsumer
+
+ consumer.startMessage()
+ consumer.startField("field", 0)
+ consumer.addInteger(record)
+ consumer.endField("field", 0)
+ consumer.endMessage()
+ }
+ }
+
+ private val readSupport = object : ReadSupport<Int>() {
+ override fun init(
+ configuration: Configuration,
+ keyValueMetaData: Map<String, String>,
+ fileSchema: MessageType
+ ): ReadContext = ReadContext(fileSchema)
+
+ override fun prepareForRead(
+ configuration: Configuration,
+ keyValueMetaData: Map<String, String>,
+ fileSchema: MessageType,
+ readContext: ReadContext
+ ): RecordMaterializer<Int> = TestRecordMaterializer()
+ }
/**
- * Setup the test
+ * Set up the test
*/
@BeforeEach
fun setUp() {
- file = File.createTempFile("opendc", "parquet")
+ path = Files.createTempFile("opendc", "parquet")
}
/**
@@ -59,7 +104,7 @@ internal class ParquetTest {
*/
@AfterEach
fun tearDown() {
- file.delete()
+ Files.deleteIfExists(path)
}
/**
@@ -68,29 +113,24 @@ internal class ParquetTest {
@Test
fun testSmoke() {
val n = 4
- val writer = AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(file))
- .withSchema(schema)
+ val writer = LocalParquetWriter.builder(path, writeSupport)
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
.build()
try {
repeat(n) { i ->
- val record = GenericData.Record(schema)
- record.put("field", i)
- writer.write(record)
+ writer.write(i)
}
} finally {
writer.close()
}
- val reader = AvroParquetReader.builder<GenericData.Record>(LocalInputFile(file))
- .build()
-
+ val reader = LocalParquetReader(path, readSupport)
var counter = 0
try {
while (true) {
val record = reader.read() ?: break
- assertEquals(counter++, record.get("field"))
+ assertEquals(counter++, record)
}
} finally {
reader.close()
@@ -105,9 +145,7 @@ internal class ParquetTest {
@Test
fun testOverwrite() {
assertThrows<FileAlreadyExistsException> {
- AvroParquetWriter.builder<GenericData.Record>(LocalOutputFile(file))
- .withSchema(schema)
- .build()
+ LocalParquetWriter.builder(path, writeSupport).build()
}
}
@@ -116,10 +154,30 @@ internal class ParquetTest {
*/
@Test
fun testNonExistent() {
- file.delete()
+ Files.deleteIfExists(path)
assertThrows<NoSuchFileException> {
- AvroParquetReader.builder<GenericData.Record>(LocalInputFile(file))
- .build()
+ LocalParquetReader(path, readSupport)
+ }
+ }
+
+ private class TestRecordMaterializer : RecordMaterializer<Int>() {
+ private var current: Int = 0
+ private val fieldConverter = object : PrimitiveConverter() {
+ override fun addInt(value: Int) {
+ current = value
+ }
+ }
+ private val root = object : GroupConverter() {
+ override fun getConverter(fieldIndex: Int): Converter {
+ require(fieldIndex == 0)
+ return fieldConverter
+ }
+ override fun start() {}
+ override fun end() {}
}
+
+ override fun getCurrentRecord(): Int = current
+
+ override fun getRootConverter(): GroupConverter = root
}
}
diff --git a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt
index b969f3ef..916a5eca 100644
--- a/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt
+++ b/opendc-trace/opendc-trace-swf/src/main/kotlin/org/opendc/trace/swf/SwfTraceFormat.kt
@@ -64,7 +64,7 @@ public class SwfTraceFormat : TraceFormat {
}
}
- override fun newReader(path: Path, table: String): TableReader {
+ override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader {
return when (table) {
TABLE_TASKS -> SwfTaskTableReader(path.bufferedReader())
else -> throw IllegalArgumentException("Table $table not supported")
diff --git a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt
index 1698f644..c3d644e8 100644
--- a/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-swf/src/test/kotlin/org/opendc/trace/swf/SwfTraceFormatTest.kt
@@ -58,7 +58,7 @@ internal class SwfTraceFormatTest {
@Test
fun testReader() {
val path = Paths.get(checkNotNull(SwfTraceFormatTest::class.java.getResource("/trace.swf")).toURI())
- val reader = format.newReader(path, TABLE_TASKS)
+ val reader = format.newReader(path, TABLE_TASKS, null)
assertAll(
{ assertTrue(reader.nextRow()) },
diff --git a/opendc-trace/opendc-trace-tools/build.gradle.kts b/opendc-trace/opendc-trace-tools/build.gradle.kts
index 0c1e179e..e98fb932 100644
--- a/opendc-trace/opendc-trace-tools/build.gradle.kts
+++ b/opendc-trace/opendc-trace-tools/build.gradle.kts
@@ -29,16 +29,22 @@ plugins {
}
application {
- mainClass.set("org.opendc.trace.tools.TraceConverter")
+ mainClass.set("org.opendc.trace.tools.TraceTools")
}
dependencies {
implementation(projects.opendcTrace.opendcTraceApi)
+ implementation(projects.opendcTrace.opendcTraceCalcite)
implementation(libs.kotlin.logging)
implementation(libs.clikt)
+ implementation(libs.jline)
runtimeOnly(projects.opendcTrace.opendcTraceOpendc)
runtimeOnly(projects.opendcTrace.opendcTraceBitbrains)
runtimeOnly(projects.opendcTrace.opendcTraceAzure)
+ runtimeOnly(projects.opendcTrace.opendcTraceGwf)
+ runtimeOnly(projects.opendcTrace.opendcTraceSwf)
+ runtimeOnly(projects.opendcTrace.opendcTraceWfformat)
+ runtimeOnly(projects.opendcTrace.opendcTraceWtf)
runtimeOnly(libs.log4j.slf4j)
}
diff --git a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/ConvertCommand.kt
index c71035d4..970de0f4 100644
--- a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceConverter.kt
+++ b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/ConvertCommand.kt
@@ -20,7 +20,6 @@
* SOFTWARE.
*/
-@file:JvmName("TraceConverter")
package org.opendc.trace.tools
import com.github.ajalt.clikt.core.CliktCommand
@@ -44,14 +43,9 @@ import kotlin.math.max
import kotlin.math.min
/**
- * A script to convert a trace in text format into a Parquet trace.
+ * A [CliktCommand] that can convert between workload trace formats.
*/
-fun main(args: Array<String>): Unit = TraceConverterCli().main(args)
-
-/**
- * Represents the command for converting traces
- */
-internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
+internal class ConvertCommand : CliktCommand(name = "convert", help = "Convert between workload trace formats") {
/**
* The logger instance for the converter.
*/
@@ -73,7 +67,7 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
/**
* The input format of the trace.
*/
- private val inputFormat by option("-f", "--input-format", help = "format of output trace")
+ private val inputFormat by option("-f", "--input-format", help = "format of input trace")
.required()
/**
diff --git a/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/QueryCommand.kt b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/QueryCommand.kt
new file mode 100644
index 00000000..b0f95de2
--- /dev/null
+++ b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/QueryCommand.kt
@@ -0,0 +1,159 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.tools
+
+import com.github.ajalt.clikt.core.CliktCommand
+import com.github.ajalt.clikt.parameters.arguments.argument
+import com.github.ajalt.clikt.parameters.options.option
+import com.github.ajalt.clikt.parameters.options.required
+import com.github.ajalt.clikt.parameters.types.file
+import org.apache.calcite.jdbc.CalciteConnection
+import org.jline.builtins.Styles
+import org.jline.console.Printer
+import org.jline.console.impl.DefaultPrinter
+import org.jline.terminal.Terminal
+import org.jline.terminal.TerminalBuilder
+import org.jline.utils.AttributedStringBuilder
+import org.opendc.trace.Trace
+import org.opendc.trace.calcite.TraceSchema
+import java.nio.charset.StandardCharsets
+import java.sql.DriverManager
+import java.sql.ResultSet
+import java.sql.ResultSetMetaData
+import java.util.*
+
+/**
+ * A [CliktCommand] that allows users to query workload traces using SQL.
+ */
+internal class QueryCommand : CliktCommand(name = "query", help = "Query workload traces") {
+ /**
+ * The trace to open.
+ */
+ private val input by option("-i", "--input")
+ .file(mustExist = true)
+ .required()
+
+ /**
+ * The input format of the trace.
+ */
+ private val inputFormat by option("-f", "--format", help = "format of the trace")
+ .required()
+
+ /**
+ * The query to execute.
+ */
+ private val query by argument()
+
+ /**
+ * Access to the terminal.
+ */
+ private val terminal = TerminalBuilder.builder()
+ .system(false)
+ .streams(System.`in`, System.out)
+ .encoding(StandardCharsets.UTF_8)
+ .build()
+
+ /**
+ * Helper class to print results to console.
+ */
+ private val printer = QueryPrinter(terminal)
+
+ override fun run() {
+ val inputTrace = Trace.open(input, format = inputFormat)
+ val info = Properties().apply { this["lex"] = "JAVA" }
+ val connection = DriverManager.getConnection("jdbc:calcite:", info).unwrap(CalciteConnection::class.java)
+ connection.rootSchema.add("trace", TraceSchema(inputTrace))
+ connection.schema = "trace"
+
+        val stmt = connection.createStatement()
+
+ val start = System.currentTimeMillis()
+ val hasResults = stmt.execute(query)
+
+ try {
+ if (hasResults) {
+ do {
+ stmt.resultSet.use { rs ->
+ val count: Int = printResults(rs)
+ val duration = (System.currentTimeMillis() - start) / 1000.0
+ printer.println("$count rows selected (${"%.3f".format(duration)} seconds)")
+ }
+ } while (stmt.moreResults)
+ } else {
+ val count: Int = stmt.updateCount
+ val duration = (System.currentTimeMillis() - start) / 1000.0
+
+            printer.println("$count rows affected (${"%.3f".format(duration)} seconds)")
+ }
+ } finally {
+ stmt.close()
+ connection.close()
+ }
+ }
+
+ /**
+ * Helper function to print the results to console.
+ */
+ private fun printResults(rs: ResultSet): Int {
+ var count = 0
+ val meta: ResultSetMetaData = rs.metaData
+
+ val options = mapOf(
+ Printer.COLUMNS to List(meta.columnCount) { meta.getColumnName(it + 1) },
+ Printer.BORDER to "|",
+ )
+ val data = mutableListOf<Map<String, Any>>()
+
+ while (rs.next()) {
+ val row = mutableMapOf<String, Any>()
+ for (i in 1..meta.columnCount) {
+ row[meta.getColumnName(i)] = rs.getObject(i)
+ }
+ data.add(row)
+
+ count++
+ }
+
+ printer.println(options, data)
+
+ return count
+ }
+
+ /**
+ * Helper class to print the results of the query.
+ */
+ private class QueryPrinter(private val terminal: Terminal) : DefaultPrinter(null) {
+ override fun terminal(): Terminal = terminal
+
+ override fun highlightAndPrint(options: MutableMap<String, Any>, exception: Throwable) {
+ if (options.getOrDefault("exception", "stack") == "stack") {
+ exception.printStackTrace()
+ } else {
+ val asb = AttributedStringBuilder()
+ asb.append(exception.message, Styles.prntStyle().resolve(".em"))
+ asb.toAttributedString().println(terminal())
+ }
+ }
+ }
+}
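Stripped of the CLI plumbing, the query path amounts to the following Calcite wiring (a condensed sketch; the trace path, the "wtf" format name, and the `tasks` table name are illustrative assumptions):

    // Sketch: expose a trace as a Calcite schema and run a single SQL query against it.
    val trace = Trace.open(File("wtf-trace"), format = "wtf")
    val info = Properties().apply { this["lex"] = "JAVA" }
    val connection = DriverManager.getConnection("jdbc:calcite:", info).unwrap(CalciteConnection::class.java)
    connection.rootSchema.add("trace", TraceSchema(trace))
    connection.schema = "trace"

    connection.createStatement().use { stmt ->
        stmt.executeQuery("SELECT * FROM tasks LIMIT 10").use { rs ->
            while (rs.next()) {
                println(rs.getObject(1))
            }
        }
    }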
diff --git a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceTools.kt
index 086b900b..b480484b 100644
--- a/opendc-trace/opendc-trace-parquet/src/main/kotlin/org/opendc/trace/util/parquet/AvroUtils.kt
+++ b/opendc-trace/opendc-trace-tools/src/main/kotlin/org/opendc/trace/tools/TraceTools.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021 AtLarge Research
+ * Copyright (c) 2022 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,25 +20,25 @@
* SOFTWARE.
*/
-@file:JvmName("AvroUtils")
-package org.opendc.trace.util.parquet
+@file:JvmName("TraceTools")
+package org.opendc.trace.tools
-import org.apache.avro.LogicalTypes
-import org.apache.avro.Schema
+import com.github.ajalt.clikt.core.CliktCommand
+import com.github.ajalt.clikt.core.subcommands
/**
- * Schema for UUID type.
+ * A script for querying and manipulating workload traces supported by OpenDC.
*/
-public val UUID_SCHEMA: Schema = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING))
+fun main(args: Array<String>): Unit = TraceToolsCli().main(args)
/**
- * Schema for timestamp type.
+ * The primary [CliktCommand] for the trace tools offered by OpenDC.
*/
-public val TIMESTAMP_SCHEMA: Schema = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG))
+class TraceToolsCli : CliktCommand(name = "trace-tools") {
+ init {
+ subcommands(QueryCommand())
+ subcommands(ConvertCommand())
+ }
-/**
- * Helper function to make a [Schema] field optional.
- */
-public fun Schema.optional(): Schema {
- return Schema.createUnion(Schema.create(Schema.Type.NULL), this)
+ override fun run() {}
}
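With the Gradle application plugin configured above, this entry point dispatches to the two subcommands; an invocation would look roughly like `trace-tools query -i <trace-dir> -f <format> "SELECT ..."` or `trace-tools convert ...`, where the launcher name depends on how the distribution is packaged.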
diff --git a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt
index bc175b58..8db4c169 100644
--- a/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt
+++ b/opendc-trace/opendc-trace-wfformat/src/main/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormat.kt
@@ -63,7 +63,7 @@ public class WfFormatTraceFormat : TraceFormat {
}
}
- override fun newReader(path: Path, table: String): TableReader {
+ override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader {
return when (table) {
TABLE_TASKS -> WfFormatTaskTableReader(factory.createParser(path.toFile()))
else -> throw IllegalArgumentException("Table $table not supported")
diff --git a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt
index 710de88e..4a8b2792 100644
--- a/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-wfformat/src/test/kotlin/org/opendc/trace/wfformat/WfFormatTraceFormatTest.kt
@@ -62,7 +62,7 @@ class WfFormatTraceFormatTest {
@Test
fun testTableReader() {
val path = Paths.get("src/test/resources/trace.json")
- val reader = format.newReader(path, TABLE_TASKS)
+ val reader = format.newReader(path, TABLE_TASKS, null)
assertAll(
{ assertTrue(reader.nextRow()) },
@@ -89,7 +89,7 @@ class WfFormatTraceFormatTest {
@Test
fun testTableReaderFull() {
val path = Paths.get("src/test/resources/trace.json")
- val reader = format.newReader(path, TABLE_TASKS)
+ val reader = format.newReader(path, TABLE_TASKS, null)
assertDoesNotThrow {
while (reader.nextRow()) {
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
index 1e332aca..f0db78b7 100644
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTaskTableReader.kt
@@ -22,38 +22,30 @@
package org.opendc.trace.wtf
-import org.apache.avro.Schema
-import org.apache.avro.generic.GenericRecord
import org.opendc.trace.*
import org.opendc.trace.conv.*
import org.opendc.trace.util.parquet.LocalParquetReader
-import java.time.Duration
-import java.time.Instant
+import org.opendc.trace.wtf.parquet.Task
/**
* A [TableReader] implementation for the WTF format.
*/
-internal class WtfTaskTableReader(private val reader: LocalParquetReader<GenericRecord>) : TableReader {
+internal class WtfTaskTableReader(private val reader: LocalParquetReader<Task>) : TableReader {
/**
* The current record.
*/
- private var record: GenericRecord? = null
-
- /**
- * A flag to indicate that the columns have been initialized.
- */
- private var hasInitializedColumns = false
+ private var record: Task? = null
override fun nextRow(): Boolean {
- val record = reader.read()
- this.record = record
+ try {
+ val record = reader.read()
+ this.record = record
- if (!hasInitializedColumns && record != null) {
- initColumns(record.schema)
- hasInitializedColumns = true
+ return record != null
+ } catch (e: Throwable) {
+ this.record = null
+ throw e
}
-
- return record != null
}
override fun resolve(column: TableColumn<*>): Int = columns[column] ?: -1
@@ -65,16 +57,15 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Generic
override fun get(index: Int): Any? {
val record = checkNotNull(record) { "Reader in invalid state" }
- @Suppress("UNCHECKED_CAST")
return when (index) {
- COL_ID -> (record[AVRO_COL_ID] as Long).toString()
- COL_WORKFLOW_ID -> (record[AVRO_COL_WORKFLOW_ID] as Long).toString()
- COL_SUBMIT_TIME -> Instant.ofEpochMilli(record[AVRO_COL_SUBMIT_TIME] as Long)
- COL_WAIT_TIME -> Duration.ofMillis(record[AVRO_COL_WAIT_TIME] as Long)
- COL_RUNTIME -> Duration.ofMillis(record[AVRO_COL_RUNTIME] as Long)
+ COL_ID -> record.id
+ COL_WORKFLOW_ID -> record.workflowId
+ COL_SUBMIT_TIME -> record.submitTime
+ COL_WAIT_TIME -> record.waitTime
+ COL_RUNTIME -> record.runtime
COL_REQ_NCPUS, COL_GROUP_ID, COL_USER_ID -> getInt(index)
- COL_PARENTS -> (record[AVRO_COL_PARENTS] as ArrayList<GenericRecord>).map { it["item"].toString() }.toSet()
- COL_CHILDREN -> (record[AVRO_COL_CHILDREN] as ArrayList<GenericRecord>).map { it["item"].toString() }.toSet()
+ COL_PARENTS -> record.parents
+ COL_CHILDREN -> record.children
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -87,9 +78,9 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Generic
val record = checkNotNull(record) { "Reader in invalid state" }
return when (index) {
- COL_REQ_NCPUS -> (record[AVRO_COL_REQ_NCPUS] as Double).toInt()
- COL_GROUP_ID -> record[AVRO_COL_GROUP_ID] as Int
- COL_USER_ID -> record[AVRO_COL_USER_ID] as Int
+ COL_REQ_NCPUS -> record.requestedCpus
+ COL_GROUP_ID -> record.groupId
+ COL_USER_ID -> record.userId
else -> throw IllegalArgumentException("Invalid column")
}
}
@@ -106,38 +97,6 @@ internal class WtfTaskTableReader(private val reader: LocalParquetReader<Generic
reader.close()
}
- /**
- * Initialize the columns for the reader based on [schema].
- */
- private fun initColumns(schema: Schema) {
- try {
- AVRO_COL_ID = schema.getField("id").pos()
- AVRO_COL_WORKFLOW_ID = schema.getField("workflow_id").pos()
- AVRO_COL_SUBMIT_TIME = schema.getField("ts_submit").pos()
- AVRO_COL_WAIT_TIME = schema.getField("wait_time").pos()
- AVRO_COL_RUNTIME = schema.getField("runtime").pos()
- AVRO_COL_REQ_NCPUS = schema.getField("resource_amount_requested").pos()
- AVRO_COL_PARENTS = schema.getField("parents").pos()
- AVRO_COL_CHILDREN = schema.getField("children").pos()
- AVRO_COL_GROUP_ID = schema.getField("group_id").pos()
- AVRO_COL_USER_ID = schema.getField("user_id").pos()
- } catch (e: NullPointerException) {
- // This happens when the field we are trying to access does not exist
- throw IllegalArgumentException("Invalid schema", e)
- }
- }
-
- private var AVRO_COL_ID = -1
- private var AVRO_COL_WORKFLOW_ID = -1
- private var AVRO_COL_SUBMIT_TIME = -1
- private var AVRO_COL_WAIT_TIME = -1
- private var AVRO_COL_RUNTIME = -1
- private var AVRO_COL_REQ_NCPUS = -1
- private var AVRO_COL_PARENTS = -1
- private var AVRO_COL_CHILDREN = -1
- private var AVRO_COL_GROUP_ID = -1
- private var AVRO_COL_USER_ID = -1
-
private val COL_ID = 0
private val COL_WORKFLOW_ID = 1
private val COL_SUBMIT_TIME = 2
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
index c8f9ecaa..e71253ac 100644
--- a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/WtfTraceFormat.kt
@@ -22,12 +22,12 @@
package org.opendc.trace.wtf
-import org.apache.avro.generic.GenericRecord
import org.opendc.trace.*
import org.opendc.trace.conv.*
import org.opendc.trace.spi.TableDetails
import org.opendc.trace.spi.TraceFormat
import org.opendc.trace.util.parquet.LocalParquetReader
+import org.opendc.trace.wtf.parquet.TaskReadSupport
import java.nio.file.Path
/**
@@ -63,10 +63,10 @@ public class WtfTraceFormat : TraceFormat {
}
}
- override fun newReader(path: Path, table: String): TableReader {
+ override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader {
return when (table) {
TABLE_TASKS -> {
- val reader = LocalParquetReader<GenericRecord>(path.resolve("tasks/schema-1.0"))
+ val reader = LocalParquetReader(path.resolve("tasks/schema-1.0"), TaskReadSupport(projection))
WtfTaskTableReader(reader)
}
else -> throw IllegalArgumentException("Table $table not supported")
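The extra parameter lets callers push a column projection down into the Parquet reader; reading a couple of task columns then looks roughly like this (a sketch: the trace path is a placeholder, the column constants are the ones used throughout this change):

    // Sketch: open the tasks table with a projection and iterate over its rows.
    val reader = WtfTraceFormat().newReader(Paths.get("wtf-trace"), TABLE_TASKS, listOf(TASK_ID, TASK_RUNTIME))
    try {
        val idCol = reader.resolve(TASK_ID)
        val runtimeCol = reader.resolve(TASK_RUNTIME)
        while (reader.nextRow()) {
            println("task ${reader.get(idCol)} ran for ${reader.get(runtimeCol)}")
        }
    } finally {
        reader.close()
    }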
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt
new file mode 100644
index 00000000..71557f96
--- /dev/null
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/Task.kt
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.wtf.parquet
+
+import java.time.Duration
+import java.time.Instant
+
+/**
+ * A task in the Workflow Trace Format.
+ */
+internal data class Task(
+ val id: String,
+ val workflowId: String,
+ val submitTime: Instant,
+ val waitTime: Duration,
+ val runtime: Duration,
+ val requestedCpus: Int,
+ val groupId: Int,
+ val userId: Int,
+ val parents: Set<String>,
+ val children: Set<String>
+)
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt
new file mode 100644
index 00000000..8e7325de
--- /dev/null
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskReadSupport.kt
@@ -0,0 +1,134 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.wtf.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.api.InitContext
+import org.apache.parquet.hadoop.api.ReadSupport
+import org.apache.parquet.io.api.RecordMaterializer
+import org.apache.parquet.schema.*
+import org.opendc.trace.TableColumn
+import org.opendc.trace.conv.*
+
+/**
+ * A [ReadSupport] instance for [Task] objects.
+ *
+ * @param projection The projection of the table to read.
+ */
+internal class TaskReadSupport(private val projection: List<TableColumn<*>>?) : ReadSupport<Task>() {
+ /**
+ * Mapping of table columns to their Parquet column names.
+ */
+ private val colMap = mapOf<TableColumn<*>, String>(
+ TASK_ID to "id",
+ TASK_WORKFLOW_ID to "workflow_id",
+ TASK_SUBMIT_TIME to "ts_submit",
+ TASK_WAIT_TIME to "wait_time",
+ TASK_RUNTIME to "runtime",
+ TASK_REQ_NCPUS to "resource_amount_requested",
+ TASK_PARENTS to "parents",
+ TASK_CHILDREN to "children",
+ TASK_GROUP_ID to "group_id",
+ TASK_USER_ID to "user_id"
+ )
+
+ override fun init(context: InitContext): ReadContext {
+ val projectedSchema =
+ if (projection != null) {
+ Types.buildMessage()
+ .apply {
+ val fieldByName = READ_SCHEMA.fields.associateBy { it.name }
+
+ for (col in projection) {
+ val fieldName = colMap[col] ?: continue
+ addField(fieldByName.getValue(fieldName))
+ }
+ }
+ .named(READ_SCHEMA.name)
+ } else {
+ READ_SCHEMA
+ }
+ return ReadContext(projectedSchema)
+ }
+
+ override fun prepareForRead(
+ configuration: Configuration,
+ keyValueMetaData: Map<String, String>,
+ fileSchema: MessageType,
+ readContext: ReadContext
+ ): RecordMaterializer<Task> = TaskRecordMaterializer(readContext.requestedSchema)
+
+ companion object {
+ /**
+ * Parquet read schema for the "tasks" table in the trace.
+ */
+ @JvmStatic
+ val READ_SCHEMA: MessageType = Types.buildMessage()
+ .addFields(
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("id"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("workflow_id"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .`as`(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("ts_submit"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("wait_time"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT64)
+ .named("runtime"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.DOUBLE)
+ .named("resource_amount_requested"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("user_id"),
+ Types
+ .optional(PrimitiveType.PrimitiveTypeName.INT32)
+ .named("group_id"),
+ Types
+ .buildGroup(Type.Repetition.OPTIONAL)
+ .addField(
+ Types.repeatedGroup()
+ .addField(Types.optional(PrimitiveType.PrimitiveTypeName.INT64).named("item"))
+ .named("list")
+ )
+ .`as`(LogicalTypeAnnotation.listType())
+ .named("children"),
+ Types
+ .buildGroup(Type.Repetition.OPTIONAL)
+ .addField(
+ Types.repeatedGroup()
+ .addField(Types.optional(PrimitiveType.PrimitiveTypeName.INT64).named("item"))
+ .named("list")
+ )
+ .`as`(LogicalTypeAnnotation.listType())
+ .named("parents"),
+ )
+ .named("task")
+ }
+}
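At the Parquet level the projection simply trims the requested schema, so only the mapped columns are materialized; fields left out of the projection keep the defaults set by the record materializer. A small sketch of wiring this into LocalParquetReader directly (the directory is a placeholder mirroring the layout WtfTraceFormat expects):

    // Sketch: read Task records with only the id and submit time columns requested.
    val readSupport = TaskReadSupport(listOf(TASK_ID, TASK_SUBMIT_TIME))
    val reader = LocalParquetReader(Paths.get("wtf-trace/tasks/schema-1.0"), readSupport)
    try {
        var task = reader.read()
        while (task != null) {
            println("${task.id} submitted at ${task.submitTime}")
            task = reader.read()
        }
    } finally {
        reader.close()
    }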
diff --git a/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt
new file mode 100644
index 00000000..08da5eaf
--- /dev/null
+++ b/opendc-trace/opendc-trace-wtf/src/main/kotlin/org/opendc/trace/wtf/parquet/TaskRecordMaterializer.kt
@@ -0,0 +1,165 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.trace.wtf.parquet
+
+import org.apache.parquet.io.api.*
+import org.apache.parquet.schema.MessageType
+import java.time.Duration
+import java.time.Instant
+import kotlin.math.roundToInt
+
+/**
+ * A [RecordMaterializer] for [Task] records.
+ */
+internal class TaskRecordMaterializer(schema: MessageType) : RecordMaterializer<Task>() {
+ /**
+ * State of current record being read.
+ */
+ private var _id = ""
+ private var _workflowId = ""
+ private var _submitTime = Instant.MIN
+ private var _waitTime = Duration.ZERO
+ private var _runtime = Duration.ZERO
+ private var _requestedCpus = 0
+ private var _groupId = 0
+ private var _userId = 0
+ private var _parents = mutableSetOf<String>()
+ private var _children = mutableSetOf<String>()
+
+ /**
+ * Root converter for the record.
+ */
+ private val root = object : GroupConverter() {
+ /**
+ * The converters for the columns of the schema.
+ */
+ private val converters = schema.fields.map { type ->
+ when (type.name) {
+ "id" -> object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ _id = value.toString()
+ }
+ }
+ "workflow_id" -> object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ _workflowId = value.toString()
+ }
+ }
+ "ts_submit" -> object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ _submitTime = Instant.ofEpochMilli(value)
+ }
+ }
+ "wait_time" -> object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ _waitTime = Duration.ofMillis(value)
+ }
+ }
+ "runtime" -> object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ _runtime = Duration.ofMillis(value)
+ }
+ }
+ "resource_amount_requested" -> object : PrimitiveConverter() {
+ override fun addDouble(value: Double) {
+ _requestedCpus = value.roundToInt()
+ }
+ }
+ "group_id" -> object : PrimitiveConverter() {
+ override fun addInt(value: Int) {
+ _groupId = value
+ }
+ }
+ "user_id" -> object : PrimitiveConverter() {
+ override fun addInt(value: Int) {
+ _userId = value
+ }
+ }
+ "children" -> RelationConverter(_children)
+ "parents" -> RelationConverter(_parents)
+ else -> error("Unknown column $type")
+ }
+ }
+
+ override fun start() {
+ _id = ""
+ _workflowId = ""
+ _submitTime = Instant.MIN
+ _waitTime = Duration.ZERO
+ _runtime = Duration.ZERO
+ _requestedCpus = 0
+ _groupId = 0
+ _userId = 0
+ _parents.clear()
+ _children.clear()
+ }
+
+ override fun end() {}
+
+ override fun getConverter(fieldIndex: Int): Converter = converters[fieldIndex]
+ }
+
+ override fun getCurrentRecord(): Task = Task(
+ _id,
+ _workflowId,
+ _submitTime,
+ _waitTime,
+ _runtime,
+ _requestedCpus,
+ _groupId,
+ _userId,
+ _parents.toSet(),
+ _children.toSet()
+ )
+
+ override fun getRootConverter(): GroupConverter = root
+
+ /**
+ * Helper class to convert parent and child relations and add them to [relations].
+ */
+ private class RelationConverter(private val relations: MutableSet<String>) : GroupConverter() {
+ private val entryConverter = object : PrimitiveConverter() {
+ override fun addLong(value: Long) {
+ relations.add(value.toString())
+ }
+ }
+
+ private val listConverter = object : GroupConverter() {
+ override fun getConverter(fieldIndex: Int): Converter {
+ require(fieldIndex == 0)
+ return entryConverter
+ }
+
+ override fun start() {}
+ override fun end() {}
+ }
+
+ override fun getConverter(fieldIndex: Int): Converter {
+ require(fieldIndex == 0)
+ return listConverter
+ }
+
+ override fun start() {}
+ override fun end() {}
+ }
+}
diff --git a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt
index 0f0e422d..c0eb3f08 100644
--- a/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt
+++ b/opendc-trace/opendc-trace-wtf/src/test/kotlin/org/opendc/trace/wtf/WtfTraceFormatTest.kt
@@ -61,7 +61,7 @@ class WtfTraceFormatTest {
@Test
fun testTableReader() {
val path = Paths.get("src/test/resources/wtf-trace")
- val reader = format.newReader(path, TABLE_TASKS)
+ val reader = format.newReader(path, TABLE_TASKS, listOf(TASK_ID, TASK_WORKFLOW_ID, TASK_SUBMIT_TIME, TASK_RUNTIME, TASK_PARENTS))
assertAll(
{ assertTrue(reader.nextRow()) },
diff --git a/settings.gradle.kts b/settings.gradle.kts
index cc454b19..a779edcc 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -59,6 +59,7 @@ include(":opendc-trace:opendc-trace-bitbrains")
include(":opendc-trace:opendc-trace-azure")
include(":opendc-trace:opendc-trace-opendc")
include(":opendc-trace:opendc-trace-parquet")
+include(":opendc-trace:opendc-trace-calcite")
include(":opendc-trace:opendc-trace-tools")
include(":opendc-harness:opendc-harness-api")
include(":opendc-harness:opendc-harness-engine")