diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-05-02 14:17:55 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-05-02 15:37:04 +0200 |
| commit | 670cd279ea7789e07b6d778a21fdec68347ab305 (patch) | |
| tree | df91a7fd81a5d1afc51a0380fd938adf701bc43b /opendc-trace/opendc-trace-opendc | |
| parent | 9411845b3f26536a1e6ea40504e396f19d25a09a (diff) | |
feat(trace/api): Add support for projecting tables
This change adds support for projecting certain columns of a table. This
enables faster reading for tables with high number of columns.
Currently, we support projection in the Parquet-based workload formats.
Other formats are text-based and will probably not benefit much from
projection.
Diffstat (limited to 'opendc-trace/opendc-trace-opendc')
4 files changed, 92 insertions, 13 deletions
diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt index b455a2cf..d45910c6 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/OdcVmTraceFormat.kt @@ -102,14 +102,14 @@ public class OdcVmTraceFormat : TraceFormat { } } - override fun newReader(path: Path, table: String): TableReader { + override fun newReader(path: Path, table: String, projection: List<TableColumn<*>>?): TableReader { return when (table) { TABLE_RESOURCES -> { - val reader = LocalParquetReader(path.resolve("meta.parquet"), ResourceReadSupport()) + val reader = LocalParquetReader(path.resolve("meta.parquet"), ResourceReadSupport(projection)) OdcVmResourceTableReader(reader) } TABLE_RESOURCE_STATES -> { - val reader = LocalParquetReader(path.resolve("trace.parquet"), ResourceStateReadSupport()) + val reader = LocalParquetReader(path.resolve("trace.parquet"), ResourceStateReadSupport(projection)) OdcVmResourceStateTableReader(reader) } TABLE_INTERFERENCE_GROUPS -> { diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt index 47cce914..0d70446d 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceReadSupport.kt @@ -27,13 +27,49 @@ import org.apache.parquet.hadoop.api.InitContext import org.apache.parquet.hadoop.api.ReadSupport import org.apache.parquet.io.api.RecordMaterializer import org.apache.parquet.schema.* +import org.opendc.trace.TableColumn +import org.opendc.trace.conv.* /** * A [ReadSupport] instance for [Resource] objects. */ -internal class ResourceReadSupport : ReadSupport<Resource>() { +internal class ResourceReadSupport(private val projection: List<TableColumn<*>>?) : ReadSupport<Resource>() { + /** + * Mapping from field names to [TableColumn]s. + */ + private val fieldMap = mapOf<String, TableColumn<*>>( + "id" to RESOURCE_ID, + "submissionTime" to RESOURCE_START_TIME, + "start_time" to RESOURCE_START_TIME, + "endTime" to RESOURCE_STOP_TIME, + "stop_time" to RESOURCE_STOP_TIME, + "maxCores" to RESOURCE_CPU_COUNT, + "cpu_count" to RESOURCE_CPU_COUNT, + "cpu_capacity" to RESOURCE_CPU_CAPACITY, + "requiredMemory" to RESOURCE_MEM_CAPACITY, + "mem_capacity" to RESOURCE_MEM_CAPACITY, + ) + override fun init(context: InitContext): ReadContext { - return ReadContext(READ_SCHEMA) + val projectedSchema = + if (projection != null) { + Types.buildMessage() + .apply { + val projectionSet = projection.toSet() + + for (field in READ_SCHEMA.fields) { + val col = fieldMap[field.name] ?: continue + if (col in projectionSet) { + addField(field) + } + } + } + .named(READ_SCHEMA.name) + } else { + READ_SCHEMA + } + + return ReadContext(projectedSchema) } override fun prepareForRead( diff --git a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt index 17840ceb..97aa00b2 100644 --- a/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt +++ b/opendc-trace/opendc-trace-opendc/src/main/kotlin/org/opendc/trace/opendc/parquet/ResourceStateReadSupport.kt @@ -27,13 +27,47 @@ import org.apache.parquet.hadoop.api.InitContext import org.apache.parquet.hadoop.api.ReadSupport import org.apache.parquet.io.api.RecordMaterializer import org.apache.parquet.schema.* +import org.opendc.trace.TableColumn +import org.opendc.trace.conv.* /** * A [ReadSupport] instance for [ResourceState] objects. */ -internal class ResourceStateReadSupport : ReadSupport<ResourceState>() { +internal class ResourceStateReadSupport(private val projection: List<TableColumn<*>>?) : ReadSupport<ResourceState>() { + /** + * Mapping from field names to [TableColumn]s. + */ + private val fieldMap = mapOf<String, TableColumn<*>>( + "id" to RESOURCE_ID, + "time" to RESOURCE_STATE_TIMESTAMP, + "timestamp" to RESOURCE_STATE_TIMESTAMP, + "duration" to RESOURCE_STATE_DURATION, + "cores" to RESOURCE_CPU_COUNT, + "cpu_count" to RESOURCE_CPU_COUNT, + "cpuUsage" to RESOURCE_STATE_CPU_USAGE, + "cpu_usage" to RESOURCE_STATE_CPU_USAGE, + ) + override fun init(context: InitContext): ReadContext { - return ReadContext(READ_SCHEMA) + val projectedSchema = + if (projection != null) { + Types.buildMessage() + .apply { + val projectionSet = projection.toSet() + + for (field in READ_SCHEMA.fields) { + val col = fieldMap[field.name] ?: continue + if (col in projectionSet) { + addField(field) + } + } + } + .named(READ_SCHEMA.name) + } else { + READ_SCHEMA + } + + return ReadContext(projectedSchema) } override fun prepareForRead( diff --git a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt index dec0fef9..1f4f6195 100644 --- a/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt +++ b/opendc-trace/opendc-trace-opendc/src/test/kotlin/org/opendc/trace/opendc/OdcVmTraceFormatTest.kt @@ -63,11 +63,12 @@ internal class OdcVmTraceFormatTest { @ValueSource(strings = ["trace-v2.0", "trace-v2.1"]) fun testResources(name: String) { val path = Paths.get("src/test/resources/$name") - val reader = format.newReader(path, TABLE_RESOURCES) + val reader = format.newReader(path, TABLE_RESOURCES, listOf(RESOURCE_ID, RESOURCE_START_TIME)) assertAll( { assertTrue(reader.nextRow()) }, { assertEquals("1019", reader.get(RESOURCE_ID)) }, + { assertEquals(Instant.ofEpochMilli(1376314846000), reader.get(RESOURCE_START_TIME)) }, { assertTrue(reader.nextRow()) }, { assertEquals("1023", reader.get(RESOURCE_ID)) }, { assertTrue(reader.nextRow()) }, @@ -95,7 +96,7 @@ internal class OdcVmTraceFormatTest { writer.endRow() writer.close() - val reader = format.newReader(path, TABLE_RESOURCES) + val reader = format.newReader(path, TABLE_RESOURCES, null) assertAll( { assertTrue(reader.nextRow()) }, @@ -115,7 +116,11 @@ internal class OdcVmTraceFormatTest { @ValueSource(strings = ["trace-v2.0", "trace-v2.1"]) fun testSmoke(name: String) { val path = Paths.get("src/test/resources/$name") - val reader = format.newReader(path, TABLE_RESOURCE_STATES) + val reader = format.newReader( + path, + TABLE_RESOURCE_STATES, + listOf(RESOURCE_ID, RESOURCE_STATE_TIMESTAMP, RESOURCE_STATE_CPU_USAGE) + ) assertAll( { assertTrue(reader.nextRow()) }, @@ -140,7 +145,7 @@ internal class OdcVmTraceFormatTest { writer.endRow() writer.close() - val reader = format.newReader(path, TABLE_RESOURCE_STATES) + val reader = format.newReader(path, TABLE_RESOURCE_STATES, null) assertAll( { assertTrue(reader.nextRow()) }, @@ -157,7 +162,11 @@ internal class OdcVmTraceFormatTest { @Test fun testInterferenceGroups() { val path = Paths.get("src/test/resources/trace-v2.1") - val reader = format.newReader(path, TABLE_INTERFERENCE_GROUPS) + val reader = format.newReader( + path, + TABLE_INTERFERENCE_GROUPS, + listOf(INTERFERENCE_GROUP_MEMBERS, INTERFERENCE_GROUP_TARGET, INTERFERENCE_GROUP_SCORE) + ) assertAll( { assertTrue(reader.nextRow()) }, @@ -177,7 +186,7 @@ internal class OdcVmTraceFormatTest { @Test fun testInterferenceGroupsEmpty() { val path = Paths.get("src/test/resources/trace-v2.0") - val reader = format.newReader(path, TABLE_INTERFERENCE_GROUPS) + val reader = format.newReader(path, TABLE_INTERFERENCE_GROUPS, listOf(INTERFERENCE_GROUP_MEMBERS)) assertFalse(reader.nextRow()) reader.close() |
