diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-05-02 14:17:55 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-05-02 15:37:04 +0200 |
| commit | 670cd279ea7789e07b6d778a21fdec68347ab305 (patch) | |
| tree | df91a7fd81a5d1afc51a0380fd938adf701bc43b /opendc-trace/opendc-trace-opendc/src/main | |
| parent | 9411845b3f26536a1e6ea40504e396f19d25a09a (diff) | |
feat(trace/api): Add support for projecting tables
This change adds support for projecting certain columns of a table. This
enables faster reading for tables with high number of columns.
Currently, we support projection in the Parquet-based workload formats.
Other formats are text-based and will probably not benefit much from
projection.
Diffstat (limited to 'opendc-trace/opendc-trace-opendc/src/main')
3 files changed, 77 insertions, 7 deletions
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt index b455a2cf..d45910c6 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt @@ -102,14 +102,14 @@ public class OdcVmTraceFormat : TraceFormat { } } - override fun newReader(path: Path, table: String): TableReader { + override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader { return when (table) { TABLE_RESOURCES -> { - val reader = LocalParquetReader(path.resolve("meta.parquet"), ResourceReadSupport()) + val reader = LocalParquetReader(path.resolve("meta.parquet"), ResourceReadSupport(projection)) OdcVmResourceTableReader(reader) } TABLE_RESOURCE_STATES -> { - val reader = LocalParquetReader(path.resolve("trace.parquet"), ResourceStateReadSupport()) + val reader = LocalParquetReader(path.resolve("trace.parquet"), ResourceStateReadSupport(projection)) OdcVmResourceStateTableReader(reader) } TABLE_INTERFERENCE_GROUPS -> { diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt index 47cce914..0d70446d 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt @@ -27,13 +27,49 @@ import org.apache.parquet.hadoop.api.InitContext import org.apache.parquet.hadoop.api.ReadSupport import org.apache.parquet.io.api.RecordMaterializer import org.apache.parquet.schema.* +import org.opendc.trace.TableColumn +import org.opendc.trace.conv.* /** * A [ReadSupport] instance for [Resource] objects. */ -internal class ResourceReadSupport : ReadSupport<Resource>() { +internal class ResourceReadSupport(private val projection: List<TableColumn<*>>?) : ReadSupport<Resource>() { + /** + * Mapping from field names to [TableColumn]s. + */ + private val fieldMap = mapOf<String, TableColumn<*>>( + "id" to RESOURCE_ID, + "submissionTime" to RESOURCE_START_TIME, + "start_time" to RESOURCE_START_TIME, + "endTime" to RESOURCE_STOP_TIME, + "stop_time" to RESOURCE_STOP_TIME, + "maxCores" to RESOURCE_CPU_COUNT, + "cpu_count" to RESOURCE_CPU_COUNT, + "cpu_capacity" to RESOURCE_CPU_CAPACITY, + "requiredMemory" to RESOURCE_MEM_CAPACITY, + "mem_capacity" to RESOURCE_MEM_CAPACITY, + ) + override fun init(context: InitContext): ReadContext { - return ReadContext(READ_SCHEMA) + val projectedSchema = + if (projection != null) { + Types.buildMessage() + .apply { + val projectionSet = projection.toSet() + + for (field in READ_SCHEMA.fields) { + val col = fieldMap[field.name] ?: continue + if (col in projectionSet) { + addField(field) + } + } + } + .named(READ_SCHEMA.name) + } else { + READ_SCHEMA + } + + return ReadContext(projectedSchema) } override fun prepareForRead( diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt index 17840ceb..97aa00b2 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt @@ -27,13 +27,47 @@ import org.apache.parquet.hadoop.api.InitContext import org.apache.parquet.hadoop.api.ReadSupport import org.apache.parquet.io.api.RecordMaterializer import org.apache.parquet.schema.* +import org.opendc.trace.TableColumn +import org.opendc.trace.conv.* /** * A [ReadSupport] instance for [ResourceState] objects. */ -internal class ResourceStateReadSupport : ReadSupport<ResourceState>() { +internal class ResourceStateReadSupport(private val projection: List<TableColumn<*>>?) : ReadSupport<ResourceState>() { + /** + * Mapping from field names to [TableColumn]s. + */ + private val fieldMap = mapOf<String, TableColumn<*>>( + "id" to RESOURCE_ID, + "time" to RESOURCE_STATE_TIMESTAMP, + "timestamp" to RESOURCE_STATE_TIMESTAMP, + "duration" to RESOURCE_STATE_DURATION, + "cores" to RESOURCE_CPU_COUNT, + "cpu_count" to RESOURCE_CPU_COUNT, + "cpuUsage" to RESOURCE_STATE_CPU_USAGE, + "cpu_usage" to RESOURCE_STATE_CPU_USAGE, + ) + override fun init(context: InitContext): ReadContext { - return ReadContext(READ_SCHEMA) + val projectedSchema = + if (projection != null) { + Types.buildMessage() + .apply { + val projectionSet = projection.toSet() + + for (field in READ_SCHEMA.fields) { + val col = fieldMap[field.name] ?: continue + if (col in projectionSet) { + addField(field) + } + } + } + .named(READ_SCHEMA.name) + } else { + READ_SCHEMA + } + + return ReadContext(projectedSchema) } override fun prepareForRead( |
